Skip to content

Core

provide.foundation.streams.core

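Core log-stream management: functions to get the current log stream, switch it from stdout to stderr, and override it for tests. (TODO: add this summary as the module docstring in the source.)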

Functions

ensure_stderr_default

ensure_stderr_default() -> None

Switch the log stream to stderr if it is currently set to stdout.

Source code in provide/foundation/streams/core.py
def ensure_stderr_default() -> None:
    """Redirect the log stream to ``sys.stderr`` when it currently is ``sys.stdout``.

    The swap is performed under the stream lock. If the lock cannot be
    obtained within 5 seconds the call is a no-op.
    """
    global _PROVIDE_LOG_STREAM
    lock_acquired = _get_stream_lock().acquire(timeout=5.0)
    if lock_acquired:
        try:
            # Identity check: only the exact stdout object triggers the swap.
            currently_stdout = _PROVIDE_LOG_STREAM is sys.stdout
            if currently_stdout:
                _PROVIDE_LOG_STREAM = sys.stderr
        finally:
            _get_stream_lock().release()
    # Lock not acquired within the timeout: leave the stream untouched.

get_log_stream

get_log_stream() -> TextIO

Get the current log stream.

Note: High complexity is intentional for robust stream handling across test/prod.

Source code in provide/foundation/streams/core.py
def get_log_stream() -> TextIO:  # noqa: C901
    """Get the current log stream, replacing it first if it has been closed.

    Note: High complexity is intentional for robust stream handling across test/prod.

    Returns:
        The module-level log stream. If that stream has a truthy ``closed``
        attribute and no ``_mock_name`` attribute (i.e. it is not treated as a
        mock), it is first reset to ``sys.stderr``, or to a fresh
        ``io.StringIO`` when ``sys.stderr`` is missing, ``None`` or closed.
        If the stream lock cannot be acquired within 5 seconds, ``sys.stderr``
        is returned without inspecting or changing the current stream.

    Raises:
        ValueError: If a closed stream has to be replaced but no fallback
            stream can be obtained.
    """
    global _PROVIDE_LOG_STREAM
    if not _get_stream_lock().acquire(timeout=5.0):
        # If we can't acquire the lock within 5 seconds, return stderr as fallback
        return sys.stderr
    try:
        # Only validate real streams, not mock objects
        # Check if this is a real stream that can be closed
        if (
            hasattr(_PROVIDE_LOG_STREAM, "closed")
            and not hasattr(_PROVIDE_LOG_STREAM, "_mock_name")  # Skip mock objects
            and _PROVIDE_LOG_STREAM.closed
        ):
            # Stream is closed, reset to stderr
            try:
                if hasattr(sys, "stderr") and sys.stderr is not None:
                    if not (hasattr(sys.stderr, "closed") and sys.stderr.closed):
                        _PROVIDE_LOG_STREAM = sys.stderr
                    else:
                        # Even sys.stderr is closed, use a safe fallback
                        try:
                            import io

                            _PROVIDE_LOG_STREAM = io.StringIO()  # Safe fallback for parallel tests
                        except ImportError:
                            # Last resort - raise exception
                            # (`from None` hides the ImportError from the traceback chain)
                            raise ValueError("All available streams are closed") from None
                else:
                    # Create a safe fallback stream
                    try:
                        import io

                        _PROVIDE_LOG_STREAM = io.StringIO()
                    except ImportError:
                        raise ValueError("No stderr available") from None
            except (OSError, AttributeError) as e:
                # Only OSError/AttributeError are handled here; a ValueError raised
                # in the block above is not caught and propagates to the caller.
                # Handle specific stream-related errors
                # NOTE: Cannot use Foundation logger here as it depends on these same streams (circular dependency)
                # Using perr() which is safe as it doesn't depend on Foundation logger
                try:
                    from provide.foundation.console.output import perr

                    perr(
                        f"[STREAM ERROR] Stream operation failed, falling back to stderr: "
                        f"{e.__class__.__name__}: {e}"
                    )
                except Exception:
                    # Generic catch intentional: perr() import/call failed.
                    # Try direct stderr write as absolute last resort.
                    try:
                        sys.stderr.write(
                            f"[STREAM ERROR] Stream operation failed: {e.__class__.__name__}: {e}\n"
                        )
                        sys.stderr.flush()
                    except Exception:
                        # Generic catch intentional: Even stderr.write() failed.
                        # Suppress all errors - this is low-level stream infrastructure.
                        pass

                # Try stderr one more time before giving up
                # (every ValueError raised below is chained to the original error `e`)
                if hasattr(sys, "stderr") and sys.stderr is not None:
                    try:
                        if not (hasattr(sys.stderr, "closed") and sys.stderr.closed):
                            _PROVIDE_LOG_STREAM = sys.stderr
                        else:
                            # Even stderr is closed - this is a critical error
                            raise ValueError("All available streams are closed (including stderr)") from e
                    except (OSError, AttributeError):
                        # stderr is also problematic - this is a critical error
                        raise ValueError("Stream validation failed - stderr unavailable") from e
                else:
                    # No stderr available - this is a critical error
                    raise ValueError("Stream validation failed - no stderr available") from e

        # Reached when the stream was not closed (or not checked) or was replaced above.
        return _PROVIDE_LOG_STREAM
    finally:
        # Runs on both the normal return and a propagating ValueError; the lock
        # is known to be held here because the timeout case returned before `try`.
        _get_stream_lock().release()

set_log_stream_for_testing

set_log_stream_for_testing(stream: TextIO | None) -> None

Set the log stream for testing purposes.

This function not only sets the stream but also reconfigures structlog if it's already configured to ensure logs actually go to the test stream.

Source code in provide/foundation/streams/core.py
def set_log_stream_for_testing(stream: TextIO | None) -> None:
    """Point the log stream at ``stream`` for the duration of a test.

    Besides storing the stream, structlog is reconfigured so that log output
    actually reaches the new stream. Passing ``None`` selects ``sys.stderr``.

    The call does nothing when the stream lock cannot be acquired within
    5 seconds, or when testmode reports that stream redirection is not allowed.
    """
    from provide.foundation.testmode.detection import should_allow_stream_redirect

    global _PROVIDE_LOG_STREAM
    lock_acquired = _get_stream_lock().acquire(timeout=5.0)
    if not lock_acquired:
        # Timed out waiting for the lock: skip the operation entirely.
        return
    try:
        # Testmode decides whether redirecting is permitted at all.
        if should_allow_stream_redirect():
            if stream is None:
                _PROVIDE_LOG_STREAM = sys.stderr
            else:
                _PROVIDE_LOG_STREAM = stream

            # Make structlog write to the newly selected stream.
            _reconfigure_structlog_stream()
    finally:
        _get_stream_lock().release()