Skip to content

Bulkhead async

provide.foundation.resilience.bulkhead_async

Asynchronous bulkhead primitives: a capacity-limited resource pool for isolating concurrent asyncio operations.

Classes

AsyncResourcePool

Asynchronous resource pool with limited capacity for isolation.

Async-safe implementation using asyncio.Lock and asyncio.Event. For sync contexts, use SyncResourcePool instead.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize internal state.

Source code in provide/foundation/resilience/bulkhead_async.py
def __attrs_post_init__(self) -> None:
    """Post-construction hook; no additional state is set up here."""
    return None
acquire async
acquire(timeout: float | None = None) -> bool

Acquire a resource slot (async).

Parameters:

Name Type Description Default
timeout float | None

Maximum time to wait (defaults to pool timeout)

None

Returns:

Type Description
bool

True if acquired, False if timeout

Raises:

Type Description
RuntimeError

If queue is full

Source code in provide/foundation/resilience/bulkhead_async.py
async def acquire(self, timeout: float | None = None) -> bool:
    """Acquire a resource slot (async).

    Args:
        timeout: Maximum time to wait (defaults to pool timeout)

    Returns:
        True if acquired, False if timeout

    Raises:
        RuntimeError: If queue is full
    """
    actual_timeout = timeout if timeout is not None else self.timeout

    # Try to acquire immediately
    async with self._lock:
        if self._active_count < self.max_concurrent:
            self._active_count += 1
            return True

        # Check queue limit
        if self._waiting_count >= self.max_queue_size:
            raise RuntimeError(f"Queue is full (max: {self.max_queue_size})")

        # Add to wait queue
        self._waiting_count += 1
        waiter = asyncio.Event()
        self._waiters.append(waiter)

    # Wait for signal from release
    try:
        await asyncio.wait_for(waiter.wait(), timeout=actual_timeout)
        # Successfully signaled, we now have the slot
        return True
    except TimeoutError:
        # Timeout - remove from queue
        async with self._lock:
            with contextlib.suppress(ValueError):
                # Remove from queue if still present (already removed by signal if not found)
                self._waiters.remove(waiter)
        return False
    finally:
        async with self._lock:
            self._waiting_count -= 1
active_count async
active_count() -> int

Number of currently active operations.

Source code in provide/foundation/resilience/bulkhead_async.py
async def active_count(self) -> int:
    """Return how many slots are currently held."""
    async with self._lock:
        in_use = self._active_count
    return in_use
available_capacity async
available_capacity() -> int

Number of available slots.

Source code in provide/foundation/resilience/bulkhead_async.py
async def available_capacity(self) -> int:
    """Return the number of free slots, never less than zero."""
    async with self._lock:
        free_slots = self.max_concurrent - self._active_count
    return free_slots if free_slots > 0 else 0
get_stats async
get_stats() -> dict[str, Any]

Get pool statistics.

Source code in provide/foundation/resilience/bulkhead_async.py
async def get_stats(self) -> dict[str, Any]:
    """Get pool statistics.

    All values are read under the pool lock.

    Returns:
        A dict with the keys ``max_concurrent``, ``active_count``,
        ``available_capacity``, ``waiting_count``, ``max_queue_size`` and
        ``utilization`` (active / max_concurrent, or 0.0 when
        ``max_concurrent`` is not positive).
    """
    async with self._lock:
        return {
            "max_concurrent": self.max_concurrent,
            "active_count": self._active_count,
            # Clamped at zero so this agrees with available_capacity().
            "available_capacity": max(0, self.max_concurrent - self._active_count),
            "waiting_count": self._waiting_count,
            "max_queue_size": self.max_queue_size,
            "utilization": self._active_count / self.max_concurrent if self.max_concurrent > 0 else 0.0,
        }
queue_size async
queue_size() -> int

Current number of waiting operations.

Source code in provide/foundation/resilience/bulkhead_async.py
async def queue_size(self) -> int:
    """Return how many callers are currently counted as waiting."""
    async with self._lock:
        pending = self._waiting_count
    return pending
release async
release() -> None

Release a resource slot.

Source code in provide/foundation/resilience/bulkhead_async.py
async def release(self) -> None:
    """Give a slot back and, if anyone is queued, pass it straight on."""
    async with self._lock:
        if self._active_count > 0:
            self._active_count -= 1

        # Nobody queued: the slot simply becomes free.
        if not self._waiters:
            return

        # Hand the slot to the longest-waiting caller (front of the deque);
        # it is counted as active before that caller is woken.
        next_in_line = self._waiters.popleft()
        self._active_count += 1
        next_in_line.set()