provide.foundation.logger.ratelimit

Rate limiting for Foundation's logging pipeline: synchronous and asynchronous token bucket limiters, buffered and queued variants, a process-wide GlobalRateLimiter singleton, and a structlog processor that enforces the configured limits.

Classes

AsyncRateLimiter

AsyncRateLimiter(capacity: float, refill_rate: float)

Asynchronous token bucket rate limiter. Uses asyncio.Lock for mutual exclusion, making it safe to share across tasks in async contexts.

Initialize the async rate limiter.

Parameters:

    capacity (float, required): Maximum number of tokens (burst capacity)
    refill_rate (float, required): Tokens refilled per second
Source code in provide/foundation/logger/ratelimit/limiters.py
def __init__(self, capacity: float, refill_rate: float) -> None:
    """Initialize the async rate limiter.

    Args:
        capacity: Maximum number of tokens (burst capacity)
        refill_rate: Tokens refilled per second

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive")

    self.capacity = float(capacity)
    self.refill_rate = float(refill_rate)
    self.tokens = float(capacity)
    self.last_refill = time.monotonic()
    self._lock = asyncio.Lock()

    # Track statistics
    self.total_allowed = 0
    self.total_denied = 0
    self.last_denied_time: float | None = None
Functions
get_stats async
get_stats() -> dict[str, Any]

Get rate limiter statistics.

Source code in provide/foundation/logger/ratelimit/limiters.py
async def get_stats(self) -> dict[str, Any]:
    """Get rate limiter statistics."""
    async with self._lock:
        return {
            "tokens_available": self.tokens,
            "capacity": self.capacity,
            "refill_rate": self.refill_rate,
            "total_allowed": self.total_allowed,
            "total_denied": self.total_denied,
            "last_denied_time": self.last_denied_time,
        }
is_allowed async
is_allowed() -> bool

Check if a log message is allowed based on available tokens.

Returns:

    bool: True if the log should be allowed, False if rate limited

Source code in provide/foundation/logger/ratelimit/limiters.py
async def is_allowed(self) -> bool:
    """Check if a log message is allowed based on available tokens.

    Returns:
        True if the log should be allowed, False if rate limited

    """
    async with self._lock:
        now = time.monotonic()
        elapsed = now - self.last_refill

        # Refill tokens based on elapsed time
        if elapsed > 0:
            tokens_to_add = elapsed * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

        # Try to consume a token
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            self.total_allowed += 1
            return True
        self.total_denied += 1
        self.last_denied_time = now
        return False
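
Example

A minimal sketch of driving AsyncRateLimiter from a coroutine (import path taken from the source listing above; the exact allow/deny split depends on timing):

import asyncio

from provide.foundation.logger.ratelimit.limiters import AsyncRateLimiter

async def main() -> None:
    # Allow bursts of up to 5 messages, refilling 1 token per second.
    limiter = AsyncRateLimiter(capacity=5.0, refill_rate=1.0)
    for i in range(8):
        if await limiter.is_allowed():
            print(f"message {i}: emitted")
        else:
            print(f"message {i}: rate limited")
    print(await limiter.get_stats())

asyncio.run(main())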

BufferedRateLimiter

BufferedRateLimiter(
    capacity: float,
    refill_rate: float,
    buffer_size: int = 100,
    track_dropped: bool = True,
)

Simple synchronous rate limiter that keeps a bounded buffer of recently dropped items for debugging. Does not use a worker thread; checks are processed inline.

Initialize buffered rate limiter.

Parameters:

    capacity (float, required): Maximum tokens (burst capacity)
    refill_rate (float, required): Tokens per second
    buffer_size (int, default 100): Number of recently dropped items to track
    track_dropped (bool, default True): Whether to keep dropped items for debugging
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def __init__(
    self,
    capacity: float,
    refill_rate: float,
    buffer_size: int = 100,
    track_dropped: bool = True,
) -> None:
    """Initialize buffered rate limiter.

    Args:
        capacity: Maximum tokens (burst capacity)
        refill_rate: Tokens per second
        buffer_size: Number of recently dropped items to track
        track_dropped: Whether to keep dropped items for debugging

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive")

    self.capacity = float(capacity)
    self.refill_rate = float(refill_rate)
    self.tokens = float(capacity)
    self.last_refill = time.monotonic()
    self.lock = threading.Lock()

    # Track dropped items
    self.buffer_size = buffer_size
    self.track_dropped = track_dropped
    self.dropped_buffer: deque[Any] | None = deque(maxlen=buffer_size) if track_dropped else None

    # Statistics
    self.total_allowed = 0
    self.total_denied = 0
    self.total_bytes_dropped = 0
Functions
get_dropped_samples
get_dropped_samples(count: int = 10) -> list[Any]

Get recent dropped items for debugging.

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def get_dropped_samples(self, count: int = 10) -> list[Any]:
    """Get recent dropped items for debugging."""
    if not self.track_dropped or not self.dropped_buffer:
        return []

    with self.lock:
        return list(self.dropped_buffer)[-count:]
get_stats
get_stats() -> dict[str, Any]

Get statistics.

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def get_stats(self) -> dict[str, Any]:
    """Get statistics."""
    with self.lock:
        stats = {
            "tokens_available": self.tokens,
            "capacity": self.capacity,
            "refill_rate": self.refill_rate,
            "total_allowed": self.total_allowed,
            "total_denied": self.total_denied,
            "total_bytes_dropped": self.total_bytes_dropped,
        }

        if self.track_dropped and self.dropped_buffer:
            stats["dropped_buffer_size"] = len(self.dropped_buffer)
            stats["oldest_dropped_age"] = (
                time.monotonic() - self.dropped_buffer[0]["time"] if self.dropped_buffer else 0
            )

        return stats
is_allowed
is_allowed(
    item: Any | None = None,
) -> tuple[bool, str | None]

Check if item is allowed based on rate limit.

Parameters:

    item (Any | None, default None): Optional item to track if dropped

Returns:

    tuple[bool, str | None]: Tuple of (allowed, reason)

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def is_allowed(self, item: Any | None = None) -> tuple[bool, str | None]:
    """Check if item is allowed based on rate limit.

    Args:
        item: Optional item to track if dropped

    Returns:
        Tuple of (allowed, reason)

    """
    with self.lock:
        now = time.monotonic()
        elapsed = now - self.last_refill

        # Refill tokens
        if elapsed > 0:
            tokens_to_add = elapsed * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

        # Try to consume token
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            self.total_allowed += 1
            return True, None
        self.total_denied += 1

        # Track dropped item
        if self.track_dropped and item is not None and self.dropped_buffer is not None:
            self.dropped_buffer.append(
                {
                    "time": now,
                    "item": item,
                    "size": sys.getsizeof(item),
                },
            )
            self.total_bytes_dropped += sys.getsizeof(item)

        return False, f"Rate limit exceeded (tokens: {self.tokens:.1f})"
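
Example

A sketch of the (allowed, reason) contract and the dropped-item buffer. With capacity 2.0 and a slow refill, the third and fourth checks should be denied and tracked:

from provide.foundation.logger.ratelimit.queue_limiter import BufferedRateLimiter

limiter = BufferedRateLimiter(capacity=2.0, refill_rate=1.0, buffer_size=10)

for msg in ("a", "b", "c", "d"):
    allowed, reason = limiter.is_allowed(item=msg)
    if not allowed:
        print(f"dropped {msg!r}: {reason}")

# Each tracked sample is a dict with "time", "item", and "size" keys.
for sample in limiter.get_dropped_samples(count=5):
    print(sample["item"], sample["size"])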

GlobalRateLimiter

GlobalRateLimiter()

Global rate limiter singleton for Foundation's logging system. Manages per-logger and global rate limits.

Source code in provide/foundation/logger/ratelimit/limiters.py
def __init__(self) -> None:
    if self._initialized:
        return

    self._initialized = True
    self.global_limiter: Any = None
    self.logger_limiters: dict[str, SyncRateLimiter] = {}
    self.lock = threading.Lock()

    # Default configuration (can be overridden)
    self.global_rate: float | None = None
    self.global_capacity: float | None = None
    self.per_logger_rates: dict[str, tuple[float, float]] = {}

    # Queue configuration
    self.use_buffered = False
    self.max_queue_size = 1000
    self.max_memory_mb: float | None = None
    self.overflow_policy = "drop_oldest"
Functions
configure
configure(
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: (
        dict[str, tuple[float, float]] | None
    ) = None,
    use_buffered: bool = False,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> None

Configure the global rate limiter.

Parameters:

    global_rate (float | None, default None): Global logs per second limit
    global_capacity (float | None, default None): Global burst capacity
    per_logger_rates (dict[str, tuple[float, float]] | None, default None): Dict of logger_name -> (rate, capacity) tuples
    use_buffered (bool, default False): Use buffered rate limiter with tracking
    max_queue_size (int, default 1000): Maximum queue size for buffered limiter
    max_memory_mb (float | None, default None): Maximum memory for buffered limiter
    overflow_policy (str, default 'drop_oldest'): What to do when queue is full
Source code in provide/foundation/logger/ratelimit/limiters.py
def configure(
    self,
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: dict[str, tuple[float, float]] | None = None,
    use_buffered: bool = False,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> None:
    """Configure the global rate limiter.

    Args:
        global_rate: Global logs per second limit
        global_capacity: Global burst capacity
        per_logger_rates: Dict of logger_name -> (rate, capacity) tuples
        use_buffered: Use buffered rate limiter with tracking
        max_queue_size: Maximum queue size for buffered limiter
        max_memory_mb: Maximum memory for buffered limiter
        overflow_policy: What to do when queue is full

    """
    with self.lock:
        self.use_buffered = use_buffered
        self.max_queue_size = max_queue_size
        self.max_memory_mb = max_memory_mb
        self.overflow_policy = overflow_policy

        if global_rate is not None and global_capacity is not None:
            self.global_rate = global_rate
            self.global_capacity = global_capacity

            if use_buffered:
                from provide.foundation.logger.ratelimit.queue_limiter import (
                    BufferedRateLimiter,
                )

                self.global_limiter = BufferedRateLimiter(
                    capacity=global_capacity,
                    refill_rate=global_rate,
                    buffer_size=max_queue_size,
                    track_dropped=True,
                )
            else:
                self.global_limiter = SyncRateLimiter(global_capacity, global_rate)

        if per_logger_rates:
            self.per_logger_rates = per_logger_rates
            # Create rate limiters for configured loggers
            for logger_name, (rate, capacity) in per_logger_rates.items():
                self.logger_limiters[logger_name] = SyncRateLimiter(capacity, rate)
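
Example

A configuration sketch combining a global budget with a tighter per-logger budget (the rates are illustrative):

from provide.foundation.logger.ratelimit.limiters import GlobalRateLimiter

limiter = GlobalRateLimiter()  # singleton: repeated construction reuses the same state
limiter.configure(
    global_rate=100.0,      # refill ~100 tokens (logs) per second overall
    global_capacity=200.0,  # allow bursts of up to 200 logs
    per_logger_rates={
        "app.chatty": (5.0, 10.0),  # (rate, capacity) for this logger only
    },
)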
get_stats
get_stats() -> dict[str, Any]

Get comprehensive rate limiting statistics.

Source code in provide/foundation/logger/ratelimit/limiters.py
def get_stats(self) -> dict[str, Any]:
    """Get comprehensive rate limiting statistics."""
    with self.lock:
        stats: dict[str, Any] = {
            "global": self.global_limiter.get_stats() if self.global_limiter else None,
            "per_logger": {},
        }

        for logger_name, limiter in self.logger_limiters.items():
            stats["per_logger"][logger_name] = limiter.get_stats()

        return stats
is_allowed
is_allowed(
    logger_name: str, item: Any | None = None
) -> tuple[bool, str | None]

Check if a log from a specific logger is allowed.

Parameters:

    logger_name (str, required): Name of the logger
    item (Any | None, default None): Optional item for buffered tracking

Returns:

    tuple[bool, str | None]: Tuple of (allowed, reason) where reason is set if denied

Source code in provide/foundation/logger/ratelimit/limiters.py
def is_allowed(self, logger_name: str, item: Any | None = None) -> tuple[bool, str | None]:
    """Check if a log from a specific logger is allowed.

    Args:
        logger_name: Name of the logger
        item: Optional item for buffered tracking

    Returns:
        Tuple of (allowed, reason) where reason is set if denied

    """
    with self.lock:
        # Check per-logger limit first
        if logger_name in self.logger_limiters and not self.logger_limiters[logger_name].is_allowed():
            return False, f"Logger '{logger_name}' rate limit exceeded"

        # Check global limit
        if self.global_limiter:
            if self.use_buffered:
                # BufferedRateLimiter returns tuple
                from provide.foundation.logger.ratelimit.queue_limiter import (
                    BufferedRateLimiter,
                )

                if isinstance(self.global_limiter, BufferedRateLimiter):
                    allowed, reason = self.global_limiter.is_allowed(item)
                    if not allowed:
                        return False, reason or "Global rate limit exceeded"
            # SyncRateLimiter returns bool
            elif not self.global_limiter.is_allowed():
                return False, "Global rate limit exceeded"

        return True, None
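
Example

Continuing the configuration sketch above, a denial reports which limit was hit; the reason strings come from the source listing:

allowed, reason = limiter.is_allowed("app.chatty")
if not allowed:
    # Either "Logger 'app.chatty' rate limit exceeded"
    # or "Global rate limit exceeded".
    print(reason)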

QueuedRateLimiter

QueuedRateLimiter(
    capacity: float,
    refill_rate: float,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: Literal[
        "drop_oldest", "drop_newest", "block"
    ] = "drop_oldest",
)

Rate limiter with a queue for buffering logs. By default (overflow_policy='drop_oldest'), the oldest messages are dropped when the queue is full (FIFO overflow).

Lifecycle Management

The QueuedRateLimiter requires explicit lifecycle management:

1. Create instance: limiter = QueuedRateLimiter(...)
2. Start processing: limiter.start()
3. Use normally: limiter.enqueue(item)
4. Shutdown cleanly: limiter.stop()

Examples:

>>> limiter = QueuedRateLimiter(capacity=100.0, refill_rate=10.0)
>>> limiter.start()  # Start background processing
>>> try:
...     limiter.enqueue(log_item)
... finally:
...     limiter.stop()  # Clean shutdown
>>> # Or use as a context manager
>>> with QueuedRateLimiter(100.0, 10.0) as limiter:
...     limiter.enqueue(log_item)  # Automatically starts and stops
Note on Threading

This implementation uses threading.Thread for background processing. Foundation's preferred concurrency model is asyncio (see utils/rate_limiting.py for the async TokenBucketRateLimiter). This threading approach is maintained for backward compatibility with synchronous logging contexts.

Initialize the queued rate limiter.

Note

This does NOT start the worker thread automatically. Call start() to begin processing the queue. This allows applications to control the lifecycle and thread management.

Parameters:

    capacity (float, required): Maximum tokens (burst capacity)
    refill_rate (float, required): Tokens per second
    max_queue_size (int, default 1000): Maximum number of items in queue
    max_memory_mb (float | None, default None): Maximum memory usage in MB (estimated)
    overflow_policy (Literal['drop_oldest', 'drop_newest', 'block'], default 'drop_oldest'): What to do when queue is full
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def __init__(
    self,
    capacity: float,
    refill_rate: float,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: Literal["drop_oldest", "drop_newest", "block"] = "drop_oldest",
) -> None:
    """Initialize the queued rate limiter.

    Note:
        This does NOT start the worker thread automatically. Call start()
        to begin processing the queue. This allows applications to control
        the lifecycle and thread management.

    Args:
        capacity: Maximum tokens (burst capacity)
        refill_rate: Tokens per second
        max_queue_size: Maximum number of items in queue
        max_memory_mb: Maximum memory usage in MB (estimated)
        overflow_policy: What to do when queue is full

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive")
    if max_queue_size <= 0:
        raise ValueError("Max queue size must be positive")

    self.capacity = float(capacity)
    self.refill_rate = float(refill_rate)
    self.tokens = float(capacity)
    self.last_refill = time.monotonic()

    # Queue management
    self.max_queue_size = max_queue_size
    self.max_memory_bytes = int(max_memory_mb * 1024 * 1024) if max_memory_mb else None
    self.overflow_policy = overflow_policy

    # Use deque for efficient FIFO operations
    self.pending_queue: deque[Any] = deque(
        maxlen=max_queue_size if overflow_policy == "drop_oldest" else None
    )
    self.queue_lock = threading.Lock()

    # Track statistics
    self.total_queued = 0
    self.total_dropped = 0
    self.total_processed = 0
    self.estimated_memory = 0

    # Worker thread for processing queue (not started automatically)
    self.running = False
    self.worker_thread: threading.Thread | None = None
Functions
__enter__
__enter__() -> QueuedRateLimiter

Enter context manager, automatically starting the worker thread.

Returns:

    QueuedRateLimiter: Self for use in a with statement

Example:

    >>> with QueuedRateLimiter(100.0, 10.0) as limiter:
    ...     limiter.enqueue(item)

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def __enter__(self) -> QueuedRateLimiter:
    """Enter context manager, automatically starting the worker thread.

    Returns:
        Self for use in with statement

    Example:
        >>> with QueuedRateLimiter(100.0, 10.0) as limiter:
        ...     limiter.enqueue(item)
    """
    self.start()
    return self
__exit__
__exit__(exc_type: Any, exc_val: Any, exc_tb: Any) -> None

Exit context manager, automatically stopping the worker thread.

Parameters:

    exc_type (Any, required): Exception type (if any)
    exc_val (Any, required): Exception value (if any)
    exc_tb (Any, required): Exception traceback (if any)
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def __exit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
    """Exit context manager, automatically stopping the worker thread.

    Args:
        exc_type: Exception type (if any)
        exc_val: Exception value (if any)
        exc_tb: Exception traceback (if any)
    """
    self.stop()
enqueue
enqueue(item: Any) -> tuple[bool, str | None]

Add item to queue for rate-limited processing.

Returns:

    tuple[bool, str | None]: Tuple of (accepted, reason) where reason is set if rejected

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def enqueue(self, item: Any) -> tuple[bool, str | None]:
    """Add item to queue for rate-limited processing.

    Returns:
        Tuple of (accepted, reason) where reason is set if rejected

    """
    with self.queue_lock:
        # Check memory limit
        if self.max_memory_bytes:
            item_size = self._estimate_size(item)
            if self.estimated_memory + item_size > self.max_memory_bytes:
                self.total_dropped += 1
                return (
                    False,
                    f"Memory limit exceeded ({self.estimated_memory / 1024 / 1024:.1f}MB)",
                )

        # Check queue size
        if len(self.pending_queue) >= self.max_queue_size:
            if self.overflow_policy == "drop_newest":
                self.total_dropped += 1
                return False, f"Queue full ({self.max_queue_size} items)"
            if self.overflow_policy == "drop_oldest":
                # deque with maxlen automatically drops oldest
                if len(self.pending_queue) > 0:
                    old_item = (
                        self.pending_queue[0] if len(self.pending_queue) == self.max_queue_size else None
                    )
                    if old_item and self.max_memory_bytes:
                        self.estimated_memory -= self._estimate_size(old_item)
                    self.total_dropped += 1
            elif self.overflow_policy == "block":
                # In block mode, we would need to wait
                # For now, just reject
                return False, "Queue full (blocking not implemented)"

        # Add to queue
        self.pending_queue.append(item)
        self.total_queued += 1

        if self.max_memory_bytes:
            self.estimated_memory += self._estimate_size(item)

        return True, None
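
Example

A sketch of queueing under the default drop_oldest policy; the context manager (see __enter__/__exit__ above) starts and stops the worker thread:

from provide.foundation.logger.ratelimit.queue_limiter import QueuedRateLimiter

with QueuedRateLimiter(capacity=10.0, refill_rate=5.0, max_queue_size=100) as limiter:
    accepted, reason = limiter.enqueue({"event": "user_login", "level": "info"})
    if not accepted:
        print(f"not queued: {reason}")
    print(limiter.get_stats()["queue_size"])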
get_stats
get_stats() -> dict[str, Any]

Get queue statistics.

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def get_stats(self) -> dict[str, Any]:
    """Get queue statistics."""
    with self.queue_lock:
        return {
            "queue_size": len(self.pending_queue),
            "max_queue_size": self.max_queue_size,
            "tokens_available": self.tokens,
            "capacity": self.capacity,
            "refill_rate": self.refill_rate,
            "total_queued": self.total_queued,
            "total_dropped": self.total_dropped,
            "total_processed": self.total_processed,
            "estimated_memory_mb": self.estimated_memory / 1024 / 1024 if self.max_memory_bytes else None,
            "max_memory_mb": self.max_memory_bytes / 1024 / 1024 if self.max_memory_bytes else None,
            "overflow_policy": self.overflow_policy,
        }
start
start() -> None

Start the worker thread for processing queued items.

This should be called after initialization and before enqueuing items. Can be called multiple times (subsequent calls are no-ops if already running).

Raises:

    RuntimeError: If start() is called after stop() on the same instance

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def start(self) -> None:
    """Start the worker thread for processing queued items.

    This should be called after initialization and before enqueuing items.
    Can be called multiple times (subsequent calls are no-ops if already running).

    Raises:
        RuntimeError: If start() is called after stop() on the same instance
    """
    if self.running:
        # Already running, no-op
        return

    if self.worker_thread is not None and self.worker_thread.is_alive():
        # Thread exists and is alive, no-op
        return

    # Start new worker thread
    self.running = True
    self.worker_thread = threading.Thread(target=self._process_queue, daemon=True)
    self.worker_thread.start()
stop
stop(timeout: float = 1.0) -> None

Stop the worker thread and wait for it to finish.

This provides a clean shutdown, allowing the worker to finish processing the current item before terminating.

Parameters:

    timeout (float, default 1.0): Maximum seconds to wait for the thread to finish

Example:

    >>> limiter.stop(timeout=2.0)  # Wait up to 2 seconds for clean shutdown

Source code in provide/foundation/logger/ratelimit/queue_limiter.py
def stop(self, timeout: float = 1.0) -> None:
    """Stop the worker thread and wait for it to finish.

    This provides a clean shutdown, allowing the worker to finish processing
    the current item before terminating.

    Args:
        timeout: Maximum seconds to wait for thread to finish (default: 1.0)

    Example:
        >>> limiter.stop(timeout=2.0)  # Wait up to 2 seconds for clean shutdown
    """
    self.running = False
    if self.worker_thread and self.worker_thread.is_alive():
        self.worker_thread.join(timeout=timeout)

RateLimiterProcessor

RateLimiterProcessor(
    emit_warning_on_limit: bool = True,
    warning_interval_seconds: float = 60.0,
    summary_interval_seconds: float = 5.0,
)

Structlog processor that applies rate limiting to log messages. Can be configured with global and per-logger rate limits.

Initialize the rate limiter processor.

Parameters:

    emit_warning_on_limit (bool, default True): Whether to emit a warning when rate limited
    warning_interval_seconds (float, default 60.0): Minimum seconds between rate limit warnings
    summary_interval_seconds (float, default 5.0): Interval for rate limit summary reports
Source code in provide/foundation/logger/ratelimit/processor.py
def __init__(
    self,
    emit_warning_on_limit: bool = True,
    warning_interval_seconds: float = 60.0,
    summary_interval_seconds: float = 5.0,
) -> None:
    """Initialize the rate limiter processor.

    Args:
        emit_warning_on_limit: Whether to emit a warning when rate limited
        warning_interval_seconds: Minimum seconds between rate limit warnings
        summary_interval_seconds: Interval for rate limit summary reports

    """
    self.rate_limiter = GlobalRateLimiter()
    self.emit_warning_on_limit = emit_warning_on_limit
    self.warning_interval_seconds = warning_interval_seconds

    # Track last warning time per logger
    self.last_warning_times: dict[str, float] = {}

    # Track suppressed message counts
    self.suppressed_counts: dict[str, int] = {}
    self.last_summary_time = time.monotonic()
    self.summary_interval = summary_interval_seconds  # Emit summary periodically
Functions
__call__
__call__(
    logger: Any, method_name: str, event_dict: EventDict
) -> structlog.types.EventDict

Process a log event, applying rate limiting.

Parameters:

    logger (Any, required): The logger instance
    method_name (str, required): The log method name (debug, info, etc.)
    event_dict (EventDict, required): The event dictionary

Returns:

    EventDict: The event dictionary if allowed; raises structlog.DropEvent if rate limited

Source code in provide/foundation/logger/ratelimit/processor.py
def __call__(
    self,
    logger: Any,
    method_name: str,
    event_dict: structlog.types.EventDict,
) -> structlog.types.EventDict:
    """Process a log event, applying rate limiting.

    Args:
        logger: The logger instance
        method_name: The log method name (debug, info, etc.)
        event_dict: The event dictionary

    Returns:
        The event dictionary if allowed, or raises DropEvent if rate limited

    """
    logger_name = event_dict.get("logger_name", "unknown")

    # Check if this log is allowed (pass event_dict for tracking)
    allowed, reason = self.rate_limiter.is_allowed(logger_name, event_dict)

    if not allowed:
        # Track suppressed count
        if logger_name not in self.suppressed_counts:
            self.suppressed_counts[logger_name] = 0
        self.suppressed_counts[logger_name] += 1

        # Optionally emit a warning about rate limiting
        if self.emit_warning_on_limit:
            now = time.monotonic()
            last_warning = self.last_warning_times.get(logger_name, 0)

            if now - last_warning >= self.warning_interval_seconds:
                # Create a rate limit warning event
                self.last_warning_times[logger_name] = now

                # Return a modified event indicating rate limiting
                return {
                    "event": f"⚠️ Rate limit: {reason}",
                    "level": "warning",
                    "logger_name": "provide.foundation.ratelimit",
                    "suppressed_count": self.suppressed_counts[logger_name],
                    "original_logger": logger_name,
                    "_rate_limit_warning": True,
                }

        # Drop the event
        raise structlog.DropEvent

    # Check if we should emit a summary
    now = time.monotonic()
    if now - self.last_summary_time >= self.summary_interval:
        # Always check and emit summary if there's been any rate limiting
        self._emit_summary()
        self.last_summary_time = now

    return event_dict
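
Example

A sketch of the processor contract in isolation. While the GlobalRateLimiter has no limits configured, every event passes through; once limits apply, suppressed events surface as structlog.DropEvent (which structlog's pipeline catches):

import structlog

from provide.foundation.logger.ratelimit.processor import RateLimiterProcessor

processor = RateLimiterProcessor(emit_warning_on_limit=False)
event = {"event": "request served", "logger_name": "app.http"}

try:
    result = processor(None, "info", dict(event))  # logger argument is unused here
except structlog.DropEvent:
    result = None  # event was suppressed by the rate limiter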

SyncRateLimiter

SyncRateLimiter(capacity: float, refill_rate: float)

Synchronous token bucket rate limiter for controlling log output rates. Thread-safe implementation suitable for synchronous logging operations.

Initialize the rate limiter.

Parameters:

    capacity (float, required): Maximum number of tokens (burst capacity)
    refill_rate (float, required): Tokens refilled per second
Source code in provide/foundation/logger/ratelimit/limiters.py
def __init__(self, capacity: float, refill_rate: float) -> None:
    """Initialize the rate limiter.

    Args:
        capacity: Maximum number of tokens (burst capacity)
        refill_rate: Tokens refilled per second

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive")

    self.capacity = float(capacity)
    self.refill_rate = float(refill_rate)
    self.tokens = float(capacity)
    self.last_refill = time.monotonic()
    self.lock = threading.Lock()

    # Track statistics
    self.total_allowed = 0
    self.total_denied = 0
    self.last_denied_time: float | None = None
Functions
get_stats
get_stats() -> dict[str, Any]

Get rate limiter statistics.

Source code in provide/foundation/logger/ratelimit/limiters.py
def get_stats(self) -> dict[str, Any]:
    """Get rate limiter statistics."""
    with self.lock:
        return {
            "tokens_available": self.tokens,
            "capacity": self.capacity,
            "refill_rate": self.refill_rate,
            "total_allowed": self.total_allowed,
            "total_denied": self.total_denied,
            "last_denied_time": self.last_denied_time,
        }
is_allowed
is_allowed() -> bool

Check if a log message is allowed based on available tokens.

Returns:

    bool: True if the log should be allowed, False if rate limited

Source code in provide/foundation/logger/ratelimit/limiters.py
def is_allowed(self) -> bool:
    """Check if a log message is allowed based on available tokens.

    Returns:
        True if the log should be allowed, False if rate limited

    """
    with self.lock:
        now = time.monotonic()
        elapsed = now - self.last_refill

        # Refill tokens based on elapsed time
        if elapsed > 0:
            tokens_to_add = elapsed * self.refill_rate
            self.tokens = min(self.capacity, self.tokens + tokens_to_add)
            self.last_refill = now

        # Try to consume a token
        if self.tokens >= 1.0:
            self.tokens -= 1.0
            self.total_allowed += 1
            return True
        self.total_denied += 1
        self.last_denied_time = now
        return False
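
Example

A minimal sketch: with capacity 3.0 and a slow refill, a burst of 5 checks allows roughly the first three (timing-dependent):

from provide.foundation.logger.ratelimit.limiters import SyncRateLimiter

limiter = SyncRateLimiter(capacity=3.0, refill_rate=1.0)
results = [limiter.is_allowed() for _ in range(5)]
print(results)  # roughly [True, True, True, False, False]
print(limiter.get_stats()["total_denied"])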

Functions

create_rate_limiter_processor

create_rate_limiter_processor(
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: (
        dict[str, tuple[float, float]] | None
    ) = None,
    emit_warnings: bool = True,
    summary_interval: float = 5.0,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> RateLimiterProcessor

Factory function to create and configure a rate limiter processor.

Parameters:

    global_rate (float | None, default None): Global logs per second limit
    global_capacity (float | None, default None): Global burst capacity
    per_logger_rates (dict[str, tuple[float, float]] | None, default None): Dict of logger_name -> (rate, capacity) tuples
    emit_warnings (bool, default True): Whether to emit warnings when rate limited
    summary_interval (float, default 5.0): Seconds between rate limit summary reports
    max_queue_size (int, default 1000): Maximum queue size when buffering
    max_memory_mb (float | None, default None): Maximum memory for buffered logs
    overflow_policy (str, default 'drop_oldest'): Policy when queue is full

Returns:

    RateLimiterProcessor: Configured RateLimiterProcessor instance

Source code in provide/foundation/logger/ratelimit/processor.py
def create_rate_limiter_processor(
    global_rate: float | None = None,
    global_capacity: float | None = None,
    per_logger_rates: dict[str, tuple[float, float]] | None = None,
    emit_warnings: bool = True,
    summary_interval: float = 5.0,
    max_queue_size: int = 1000,
    max_memory_mb: float | None = None,
    overflow_policy: str = "drop_oldest",
) -> RateLimiterProcessor:
    """Factory function to create and configure a rate limiter processor.

    Args:
        global_rate: Global logs per second limit
        global_capacity: Global burst capacity
        per_logger_rates: Dict of logger_name -> (rate, capacity) tuples
        emit_warnings: Whether to emit warnings when rate limited
        summary_interval: Seconds between rate limit summary reports
        max_queue_size: Maximum queue size when buffering
        max_memory_mb: Maximum memory for buffered logs
        overflow_policy: Policy when queue is full

    Returns:
        Configured RateLimiterProcessor instance

    """
    processor = RateLimiterProcessor(
        emit_warning_on_limit=emit_warnings,
        summary_interval_seconds=summary_interval,
    )

    # Determine if we should use buffered rate limiting
    use_buffered = max_queue_size > 0 and overflow_policy in (
        "drop_oldest",
        "drop_newest",
    )

    # Configure the global rate limiter
    processor.rate_limiter.configure(
        global_rate=global_rate,
        global_capacity=global_capacity,
        per_logger_rates=per_logger_rates,
        use_buffered=use_buffered,
        max_queue_size=max_queue_size,
        max_memory_mb=max_memory_mb,
        overflow_policy=overflow_policy,
    )

    return processor
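
Example

A wiring sketch for a structlog pipeline. The add_logger_name helper is hypothetical: the processor reads the "logger_name" key from each event dict, so something upstream must populate it:

import structlog

from provide.foundation.logger.ratelimit.processor import create_rate_limiter_processor

def add_logger_name(logger: object, method_name: str, event_dict: dict) -> dict:
    # Hypothetical helper: make sure the "logger_name" key the limiter reads is set.
    event_dict.setdefault("logger_name", getattr(logger, "name", "unknown"))
    return event_dict

structlog.configure(
    processors=[
        add_logger_name,
        create_rate_limiter_processor(global_rate=50.0, global_capacity=100.0),
        structlog.dev.ConsoleRenderer(),
    ],
)

log = structlog.get_logger()
log.info("service started")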