Skip to content

Events

provide.foundation.hub.events

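Event system for decoupled component communication: the Event and RegistryEvent types, the thread-safe EventBus, and helpers for emitting registry events on the global event bus.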

Classes

Event

Base event class for all system events.

EventBus

EventBus()

Thread-safe event bus for decoupled component communication.

Uses weak references to prevent memory leaks from event handlers.

Initialize empty event bus.

Source code in provide/foundation/hub/events.py
def __init__(self) -> None:
    """Set up an event bus with no subscriptions and zeroed bookkeeping."""
    # Re-entrant lock guarding the mutable state below.
    self._lock = threading.RLock()

    # Subscriptions: event name -> weak references to that event's handlers.
    self._handlers: dict[str, list[weakref.ReferenceType]] = {}

    # Periodic cleanup: the operation counter is compared against this
    # threshold to decide when a full sweep of dead references is due.
    self._operation_count = 0
    self._cleanup_threshold = 10

    # Failure monitoring: running total plus the most recent errors (max 10).
    self._failed_handler_count = 0
    self._last_errors: list[dict[str, Any]] = []
Functions
clear
clear() -> None

Clear all event subscriptions.

This is primarily used during test resets to prevent duplicate event handlers from accumulating across test runs.

Source code in provide/foundation/hub/events.py
def clear(self) -> None:
    """Drop every subscription and reset the bus bookkeeping.

    Mainly intended for test resets, so that handlers registered by one
    test run do not pile up and fire again in the next.
    """
    with self._lock:
        # Empty the containers in place so existing references to them
        # observe the reset as well.
        self._last_errors.clear()
        self._handlers.clear()

        # Both counters start over from zero.
        self._failed_handler_count = 0
        self._operation_count = 0
emit
emit(event: Event | RegistryEvent) -> None

Emit an event to all subscribers.

Handler errors are logged but do not prevent other handlers from running. This ensures isolation - one failing handler doesn't break the entire event system.

Parameters:

Name Type Description Default
event Event | RegistryEvent

Event to emit

required
Source code in provide/foundation/hub/events.py
def emit(self, event: Event | RegistryEvent) -> None:
    """Emit an event to all subscribers.

    Handlers are invoked in subscription order while the bus lock is held.
    An exception raised by a handler is passed to ``_handle_handler_error``
    and does not prevent the remaining handlers from running, so one
    failing handler cannot break delivery for the others.

    Handlers may call ``subscribe``, ``unsubscribe`` or ``clear`` on this
    bus while the event is being delivered. Delivery works on a snapshot of
    the handlers registered when ``emit`` was called, and changes made
    during delivery are preserved afterwards instead of being overwritten.
    Handlers added during delivery are first invoked on the next emission.

    Args:
        event: Event to emit; subscribers are looked up by ``event.name``.
    """
    with self._lock:
        registered = self._handlers.get(event.name)
        if registered is None:
            return

        # Iterate over a snapshot: a handler may replace or mutate the
        # registered list (via subscribe/unsubscribe/clear) while we loop.
        for weak_handler in list(registered):
            handler = weak_handler()
            if handler is None:
                continue  # Referent already garbage-collected.
            try:
                handler(event)
            except Exception as e:
                # Record the error but continue processing other handlers
                self._handle_handler_error(event, handler, e)

        # Prune dead references from whatever is registered *now*. Writing
        # back a list collected during iteration would resurrect handlers
        # that were unsubscribed (or cleared) while the event was delivered.
        current = self._handlers.get(event.name)
        if current is not None:
            self._handlers[event.name] = [ref for ref in current if ref() is not None]

        # Periodic cleanup of all dead references
        self._operation_count += 1
        if self._operation_count >= self._cleanup_threshold:
            self._cleanup_dead_references()
            self._operation_count = 0
force_cleanup
force_cleanup() -> None

Force immediate cleanup of all dead references.

Source code in provide/foundation/hub/events.py
def force_cleanup(self) -> None:
    """Sweep all dead handler references right now.

    Runs the same cleanup that is otherwise triggered periodically and
    restarts the operation counter that schedules it.
    """
    with self._lock:
        # Run the sweep first; the counter is only reset once it succeeded.
        self._cleanup_dead_references()
        self._operation_count = 0
get_error_stats
get_error_stats() -> dict[str, Any]

Get error statistics for monitoring handler failures.

Returns:

Type Description
dict[str, Any]

Dictionary with error statistics including:

  • failed_handler_count: Total number of handler failures
  • recent_errors: List of recent error details (max 10)
Source code in provide/foundation/hub/events.py
def get_error_stats(self) -> dict[str, Any]:
    """Report handler-failure statistics for monitoring.

    Returns:
        A dictionary with two entries:
        - failed_handler_count: Total number of handler failures
        - recent_errors: Copy of the list of recent error details (max 10)
    """
    with self._lock:
        # Snapshot both values under the lock so they are consistent.
        failures = self._failed_handler_count
        # Hand out a copy so callers cannot mutate the bus's own error log.
        recent = self._last_errors.copy()

    return {"failed_handler_count": failures, "recent_errors": recent}
get_memory_stats
get_memory_stats() -> dict[str, Any]

Get memory usage statistics for the event bus.

Source code in provide/foundation/hub/events.py
def get_memory_stats(self) -> dict[str, Any]:
    """Summarise how many handler references the bus currently holds.

    Returns:
        A dictionary with the number of event types, the total, live and
        dead handler reference counts, and the current operation count.
    """
    with self._lock:
        # One flag per stored weak reference: True while its referent is alive.
        liveness = [
            ref() is not None
            for refs in self._handlers.values()
            for ref in refs
        ]
        total = len(liveness)
        live = sum(liveness)

        return {
            "event_types": len(self._handlers),
            "total_handlers": total,
            "live_handlers": live,
            "dead_handlers": total - live,
            "operation_count": self._operation_count,
        }
subscribe
subscribe(
    event_name: str, handler: Callable[[Event], None]
) -> None

Subscribe to events by name.

Parameters:

Name Type Description Default
event_name str

Name of event to subscribe to

required
handler Callable[[Event], None]

Function to call when event occurs

required
Source code in provide/foundation/hub/events.py
def subscribe(self, event_name: str, handler: Callable[[Event], None]) -> None:
    """Subscribe to events by name.

    The handler is stored as a weak reference, so the subscription does
    not keep it alive. The caller must hold a strong reference to a plain
    function, or to the owning instance of a bound method, for as long as
    the subscription should stay active.

    Args:
        event_name: Name of event to subscribe to
        handler: Function or bound method to call when event occurs

    Raises:
        TypeError: If the handler cannot be weakly referenced.
    """
    with self._lock:
        handlers = self._handlers.setdefault(event_name, [])

        weak_handler = None
        # A bound method object is created afresh on every attribute access,
        # so a plain weakref.ref to it would be dead as soon as this call
        # returns. WeakMethod instead tracks the owning instance and the
        # underlying function, and stays alive as long as the instance does.
        if hasattr(handler, "__self__") and hasattr(handler, "__func__"):
            try:
                weak_handler = weakref.WeakMethod(handler)
            except TypeError:
                # Owning object is not weak-referenceable; fall back to
                # referencing the method object itself.
                weak_handler = None

        if weak_handler is None:
            # Use weak reference to prevent memory leaks
            weak_handler = weakref.ref(handler)

        handlers.append(weak_handler)
unsubscribe
unsubscribe(
    event_name: str, handler: Callable[[Event], None]
) -> None

Unsubscribe from events.

Parameters:

Name Type Description Default
event_name str

Name of event to unsubscribe from

required
handler Callable[[Event], None]

Handler function to remove

required
Source code in provide/foundation/hub/events.py
def unsubscribe(self, event_name: str, handler: Callable[[Event], None]) -> None:
    """Unsubscribe from events.

    Every subscription of ``handler`` for ``event_name`` is removed.
    Unknown event names and handlers that were never subscribed are
    ignored. References whose handler has already been garbage-collected
    are left in place.

    Args:
        event_name: Name of event to unsubscribe from
        handler: Handler function or bound method to remove
    """
    with self._lock:
        if event_name not in self._handlers:
            return

        remaining = []
        for weak_ref in self._handlers[event_name]:
            target = weak_ref()
            # Compare by equality, not only identity: ``obj.method`` yields
            # a new bound-method object on every access, so an identity
            # test would never match a method handler. Bound methods
            # compare equal when instance and function match; plain
            # functions still compare by identity.
            if target is not None and (target is handler or target == handler):
                continue
            remaining.append(weak_ref)

        self._handlers[event_name] = remaining

RegistryEvent

Events emitted by the registry system.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Set event name from operation.

Source code in provide/foundation/hub/events.py
def __attrs_post_init__(self) -> None:
    """Derive the event name from the operation when none was given."""
    if self.name:
        # An explicit, non-empty name always wins.
        return

    derived_name = f"registry.{self.operation}"
    # Assigned via object.__setattr__ rather than a normal attribute write,
    # presumably because the attrs class is frozen -- confirm against the
    # class definition.
    object.__setattr__(self, "name", derived_name)

Functions

emit_registry_event

emit_registry_event(
    operation: str,
    item_name: str,
    dimension: str,
    **kwargs: Any
) -> None

Emit a registry operation event.

Parameters:

Name Type Description Default
operation str

Type of operation (register, remove, etc.)

required
item_name str

Name of the registry item

required
dimension str

Registry dimension

required
**kwargs Any

Additional event data

{}
Source code in provide/foundation/hub/events.py
def emit_registry_event(operation: str, item_name: str, dimension: str, **kwargs: Any) -> None:
    """Build a registry event and emit it on the global event bus.

    Args:
        operation: Type of operation (register, remove, etc.)
        item_name: Name of the registry item
        dimension: Registry dimension
        **kwargs: Additional event data, passed along as the event's ``data``
    """
    fields = {
        # Left empty on purpose: RegistryEvent.__attrs_post_init__ fills it
        # in from the operation.
        "name": "",
        "operation": operation,
        "item_name": item_name,
        "dimension": dimension,
        "data": kwargs,
        "source": "registry",
    }
    registry_event = RegistryEvent(**fields)
    _event_bus.emit(registry_event)

get_event_bus

get_event_bus() -> EventBus

Get the global event bus instance.

Source code in provide/foundation/hub/events.py
def get_event_bus() -> EventBus:
    """Return the module-level event bus instance.

    Returns:
        The ``_event_bus`` object held by this module; every call returns
        that same module-level object.
    """
    return _event_bus