Process

provide.foundation.process

Utilities for launching and managing subprocesses: synchronous and asynchronous execution, output streaming, managed process lifecycle, exit helpers, and process title/prctl integration.

Classes

CompletedProcess

Result of a completed process.

Note

The env field only stores caller-provided environment variable overrides, not the full subprocess environment. This prevents credential leakage when CompletedProcess objects are logged or stored.
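A brief sketch of this behavior using run() (documented below on this page); only the caller-supplied overrides are echoed back on the result:

from provide.foundation.process import run

# Only the override passed by the caller is stored on the result,
# not the full environment the subprocess actually received.
result = run(["env"], env={"MY_FLAG": "1"})
print(result.env)  # {'MY_FLAG': '1'}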

ManagedProcess

ManagedProcess(
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any
)

A managed subprocess with lifecycle support, monitoring, and graceful shutdown.

This class wraps subprocess.Popen with additional functionality for:

- Environment management
- Output streaming and monitoring
- Health checks and process monitoring
- Graceful shutdown with timeouts
- Background stderr relaying

Initialize a ManagedProcess.
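A minimal usage sketch (illustrative command; ManagedProcess is importable from provide.foundation.process as listed on this page). The context manager launches the process on entry and terminates and cleans it up on exit:

from provide.foundation.process import ManagedProcess

with ManagedProcess(["python", "-c", "print('hello')"]) as proc:
    print(proc.pid)           # PID of the running child
    print(proc.is_running())  # True while the child is alive
# On exit, terminate_gracefully() and cleanup() run automatically.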

Source code in provide/foundation/process/lifecycle/managed.py
def __init__(
    self,
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any,
) -> None:
    """Initialize a ManagedProcess."""
    self.command = command
    self.cwd = str(cwd) if cwd else None
    self.capture_output = capture_output
    self.text_mode = text_mode
    self.bufsize = bufsize
    self.stderr_relay = stderr_relay
    self.kwargs = kwargs

    # Build environment - always start with current environment
    self._env = os.environ.copy()

    # Clean coverage-related environment variables from subprocess
    # to prevent interference with output capture during testing
    for key in list(self._env.keys()):
        if key.startswith(("COVERAGE", "COV_CORE")):
            self._env.pop(key, None)

    # Merge in any provided environment variables
    if env:
        self._env.update(env)

    # Process state
    self._process: subprocess.Popen[bytes] | None = None
    self._stderr_thread: threading.Thread | None = None
    self._started = False

    log.debug(
        "🚀 ManagedProcess initialized",
        command=" ".join(command),
        cwd=self.cwd,
    )
Attributes
pid property
pid: int | None

Get the process ID, or None if the process is not running.

process property
process: Popen[bytes] | None

Get the underlying subprocess.Popen instance.

returncode property
returncode: int | None

Get the return code, or None if the process has not terminated.

Functions
__enter__
__enter__() -> ManagedProcess

Context manager entry.

Source code in provide/foundation/process/lifecycle/managed.py
def __enter__(self) -> ManagedProcess:
    """Context manager entry."""
    self.launch()
    return self
__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Context manager exit with cleanup.

Source code in provide/foundation/process/lifecycle/managed.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Context manager exit with cleanup."""
    self.terminate_gracefully()
    self.cleanup()
cleanup
cleanup() -> None

Clean up process resources.

Source code in provide/foundation/process/lifecycle/managed.py
def cleanup(self) -> None:
    """Clean up process resources."""
    # Join stderr relay thread
    if self._stderr_thread and self._stderr_thread.is_alive():
        # Give it a moment to finish
        self._stderr_thread.join(timeout=1.0)

    # Clean up process reference
    if self._process:
        self._process = None

    log.debug("🧹 Managed process cleanup completed")
is_running
is_running() -> bool

Check if the process is currently running.

Source code in provide/foundation/process/lifecycle/managed.py
def is_running(self) -> bool:
    """Check if the process is currently running."""
    if not self._process:
        return False
    return self._process.poll() is None
launch
launch() -> None

Launch the managed process.

Raises:

- ProcessError: If the process fails to launch
- StateError: If the process is already started

Source code in provide/foundation/process/lifecycle/managed.py
@resilient(
    error_mapper=lambda e: ProcessError(f"Failed to launch process: {e}")
    if not isinstance(e, (ProcessError, StateError))
    else e,
)
def launch(self) -> None:
    """Launch the managed process.

    Raises:
        ProcessError: If the process fails to launch
        StateError: If the process is already started

    """
    if self._started:
        raise StateError(
            "Process has already been started", code="PROCESS_ALREADY_STARTED", process_state="started"
        )

    log.debug("🚀 Launching managed process", command=" ".join(self.command))

    self._process = subprocess.Popen(
        self.command,
        cwd=self.cwd,
        env=self._env,
        stdout=subprocess.PIPE if self.capture_output else None,
        stderr=subprocess.PIPE if self.capture_output else None,
        text=self.text_mode,
        bufsize=self.bufsize,
        **self.kwargs,
    )
    self._started = True

    log.info(
        "🚀 Managed process started successfully",
        pid=self._process.pid,
        command=" ".join(self.command),
    )

    # Start stderr relay if enabled
    if self.stderr_relay and self._process.stderr:
        self._start_stderr_relay()
read_char_async async
read_char_async(
    timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT,
) -> str

Read a single character from stdout asynchronously.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_char_async(self, timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT) -> str:
    """Read a single character from stdout asynchronously."""
    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    loop = asyncio.get_event_loop()

    # Use functools.partial to avoid closure issues
    read_func = functools.partial(self._process.stdout.read, 1)

    try:
        char_data = await asyncio.wait_for(loop.run_in_executor(None, read_func), timeout=timeout)
        if not char_data:
            return ""
        return (
            char_data.decode("utf-8", errors="replace") if isinstance(char_data, bytes) else str(char_data)
        )
    except TimeoutError as e:
        log.debug("Character read timeout on managed process stdout")
        raise TimeoutError(f"Character read timeout after {timeout}s") from e
read_line_async async
read_line_async(
    timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT,
) -> str

Read a line from stdout asynchronously with timeout.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_line_async(self, timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT) -> str:
    """Read a line from stdout asynchronously with timeout."""
    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    loop = asyncio.get_event_loop()

    # Use functools.partial to avoid closure issues
    read_func = functools.partial(self._process.stdout.readline)

    try:
        line_data = await asyncio.wait_for(loop.run_in_executor(None, read_func), timeout=timeout)
        return (
            line_data.decode("utf-8", errors="replace") if isinstance(line_data, bytes) else str(line_data)
        ).strip()
    except TimeoutError as e:
        log.debug("Read timeout on managed process stdout")
        raise TimeoutError(f"Read timeout after {timeout}s") from e
terminate_gracefully
terminate_gracefully(
    timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT,
) -> bool

Terminate the process gracefully with a timeout.

Parameters:

- timeout (float, default DEFAULT_PROCESS_TERMINATE_TIMEOUT): Maximum time to wait for graceful termination

Returns:

- bool: True if process terminated gracefully, False if force-killed

Source code in provide/foundation/process/lifecycle/managed.py
def terminate_gracefully(self, timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT) -> bool:
    """Terminate the process gracefully with a timeout.

    Args:
        timeout: Maximum time to wait for graceful termination

    Returns:
        True if process terminated gracefully, False if force-killed

    """
    if not self._process:
        return True

    if self._process.poll() is not None:
        log.debug("Process already terminated", returncode=self._process.returncode)
        return True

    log.debug("🛑 Terminating managed process gracefully", pid=self._process.pid)

    try:
        # Send SIGTERM
        self._process.terminate()
        log.debug("🛑 Sent SIGTERM to process", pid=self._process.pid)

        # Wait for graceful termination
        try:
            self._process.wait(timeout=timeout)
            log.info("🛑 Process terminated gracefully", pid=self._process.pid)
            return True
        except subprocess.TimeoutExpired:
            log.warning(
                "🛑 Process did not terminate gracefully, force killing",
                pid=self._process.pid,
            )
            # Force kill
            self._process.kill()
            try:
                self._process.wait(timeout=2.0)
                log.info("🛑 Process force killed", pid=self._process.pid)
                return False
            except subprocess.TimeoutExpired:
                log.error("🛑 Process could not be killed", pid=self._process.pid)
                return False

    except Exception as e:
        log.error(
            "🛑❌ Error terminating process",
            pid=self._process.pid if self._process else None,
            error=str(e),
            trace=traceback.format_exc(),
        )
        return False

ProcessError

ProcessError(
    message: str,
    *,
    command: str | list[str] | None = None,
    return_code: int | None = None,
    stdout: str | bytes | None = None,
    stderr: str | bytes | None = None,
    timeout: bool = False,
    code: str | None = None,
    **extra_context: Any
)

Bases: FoundationError

Error for external process execution failures with output capture.

Initialize ProcessError with command execution details.

Parameters:

- message (str, required): Human-readable error message
- command (str | list[str] | None, default None): The command that was executed
- return_code (int | None, default None): Process return/exit code
- stdout (str | bytes | None, default None): Standard output from the process
- stderr (str | bytes | None, default None): Standard error from the process
- timeout (bool, default False): Whether the process timed out
- code (str | None, default None): Optional error code
- **extra_context (Any, default {}): Additional context information
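A hedged sketch of catching ProcessError raised by a failed run() call (the import path for ProcessError is assumed from the source location shown below):

from provide.foundation.process import run
from provide.foundation.errors.process import ProcessError  # assumed import path

try:
    run(["false"])  # non-zero exit with check=True raises ProcessError
except ProcessError as exc:
    print(exc.return_code)  # 1
    print(exc.command)      # "false"
    print(exc.stderr)       # cleaned stderr text, or None if empty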
Source code in provide/foundation/errors/process.py
def __init__(
    self,
    message: str,
    *,
    command: str | list[str] | None = None,
    return_code: int | None = None,
    stdout: str | bytes | None = None,
    stderr: str | bytes | None = None,
    timeout: bool = False,
    code: str | None = None,
    **extra_context: Any,
) -> None:
    """Initialize ProcessError with command execution details.

    Args:
        message: Human-readable error message
        command: The command that was executed
        return_code: Process return/exit code
        stdout: Standard output from the process
        stderr: Standard error from the process
        timeout: Whether the process timed out
        code: Optional error code
        **extra_context: Additional context information

    """
    # Build comprehensive error message
    full_message = message

    if command:
        cmd_str = command if isinstance(command, str) else " ".join(command)
        full_message += f"\nCommand: {cmd_str}"

    if return_code is not None:
        full_message += f"\nReturn code: {return_code}"

    if timeout:
        full_message += "\nProcess timed out"

    if stdout:
        stdout_str = stdout.decode("utf-8", "replace") if isinstance(stdout, bytes) else stdout
        if stdout_str.strip():
            full_message += f"\n--- STDOUT ---\n{stdout_str.strip()}"

    if stderr:
        stderr_str = stderr.decode("utf-8", "replace") if isinstance(stderr, bytes) else stderr
        if stderr_str.strip():
            full_message += f"\n--- STDERR ---\n{stderr_str.strip()}"

    # Store structured data
    context = extra_context.copy()
    context.update(
        {
            "process.command": command,
            "process.return_code": return_code,
            "process.timeout": timeout,
        },
    )

    # Store clean stdout/stderr for programmatic access
    self.stdout = (
        stdout.decode("utf-8", "replace").strip()
        if isinstance(stdout, bytes)
        else stdout.strip()
        if stdout
        else None
    )

    self.stderr = (
        stderr.decode("utf-8", "replace").strip()
        if isinstance(stderr, bytes)
        else stderr.strip()
        if stderr
        else None
    )

    self.command = command
    self.return_code = return_code
    self.timeout = timeout

    super().__init__(full_message, code=code, context=context)
Functions

async_run async

async_run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    input: bytes | None = None,
    shell: bool = False,
    **kwargs: Any
) -> CompletedProcess

Run a subprocess command asynchronously.

Parameters:

- cmd (list[str] | str, required): Command and arguments as a list
- cwd (str | Path | None, default None): Working directory for the command
- env (Mapping[str, str] | None, default None): Environment variables (if None, uses current environment)
- capture_output (bool, default True): Whether to capture stdout/stderr
- check (bool, default True): Whether to raise exception on non-zero exit
- timeout (float | None, default None): Command timeout in seconds
- input (bytes | None, default None): Input to send to the process
- shell (bool, default False): Whether to execute via shell
- **kwargs (Any, default {}): Additional subprocess arguments

Returns:

- CompletedProcess: CompletedProcess with results

Raises:

- ValidationError: If command type and shell parameter mismatch
- ProcessError: If command fails and check=True
- ProcessTimeoutError: If timeout is exceeded
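A minimal usage sketch of async_run (imported from provide.foundation.process as documented on this page):

import asyncio

from provide.foundation.process import async_run

async def main() -> None:
    result = await async_run(["echo", "hello"], timeout=5.0)
    print(result.returncode)  # 0
    print(result.stdout)      # captured output

asyncio.run(main())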

Source code in provide/foundation/process/aio/execution.py
async def async_run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    input: bytes | None = None,
    shell: bool = False,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a subprocess command asynchronously.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables (if None, uses current environment)
        capture_output: Whether to capture stdout/stderr
        check: Whether to raise exception on non-zero exit
        timeout: Command timeout in seconds
        input: Input to send to the process
        shell: Whether to execute via shell
        **kwargs: Additional subprocess arguments

    Returns:
        CompletedProcess with results

    Raises:
        ValidationError: If command type and shell parameter mismatch
        ProcessError: If command fails and check=True
        ProcessTimeoutError: If timeout is exceeded
    """
    # Mask secrets in command for logging
    from provide.foundation.security import mask_command

    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
    masked_cmd = mask_command(cmd_str)
    log.trace("🚀 Running async command", command=masked_cmd, cwd=str(cwd) if cwd else None)

    # Validate command type and shell parameter
    if isinstance(cmd, str) and not shell:
        raise ValidationError(
            "String commands require explicit shell=True for security. "
            "Use async_shell() for shell commands or pass a list for direct execution.",
            code="INVALID_COMMAND_TYPE",
            expected="list[str] or (str with shell=True)",
            actual="str without shell=True",
        )

    # Prepare environment and convert Path to string
    run_env = prepare_environment(env)
    cwd_str = str(cwd) if isinstance(cwd, Path) else cwd

    process = None
    try:
        # Create subprocess
        process = await create_subprocess(cmd, cmd_str, shell, cwd_str, run_env, capture_output, input, kwargs)

        try:
            # Communicate with process
            stdout, stderr = await communicate_with_timeout(process, input, timeout, cmd_str)

            # Create completed process
            completed = create_completed_process_result(cmd, process, stdout, stderr, cwd_str, env, run_env)

            # Check for success
            check_process_success(process, cmd_str, capture_output, completed.stdout, completed.stderr, check)

            log.debug(
                command=cmd_str,
                returncode=process.returncode,
            )

            return completed
        finally:
            await cleanup_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError | ValidationError):
            raise

        log.error(
            "💥 Async command execution failed",
            command=cmd_str,
            error=str(e),
        )
        raise ProcessError(
            f"Failed to execute async command: {cmd_str}",
            code="PROCESS_ASYNC_EXECUTION_FAILED",
            command=cmd_str,
        ) from e

async_shell async

async_shell(
    cmd: str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    allow_shell_features: bool = DEFAULT_SHELL_ALLOW_FEATURES,
    **kwargs: Any
) -> CompletedProcess

Run a shell command asynchronously with safety validation.

WARNING: This function uses shell=True. By default, shell metacharacters are DENIED to prevent command injection. Use allow_shell_features=True only with trusted input.

Parameters:

- cmd (str, required): Shell command string
- cwd (str | Path | None, default None): Working directory
- env (Mapping[str, str] | None, default None): Environment variables
- capture_output (bool, default True): Whether to capture output
- check (bool, default True): Whether to raise on non-zero exit
- timeout (float | None, default None): Command timeout in seconds
- allow_shell_features (bool, default DEFAULT_SHELL_ALLOW_FEATURES): Allow shell metacharacters (default: False)
- **kwargs (Any, default {}): Additional subprocess arguments

Returns:

- CompletedProcess: CompletedProcess with results

Raises:

- ValidationError: If cmd is not a string
- ShellFeatureError: If shell features used without explicit permission

Security Note

For maximum security, use async_run() with a list of arguments instead. Only set allow_shell_features=True if you fully trust the command source.

Safe:   await async_shell("ls -la", allow_shell_features=False)  # OK
Unsafe: await async_shell(user_input)  # Will raise ShellFeatureError if metacharacters present
Risky:  await async_shell(user_input, allow_shell_features=True)  # DO NOT DO THIS
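A runnable sketch under those constraints, using a fixed, trusted command string with the default allow_shell_features=False:

import asyncio

from provide.foundation.process import async_shell

async def main() -> None:
    # A plain command with no shell metacharacters passes the safety check.
    result = await async_shell("ls -la", timeout=5.0)
    print(result.stdout)

asyncio.run(main())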

Source code in provide/foundation/process/aio/shell.py
async def async_shell(
    cmd: str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    allow_shell_features: bool = DEFAULT_SHELL_ALLOW_FEATURES,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a shell command asynchronously with safety validation.

    WARNING: This function uses shell=True. By default, shell metacharacters
    are DENIED to prevent command injection. Use allow_shell_features=True
    only with trusted input.

    Args:
        cmd: Shell command string
        cwd: Working directory
        env: Environment variables
        capture_output: Whether to capture output
        check: Whether to raise on non-zero exit
        timeout: Command timeout in seconds
        allow_shell_features: Allow shell metacharacters (default: False)
        **kwargs: Additional subprocess arguments

    Returns:
        CompletedProcess with results

    Raises:
        ValidationError: If cmd is not a string
        ShellFeatureError: If shell features used without explicit permission

    Security Note:
        For maximum security, use async_run() with a list of arguments instead.
        Only set allow_shell_features=True if you fully trust the command source.

        Safe:   await async_shell("ls -la", allow_shell_features=False)  # OK
        Unsafe: await async_shell(user_input)  # Will raise ShellFeatureError if metacharacters present
        Risky:  await async_shell(user_input, allow_shell_features=True)  # DO NOT DO THIS

    """
    if not isinstance(cmd, str):
        raise ValidationError(
            "Shell command must be a string",
            code="INVALID_SHELL_COMMAND",
            expected_type="str",
            actual_type=type(cmd).__name__,
        )

    # Validate shell safety - raises ShellFeatureError if dangerous patterns found
    validate_shell_safety(cmd, allow_shell_features=allow_shell_features)

    return await async_run(
        cmd,
        cwd=cwd,
        env=env,
        capture_output=capture_output,
        check=check,
        timeout=timeout,
        shell=True,  # nosec B604 - Intentional shell usage with validation
        **kwargs,
    )

async_stream async

async_stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any
) -> AsyncIterator[str]

Stream command output line by line asynchronously.

Parameters:

- cmd (list[str], required): Command and arguments as a list
- cwd (str | Path | None, default None): Working directory for the command
- env (Mapping[str, str] | None, default None): Environment variables
- timeout (float | None, default None): Command timeout in seconds
- stream_stderr (bool, default False): Whether to merge stderr into stdout
- **kwargs (Any, default {}): Additional subprocess arguments

Yields:

- AsyncIterator[str]: Lines of output from the command

Raises:

- ProcessError: If command fails
- ProcessTimeoutError: If timeout is exceeded
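A minimal sketch of consuming the asynchronous stream (async_stream imported from provide.foundation.process as documented on this page):

import asyncio

from provide.foundation.process import async_stream

async def main() -> None:
    async for line in async_stream(["echo", "hello"]):
        print(line)

asyncio.run(main())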

Source code in provide/foundation/process/aio/streaming.py
async def async_stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Stream command output line by line asynchronously.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables
        timeout: Command timeout in seconds
        stream_stderr: Whether to merge stderr into stdout
        **kwargs: Additional subprocess arguments

    Yields:
        Lines of output from the command

    Raises:
        ProcessError: If command fails
        ProcessTimeoutError: If timeout is exceeded
    """
    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)

    # Prepare environment and working directory
    run_env = prepare_environment(env)
    cwd_str = str(cwd) if isinstance(cwd, Path) else cwd

    process = None
    try:
        # Create subprocess
        process = await create_stream_subprocess(cmd, cwd_str, run_env, stream_stderr, kwargs)

        try:
            # Stream output with optional timeout
            if timeout:
                lines = await read_lines_with_timeout(process, timeout, cmd_str)
                await process.wait()
                check_stream_exit_code(process, cmd_str)

                # Yield lines as they were read
                for line in lines:
                    yield line
            else:
                # No timeout - stream normally
                if process.stdout:
                    async for line in process.stdout:
                        yield line.decode(errors="replace").rstrip()

                # Wait for process to complete and check exit code
                await process.wait()
                check_stream_exit_code(process, cmd_str)

        finally:
            await cleanup_stream_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError):
            raise

        log.error("💥 Async stream failed", command=cmd_str, error=str(e))
        raise ProcessError(
            f"Failed to stream async command: {cmd_str}",
            code="PROCESS_ASYNC_STREAM_ERROR",
            command=cmd_str,
        ) from e

exit_error

exit_error(
    message: str | None = None, code: int = EXIT_ERROR
) -> None

Exit with error status.

Parameters:

- message (str | None, default None): Optional error message to log before exiting
- code (int, default EXIT_ERROR): Exit code to use (defaults to EXIT_ERROR)
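A hedged sketch of the typical exit pattern in a CLI entry point (exit helpers as documented on this page; the command and the ProcessError import path are illustrative assumptions):

from provide.foundation.errors.process import ProcessError  # assumed import path
from provide.foundation.process import exit_error, exit_success, run

def main() -> None:
    try:
        run(["true"])  # a command that succeeds on POSIX systems
    except ProcessError as exc:
        exit_error(f"command failed: {exc}")  # logs, then sys.exit(EXIT_ERROR)
    exit_success("all done")                  # logs, then sys.exit(EXIT_SUCCESS)

main()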
Source code in provide/foundation/process/exit.py
def exit_error(message: str | None = None, code: int = EXIT_ERROR) -> None:
    """Exit with error status.

    Args:
        message: Optional error message to log before exiting
        code: Exit code to use (defaults to EXIT_ERROR)

    """
    if message:
        logger = get_foundation_logger()
        logger.error(f"Exiting with error: {message}", exit_code=code)
    sys.exit(code)

exit_interrupted

exit_interrupted(
    message: str = "Process interrupted",
) -> None

Exit due to interrupt signal (SIGINT).

Parameters:

- message (str, default 'Process interrupted'): Message to log before exiting
Source code in provide/foundation/process/exit.py
def exit_interrupted(message: str = "Process interrupted") -> None:
    """Exit due to interrupt signal (SIGINT).

    Args:
        message: Message to log before exiting

    """
    logger = get_foundation_logger()
    logger.warning(f"Exiting due to interrupt: {message}")
    sys.exit(EXIT_SIGINT)

exit_success

exit_success(message: str | None = None) -> None

Exit with success status.

Parameters:

- message (str | None, default None): Optional message to log before exiting
Source code in provide/foundation/process/exit.py
def exit_success(message: str | None = None) -> None:
    """Exit with success status.

    Args:
        message: Optional message to log before exiting

    """
    if message:
        logger = get_foundation_logger()
        logger.info(f"Exiting successfully: {message}")
    sys.exit(EXIT_SUCCESS)

get_name

get_name() -> str | None

Get process name (PR_GET_NAME).

Returns:

- str | None: Process name, or None if prctl is not available

Raises:

- PlatformError: If not on Linux or python-prctl not installed

Example

>>> from provide.foundation.process import get_name
>>> get_name()
'worker-1'

Source code in provide/foundation/process/prctl.py
def get_name() -> str | None:
    """Get process name (PR_GET_NAME).

    Returns:
        Process name, or None if prctl is not available

    Raises:
        PlatformError: If not on Linux or python-prctl not installed

    Example:
        >>> from provide.foundation.process import get_name
        >>> get_name()
        'worker-1'

    """
    _require_prctl()

    try:
        return prctl.get_name()
    except Exception as e:
        log.debug("Failed to get process name", error=str(e))
        return None

get_process_title

get_process_title() -> str | None

Get the current process title.

Automatically returns None in test mode (via @skip_in_test_mode decorator) to prevent test interference.

Returns:

- str | None: The current process title, or None if setproctitle is not available or running in test mode

Example

>>> from provide.foundation.process import get_process_title, set_process_title
>>> set_process_title("my-process")
True
>>> get_process_title()
'my-process'

Source code in provide/foundation/process/title.py
@skip_in_test_mode(return_value=None, reason="Process title queries interfere with test isolation")
def get_process_title() -> str | None:
    """Get the current process title.

    Automatically returns None in test mode (via @skip_in_test_mode decorator)
    to prevent test interference.

    Returns:
        The current process title, or None if setproctitle is not available
        or running in test mode

    Example:
        >>> from provide.foundation.process import get_process_title, set_process_title
        >>> set_process_title("my-process")
        True
        >>> get_process_title()
        'my-process'

    """
    if not _HAS_SETPROCTITLE:
        return None

    try:
        return setproctitle.getproctitle()
    except Exception as e:
        log.debug("Failed to get process title", error=str(e))
        return None

has_prctl

has_prctl() -> bool

Check if prctl is available.

Returns:

- bool: True if running on Linux and python-prctl is installed, False otherwise

Example

>>> from provide.foundation.process import has_prctl
>>> if has_prctl():
...     # Use prctl features
...     pass

Source code in provide/foundation/process/prctl.py
def has_prctl() -> bool:
    """Check if prctl is available.

    Returns:
        True if running on Linux and python-prctl is installed, False otherwise

    Example:
        >>> from provide.foundation.process import has_prctl
        >>> if has_prctl():
        ...     # Use prctl features
        ...     pass

    """
    return _HAS_PRCTL

has_setproctitle

has_setproctitle() -> bool

Check if setproctitle is available.

Returns:

- bool: True if setproctitle is available, False otherwise

Example

>>> from provide.foundation.process import has_setproctitle
>>> if has_setproctitle():
...     # Use process title features
...     pass

Source code in provide/foundation/process/title.py
def has_setproctitle() -> bool:
    """Check if setproctitle is available.

    Returns:
        True if setproctitle is available, False otherwise

    Example:
        >>> from provide.foundation.process import has_setproctitle
        >>> if has_setproctitle():
        ...     # Use process title features
        ...     pass

    """
    return _HAS_SETPROCTITLE

is_linux

is_linux() -> bool

Check if running on Linux.

Returns:

- bool: True if running on Linux, False otherwise

Example

>>> from provide.foundation.process import is_linux
>>> is_linux()
True

Source code in provide/foundation/process/prctl.py
def is_linux() -> bool:
    """Check if running on Linux.

    Returns:
        True if running on Linux, False otherwise

    Example:
        >>> from provide.foundation.process import is_linux
        >>> is_linux()
        True

    """
    return _IS_LINUX

run

run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    text: bool = True,
    input: str | bytes | None = None,
    shell: bool = False,
    **kwargs: Any
) -> CompletedProcess

Run a subprocess command with consistent error handling and logging.

Parameters:

- cmd (list[str] | str, required): Command and arguments as a list
- cwd (str | Path | None, default None): Working directory for the command
- env (Mapping[str, str] | None, default None): Environment variables (if None, uses current environment)
- capture_output (bool, default True): Whether to capture stdout/stderr
- check (bool, default True): Whether to raise exception on non-zero exit
- timeout (float | None, default None): Command timeout in seconds
- text (bool, default True): Whether to decode output as text
- input (str | bytes | None, default None): Input to send to the process
- shell (bool, default False): Whether to run command through shell
- **kwargs (Any, default {}): Additional arguments passed to subprocess.run

Returns:

- CompletedProcess: CompletedProcess with results

Raises:

- ProcessError: If command fails and check=True
- ProcessTimeoutError: If timeout is exceeded
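A minimal usage sketch of run (imported from provide.foundation.process as documented on this page):

from provide.foundation.process import run

result = run(["echo", "hello"], timeout=5.0)
print(result.returncode)  # 0
print(result.stdout)      # "hello\n" when capture_output=True and text=True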

Source code in provide/foundation/process/sync/execution.py
def run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    text: bool = True,
    input: str | bytes | None = None,
    shell: bool = False,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a subprocess command with consistent error handling and logging.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables (if None, uses current environment)
        capture_output: Whether to capture stdout/stderr
        check: Whether to raise exception on non-zero exit
        timeout: Command timeout in seconds
        text: Whether to decode output as text
        input: Input to send to the process
        shell: Whether to run command through shell
        **kwargs: Additional arguments passed to subprocess.run

    Returns:
        CompletedProcess with results

    Raises:
        ProcessError: If command fails and check=True
        ProcessTimeoutError: If timeout is exceeded

    """
    # Mask secrets in command for logging
    from provide.foundation.security import mask_command

    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
    masked_cmd = mask_command(cmd_str)
    log.trace("🚀 Running command", command=masked_cmd, cwd=str(cwd) if cwd else None)

    # Validate command type and shell parameter
    validate_command_type(cmd, shell)

    # Prepare environment
    run_env = prepare_environment(env)

    # Normalize cwd
    cwd = normalize_cwd(cwd)

    # Prepare input
    subprocess_input = prepare_input(input, text)

    try:
        # Prepare command for subprocess
        subprocess_cmd = cmd_str if shell else cmd

        result = subprocess.run(
            subprocess_cmd,
            cwd=cwd,
            env=run_env,
            capture_output=capture_output,
            text=text,
            input=subprocess_input,
            timeout=timeout,
            check=False,  # We'll handle the check ourselves
            shell=shell,  # nosec B602 - Shell usage validated by caller context
            **kwargs,
        )

        completed = CompletedProcess(
            args=cmd if isinstance(cmd, list) else [cmd],
            returncode=result.returncode,
            stdout=result.stdout if capture_output else "",
            stderr=result.stderr if capture_output else "",
            cwd=cwd,
            env=dict(env) if env else None,  # Only store caller overrides, not full run_env
        )

        if check and result.returncode != 0:
            log.error(
                "❌ Command failed",
                command=cmd_str,
                returncode=result.returncode,
                stderr=result.stderr if capture_output else None,
            )
            raise ProcessError(
                f"Command failed with exit code {result.returncode}: {cmd_str}",
                code="PROCESS_COMMAND_FAILED",
                command=cmd_str,
                return_code=result.returncode,
                stdout=result.stdout if capture_output else None,
                stderr=result.stderr if capture_output else None,
            )

        log.debug(
            command=cmd_str,
            returncode=result.returncode,
        )

        return completed

    except subprocess.TimeoutExpired as e:
        log.error(
            "⏱️ Command timed out",
            command=cmd_str,
            timeout=timeout,
        )
        raise ProcessTimeoutError(
            f"Command timed out after {timeout}s: {cmd_str}",
            code="PROCESS_TIMEOUT",
            command=cmd_str,
            timeout_seconds=timeout,
        ) from e
    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError):
            raise
        log.error(
            "💥 Command execution failed",
            command=cmd_str,
            error=str(e),
        )
        raise ProcessError(
            f"Failed to execute command: {cmd_str}",
            code="PROCESS_EXECUTION_FAILED",
            command=cmd_str,
        ) from e

run_simple

run_simple(
    cmd: list[str],
    cwd: str | Path | None = None,
    **kwargs: Any
) -> str

Simple wrapper for run that returns stdout as a string.

Parameters:

- cmd (list[str], required): Command and arguments as a list
- cwd (str | Path | None, default None): Working directory for the command
- **kwargs (Any, default {}): Additional arguments passed to run

Returns:

- str: Stdout as a stripped string

Raises:

- ProcessError: If command fails
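A short sketch (illustrative command; run_simple imported from provide.foundation.process as documented on this page):

from provide.foundation.process import run_simple

version = run_simple(["python", "--version"])
print(version)  # e.g. "Python 3.12.0", already stripped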

Source code in provide/foundation/process/sync/execution.py
def run_simple(
    cmd: list[str],
    cwd: str | Path | None = None,
    **kwargs: Any,
) -> str:
    """Simple wrapper for run that returns stdout as a string.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        **kwargs: Additional arguments passed to run

    Returns:
        Stdout as a stripped string

    Raises:
        ProcessError: If command fails

    """
    result = run(cmd, cwd=cwd, capture_output=True, check=True, **kwargs)
    return result.stdout.strip()

set_death_signal

set_death_signal(signal: int) -> bool

Set signal to be sent to process when parent dies (PR_SET_PDEATHSIG).

This is useful for ensuring child processes are cleaned up when the parent terminates unexpectedly.

Parameters:

- signal (int, required): Signal number to send (e.g., signal.SIGTERM, signal.SIGKILL)

Returns:

- bool: True if successful, False otherwise

Raises:

- PlatformError: If not on Linux or python-prctl not installed

Example

>>> import signal
>>> from provide.foundation.process import set_death_signal
>>> set_death_signal(signal.SIGTERM)  # Send SIGTERM when parent dies
True

Source code in provide/foundation/process/prctl.py
def set_death_signal(signal: int) -> bool:
    """Set signal to be sent to process when parent dies (PR_SET_PDEATHSIG).

    This is useful for ensuring child processes are cleaned up when the parent
    terminates unexpectedly.

    Args:
        signal: Signal number to send (e.g., signal.SIGTERM, signal.SIGKILL)

    Returns:
        True if successful, False otherwise

    Raises:
        PlatformError: If not on Linux or python-prctl not installed

    Example:
        >>> import signal
        >>> from provide.foundation.process import set_death_signal
        >>> set_death_signal(signal.SIGTERM)  # Send SIGTERM when parent dies
        True

    """
    _require_prctl()

    try:
        prctl.set_pdeathsig(signal)
        log.debug("Death signal set", signal=signal)
        return True
    except Exception as e:
        log.warning("Failed to set death signal", signal=signal, error=str(e))
        return False

set_dumpable

set_dumpable(dumpable: bool) -> bool

Set whether process can produce core dumps (PR_SET_DUMPABLE).

Parameters:

- dumpable (bool, required): True to allow core dumps, False to disable

Returns:

- bool: True if successful, False otherwise

Raises:

- PlatformError: If not on Linux or python-prctl not installed

Example

>>> from provide.foundation.process import set_dumpable
>>> set_dumpable(False)  # Disable core dumps for security
True

Source code in provide/foundation/process/prctl.py
def set_dumpable(dumpable: bool) -> bool:
    """Set whether process can produce core dumps (PR_SET_DUMPABLE).

    Args:
        dumpable: True to allow core dumps, False to disable

    Returns:
        True if successful, False otherwise

    Raises:
        PlatformError: If not on Linux or python-prctl not installed

    Example:
        >>> from provide.foundation.process import set_dumpable
        >>> set_dumpable(False)  # Disable core dumps for security
        True

    """
    _require_prctl()

    try:
        prctl.set_dumpable(1 if dumpable else 0)
        log.debug("Dumpable flag set", dumpable=dumpable)
        return True
    except Exception as e:
        log.warning("Failed to set dumpable flag", dumpable=dumpable, error=str(e))
        return False

set_name

set_name(name: str) -> bool

Set process name (PR_SET_NAME).

Note: This is different from setproctitle. PR_SET_NAME sets the comm value in /proc/[pid]/comm (limited to 16 bytes including null terminator).

Parameters:

- name (str, required): Process name (max 15 characters)

Returns:

- bool: True if successful, False otherwise

Raises:

- PlatformError: If not on Linux or python-prctl not installed

Example

>>> from provide.foundation.process import set_name
>>> set_name("worker-1")
True

Source code in provide/foundation/process/prctl.py
def set_name(name: str) -> bool:
    """Set process name (PR_SET_NAME).

    Note: This is different from setproctitle. PR_SET_NAME sets the comm value
    in /proc/[pid]/comm (limited to 16 bytes including null terminator).

    Args:
        name: Process name (max 15 characters)

    Returns:
        True if successful, False otherwise

    Raises:
        PlatformError: If not on Linux or python-prctl not installed

    Example:
        >>> from provide.foundation.process import set_name
        >>> set_name("worker-1")
        True

    """
    _require_prctl()

    if len(name) > 15:
        log.warning(
            "Process name truncated to 15 characters",
            requested=name,
            actual=name[:15],
        )
        name = name[:15]

    try:
        prctl.set_name(name)
        log.debug("Process name set", name=name)
        return True
    except Exception as e:
        log.warning("Failed to set process name", name=name, error=str(e))
        return False

set_no_new_privs

set_no_new_privs(enabled: bool = True) -> bool

Set no_new_privs flag (PR_SET_NO_NEW_PRIVS).

When enabled, execve() will not grant privileges to do anything that could not have been done without the execve() call. This is a security feature.

Parameters:

- enabled (bool, default True): True to enable no_new_privs, False to attempt disable (usually fails)

Returns:

- bool: True if successful, False otherwise

Raises:

- PlatformError: If not on Linux or python-prctl not installed

Example

>>> from provide.foundation.process import set_no_new_privs
>>> set_no_new_privs(True)  # Prevent privilege escalation
True

Source code in provide/foundation/process/prctl.py
def set_no_new_privs(enabled: bool = True) -> bool:
    """Set no_new_privs flag (PR_SET_NO_NEW_PRIVS).

    When enabled, execve() will not grant privileges to do anything that could
    not have been done without the execve() call. This is a security feature.

    Args:
        enabled: True to enable no_new_privs, False to attempt disable (usually fails)

    Returns:
        True if successful, False otherwise

    Raises:
        PlatformError: If not on Linux or python-prctl not installed

    Example:
        >>> from provide.foundation.process import set_no_new_privs
        >>> set_no_new_privs(True)  # Prevent privilege escalation
        True

    """
    _require_prctl()

    try:
        # Note: python-prctl doesn't have direct no_new_privs support
        # This would require direct prctl() syscall
        import ctypes

        # PR_SET_NO_NEW_PRIVS = 38
        # PR_GET_NO_NEW_PRIVS = 39
        libc = ctypes.CDLL(None)
        result = libc.prctl(38, 1 if enabled else 0, 0, 0, 0)
        if result == 0:
            log.debug("no_new_privs flag set", enabled=enabled)
            return True
        log.warning("Failed to set no_new_privs flag", result=result)
        return False
    except Exception as e:
        log.warning("Failed to set no_new_privs flag", enabled=enabled, error=str(e))
        return False

set_process_title

set_process_title(title: str) -> bool

Set the process title visible in system monitoring tools.

The process title is what appears in ps, top, htop, and other system monitoring tools. This is useful for identifying processes, especially in multi-process applications or long-running services.

Automatically disabled in test mode (via @skip_in_test_mode decorator) to prevent interference with test isolation and parallel test execution.

Parameters:

- title (str, required): The title to set for the current process

Returns:

- bool: True if the title was set successfully (or skipped in test mode), False if setproctitle is not available

Example

>>> from provide.foundation.process import set_process_title
>>> set_process_title("my-worker-process")
True
>>> # Process will now show as "my-worker-process" in ps/top
Source code in provide/foundation/process/title.py
@skip_in_test_mode(return_value=True, reason="Process title changes interfere with test isolation")
def set_process_title(title: str) -> bool:
    """Set the process title visible in system monitoring tools.

    The process title is what appears in ps, top, htop, and other system
    monitoring tools. This is useful for identifying processes, especially
    in multi-process applications or long-running services.

    Automatically disabled in test mode (via @skip_in_test_mode decorator) to
    prevent interference with test isolation and parallel test execution.

    Args:
        title: The title to set for the current process

    Returns:
        True if the title was set successfully (or skipped in test mode),
        False if setproctitle is not available

    Example:
        >>> from provide.foundation.process import set_process_title
        >>> set_process_title("my-worker-process")
        True
        >>> # Process will now show as "my-worker-process" in ps/top

    """
    if not _HAS_SETPROCTITLE:
        log.debug(
            "Cannot set process title - setproctitle not available",
            title=title,
            hint="Install with: pip install provide-foundation[process]",
        )
        return False

    try:
        setproctitle.setproctitle(title)
        log.debug("Process title set", title=title)
        return True
    except Exception as e:
        log.warning("Failed to set process title", title=title, error=str(e))
        return False

set_process_title_from_argv

set_process_title_from_argv() -> bool

Set process title from argv, preserving the invoked command name.

Extracts the command name from sys.argv[0] (including symlinks) and formats it with the remaining arguments to create a clean process title.

This handles symlinks correctly - if you have a symlink 'whatever' pointing to 'pyvider', and run 'whatever run --config foo.yml', the process title will be 'whatever run --config foo.yml'.

Automatically disabled in test mode (via @skip_in_test_mode decorator) to prevent interference with test isolation and parallel test execution.

Returns:

- bool: True if the title was set successfully (or skipped in test mode), False if setproctitle is not available

Example

>>> # If invoked as: pyvider run --config foo.yml
>>> from provide.foundation.process import set_process_title_from_argv
>>> set_process_title_from_argv()
True
>>> # Process will show as "pyvider run --config foo.yml" in ps/top

>>> # If invoked via symlink: whatever run
>>> # (where whatever -> pyvider)
>>> set_process_title_from_argv()
True
>>> # Process will show as "whatever run" in ps/top
Source code in provide/foundation/process/title.py
@skip_in_test_mode(return_value=True, reason="Process title changes interfere with test isolation")
def set_process_title_from_argv() -> bool:
    """Set process title from argv, preserving the invoked command name.

    Extracts the command name from sys.argv[0] (including symlinks) and
    formats it with the remaining arguments to create a clean process title.

    This handles symlinks correctly - if you have a symlink 'whatever' pointing
    to 'pyvider', and run 'whatever run --config foo.yml', the process title
    will be 'whatever run --config foo.yml'.

    Automatically disabled in test mode (via @skip_in_test_mode decorator) to
    prevent interference with test isolation and parallel test execution.

    Returns:
        True if the title was set successfully (or skipped in test mode),
        False if setproctitle is not available

    Example:
        >>> # If invoked as: pyvider run --config foo.yml
        >>> from provide.foundation.process import set_process_title_from_argv
        >>> set_process_title_from_argv()
        True
        >>> # Process will show as "pyvider run --config foo.yml" in ps/top

        >>> # If invoked via symlink: whatever run
        >>> # (where whatever -> pyvider)
        >>> set_process_title_from_argv()
        True
        >>> # Process will show as "whatever run" in ps/top

    """
    from pathlib import Path
    import sys

    # Extract command name from argv[0] - preserves symlink names
    cmd_name = Path(sys.argv[0]).name
    args = sys.argv[1:]

    # Format title as "cmd arg1 arg2..."
    title = f"{cmd_name} {' '.join(args)}" if args else cmd_name

    return set_process_title(title)

stream

stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any
) -> Iterator[str]

Stream command output line by line.

Parameters:

- cmd (list[str], required): Command and arguments as a list
- cwd (str | Path | None, default None): Working directory for the command
- env (Mapping[str, str] | None, default None): Environment variables
- timeout (float | None, default None): Command timeout in seconds
- stream_stderr (bool, default False): Whether to stream stderr (merged with stdout)
- **kwargs (Any, default {}): Additional arguments passed to subprocess.Popen

Yields:

- str: Lines of output from the command

Raises:

- ProcessError: If command fails
- ProcessTimeoutError: If timeout is exceeded
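A minimal sketch of consuming the iterator (stream imported from provide.foundation.process as documented on this page):

from provide.foundation.process import stream

for line in stream(["echo", "hello"], timeout=5.0):
    print(line)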

Source code in provide/foundation/process/sync/streaming.py
def stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> Iterator[str]:
    """Stream command output line by line.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables
        timeout: Command timeout in seconds
        stream_stderr: Whether to stream stderr (merged with stdout)
        **kwargs: Additional arguments passed to subprocess.Popen

    Yields:
        Lines of output from the command

    Raises:
        ProcessError: If command fails
        ProcessTimeoutError: If timeout is exceeded

    """
    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)

    run_env = prepare_environment(env)
    cwd = normalize_cwd(cwd)

    try:
        process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=run_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if stream_stderr else subprocess.PIPE,
            text=True,
            bufsize=1,
            universal_newlines=True,
            **kwargs,
        )

        try:
            if timeout is not None:
                yield from _stream_with_timeout(process, timeout, cmd_str)
                returncode = process.poll() or process.wait()
            else:
                yield from _stream_without_timeout(process)
                returncode = process.wait()

            if returncode != 0:
                raise ProcessError(
                    f"Command failed with exit code {returncode}: {cmd_str}",
                    code="PROCESS_STREAM_FAILED",
                    command=cmd_str,
                    return_code=returncode,
                )

        finally:
            _cleanup_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError):
            raise
        log.error("💥 Stream failed", command=cmd_str, error=str(e))
        raise ProcessError(
            f"Failed to stream command: {cmd_str}",
            code="PROCESS_STREAM_ERROR",
            command=cmd_str,
        ) from e

wait_for_process_output async

wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str

Wait for specific output pattern from a managed process.

This utility reads from a process's stdout until a specific pattern (e.g., a handshake string with multiple pipe separators) appears.

Parameters:

- process (ManagedProcess, required): The managed process to read from
- expected_parts (list[str], required): List of expected parts/separators in the output
- timeout (float, default DEFAULT_PROCESS_WAIT_TIMEOUT): Maximum time to wait for the pattern
- buffer_size (int, default 1024): Size of read buffer

Returns:

- str: The complete output buffer containing the expected pattern

Raises:

- ProcessError: If process exits unexpectedly
- TimeoutError: If pattern is not found within timeout
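A hedged sketch of waiting for a handshake-style line from a managed child (the child command is illustrative, and the expected_parts value is an assumption about how the pattern check treats pipe separators):

import asyncio

from provide.foundation.process import ManagedProcess, wait_for_process_output

async def main() -> None:
    # Child that emits a pipe-separated, handshake-like line on stdout.
    cmd = ["python", "-c", "print('1|1|tcp|127.0.0.1:4321|grpc')"]
    with ManagedProcess(cmd) as proc:
        handshake = await wait_for_process_output(proc, expected_parts=["|"], timeout=10.0)
        print(handshake)

asyncio.run(main())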

Source code in provide/foundation/process/lifecycle/monitoring.py
async def wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str:
    """Wait for specific output pattern from a managed process.

    This utility reads from a process stdout until a specific pattern
    (e.g., handshake string with multiple pipe separators) appears.

    Args:
        process: The managed process to read from
        expected_parts: List of expected parts/separators in the output
        timeout: Maximum time to wait for the pattern
        buffer_size: Size of read buffer

    Returns:
        The complete output buffer containing the expected pattern

    Raises:
        ProcessError: If process exits unexpectedly
        TimeoutError: If pattern is not found within timeout

    """
    loop = asyncio.get_event_loop()
    start_time = loop.time()
    buffer = ""
    last_exit_code = None

    log.debug(
        "⏳ Waiting for process output pattern",
        expected_parts=expected_parts,
        timeout=timeout,
    )

    while (loop.time() - start_time) < timeout:
        # Check if process has exited
        if not process.is_running():
            last_exit_code = process.returncode
            log.debug("Process exited", returncode=last_exit_code)
            return await _handle_exited_process(process, buffer, expected_parts, last_exit_code)

        # Try to read line from running process
        buffer, pattern_found = await _try_read_process_line(process, buffer, expected_parts)
        if pattern_found:
            return buffer

        # Short sleep to avoid busy loop
        await asyncio.sleep(0.01)

    # Final check of buffer before timeout error
    if _check_pattern_found(buffer, expected_parts):
        return buffer

    # If process exited with 0 but we didn't get output, that's still a timeout
    log.error(
        "Timeout waiting for pattern",
        expected_parts=expected_parts,
        buffer=buffer[:200],
        last_exit_code=last_exit_code,
    )
    raise TimeoutError(f"Expected pattern {expected_parts} not found within {timeout}s timeout")