Skip to content

Processors

provide.foundation.hub.processors

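Helpers for retrieving the log processors registered with the hub registry, ordered by priority and optionally filtered by processing stage.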

Classes

Functions

get_processor_pipeline

get_processor_pipeline() -> list[RegistryEntry]

Get log processors ordered by priority (highest priority first).

Source code in provide/foundation/hub/processors.py
def get_processor_pipeline() -> list[RegistryEntry]:
    """Return all registered processor entries, highest priority first.

    Registry entries whose ``dimension`` equals
    ``ComponentCategory.PROCESSOR.value`` are selected and ordered by the
    ``"priority"`` key of their ``metadata`` in descending order. Entries
    without a ``"priority"`` key are treated as priority ``0``. The sort is
    stable, so entries with equal priority keep their registry order.

    Returns:
        A new list of the matching registry entries.
    """
    registry, category_enum = _get_registry_and_lock()
    processor_dimension = category_enum.PROCESSOR.value

    # Snapshot the registry before filtering, then keep only processor entries.
    snapshot = list(registry)
    selected = [item for item in snapshot if item.dimension == processor_dimension]

    # Descending priority; a missing priority counts as 0.
    return sorted(
        selected,
        key=lambda item: item.metadata.get("priority", 0),
        reverse=True,
    )

get_processors_for_stage

get_processors_for_stage(stage: str) -> list[RegistryEntry]

Get processors for a specific processing stage.

Source code in provide/foundation/hub/processors.py
def get_processors_for_stage(stage: str) -> list[RegistryEntry]:
    """Return the pipeline's processors whose metadata names the given stage.

    The full pipeline is obtained from ``get_processor_pipeline()`` and its
    ordering is kept; only entries whose ``metadata["stage"]`` equals
    ``stage`` are retained.

    Args:
        stage: Stage name compared against each entry's ``"stage"`` metadata.

    Returns:
        A new list of matching entries, in pipeline order.
    """

    def _belongs_to_stage(candidate) -> bool:
        # A missing "stage" key yields None, which is then compared with `stage`.
        return candidate.metadata.get("stage") == stage

    return [candidate for candidate in get_processor_pipeline() if _belongs_to_stage(candidate)]