Skip to content

Components

provide.foundation.hub.components

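Component registry utilities for the Foundation hub: predefined component categories and a lifecycle protocol, plus helpers for bootstrapping, component discovery, health checks, lazy/async initialization and cleanup, configuration sources, error handlers, and log processor pipelines.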

Classes

ComponentCategory

Bases: Enum

Predefined component categories for Foundation.

These are the standard dimension values used internally by Foundation. External components can still use custom string dimensions for compatibility.

ComponentInfo

Information about a registered component.

ComponentLifecycle

Bases: Protocol

Protocol for components that support lifecycle management.

Functions
cleanup async
cleanup() -> None

Clean up the component.

Source code in provide/foundation/hub/components.py
async def cleanup(self) -> None:
    """Clean up the component.

    Protocol stub: the body is intentionally empty (``...``); conforming
    components provide their own coroutine implementation.
    """
    ...
initialize async
initialize() -> None

Initialize the component.

Source code in provide/foundation/hub/components.py
async def initialize(self) -> None:
    """Initialize the component.

    Protocol stub: the body is intentionally empty (``...``); conforming
    components provide their own coroutine implementation.
    """
    ...

Functions

bootstrap_foundation

bootstrap_foundation() -> None

Bootstrap Foundation with core registry components.

Source code in provide/foundation/hub/components.py
def bootstrap_foundation() -> None:
    """Populate the global registry with Foundation's core components.

    The presence of a ``timestamp`` entry in the processor dimension is used
    as the "already bootstrapped" marker; when it exists the function returns
    without doing anything. Otherwise it registers the timestamp processor,
    runs configuration-schema discovery, and logs a debug message.
    """
    core_registry = get_component_registry()
    processor_dimension = ComponentCategory.PROCESSOR.value

    # Guard clause: a previous call already registered the core processor.
    if core_registry.get_entry("timestamp", processor_dimension):
        return

    import time

    def timestamp_processor(logger: object, method_name: str, event_dict: dict[str, Any]) -> dict[str, Any]:
        # Stamp the event in place and hand the same dict back to the pipeline.
        event_dict["timestamp"] = time.time()
        return event_dict

    processor_metadata = {"priority": 100, "stage": "pre_format"}
    core_registry.register(
        name="timestamp",
        value=timestamp_processor,
        dimension=processor_dimension,
        metadata=processor_metadata,
        # Replacement is permitted so test scenarios can re-register.
        replace=True,
    )

    # Project imports stay local to the function and in their original order.
    from provide.foundation.config.bootstrap import discover_and_register_configs

    discover_and_register_configs()

    from provide.foundation.hub.foundation import get_foundation_logger

    foundation_logger = get_foundation_logger()
    foundation_logger.debug("Foundation bootstrap completed with registry components")

check_component_health

check_component_health(
    name: str, dimension: str
) -> dict[str, Any]

Check component health status.

Source code in provide/foundation/hub/components.py
@resilient(
    fallback={"status": "error"},
    context_provider=lambda: {
        "function": "check_component_health",
        "module": "hub.components",
    },
)
def check_component_health(name: str, dimension: str) -> dict[str, Any]:
    """Check component health status.

    Args:
        name: Registered component name.
        dimension: Registry dimension the component was registered under.

    Returns:
        One of the following:

        * ``{"status": "not_found"}`` - the registry returned ``None`` for
          the component.
        * ``{"status": "no_health_check"}`` - no registry entry, or its
          metadata does not set ``supports_health_check``.
        * whatever ``component.health_check()`` returns, when the component
          exposes that method.
        * ``{"status": "error", "error": <message>}`` - ``health_check()``
          raised.
        * ``{"status": "unknown"}`` - health checks are advertised in the
          metadata but the component has no ``health_check`` attribute.

    """
    component = _component_registry.get(name, dimension)

    # Compare with None explicitly: a registered component may legitimately
    # be falsy (empty container, object defining __bool__/__len__) and must
    # not be reported as missing.
    if component is None:
        return {"status": "not_found"}

    entry = _component_registry.get_entry(name, dimension)
    if not entry or not entry.metadata.get("supports_health_check", False):
        return {"status": "no_health_check"}

    if hasattr(component, "health_check"):
        try:
            return component.health_check()
        except Exception as e:
            # A failing health check is itself a health result, not a crash.
            return {"status": "error", "error": str(e)}

    return {"status": "unknown"}

cleanup_all_components

cleanup_all_components(
    dimension: str | None = None,
) -> None

Clean up all registered components that support cleanup, optionally restricted to a single dimension.

Source code in provide/foundation/hub/lifecycle.py
# Strong references to cleanup tasks scheduled on an already-running loop.
# The event loop itself only keeps weak references to tasks, so without this
# set a pending cleanup task could be garbage-collected before it finishes.
_pending_cleanup_tasks: set[asyncio.Task[None]] = set()


def _run_async_cleanup(cleanup_func: Any) -> None:
    """Drive an async ``cleanup`` callable from synchronous code.

    * Inside a running loop the coroutine is scheduled as a task
      (fire-and-forget; this function cannot block on it).
    * Otherwise the current thread's usable, idle loop is reused.
    * If there is no such loop, a private loop is created and always closed.

    Exceptions raised by the cleanup coroutine propagate to the caller
    unchanged; the coroutine is never executed a second time.
    """
    try:
        running_loop = asyncio.get_running_loop()
    except RuntimeError:
        running_loop = None

    if running_loop is not None:
        task = running_loop.create_task(cleanup_func())
        _pending_cleanup_tasks.add(task)
        task.add_done_callback(_pending_cleanup_tasks.discard)
        return

    # Only the *lookup* of the loop is guarded by RuntimeError handling, so a
    # RuntimeError raised by the cleanup itself is not mistaken for
    # "no event loop available".
    try:
        loop = asyncio.get_event_loop()
    except RuntimeError:
        loop = None

    if loop is not None and not loop.is_closed() and not loop.is_running():
        loop.run_until_complete(cleanup_func())
        return

    private_loop = asyncio.new_event_loop()
    try:
        private_loop.run_until_complete(cleanup_func())
    finally:
        # Close the loop even when the cleanup raises.
        private_loop.close()


def cleanup_all_components(dimension: str | None = None) -> None:
    """Clean up all components, optionally restricted to one dimension.

    Only entries whose metadata sets ``supports_cleanup`` and whose value has
    a ``cleanup`` attribute are processed. Synchronous ``cleanup`` callables
    are invoked directly; coroutine functions are driven via an event loop.
    A failure in one component is logged and does not stop the others.

    Args:
        dimension: When truthy, only entries in this dimension are cleaned
            up; otherwise every registry entry is considered.

    """
    registry, _ = _get_registry_and_globals()

    entries = [entry for entry in registry if entry.dimension == dimension] if dimension else list(registry)

    for entry in entries:
        if not entry.metadata.get("supports_cleanup", False):
            continue

        component = entry.value
        if not hasattr(component, "cleanup"):
            continue

        try:
            cleanup_func = component.cleanup
            if inspect.iscoroutinefunction(cleanup_func):
                _run_async_cleanup(cleanup_func)
            else:
                cleanup_func()
        except Exception as e:
            get_foundation_logger().error(
                "Component cleanup failed",
                component=entry.name,
                dimension=entry.dimension,
                error=str(e),
            )

discover_components

discover_components(
    group: str,
    dimension: str | None = None,
    registry: Registry | None = None,
) -> dict[str, type[Any]]

Discover and register components from entry points.

Entry-point lookup and loading are delegated to helpers that apply resilient error handling; entry points whose loading yields no result are skipped.

Parameters:

Name Type Description Default
group str

Entry point group name (e.g., 'provide.components')

required
dimension str | None

Registry dimension for components (defaults to "component")

None
registry Registry | None

Optional registry to use (defaults to global registry)

None

Returns:

Type Description
dict[str, type[Any]]

Dictionary mapping component names to their classes

Source code in provide/foundation/hub/discovery.py
def discover_components(
    group: str,
    dimension: str | None = None,
    registry: Registry | None = None,
) -> dict[str, type[Any]]:
    """Discover and register components from entry points.

    Entry-point lookup and per-entry loading are delegated to
    ``_get_entry_points`` and ``_load_entry_point``; per the original inline
    comments those helpers apply resilient error handling. Entry points for
    which loading yields ``None`` are left out of the result.

    Args:
        group: Entry point group name (e.g., 'provide.components')
        dimension: Registry dimension for components (defaults to
            ``ComponentCategory.COMPONENT.value``)
        registry: Optional registry to use (defaults to the one returned by
            ``_get_registry_and_lock()``)

    Returns:
        Dictionary mapping component names to their classes

    """
    target_dimension = ComponentCategory.COMPONENT.value if dimension is None else dimension
    target_registry = _get_registry_and_lock() if registry is None else registry

    found: dict[str, type[Any]] = {}
    for entry_point in _get_entry_points(group):
        loaded = _load_entry_point(entry_point, target_registry, target_dimension)
        if loaded is None:
            continue
        component_name, component_class = loaded
        found[component_name] = component_class

    return found

execute_error_handlers

execute_error_handlers(
    exception: Exception, context: dict[str, Any]
) -> dict[str, Any] | None

Execute error handlers until one handles the exception.

Source code in provide/foundation/hub/handlers.py
@resilient(
    fallback=None,
    context_provider=lambda: {
        "function": "execute_error_handlers",
        "module": "hub.handlers",
    },
)
def execute_error_handlers(exception: Exception, context: dict[str, Any]) -> dict[str, Any] | None:
    """Run matching error handlers in order until one produces a result.

    Handlers come from ``get_handlers_for_exception``. The first non-``None``
    return value wins. A handler that raises is logged and skipped.

    Returns:
        The first non-``None`` handler result, or ``None`` when no handler
        produced one.

    """
    for handler_entry in get_handlers_for_exception(exception):
        handle = handler_entry.value
        try:
            outcome = handle(exception, context)
        except Exception as handler_error:
            get_foundation_logger().error(
                "Error handler failed", handler=handler_entry.name, error=str(handler_error)
            )
            continue

        if outcome is not None:
            return outcome

    return None

get_component_config_schema

get_component_config_schema(
    name: str, dimension: str
) -> dict[str, Any] | None

Get component configuration schema.

Source code in provide/foundation/hub/components.py
def get_component_config_schema(name: str, dimension: str) -> dict[str, Any] | None:
    """Return the ``config_schema`` metadata value of a registered component.

    Returns:
        The value stored under the ``"config_schema"`` metadata key, or
        ``None`` when the component has no registry entry or no such key.

    """
    registered = _component_registry.get_entry(name, dimension)
    return registered.metadata.get("config_schema") if registered else None

get_component_registry

get_component_registry() -> Registry

Get the global component registry.

Source code in provide/foundation/hub/components.py
def get_component_registry() -> Registry:
    """Get the global component registry.

    Returns:
        The module-level ``_component_registry`` object itself (not a copy).

    """
    return _component_registry

get_config_chain

get_config_chain() -> list[RegistryEntry]

Get configuration sources ordered by priority.

Source code in provide/foundation/hub/config.py
def get_config_chain() -> list[RegistryEntry]:
    """Return the registered configuration sources, highest priority first.

    Priority is read from each entry's ``"priority"`` metadata key and
    defaults to ``0`` when absent.
    """
    registry, categories = _get_registry_and_lock()
    source_dimension = categories.CONFIG_SOURCE.value

    def priority_of(entry: RegistryEntry) -> Any:
        return entry.metadata.get("priority", 0)

    sources = []
    for candidate in list(registry):
        if candidate.dimension == source_dimension:
            sources.append(candidate)

    # Stable sort: sources with equal priority keep their registry order.
    return sorted(sources, key=priority_of, reverse=True)

get_handlers_for_exception

get_handlers_for_exception(
    exception: Exception,
) -> list[RegistryEntry]

Get error handlers that can handle the given exception type.

Source code in provide/foundation/hub/handlers.py
def get_handlers_for_exception(exception: Exception) -> list[RegistryEntry]:
    """Return error-handler entries applicable to ``exception``, highest priority first.

    A handler applies when any name listed under its ``"exception_types"``
    metadata key contains, or is contained in, the exception's class name.

    NOTE(review): matching is by substring on the class name only, so class
    inheritance is not consulted and short names such as ``"Error"`` match
    broadly - confirm this is the intended contract.
    """
    registry, categories = _get_registry_and_lock()
    handler_dimension = categories.ERROR_HANDLER.value
    raised_name = type(exception).__name__

    def applies(entry: RegistryEntry) -> bool:
        for declared in entry.metadata.get("exception_types", []):
            if declared in raised_name or raised_name in declared:
                return True
        return False

    applicable = [
        entry for entry in list(registry) if entry.dimension == handler_dimension and applies(entry)
    ]

    # Stable sort: equal priorities keep their registry order.
    return sorted(applicable, key=lambda item: item.metadata.get("priority", 0), reverse=True)

get_or_initialize_component

get_or_initialize_component(
    name: str, dimension: str
) -> Any

Get component, initializing lazily if needed.

Source code in provide/foundation/hub/lifecycle.py
def get_or_initialize_component(name: str, dimension: str) -> Any:
    """Return a component, building it through its lazy factory when required.

    Lookup order:

    1. the initialized-components cache;
    2. the registry entry's current value, when it is not ``None``
       (the value is also copied into the cache);
    3. the entry's ``"factory"`` metadata, used only when the entry is
       marked ``"lazy"``; the result replaces the registry entry and is
       cached.

    Returns:
        The component, or ``None`` when there is no registry entry, when the
        entry has no usable lazy factory, or when the factory (or
        re-registration) raises - the failure is logged.

    """
    registry, cache = _get_registry_and_globals()
    cache_key = (name, dimension)

    if cache_key in cache:
        return cache[cache_key]

    entry = registry.get_entry(name, dimension)
    if not entry:
        return None

    # The registry already holds a concrete value: remember it and return it.
    if entry.value is not None:
        cache[cache_key] = entry.value
        return entry.value

    factory = entry.metadata.get("factory") if entry.metadata.get("lazy", False) else None
    if not factory:
        return entry.value

    try:
        built = factory()
        # Swap the placeholder entry for the freshly built component.
        registry.register(
            name=name,
            value=built,
            dimension=dimension,
            metadata=entry.metadata,
            replace=True,
        )
        cache[cache_key] = built
        return built
    except Exception as e:
        get_foundation_logger().error(
            "Component initialization failed",
            component=name,
            dimension=dimension,
            error=str(e),
        )
        # Failure is reported as None rather than raised.
        return None

get_processor_pipeline

get_processor_pipeline() -> list[RegistryEntry]

Get log processors ordered by priority.

Source code in provide/foundation/hub/processors.py
def get_processor_pipeline() -> list[RegistryEntry]:
    """Return the registered log processors, highest priority first.

    Priority is read from each entry's ``"priority"`` metadata key and
    defaults to ``0`` when absent.
    """
    registry, categories = _get_registry_and_lock()
    processor_dimension = categories.PROCESSOR.value

    def priority_of(entry: RegistryEntry) -> Any:
        return entry.metadata.get("priority", 0)

    pipeline = []
    for candidate in list(registry):
        if candidate.dimension == processor_dimension:
            pipeline.append(candidate)

    # Stable sort: processors with equal priority keep their registry order.
    return sorted(pipeline, key=priority_of, reverse=True)

get_processors_for_stage

get_processors_for_stage(stage: str) -> list[RegistryEntry]

Get processors for a specific processing stage.

Source code in provide/foundation/hub/processors.py
def get_processors_for_stage(stage: str) -> list[RegistryEntry]:
    """Return the pipeline processors whose ``"stage"`` metadata equals ``stage``.

    The order of ``get_processor_pipeline()`` is preserved.
    """
    selected = []
    for processor_entry in get_processor_pipeline():
        if processor_entry.metadata.get("stage") == stage:
            selected.append(processor_entry)
    return selected

initialize_all_async_components async

initialize_all_async_components() -> None

Initialize all async components in descending priority order.

Source code in provide/foundation/hub/lifecycle.py
async def initialize_all_async_components() -> None:
    """Initialize every registry entry marked ``"async"``, highest priority first.

    Ordering uses the ``"priority"`` metadata key (default ``0``); declared
    dependencies are not consulted here. Components are initialized one at a
    time, and a failure is logged without stopping the remaining ones.
    """
    registry, _ = _get_registry_and_globals()

    ordered_entries = sorted(
        (candidate for candidate in registry if candidate.metadata.get("async", False)),
        key=lambda candidate: candidate.metadata.get("priority", 0),
        reverse=True,
    )

    for pending in ordered_entries:
        try:
            await initialize_async_component(pending.name, pending.dimension)
        except Exception as e:
            get_foundation_logger().error(
                "Failed to initialize async component",
                component=pending.name,
                dimension=pending.dimension,
                error=str(e),
            )

initialize_async_component async

initialize_async_component(
    name: str, dimension: str
) -> Any

Initialize component asynchronously.

Source code in provide/foundation/hub/lifecycle.py
async def initialize_async_component(name: str, dimension: str) -> Any:
    """Initialize component asynchronously.

    Args:
        name: Registered component name.
        dimension: Registry dimension the component was registered under.

    Returns:
        * the cached component when ``(name, dimension)`` is already in the
          initialized-components cache;
        * ``None`` when there is no registry entry, or when the factory or
          the re-registration raises (the failure is logged);
        * the entry's current value when the entry is not marked ``"async"``
          or has no ``"factory"`` in its metadata;
        * otherwise the component stored in the cache after initialization.

    """
    registry, initialized_components = _get_registry_and_globals()
    key = (name, dimension)

    # Fast path: a previous call already initialized and cached this component.
    if key in initialized_components:
        return initialized_components[key]

    # NOTE(review): the original comment states that registry operations are
    # thread-safe internally, so no external lock is taken here - this cannot
    # be confirmed from this block alone.
    entry = registry.get_entry(name, dimension)

    if not entry:
        return None

    # Entries not flagged "async" are returned as-is.
    if not entry.metadata.get("async", False):
        return entry.value

    # Without a factory there is nothing to build; return the current value.
    factory = entry.metadata.get("factory")
    if not factory:
        return entry.value

    # Second cache check. No await occurs between the first check and this
    # one, so within a single event loop it cannot observe a different state;
    # it only matters if another thread populated the cache meanwhile.
    if key in initialized_components:
        return initialized_components[key]

    # Build the component; no lock is held while the factory runs.
    try:
        # The factory may be a coroutine function or a plain callable.
        if inspect.iscoroutinefunction(factory):
            component = await factory()
        else:
            component = factory()

        # Replace the registry entry with the initialized component, keeping
        # the original metadata.
        registry.register(
            name=name,
            value=component,
            dimension=dimension,
            metadata=entry.metadata,
            replace=True,
        )

        # Another initializer may have filled the cache while the factory was
        # awaited. In that case the cached instance is kept and returned, even
        # though the registry entry now holds the component built here.
        if key not in initialized_components:
            initialized_components[key] = component
        return initialized_components[key]

    except Exception as e:
        get_foundation_logger().error(
            "Async component initialization failed",
            component=name,
            dimension=dimension,
            error=str(e),
        )
        # Failure is reported as None rather than raised.
        return None

load_all_configs async

load_all_configs() -> dict[str, Any]

Load configurations from all registered sources.

Source code in provide/foundation/hub/config.py
@resilient(fallback={}, context_provider=lambda: {"function": "load_all_configs"})
async def load_all_configs() -> dict[str, Any]:
    """Load and merge configurations from all registered sources.

    Sources are visited in ``get_config_chain()`` order (highest priority
    first). On conflicting keys the value from the higher-priority source is
    kept, matching the precedence used by ``resolve_config_value``. The merge
    is shallow: nested mappings are not combined.

    Sources without a ``load_config`` attribute are skipped; both coroutine
    and plain ``load_config`` callables are supported. A source that raises
    is logged as a warning and ignored.

    Returns:
        The merged configuration dictionary (empty when nothing loaded).

    """
    configs: dict[str, Any] = {}
    chain = get_config_chain()

    for entry in chain:
        source = entry.value
        if not hasattr(source, "load_config"):
            continue

        try:
            if inspect.iscoroutinefunction(source.load_config):
                source_config = await source.load_config()
            else:
                source_config = source.load_config()

            if source_config:
                # Earlier (higher-priority) sources have already claimed
                # their keys; later sources may only fill in missing ones.
                for key, value in dict(source_config).items():
                    configs.setdefault(key, value)
        except Exception as e:
            get_foundation_logger().warning(
                "Config source failed to load", source=entry.name, error=str(e)
            )

    return configs

load_config_from_registry

load_config_from_registry(config_class: type[T]) -> T

Load configuration from registry sources.

Parameters:

Name Type Description Default
config_class type[T]

Configuration class to instantiate

required

Returns:

Type Description
T

Configuration instance loaded from registry sources

Source code in provide/foundation/hub/config.py
def load_config_from_registry(config_class: type[T]) -> T:
    """Load configuration from registry sources.

    Synchronous sources from ``get_config_chain()`` are visited highest
    priority first. On conflicting keys the value from the higher-priority
    source is kept, matching the precedence used by ``resolve_config_value``.
    The merge is shallow. Sources whose ``load_config`` is a coroutine
    function are skipped (with a debug log) because this function is
    synchronous; sources that raise are logged as warnings and ignored.

    Args:
        config_class: Configuration class to instantiate; must provide a
            ``from_dict`` constructor.

    Returns:
        Configuration instance built by ``config_class.from_dict`` from the
        merged source data

    """
    config_data: dict[str, Any] = {}

    for entry in get_config_chain():
        source = entry.value
        if not hasattr(source, "load_config"):
            continue

        try:
            # Async sources cannot be awaited from this synchronous context.
            if inspect.iscoroutinefunction(source.load_config):
                get_foundation_logger().debug(
                    "Skipping async config source in sync context",
                    source=entry.name,
                )
                continue

            source_data = source.load_config()
            if source_data:
                # Earlier (higher-priority) sources have already claimed
                # their keys; later sources may only fill in missing ones.
                for key, value in dict(source_data).items():
                    config_data.setdefault(key, value)
        except Exception as e:
            get_foundation_logger().warning(
                "Failed to load config from source",
                source=entry.name,
                error=str(e),
            )

    # Create config instance
    return config_class.from_dict(config_data)

resolve_component_dependencies

resolve_component_dependencies(
    name: str, dimension: str
) -> dict[str, Any]

Resolve a component's directly declared dependencies (transitive dependencies are not followed).

Source code in provide/foundation/hub/discovery.py
def resolve_component_dependencies(name: str, dimension: str) -> dict[str, Any]:
    """Resolve the dependencies declared directly in a component's metadata.

    Each name under the entry's ``"dependencies"`` metadata key is looked up
    first in the same dimension and then without a dimension. Only this one
    level is resolved - dependencies of dependencies are not followed - and
    names that cannot be found are omitted from the result.

    Returns:
        Mapping of dependency name to resolved component; empty when the
        component itself has no registry entry.

    """
    registry = _get_registry_and_lock()

    entry = registry.get_entry(name, dimension)
    if not entry:
        return {}

    resolved: dict[str, Any] = {}
    for dep_name in entry.metadata.get("dependencies", []):
        # Prefer a match in the same dimension, then fall back to a lookup
        # without a dimension.
        found = registry.get(dep_name, dimension)
        if found is None:
            found = registry.get(dep_name)
        if found is not None:
            resolved[dep_name] = found

    return resolved

resolve_config_value

resolve_config_value(key: str) -> Any

Resolve configuration value using priority-ordered sources.

Source code in provide/foundation/hub/config.py
@resilient(fallback=None, suppress=(Exception,))
def resolve_config_value(key: str) -> Any:
    """Look up ``key`` in the configuration sources, highest priority first.

    Sources lacking a ``get_value`` attribute are skipped. The first
    non-``None`` value wins. A source that raises is logged at debug level
    and the search moves on.

    Returns:
        The first non-``None`` value found, or ``None`` when no source
        provides one.

    """
    registry, categories = _get_registry_and_lock()
    source_dimension = categories.CONFIG_SOURCE.value

    prioritized_sources = sorted(
        [candidate for candidate in list(registry) if candidate.dimension == source_dimension],
        key=lambda candidate: candidate.metadata.get("priority", 0),
        reverse=True,
    )

    for source_entry in prioritized_sources:
        source = source_entry.value
        if not hasattr(source, "get_value"):
            continue

        try:
            found = source.get_value(key)
        except Exception as e:
            # A source may legitimately lack this key; note it and move on.
            get_foundation_logger().debug(
                "Config source failed to get value",
                source=source_entry.name,
                key=key,
                error=str(e),
            )
            continue

        if found is not None:
            return found

    return None