Queue limiter
provide.foundation.logger.ratelimit.queue_limiter
Rate limiters with overflow buffering and queueing for the logging subsystem.
Classes
BufferedRateLimiter
BufferedRateLimiter(
capacity: float,
refill_rate: float,
buffer_size: int = 100,
track_dropped: bool = True,
)
Simple synchronous rate limiter with an overflow buffer. It does not use a worker thread; items are processed inline.
Initialize buffered rate limiter.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `capacity` | `float` | Maximum tokens (burst capacity) | *required* |
| `refill_rate` | `float` | Tokens per second | *required* |
| `buffer_size` | `int` | Number of recently dropped items to track | `100` |
| `track_dropped` | `bool` | Whether to keep dropped items for debugging | `True` |
Source code in provide/foundation/logger/ratelimit/queue_limiter.py
Functions
get_dropped_samples
Get recent dropped items for debugging.
get_stats
Get statistics.
is_allowed
Check if item is allowed based on rate limit.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `item` | `Any \| None` | Optional item to track if dropped | `None` |

Returns:

| Type | Description |
|---|---|
| `tuple[bool, str \| None]` | Tuple of `(allowed, reason)` |
QueuedRateLimiter
QueuedRateLimiter(
capacity: float,
refill_rate: float,
max_queue_size: int = 1000,
max_memory_mb: float | None = None,
overflow_policy: Literal[
"drop_oldest", "drop_newest", "block"
] = "drop_oldest",
)
Rate limiter with a queue for buffering logs. By default, it drops the oldest messages when the queue is full (FIFO overflow); this behavior is controlled by overflow_policy.
Lifecycle Management
The QueuedRateLimiter requires explicit lifecycle management:
1. Create instance: limiter = QueuedRateLimiter(...)
2. Start processing: limiter.start()
3. Use normally: limiter.enqueue(item)
4. Shutdown cleanly: limiter.stop()
Examples:
>>> limiter = QueuedRateLimiter(capacity=100.0, refill_rate=10.0)
>>> limiter.start() # Start background processing
>>> try:
...     limiter.enqueue(log_item)
... finally:
...     limiter.stop()  # Clean shutdown
>>> # Or use as a context manager
>>> with QueuedRateLimiter(100.0, 10.0) as limiter:
...     limiter.enqueue(log_item)  # Automatically starts and stops
Note on Threading
This implementation uses threading.Thread for background processing. Foundation's preferred concurrency model is asyncio (see utils/rate_limiting.py for the async TokenBucketRateLimiter). This threading approach is maintained for backward compatibility with synchronous logging contexts.
Initialize the queued rate limiter.
Note
This does NOT start the worker thread automatically. Call start() to begin processing the queue. This allows applications to control the lifecycle and thread management.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `capacity` | `float` | Maximum tokens (burst capacity) | *required* |
| `refill_rate` | `float` | Tokens per second | *required* |
| `max_queue_size` | `int` | Maximum number of items in queue | `1000` |
| `max_memory_mb` | `float \| None` | Maximum memory usage in MB (estimated) | `None` |
| `overflow_policy` | `Literal['drop_oldest', 'drop_newest', 'block']` | What to do when queue is full | `'drop_oldest'` |
Functions
__enter__
Enter context manager, automatically starting the worker thread.
Returns:

| Type | Description |
|---|---|
| `QueuedRateLimiter` | Self for use in `with` statement |

Example

>>> with QueuedRateLimiter(100.0, 10.0) as limiter:
...     limiter.enqueue(item)
__exit__
Exit context manager, automatically stopping the worker thread.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `exc_type` | `Any` | Exception type (if any) | *required* |
| `exc_val` | `Any` | Exception value (if any) | *required* |
| `exc_tb` | `Any` | Exception traceback (if any) | *required* |
enqueue
Add item to queue for rate-limited processing.
Returns:

| Type | Description |
|---|---|
| `tuple[bool, str \| None]` | Tuple of `(accepted, reason)`, where `reason` is set if rejected |
get_stats
Get queue statistics.
start
Start the worker thread for processing queued items.
This should be called after initialization and before enqueuing items. Can be called multiple times (subsequent calls are no-ops if already running).
Raises:

| Type | Description |
|---|---|
| `RuntimeError` | If `start()` is called after `stop()` on the same instance |
stop
Stop the worker thread and wait for it to finish.
This provides a clean shutdown, allowing the worker to finish processing the current item before terminating.
Parameters:

| Name | Type | Description | Default |
|---|---|---|---|
| `timeout` | `float` | Maximum seconds to wait for the thread to finish | `1.0` |

Example

>>> limiter.stop(timeout=2.0)  # Wait up to 2 seconds for clean shutdown
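The start/stop lifecycle described above can be sketched with a plain worker thread draining a queue. This is an illustrative pattern (a `queue.Queue` plus a `threading.Event` stop flag), not the library's implementation; all names here are invented for the sketch:

```python
import queue
import threading
from typing import Any


class WorkerSketch:
    """Illustrative start/stop lifecycle for a queue-draining worker thread."""

    def __init__(self) -> None:
        self.items: queue.Queue[Any] = queue.Queue()
        self.processed: list[Any] = []
        self._stop = threading.Event()
        self._thread: threading.Thread | None = None

    def start(self) -> None:
        # No-op if already running, mirroring the documented start() behavior.
        if self._thread is not None and self._thread.is_alive():
            return
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _run(self) -> None:
        while not self._stop.is_set():
            try:
                # Wake up periodically so the stop flag is always checked.
                item = self.items.get(timeout=0.05)
            except queue.Empty:
                continue
            self.processed.append(item)  # finish the current item before exiting

    def stop(self, timeout: float = 1.0) -> None:
        self._stop.set()
        if self._thread is not None:
            self._thread.join(timeout)
```

The short `get` timeout is what makes `stop()` responsive: the worker never blocks indefinitely on an empty queue, so setting the event and joining with a bounded timeout gives the clean shutdown the documentation describes.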