
Processor

provide.foundation.logger.ratelimit.processor

Structlog rate limiting processor for the Foundation logger: applies global and per-logger rate limits to log events, optionally emitting warnings and periodic summaries when messages are suppressed.

Classes

RateLimiterProcessor

RateLimiterProcessor(
    emit_warning_on_limit: bool = True,
    warning_interval_seconds: float = 60.0,
    summary_interval_seconds: float = 5.0,
)

Structlog processor that applies rate limiting to log messages. Can be configured with global and per-logger rate limits.

Initialize the rate limiter processor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `emit_warning_on_limit` | `bool` | Whether to emit a warning when rate limited | `True` |
| `warning_interval_seconds` | `float` | Minimum seconds between rate limit warnings | `60.0` |
| `summary_interval_seconds` | `float` | Interval for rate limit summary reports | `5.0` |
Source code in provide/foundation/logger/ratelimit/processor.py
def __init__(
    self,
    emit_warning_on_limit: bool = True,
    warning_interval_seconds: float = 60.0,
    summary_interval_seconds: float = 5.0,
) -> None:
    """Initialize the rate limiter processor.

    Args:
        emit_warning_on_limit: Whether to emit a warning when rate limited
        warning_interval_seconds: Minimum seconds between rate limit warnings
        summary_interval_seconds: Interval for rate limit summary reports

    """
    self.rate_limiter = GlobalRateLimiter()
    self.emit_warning_on_limit = emit_warning_on_limit
    self.warning_interval_seconds = warning_interval_seconds

    # Track last warning time per logger
    self.last_warning_times: dict[str, float] = {}

    # Track suppressed message counts
    self.suppressed_counts: dict[str, int] = {}
    self.last_summary_time = time.monotonic()
    self.summary_interval = summary_interval_seconds  # Emit summary periodically
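
For reference, a minimal sketch of constructing the processor directly, assuming it is importable from the module path shown above. The argument values are illustrative; in practice the `create_rate_limiter_processor` factory documented below is the more convenient entry point, since it also configures the underlying `GlobalRateLimiter`.

```python
from provide.foundation.logger.ratelimit.processor import RateLimiterProcessor

# Illustrative values; the underlying GlobalRateLimiter still needs its
# limits configured (see create_rate_limiter_processor below).
processor = RateLimiterProcessor(
    emit_warning_on_limit=True,     # surface a warning event when limiting kicks in
    warning_interval_seconds=30.0,  # at most one warning per logger every 30 seconds
    summary_interval_seconds=10.0,  # periodic summary of suppressed messages
)
```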
Functions
__call__
__call__(
    logger: Any, method_name: str, event_dict: EventDict
) -> structlog.types.EventDict

Process a log event, applying rate limiting.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `logger` | `Any` | The logger instance | *required* |
| `method_name` | `str` | The log method name (debug, info, etc.) | *required* |
| `event_dict` | `EventDict` | The event dictionary | *required* |

Returns:

| Type | Description |
| --- | --- |
| `EventDict` | The event dictionary if allowed, or raises `DropEvent` if rate limited |

Source code in provide/foundation/logger/ratelimit/processor.py
def __call__(
    self,
    logger: Any,
    method_name: str,
    event_dict: structlog.types.EventDict,
) -> structlog.types.EventDict:
    """Process a log event, applying rate limiting.

    Args:
        logger: The logger instance
        method_name: The log method name (debug, info, etc.)
        event_dict: The event dictionary

    Returns:
        The event dictionary if allowed, or raises DropEvent if rate limited

    """
    logger_name = event_dict.get("logger_name", "unknown")

    # Check if this log is allowed (pass event_dict for tracking)
    allowed, reason = self.rate_limiter.is_allowed(logger_name, event_dict)

    if not allowed:
        # Track suppressed count
        if logger_name not in self.suppressed_counts:
            self.suppressed_counts[logger_name] = 0
        self.suppressed_counts[logger_name] += 1

        # Optionally emit a warning about rate limiting
        if self.emit_warning_on_limit:
            now = time.monotonic()
            last_warning = self.last_warning_times.get(logger_name, 0)

            if now - last_warning >= self.warning_interval_seconds:
                # Create a rate limit warning event
                self.last_warning_times[logger_name] = now

                # Return a modified event indicating rate limiting
                return {
                    "event": f"⚠️ Rate limit: {reason}",
                    "level": "warning",
                    "logger_name": "provide.foundation.ratelimit",
                    "suppressed_count": self.suppressed_counts[logger_name],
                    "original_logger": logger_name,
                    "_rate_limit_warning": True,
                }

        # Drop the event
        raise structlog.DropEvent

    # Check if we should emit a summary
    now = time.monotonic()
    if now - self.last_summary_time >= self.summary_interval:
        # Always check and emit summary if there's been any rate limiting
        self._emit_summary()
        self.last_summary_time = now

    return event_dict
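
The sketch below calls the processor directly to show the three possible outcomes: the event dict passes through, a synthesized rate-limit warning event is returned, or `structlog.DropEvent` is raised. The limits and the message loop are illustrative assumptions; the call signature and `DropEvent` behaviour follow the source above.

```python
import structlog

from provide.foundation.logger.ratelimit.processor import create_rate_limiter_processor

# Assumed limits for illustration: ~5 logs/second with a burst capacity of 5.
processor = create_rate_limiter_processor(global_rate=5.0, global_capacity=5.0)

for i in range(20):
    event = {"event": f"message {i}", "logger_name": "demo"}
    try:
        result = processor(None, "info", event)
    except structlog.DropEvent:
        continue  # rate limited and no warning due: the event is silently dropped
    # Either the original event_dict or a synthesized rate-limit warning event.
    print(result)
```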

Functions

create_rate_limiter_processor

create_rate_limiter_processor(
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: (
        dict[str, tuple[float, float]] | None
    ) = None,
    emit_warnings: bool = True,
    summary_interval: float = 5.0,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> RateLimiterProcessor

Factory function to create and configure a rate limiter processor.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `global_rate` | `float \| None` | Global logs per second limit | `None` |
| `global_capacity` | `float \| None` | Global burst capacity | `None` |
| `per_logger_rates` | `dict[str, tuple[float, float]] \| None` | Dict of logger_name -> (rate, capacity) tuples | `None` |
| `emit_warnings` | `bool` | Whether to emit warnings when rate limited | `True` |
| `summary_interval` | `float` | Seconds between rate limit summary reports | `5.0` |
| `max_queue_size` | `int` | Maximum queue size when buffering | `1000` |
| `max_memory_mb` | `float \| None` | Maximum memory for buffered logs | `None` |
| `overflow_policy` | `str` | Policy when queue is full | `'drop_oldest'` |

Returns:

| Type | Description |
| --- | --- |
| `RateLimiterProcessor` | Configured `RateLimiterProcessor` instance |

Source code in provide/foundation/logger/ratelimit/processor.py
def create_rate_limiter_processor(
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: dict[str, tuple[float, float]] | None = None,
    emit_warnings: bool = True,
    summary_interval: float = 5.0,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> RateLimiterProcessor:
    """Factory function to create and configure a rate limiter processor.

    Args:
        global_rate: Global logs per second limit
        global_capacity: Global burst capacity
        per_logger_rates: Dict of logger_name -> (rate, capacity) tuples
        emit_warnings: Whether to emit warnings when rate limited
        summary_interval: Seconds between rate limit summary reports
        max_queue_size: Maximum queue size when buffering
        max_memory_mb: Maximum memory for buffered logs
        overflow_policy: Policy when queue is full

    Returns:
        Configured RateLimiterProcessor instance

    """
    processor = RateLimiterProcessor(
        emit_warning_on_limit=emit_warnings,
        summary_interval_seconds=summary_interval,
    )

    # Determine if we should use buffered rate limiting
    use_buffered = max_queue_size > 0 and overflow_policy in (
        "drop_oldest",
        "drop_newest",
    )

    # Configure the global rate limiter
    processor.rate_limiter.configure(
        global_rate=global_rate,
        global_capacity=global_capacity,
        per_logger_rates=per_logger_rates,
        use_buffered=use_buffered,
        max_queue_size=max_queue_size,
        max_memory_mb=max_memory_mb,
        overflow_policy=overflow_policy,
    )

    return processor
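
A hedged sketch of wiring the factory-created processor into a structlog processor chain. The rates, logger name, and renderer are illustrative assumptions; only the factory arguments come from the signature above, and Foundation's own logging setup may already bind `logger_name` for you.

```python
import structlog

from provide.foundation.logger.ratelimit.processor import create_rate_limiter_processor

rate_limiter = create_rate_limiter_processor(
    global_rate=100.0,       # ~100 logs/second overall
    global_capacity=200.0,   # allow bursts of up to 200
    per_logger_rates={
        "noisy.module": (10.0, 20.0),  # logger_name -> (rate, capacity)
    },
    emit_warnings=True,
    summary_interval=5.0,
)

structlog.configure(
    processors=[
        structlog.processors.add_log_level,
        rate_limiter,                        # drops or rewrites rate-limited events
        structlog.processors.JSONRenderer(),
    ],
)

# Bind logger_name explicitly so the per-logger limit above applies.
log = structlog.get_logger().bind(logger_name="noisy.module")
log.info("hello")
```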