Skip to content

Streams

provide.foundation.streams

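Stream management utilities for Foundation logging: reading and redirecting the active log stream, configuring, flushing and closing file-based log output, and console helpers (TTY detection, color support, console writes).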

Functions

close_log_streams

close_log_streams() -> None

Close file log streams and reset to stderr.

Source code in provide/foundation/streams/file.py
def close_log_streams() -> None:
    """Close the file log handle (if any) and point logging back at stderr.

    The handle is closed on a best-effort basis: any exception raised by
    ``close()`` is suppressed. The stream is left untouched when running
    inside a Click testing context.
    """
    import provide.foundation.streams.core as core_module

    # Deferred import: importing at module level would be circular.
    from provide.foundation.testmode.detection import is_in_click_testing

    with _get_stream_lock():
        handle = core_module._LOG_FILE_HANDLE
        if handle:
            with contextlib.suppress(Exception):
                handle.close()
            core_module._LOG_FILE_HANDLE = None

        # Click's test runner manages the streams itself; leave them alone.
        if is_in_click_testing():
            return

        core_module._PROVIDE_LOG_STREAM = sys.stderr
        # structlog must be told about the new target stream.
        _reconfigure_structlog_stream()

configure_file_logging

configure_file_logging(log_file_path: str | None) -> None

Configure file logging if a path is provided.

Parameters:

Name Type Description Default
log_file_path str | None

Path to log file, or None to disable file logging

required
Source code in provide/foundation/streams/file.py
def configure_file_logging(log_file_path: str | None) -> None:
    """Route log output to a file, or back to stderr when no path is given.

    Args:
        log_file_path: Path to log file, or None to disable file logging

    Nothing is changed when running inside a Click testing context. If the
    log file cannot be opened, the error is reported via
    ``_safe_error_output`` and the stream falls back to ``get_safe_stderr()``.

    """
    # The globals live on the core module, so mutate them through it.
    import provide.foundation.streams.core as core_module

    # Deferred import: importing at module level would be circular.
    from provide.foundation.testmode.detection import is_in_click_testing

    with _get_stream_lock():
        # Click's test runner manages the streams itself; leave them alone.
        if is_in_click_testing():
            return

        # Close a previously opened handle, unless it is the active stream.
        # NOTE(review): a handle that *is* the active stream is not closed here
        # and gets overwritten below when a new file is opened — confirm that
        # leaving it open is intended.
        previous_handle = core_module._LOG_FILE_HANDLE
        if previous_handle and previous_handle is not core_module._PROVIDE_LOG_STREAM:
            with contextlib.suppress(Exception):
                previous_handle.close()
            core_module._LOG_FILE_HANDLE = None

        # A stream that is stderr or a TextIOWrapper is treated as a real
        # stream; anything else is treated as a stream installed for testing.
        active_stream = core_module._PROVIDE_LOG_STREAM
        is_real_stream = active_stream is sys.stderr or isinstance(active_stream, io.TextIOWrapper)

        if not log_file_path:
            if is_real_stream:
                core_module._PROVIDE_LOG_STREAM = get_safe_stderr()
                # structlog must be told about the new target stream.
                _reconfigure_structlog_stream()
            return

        try:
            log_path = Path(log_file_path)
            log_path.parent.mkdir(parents=True, exist_ok=True)
            core_module._LOG_FILE_HANDLE = log_path.open("a", encoding="utf-8", buffering=1)  # noqa: SIM115
            core_module._PROVIDE_LOG_STREAM = core_module._LOG_FILE_HANDLE
            # structlog must be told about the new file stream.
            _reconfigure_structlog_stream()
        except Exception as exc:
            # Report the failure and fall back to stderr.
            _safe_error_output(f"Failed to open log file {log_file_path}: {exc}")
            core_module._PROVIDE_LOG_STREAM = get_safe_stderr()
            _reconfigure_structlog_stream()

ensure_stderr_default

ensure_stderr_default() -> None

Ensure the log stream defaults to stderr if it's stdout.

Source code in provide/foundation/streams/core.py
def ensure_stderr_default() -> None:
    """Switch the log stream to stderr when it currently points at stdout.

    The operation is skipped silently if the stream lock cannot be acquired
    within 5 seconds.
    """
    global _PROVIDE_LOG_STREAM
    lock_acquired = _get_stream_lock().acquire(timeout=5.0)
    if lock_acquired:
        try:
            if _PROVIDE_LOG_STREAM is sys.stdout:
                _PROVIDE_LOG_STREAM = sys.stderr
        finally:
            _get_stream_lock().release()

flush_log_streams

flush_log_streams() -> None

Flush all log streams.

Source code in provide/foundation/streams/file.py
def flush_log_streams() -> None:
    """Flush the file log handle, if one is currently open.

    A failing ``flush()`` is reported through ``_safe_error_output`` rather
    than raised.
    """
    import provide.foundation.streams.core as core_module

    with _get_stream_lock():
        handle = core_module._LOG_FILE_HANDLE
        if not handle:
            return
        try:
            handle.flush()
        except Exception as exc:
            _safe_error_output(f"Failed to flush log file handle: {exc}")

get_console_stream

get_console_stream() -> TextIO

Get the appropriate console stream for output.

Source code in provide/foundation/streams/console.py
def get_console_stream() -> TextIO:
    """Return the stream console output should be written to.

    This is whatever ``get_log_stream()`` currently returns.
    """
    console_stream = get_log_stream()
    return console_stream

get_log_stream

get_log_stream() -> TextIO

Get the current log stream.

Note: High complexity is intentional for robust stream handling across test/prod.

Source code in provide/foundation/streams/core.py
def get_log_stream() -> TextIO:  # noqa: C901
    """Get the current log stream.

    Note: High complexity is intentional for robust stream handling across test/prod.

    If the module-level stream reports itself as closed (and is not a mock
    object), it is replaced before being returned: by ``sys.stderr`` when that
    exists and is open, otherwise by a fresh ``io.StringIO``.

    Returns:
        The current (possibly just-replaced) log stream. If the stream lock
        cannot be acquired within 5 seconds, ``sys.stderr`` is returned
        directly without inspecting or changing the module-level stream.

    Raises:
        ValueError: If replacing a closed stream hits an ``OSError`` or
            ``AttributeError`` and ``sys.stderr`` is missing, closed, or
            fails again during the re-check; or if ``import io`` fails while
            building the ``StringIO`` fallback.
    """
    global _PROVIDE_LOG_STREAM
    if not _get_stream_lock().acquire(timeout=5.0):
        # If we can't acquire the lock within 5 seconds, return stderr as fallback
        return sys.stderr
    try:
        # Only validate real streams, not mock objects
        # Check if this is a real stream that can be closed
        if (
            hasattr(_PROVIDE_LOG_STREAM, "closed")
            and not hasattr(_PROVIDE_LOG_STREAM, "_mock_name")  # Skip mock objects
            and _PROVIDE_LOG_STREAM.closed
        ):
            # Stream is closed, reset to stderr
            try:
                if hasattr(sys, "stderr") and sys.stderr is not None:
                    if not (hasattr(sys.stderr, "closed") and sys.stderr.closed):
                        _PROVIDE_LOG_STREAM = sys.stderr
                    else:
                        # Even sys.stderr is closed, use a safe fallback
                        try:
                            import io

                            _PROVIDE_LOG_STREAM = io.StringIO()  # Safe fallback for parallel tests
                        except ImportError:
                            # Last resort - raise exception
                            # (ValueError is not caught by the handler below, so it propagates)
                            raise ValueError("All available streams are closed") from None
                else:
                    # Create a safe fallback stream
                    try:
                        import io

                        _PROVIDE_LOG_STREAM = io.StringIO()
                    except ImportError:
                        raise ValueError("No stderr available") from None
            except (OSError, AttributeError) as e:
                # Handle specific stream-related errors
                # NOTE: Cannot use Foundation logger here as it depends on these same streams (circular dependency)
                # Using perr() which is safe as it doesn't depend on Foundation logger
                try:
                    from provide.foundation.console.output import perr

                    perr(
                        f"[STREAM ERROR] Stream operation failed, falling back to stderr: "
                        f"{e.__class__.__name__}: {e}"
                    )
                except Exception:
                    # Generic catch intentional: perr() import/call failed.
                    # Try direct stderr write as absolute last resort.
                    try:
                        sys.stderr.write(
                            f"[STREAM ERROR] Stream operation failed: {e.__class__.__name__}: {e}\n"
                        )
                        sys.stderr.flush()
                    except Exception:
                        # Generic catch intentional: Even stderr.write() failed.
                        # Suppress all errors - this is low-level stream infrastructure.
                        pass

                # Try stderr one more time before giving up
                # (every failure from here on is raised as ValueError chained to the original error)
                if hasattr(sys, "stderr") and sys.stderr is not None:
                    try:
                        if not (hasattr(sys.stderr, "closed") and sys.stderr.closed):
                            _PROVIDE_LOG_STREAM = sys.stderr
                        else:
                            # Even stderr is closed - this is a critical error
                            raise ValueError("All available streams are closed (including stderr)") from e
                    except (OSError, AttributeError):
                        # stderr is also problematic - this is a critical error
                        raise ValueError("Stream validation failed - stderr unavailable") from e
                else:
                    # No stderr available - this is a critical error
                    raise ValueError("Stream validation failed - no stderr available") from e

        # Streams that are open, are mocks, or have no `closed` attribute are returned unchanged.
        return _PROVIDE_LOG_STREAM
    finally:
        # Always release the lock acquired at the top, including when raising.
        _get_stream_lock().release()

is_tty

is_tty() -> bool

Check if the current stream is a TTY (terminal).

Source code in provide/foundation/streams/console.py
def is_tty() -> bool:
    """Report whether the current log stream is attached to a terminal.

    Streams without an ``isatty`` attribute are treated as non-TTY.
    """
    current_stream = get_log_stream()
    if not hasattr(current_stream, "isatty"):
        return False
    return current_stream.isatty()

reset_streams

reset_streams() -> None

Reset all stream state (for testing).

Source code in provide/foundation/streams/file.py
def reset_streams() -> None:
    """Reset all stream state (for testing).

    Delegates to ``close_log_streams()``; does nothing inside a Click
    testing context.
    """
    # Deferred import: importing at module level would be circular.
    from provide.foundation.testmode.detection import is_in_click_testing

    # Click's test runner manages the streams itself; leave them alone.
    if is_in_click_testing():
        return
    close_log_streams()

set_log_stream_for_testing

set_log_stream_for_testing(stream: TextIO | None) -> None

Set the log stream for testing purposes.

This function not only sets the stream but also reconfigures structlog if it's already configured to ensure logs actually go to the test stream.

Source code in provide/foundation/streams/core.py
def set_log_stream_for_testing(stream: TextIO | None) -> None:
    """Redirect the log stream for tests and reconfigure structlog to match.

    Passing ``None`` restores ``sys.stderr``. The call is a no-op when the
    stream lock cannot be acquired within 5 seconds, or when
    ``should_allow_stream_redirect()`` reports that redirection is not
    allowed.
    """
    from provide.foundation.testmode.detection import should_allow_stream_redirect

    global _PROVIDE_LOG_STREAM
    lock_acquired = _get_stream_lock().acquire(timeout=5.0)
    if not lock_acquired:
        # Could not get the lock in time; skip rather than block.
        return
    try:
        # testmode decides whether redirecting is permitted right now.
        if should_allow_stream_redirect():
            _PROVIDE_LOG_STREAM = sys.stderr if stream is None else stream

            # Make structlog write to the newly selected stream.
            _reconfigure_structlog_stream()
    finally:
        _get_stream_lock().release()

supports_color

supports_color() -> bool

Check if the current stream supports color output.

Source code in provide/foundation/streams/console.py
def supports_color() -> bool:
    """Decide whether color output should be used on the current stream.

    ``no_color`` in the stream config wins over ``force_color``; when neither
    is set, the answer is whether the stream is a TTY.
    """
    stream_settings = get_stream_config()

    if stream_settings.no_color:
        return False

    # Forced color short-circuits the TTY probe.
    return True if stream_settings.force_color else is_tty()

write_to_console

write_to_console(
    message: str,
    stream: TextIO | None = None,
    log_fallback: bool = True,
) -> None

Write a message to the console stream.

Parameters:

Name Type Description Default
message str

Message to write

required
stream TextIO | None

Optional specific stream to write to, defaults to current console stream

None
log_fallback bool

Whether to log when falling back to stderr

True
Source code in provide/foundation/streams/console.py
def write_to_console(message: str, stream: TextIO | None = None, log_fallback: bool = True) -> None:
    """Write a message to the console stream.

    Args:
        message: Message to write
        stream: Optional specific stream to write to, defaults to current console stream
        log_fallback: Whether to log when falling back to stderr

    """
    target_stream = stream or get_console_stream()
    try:
        target_stream.write(message)
        target_stream.flush()
    except Exception as e:
        # Log the fallback for debugging if requested
        if log_fallback:
            try:
                from provide.foundation.hub.foundation import get_foundation_logger

                get_foundation_logger().debug(
                    "Console write failed, falling back to stderr",
                    error=str(e),
                    error_type=type(e).__name__,
                    stream_type=type(target_stream).__name__,
                )
            except Exception as log_error:
                # Foundation logger failed, fall back to direct stderr logging
                try:
                    sys.stderr.write(
                        f"[DEBUG] Console write failed (logging also failed): "
                        f"{e.__class__.__name__}: {e} (log_error: {log_error.__class__.__name__})\n"
                    )
                    sys.stderr.flush()
                except Exception:
                    # Even stderr failed - this is a critical system failure, we cannot continue
                    raise RuntimeError(
                        "Critical system failure: unable to write debug information to any stream"
                    ) from e

        # Fallback to stderr - if this fails, let it propagate
        sys.stderr.write(message)
        sys.stderr.flush()