Skip to content

Index

๐Ÿค– AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.

provide.foundation.process.aio

Functions

async_run async

async_run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    input: bytes | None = None,
    shell: bool = False,
    **kwargs: Any,
) -> CompletedProcess

Run a subprocess command asynchronously.

Parameters:

Name Type Description Default
cmd list[str] | str

Command and arguments as a list

required
cwd str | Path | None

Working directory for the command

None
env Mapping[str, str] | None

Environment variables (if None, uses current environment)

None
capture_output bool

Whether to capture stdout/stderr

True
check bool

Whether to raise exception on non-zero exit

True
timeout float | None

Command timeout in seconds

None
input bytes | None

Input to send to the process

None
shell bool

Whether to execute via shell

False
**kwargs Any

Additional subprocess arguments

{}

Returns:

Type Description
CompletedProcess

CompletedProcess with results

Raises:

Type Description
ValidationError

If command type and shell parameter mismatch

ProcessError

If command fails and check=True

ProcessTimeoutError

If timeout is exceeded

Source code in provide/foundation/process/aio/execution.py
async def async_run(
    cmd: list[str] | str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    input: bytes | None = None,
    shell: bool = False,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a subprocess command asynchronously.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables (if None, uses current environment)
        capture_output: Whether to capture stdout/stderr
        check: Whether to raise exception on non-zero exit
        timeout: Command timeout in seconds
        input: Input to send to the process
        shell: Whether to execute via shell
        **kwargs: Additional subprocess arguments

    Returns:
        CompletedProcess with results

    Raises:
        ValidationError: If command type and shell parameter mismatch
        ProcessError: If command fails and check=True
        ProcessTimeoutError: If timeout is exceeded
    """
    # Mask secrets in command for logging
    from provide.foundation.security import mask_command

    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)
    masked_cmd = mask_command(cmd_str)
    log.trace("๐Ÿš€ Running async command", command=masked_cmd, cwd=str(cwd) if cwd else None)

    # Validate command type and shell parameter
    if isinstance(cmd, str) and not shell:
        raise ValidationError(
            "String commands require explicit shell=True for security. "
            "Use async_shell() for shell commands or pass a list for direct execution.",
            code="INVALID_COMMAND_TYPE",
            expected="list[str] or (str with shell=True)",
            actual="str without shell=True",
        )

    # Prepare environment and convert Path to string
    run_env = prepare_environment(env)
    cwd_str = str(cwd) if isinstance(cwd, Path) else cwd

    process = None
    try:
        # Create subprocess
        process = await create_subprocess(cmd, cmd_str, shell, cwd_str, run_env, capture_output, input, kwargs)

        try:
            # Communicate with process
            stdout, stderr = await communicate_with_timeout(process, input, timeout, cmd_str)

            # Create completed process
            completed = create_completed_process_result(cmd, process, stdout, stderr, cwd_str, env, run_env)

            # Check for success
            check_process_success(process, cmd_str, capture_output, completed.stdout, completed.stderr, check)

            log.debug(
                command=cmd_str,
                returncode=process.returncode,
            )

            return completed
        finally:
            await cleanup_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError | ValidationError):
            raise

        log.error(
            "๐Ÿ’ฅ Async command execution failed",
            command=cmd_str,
            error=str(e),
        )
        raise ProcessError(
            f"Failed to execute async command: {cmd_str}",
            code="PROCESS_ASYNC_EXECUTION_FAILED",
            command=cmd_str,
        ) from e

async_shell async

async_shell(
    cmd: str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    allow_shell_features: bool = DEFAULT_SHELL_ALLOW_FEATURES,
    **kwargs: Any,
) -> CompletedProcess

Run a shell command asynchronously with safety validation.

WARNING: This function uses shell=True. By default, shell metacharacters are DENIED to prevent command injection. Use allow_shell_features=True only with trusted input.

Parameters:

Name Type Description Default
cmd str

Shell command string

required
cwd str | Path | None

Working directory

None
env Mapping[str, str] | None

Environment variables

None
capture_output bool

Whether to capture output

True
check bool

Whether to raise on non-zero exit

True
timeout float | None

Command timeout in seconds

None
allow_shell_features bool

Allow shell metacharacters (default: False)

DEFAULT_SHELL_ALLOW_FEATURES
**kwargs Any

Additional subprocess arguments

{}

Returns:

Type Description
CompletedProcess

CompletedProcess with results

Raises:

Type Description
ValidationError

If cmd is not a string

ShellFeatureError

If shell features used without explicit permission

Security Note

For maximum security, use async_run() with a list of arguments instead. Only set allow_shell_features=True if you fully trust the command source.

Safe: await async_shell("ls -la", allow_shell_features=False) # OK Unsafe: await async_shell(user_input) # Will raise ShellFeatureError if metacharacters present Risky: await async_shell(user_input, allow_shell_features=True) # DO NOT DO THIS

Source code in provide/foundation/process/aio/shell.py
async def async_shell(
    cmd: str,
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    capture_output: bool = True,
    check: bool = True,
    timeout: float | None = None,
    allow_shell_features: bool = DEFAULT_SHELL_ALLOW_FEATURES,
    **kwargs: Any,
) -> CompletedProcess:
    """Run a shell command asynchronously with safety validation.

    WARNING: This function uses shell=True. By default, shell metacharacters
    are DENIED to prevent command injection. Use allow_shell_features=True
    only with trusted input.

    Args:
        cmd: Shell command string
        cwd: Working directory
        env: Environment variables
        capture_output: Whether to capture output
        check: Whether to raise on non-zero exit
        timeout: Command timeout in seconds
        allow_shell_features: Allow shell metacharacters (default: False)
        **kwargs: Additional subprocess arguments

    Returns:
        CompletedProcess with results

    Raises:
        ValidationError: If cmd is not a string
        ShellFeatureError: If shell features used without explicit permission

    Security Note:
        For maximum security, use async_run() with a list of arguments instead.
        Only set allow_shell_features=True if you fully trust the command source.

        Safe:   await async_shell("ls -la", allow_shell_features=False)  # OK
        Unsafe: await async_shell(user_input)  # Will raise ShellFeatureError if metacharacters present
        Risky:  await async_shell(user_input, allow_shell_features=True)  # DO NOT DO THIS

    """
    if not isinstance(cmd, str):
        raise ValidationError(
            "Shell command must be a string",
            code="INVALID_SHELL_COMMAND",
            expected_type="str",
            actual_type=type(cmd).__name__,
        )

    # Validate shell safety - raises ShellFeatureError if dangerous patterns found
    validate_shell_safety(cmd, allow_shell_features=allow_shell_features)

    return await async_run(
        cmd,
        cwd=cwd,
        env=env,
        capture_output=capture_output,
        check=check,
        timeout=timeout,
        shell=True,  # nosec B604 - Intentional shell usage with validation
        **kwargs,
    )

async_stream async

async_stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> AsyncIterator[str]

Stream command output line by line asynchronously.

Parameters:

Name Type Description Default
cmd list[str]

Command and arguments as a list

required
cwd str | Path | None

Working directory for the command

None
env Mapping[str, str] | None

Environment variables

None
timeout float | None

Command timeout in seconds

None
stream_stderr bool

Whether to merge stderr into stdout

False
**kwargs Any

Additional subprocess arguments

{}

Yields:

Type Description
AsyncIterator[str]

Lines of output from the command

Raises:

Type Description
ProcessError

If command fails

ProcessTimeoutError

If timeout is exceeded

Source code in provide/foundation/process/aio/streaming.py
async def async_stream(
    cmd: list[str],
    cwd: str | Path | None = None,
    env: Mapping[str, str] | None = None,
    timeout: float | None = None,
    stream_stderr: bool = False,
    **kwargs: Any,
) -> AsyncIterator[str]:
    """Stream command output line by line asynchronously.

    Args:
        cmd: Command and arguments as a list
        cwd: Working directory for the command
        env: Environment variables
        timeout: Command timeout in seconds
        stream_stderr: Whether to merge stderr into stdout
        **kwargs: Additional subprocess arguments

    Yields:
        Lines of output from the command

    Raises:
        ProcessError: If command fails
        ProcessTimeoutError: If timeout is exceeded
    """
    cmd_str = " ".join(cmd) if isinstance(cmd, list) else str(cmd)

    # Prepare environment and working directory
    run_env = prepare_environment(env)
    cwd_str = str(cwd) if isinstance(cwd, Path) else cwd

    process = None
    try:
        # Create subprocess
        process = await create_stream_subprocess(cmd, cwd_str, run_env, stream_stderr, kwargs)

        try:
            # Stream output with optional timeout
            if timeout:
                lines = await read_lines_with_timeout(process, timeout, cmd_str)
                await process.wait()
                check_stream_exit_code(process, cmd_str)

                # Yield lines as they were read
                for line in lines:
                    yield line
            else:
                # No timeout - stream normally using readline for proper line buffering
                if process.stdout:
                    while True:
                        line = await process.stdout.readline()
                        if not line:
                            break
                        yield line.decode(errors="replace").rstrip()

                # Wait for process to complete and check exit code
                await process.wait()
                check_stream_exit_code(process, cmd_str)

        finally:
            await cleanup_stream_process(process)

    except Exception as e:
        if isinstance(e, ProcessError | ProcessTimeoutError):
            raise

        log.error("๐Ÿ’ฅ Async stream failed", command=cmd_str, error=str(e))
        raise ProcessError(
            f"Failed to stream async command: {cmd_str}",
            code="PROCESS_ASYNC_STREAM_ERROR",
            command=cmd_str,
        ) from e