provide.foundation.logger.ratelimit¶
Rate limiting for Foundation's logging system: synchronous and asynchronous token bucket limiters, buffered and queued variants with overflow handling, a global per-logger/aggregate limiter, and a structlog processor that applies the limits.
Classes¶
AsyncRateLimiter¶
Asynchronous token bucket rate limiter. Uses asyncio.Lock for safe concurrent access in async contexts.
Initialize the async rate limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | float | Maximum number of tokens (burst capacity) | required |
| refill_rate | float | Tokens refilled per second | required |
Source code in provide/foundation/logger/ratelimit/limiters.py
Functions¶
get_stats async¶
Get rate limiter statistics.
Source code in provide/foundation/logger/ratelimit/limiters.py
is_allowed async¶
Check if a log message is allowed based on available tokens.
Returns:

| Type | Description |
|---|---|
| bool | True if the log should be allowed, False if rate limited |
Source code in provide/foundation/logger/ratelimit/limiters.py
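A minimal usage sketch for AsyncRateLimiter. The import path is an assumption based on the source location shown above; adjust it to match how the class is exported in your installation.

```python
import asyncio

# Assumed import path; AsyncRateLimiter may also be re-exported elsewhere.
from provide.foundation.logger.ratelimit.limiters import AsyncRateLimiter


async def main() -> None:
    # Allow bursts of up to 10 log messages, refilling 5 tokens per second.
    limiter = AsyncRateLimiter(capacity=10.0, refill_rate=5.0)

    for i in range(20):
        if await limiter.is_allowed():
            print(f"message {i}: emitted")
        else:
            print(f"message {i}: rate limited")

    # Counters such as allowed/denied totals (exact keys depend on the implementation).
    print(await limiter.get_stats())


asyncio.run(main())
```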
BufferedRateLimiter¶
BufferedRateLimiter(
capacity: float,
refill_rate: float,
buffer_size: int = 100,
track_dropped: bool = True,
)
Simple synchronous rate limiter with an overflow buffer. Does not use a worker thread; items are processed inline.
Initialize buffered rate limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | float | Maximum tokens (burst capacity) | required |
| refill_rate | float | Tokens per second | required |
| buffer_size | int | Number of recently dropped items to track | 100 |
| track_dropped | bool | Whether to keep dropped items for debugging | True |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
Functions¶
get_dropped_samples¶
Get recent dropped items for debugging.
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
get_stats¶
Get statistics.
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
is_allowed¶
Check if item is allowed based on rate limit.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| item | Any \| None | Optional item to track if dropped | None |

Returns:

| Type | Description |
|---|---|
| tuple[bool, str \| None] | Tuple of (allowed, reason) |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
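A minimal sketch of BufferedRateLimiter with dropped-item tracking. The import path is an assumption based on the source location shown above, and the exact keys returned by get_stats() may differ.

```python
# Assumed import path, based on the "Source code in ... queue_limiter.py" note above.
from provide.foundation.logger.ratelimit.queue_limiter import BufferedRateLimiter

# Burst of 5 messages, refilling 2 tokens per second; keep up to 100 dropped
# items around for later inspection.
limiter = BufferedRateLimiter(
    capacity=5.0,
    refill_rate=2.0,
    buffer_size=100,
    track_dropped=True,
)

for i in range(10):
    allowed, reason = limiter.is_allowed(item={"event": "demo", "seq": i})
    if not allowed:
        print(f"message {i} dropped: {reason}")

# With track_dropped=True, recently dropped items can be retrieved for debugging.
print(limiter.get_dropped_samples())
print(limiter.get_stats())
```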
GlobalRateLimiter¶
Global rate limiter singleton for Foundation's logging system. Manages per-logger and global rate limits.
Source code in provide/foundation/logger/ratelimit/limiters.py
Functions¶
configure¶
configure(
global_rate: float | None = None,
global_capacity: float | None = None,
per_logger_rates: (
dict[str, tuple[float, float]] | None
) = None,
use_buffered: bool = False,
max_queue_size: int = 1000,
max_memory_mb: float | None = None,
overflow_policy: str = "drop_oldest",
) -> None
Configure the global rate limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| global_rate | float \| None | Global logs per second limit | None |
| global_capacity | float \| None | Global burst capacity | None |
| per_logger_rates | dict[str, tuple[float, float]] \| None | Dict of logger_name -> (rate, capacity) tuples | None |
| use_buffered | bool | Use buffered rate limiter with tracking | False |
| max_queue_size | int | Maximum queue size for buffered limiter | 1000 |
| max_memory_mb | float \| None | Maximum memory for buffered limiter | None |
| overflow_policy | str | What to do when queue is full | 'drop_oldest' |
Source code in provide/foundation/logger/ratelimit/limiters.py
get_stats¶
Get comprehensive rate limiting statistics.
Source code in provide/foundation/logger/ratelimit/limiters.py
is_allowed¶
Check if a log from a specific logger is allowed.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| logger_name | str | Name of the logger | required |
| item | Any \| None | Optional item for buffered tracking | None |

Returns:

| Type | Description |
|---|---|
| tuple[bool, str \| None] | Tuple of (allowed, reason) where reason is set if denied |
Source code in provide/foundation/logger/ratelimit/limiters.py
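A configuration sketch for GlobalRateLimiter. How the shared singleton is obtained is not shown in this reference, so constructing GlobalRateLimiter() directly is an assumption, as is the import path; the logger names and limits are illustrative.

```python
# Assumed import path and singleton access; adjust to how your application
# actually obtains the shared GlobalRateLimiter instance.
from provide.foundation.logger.ratelimit.limiters import GlobalRateLimiter

limiter = GlobalRateLimiter()

# Cap all logging at 100 logs/second (burst 200) and restrict one chatty
# logger to 10 logs/second (burst 20).
limiter.configure(
    global_rate=100.0,
    global_capacity=200.0,
    per_logger_rates={"myapp.chatty": (10.0, 20.0)},
)

allowed, reason = limiter.is_allowed("myapp.chatty")
if not allowed:
    print(f"log suppressed: {reason}")

print(limiter.get_stats())
```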
QueuedRateLimiter¶
QueuedRateLimiter(
capacity: float,
refill_rate: float,
max_queue_size: int = 1000,
max_memory_mb: float | None = None,
overflow_policy: Literal[
"drop_oldest", "drop_newest", "block"
] = "drop_oldest",
)
Rate limiter with a queue for buffering logs. By default, drops the oldest messages when the queue is full (FIFO overflow).
Lifecycle Management
The QueuedRateLimiter requires explicit lifecycle management:
1. Create instance: limiter = QueuedRateLimiter(...)
2. Start processing: limiter.start()
3. Use normally: limiter.enqueue(item)
4. Shut down cleanly: limiter.stop()
Examples:
>>> limiter = QueuedRateLimiter(capacity=100.0, refill_rate=10.0)
>>> limiter.start()  # Start background processing
>>> try:
...     limiter.enqueue(log_item)
... finally:
...     limiter.stop()  # Clean shutdown
>>> # Or use as a context manager
>>> with QueuedRateLimiter(100.0, 10.0) as limiter:
...     limiter.enqueue(log_item)  # Automatically starts and stops
Note on Threading
This implementation uses threading.Thread for background processing. Foundation's preferred concurrency model is asyncio (see utils/rate_limiting.py for the async TokenBucketRateLimiter). This threading approach is maintained for backward compatibility with synchronous logging contexts.
Initialize the queued rate limiter.
Note
This does NOT start the worker thread automatically. Call start() to begin processing the queue. This allows applications to control the lifecycle and thread management.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | float | Maximum tokens (burst capacity) | required |
| refill_rate | float | Tokens per second | required |
| max_queue_size | int | Maximum number of items in queue | 1000 |
| max_memory_mb | float \| None | Maximum memory usage in MB (estimated) | None |
| overflow_policy | Literal['drop_oldest', 'drop_newest', 'block'] | What to do when queue is full | 'drop_oldest' |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
Functions¶
__enter__¶
Enter context manager, automatically starting the worker thread.
Returns:

| Type | Description |
|---|---|
| QueuedRateLimiter | Self for use in with statement |

Example

    with QueuedRateLimiter(100.0, 10.0) as limiter:
        limiter.enqueue(item)
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
__exit__¶
Exit context manager, automatically stopping the worker thread.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| exc_type | Any | Exception type (if any) | required |
| exc_val | Any | Exception value (if any) | required |
| exc_tb | Any | Exception traceback (if any) | required |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
enqueue¶
Add item to queue for rate-limited processing.
Returns:

| Type | Description |
|---|---|
| tuple[bool, str \| None] | Tuple of (accepted, reason) where reason is set if rejected |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
get_stats¶
Get queue statistics.
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
start¶
Start the worker thread for processing queued items.
This should be called after initialization and before enqueuing items. Can be called multiple times (subsequent calls are no-ops if already running).
Raises:

| Type | Description |
|---|---|
| RuntimeError | If start() is called after stop() on the same instance |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
stop¶
Stop the worker thread and wait for it to finish.
This provides a clean shutdown, allowing the worker to finish processing the current item before terminating.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| timeout | float | Maximum seconds to wait for thread to finish (default: 1.0) | 1.0 |
Example
limiter.stop(timeout=2.0) # Wait up to 2 seconds for clean shutdown
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
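A sketch combining the context-manager lifecycle with a non-default overflow_policy. The import path is assumed from the source location shown above, and the payload dict is purely illustrative.

```python
# Assumed import path, based on the "Source code in ... queue_limiter.py" note above.
from provide.foundation.logger.ratelimit.queue_limiter import QueuedRateLimiter

# Reject new items rather than evicting old ones when the queue is full.
with QueuedRateLimiter(
    capacity=50.0,
    refill_rate=25.0,
    max_queue_size=500,
    overflow_policy="drop_newest",
) as limiter:
    accepted, reason = limiter.enqueue({"event": "user_login", "user_id": 42})
    if not accepted:
        print(f"item rejected: {reason}")
    print(limiter.get_stats())
# On exiting the with-block the worker thread is stopped; call
# limiter.stop(timeout=...) directly if you need a longer shutdown window.
```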
RateLimiterProcessor¶
RateLimiterProcessor(
emit_warning_on_limit: bool = True,
warning_interval_seconds: float = 60.0,
summary_interval_seconds: float = 5.0,
)
Structlog processor that applies rate limiting to log messages. Can be configured with global and per-logger rate limits.
Initialize the rate limiter processor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| emit_warning_on_limit | bool | Whether to emit a warning when rate limited | True |
| warning_interval_seconds | float | Minimum seconds between rate limit warnings | 60.0 |
| summary_interval_seconds | float | Interval for rate limit summary reports | 5.0 |
Source code in provide/foundation/logger/ratelimit/processor.py
Functions¶
__call__¶
Process a log event, applying rate limiting.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| logger | Any | The logger instance | required |
| method_name | str | The log method name (debug, info, etc.) | required |
| event_dict | EventDict | The event dictionary | required |

Returns:

| Type | Description |
|---|---|
| EventDict | The event dictionary if allowed, or raises DropEvent if rate limited |
Source code in provide/foundation/logger/ratelimit/processor.py
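A sketch of wiring the processor into a structlog processor chain. The import path is assumed from the source location shown above; the rate limits themselves are configured separately, for example via GlobalRateLimiter.configure() or the create_rate_limiter_processor() factory documented below.

```python
import structlog

# Assumed import path, based on the "Source code in ... processor.py" note above.
from provide.foundation.logger.ratelimit.processor import RateLimiterProcessor

# Place the processor early in the chain so rate-limited events are dropped
# before more expensive processors (formatting, rendering) run.
structlog.configure(
    processors=[
        RateLimiterProcessor(
            emit_warning_on_limit=True,
            warning_interval_seconds=60.0,
            summary_interval_seconds=5.0,
        ),
        structlog.processors.add_log_level,
        structlog.processors.JSONRenderer(),
    ]
)

log = structlog.get_logger("myapp")
log.info("request_handled", path="/health")
```

When the processor raises DropEvent, structlog discards the event silently, so downstream processors never see rate-limited messages.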
SyncRateLimiter¶
Synchronous token bucket rate limiter for controlling log output rates. Thread-safe implementation suitable for synchronous logging operations.
Initialize the rate limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| capacity | float | Maximum number of tokens (burst capacity) | required |
| refill_rate | float | Tokens refilled per second | required |
Source code in provide/foundation/logger/ratelimit/limiters.py
Functions¶
get_stats¶
Get rate limiter statistics.
Source code in provide/foundation/logger/ratelimit/limiters.py
is_allowed¶
Check if a log message is allowed based on available tokens.
Returns:

| Type | Description |
|---|---|
| bool | True if the log should be allowed, False if rate limited |
Source code in provide/foundation/logger/ratelimit/limiters.py
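A minimal synchronous sketch, with the import path assumed from the source location shown above.

```python
# Assumed import path, based on the "Source code in ... limiters.py" note above.
from provide.foundation.logger.ratelimit.limiters import SyncRateLimiter

# Allow bursts of 20 messages, refilling 10 tokens per second.
limiter = SyncRateLimiter(capacity=20.0, refill_rate=10.0)


def maybe_log(message: str) -> None:
    # Gate a synchronous log call behind the token bucket.
    if limiter.is_allowed():
        print(message)


for i in range(30):
    maybe_log(f"event {i}")

print(limiter.get_stats())
```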
Functions¶
create_rate_limiter_processor¶
create_rate_limiter_processor(
global_rate: float | None = None,
global_capacity: float | None = None,
per_logger_rates: (
dict[str, tuple[float, float]] | None
) = None,
emit_warnings: bool = True,
summary_interval: float = 5.0,
max_queue_size: int = 1000,
max_memory_mb: float | None = None,
overflow_policy: str = "drop_oldest",
) -> RateLimiterProcessor
Factory function to create and configure a rate limiter processor.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| global_rate | float \| None | Global logs per second limit | None |
| global_capacity | float \| None | Global burst capacity | None |
| per_logger_rates | dict[str, tuple[float, float]] \| None | Dict of logger_name -> (rate, capacity) tuples | None |
| emit_warnings | bool | Whether to emit warnings when rate limited | True |
| summary_interval | float | Seconds between rate limit summary reports | 5.0 |
| max_queue_size | int | Maximum queue size when buffering | 1000 |
| max_memory_mb | float \| None | Maximum memory for buffered logs | None |
| overflow_policy | str | Policy when queue is full | 'drop_oldest' |

Returns:

| Type | Description |
|---|---|
| RateLimiterProcessor | Configured RateLimiterProcessor instance |
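A sketch using the factory to build and install a configured processor. The import path, limit values, and logger names are assumptions for illustration.

```python
import structlog

# Assumed import path, based on the "Source code in ... processor.py" note above.
from provide.foundation.logger.ratelimit.processor import create_rate_limiter_processor

# 50 logs/second globally with a burst of 100; a noisy subsystem is held to
# 5 logs/second with a burst of 10.
rate_limiter = create_rate_limiter_processor(
    global_rate=50.0,
    global_capacity=100.0,
    per_logger_rates={"myapp.noisy": (5.0, 10.0)},
    emit_warnings=True,
    summary_interval=5.0,
)

structlog.configure(
    processors=[
        rate_limiter,
        structlog.processors.JSONRenderer(),
    ]
)

structlog.get_logger("myapp.noisy").info("polling", attempt=1)
```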