Skip to content

Streaming

provide.foundation.process.sync.streaming

TODO: Add module docstring.

Classes

Functions

stream

stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any
) -> Iterator[str]

Stream command output line by line.

Parameters:

Name Type Description Default
cmd list[str]

Command and arguments as a list

required
cwd str | Path | None

Working directory for the command

None
env Mapping[str, str] | None

Environment variables

None
timeout float | None

Command timeout in seconds

None
stream_stderr bool

Whether to stream stderr (merged with stdout)

False
**kwargs Any

Additional arguments passed to subprocess.Popen

{}

Yields:

Type Description
str

Lines of output from the command

Raises:

Type Description
ProcessError

If command fails

ProcessTimeoutError

If timeout is exceeded

Source code in provide/foundation/process/sync/streaming.py
def stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> Iterator[str]:
    """Stream command output line by line.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables
        timeout: Command timeout in seconds
        stream_stderr: Whether to stream stderr (merged with stdout)
        **kwargs: Additional arguments passed to subprocess.Popen

    Yields:
        Lines of output from the command

    Raises:
        ProcessError: If command fails
        ProcessTimeoutError: If timeout is exceeded

    """
    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)

    run_env = prepare_environment(env)
    cwd = normalize_cwd(cwd)

    try:
        process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=run_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if stream_stderr else subprocess.PIPE,
            text=True,
            bufsize=1,
            universal_newlines=True,
            **kwargs,
        )

        try:
            if timeout is not None:
                yield from _stream_with_timeout(process, timeout, cmd_str)
                returncode = process.poll() or process.wait()
            else:
                yield from _stream_without_timeout(process)
                returncode = process.wait()

            if returncode != 0:
                raise ProcessError(
                    f"Command failed with exit code {returncode}: {cmd_str}",
                    code="PROCESS_STREAM_FAILED",
                    command=cmd_str,
                    return_code=returncode,
                )

        finally:
            _cleanup_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError):
            raise
        log.error("💥 Stream failed", command=cmd_str, error=str(e))
        raise ProcessError(
            f"Failed to stream command: {cmd_str}",
            code="PROCESS_STREAM_ERROR",
            command=cmd_str,
        ) from e