Skip to content

Index

๐Ÿค– AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.

provide.foundation.process.lifecycle

Classes

ManagedProcess

ManagedProcess(
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any,
)

A managed subprocess with lifecycle support, monitoring, and graceful shutdown.

This class wraps subprocess.Popen with additional functionality for: - Environment management - Output streaming and monitoring - Health checks and process monitoring - Graceful shutdown with timeouts - Background stderr relaying

Initialize a ManagedProcess.

Source code in provide/foundation/process/lifecycle/managed.py
def __init__(
    self,
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any,
) -> None:
    """Initialize a ManagedProcess."""
    self.command = command
    self.cwd = str(cwd) if cwd else None
    self.capture_output = capture_output
    self.text_mode = text_mode
    self.bufsize = bufsize
    self.stderr_relay = stderr_relay
    self.kwargs = kwargs

    # Build environment - always start with current environment
    self._env = os.environ.copy()

    # Clean coverage-related environment variables from subprocess
    # to prevent interference with output capture during testing
    for key in list(self._env.keys()):
        if key.startswith(("COVERAGE", "COV_CORE")):
            self._env.pop(key, None)

    # Merge in any provided environment variables
    if env:
        self._env.update(env)

    # Process state
    self._process: subprocess.Popen[bytes] | None = None
    self._stderr_thread: threading.Thread | None = None
    self._started = False

    log.debug(
        "๐Ÿš€ ManagedProcess initialized",
        command=" ".join(command),
        cwd=self.cwd,
    )
Attributes
pid property
pid: int | None

Get the process ID, if process is running.

process property
process: Popen[bytes] | None

Get the underlying subprocess.Popen instance.

returncode property
returncode: int | None

Get the return code, if process has terminated.

Functions
__enter__
__enter__() -> ManagedProcess

Context manager entry.

Source code in provide/foundation/process/lifecycle/managed.py
def __enter__(self) -> ManagedProcess:
    """Context manager entry."""
    self.launch()
    return self
__exit__
__exit__(exc_type: Any, exc_val: Any, _exc_tb: Any) -> None

Context manager exit with cleanup.

Source code in provide/foundation/process/lifecycle/managed.py
def __exit__(self, exc_type: Any, exc_val: Any, _exc_tb: Any) -> None:
    """Context manager exit with cleanup."""
    self.terminate_gracefully()
    self.cleanup()
cleanup
cleanup() -> None

Clean up process resources.

Source code in provide/foundation/process/lifecycle/managed.py
def cleanup(self) -> None:
    """Clean up process resources."""
    # Join stderr relay thread
    if self._stderr_thread and self._stderr_thread.is_alive():
        # Give it a moment to finish
        self._stderr_thread.join(timeout=1.0)

    # Clean up process reference
    if self._process:
        self._process = None

    log.debug("๐Ÿงน Managed process cleanup completed")
is_running
is_running() -> bool

Check if the process is currently running.

Source code in provide/foundation/process/lifecycle/managed.py
def is_running(self) -> bool:
    """Check if the process is currently running."""
    if not self._process:
        return False
    return self._process.poll() is None
launch
launch() -> None

Launch the managed process.

Raises:

Type Description
ProcessError

If the process fails to launch

StateError

If the process is already started

Source code in provide/foundation/process/lifecycle/managed.py
@resilient(
    error_mapper=lambda e: ProcessError(f"Failed to launch process: {e}")
    if not isinstance(e, (ProcessError, StateError))
    else e,
)
def launch(self) -> None:
    """Launch the managed process.

    Raises:
        ProcessError: If the process fails to launch
        StateError: If the process is already started

    """
    if self._started:
        raise StateError(
            "Process has already been started", code="PROCESS_ALREADY_STARTED", process_state="started"
        )

    log.debug("๐Ÿš€ Launching managed process", command=" ".join(self.command))

    self._process = subprocess.Popen(
        self.command,
        cwd=self.cwd,
        env=self._env,
        stdout=subprocess.PIPE if self.capture_output else None,
        stderr=subprocess.PIPE if self.capture_output else None,
        text=self.text_mode,
        bufsize=self.bufsize,
        **self.kwargs,
    )
    self._started = True

    log.info(
        "๐Ÿš€ Managed process started successfully",
        pid=self._process.pid,
        command=" ".join(self.command),
    )

    # Start stderr relay if enabled
    if self.stderr_relay and self._process.stderr:
        self._start_stderr_relay()
read_char_async async
read_char_async(
    timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT,
) -> str

Read a single character from stdout asynchronously.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_char_async(self, timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT) -> str:
    """Read a single character from stdout asynchronously."""
    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    loop = asyncio.get_event_loop()

    # Use functools.partial to avoid closure issues
    read_func = functools.partial(self._process.stdout.read, 1)

    try:
        char_data = await asyncio.wait_for(loop.run_in_executor(None, read_func), timeout=timeout)
        if not char_data:
            return ""
        return (
            char_data.decode("utf-8", errors="replace") if isinstance(char_data, bytes) else str(char_data)
        )
    except TimeoutError as e:
        log.debug("Character read timeout on managed process stdout")
        raise TimeoutError(f"Character read timeout after {timeout}s") from e
read_line_async async
read_line_async(
    timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT,
) -> str

Read a line from stdout asynchronously with timeout.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_line_async(self, timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT) -> str:
    """Read a line from stdout asynchronously with timeout."""
    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    loop = asyncio.get_event_loop()

    # Use functools.partial to avoid closure issues
    read_func = functools.partial(self._process.stdout.readline)

    try:
        line_data = await asyncio.wait_for(loop.run_in_executor(None, read_func), timeout=timeout)
        return (
            line_data.decode("utf-8", errors="replace") if isinstance(line_data, bytes) else str(line_data)
        ).strip()
    except TimeoutError as e:
        log.debug("Read timeout on managed process stdout")
        raise TimeoutError(f"Read timeout after {timeout}s") from e
terminate_gracefully
terminate_gracefully(
    timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT,
) -> bool

Terminate the process gracefully with a timeout.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for graceful termination

DEFAULT_PROCESS_TERMINATE_TIMEOUT

Returns:

Type Description
bool

True if process terminated gracefully, False if force-killed

Source code in provide/foundation/process/lifecycle/managed.py
def terminate_gracefully(self, timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT) -> bool:
    """Terminate the process gracefully with a timeout.

    Args:
        timeout: Maximum time to wait for graceful termination

    Returns:
        True if process terminated gracefully, False if force-killed

    """
    if not self._process:
        return True

    if self._process.poll() is not None:
        log.debug("Process already terminated", returncode=self._process.returncode)
        return True

    log.debug("๐Ÿ›‘ Terminating managed process gracefully", pid=self._process.pid)

    try:
        # Send SIGTERM
        self._process.terminate()
        log.debug("๐Ÿ›‘ Sent SIGTERM to process", pid=self._process.pid)

        # Wait for graceful termination
        try:
            self._process.wait(timeout=timeout)
            log.info("๐Ÿ›‘ Process terminated gracefully", pid=self._process.pid)
            return True
        except subprocess.TimeoutExpired:
            log.warning(
                "๐Ÿ›‘ Process did not terminate gracefully, force killing",
                pid=self._process.pid,
            )
            # Force kill
            self._process.kill()
            try:
                self._process.wait(timeout=2.0)
                log.info("๐Ÿ›‘ Process force killed", pid=self._process.pid)
                return False
            except subprocess.TimeoutExpired:
                log.error("๐Ÿ›‘ Process could not be killed", pid=self._process.pid)
                return False

    except Exception as e:
        log.error(
            "๐Ÿ›‘โŒ Error terminating process",
            pid=self._process.pid if self._process else None,
            error=str(e),
            trace=traceback.format_exc(),
        )
        return False

Functions

wait_for_process_output async

wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str

Wait for specific output pattern from a managed process.

This utility reads from a process stdout until a specific pattern (e.g., handshake string with multiple pipe separators) appears.

Parameters:

Name Type Description Default
process ManagedProcess

The managed process to read from

required
expected_parts list[str]

List of expected parts/separators in the output

required
timeout float

Maximum time to wait for the pattern

DEFAULT_PROCESS_WAIT_TIMEOUT
buffer_size int

Size of read buffer

1024

Returns:

Type Description
str

The complete output buffer containing the expected pattern

Raises:

Type Description
ProcessError

If process exits unexpectedly

TimeoutError

If pattern is not found within timeout

Source code in provide/foundation/process/lifecycle/monitoring.py
async def wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str:
    """Wait for specific output pattern from a managed process.

    This utility reads from a process stdout until a specific pattern
    (e.g., handshake string with multiple pipe separators) appears.

    Args:
        process: The managed process to read from
        expected_parts: List of expected parts/separators in the output
        timeout: Maximum time to wait for the pattern
        buffer_size: Size of read buffer

    Returns:
        The complete output buffer containing the expected pattern

    Raises:
        ProcessError: If process exits unexpectedly
        TimeoutError: If pattern is not found within timeout

    """
    loop = asyncio.get_event_loop()
    start_time = loop.time()
    buffer = ""
    last_exit_code = None

    log.debug(
        "โณ Waiting for process output pattern",
        expected_parts=expected_parts,
        timeout=timeout,
    )

    while (loop.time() - start_time) < timeout:
        # Check if process has exited
        if not process.is_running():
            last_exit_code = process.returncode
            log.debug("Process exited", returncode=last_exit_code)
            return await _handle_exited_process(process, buffer, expected_parts, last_exit_code)

        # Try to read line from running process
        buffer, pattern_found = await _try_read_process_line(process, buffer, expected_parts)
        if pattern_found:
            return buffer

        # Short sleep to avoid busy loop
        await asyncio.sleep(0.01)

    # Final check of buffer before timeout error
    if _check_pattern_found(buffer, expected_parts):
        return buffer

    # If process exited with 0 but we didn't get output, that's still a timeout
    log.error(
        "Timeout waiting for pattern",
        expected_parts=expected_parts,
        buffer=buffer[:200],
        last_exit_code=last_exit_code,
    )
    raise TimeoutError(f"Expected pattern {expected_parts} not found within {timeout}s timeout")