ManagedProcess(
command: list[str],
*,
cwd: str | Path | None = None,
env: Mapping[str, str] | None = None,
capture_output: bool = True,
text_mode: bool = False,
bufsize: int = 0,
stderr_relay: bool = True,
**kwargs: Any
)
A managed subprocess with lifecycle support, monitoring, and graceful shutdown.
This class wraps subprocess.Popen with additional functionality for:
- Environment management
- Output streaming and monitoring
- Health checks and process monitoring
- Graceful shutdown with timeouts
- Background stderr relaying
Initialize a ManagedProcess.
Source code in provide/foundation/process/lifecycle/managed.py
def __init__(
    self,
    command: list[str],
    *,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    text_mode: bool = False,
    bufsize: int = 0,
    stderr_relay: bool = True,
    **kwargs: Any,
) -> None:
    """Record launch settings and prepare the child's environment.

    No process is spawned here; the settings are consumed later by
    ``launch()``.

    Args:
        command: Program and arguments handed to ``subprocess.Popen``.
        cwd: Working directory for the child; stored as a string, or
            ``None`` when a falsy value is given.
        env: Extra variables layered on top of a filtered copy of the
            current environment.
        capture_output: When true, stdout and stderr are piped at launch.
        text_mode: Forwarded to ``Popen`` as ``text``.
        bufsize: Forwarded to ``Popen`` as ``bufsize``.
        stderr_relay: When true, a stderr relay is started at launch if
            stderr is piped.
        **kwargs: Additional keyword arguments forwarded to ``Popen``.
    """
    # Launch configuration, stored verbatim for launch().
    self.command = command
    self.cwd = None if not cwd else str(cwd)
    self.capture_output = capture_output
    self.text_mode = text_mode
    self.bufsize = bufsize
    self.stderr_relay = stderr_relay
    self.kwargs = kwargs

    # Start from the current environment, minus coverage-related variables,
    # which would otherwise interfere with output capture during testing.
    coverage_prefixes = ("COVERAGE", "COV_CORE")
    self._env = {
        name: value
        for name, value in os.environ.items()
        if not name.startswith(coverage_prefixes)
    }

    # Caller-supplied variables are merged last, so they win over inherited
    # ones (and are not subject to the coverage filter above).
    if env:
        self._env.update(env)

    # Runtime state, populated by launch().
    self._process: subprocess.Popen[bytes] | None = None
    self._stderr_thread: threading.Thread | None = None
    self._started = False

    log.debug(
        "๐ ManagedProcess initialized",
        command=" ".join(command),
        cwd=self.cwd,
    )
|
Attributes
pid
property
Get the process ID, if process is running.
process
property
process: Popen[bytes] | None
Get the underlying subprocess.Popen instance.
returncode
property
Get the return code, if process has terminated.
Functions
__enter__
__enter__() -> ManagedProcess
Context manager entry.
Source code in provide/foundation/process/lifecycle/managed.py
def __enter__(self) -> ManagedProcess:
    """Launch the process when a ``with`` block is entered.

    Returns:
        This instance, so it can be bound with ``as``.
    """
    self.launch()
    return self
|
__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None
Context manager exit with cleanup.
Source code in provide/foundation/process/lifecycle/managed.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Stop the process and release its resources when leaving a ``with`` block.

    Args:
        exc_type: Type of the exception raised in the block, if any (unused).
        exc_val: The exception instance, if any (unused).
        exc_tb: The traceback, if any (unused).

    Returns:
        None, so an exception raised inside the ``with`` block is not
        suppressed.
    """
    try:
        self.terminate_gracefully()
    finally:
        # Run cleanup even if termination itself raises (for example a
        # KeyboardInterrupt while waiting), so the relay thread is joined and
        # the process reference is always dropped.
        self.cleanup()
|
cleanup
Clean up process resources.
Source code in provide/foundation/process/lifecycle/managed.py
def cleanup(self) -> None:
    """Wait briefly for the stderr relay thread and drop the process handle.

    The process itself is not signalled here; this only joins the relay
    thread (for at most one second) and clears the stored ``Popen`` reference.
    """
    relay = self._stderr_thread
    if relay is not None and relay.is_alive():
        # Bounded wait: a relay that has not finished must not hang cleanup.
        relay.join(timeout=1.0)

    # Forget the process so later state checks see "no process".
    if self._process:
        self._process = None

    log.debug("๐งน Managed process cleanup completed")
|
is_running
Check if the process is currently running.
Source code in provide/foundation/process/lifecycle/managed.py
def is_running(self) -> bool:
    """Report whether a process exists and has not yet exited.

    Returns:
        False when no process is stored; otherwise True only while
        ``poll()`` still returns ``None``.
    """
    proc = self._process
    return bool(proc) and proc.poll() is None
|
launch
Launch the managed process.
Raises:
ProcessError: If the process fails to launch.
StateError: If the process is already started.
Source code in provide/foundation/process/lifecycle/managed.py
@resilient(
    # Pass ProcessError/StateError through unchanged; wrap anything else in a
    # ProcessError that carries the original message.
    error_mapper=lambda e: e
    if isinstance(e, (ProcessError, StateError))
    else ProcessError(f"Failed to launch process: {e}"),
)
def launch(self) -> None:
    """Spawn the configured command as a child process.

    Output pipes are created only when ``capture_output`` is set. After a
    successful spawn the instance is marked as started and, if enabled and
    stderr is piped, the stderr relay is started.

    Raises:
        ProcessError: If the process fails to launch
        StateError: If the process is already started
    """
    if self._started:
        raise StateError(
            "Process has already been started",
            code="PROCESS_ALREADY_STARTED",
            process_state="started",
        )

    command_line = " ".join(self.command)
    log.debug("๐ Launching managed process", command=command_line)

    # Either pipe both streams or let the child inherit both from the parent.
    stream_target = subprocess.PIPE if self.capture_output else None
    child = subprocess.Popen(
        self.command,
        cwd=self.cwd,
        env=self._env,
        stdout=stream_target,
        stderr=stream_target,
        text=self.text_mode,
        bufsize=self.bufsize,
        **self.kwargs,
    )
    self._process = child
    self._started = True

    log.info(
        "๐ Managed process started successfully",
        pid=child.pid,
        command=command_line,
    )

    # The relay needs a piped stderr to read from.
    if self.stderr_relay and child.stderr:
        self._start_stderr_relay()
|
read_char_async
async
read_char_async(
timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT,
) -> str
Read a single character from stdout asynchronously.
Source code in provide/foundation/process/lifecycle/managed.py
async def read_char_async(self, timeout: float = DEFAULT_PROCESS_READCHAR_TIMEOUT) -> str:
    """Read a single character from stdout asynchronously.

    The blocking read runs in the default executor so the event loop is not
    blocked. When stdout yields bytes, they are read one at a time and fed to
    an incremental UTF-8 decoder until a complete character is available, so
    a multi-byte character is returned whole rather than as one replacement
    character per byte.

    Args:
        timeout: Seconds to wait for a character before giving up.

    Returns:
        The decoded character, or an empty string when the stream yields no
        data (end of stream). Malformed UTF-8 is decoded with replacement
        characters and may yield more than one character in a single call.

    Raises:
        ProcessError: If there is no process or its stdout is not available.
        TimeoutError: If no character could be read within ``timeout`` seconds.
    """
    import codecs  # local import: only needed by this method

    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    stream = self._process.stdout

    def _read_one_char() -> str:
        """Blocking helper: read until one full character can be decoded."""
        first = stream.read(1)
        if not first:
            return ""
        if not isinstance(first, bytes):
            # Text-mode streams already return a decoded character.
            return str(first)

        decoder = codecs.getincrementaldecoder("utf-8")(errors="replace")
        text = decoder.decode(first)
        while not text:
            # Lead byte of a multi-byte sequence: fetch continuation bytes.
            more = stream.read(1)
            if not more:
                # Stream ended mid-character; flush what the decoder holds.
                return decoder.decode(b"", final=True)
            text = decoder.decode(more)
        return text

    # Inside a coroutine the running loop is the one to schedule work on.
    loop = asyncio.get_running_loop()
    try:
        return await asyncio.wait_for(loop.run_in_executor(None, _read_one_char), timeout=timeout)
    except TimeoutError as e:
        log.debug("Character read timeout on managed process stdout")
        raise TimeoutError(f"Character read timeout after {timeout}s") from e
|
read_line_async
async
read_line_async(
timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT,
) -> str
Read a line from stdout asynchronously with timeout.
Source code in provide/foundation/process/lifecycle/managed.py
async def read_line_async(self, timeout: float = DEFAULT_PROCESS_READLINE_TIMEOUT) -> str:
    """Read a line from stdout asynchronously with timeout.

    The blocking ``readline`` call runs in the default executor so the event
    loop is not blocked while waiting for output.

    Args:
        timeout: Seconds to wait for a line before giving up.

    Returns:
        The line with surrounding whitespace stripped. Bytes are decoded as
        UTF-8 with undecodable bytes replaced. Because of the stripping, a
        blank line and an empty read both come back as an empty string.

    Raises:
        ProcessError: If there is no process or its stdout is not available.
        TimeoutError: If no line could be read within ``timeout`` seconds.
    """
    if not self._process or not self._process.stdout:
        raise ProcessError("Process not running or stdout not available")

    # Inside a coroutine the running loop is the one to schedule work on.
    loop = asyncio.get_running_loop()
    # The bound method is captured now, so the executor call does not depend
    # on self._process still being set when it runs.
    readline = self._process.stdout.readline

    try:
        line_data = await asyncio.wait_for(loop.run_in_executor(None, readline), timeout=timeout)
    except TimeoutError as e:
        log.debug("Read timeout on managed process stdout")
        raise TimeoutError(f"Read timeout after {timeout}s") from e

    if isinstance(line_data, bytes):
        text = line_data.decode("utf-8", errors="replace")
    else:
        text = str(line_data)
    return text.strip()
|
terminate_gracefully
terminate_gracefully(
timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT,
) -> bool
Terminate the process gracefully with a timeout.
Parameters:
| Name | Type | Description | Default |
|------|------|-------------|---------|
| timeout | float | Maximum time to wait for graceful termination | DEFAULT_PROCESS_TERMINATE_TIMEOUT |
Returns:
| Type | Description |
|------|-------------|
| bool | True if process terminated gracefully, False if force-killed |
Source code in provide/foundation/process/lifecycle/managed.py
def terminate_gracefully(self, timeout: float = DEFAULT_PROCESS_TERMINATE_TIMEOUT) -> bool:
    """Stop the process, escalating from terminate to kill when needed.

    Args:
        timeout: Maximum time to wait for graceful termination

    Returns:
        True if there was no process, it had already exited, or it exited
        within ``timeout`` after being asked to terminate. False if it had to
        be force-killed, could not be killed, or an error occurred.
    """
    proc = self._process
    if not proc:
        return True

    if proc.poll() is not None:
        log.debug("Process already terminated", returncode=proc.returncode)
        return True

    log.debug("๐ Terminating managed process gracefully", pid=proc.pid)

    try:
        # Polite request first.
        proc.terminate()
        log.debug("๐ Sent SIGTERM to process", pid=proc.pid)

        try:
            proc.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            log.warning(
                "๐ Process did not terminate gracefully, force killing",
                pid=proc.pid,
            )
            # Escalate, then allow a short fixed window for the exit.
            proc.kill()
            try:
                proc.wait(timeout=2.0)
            except subprocess.TimeoutExpired:
                log.error("๐ Process could not be killed", pid=proc.pid)
            else:
                log.info("๐ Process force killed", pid=proc.pid)
            return False
        else:
            log.info("๐ Process terminated gracefully", pid=proc.pid)
            return True
    except Exception as e:
        # Best effort: report the failure instead of raising to the caller.
        current = self._process
        log.error(
            "๐โ Error terminating process",
            pid=current.pid if current else None,
            error=str(e),
            trace=traceback.format_exc(),
        )
        return False
|