Source code for quixote.inspection.exec

import os
import pwd
import shlex
import signal
import subprocess
from typing import List, Mapping, Union, Optional, Type

from ._errors import *
from .check import expect_true, fail


class CompletedProcess:
    """
    Class representing a completed process, and providing access to its arguments,
    its output, and its return code

    The captured bytes are kept untouched in ``raw_stdout`` / ``raw_stderr``.
    The ``stdout`` / ``stderr`` properties decode them lazily and cache the result.
    """

    def __init__(self, completed_subprocess: subprocess.CompletedProcess):
        """
        Wrap a result produced by the standard ``subprocess`` module

        :param completed_subprocess: the standard library result to wrap
        """
        # NOTE: __repr__ walks __dict__, so the assignment order below is part
        # of the printed representation.
        self.args: Union[str, List[str]] = completed_subprocess.args
        self.raw_stdout: Optional[bytes] = completed_subprocess.stdout
        self.raw_stderr: Optional[bytes] = completed_subprocess.stderr
        self._stdout: Optional[str] = None  # decoded stdout cache
        self._stderr: Optional[str] = None  # decoded stderr cache
        self.return_code: int = completed_subprocess.returncode

    def __bool__(self):
        """A process is truthy when it exited with status 0"""
        return self.return_code == 0

    def __repr__(self):
        attributes = ",\n".join(f" {name}={value!r}" for name, value in self.__dict__.items())
        return "CompletedProcess(\n" + attributes + "\n)"

    @staticmethod
    def _decode_output(raw_output: Optional[bytes], encoding: str = "utf-8") -> Optional[str]:
        """
        Decode captured bytes, passing ``None`` (nothing captured) through unchanged

        :raise UnicodeDecodeError: if the bytes are not valid for the given encoding
        """
        if raw_output is not None:
            return raw_output.decode(encoding)
        return None

    def check_decode(self, message: str, *, error_kind: Type = KOError, encoding: str = "utf-8"):
        """
        Check whether the output of the process can be decoded according to a given encoding

        On success the decoded text is cached, so later accesses to ``stdout`` /
        ``stderr`` return the text decoded with this encoding.

        :param message: message in case of failure to explain the reason of said failure
        :param error_kind: exception to raise if the check failed
        :param encoding: encoding to use to decode the data
        :return: the object itself, so that calls can be chained
        """
        try:
            self._stdout = self._decode_output(self.raw_stdout, encoding)
        except UnicodeDecodeError as e:
            # Chain explicitly so the original decoding error stays attached as the cause
            raise error_kind(f"{message}: {str(e)} (while decoding stdout)") from e
        try:
            self._stderr = self._decode_output(self.raw_stderr, encoding)
        except UnicodeDecodeError as e:
            raise error_kind(f"{message}: {str(e)} (while decoding stderr)") from e
        return self

    @property
    def stdout(self) -> Optional[str]:
        """
        Standard output as text (UTF-8 unless already decoded by :meth:`check_decode`),
        or None if there is no captured output

        :raise UnicodeDecodeError: if the raw bytes cannot be decoded
        """
        if self._stdout is None:
            self._stdout = self._decode_output(self.raw_stdout)
        return self._stdout

    @property
    def stderr(self) -> Optional[str]:
        """
        Standard error as text (UTF-8 unless already decoded by :meth:`check_decode`),
        or None if there is no captured error output

        :raise UnicodeDecodeError: if the raw bytes cannot be decoded
        """
        if self._stderr is None:
            self._stderr = self._decode_output(self.raw_stderr)
        return self._stderr

    def _return_code_message(self) -> str:
        """
        Render the return code for a failure message

        A negative code ``-N`` is looked up as signal ``N`` and shown as
        ``128 + N`` followed by the signal name. Any other value is shown as is.
        """
        if self.return_code >= 0:
            return str(self.return_code)
        try:
            name = signal.Signals(-self.return_code).name
            return f"{128 + -self.return_code} ({name})"
        except ValueError:
            # Not a known signal number: fall back to the raw value
            return str(self.return_code)

    def _describe_stream(self, name: str) -> str:
        """
        Build the failure-message fragment for one stream

        :param name: either "stdout" or "stderr"
        """
        raw = getattr(self, f"raw_{name}")
        if raw is None:
            return f"\n{name} is empty"
        try:
            return f"\n{name}:\n" + getattr(self, name)
        except UnicodeDecodeError:
            # Undecodable output is still worth showing, as a bytes literal
            return f"\n{name} (raw bytes):\n" + repr(raw)

    def _get_fail_message(self, stdout: bool, stderr: bool, exit_code: bool) -> str:
        """
        Build the text appended to a failure message

        :param stdout: whether to include the standard output
        :param stderr: whether to include the standard error
        :param exit_code: whether to include the exit code
        """
        message = ""
        if exit_code:
            message += f"\nexit code: {self._return_code_message()}"
        if stdout:
            message += self._describe_stream("stdout")
        if stderr:
            message += self._describe_stream("stderr")
        return message

    def check(
        self,
        message: str,
        *,
        error_kind: Type = KOError,
        allowed_status: Union[int, List[int]] = 0,
        stdout: bool = True,
        stderr: bool = True,
        exit_code: bool = True
    ) -> 'CompletedProcess':
        """
        Check whether the execution of the process failed

        :param message: message in case of failure to explain the reason of said failure
        :param allowed_status: status or list of statuses that are considered successful
        :param error_kind: exception to raise if the check failed
        :param stdout: if True add the output of the process to the assertion message
        :param stderr: if True add the error output of the process to the assertion message
        :param exit_code: if True add the exit_code of the process to the assertion message
        :return: the object itself, so that calls can be chained
        """
        if isinstance(allowed_status, int):
            allowed_status = [allowed_status]
        if self.return_code not in allowed_status:
            message += self._get_fail_message(stdout, stderr, exit_code)
            fail(message, error_kind)
        return self

    def expect(
        self,
        message: str,
        *,
        allowed_status: Union[int, List[int]] = 0,
        stdout: bool = True,
        stderr: bool = True,
        exit_code: bool = True
    ) -> 'CompletedProcess':
        """
        Check whether the execution of the process failed, reporting through ``expect_true``

        :param message: message in case of failure to explain the reason of said failure
        :param allowed_status: status or list of statuses that are considered successful
        :param stdout: if True add the output of the process to the requirement message
        :param stderr: if True add the error output of the process to the requirement message
        :param exit_code: if True add the exit_code of the process to the requirement message
        :return: the object itself, so that calls can be chained
        """
        if isinstance(allowed_status, int):
            allowed_status = [allowed_status]
        if self.return_code not in allowed_status:
            message += self._get_fail_message(stdout, stderr, exit_code)
        # expect_true is always called, with the outcome of the status test
        expect_true(self.return_code in allowed_status, message)
        return self
def _switch_to_user(username: str):
    """
    Switch the current process to the given user

    Intended to run in a child process right before the command is executed
    (it is installed as ``preexec_fn`` by the helpers below).

    :param username: login name of the user to switch to
    :raise KeyError: if no such user exists in the password database
    """
    info = pwd.getpwnam(username)
    try:
        # Replace the supplementary groups inherited from the parent with the
        # target user's own. Without this the child keeps the parent's extra
        # groups even after setgid()/setuid().
        os.initgroups(username, info.pw_gid)
    except PermissionError:
        # Changing the group list needs privileges. An unprivileged caller can
        # only "switch" to itself and keeps its own groups, which grants
        # nothing new. A real switch still fails in setgid()/setuid() below.
        pass
    os.setgid(info.pw_gid)
    os.setuid(info.pw_uid)


def _subprocess_run_wrapper(*args, **kwargs) -> CompletedProcess:
    """
    Run a subprocess

    This is a wrapper for subprocess.run() and all the arguments are forwarded to it
    See the documentation of subprocess.run() for the list of all the possible arguments

    :raise quixote.inspection.TimeoutError: if the timeout argument is not None and expires
        before the child process terminates
    """
    try:
        return CompletedProcess(subprocess.run(*args, **kwargs))
    except subprocess.TimeoutExpired as e:
        # NOTE(review): TimeoutError is presumably the one exported by ._errors
        # (star import at the top of the file) rather than the builtin - confirm.
        raise TimeoutError(e) from e


def _run(*args, as_user: Optional[str] = None, **kwargs):
    """
    Run a subprocess, optionally as another user

    :param as_user: if not None, the child switches to this user before executing the command
    :return: the CompletedProcess built by :func:`_subprocess_run_wrapper`
    """
    if as_user is not None:
        kwargs["preexec_fn"] = lambda: _switch_to_user(as_user)
    return _subprocess_run_wrapper(*args, **kwargs)


def _run_with_new_session(
        cmd: str,
        timeout: Optional[int] = None,
        force_kill_on_timout: bool = False,
        env: Optional[Mapping[str, str]] = None,
        as_user: Optional[str] = None,
):
    """
    Run commands with Bash in a new session, so that the whole process group
    can be signalled if the timeout expires

    :param cmd: commands to be executed by ``bash -c``
    :param timeout: the timeout in seconds, or None to wait indefinitely
    :param force_kill_on_timout: if True, SIGKILL is sent to the group right after SIGTERM
        (the misspelled name is kept because callers pass it by keyword)
    :param env: the environment to use when running the command
    :param as_user: the user as which the commands should be executed
    :raise quixote.inspection.TimeoutError: if the timeout expires before the process terminates
    """
    extra_args = {}
    if as_user is not None:
        extra_args["preexec_fn"] = lambda: _switch_to_user(as_user)
    proc = subprocess.Popen(
        ["bash", "-c", cmd],
        stderr=subprocess.PIPE,
        stdout=subprocess.PIPE,
        start_new_session=True,  # the child leads its own group, so killpg(proc.pid) reaches its children
        env=env,
        **extra_args,
    )
    try:
        out, err = proc.communicate(timeout=timeout)
    except subprocess.TimeoutExpired as e:
        os.killpg(proc.pid, signal.SIGTERM)
        if force_kill_on_timout:
            # Kill again, harder (in case some processes don't exit gracefully)
            os.killpg(proc.pid, signal.SIGKILL)
        # Wait for the process and drain its pipes before reporting the timeout
        proc.communicate()
        raise TimeoutError(e) from e
    return CompletedProcess(subprocess.CompletedProcess(proc.args, proc.returncode, out, err))
def command(
        cmd: Union[str, List[str]],
        *,
        timeout: int = None,
        env: Mapping[str, str] = None,
        as_user: str = None,
) -> CompletedProcess:
    """
    Run a single executable, without going through a shell.

    :param cmd: command to be executed, either as an argument list or as a single string
        (a string is split into arguments with ``shlex.split``)
    :param timeout: the timeout in seconds, or None for no limit
    :param env: the environment to use when running the command
    :param as_user: the user as which the command should be executed
    :return: the completed process, with its standard output and error captured

    :raise quixote.inspection.TimeoutError: if timeout is not None and expires
        before the child process terminates
    """
    argv = shlex.split(cmd) if isinstance(cmd, str) else cmd
    return _run(
        argv,
        capture_output=True,
        shell=False,
        timeout=timeout,
        env=env,
        as_user=as_user,
    )
def bash(
        cmd: str,
        *,
        timeout: int = None,
        force_kill_on_timeout: bool = False,
        env: Mapping[str, str] = None,
        as_user: str = None,
) -> CompletedProcess:
    """
    Run one or a sequence of commands using the Bash shell.

    Without a timeout the commands are run through the plain subprocess wrapper.
    With a timeout they are started in a new session, so that every process they
    spawned can be signalled when the timeout expires.

    :param cmd: commands to be executed
    :param timeout: the timeout in seconds, or None for no limit
    :param force_kill_on_timeout: whether processes should be terminated or killed
        when the timeout expires
    :param env: the environment to use when running the command
    :param as_user: the user as which the commands should be executed
    :return: the completed process, with its standard output and error captured

    :raise quixote.inspection.TimeoutError: if timeout is not None and expires
        before the child process terminates
    """
    if timeout is None:
        return _run(["bash", "-c", cmd], capture_output=True, shell=False, env=env, as_user=as_user)

    # The helper's keyword really is spelled "force_kill_on_timout"
    return _run_with_new_session(
        cmd,
        timeout,
        force_kill_on_timout=force_kill_on_timeout,
        env=env,
        as_user=as_user,
    )
class BackgroundProcess:
    """
    Class representing a process running in the background

    It can be used as a context manager: on leaving the ``with`` block, the
    process is killed if it is still running.
    """

    def __init__(self, proc, force_kill_on_scope_exit: bool):
        """
        :param proc: the running process; it is polled, signalled through its process
            group and read with ``communicate()``
            (presumably a ``subprocess.Popen`` leading its own process group - verify against caller)
        :param force_kill_on_scope_exit: if True, :meth:`kill` sends SIGKILL right after SIGTERM
        """
        self.proc = proc
        self.force_kill_on_scope_exit = force_kill_on_scope_exit

    def __enter__(self):
        return self

    def is_running(self) -> bool:
        """
        Check whether the process is still running
        """
        return self.proc.poll() is None

    def _signal_group(self, sig) -> None:
        """
        Send a signal to the process group of the process, if that group still exists
        """
        try:
            os.killpg(self.proc.pid, sig)
        except ProcessLookupError:
            # The group is already gone (the process exited and was reaped,
            # e.g. by an earlier is_running() call): nothing left to signal.
            pass

    def kill(self) -> "CompletedProcess":
        """
        Kill the process, returning a CompletedProcess

        This is safe to call on a process that has already exited: the signals
        are then skipped and whatever output is available is collected.
        """
        self._signal_group(signal.SIGTERM)
        if self.force_kill_on_scope_exit:
            # Kill again, harder (in case some processes don't exit gracefully)
            self._signal_group(signal.SIGKILL)
        out, err = self.proc.communicate()
        return CompletedProcess(subprocess.CompletedProcess(self.proc.args, self.proc.returncode, out, err))

    def __exit__(self, exc_type, exc_val, exc_tb):
        if self.is_running():
            self.kill()
def background_bash(
        cmd: str,
        *,
        force_kill_on_scope_exit: bool = False,
        env: Mapping[str, str] = None,
        as_user: str = None
) -> BackgroundProcess:
    """
    Run one or a sequence of commands using the Bash shell, in the background

    The commands are started in a new session with their standard output and
    error redirected to pipes, and the call returns without waiting for them.

    :param cmd: commands to be executed
    :param force_kill_on_scope_exit: whether processes should be terminated or killed
    :param env: the environment to use when running the command
    :param as_user: the user as which the commands should be executed
    :return: a handle on the background process, usable as a context manager

    .. code-block:: python

        with background_bash("./my_http_server"):
            # Server is running in the background, we can make a request
            bash("curl http://localhost:8080")
        # Server is stopped when we reach the end of the `with` block
    """
    popen_kwargs = {
        "stderr": subprocess.PIPE,
        "stdout": subprocess.PIPE,
        "start_new_session": True,
        "env": env,
    }
    if as_user is not None:
        # Runs in the child just before bash is executed
        popen_kwargs["preexec_fn"] = lambda: _switch_to_user(as_user)

    process = subprocess.Popen(["bash", "-c", cmd], **popen_kwargs)
    return BackgroundProcess(process, force_kill_on_scope_exit)
def java(
        class_name: str,
        args: Optional[List[str]] = None,
        timeout: int = None,
        options: List[str] = None,
        env: Mapping[str, str] = None,
) -> CompletedProcess:
    """
    Launch a java class

    The command line is ``java <options> <class_name> <args>`` and its output is captured.

    :param class_name: path of the Java class file to launch
    :param args: list of arguments to pass to the launched class (none by default)
    :param timeout: time to wait before terminating the process
    :param options: list of shell options to be passed to java (see java man page for more info)
    :param env: environment to run the java command (by default use the current shell environment)

    :raise quixote.inspection.TimeoutError: if timeout is not None and expires before the child process terminates
    """
    # None stands in for "no arguments": a literal [] default would be one
    # list object shared by every call.
    if args is None:
        args = []
    options = options or []
    cmd = "java"
    return _run([cmd, *options, class_name, *args], capture_output=True, timeout=timeout, env=env)
def javajar(
        jar_path: str,
        args: Optional[List[str]] = None,
        timeout: int = None,
        options: List[str] = None,
        env: Mapping[str, str] = None
) -> CompletedProcess:
    """
    Launch a java archive

    The command line is ``java <options> -jar <jar_path> <args>`` and its output is captured.

    :param jar_path: path of the Java archive to launch
    :param args: list of arguments to pass to the launched archive (none by default)
    :param timeout: time to wait before terminating the process
    :param options: list of shell options to be passed to java (see java man page for more info)
    :param env: environment to run the java command (by default use the current shell environment)

    :raise quixote.inspection.TimeoutError: if timeout is not None and expires before the child process terminates

    .. deprecated:: 2.0
        Use the :func:`bash` or :func:`command` functions instead.
    """
    # None stands in for "no arguments": a literal [] default would be one
    # list object shared by every call.
    if args is None:
        args = []
    options = options or []
    cmd = "java"
    return _run([cmd, *options, "-jar", jar_path, *args], capture_output=True, timeout=timeout, env=env)