Skip to content

Managed

provide.foundation.process.lifecycle.managed

Managed subprocess lifecycle: launching, monitoring, asynchronous output reading, and graceful shutdown.

Classes

ManagedProcess

ManagedProcess(
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any
)

A managed subprocess with lifecycle support, monitoring, and graceful shutdown.

This class wraps subprocess.Popen with additional functionality for:

- Environment management
- Output streaming and monitoring
- Health checks and process monitoring
- Graceful shutdown with timeouts
- Background stderr relaying

Initialize a ManagedProcess.

Source code in provide/foundation/process/lifecycle/managed.py
def __init__(
    self,
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any,
) -> None:
    """Record how the subprocess should be run; nothing is started here.

    Args:
        command: Program and arguments, kept as given.
        cwd: Working directory; stored as a string, or None when falsy.
        env: Extra variables layered on top of the inherited environment.
        capture_output: Stored; launch() pipes stdout/stderr when true.
        text_mode: Stored; forwarded to Popen as ``text``.
        bufsize: Stored; forwarded to Popen as ``bufsize``.
        stderr_relay: Stored; launch() starts the stderr relay when true
            and stderr is piped.
        **kwargs: Stored; forwarded verbatim to Popen by launch().

    """
    # Lifecycle state: no process, no relay thread, not yet launched.
    self._started = False
    self._process: subprocess.Popen[bytes] | None = None
    self._stderr_thread: threading.Thread | None = None

    # Launch configuration, kept verbatim for launch().
    self.command = command
    self.cwd = None if not cwd else str(cwd)
    self.capture_output = capture_output
    self.text_mode = text_mode
    self.bufsize = bufsize
    self.stderr_relay = stderr_relay
    self.kwargs = kwargs

    # Child environment: the current environment minus coverage-related
    # variables, which would otherwise interfere with output capture
    # during testing.
    coverage_prefixes = ("COVERAGE", "COV_CORE")
    child_env = {
        name: value for name, value in os.environ.items() if not name.startswith(coverage_prefixes)
    }
    # Caller-supplied variables win over inherited ones.
    if env:
        child_env.update(env)
    self._env = child_env

    log.debug(
        "๐Ÿš€ ManagedProcess initialized",
        command=" ".join(command),
        cwd=self.cwd,
    )
Attributes
pid property
pid: int | None

Get the process ID, if process is running.

process property
process: Popen[bytes] | None

Get the underlying subprocess.Popen instance.

returncode property
returncode: int | None

Get the return code, if process has terminated.

Functions
__enter__
__enter__() -> ManagedProcess

Context manager entry.

Source code in provide/foundation/process/lifecycle/managed.py
def __enter__(self) -> ManagedProcess:
    """Launch the process when a ``with`` block is entered.

    Returns:
        This instance, so it can be bound with ``as``.

    """
    # Launch before returning so the bound object is already started;
    # any error raised by launch() propagates to the ``with`` statement.
    self.launch()
    return self
__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Context manager exit with cleanup.

Source code in provide/foundation/process/lifecycle/managed.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Context manager exit with cleanup.

    Terminates the process and then releases its resources. Returns None,
    so an exception raised inside the ``with`` body is not suppressed.

    Args:
        exc_type: Exception type from the ``with`` body, if any (unused).
        exc_val: Exception instance from the ``with`` body, if any (unused).
        exc_tb: Traceback from the ``with`` body, if any (unused).

    """
    try:
        self.terminate_gracefully()
    finally:
        # Always release resources, even if termination is interrupted
        # (e.g. KeyboardInterrupt while waiting for the process to exit).
        self.cleanup()
cleanup
cleanup() -> None

Clean up process resources.

Source code in provide/foundation/process/lifecycle/managed.py
def cleanup(self) -> None:
    """Release the stderr relay thread and drop the process handle.

    The process itself is not terminated here; only the references held
    by this object are released.
    """
    relay = self._stderr_thread
    if relay and relay.is_alive():
        # Bounded wait: give the relay up to one second to finish rather
        # than blocking indefinitely.
        relay.join(timeout=1.0)

    # Forget the Popen handle.
    self._process = None

    log.debug("๐Ÿงน Managed process cleanup completed")
is_running
is_running() -> bool

Check if the process is currently running.

Source code in provide/foundation/process/lifecycle/managed.py
def is_running(self) -> bool:
    """Report whether a process handle exists and has not exited yet."""
    proc = self._process
    # Without a handle there is nothing running; otherwise poll() returns
    # None for as long as the process has not terminated.
    return bool(proc) and proc.poll() is None
launch
launch() -> None

Launch the managed process.

Raises:

Type Description
ProcessError

If the process fails to launch

StateError

If the process is already started

Source code in provide/foundation/process/lifecycle/managed.py
@resilient(
    # Pass ProcessError/StateError through untouched; wrap anything else.
    error_mapper=lambda e: e
    if isinstance(e, (ProcessError, StateError))
    else ProcessError(f"Failed to launch process: {e}"),
)
def launch(self) -> None:
    """Start the subprocess described by this object.

    Raises:
        ProcessError: If the process fails to launch
        StateError: If the process is already started

    """
    # A ManagedProcess may only be launched once.
    if self._started:
        raise StateError(
            "Process has already been started", code="PROCESS_ALREADY_STARTED", process_state="started"
        )

    command_line = " ".join(self.command)
    log.debug("๐Ÿš€ Launching managed process", command=command_line)

    # Both output streams are piped when capturing, otherwise left to Popen's default.
    stream_target = subprocess.PIPE if self.capture_output else None
    proc = subprocess.Popen(
        self.command,
        cwd=self.cwd,
        env=self._env,
        stdout=stream_target,
        stderr=stream_target,
        text=self.text_mode,
        bufsize=self.bufsize,
        **self.kwargs,
    )
    self._process = proc
    # Only mark as started once Popen has succeeded.
    self._started = True

    log.info(
        "๐Ÿš€ Managed process started successfully",
        pid=proc.pid,
        command=command_line,
    )

    # The relay needs a piped stderr to read from.
    if self.stderr_relay and proc.stderr:
        self._start_stderr_relay()
read_char_async async
read_char_async(
    timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT,
) -> str

Read a single character from stdout asynchronously.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_char_async(self, timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT) -> str:
    """Read a single character from stdout asynchronously.

    In bytes mode the stream is read one byte at a time and decoded
    incrementally as UTF-8, so a multi-byte character is returned whole
    instead of as one replacement character per byte. Invalid input is
    decoded with ``errors="replace"``; a malformed sequence followed by a
    valid byte may therefore yield the replacement character together
    with the following character in one result.

    Args:
        timeout: Maximum number of seconds to wait for the read.

    Returns:
        The decoded character, or "" when the stream returns no data
        (end of stream).

    Raises:
        ProcessError: If there is no process or its stdout is unavailable.
        TimeoutError: If the read does not complete within ``timeout``.

    """
    # Local import: only this method needs incremental decoding.
    import codecs

    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    stdout = self._process.stdout

    def _read_one_char() -> str:
        """Blocking read of one complete character from stdout."""
        first = stdout.read(1)
        if not first:
            return ""
        if not isinstance(first, bytes):
            # Text-mode stream: read(1) already yields a decoded character.
            return str(first)

        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        text = decoder.decode(first)
        # An empty result means the decoder is still waiting for the
        # remaining bytes of a multi-byte sequence.
        while not text:
            more = stdout.read(1)
            if not more:
                # Stream ended mid-sequence: flush what we have.
                return decoder.decode(b"", final=True)
            text = decoder.decode(more)
        return text

    loop = asyncio.get_event_loop()

    try:
        # The blocking read runs in the default executor so the event loop
        # stays responsive while waiting for output.
        return await asyncio.wait_for(loop.run_in_executor(None, _read_one_char), timeout=timeout)
    except TimeoutError as e:
        log.debug("Character read timeout on managed process stdout")
        raise TimeoutError(f"Character read timeout after {timeout}s") from e
read_line_async async
read_line_async(
    timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT,
) -> str

Read a line from stdout asynchronously with timeout.

Source code in provide/foundation/process/lifecycle/managed.py
async def read_line_async(self, timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT) -> str:
    """Read one line from stdout without blocking the event loop.

    Args:
        timeout: Maximum number of seconds to wait for the line.

    Returns:
        The line with leading and trailing whitespace stripped. Bytes are
        decoded as UTF-8 with undecodable bytes replaced.

    Raises:
        ProcessError: If there is no process or its stdout is unavailable.
        TimeoutError: If no line arrives within ``timeout``.

    """
    proc = self._process
    if not proc or not proc.stdout:
        raise ProcessError("Process not running or stdout not available")

    try:
        # readline() blocks, so it is handed to the default executor and
        # awaited with a deadline.
        pending = asyncio.get_event_loop().run_in_executor(None, proc.stdout.readline)
        raw = await asyncio.wait_for(pending, timeout=timeout)
    except TimeoutError as e:
        log.debug("Read timeout on managed process stdout")
        raise TimeoutError(f"Read timeout after {timeout}s") from e

    if isinstance(raw, bytes):
        text = raw.decode("utf-8", errors="replace")
    else:
        text = str(raw)
    return text.strip()
terminate_gracefully
terminate_gracefully(
    timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT,
) -> bool

Terminate the process gracefully with a timeout.

Parameters:

Name Type Description Default
timeout float

Maximum time to wait for graceful termination

DEFAULT_PROCESS_TERMINATE_TIMEOUT

Returns:

Type Description
bool

True if process terminated gracefully, False if force-killed

Source code in provide/foundation/process/lifecycle/managed.py
def terminate_gracefully(self, timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT) -> bool:
    """Stop the process, escalating to a kill if it does not exit in time.

    Args:
        timeout: Maximum time to wait for graceful termination

    Returns:
        True if the process terminated gracefully (or there was nothing
        to terminate), False if it was force-killed or an error occurred

    """
    proc = self._process

    # Nothing was launched, or cleanup already dropped the handle.
    if not proc:
        return True

    # Already exited on its own.
    if proc.poll() is not None:
        log.debug("Process already terminated", returncode=proc.returncode)
        return True

    log.debug("๐Ÿ›‘ Terminating managed process gracefully", pid=proc.pid)

    try:
        # Step 1: ask politely.
        proc.terminate()
        log.debug("๐Ÿ›‘ Sent SIGTERM to process", pid=proc.pid)

        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            # Step 2: the grace period elapsed, so escalate to a kill.
            log.warning(
                "๐Ÿ›‘ Process did not terminate gracefully, force killing",
                pid=proc.pid,
            )
            proc.kill()
            try:
                proc.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                log.error("๐Ÿ›‘ Process could not be killed", pid=proc.pid)
            else:
                log.info("๐Ÿ›‘ Process force killed", pid=proc.pid)
            # Either way the shutdown was not graceful.
            return False
        else:
            log.info("๐Ÿ›‘ Process terminated gracefully", pid=proc.pid)
            return True

    except Exception as e:
        # Best effort: report the failure instead of raising to the caller.
        log.error(
            "๐Ÿ›‘โŒ Error terminating process",
            pid=self._process.pid if self._process else None,
            error=str(e),
            trace=traceback.format_exc(),
        )
        return False

Functions