Skip to content

Index

provide.foundation.process.sync

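Synchronous subprocess helpers: `run` executes a command and returns a `CompletedProcess`, `run_simple` returns only the stripped stdout, and `stream` yields output line by line.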

Functions

run

run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    text: bool = True,
    input: str | bytes | None = None,
    shell: bool = False,
    **kwargs: Any
) -> CompletedProcess

Run a subprocess command with consistent error handling and logging.

Parameters:

Name Type Description Default
cmd list[str] | str

Command and arguments as a list, or a single command string

required
cwd str | Path | None

Working directory for the command

None
env Mapping[str, str] | None

Environment variables (if None, uses current environment)

None
capture_output bool

Whether to capture stdout/stderr

True
check bool

Whether to raise exception on non-zero exit

True
timeout float | None

Command timeout in seconds

None
text bool

Whether to decode output as text

True
input str | bytes | None

Input to send to the process

None
shell bool

Whether to run command through shell

False
**kwargs Any

Additional arguments passed to subprocess.run

{}

Returns:

Type Description
CompletedProcess

CompletedProcess with results

Raises:

Type Description
ProcessError

If command fails and check=True

ProcessTimeoutError

If timeout is exceeded

Source code in provide/foundation/process/sync/execution.py
def run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    text: bool = True,
    input: str | bytes | None = None,
    shell: bool = False,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a subprocess command with consistent error handling and logging.

    The command is passed through ``mask_command`` once, and only that masked
    form is written to log records or embedded in raised errors, so secrets
    on the command line are not leaked through logging or exception text.

    Args:
        cmd: Command and arguments as a list, or a single command string.
            The combination of ``cmd`` and ``shell`` is checked by
            ``validate_command_type``.
        cwd: Working directory for the command
        env: Environment variables (if None, uses current environment)
        capture_output: Whether to capture stdout/stderr
        check: Whether to raise exception on non-zero exit
        timeout: Command timeout in seconds
        text: Whether to decode output as text
        input: Input to send to the process
        shell: Whether to run command through shell
        **kwargs: Additional arguments passed to subprocess.run

    Returns:
        CompletedProcess with results. ``stdout``/``stderr`` are empty
        strings when ``capture_output`` is False, and ``env`` holds only the
        caller-supplied overrides (or None).

    Raises:
        ProcessError: If command fails and check=True, or if the subprocess
            could not be executed at all
        ProcessTimeoutError: If timeout is exceeded

    """
    # Mask secrets in command for logging
    from provide.foundation.security import mask_command

    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
    masked_cmd = mask_command(cmd_str)
    log.trace("🚀 Running command", command=masked_cmd, cwd=str(cwd) if cwd else None)

    # Validate command type and shell parameter
    validate_command_type(cmd, shell)

    # Prepare environment
    run_env = prepare_environment(env)

    # Normalize cwd
    cwd = normalize_cwd(cwd)

    # Prepare input
    subprocess_input = prepare_input(input, text)

    try:
        # The shell receives the joined string; otherwise pass cmd through untouched.
        subprocess_cmd = cmd_str if shell else cmd

        result = subprocess.run(
            subprocess_cmd,
            cwd=cwd,
            env=run_env,
            capture_output=capture_output,
            text=text,
            input=subprocess_input,
            timeout=timeout,
            check=False,  # We'll handle the check ourselves
            shell=shell,  # nosec B602 - Shell usage validated by caller context
            **kwargs,
        )

        completed = CompletedProcess(
            args=cmd if isinstance(cmd, list) else [cmd],
            returncode=result.returncode,
            stdout=result.stdout if capture_output else "",
            stderr=result.stderr if capture_output else "",
            cwd=cwd,
            env=dict(env) if env else None,  # Only store caller overrides, not full run_env
        )

        if check and result.returncode != 0:
            log.error(
                "❌ Command failed",
                command=masked_cmd,
                returncode=result.returncode,
                stderr=result.stderr if capture_output else None,
            )
            raise ProcessError(
                f"Command failed with exit code {result.returncode}: {masked_cmd}",
                code="PROCESS_COMMAND_FAILED",
                command=masked_cmd,
                return_code=result.returncode,
                stdout=result.stdout if capture_output else None,
                stderr=result.stderr if capture_output else None,
            )

        # Give the success record an explicit event message like every other
        # log call in this function.
        log.debug(
            "✅ Command completed",
            command=masked_cmd,
            returncode=result.returncode,
        )

        return completed

    except subprocess.TimeoutExpired as e:
        log.error(
            "⏱️ Command timed out",
            command=masked_cmd,
            timeout=timeout,
        )
        raise ProcessTimeoutError(
            f"Command timed out after {timeout}s: {masked_cmd}",
            code="PROCESS_TIMEOUT",
            command=masked_cmd,
            timeout_seconds=timeout,
        ) from e
    except (ProcessError, ProcessTimeoutError):
        # Already in the domain error types (e.g. the check failure above);
        # let it propagate without re-wrapping.
        raise
    except Exception as e:
        # Anything else (missing executable, permission problems, ...) is
        # reported as a generic execution failure with the cause chained.
        log.error(
            "💥 Command execution failed",
            command=masked_cmd,
            error=str(e),
        )
        raise ProcessError(
            f"Failed to execute command: {masked_cmd}",
            code="PROCESS_EXECUTION_FAILED",
            command=masked_cmd,
        ) from e

run_simple

run_simple(
    cmd: list[str],
    cwd: str | Path | None = None,
    **kwargs: Any
) -> str

Simple wrapper for run that returns stdout as a string.

Parameters:

Name Type Description Default
cmd list[str]

Command and arguments as a list

required
cwd str | Path | None

Working directory for the command

None
**kwargs Any

Additional arguments passed to run

{}

Returns:

Type Description
str

Stdout as a stripped string

Raises:

Type Description
ProcessError

If command fails

Source code in provide/foundation/process/sync/execution.py
def run_simple(
    cmd: list[str],
    cwd: str | Path | None = None,
    **kwargs: Any,
) -> str:
    """Execute a command via ``run`` and hand back only its stdout.

    Output capturing and exit-code checking are always enabled for this
    call; the stdout of the finished process is returned with leading and
    trailing whitespace removed.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        **kwargs: Additional arguments passed to run

    Returns:
        Stdout as a stripped string

    Raises:
        ProcessError: If command fails

    """
    completed = run(
        cmd,
        cwd=cwd,
        capture_output=True,
        check=True,
        **kwargs,
    )
    output = completed.stdout
    return output.strip()

stream

stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any
) -> Iterator[str]

Stream command output line by line.

Parameters:

Name Type Description Default
cmd list[str]

Command and arguments as a list

required
cwd str | Path | None

Working directory for the command

None
env Mapping[str, str] | None

Environment variables

None
timeout float | None

Command timeout in seconds

None
stream_stderr bool

Whether to stream stderr (merged with stdout)

False
**kwargs Any

Additional arguments passed to subprocess.Popen

{}

Yields:

Type Description
str

Lines of output from the command

Raises:

Type Description
ProcessError

If command fails

ProcessTimeoutError

If timeout is exceeded

Source code in provide/foundation/process/sync/streaming.py
def stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> Iterator[str]:
    """Stream command output line by line.

    The process is started with a line-buffered text-mode stdout pipe and
    its lines are yielded as they are produced. After the output is
    exhausted the exit code is checked, and the process is always handed to
    ``_cleanup_process`` when the generator finishes or is closed early.

    The command is passed through ``mask_command`` before it is placed in
    log records or raised errors, so secrets on the command line are not
    leaked through them.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables
        timeout: Command timeout in seconds
        stream_stderr: Whether to stream stderr (merged with stdout)
        **kwargs: Additional arguments passed to subprocess.Popen

    Yields:
        Lines of output from the command

    Raises:
        ProcessError: If command fails (non-zero exit code) or the process
            could not be started/streamed
        ProcessTimeoutError: If timeout is exceeded

    """
    from provide.foundation.security import mask_command

    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
    # Only the masked form is used in logs and error payloads below.
    masked_cmd = mask_command(cmd_str)

    run_env = prepare_environment(env)
    cwd = normalize_cwd(cwd)

    try:
        # NOTE(review): when stream_stderr is False, stderr goes to its own
        # pipe that nothing in this function reads. Confirm the streaming /
        # cleanup helpers drain it; otherwise a child that writes a lot to
        # stderr could block on a full pipe.
        process = subprocess.Popen(
            cmd,
            cwd=cwd,
            env=run_env,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT if stream_stderr else subprocess.PIPE,
            text=True,
            bufsize=1,  # line-buffered in text mode
            universal_newlines=True,
            **kwargs,
        )

        try:
            if timeout is not None:
                yield from _stream_with_timeout(process, timeout, masked_cmd)
                # poll() is None while running and 0 on success (both falsy),
                # so wait() supplies the final exit code in those cases.
                returncode = process.poll() or process.wait()
            else:
                yield from _stream_without_timeout(process)
                returncode = process.wait()

            if returncode != 0:
                raise ProcessError(
                    f"Command failed with exit code {returncode}: {masked_cmd}",
                    code="PROCESS_STREAM_FAILED",
                    command=masked_cmd,
                    return_code=returncode,
                )

        finally:
            # Runs on normal completion, on errors, and when the consumer
            # stops iterating early.
            _cleanup_process(process)

    except (ProcessError, ProcessTimeoutError):
        # Already a domain error; propagate without re-wrapping.
        raise
    except Exception as e:
        log.error("💥 Stream failed", command=masked_cmd, error=str(e))
        raise ProcessError(
            f"Failed to stream command: {masked_cmd}",
            code="PROCESS_STREAM_ERROR",
            command=masked_cmd,
        ) from e