Skip to content

Index

provide.foundation

API reference for `provide.foundation`: resilience primitives (circuit breakers, fallback chains), CLI runtime context, the component/command hub and registry, and the base error types.

Classes

AsyncCircuitBreaker

AsyncCircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
)

Asynchronous circuit breaker for resilience patterns.

Uses asyncio.Lock for async-safe state management. For synchronous code, use SyncCircuitBreaker instead.

Initialize the asynchronous circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
recovery_timeout float

Seconds to wait before attempting recovery

30.0
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) to catch

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
Source code in provide/foundation/resilience/circuit_async.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Set up an async circuit breaker in the CLOSED state.

    Args:
        failure_threshold: Failures tolerated before the circuit opens.
        recovery_timeout: Seconds to wait before a recovery attempt.
        expected_exception: Exception type(s) counted as failures.
        time_source: Clock override for testing; defaults to time.time.
    """
    self.failure_threshold = failure_threshold
    self.recovery_timeout = recovery_timeout
    self.expected_exception = expected_exception
    self._time_source = time_source or time.time
    # asyncio.Lock may be constructed outside a running event loop; it
    # binds to a loop the first time it is awaited.
    self._lock = asyncio.Lock()
    # A fresh breaker starts closed with no recorded failures.
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._last_failure_time: float | None = None
Functions
call async
call(func: Callable, *args: Any, **kwargs: Any) -> Any

Execute an asynchronous function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable

Async callable to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
Any

Result from func

Raises:

Type Description
RuntimeError

If circuit is open

Exception

Whatever exception func raises

Source code in provide/foundation/resilience/circuit_async.py
async def call(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute an asynchronous function through the circuit breaker.

    Args:
        func: Async callable to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func

    Raises:
        RuntimeError: If circuit is open
        Exception: Whatever exception func raises
    """
    async with self._lock:
        # Check state directly to avoid deadlock with the state() accessor.
        if self._state == CircuitState.OPEN and not self._can_attempt_recovery():
            raise RuntimeError("Circuit breaker is open")
        # If HALF_OPEN or recovery possible, we proceed with the call.
        # NOTE: the lock is released before invoking func so slow calls
        # do not serialize all breaker users.

    try:
        result = await func(*args, **kwargs)
        await self._on_success()
        return result
    except self.expected_exception:
        await self._on_failure()
        # Bare re-raise preserves the original traceback; `raise e`
        # would append this frame to the chain.
        raise
failure_count async
failure_count() -> int

Get the current failure count.

Returns:

Type Description
int

Current failure count

Source code in provide/foundation/resilience/circuit_async.py
async def failure_count(self) -> int:
    """Return the number of failures recorded since the last reset.

    Returns:
        Current failure count.
    """
    async with self._lock:
        count = self._failure_count
    return count
reset async
reset() -> None

Reset the circuit breaker to its initial state.

Source code in provide/foundation/resilience/circuit_async.py
async def reset(self) -> None:
    """Return the breaker to a pristine CLOSED state.

    Clears the failure counter and the last-failure timestamp while
    holding the breaker's lock.
    """
    async with self._lock:
        self._state, self._failure_count, self._last_failure_time = (
            CircuitState.CLOSED,
            0,
            None,
        )
state async
state() -> CircuitState

Get the current state of the circuit breaker.

Returns:

Type Description
CircuitState

Current circuit state

Source code in provide/foundation/resilience/circuit_async.py
async def state(self) -> CircuitState:
    """Report the breaker's current state.

    Returns:
        The stored state, except that an OPEN circuit whose recovery
        window has elapsed is reported as HALF_OPEN. This is only a
        view; the actual transition happens in call().
    """
    async with self._lock:
        recovery_due = self._state == CircuitState.OPEN and self._can_attempt_recovery()
        return CircuitState.HALF_OPEN if recovery_due else self._state

BackoffStrategy

Bases: str, Enum

Backoff strategies for retry delays.

CLIContext

Bases: RuntimeConfig

Runtime context for CLI execution and state management.

Manages CLI-specific settings, output formatting, and runtime state during command execution. Supports loading from files, environment variables, and programmatic updates during CLI command execution.

Attributes
logger property
logger: Any

Get or create a logger for this context.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Post-initialization hook.

Source code in provide/foundation/context/core.py
def __attrs_post_init__(self) -> None:
    """Post-initialization hook."""
copy
copy() -> CLIContext

Create a deep copy of the context.

Source code in provide/foundation/context/core.py
def copy(self) -> CLIContext:
    """Create a deep copy of the context.

    Delegates to copy.deepcopy, so nested mutable values are duplicated
    rather than shared with the original instance.
    """
    return copy.deepcopy(self)
freeze
freeze() -> None

Freeze context to prevent further modifications.

Source code in provide/foundation/context/core.py
def freeze(self) -> None:
    """Mark the context as frozen.

    attrs instances cannot actually be frozen after creation, so this
    only records a flag; it is kept for API compatibility and does not
    enforce immutability by itself.
    """
    self._frozen = True
from_dict classmethod
from_dict(
    data: dict[str, Any],
    source: ConfigSource = ConfigSource.RUNTIME,
) -> CLIContext

Create context from dictionary.

Parameters:

Name Type Description Default
data dict[str, Any]

Dictionary with context values

required
source ConfigSource

Source of the configuration data

RUNTIME

Returns:

Type Description
CLIContext

New CLIContext instance

Source code in provide/foundation/context/core.py
@classmethod
def from_dict(cls, data: dict[str, Any], source: ConfigSource = ConfigSource.RUNTIME) -> CLIContext:
    """Build a context from a plain dictionary.

    Args:
        data: Dictionary with context values
        source: Source of the configuration data

    Returns:
        New CLIContext instance

    """
    # Scalar fields are copied through whenever the key is present.
    scalar_keys = (
        "log_level",
        "profile",
        "debug",
        "json_output",
        "log_format",
        "no_color",
        "no_emoji",
    )
    kwargs: dict[str, Any] = {key: data[key] for key in scalar_keys if key in data}

    # Path-valued fields are converted to Path and skipped when falsy.
    for path_key in ("config_file", "log_file"):
        if data.get(path_key):
            kwargs[path_key] = Path(data[path_key])

    return cls(**kwargs)
load_config
load_config(path: str | Path) -> None

Load configuration from file.

Supports TOML, JSON, and YAML formats based on file extension.

Parameters:

Name Type Description Default
path str | Path

Path to configuration file

required
Source code in provide/foundation/context/core.py
def load_config(self, path: str | Path) -> None:
    """Load configuration from file.

    Supports TOML, JSON, and YAML formats based on file extension.

    Args:
        path: Path to configuration file

    """
    # CLIContext is not frozen, so we can modify it
    path = Path(path)
    # Parse the file, apply its values onto this instance, then validate
    # the resulting merged state.
    data = self._load_config_data(path)
    self._update_from_config_data(data)
    self._validate()
merge
merge(
    other: CLIContext, override_defaults: bool = False
) -> CLIContext

Merge with another context, with other taking precedence.

Parameters:

Name Type Description Default
other CLIContext

CLIContext to merge with

required
override_defaults bool

If False, only override if other's value differs from its class default

False

Returns:

Type Description
CLIContext

New merged CLIContext instance

Source code in provide/foundation/context/core.py
def merge(self, other: CLIContext, override_defaults: bool = False) -> CLIContext:
    """Merge with another context, with other taking precedence.

    Args:
        other: CLIContext to merge with
        override_defaults: If False, only override if other's value differs from its class default

    Returns:
        New merged CLIContext instance

    """
    base = self.to_dict()
    incoming = other.to_dict()

    # Select the merge policy once, then apply it in place on `base`.
    apply_merge = self._merge_with_override if override_defaults else self._merge_without_override
    apply_merge(base, incoming)

    return CLIContext.from_dict(base)
save_config
save_config(path: str | Path) -> None

Save configuration to file.

Format is determined by file extension.

Parameters:

Name Type Description Default
path str | Path

Path to save configuration

required
Source code in provide/foundation/context/core.py
def save_config(self, path: str | Path) -> None:
    """Save configuration to file.

    Format is determined by file extension.

    Args:
        path: Path to save configuration

    """
    path = Path(path)

    # Drop None values so the serialized file stays clean.
    data = {key: value for key, value in self.to_dict().items() if value is not None}

    suffix = path.suffix
    if suffix in (".toml", ".tml"):
        write_toml(path, data)
    elif suffix == ".json":
        write_json(path, data, indent=2)
    elif suffix in (".yaml", ".yml"):
        write_yaml(path, data, default_flow_style=False)
    elif not suffix:
        # No extension at all: we cannot infer a format.
        raise ConfigurationError(
            f"Unsupported config format: no file extension for {path}",
            code="MISSING_FILE_EXTENSION",
            path=str(path),
        )
    else:
        raise ConfigurationError(
            f"Unsupported config format: {suffix}",
            code="UNSUPPORTED_CONFIG_FORMAT",
            path=str(path),
            suffix=suffix,
        )
to_dict
to_dict(include_sensitive: bool = True) -> dict[str, Any]

Convert context to dictionary.

Source code in provide/foundation/context/core.py
def to_dict(self, include_sensitive: bool = True) -> dict[str, Any]:
    """Serialize the context to a plain dictionary.

    Path-valued fields (config_file, log_file) are rendered as strings,
    or None when unset. Key order matches the field declaration order.
    """
    payload: dict[str, Any] = {}
    for field_name in (
        "log_level",
        "profile",
        "debug",
        "json_output",
        "config_file",
        "log_file",
        "log_format",
        "no_color",
        "no_emoji",
    ):
        value = getattr(self, field_name)
        if field_name in ("config_file", "log_file"):
            value = str(value) if value else None
        payload[field_name] = value
    return payload
update_from_env
update_from_env(prefix: str = 'PROVIDE') -> None

Update context from environment variables.

Parameters:

Name Type Description Default
prefix str

Environment variable prefix (default: PROVIDE)

'PROVIDE'
Source code in provide/foundation/context/core.py
def update_from_env(self, prefix: str = "PROVIDE") -> None:
    """Update context from environment variables.

    Only fields whose environment-derived value differs from the class
    default are overwritten, so values already set at runtime survive
    unless the environment explicitly provides something different.

    Args:
        prefix: Environment variable prefix (default: PROVIDE)

    Raises:
        StateError: If the context has been frozen.
    """
    if self._frozen:
        raise StateError(
            "Context is frozen and cannot be modified",
            code="CONTEXT_FROZEN",
            context_type=type(self).__name__,
        )

    # Create default instance and environment instance
    default_ctx = self.__class__()  # All defaults
    env_ctx = self.from_env(prefix=prefix)  # Environment + defaults

    # Only update fields where environment differs from default
    for attr in fields(self.__class__):
        if not attr.name.startswith("_"):  # Skip private fields
            default_value = getattr(default_ctx, attr.name)
            env_value = getattr(env_ctx, attr.name)

            # If environment value differs from default, it came from env
            if env_value != default_value:
                setattr(self, attr.name, env_value)

CircuitState

Bases: Enum

Represents the state of the circuit breaker.

ComponentCategory

Bases: Enum

Predefined component categories for Foundation.

These are the standard dimension values used internally by Foundation. External components can still use custom string dimensions for compatibility.

EventMapping

Individual event enrichment mapping for a specific domain.

Attributes:

Name Type Description
name str

Unique identifier for this mapping

visual_markers dict[str, str]

Mapping of values to visual indicators (e.g., emojis)

metadata_fields dict[str, dict[str, Any]]

Additional metadata to attach based on values

transformations dict[str, Callable[[Any], Any]]

Value transformation functions

default_key str

Key to use when no specific match is found

EventSet

Complete event enrichment domain definition.

Attributes:

Name Type Description
name str

Unique identifier for this event set

description str | None

Human-readable description

mappings list[EventMapping]

List of EventMapping definitions

field_mappings list[FieldMapping]

List of field-to-mapping associations

priority int

Higher priority sets override lower ones

FallbackChain

Chain of fallback strategies for graceful degradation.

Executes fallback functions in order when primary function fails.

Functions
add_fallback
add_fallback(fallback_func: Callable[..., T]) -> None

Add a fallback function to the chain.

Source code in provide/foundation/resilience/fallback.py
def add_fallback(self, fallback_func: Callable[..., T]) -> None:
    """Append a fallback callable to the end of the chain."""
    name = getattr(fallback_func, "__name__", "anonymous")
    self.fallbacks.append(fallback_func)  # type: ignore[arg-type]
    logger.debug(
        "Added fallback to chain",
        fallback_count=len(self.fallbacks),
        fallback_name=name,
    )
execute
execute(
    primary_func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Execute primary function with fallback chain (sync).

Source code in provide/foundation/resilience/fallback.py
def execute(self, primary_func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute primary function with fallback chain (sync).

    The primary callable runs first. If it raises one of
    self.expected_exceptions, each fallback is tried in registration
    order and the first success wins. An unexpected exception type from
    the primary is re-raised immediately without trying fallbacks.

    Args:
        primary_func: The preferred callable to run.
        *args: Positional arguments forwarded to every callable.
        **kwargs: Keyword arguments forwarded to every callable.

    Returns:
        The first successful result from the primary or a fallback.

    Raises:
        Exception: The last fallback's exception when all fail, or the
            primary's exception when there are no fallbacks.
    """
    # Try primary function first
    primary_exception = None
    try:
        result = primary_func(*args, **kwargs)
        logger.trace(
            "Primary function succeeded",
            func=getattr(primary_func, "__name__", "anonymous"),
        )
        return result
    except Exception as e:
        primary_exception = e
        if not isinstance(e, self.expected_exceptions):
            # Unexpected exception type, don't use fallbacks
            logger.debug(
                "Primary function failed with unexpected exception type",
                exception_type=type(e).__name__,
                expected_types=[t.__name__ for t in self.expected_exceptions],
            )
            raise

        logger.warning(
            "Primary function failed, trying fallbacks",
            func=getattr(primary_func, "__name__", "anonymous"),
            error=str(e),
            fallback_count=len(self.fallbacks),
        )

    # Try fallbacks in order
    last_exception = None
    for i, fallback_func in enumerate(self.fallbacks):
        try:
            result = fallback_func(*args, **kwargs)
            logger.info(
                "Fallback succeeded",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
            )
            return result
        except Exception as e:
            # Any failure here (expected or not) moves on to the next fallback.
            last_exception = e
            logger.warning(
                "Fallback failed",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
                error=str(e),
            )
            continue

    # All fallbacks failed
    logger.error(
        "All fallbacks exhausted",
        primary_func=getattr(primary_func, "__name__", "anonymous"),
        fallback_count=len(self.fallbacks),
    )

    # Raise the last exception from fallbacks, or original if no fallbacks
    if last_exception is not None:
        raise last_exception
    if primary_exception is not None:
        raise primary_exception
    # This should never happen but provide fallback
    raise RuntimeError("Fallback chain execution failed with no recorded exceptions")
execute_async async
execute_async(
    primary_func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Execute primary function with fallback chain (async).

Source code in provide/foundation/resilience/fallback.py
async def execute_async(self, primary_func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute primary function with fallback chain (async).

    Mirrors execute(), but accepts either coroutine functions or plain
    callables for both the primary and the fallbacks; coroutine
    functions are awaited, plain callables are invoked synchronously.

    Args:
        primary_func: The preferred callable (sync or async) to run.
        *args: Positional arguments forwarded to every callable.
        **kwargs: Keyword arguments forwarded to every callable.

    Returns:
        The first successful result from the primary or a fallback.

    Raises:
        Exception: The last fallback's exception when all fail, or the
            primary's exception when there are no fallbacks.
    """
    # Try primary function first
    primary_exception = None
    try:
        if asyncio.iscoroutinefunction(primary_func):
            result = await primary_func(*args, **kwargs)
        else:
            result = primary_func(*args, **kwargs)
        logger.trace(
            "Primary function succeeded",
            func=getattr(primary_func, "__name__", "anonymous"),
        )
        return result
    except Exception as e:
        primary_exception = e
        if not isinstance(e, self.expected_exceptions):
            # Unexpected exception type, don't use fallbacks
            logger.debug(
                "Primary function failed with unexpected exception type",
                exception_type=type(e).__name__,
                expected_types=[t.__name__ for t in self.expected_exceptions],
            )
            raise

        logger.warning(
            "Primary function failed, trying fallbacks",
            func=getattr(primary_func, "__name__", "anonymous"),
            error=str(e),
            fallback_count=len(self.fallbacks),
        )

    # Try fallbacks in order
    last_exception = None
    for i, fallback_func in enumerate(self.fallbacks):
        try:
            if asyncio.iscoroutinefunction(fallback_func):
                result = await fallback_func(*args, **kwargs)
            else:
                result = fallback_func(*args, **kwargs)
            logger.info(
                "Fallback succeeded",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
            )
            return result
        except Exception as e:
            # Any failure here (expected or not) moves on to the next fallback.
            last_exception = e
            logger.warning(
                "Fallback failed",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
                error=str(e),
            )
            continue

    # All fallbacks failed
    logger.error(
        "All fallbacks exhausted",
        primary_func=getattr(primary_func, "__name__", "anonymous"),
        fallback_count=len(self.fallbacks),
    )

    # Raise the last exception from fallbacks, or original if no fallbacks
    if last_exception is not None:
        raise last_exception
    if primary_exception is not None:
        raise primary_exception
    # This should never happen but provide fallback
    raise RuntimeError("Fallback chain execution failed with no recorded exceptions")

FieldMapping

Maps a log field to an event set for enrichment.

Attributes:

Name Type Description
log_key str

The field key in log events (e.g., "http.method", "llm.provider")

description str | None

Human-readable description of this field

value_type str | None

Expected type of the field value

event_set_name str | None

Name of the EventSet to use for enrichment

default_override_key str | None

Override the default key for this specific field

default_value Any | None

Default value to use if field is not present

FoundationError

FoundationError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: Exception

Base exception for all Foundation errors.

Parameters:

Name Type Description Default
message str

Human-readable error message.

required
code str | None

Optional error code for programmatic handling.

None
context dict[str, Any] | None

Optional context dictionary with diagnostic data.

None
cause Exception | None

Optional underlying exception that caused this error.

None
**extra_context Any

Additional key-value pairs added to context.

{}

Examples:

>>> raise FoundationError("Operation failed")
>>> raise FoundationError("Operation failed", code="OP_001")
>>> raise FoundationError("Operation failed", user_id=123, retry_count=3)
Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error with message, code, context, and cause.

    Args:
        message: Human-readable error message.
        code: Optional error code; falls back to the class default code.
        context: Optional diagnostic context. The dict is copied, so the
            caller's mapping is never mutated by this error.
        cause: Optional underlying exception; also set as __cause__ so
            tracebacks chain naturally.
        **extra_context: Additional key-value pairs merged into context.
    """
    self.message = message
    self.code = code or self._default_code()
    # Copy defensively: `context or {}` would alias the caller's dict,
    # and the .update() below (plus add_context) would mutate it.
    self.context = dict(context) if context else {}
    self.context.update(extra_context)
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)
Functions
add_context
add_context(key: str, value: Any) -> FoundationError

Add context data to the error.

Parameters:

Name Type Description Default
key str

Context key (use dots for namespacing, e.g., 'aws.region').

required
value Any

Context value.

required

Returns:

Type Description
FoundationError

Self for method chaining.

Source code in provide/foundation/errors/base.py
def add_context(self, key: str, value: Any) -> FoundationError:
    """Attach one diagnostic key/value pair and return self for chaining.

    Args:
        key: Context key (dotted names allow namespacing, e.g. 'aws.region').
        value: Context value.

    Returns:
        This error instance, enabling fluent chaining.
    """
    self.context.update({key: value})
    return self
to_dict
to_dict() -> dict[str, Any]

Convert exception to dictionary for structured logging.

Returns:

Type Description
dict[str, Any]

Dictionary representation suitable for logging/serialization.

Source code in provide/foundation/errors/base.py
def to_dict(self) -> dict[str, Any]:
    """Render the exception as a flat dict for structured logging.

    Returns:
        Dictionary with error.type/message/code keys, prefixed context
        entries, and cause information when a cause is set.
    """
    payload: dict[str, Any] = {
        "error.type": self.__class__.__name__,
        "error.message": self.message,
        "error.code": self.code,
    }

    # Context keys that are already namespaced (contain a dot) pass
    # through untouched; bare keys get the "error." prefix.
    for key, value in self.context.items():
        target = key if "." in key else f"error.{key}"
        payload[target] = value

    if self.cause:
        payload["error.cause"] = str(self.cause)
        payload["error.cause_type"] = type(self.cause).__name__

    return payload

Hub

Hub(
    context: CLIContext | None = None,
    component_registry: Registry | None = None,
    command_registry: Registry | None = None,
    use_shared_registries: bool = False,
)

Bases: CoreHub

Central hub for managing components, commands, and Foundation integration.

The Hub provides a unified interface for: registering components and commands; discovering plugins via entry points; creating Click CLI applications; managing component lifecycle; and Foundation system initialization.

Example

    hub = Hub()
    hub.add_component(MyResource, "resource")
    hub.add_command(init_cmd, "init")
    hub.initialize_foundation()

    # Create CLI with all commands
    cli = hub.create_cli()
    cli()

Initialize the hub.

Parameters:

Name Type Description Default
context CLIContext | None

Foundation CLIContext for configuration

None
component_registry Registry | None

Custom component registry

None
command_registry Registry | None

Custom command registry

None
use_shared_registries bool

If True, use global shared registries

False
Source code in provide/foundation/hub/manager.py
def __init__(
    self,
    context: CLIContext | None = None,
    component_registry: Registry | None = None,
    command_registry: Registry | None = None,
    use_shared_registries: bool = False,
) -> None:
    """Initialize the hub.

    Args:
        context: Foundation CLIContext for configuration
        command_registry: Custom command registry
        component_registry: Custom component registry
        use_shared_registries: If True, use global shared registries

    """
    # Determine if we should use shared registries
    use_shared = should_use_shared_registries(use_shared_registries, component_registry, command_registry)

    # Setup registries. Compare against None explicitly: Registry defines
    # __len__, so an *empty* custom registry is falsy and a plain
    # truthiness test would silently discard it.
    if component_registry is not None:
        comp_registry = component_registry
    elif use_shared:
        comp_registry = get_component_registry()
    else:
        comp_registry = Registry()

    if command_registry is not None:
        cmd_registry = command_registry
    elif use_shared:
        cmd_registry = get_command_registry()
    else:
        cmd_registry = Registry()

    # Initialize core hub functionality
    super().__init__(context, comp_registry, cmd_registry)

    # Initialize Foundation management, injecting self
    self._foundation = FoundationManager(hub=self, registry=self._component_registry)
Functions
clear
clear(dimension: str | None = None) -> None

Clear registrations and dispose of resources properly.

Parameters:

Name Type Description Default
dimension str | None

Optional dimension to clear (None = all)

None
Source code in provide/foundation/hub/manager.py
def clear(self, dimension: str | None = None) -> None:
    """Clear registrations and dispose of resources properly.

    Args:
        dimension: Optional dimension to clear (None = all)

    """
    # Core hub clear also disposes the registered resources.
    super().clear(dimension)

    # A full clear, or clearing a foundation-related dimension, resets
    # the Foundation subsystem state as well.
    foundation_affected = dimension is None or dimension in ("singleton", "foundation")
    if foundation_affected:
        self._foundation.clear_foundation_state()
dispose_all
dispose_all() -> None

Dispose of all managed resources without clearing registrations.

Source code in provide/foundation/hub/manager.py
def dispose_all(self) -> None:
    """Dispose of all managed resources without clearing registrations."""
    self._component_registry.dispose_all()
    # The command registry may be a plain registry without disposal support.
    command_dispose = getattr(self._command_registry, "dispose_all", None)
    if command_dispose is not None:
        command_dispose()
get_foundation_config
get_foundation_config() -> Any | None

Get the current Foundation configuration.

Source code in provide/foundation/hub/manager.py
def get_foundation_config(self) -> Any | None:
    """Return the active Foundation configuration, if any."""
    manager = self._foundation
    return manager.get_foundation_config()
get_foundation_logger
get_foundation_logger(name: str | None = None) -> Any

Get Foundation logger instance through Hub.

Auto-initializes Foundation if not already done. Thread-safe with fallback behavior.

Parameters:

Name Type Description Default
name str | None

Logger name (e.g., module name)

None

Returns:

Type Description
Any

Configured logger instance

Source code in provide/foundation/hub/manager.py
def get_foundation_logger(self, name: str | None = None) -> Any:
    """Get Foundation logger instance through Hub.

    Auto-initializes Foundation if not already done.
    Thread-safe with fallback behavior.

    Args:
        name: Logger name (e.g., module name)

    Returns:
        Configured logger instance

    """
    # Pure delegation to the Foundation manager.
    foundation = self._foundation
    return foundation.get_foundation_logger(name)
initialize_foundation
initialize_foundation(
    config: Any = None, force: bool = False
) -> None

Initialize Foundation system through Hub.

Single initialization method replacing all setup_* functions. Thread-safe and idempotent, unless force=True.

Parameters:

Name Type Description Default
config Any

Optional TelemetryConfig (defaults to from_env)

None
force bool

If True, force re-initialization even if already initialized

False
Source code in provide/foundation/hub/manager.py
def initialize_foundation(self, config: Any = None, force: bool = False) -> None:
    """Initialize Foundation system through Hub.

    Single initialization method replacing all setup_* functions.
    Thread-safe and idempotent, unless force=True.

    Args:
        config: Optional TelemetryConfig (defaults to from_env)
        force: If True, force re-initialization even if already initialized

    """
    manager = self._foundation
    manager.initialize_foundation(config, force)
is_foundation_initialized
is_foundation_initialized() -> bool

Check if Foundation system is initialized.

Source code in provide/foundation/hub/manager.py
def is_foundation_initialized(self) -> bool:
    """Report whether the Foundation system has been initialized."""
    manager = self._foundation
    return manager.is_foundation_initialized()

LoggingConfig

Bases: RuntimeConfig

Configuration specific to logging behavior within Foundation Telemetry.

Registry

Registry()

Multi-dimensional registry for storing and retrieving objects.

Supports hierarchical organization by dimension (component, command, etc.) and name within each dimension. This is a generic registry that can be used for any type of object storage and retrieval.

Thread-safe: All operations are protected by an RLock for safe concurrent access.

Note: Uses threading.RLock (not asyncio.Lock) for thread safety. For async-only applications with high-frequency registry access in request hot-paths (>10k req/sec with runtime registration), consider using an async-native registry implementation with asyncio.Lock. For typical use cases (initialization-time registration, CLI apps, read-heavy workloads), the threading lock has negligible impact.

See: docs/architecture/design-decisions.md#threading-model

Initialize an empty registry.

Source code in provide/foundation/hub/registry.py
def __init__(self) -> None:
    """Initialize an empty registry."""
    # The lock comes from the managed lock registry so deadlock
    # prevention can reason about it; it is registered during Foundation
    # initialization via register_foundation_locks().
    from provide.foundation.concurrency.locks import get_lock_manager

    lock_manager = get_lock_manager()
    self._lock = lock_manager.get_lock("foundation.registry")

    # dimension -> name -> entry
    self._registry: dict[str, dict[str, RegistryEntry]] = defaultdict(dict)
    # alias -> (dimension, name)
    self._aliases: dict[str, tuple[str, str]] = {}
    # Type-based registry for dependency injection
    self._type_registry: dict[type[Any], Any] = {}
Functions
__contains__
__contains__(key: str | tuple[str, str]) -> bool

Check if an item exists in the registry.

Source code in provide/foundation/hub/registry.py
def __contains__(self, key: str | tuple[str, str]) -> bool:
    """Check if an item exists in the registry."""
    with self._lock:
        if isinstance(key, tuple):
            dimension, name = key
            return name in self._registry[dimension]
        return any(key in dim_reg for dim_reg in self._registry.values())
__iter__
__iter__() -> Iterator[RegistryEntry]

Iterate over all registry entries.

Source code in provide/foundation/hub/registry.py
def __iter__(self) -> Iterator[RegistryEntry]:
    """Iterate over all registry entries."""
    with self._lock:
        # Create a snapshot to avoid holding lock during iteration
        entries: list[RegistryEntry] = []
        for dim_registry in self._registry.values():
            entries.extend(dim_registry.values())
    # Yield outside the lock
    yield from entries
__len__
__len__() -> int

Get total number of registered items.

Source code in provide/foundation/hub/registry.py
def __len__(self) -> int:
    """Total number of registered items across every dimension."""
    with self._lock:
        total = 0
        for dim_registry in self._registry.values():
            total += len(dim_registry)
        return total
clear
clear(dimension: str | None = None) -> None

Clear the registry or a specific dimension.

Source code in provide/foundation/hub/registry.py
def clear(self, dimension: str | None = None) -> None:
    """Clear every entry, or only those in *dimension* when one is given.

    Registered resources are disposed before their entries are dropped.
    """
    with self._lock:
        if dimension is None:
            # Full wipe: dispose everything, then drop all state.
            self._dispose_all_resources()
            self._registry.clear()
            self._aliases.clear()
            self._type_registry.clear()
            return

        # Targeted wipe of a single dimension.
        self._dispose_resources(dimension)
        self._registry[dimension].clear()

        stale = [alias for alias, (dim, _) in self._aliases.items() if dim == dimension]
        for alias in stale:
            del self._aliases[alias]
dispose_all
dispose_all() -> None

Dispose of all registered resources properly.

Source code in provide/foundation/hub/registry.py
def dispose_all(self) -> None:
    """Dispose of all registered resources properly.

    Thread-safe wrapper: delegates to _dispose_all_resources() while
    holding the registry lock.
    """
    with self._lock:
        self._dispose_all_resources()
get
get(name: str, dimension: str | None = None) -> Any | None

Get an item from the registry.

Parameters:

Name Type Description Default
name str

Name or alias of the item

required
dimension str | None

Optional dimension to search in

None

Returns:

Type Description
Any | None

The registered value or None if not found

Source code in provide/foundation/hub/registry.py
def get(
    self,
    name: str,
    dimension: str | None = None,
) -> Any | None:
    """Get an item from the registry.

    Lookup order: exact (dimension, name), then alias resolution, then
    (when no dimension is given) a scan of all dimensions.

    Args:
        name: Name or alias of the item
        dimension: Optional dimension to search in

    Returns:
        The registered value or None if not found

    """
    with self._lock:
        if dimension is not None:
            # Use .get() on the outer mapping: indexing the defaultdict
            # would silently create an empty dimension on every miss,
            # growing the registry unboundedly under repeated bad lookups.
            dim_registry = self._registry.get(dimension)
            entry = dim_registry.get(name) if dim_registry is not None else None
            if entry:
                return entry.value

        if name in self._aliases:
            dim_key, real_name = self._aliases[name]
            if dimension is None or dim_key == dimension:
                alias_registry = self._registry.get(dim_key)
                entry = alias_registry.get(real_name) if alias_registry is not None else None
                if entry:
                    return entry.value

        if dimension is None:
            for dim_registry in self._registry.values():
                if name in dim_registry:
                    return dim_registry[name].value

        return None
get_by_type
get_by_type(type_hint: type[Any]) -> Any | None

Get a registered instance by its type.

Parameters:

Name Type Description Default
type_hint type[Any]

Type to look up

required

Returns:

Type Description
Any | None

Registered instance or None if not found

Example

db = registry.get_by_type(DatabaseClient)

Source code in provide/foundation/hub/registry.py
def get_by_type(self, type_hint: type[Any]) -> Any | None:
    """Look up a registered instance by its exact type.

    Args:
        type_hint: Type to look up

    Returns:
        The instance bound to that type, or None when nothing is registered

    Example:
        >>> db = registry.get_by_type(DatabaseClient)
    """
    with self._lock:
        return self._type_registry.get(type_hint)
get_entry
get_entry(
    name: str, dimension: str | None = None
) -> RegistryEntry | None

Get the full registry entry.

Source code in provide/foundation/hub/registry.py
def get_entry(
    self,
    name: str,
    dimension: str | None = None,
) -> RegistryEntry | None:
    """Get the full registry entry for a name or alias.

    Mirrors get(): exact (dimension, name) first, then alias resolution,
    then (when no dimension is given) a scan of all dimensions. Previously
    an explicit dimension short-circuited before alias resolution, making
    get_entry() inconsistent with get() for aliased lookups.

    Args:
        name: Name or alias of the item
        dimension: Optional dimension to search in

    Returns:
        The RegistryEntry, or None if not found
    """
    with self._lock:
        if dimension is not None:
            # .get() avoids the defaultdict auto-creating an empty
            # dimension when an unknown dimension is queried.
            dim_registry = self._registry.get(dimension)
            if dim_registry is not None and name in dim_registry:
                return dim_registry[name]

        if name in self._aliases:
            dim_key, real_name = self._aliases[name]
            if dimension is None or dim_key == dimension:
                alias_registry = self._registry.get(dim_key)
                entry = alias_registry.get(real_name) if alias_registry is not None else None
                if entry is not None:
                    return entry

        if dimension is None:
            for dim_registry in self._registry.values():
                if name in dim_registry:
                    return dim_registry[name]

        return None
list_all
list_all() -> dict[str, list[str]]

List all dimensions and their items.

Source code in provide/foundation/hub/registry.py
def list_all(self) -> dict[str, list[str]]:
    """Return a mapping of every dimension to the names registered in it."""
    with self._lock:
        snapshot: dict[str, list[str]] = {}
        for dimension, items in self._registry.items():
            snapshot[dimension] = list(items)
        return snapshot
list_dimension
list_dimension(dimension: str) -> list[str]

List all names in a dimension.

Source code in provide/foundation/hub/registry.py
def list_dimension(
    self,
    dimension: str,
) -> list[str]:
    """List all names in a dimension.

    Args:
        dimension: Dimension whose names to list

    Returns:
        Names registered in the dimension; empty list for an unknown one.
    """
    with self._lock:
        # .get() avoids the defaultdict silently creating an empty
        # dimension entry whenever an unknown dimension is listed.
        return list(self._registry.get(dimension, {}))
list_types
list_types() -> list[type[Any]]

List all registered types.

Returns:

Type Description
list[type[Any]]

List of registered types

Source code in provide/foundation/hub/registry.py
def list_types(self) -> list[type[Any]]:
    """List every type currently registered for type-based (DI) lookup.

    Returns:
        List of registered types, in insertion order.
    """
    with self._lock:
        return [registered for registered in self._type_registry]
register
register(
    name: str,
    value: Any,
    dimension: str = "default",
    metadata: dict[str, Any] | None = None,
    aliases: list[str] | None = None,
    replace: bool = False,
) -> RegistryEntry

Register an item in the registry.

Parameters:

Name Type Description Default
name str

Unique name within the dimension

required
value Any

The item to register

required
dimension str

Registry dimension for categorization

'default'
metadata dict[str, Any] | None

Optional metadata about the item

None
aliases list[str] | None

Optional list of aliases for this item

None
replace bool

Whether to replace existing entries

False

Returns:

Type Description
RegistryEntry

The created registry entry

Raises:

Type Description
AlreadyExistsError

If name already exists and replace=False

Source code in provide/foundation/hub/registry.py
def register(
    self,
    name: str,
    value: Any,
    dimension: str = "default",
    metadata: dict[str, Any] | None = None,
    aliases: list[str] | None = None,
    replace: bool = False,
) -> RegistryEntry:
    """Register an item in the registry.

    Args:
        name: Unique name within the dimension
        value: The item to register
        dimension: Registry dimension for categorization
        metadata: Optional metadata about the item
        aliases: Optional list of aliases for this item
        replace: Whether to replace existing entries

    Returns:
        The created registry entry

    Raises:
        AlreadyExistsError: If name already exists and replace=False

    """
    with self._lock:
        if not replace and name in self._registry[dimension]:
            raise AlreadyExistsError(
                f"Item '{name}' already registered in dimension '{dimension}'. "
                "Use replace=True to override.",
                code="REGISTRY_ITEM_EXISTS",
                item_name=name,
                dimension=dimension,
            )

        entry = RegistryEntry(
            name=name,
            dimension=dimension,
            value=value,
            metadata=metadata or {},
        )

        self._registry[dimension][name] = entry

        # Aliases are indirections: alias -> (dimension, name).
        if aliases:
            for alias in aliases:
                self._aliases[alias] = (dimension, name)

        # Emit event instead of direct logging to break circular dependency
        from provide.foundation.hub.events import emit_registry_event

        emit_registry_event(
            operation="register",
            item_name=name,
            dimension=dimension,
            has_metadata=bool(metadata),
            aliases=aliases,
        )

        return entry
register_type
register_type(
    type_hint: type[Any],
    instance: Any,
    name: str | None = None,
) -> None

Register an instance by its type for dependency injection.

This enables type-based lookup which is essential for DI patterns.

Parameters:

Name Type Description Default
type_hint type[Any]

Type to register under

required
instance Any

Instance to register

required
name str | None

Optional name for standard registry (defaults to type name)

None
Example

registry.register_type(DatabaseClient, db_instance) db = registry.get_by_type(DatabaseClient)

Source code in provide/foundation/hub/registry.py
def register_type(
    self,
    type_hint: type[Any],
    instance: Any,
    name: str | None = None,
) -> None:
    """Register an instance under its type for dependency injection.

    Type-based lookup (see get_by_type) is what enables DI patterns.

    Args:
        type_hint: Type to register under
        instance: Instance to register
        name: Optional name for the standard (name-based) registry; when
              None, only the type registry is updated.

    Example:
        >>> registry.register_type(DatabaseClient, db_instance)
        >>> db = registry.get_by_type(DatabaseClient)
    """
    with self._lock:
        self._type_registry[type_hint] = instance

        if name is None:
            return

        # Mirror into the name-based registry so legacy, name-based
        # callers can still find the instance.
        self.register(
            name=name,
            value=instance,
            dimension="types",
            metadata={"type": type_hint},
            replace=True,
        )
remove
remove(name: str, dimension: str | None = None) -> bool

Remove an item from the registry.

Returns:

Type Description
bool

True if item was removed, False if not found

Source code in provide/foundation/hub/registry.py
def remove(
    self,
    name: str,
    dimension: str | None = None,
) -> bool:
    """Remove an item (and any aliases pointing at it) from the registry.

    Args:
        name: Name of the item to remove
        dimension: Optional dimension to restrict the search to

    Returns:
        True if item was removed, False if not found

    """
    with self._lock:
        # Build the list of dimensions to search; this unifies the two
        # previously-duplicated removal branches.
        if dimension is not None:
            # .get() avoids the defaultdict auto-creating an empty
            # dimension when an unknown one is targeted.
            dim_registry = self._registry.get(dimension)
            candidates = [(dimension, dim_registry)] if dim_registry is not None else []
        else:
            candidates = list(self._registry.items())

        for dim_key, dim_registry in candidates:
            if name not in dim_registry:
                continue

            del dim_registry[name]

            # Drop every alias that pointed at the removed entry.
            stale_aliases = [
                alias for alias, (d, n) in self._aliases.items() if d == dim_key and n == name
            ]
            for alias in stale_aliases:
                del self._aliases[alias]

            # Emit event instead of direct logging to break circular dependency
            from provide.foundation.hub.events import emit_registry_event

            emit_registry_event(
                operation="remove",
                item_name=name,
                dimension=dim_key,
            )
            return True

        return False

RegistryEntry

A single entry in the registry.

Attributes
key property
key: tuple[str, str]

Get the registry key for this entry.

RetryExecutor

RetryExecutor(
    policy: RetryPolicy,
    on_retry: (
        Callable[[int, Exception], None] | None
    ) = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: (
        Callable[[float], Awaitable[None]] | None
    ) = None,
)

Unified retry execution engine.

This executor handles the actual retry loop logic for both sync and async functions, using a RetryPolicy for configuration. It's used internally by both the @retry decorator and RetryMiddleware.

Initialize retry executor.

Parameters:

Name Type Description Default
policy RetryPolicy

Retry policy configuration

required
on_retry Callable[[int, Exception], None] | None

Optional callback for retry events (attempt, error)

None
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
sleep_func Callable[[float], None] | None

Optional synchronous sleep function (for testing). Defaults to time.sleep() for production use.

None
async_sleep_func Callable[[float], Awaitable[None]] | None

Optional asynchronous sleep function (for testing). Defaults to asyncio.sleep() for production use.

None
Source code in provide/foundation/resilience/retry.py
def __init__(
    self,
    policy: RetryPolicy,
    on_retry: Callable[[int, Exception], None] | None = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: Callable[[float], Awaitable[None]] | None = None,
) -> None:
    """Initialize retry executor.

    Args:
        policy: Retry policy configuration
        on_retry: Optional callback for retry events (attempt, error)
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.time() for production use.
        sleep_func: Optional synchronous sleep function (for testing).
                   Defaults to time.sleep() for production use.
        async_sleep_func: Optional asynchronous sleep function (for testing).
                         Defaults to asyncio.sleep() for production use.

    """
    self.policy = policy
    self.on_retry = on_retry
    # Injectable clock/sleep functions keep retry timing deterministic
    # under test; production defaults are the real time/asyncio versions.
    self._time_source = time_source or time.time
    self._sleep = sleep_func or time.sleep
    self._async_sleep = async_sleep_func or asyncio.sleep
Functions
execute_async async
execute_async(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute asynchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

Async function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
async def execute_async(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
    """Execute asynchronous function with retry logic.

    Args:
        func: Async function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The last exception raised if all retry attempts are exhausted

    """
    # Track the most recent error for the defensive re-raise after the loop.
    last_exception = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e

            # Don't retry on last attempt - log and raise
            if attempt >= self.policy.max_attempts:
                # Local import breaks a circular dependency with the hub logger.
                from provide.foundation.hub.foundation import get_foundation_logger

                get_foundation_logger().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # Check if we should retry this error
            if not self.policy.should_retry(e, attempt):
                raise

            # Calculate delay
            delay = self.policy.calculate_delay(attempt)

            # Log retry attempt
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {delay:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=delay,
                error=str(e),
                error_type=type(e).__name__,
            )

            # Call retry callback if provided.
            # Both sync and async callbacks are supported; callback errors are
            # logged and swallowed so they never mask the original failure.
            if self.on_retry:
                try:
                    if asyncio.iscoroutinefunction(self.on_retry):
                        await self.on_retry(attempt, e)
                    else:
                        self.on_retry(attempt, e)
                except Exception as callback_error:
                    from provide.foundation.hub.foundation import get_foundation_logger

                    get_foundation_logger().warning("Retry callback failed", error=str(callback_error))

            # Wait before retry (injected sleep; asyncio.sleep by default)
            await self._async_sleep(delay)

    # Should never reach here, but for safety
    if last_exception is not None:
        raise last_exception
    else:
        raise RuntimeError("No exception captured during async retry attempts")
execute_sync
execute_sync(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Execute synchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
def execute_sync(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute synchronous function with retry logic.

    Args:
        func: Function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The last exception raised if all retry attempts are exhausted

    """
    # Track the most recent error for the defensive re-raise after the loop.
    last_exception = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as e:
            last_exception = e

            # Don't retry on last attempt - log and raise
            if attempt >= self.policy.max_attempts:
                # Local import breaks a circular dependency with the hub logger.
                from provide.foundation.hub.foundation import get_foundation_logger

                get_foundation_logger().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # Check if we should retry this error
            if not self.policy.should_retry(e, attempt):
                raise

            # Calculate delay
            delay = self.policy.calculate_delay(attempt)

            # Log retry attempt
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {delay:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=delay,
                error=str(e),
                error_type=type(e).__name__,
            )

            # Call retry callback if provided.
            # Callback errors are logged and swallowed so they never mask
            # the original failure.
            if self.on_retry:
                try:
                    self.on_retry(attempt, e)
                except Exception as callback_error:
                    from provide.foundation.hub.foundation import get_foundation_logger

                    get_foundation_logger().warning("Retry callback failed", error=str(callback_error))

            # Wait before retry (injected sleep; time.sleep by default)
            self._sleep(delay)

    # Should never reach here, but for safety
    if last_exception is not None:
        raise last_exception
    else:
        raise RuntimeError("No exception captured during retry attempts")

RetryPolicy

Configuration for retry behavior.

This policy can be used with both the @retry decorator and transport middleware, providing a unified configuration model for all retry scenarios.

Attributes:

Name Type Description
max_attempts int

Maximum number of retry attempts (must be >= 1)

backoff BackoffStrategy

Backoff strategy to use for delays

base_delay float

Base delay in seconds between retries

max_delay float

Maximum delay in seconds (caps exponential growth)

jitter bool

Whether to add random jitter to delays (±25%)

retryable_errors tuple[type[Exception], ...] | None

Tuple of exception types to retry (None = all)

retryable_status_codes set[int] | None

Set of HTTP status codes to retry (for middleware)

Functions
__str__
__str__() -> str

Human-readable string representation.

Source code in provide/foundation/resilience/retry.py
def __str__(self) -> str:
    """Human-readable string representation."""
    summary = (
        f"RetryPolicy(max_attempts={self.max_attempts}, "
        f"backoff={self.backoff.value}, base_delay={self.base_delay}s)"
    )
    return summary
calculate_delay
calculate_delay(attempt: int) -> float

Calculate delay for a given attempt number.

Parameters:

Name Type Description Default
attempt int

Attempt number (1-based)

required

Returns:

Type Description
float

Delay in seconds

Source code in provide/foundation/resilience/retry.py
def calculate_delay(self, attempt: int) -> float:
    """Calculate delay for a given attempt number.

    Args:
        attempt: Attempt number (1-based)

    Returns:
        Delay in seconds

    """
    if attempt <= 0:
        return 0

    if self.backoff == BackoffStrategy.FIXED:
        delay = self.base_delay
    elif self.backoff == BackoffStrategy.LINEAR:
        delay = self.base_delay * attempt
    elif self.backoff == BackoffStrategy.EXPONENTIAL:
        delay = self.base_delay * (2 ** (attempt - 1))
    elif self.backoff == BackoffStrategy.FIBONACCI:
        # Calculate fibonacci number for attempt
        a, b = 0, 1
        for _ in range(attempt):
            a, b = b, a + b
        delay = self.base_delay * a
    else:
        delay = self.base_delay

    # Cap at max delay
    delay = min(delay, self.max_delay)

    # Add jitter if configured (±25% random variation)
    if self.jitter:
        jitter_factor = 0.75 + (random.random() * 0.5)  # nosec B311 - Retry jitter timing
        delay *= jitter_factor

    return delay
should_retry
should_retry(error: Exception, attempt: int) -> bool

Determine if an error should be retried.

Parameters:

Name Type Description Default
error Exception

The exception that occurred

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry(self, error: Exception, attempt: int) -> bool:
    """Decide whether *error* on attempt *attempt* warrants another try.

    Args:
        error: The exception that occurred
        attempt: Current attempt number (1-based)

    Returns:
        True if should retry, False otherwise
    """
    # Guard: no attempts remain.
    if attempt >= self.max_attempts:
        return False

    # No error-type filter configured: every error is retryable.
    if self.retryable_errors is None:
        return True

    return isinstance(error, self.retryable_errors)
should_retry_response
should_retry_response(response: Any, attempt: int) -> bool

Check if HTTP response should be retried.

Parameters:

Name Type Description Default
response Any

Response object with status attribute

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry_response(self, response: Any, attempt: int) -> bool:
    """Decide whether an HTTP response should be retried.

    Args:
        response: Response object with a `status` attribute
        attempt: Current attempt number (1-based)

    Returns:
        True if should retry, False otherwise
    """
    # Guard: no attempts remain.
    if attempt >= self.max_attempts:
        return False

    # Responses are only retried when status codes are configured.
    if self.retryable_status_codes is None:
        return False

    return getattr(response, "status", None) in self.retryable_status_codes

SyncCircuitBreaker

SyncCircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
)

Synchronous circuit breaker for resilience patterns.

Uses threading.RLock for thread-safe state management in synchronous code. For async code, use AsyncCircuitBreaker instead.

Initialize the synchronous circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
recovery_timeout float

Seconds to wait before attempting recovery

30.0
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) to catch

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
Source code in provide/foundation/resilience/circuit_sync.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Initialize the synchronous circuit breaker.

    Args:
        failure_threshold: Number of failures before opening circuit
        recovery_timeout: Seconds to wait before attempting recovery
        expected_exception: Exception type(s) to catch
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.time() for production use.
    """
    self.failure_threshold = failure_threshold
    self.recovery_timeout = recovery_timeout
    self.expected_exception = expected_exception
    self._time_source = time_source or time.time
    # RLock (not Lock): call() acquires the lock and then invokes state(),
    # which re-acquires it, so the lock must be reentrant.
    self._lock = threading.RLock()
    # Initialize state attributes (will be set properly in reset())
    self._state: CircuitState
    self._failure_count: int
    self._last_failure_time: float | None
    self.reset()
Functions
call
call(func: Callable, *args: Any, **kwargs: Any) -> Any

Execute a synchronous function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable

Callable to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
Any

Result from func

Raises:

Type Description
RuntimeError

If circuit is open

Exception

Whatever exception func raises

Source code in provide/foundation/resilience/circuit_sync.py
def call(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute a synchronous function through the circuit breaker.

    Args:
        func: Callable to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func

    Raises:
        RuntimeError: If circuit is open
        Exception: Whatever exception func raises
    """
    with self._lock:
        current_state = self.state()
        if current_state == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker is open")
        # If HALF_OPEN, we proceed with the call

    # NOTE: the lock is deliberately released before invoking func so slow
    # calls do not serialize all callers; state may change between the check
    # above and the call below (accepted check-then-act window).
    try:
        result = func(*args, **kwargs)
        self._on_success()
        return result
    except self.expected_exception as e:
        self._on_failure()
        raise e
failure_count
failure_count() -> int

Get the current failure count.

Source code in provide/foundation/resilience/circuit_sync.py
def failure_count(self) -> int:
    """Return the number of failures recorded since the last reset."""
    with self._lock:
        count = self._failure_count
    return count
reset
reset() -> None

Reset the circuit breaker to its initial state.

Source code in provide/foundation/resilience/circuit_sync.py
def reset(self) -> None:
    """Reset the circuit breaker to its initial state.

    Returns the breaker to CLOSED with zero recorded failures; __init__
    also calls this to establish the initial state.
    """
    with self._lock:
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
state
state() -> CircuitState

Get the current state of the circuit breaker.

Source code in provide/foundation/resilience/circuit_sync.py
def state(self) -> CircuitState:
    """Get the current state of the circuit breaker.

    When the stored state is OPEN but the recovery window has elapsed
    (per _can_attempt_recovery), HALF_OPEN is returned as a computed
    view; the stored state itself is not mutated here.
    """
    with self._lock:
        if self._state == CircuitState.OPEN and self._can_attempt_recovery():
            # This is a view of the state; the actual transition happens in call()
            return CircuitState.HALF_OPEN
        return self._state

TelemetryConfig

Bases: RuntimeConfig

Main configuration object for the Foundation Telemetry system.

Functions
from_env classmethod
from_env(
    prefix: str = "",
    delimiter: str = "_",
    case_sensitive: bool = False,
) -> TelemetryConfig

Load configuration from environment variables.

This method explicitly provides the from_env() interface to ensure it's available on TelemetryConfig directly.

If OpenObserve is configured and reachable, OTLP settings are automatically configured if not already set.

Source code in provide/foundation/logger/config/telemetry.py
@classmethod
def from_env(
    cls,
    prefix: str = "",
    delimiter: str = "_",
    case_sensitive: bool = False,
) -> TelemetryConfig:
    """Load configuration from environment variables.

    This method explicitly provides the from_env() interface
    to ensure it's available on TelemetryConfig directly.

    If OpenObserve is configured and reachable, OTLP settings are
    automatically configured if not already set.

    Args:
        prefix: Optional prefix for environment variable names.
        delimiter: Separator between prefix and field names.
        case_sensitive: Whether variable-name matching is case sensitive.

    Returns:
        A populated TelemetryConfig instance.
    """
    # Load base configuration (env parsing is delegated to the parent class)
    config = super().from_env(prefix=prefix, delimiter=delimiter, case_sensitive=case_sensitive)

    # Auto-configure OTLP if OpenObserve is available and OTLP not already configured
    if not config.otlp_endpoint:
        config = cls._auto_configure_openobserve_otlp(config)

    return config
get_otlp_headers_dict
get_otlp_headers_dict() -> dict[str, str]

Get OTLP headers dictionary.

Returns:

Type Description
dict[str, str]

Dictionary of header key-value pairs

Source code in provide/foundation/logger/config/telemetry.py
def get_otlp_headers_dict(self) -> dict[str, str]:
    """Get OTLP headers dictionary.

    Returns:
        Dictionary of header key-value pairs

    Note:
        Returns the stored mapping as-is; no parsing or copying is done.
    """
    return self.otlp_headers

TokenBucketRateLimiter

TokenBucketRateLimiter(
    capacity: float,
    refill_rate: float,
    time_source: Callable[[], float] | None = None,
)

A Token Bucket rate limiter for asyncio applications.

This limiter allows for bursts up to a specified capacity and refills tokens at a constant rate. Concurrency safety is provided by an asyncio.Lock, making it safe for concurrent coroutines within a single event loop (note that asyncio.Lock is not thread-safe).

Initialize the TokenBucketRateLimiter.

Parameters:

Name Type Description Default
capacity float

The maximum number of tokens the bucket can hold (burst capacity).

required
refill_rate float

The rate at which tokens are refilled per second.

required
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.monotonic.

None
Source code in provide/foundation/utils/rate_limiting.py
def __init__(
    self,
    capacity: float,
    refill_rate: float,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Initialize the TokenBucketRateLimiter.

    Args:
        capacity: The maximum number of tokens the bucket can hold
                  (burst capacity).
        refill_rate: The rate at which tokens are refilled per second.
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.monotonic.

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive.")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive.")

    self._capacity: float = float(capacity)
    self._refill_rate: float = float(refill_rate)
    self._tokens: float = float(capacity)  # Start with a full bucket
    self._time_source = time_source if time_source is not None else time.monotonic
    self._last_refill_timestamp: float = self._time_source()
    self._lock = asyncio.Lock()

    # Cache logger instance to avoid repeated imports
    self._logger = None
    try:
        from provide.foundation.logger import get_logger

        self._logger = get_logger(__name__)
        self._logger.debug(
            f"🔩🗑️ TokenBucketRateLimiter initialized: capacity={capacity}, refill_rate={refill_rate}",
        )
    except ImportError:
        # Fallback if logger not available
        pass
Functions
get_current_tokens async
get_current_tokens() -> float

Returns the current number of tokens, for testing/monitoring.

Source code in provide/foundation/utils/rate_limiting.py
async def get_current_tokens(self) -> float:
    """Return the current token count, for testing/monitoring.

    Reads are passive: no refill is performed first. Calling
    ``self._refill_tokens()`` here would give the most up-to-date
    figure if that ever becomes necessary.
    """
    async with self._lock:
        return self._tokens
is_allowed async
is_allowed() -> bool

Check if a request is allowed based on available tokens.

This method is asynchronous and thread-safe. It refills tokens based on elapsed time and then attempts to consume a token.

Returns:

Type Description
bool

True if the request is allowed, False otherwise.

Source code in provide/foundation/utils/rate_limiting.py
async def is_allowed(self) -> bool:
    """Check whether a request may proceed, consuming one token if so.

    Safe for concurrent use from coroutines: under the lock, tokens are
    refilled according to elapsed time and then a single token is
    consumed when available.

    Returns:
        True if the request is allowed, False otherwise.

    """
    async with self._lock:
        # Top up the bucket before testing availability.
        await self._refill_tokens()

        if self._tokens < 1.0:
            if self._logger:
                self._logger.warning(
                    "🔩🗑️❌ Request denied. No tokens available. Tokens: "
                    f"{self._tokens:.2f}/{self._capacity:.2f}",
                )
            return False

        self._tokens -= 1.0
        if self._logger:
            self._logger.debug(
                f"🔩✅ Request allowed. Tokens remaining: {self._tokens:.2f}/{self._capacity:.2f}",
            )
        return True

Functions

__getattr__

__getattr__(name: str) -> object

Support lazy loading of modules and version.

This reduces initial import overhead by deferring module imports and version loading until first access.

Parameters:

Name Type Description Default
name str

Attribute name to lazy-load

required

Returns:

Type Description
object

The imported module or attribute

Raises:

Type Description
AttributeError

If attribute doesn't exist

ImportError

If module import fails

Source code in provide/foundation/__init__.py
def __getattr__(name: str) -> object:
    """Support lazy loading of modules and __version__.

    Defers submodule imports and version resolution until first
    attribute access, keeping the package's initial import cheap.

    Args:
        name: Attribute name to lazy-load

    Returns:
        The imported module or attribute

    Raises:
        AttributeError: If attribute doesn't exist
        ImportError: If module import fails
    """
    # __version__ is resolved on demand so importing the package does no I/O.
    if name == "__version__":
        from provide.foundation.utils.versioning import get_version

        return get_version("provide-foundation", caller_file=__file__)

    # Every other attribute is treated as a lazily imported submodule.
    try:
        from provide.foundation.utils.importer import lazy_import

        return lazy_import(__name__, name)
    except AttributeError:
        # Not a valid submodule: surface as a missing attribute.
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'") from None
    except ModuleNotFoundError as e:
        # Distinguish "this exact submodule is absent" (bad attribute name)
        # from a genuinely missing dependency (re-raise unchanged).
        if f"{__name__}.{name}" in str(e):
            raise AttributeError(f"module '{__name__}' has no attribute '{name}'") from None
        raise

check_optional_deps

check_optional_deps(
    *, quiet: bool = False, return_status: bool = False
) -> list[DependencyStatus] | None

Check and display optional dependency status.

Parameters:

Name Type Description Default
quiet bool

If True, don't print status (just return it)

False
return_status bool

If True, return the status list

False

Returns:

Type Description
list[DependencyStatus] | None

Optional list of dependency statuses if return_status=True

Source code in provide/foundation/utils/deps.py
def check_optional_deps(*, quiet: bool = False, return_status: bool = False) -> list[DependencyStatus] | None:
    """Check and display optional dependency status.

    Args:
        quiet: If True, don't print status (just return it)
        return_status: If True, return the status list

    Returns:
        Optional list of dependency statuses if return_status=True

    """
    statuses = get_optional_dependencies()

    if not quiet:
        from provide.foundation.hub.foundation import get_foundation_logger

        logger = get_foundation_logger()
        logger.info("=" * 50)

        total = len(statuses)
        available = sum(1 for s in statuses if s.available)

        for s in statuses:
            icon = "✅" if s.available else "❌"
            version = f" (v{s.version})" if s.version else ""
            logger.info(f"  {icon} {s.name}{version}")
            logger.info(f"     {s.description}")
            if not s.available:
                logger.info(f"     Install with: pip install 'provide-foundation[{s.name}]'")

        logger.info(f"📊 Summary: {available}/{total} optional dependencies available")

        if available == total:
            logger.info("🎉 All optional features are available!")
        elif available == 0:
            logger.info("💡 Install optional features with: pip install 'provide-foundation[all]'")
        else:
            missing = [s.name for s in statuses if not s.available]
            logger.info(f"💡 Missing features: {', '.join(missing)}")

    return statuses if return_status else None

circuit_breaker

circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
    registry: Registry | None = None,
) -> Callable[[F], F]

Create a circuit breaker decorator.

Creates a SyncCircuitBreaker for synchronous functions and an AsyncCircuitBreaker for asynchronous functions to avoid locking issues.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit.

5
recovery_timeout float

Seconds to wait before attempting recovery.

DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) that trigger the breaker. Can be a single exception type or a tuple of exception types.

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing).

None
registry Registry | None

Optional registry to register the breaker with (for DI).

None

Returns:

Type Description
Callable[[F], F]

Circuit breaker decorator.

Examples:

>>> @circuit_breaker(failure_threshold=3, recovery_timeout=30)
... def unreliable_service():
...     return external_api_call()
>>> @circuit_breaker(expected_exception=ValueError)
... def parse_data():
...     return risky_parse()
>>> @circuit_breaker(expected_exception=(ValueError, TypeError))
... async def async_unreliable_service():
...     return await async_api_call()
Source code in provide/foundation/resilience/decorators.py
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
    registry: Registry | None = None,
) -> Callable[[F], F]:
    """Create a circuit breaker decorator.

    Creates a SyncCircuitBreaker for synchronous functions and an
    AsyncCircuitBreaker for asynchronous functions to avoid locking issues.

    Args:
        failure_threshold: Number of failures before opening circuit.
        recovery_timeout: Seconds to wait before attempting recovery.
        expected_exception: Exception type(s) that trigger the breaker.
            Can be a single exception type or a tuple of exception types.
        time_source: Optional callable that returns current time (for testing).
        registry: Optional registry to register the breaker with (for DI).

    Returns:
        Circuit breaker decorator.

    Examples:
        >>> @circuit_breaker(failure_threshold=3, recovery_timeout=30)
        ... def unreliable_service():
        ...     return external_api_call()

        >>> @circuit_breaker(expected_exception=ValueError)
        ... def parse_data():
        ...     return risky_parse()

        >>> @circuit_breaker(expected_exception=(ValueError, TypeError))
        ... async def async_unreliable_service():
        ...     return await async_api_call()

    """
    # Normalize expected_exception to tuple
    expected_exception_tuple: tuple[type[Exception], ...]
    if isinstance(expected_exception, tuple):
        expected_exception_tuple = expected_exception
    else:
        expected_exception_tuple = (expected_exception,)

    def decorator(func: F) -> F:
        # Use provided registry or fall back to global
        reg = registry or _get_circuit_breaker_registry()

        def _register(breaker: SyncCircuitBreaker | AsyncCircuitBreaker) -> None:
            """Assign a unique name and register the breaker (thread-safe).

            Shared by the sync and async branches; previously duplicated.
            """
            global _circuit_breaker_counter
            with _circuit_breaker_counter_lock:
                _circuit_breaker_counter += 1
                breaker_name = f"cb_{_circuit_breaker_counter}"

            # Breakers excluded from global reset are kept in the test
            # dimension so resets do not touch them.
            if _should_register_for_global_reset():
                reg.register(breaker_name, breaker, dimension=CIRCUIT_BREAKER_DIMENSION)
            else:
                reg.register(breaker_name, breaker, dimension=CIRCUIT_BREAKER_TEST_DIMENSION)

        # Create appropriate breaker type based on function type
        breaker: SyncCircuitBreaker | AsyncCircuitBreaker
        if asyncio.iscoroutinefunction(func):
            # Async functions need an async breaker so waiting happens under
            # asyncio-based locking rather than a thread lock.
            breaker = AsyncCircuitBreaker(
                failure_threshold=failure_threshold,
                recovery_timeout=recovery_timeout,
                expected_exception=expected_exception_tuple,
                time_source=time_source,
            )

            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                return await breaker.call(func, *args, **kwargs)

            _register(breaker)
            return async_wrapper  # type: ignore[return-value]

        breaker = SyncCircuitBreaker(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
            expected_exception=expected_exception_tuple,
            time_source=time_source,
        )

        @functools.wraps(func)
        def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
            return breaker.call(func, *args, **kwargs)

        _register(breaker)
        return sync_wrapper  # type: ignore[return-value]

    return decorator

clear_hub

clear_hub() -> None

Clear the global hub instance.

This is primarily used for testing to reset Foundation state between test runs.

Source code in provide/foundation/hub/manager.py
def clear_hub() -> None:
    """Clear the global hub instance.

    Primarily used by tests to reset Foundation state between runs.
    """
    global _global_hub
    hub = _global_hub
    if hub:
        hub.clear()
    _global_hub = None

error_boundary

error_boundary(
    *catch: type[Exception],
    on_error: Callable[[Exception], Any] | None = None,
    log_errors: bool = True,
    reraise: bool = True,
    context: dict[str, Any] | None = None,
    fallback: Any = None
) -> Generator[None, None, None]

Context manager for structured error handling with logging.

Parameters:

Name Type Description Default
*catch type[Exception]

Exception types to catch (defaults to Exception if empty).

()
on_error Callable[[Exception], Any] | None

Optional callback function when error is caught.

None
log_errors bool

Whether to log caught errors.

True
reraise bool

Whether to re-raise after handling.

True
context dict[str, Any] | None

Additional context for error logging.

None
fallback Any

Value to return if error is suppressed (when reraise=False).

None

Yields:

Type Description
None

None

Examples:

>>> with error_boundary(ValueError, on_error=handle_error):
...     risky_operation()
>>> # Suppress and log specific errors
>>> with error_boundary(KeyError, reraise=False, fallback=None):
...     value = data["missing_key"]
Source code in provide/foundation/errors/handlers.py
@contextmanager
def error_boundary(
    *catch: type[Exception],
    on_error: Callable[[Exception], Any] | None = None,
    log_errors: bool = True,
    reraise: bool = True,
    context: dict[str, Any] | None = None,
    fallback: Any = None,
) -> Generator[None, None, None]:
    """Context manager for structured error handling with logging.

    Args:
        *catch: Exception types to catch (defaults to Exception if empty).
        on_error: Optional callback function when error is caught.
        log_errors: Whether to log caught errors.
        reraise: Whether to re-raise after handling.
        context: Additional context for error logging.
        fallback: Value to return if error is suppressed (when reraise=False).

    Yields:
        None

    Examples:
        >>> with error_boundary(ValueError, on_error=handle_error):
        ...     risky_operation()

        >>> # Suppress and log specific errors
        >>> with error_boundary(KeyError, reraise=False, fallback=None):
        ...     value = data["missing_key"]

    """
    # Default to catching all exceptions if none specified
    catch_types = catch if catch else (Exception,)

    try:
        yield
    except catch_types as e:
        if log_errors:
            # Build error context
            error_context = context or {}

            # Add error details
            error_context.update(
                {
                    "error.type": type(e).__name__,
                    "error.message": str(e),
                },
            )

            # If it's a FoundationError, merge its context
            if isinstance(e, FoundationError) and e.context:
                error_context.update(e.context)

            # Log the error
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().error(f"Error caught in boundary: {e}", exc_info=True, **error_context)

        # Call error handler if provided
        if on_error:
            try:
                on_error(e)
            except Exception as handler_error:
                if log_errors:
                    from provide.foundation.hub.foundation import get_foundation_logger

                    get_foundation_logger().error(
                        f"Error handler failed: {handler_error}",
                        exc_info=True,
                        original_error=str(e),
                    )

        # Re-raise if configured
        if reraise:
            raise

        # Return fallback value if not re-raising
        return fallback

get_component_registry

get_component_registry() -> Registry

Get the global component registry.

Source code in provide/foundation/hub/components.py
def get_component_registry() -> Registry:
    """Return the process-wide component registry singleton."""
    return _component_registry

get_hub

get_hub() -> Hub

Get the global shared hub instance (singleton pattern).

This function acts as the Composition Root for the global singleton instance. It is maintained for backward compatibility and convenience.

Note: For building testable and maintainable applications, the recommended approach is to use a Container or Hub instance created at your application's entry point for explicit dependency management. This global accessor should be avoided in application code.

Thread-safe: Uses double-checked locking pattern for efficient lazy initialization.

Auto-Initialization Behavior: This function automatically initializes the Foundation system on first access. The initialization is: - Idempotent: Safe to call multiple times - Thread-safe: Uses lock manager for coordination - Lazy: Only happens on first access

Returns:

Type Description
Hub

Global Hub instance (created and initialized if needed)

Example

hub = get_hub()
hub.register_command("my_command", my_function)

Note

For isolated Hub instances (testing, advanced use cases), use:

hub = Hub(use_shared_registries=False)

Source code in provide/foundation/hub/manager.py
def get_hub() -> Hub:
    """Get the global shared hub instance (singleton pattern).

    This function acts as the Composition Root for the global singleton instance.
    It is maintained for backward compatibility and convenience.

    **Note:** For building testable and maintainable applications, the recommended
    approach is to use a `Container` or `Hub` instance created at your application's
    entry point for explicit dependency management. This global accessor should be
    avoided in application code.

    Thread-safe: Uses double-checked locking pattern for efficient lazy initialization.

    **Auto-Initialization Behavior:**
    This function automatically initializes the Foundation system on first access.
    The initialization is:
    - **Idempotent**: Safe to call multiple times
    - **Thread-safe**: Uses lock manager for coordination
    - **Lazy**: Only happens on first access

    Returns:
        Global Hub instance (created and initialized if needed)

    Example:
        >>> hub = get_hub()
        >>> hub.register_command("my_command", my_function)

    Note:
        For isolated Hub instances (testing, advanced use cases), use:
        >>> hub = Hub(use_shared_registries=False)

    """
    global _global_hub

    # Fast path: hub already initialized (no lock taken on the common path)
    if _global_hub is not None:
        return _global_hub

    # Slow path: need to initialize hub
    with get_lock_manager().acquire("foundation.hub.init"):
        # Double-check after acquiring lock
        if _global_hub is None:
            # Global hub uses shared registries by default.
            # NOTE(review): _global_hub is published *before* initialization
            # completes — presumably so re-entrant get_hub() calls made during
            # initialize_foundation() observe the instance instead of
            # recursing; confirm before reordering these statements.
            _global_hub = Hub(use_shared_registries=True)

            # Auto-initialize Foundation on first hub access
            _global_hub.initialize_foundation()

            # Bootstrap foundation components now that hub is ready (skip in test mode)
            from provide.foundation.testmode.detection import is_in_test_mode

            if not is_in_test_mode():
                try:
                    from provide.foundation.hub.components import bootstrap_foundation

                    bootstrap_foundation()
                except ImportError:
                    # Bootstrap function might not exist yet, that's okay
                    pass

    return _global_hub

get_logger

get_logger(name: str | None = None) -> Any

Get a logger instance through Hub with circular import protection.

This function provides access to the global logger instance. It is preserved for backward compatibility but should be avoided in new application code in favor of explicit Dependency Injection.

Circular Import Protection

Uses thread-local state to detect recursive initialization and falls back to basic structlog when circular dependencies are detected.

Parameters:

Name Type Description Default
name str | None

Logger name (e.g., name from a module)

None

Returns:

Type Description
Any

Configured structlog logger instance

Note

For building testable and maintainable applications, the recommended approach is to inject a logger instance via a Container. See the Dependency Injection guide for more information.

Source code in provide/foundation/logger/factories.py
def get_logger(name: str | None = None) -> Any:
    """Get a logger instance through Hub with circular import protection.

    This function provides access to the global logger instance. It is preserved
    for backward compatibility but should be avoided in new application code in
    favor of explicit Dependency Injection.

    Circular Import Protection:
        Uses thread-local state to detect recursive initialization and falls
        back to basic structlog when circular dependencies are detected.

    Args:
        name: Logger name (e.g., __name__ from a module)

    Returns:
        Configured structlog logger instance

    Note:
        For building testable and maintainable applications, the recommended
        approach is to inject a logger instance via a `Container`. See the
        Dependency Injection guide for more information.
    """
    # Track recursion depth to prevent infinite loops.
    # depth is the value at entry; it is restored (not decremented) in finally.
    depth = getattr(_is_initializing, "depth", 0)

    # Check if we're already in the middle of initialization to prevent circular import
    if depth > 0:
        # Already initializing - use fallback to break circular dependency
        import structlog

        return structlog.get_logger(name)

    # Safety check: enforce maximum recursion depth
    # NOTE(review): depth can only be <= 0 here (any depth > 0 returned above),
    # so this guard fires only if _MAX_RECURSION_DEPTH <= 0 — likely dead code;
    # confirm the constant's value before relying on or removing this branch.
    if depth >= _MAX_RECURSION_DEPTH:
        import structlog

        return structlog.get_logger(name)

    try:
        # Increment recursion depth
        _is_initializing.depth = depth + 1

        from provide.foundation.hub.manager import get_hub

        hub = get_hub()
        return hub.get_foundation_logger(name)
    except (ImportError, RecursionError):
        # Fallback to basic structlog if hub is not available or circular import detected
        import structlog

        return structlog.get_logger(name)
    finally:
        # Always restore the entry depth so future calls can attempt again
        # (depth is 0 on this path, so this effectively resets the flag).
        _is_initializing.depth = max(0, depth)

perr

perr(message: Any, **kwargs: Any) -> None

Output message to stderr.

Parameters:

Name Type Description Default
message Any

Content to output (any type - will be stringified or JSON-encoded)

required
**kwargs Any

Optional formatting arguments: color: Color name (red, green, yellow, blue, cyan, magenta, white) bold: Bold text dim: Dim text nl/newline: Add newline (default: True) json_key: Key for JSON output mode prefix: Optional prefix string ctx: Override context

{}

Examples:

perr("Error occurred") perr("Warning", color="yellow") perr({"error": details}, json_key="error")

Source code in provide/foundation/console/output.py
@resilient(
    fallback=None,
    suppress=(OSError, IOError, UnicodeEncodeError),
    context_provider=lambda: {"function": "perr"},
)
def perr(message: Any, **kwargs: Any) -> None:
    """Output message to stderr.

    Args:
        message: Content to output (any type - will be stringified or JSON-encoded)
        **kwargs: Optional formatting arguments:
            color: Color name (red, green, yellow, blue, cyan, magenta, white)
            bold: Bold text
            dim: Dim text
            nl/newline: Add newline (default: True)
            json_key: Key for JSON output mode
            prefix: Optional prefix string
            ctx: Override context

    Examples:
        perr("Error occurred")
        perr("Warning", color="yellow")
        perr({"error": details}, json_key="error")

    """
    ctx = kwargs.get("ctx") or _get_context()
    # Both "nl" and "newline" spellings are honored; default is a newline.
    nl = kwargs.get("nl", kwargs.get("newline", True))

    if _should_use_json(ctx):
        # JSON mode: optionally wrap the payload under json_key.
        payload = {kwargs["json_key"]: message} if kwargs.get("json_key") else message
        _output_json(payload, sys.stderr)
        return

    # Plain-text mode with optional prefix.
    text = str(message)
    if prefix := kwargs.get("prefix"):
        text = f"{prefix} {text}"

    color = kwargs.get("color")
    bold = kwargs.get("bold", False)
    dim = kwargs.get("dim", False)

    if _HAS_CLICK:
        # Styling only when requested and the stream supports color.
        if (color or bold or dim) and _should_use_color(ctx, sys.stderr):
            click.secho(text, fg=color, bold=bold, dim=dim, err=True, nl=nl)
        else:
            click.echo(text, err=True, nl=nl)
    else:
        # Fallback to standard Python print
        print(text, file=sys.stderr, end="\n" if nl else "")

pin

pin(prompt: str = '', **kwargs: Any) -> str | Any

Input from stdin with optional prompt.

Parameters:

Name Type Description Default
prompt str

Prompt to display before input

''
**kwargs Any

Optional formatting arguments: type: Type to convert input to (int, float, bool, etc.) default: Default value if no input provided password: Hide input for passwords (default: False) confirmation_prompt: Ask for confirmation (for passwords) hide_input: Hide the input (same as password) show_default: Show default value in prompt value_proc: Callable to process the value json_key: Key for JSON output mode ctx: Override context color: Color for prompt (red, green, yellow, blue, cyan, magenta, white) bold: Bold prompt text

{}

Returns:

Type Description
str | Any

User input as string or converted type

Examples:

name = pin("Enter name: ") age = pin("Age: ", type=int, default=0) password = pin("Password: ", password=True)

In JSON mode, returns structured input data.

Source code in provide/foundation/console/input.py
def pin(prompt: str = "", **kwargs: Any) -> str | Any:
    """Input from stdin with optional prompt.

    Args:
        prompt: Prompt to display before input
        **kwargs: Optional formatting arguments:
            type: Type to convert input to (int, float, bool, etc.)
            default: Default value if no input provided
            password: Hide input for passwords (default: False)
            confirmation_prompt: Ask for confirmation (for passwords)
            hide_input: Hide the input (same as password)
            show_default: Show default value in prompt
            value_proc: Callable to process the value
            json_key: Key for JSON output mode
            ctx: Override context
            color: Color for prompt (red, green, yellow, blue, cyan, magenta, white)
            bold: Bold prompt text

    Returns:
        User input as string or converted type

    Examples:
        name = pin("Enter name: ")
        age = pin("Age: ", type=int, default=0)
        password = pin("Password: ", password=True)

    In JSON mode, returns structured input data.

    """
    ctx = kwargs.get("ctx") or _get_context()
    # JSON mode bypasses the interactive prompt entirely.
    if _should_use_json(ctx):
        return _handle_json_input(prompt, kwargs)
    return _handle_interactive_input(prompt, kwargs, ctx)

pout

pout(message: Any, **kwargs: Any) -> None

Output message to stdout.

Parameters:

Name Type Description Default
message Any

Content to output (any type - will be stringified or JSON-encoded)

required
**kwargs Any

Optional formatting arguments: color: Color name (red, green, yellow, blue, cyan, magenta, white) bold: Bold text dim: Dim text nl/newline: Add newline (default: True) json_key: Key for JSON output mode prefix: Optional prefix string ctx: Override context

{}

Examples:

pout("Hello world") pout({"data": "value"}) # Auto-JSON if dict/list pout("Success", color="green", bold=True) pout(results, json_key="results")

Source code in provide/foundation/console/output.py
@resilient(
    fallback=None,
    suppress=(OSError, IOError, UnicodeEncodeError),
    context_provider=lambda: {"function": "pout"},
)
def pout(message: Any, **kwargs: Any) -> None:
    """Output message to stdout.

    Args:
        message: Content to output (any type - will be stringified or JSON-encoded)
        **kwargs: Optional formatting arguments:
            color: Color name (red, green, yellow, blue, cyan, magenta, white)
            bold: Bold text
            dim: Dim text
            nl/newline: Add newline (default: True)
            json_key: Key for JSON output mode
            prefix: Optional prefix string
            ctx: Override context

    Examples:
        pout("Hello world")
        pout({"data": "value"})  # Auto-JSON if dict/list
        pout("Success", color="green", bold=True)
        pout(results, json_key="results")

    """
    ctx = kwargs.get("ctx") or _get_context()
    # Both "nl" and "newline" spellings are honored; default is a newline.
    nl = kwargs.get("nl", kwargs.get("newline", True))

    if _should_use_json(ctx):
        # JSON mode: optionally wrap the payload under json_key.
        payload = {kwargs["json_key"]: message} if kwargs.get("json_key") else message
        _output_json(payload, sys.stdout)
        return

    # Plain-text mode with optional prefix.
    text = str(message)
    if prefix := kwargs.get("prefix"):
        text = f"{prefix} {text}"

    color = kwargs.get("color")
    bold = kwargs.get("bold", False)
    dim = kwargs.get("dim", False)

    if _HAS_CLICK:
        # Styling only when requested and the stream supports color.
        if (color or bold or dim) and _should_use_color(ctx, sys.stdout):
            click.secho(text, fg=color, bold=bold, dim=dim, nl=nl)
        else:
            click.echo(text, nl=nl)
    else:
        # Fallback to standard Python print
        print(text, file=sys.stdout, end="\n" if nl else "")

resilient

resilient(func: F) -> F
resilient(
    func: None = None,
    *,
    fallback: Any = None,
    log_errors: bool = True,
    context_provider: (
        Callable[[], dict[str, Any]] | None
    ) = None,
    context: dict[str, Any] | None = None,
    error_mapper: (
        Callable[[Exception], Exception] | None
    ) = None,
    suppress: tuple[type[Exception], ...] | None = None,
    reraise: bool = True
) -> Callable[[F], F]
resilient(
    func: F | None = None,
    *,
    fallback: Any = None,
    log_errors: bool = True,
    context_provider: (
        Callable[[], dict[str, Any]] | None
    ) = None,
    context: dict[str, Any] | None = None,
    error_mapper: (
        Callable[[Exception], Exception] | None
    ) = None,
    suppress: tuple[type[Exception], ...] | None = None,
    reraise: bool = True
) -> Callable[[F], F] | F

Decorator for automatic error handling with logging.

Parameters:

Name Type Description Default
fallback Any

Value to return when an error occurs.

None
log_errors bool

Whether to log errors.

True
context_provider Callable[[], dict[str, Any]] | None

Function that provides additional logging context.

None
context dict[str, Any] | None

Static context dict to include in logs (alternative to context_provider).

None
error_mapper Callable[[Exception], Exception] | None

Function to transform exceptions before re-raising.

None
suppress tuple[type[Exception], ...] | None

Tuple of exception types to suppress (return fallback instead).

None
reraise bool

Whether to re-raise exceptions after logging (default: True).

True

Returns:

Type Description
Callable[[F], F] | F

Decorated function.

Note

Preserving Context in error_mapper: When using error_mapper with FoundationError exceptions, the original exception's context dictionary is not automatically transferred to the mapped exception. To preserve rich context, manually copy it:

>>> from provide.foundation.errors import FoundationError
>>> @resilient(
...     error_mapper=lambda e: (
...         ValidationError(
...             str(e),
...             context=e.context if isinstance(e, FoundationError) else {}
...         ) if isinstance(e, FoundationError)
...         else DomainError(str(e))
...     )
... )
... def process_data(data):
...     # Low-level FoundationError will be mapped to ValidationError
...     # with context preserved
...     pass

Examples:

>>> @resilient(fallback=None, suppress=(KeyError,))
... def get_value(data, key):
...     return data[key]
>>> @resilient(
...     context_provider=lambda: {"request_id": get_request_id()}
... )
... def process_request():
...     # errors will be logged with request_id
...     pass
>>> @resilient(
...     reraise=False,
...     context={"component": "orchestrator", "method": "run"}
... )
... def run():
...     # errors will be logged but not re-raised
...     pass
Source code in provide/foundation/errors/decorators.py
def resilient(
    func: F | None = None,
    *,
    fallback: Any = None,
    log_errors: bool = True,
    context_provider: Callable[[], dict[str, Any]] | None = None,
    context: dict[str, Any] | None = None,
    error_mapper: Callable[[Exception], Exception] | None = None,
    suppress: tuple[type[Exception], ...] | None = None,
    reraise: bool = True,
) -> Callable[[F], F] | F:
    """Decorator for automatic error handling with logging.

    Works both as ``@resilient`` (bare) and ``@resilient(...)``
    (configured), and supports sync and async callables alike.

    Args:
        func: The decorated function when used without parentheses;
            None when used with configuration arguments.
        fallback: Value to return when an error occurs.
        log_errors: Whether to log errors.
        context_provider: Function that provides additional logging context.
        context: Static context dict to include in logs (alternative to context_provider).
        error_mapper: Function to transform exceptions before re-raising.
        suppress: Tuple of exception types to suppress (return fallback instead).
        reraise: Whether to re-raise exceptions after logging (default: True).

    Returns:
        The decorated function (bare form) or a decorator (configured form).

    Note:
        **Preserving Context in error_mapper:**
        When using error_mapper with FoundationError exceptions, the original
        exception's context dictionary is not automatically transferred to the
        mapped exception. To preserve rich context, manually copy it:

        >>> from provide.foundation.errors import FoundationError
        >>> @resilient(
        ...     error_mapper=lambda e: (
        ...         ValidationError(
        ...             str(e),
        ...             context=e.context if isinstance(e, FoundationError) else {}
        ...         ) if isinstance(e, FoundationError)
        ...         else DomainError(str(e))
        ...     )
        ... )
        ... def process_data(data):
        ...     # Low-level FoundationError will be mapped to ValidationError
        ...     # with context preserved
        ...     pass

    Examples:
        >>> @resilient(fallback=None, suppress=(KeyError,))
        ... def get_value(data, key):
        ...     return data[key]

        >>> @resilient(
        ...     context_provider=lambda: {"request_id": get_request_id()}
        ... )
        ... def process_request():
        ...     # errors will be logged with request_id
        ...     pass

        >>> @resilient(
        ...     reraise=False,
        ...     context={"component": "orchestrator", "method": "run"}
        ... )
        ... def run():
        ...     # errors will be logged but not re-raised
        ...     pass

    """

    def apply(target: F) -> F:
        # Bundle all configuration into a single handler object.
        error_handler = ResilientErrorHandler(
            fallback=fallback,
            log_errors=log_errors,
            context_provider=context_provider,
            context=context,
            error_mapper=error_mapper,
            suppress=suppress,
            reraise=reraise,
        )

        # Coroutine functions need an awaiting wrapper; everything else
        # gets the plain synchronous one.
        if inspect.iscoroutinefunction(target):
            return _create_async_wrapper(target, error_handler)
        return _create_sync_wrapper(target, error_handler)

    # Bare @resilient passes the function directly; @resilient(...)
    # passes None and expects the decorator back.
    return apply if func is None else apply(func)

show_event_matrix

show_event_matrix() -> None

Display the active event set configuration to the console. Shows all registered event sets and their field mappings.

Source code in provide/foundation/eventsets/display.py
def show_event_matrix() -> None:
    """Display the active event set configuration to the console.

    Renders all registered event sets and their field mappings, followed
    by the resolver's resolved state, as a single INFO log record.
    """
    # Make sure discovery has run before inspecting the registry.
    discover_event_sets()

    registry = get_registry()
    resolver = get_resolver()

    # Trigger resolution so the state displayed below is fully populated.
    resolver.resolve()

    output_lines: list[str] = [
        "Foundation Event Sets: Active Configuration",
        "=" * 70,
    ]

    # Section 1: every registered event set and its mappings.
    _format_registered_event_sets(registry.list_event_sets(), output_lines)

    output_lines.append("\n" + "=" * 70)

    # Section 2: the resolver's current resolved state.
    _format_resolver_state(resolver, output_lines)

    # Emit everything as one log record.
    log.info("\n".join(output_lines))

shutdown_foundation async

shutdown_foundation(timeout_millis: int = 5000) -> None

Gracefully shutdown all Foundation subsystems.

Parameters:

Name Type Description Default
timeout_millis int

Timeout for shutdown (currently unused)

5000
Source code in provide/foundation/setup/__init__.py
async def shutdown_foundation(timeout_millis: int = 5000) -> None:
    """Gracefully shutdown all Foundation subsystems.

    Runs under the "foundation.logger.setup" lock — presumably so shutdown
    cannot interleave with a concurrent setup call; confirm against the
    lock manager's usage elsewhere.

    Args:
        timeout_millis: Timeout for shutdown (currently unused)

    """
    with get_lock_manager().acquire("foundation.logger.setup"):
        # Shutdown OpenTelemetry tracing and metrics first, so their final
        # output can still reach the log streams flushed below.
        shutdown_opentelemetry()
        shutdown_opentelemetry_metrics()

        # Flush logging streams
        flush_log_streams()

timed_block

timed_block(
    logger_instance: FoundationLogger,
    event_name: str,
    layer_keys: dict[str, Any] | None = None,
    initial_kvs: dict[str, Any] | None = None,
    **extra_kvs: Any
) -> Generator[dict[str, Any], None, None]

Context manager that logs the duration of a code block.

Logs at DEBUG when entering, INFO on success, ERROR on exception.

Parameters:

Name Type Description Default
logger_instance FoundationLogger

Logger to use for output

required
event_name str

Name of the operation being timed

required
layer_keys dict[str, Any] | None

Semantic layer keys (e.g., llm-specific keys)

None
initial_kvs dict[str, Any] | None

Initial key-value pairs to include in logs

None
**extra_kvs Any

Additional key-value pairs

{}

Yields:

Type Description
dict[str, Any]

A mutable dict that can be updated with additional context

Example

>>> with timed_block(logger, "database_query") as ctx:
>>>     ctx["query"] = "SELECT * FROM users"
>>>     result = db.query("SELECT * FROM users")
>>>     ctx["rows"] = len(result)

Source code in provide/foundation/utils/timing.py
@contextmanager
def timed_block(
    logger_instance: FoundationLogger,
    event_name: str,
    layer_keys: dict[str, Any] | None = None,
    initial_kvs: dict[str, Any] | None = None,
    **extra_kvs: Any,
) -> Generator[dict[str, Any], None, None]:
    """Context manager that logs the duration of a code block.

    Logs at DEBUG when entering, INFO on success, ERROR on exception.

    Args:
        logger_instance: Logger to use for output
        event_name: Name of the operation being timed
        layer_keys: Semantic layer keys (e.g., llm-specific keys)
        initial_kvs: Initial key-value pairs to include in logs
        **extra_kvs: Additional key-value pairs

    Yields:
        A mutable dict that can be updated with additional context

    Example:
        >>> with timed_block(logger, "database_query") as ctx:
        >>>     ctx["query"] = "SELECT * FROM users"
        >>>     result = db.query("SELECT * FROM users")
        >>>     ctx["rows"] = len(result)

    """
    # Combine all key-value pairs; later sources win on key collisions
    # (layer_keys < initial_kvs < extra_kvs).
    all_kvs: dict[str, Any] = {}
    if layer_keys:
        all_kvs.update(layer_keys)
    if initial_kvs:
        all_kvs.update(initial_kvs)
    all_kvs.update(extra_kvs)

    # Try to get trace_id from context; an explicit trace_id kwarg wins.
    trace_id = _PROVIDE_CONTEXT_TRACE_ID.get()
    if trace_id and "trace_id" not in all_kvs:
        all_kvs["trace_id"] = trace_id

    # Mutable dict yielded to the caller for adding result context.
    context: dict[str, Any] = {}

    def _finalize(outcome: str) -> None:
        # Shared success/error finalization: fold the caller's context into
        # the log kvs and record the elapsed time and outcome.
        all_kvs.update(context)
        all_kvs["duration_seconds"] = round(time.perf_counter() - start_time, 3)
        all_kvs["outcome"] = outcome

    # Log start
    logger_instance.debug(f"{event_name} started", **all_kvs)

    start_time = time.perf_counter()
    try:
        yield context

        # Success - finalize and log
        _finalize("success")
        logger_instance.info(f"{event_name} completed", **all_kvs)

    except Exception as e:
        # Error - finalize and log with exception details
        _finalize("error")
        all_kvs["error.message"] = str(e)
        all_kvs["error.type"] = type(e).__name__

        logger_instance.error(f"{event_name} failed", exc_info=True, **all_kvs)
        raise