Skip to content

Monitoring

provide.foundation.process.lifecycle.monitoring

TODO: Add module docstring.

Classes

Functions

wait_for_process_output async

wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str

Wait for specific output pattern from a managed process.

This utility reads from a process stdout until a specific pattern (e.g., handshake string with multiple pipe separators) appears.

Parameters:

Name Type Description Default
process ManagedProcess

The managed process to read from

required
expected_parts list[str]

List of expected parts/separators in the output

required
timeout float

Maximum time to wait for the pattern

DEFAULT_PROCESS_WAIT_TIMEOUT
buffer_size int

Size of read buffer

1024

Returns:

Type Description
str

The complete output buffer containing the expected pattern

Raises:

Type Description
ProcessError

If process exits unexpectedly

TimeoutError

If pattern is not found within timeout

Source code in provide/foundation/process/lifecycle/monitoring.py
async def wait_for_process_output(
    process: ManagedProcess,
    expected_parts: list[str],
    timeout: float = DEFAULT_PROCESS_WAIT_TIMEOUT,
    buffer_size: int = 1024,
) -> str:
    """Wait for specific output pattern from a managed process.

    This utility reads from a process stdout until a specific pattern
    (e.g., handshake string with multiple pipe separators) appears.

    Args:
        process: The managed process to read from
        expected_parts: List of expected parts/separators in the output
        timeout: Maximum time to wait for the pattern
        buffer_size: Size of read buffer

    Returns:
        The complete output buffer containing the expected pattern

    Raises:
        ProcessError: If process exits unexpectedly
        TimeoutError: If pattern is not found within timeout

    """
    loop = asyncio.get_event_loop()
    start_time = loop.time()
    buffer = ""
    last_exit_code = None

    log.debug(
        "⏳ Waiting for process output pattern",
        expected_parts=expected_parts,
        timeout=timeout,
    )

    while (loop.time() - start_time) < timeout:
        # Check if process has exited
        if not process.is_running():
            last_exit_code = process.returncode
            log.debug("Process exited", returncode=last_exit_code)
            return await _handle_exited_process(process, buffer, expected_parts, last_exit_code)

        # Try to read line from running process
        buffer, pattern_found = await _try_read_process_line(process, buffer, expected_parts)
        if pattern_found:
            return buffer

        # Short sleep to avoid busy loop
        await asyncio.sleep(0.01)

    # Final check of buffer before timeout error
    if _check_pattern_found(buffer, expected_parts):
        return buffer

    # If process exited with 0 but we didn't get output, that's still a timeout
    log.error(
        "Timeout waiting for pattern",
        expected_parts=expected_parts,
        buffer=buffer[:200],
        last_exit_code=last_exit_code,
    )
    raise TimeoutError(f"Expected pattern {expected_parts} not found within {timeout}s timeout")