
Bulkhead

provide.foundation.resilience.bulkhead

Bulkhead isolation pattern for protecting resources with bounded concurrency. Provides synchronous and asynchronous resource pools, a Bulkhead wrapper for executing protected functions, and a BulkheadManager for managing multiple named bulkheads.

Classes

AsyncResourcePool

Asynchronous resource pool with limited capacity for isolation.

Async-safe implementation using asyncio.Lock and asyncio.Event. For sync contexts, use SyncResourcePool instead.
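
A minimal usage sketch, assuming AsyncResourcePool is importable from provide.foundation.resilience.bulkhead_async and accepts the max_concurrent, max_queue_size, and timeout keyword arguments shown in BulkheadManager.create_bulkhead below:

import asyncio

from provide.foundation.resilience.bulkhead_async import AsyncResourcePool  # import path assumed

async def main() -> None:
    # Allow at most 5 concurrent operations; up to 50 waiters, each waiting at most 10 seconds.
    pool = AsyncResourcePool(max_concurrent=5, max_queue_size=50, timeout=10.0)

    if await pool.acquire():
        try:
            ...  # perform the protected operation here
        finally:
            await pool.release()  # always release so queued waiters can proceed

asyncio.run(main())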

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize internal state.

Source code in provide/foundation/resilience/bulkhead_async.py
def __attrs_post_init__(self) -> None:
    """Initialize internal state."""
    pass
acquire async
acquire(timeout: float | None = None) -> bool

Acquire a resource slot (async).

Parameters:

    timeout (float | None): Maximum time to wait (defaults to pool timeout). Default: None

Returns:

    bool: True if acquired, False if timeout

Raises:

    RuntimeError: If queue is full

Source code in provide/foundation/resilience/bulkhead_async.py
async def acquire(self, timeout: float | None = None) -> bool:
    """Acquire a resource slot (async).

    Args:
        timeout: Maximum time to wait (defaults to pool timeout)

    Returns:
        True if acquired, False if timeout

    Raises:
        RuntimeError: If queue is full
    """
    actual_timeout = timeout if timeout is not None else self.timeout

    # Try to acquire immediately
    async with self._lock:
        if self._active_count < self.max_concurrent:
            self._active_count += 1
            return True

        # Check queue limit
        if self._waiting_count >= self.max_queue_size:
            raise RuntimeError(f"Queue is full (max: {self.max_queue_size})")

        # Add to wait queue
        self._waiting_count += 1
        waiter = asyncio.Event()
        self._waiters.append(waiter)

    # Wait for signal from release
    try:
        await asyncio.wait_for(waiter.wait(), timeout=actual_timeout)
        # Successfully signaled, we now have the slot
        return True
    except TimeoutError:
        # Timeout - remove from queue
        async with self._lock:
            with contextlib.suppress(ValueError):
                # Remove from queue if still present (already removed by signal if not found)
                self._waiters.remove(waiter)
        return False
    finally:
        async with self._lock:
            self._waiting_count -= 1
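
The two failure modes differ: a full queue raises RuntimeError immediately, while a timeout simply returns False. A hedged sketch of handling both, continuing with a pool constructed as in the sketch above:

async def guarded_call(pool: AsyncResourcePool) -> None:
    try:
        acquired = await pool.acquire(timeout=2.0)  # per-call override of the pool default
    except RuntimeError:
        # max_queue_size waiters are already queued; shed load instead of waiting
        acquired = False
    if not acquired:
        return  # timed out or shed: degrade gracefully or retry later
    try:
        ...  # protected work
    finally:
        await pool.release()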
active_count async
active_count() -> int

Number of currently active operations.

Source code in provide/foundation/resilience/bulkhead_async.py
async def active_count(self) -> int:
    """Number of currently active operations."""
    async with self._lock:
        return self._active_count
available_capacity async
available_capacity() -> int

Number of available slots.

Source code in provide/foundation/resilience/bulkhead_async.py
async def available_capacity(self) -> int:
    """Number of available slots."""
    async with self._lock:
        return max(0, self.max_concurrent - self._active_count)
get_stats async
get_stats() -> dict[str, Any]

Get pool statistics.

Source code in provide/foundation/resilience/bulkhead_async.py
async def get_stats(self) -> dict[str, Any]:
    """Get pool statistics."""
    async with self._lock:
        return {
            "max_concurrent": self.max_concurrent,
            "active_count": self._active_count,
            "available_capacity": self.max_concurrent - self._active_count,
            "waiting_count": self._waiting_count,
            "max_queue_size": self.max_queue_size,
            "utilization": self._active_count / self.max_concurrent if self.max_concurrent > 0 else 0.0,
        }
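
The returned dictionary can drive simple saturation checks; for example (pool as above, with an arbitrary 0.8 threshold for illustration):

async def check_pressure(pool: AsyncResourcePool) -> None:
    stats = await pool.get_stats()
    if stats["utilization"] > 0.8 or stats["waiting_count"] > 0:
        print(f"bulkhead pressure: {stats}")  # or emit a metric/log instead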
queue_size async
queue_size() -> int

Current number of waiting operations.

Source code in provide/foundation/resilience/bulkhead_async.py
async def queue_size(self) -> int:
    """Current number of waiting operations."""
    async with self._lock:
        return self._waiting_count
release async
release() -> None

Release a resource slot.

Source code in provide/foundation/resilience/bulkhead_async.py
async def release(self) -> None:
    """Release a resource slot."""
    async with self._lock:
        if self._active_count > 0:
            self._active_count -= 1

        # Signal next waiter in FIFO order
        if self._waiters:
            waiter_event = self._waiters.popleft()
            self._active_count += 1
            waiter_event.set()

Bulkhead

Bulkhead isolation pattern for protecting resources.

Can use either SyncResourcePool or AsyncResourcePool depending on use case.
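
A sketch of wiring a Bulkhead by hand; the name and pool keywords match the call in BulkheadManager.create_bulkhead below, and the import paths are assumptions (in practice the manager does this wiring for you):

from provide.foundation.resilience.bulkhead import Bulkhead  # import path assumed
from provide.foundation.resilience.bulkhead_sync import SyncResourcePool  # import path assumed

pool = SyncResourcePool(max_concurrent=3, max_queue_size=10, timeout=5.0)
bulkhead = Bulkhead(name="payments-db", pool=pool)

def charge(order_id: str) -> str:
    return f"charged {order_id}"

# At most 3 concurrent charge() calls pass through; excess callers queue or fail.
result = bulkhead.execute(charge, "order-42")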

Functions
execute
execute(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Execute function with bulkhead protection (sync).

Parameters:

    func (Callable[..., T]): Function to execute. Required
    *args (Any): Function arguments. Default: ()
    **kwargs (Any): Function keyword arguments. Default: {}

Returns:

    T: Function result

Raises:

    RuntimeError: If resource cannot be acquired
    Exception: Any exception from the protected function

Source code in provide/foundation/resilience/bulkhead.py
def execute(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute function with bulkhead protection (sync).

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        RuntimeError: If resource cannot be acquired
        Exception: Any exception from the protected function
    """
    # Must use SyncResourcePool for sync execution
    if not isinstance(self.pool, SyncResourcePool):
        raise TypeError("Sync execution requires SyncResourcePool")

    if not self.pool.acquire():
        raise RuntimeError(f"Bulkhead '{self.name}' is at capacity")

    try:
        # Emit acquisition event
        self._emit_event("acquired")
        start_time = time.time()

        result = func(*args, **kwargs)

        # Emit success event
        execution_time = time.time() - start_time
        self._emit_event("completed", execution_time=execution_time)

        return result
    except Exception as e:
        # Emit failure event
        execution_time = time.time() - start_time
        self._emit_event("failed", error=str(e), execution_time=execution_time)
        raise
    finally:
        self.pool.release()
        self._emit_event("released")
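
When a slot cannot be acquired, execute raises RuntimeError rather than queueing indefinitely; callers typically treat that as a load-shedding signal. Continuing with the bulkhead and charge() from the sketch above:

try:
    result = bulkhead.execute(charge, "order-43")
except RuntimeError:
    result = None  # bulkhead at capacity: fail fast, fall back, or retry later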
execute_async async
execute_async(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute async function with bulkhead protection.

Parameters:

    func (Callable[..., Awaitable[T]]): Async function to execute. Required
    *args (Any): Function arguments. Default: ()
    **kwargs (Any): Function keyword arguments. Default: {}

Returns:

    T: Function result

Raises:

    RuntimeError: If resource cannot be acquired
    Exception: Any exception from the protected function

Source code in provide/foundation/resilience/bulkhead.py
async def execute_async(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
    """Execute async function with bulkhead protection.

    Args:
        func: Async function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        RuntimeError: If resource cannot be acquired
        Exception: Any exception from the protected function
    """
    # Must use AsyncResourcePool for async execution
    if not isinstance(self.pool, AsyncResourcePool):
        raise TypeError("Async execution requires AsyncResourcePool")

    if not await self.pool.acquire():
        raise RuntimeError(f"Bulkhead '{self.name}' is at capacity")

    try:
        # Emit acquisition event
        await self._emit_event_async("acquired")
        start_time = time.time()

        result = await func(*args, **kwargs)

        # Emit success event
        execution_time = time.time() - start_time
        await self._emit_event_async("completed", execution_time=execution_time)

        return result
    except Exception as e:
        # Emit failure event
        execution_time = time.time() - start_time
        await self._emit_event_async("failed", error=str(e), execution_time=execution_time)
        raise
    finally:
        await self.pool.release()
        await self._emit_event_async("released")
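
An async counterpart; async_bulkhead here is an assumed name for a Bulkhead backed by an AsyncResourcePool (for example one created with use_async_pool=True, as shown further below):

async def fetch_profile(user_id: str) -> dict:
    ...  # awaitable work, e.g. an HTTP call

async def handler(user_id: str) -> dict:
    try:
        return await async_bulkhead.execute_async(fetch_profile, user_id)
    except RuntimeError:
        # capacity exhausted: return a degraded response instead of piling on
        return {"user_id": user_id, "profile": None}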
get_status
get_status() -> dict[str, Any]

Get bulkhead status (sync only).

Source code in provide/foundation/resilience/bulkhead.py
def get_status(self) -> dict[str, Any]:
    """Get bulkhead status (sync only)."""
    if isinstance(self.pool, SyncResourcePool):
        return {
            "name": self.name,
            "pool": self.pool.get_stats(),
        }
    # Can't get async pool stats in sync context
    return {
        "name": self.name,
        "pool": {},
    }
get_status_async async
get_status_async() -> dict[str, Any]

Get bulkhead status (async).

Source code in provide/foundation/resilience/bulkhead.py
async def get_status_async(self) -> dict[str, Any]:
    """Get bulkhead status (async)."""
    if isinstance(self.pool, AsyncResourcePool):
        return {
            "name": self.name,
            "pool": await self.pool.get_stats(),
        }
    # Can get sync pool stats from async context via threading
    return {
        "name": self.name,
        "pool": self.pool.get_stats() if isinstance(self.pool, SyncResourcePool) else {},
    }

BulkheadManager

BulkheadManager()

Manager for multiple bulkheads with different resource pools.

Initialize bulkhead manager.

Source code in provide/foundation/resilience/bulkhead.py
def __init__(self) -> None:
    """Initialize bulkhead manager."""
    self._bulkheads: dict[str, Bulkhead] = {}
    self._lock = threading.RLock()
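
A sketch of the manager API (import path assumed); because bulkheads are cached by name, repeated calls with the same name return the same instance:

from provide.foundation.resilience.bulkhead import BulkheadManager  # import path assumed

manager = BulkheadManager()
db_bulkhead = manager.create_bulkhead("database", max_concurrent=5, timeout=10.0)
api_bulkhead = manager.create_bulkhead("external-api", max_concurrent=20)

assert manager.get_bulkhead("database") is db_bulkhead
print(manager.list_bulkheads())   # ['database', 'external-api']
print(manager.get_all_status())   # per-bulkhead pool statistics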
Functions
create_bulkhead
create_bulkhead(
    name: str,
    max_concurrent: int = 10,
    max_queue_size: int = 100,
    timeout: float = 30.0,
    use_async_pool: bool = False,
) -> Bulkhead

Create or get a bulkhead.

Parameters:

    name (str): Bulkhead name. Required
    max_concurrent (int): Maximum concurrent operations. Default: 10
    max_queue_size (int): Maximum queue size. Default: 100
    timeout (float): Operation timeout. Default: 30.0
    use_async_pool (bool): If True, create AsyncResourcePool; otherwise SyncResourcePool. Default: False

Returns:

    Bulkhead: Bulkhead instance

Source code in provide/foundation/resilience/bulkhead.py
def create_bulkhead(
    self,
    name: str,
    max_concurrent: int = 10,
    max_queue_size: int = 100,
    timeout: float = 30.0,
    use_async_pool: bool = False,
) -> Bulkhead:
    """Create or get a bulkhead.

    Args:
        name: Bulkhead name
        max_concurrent: Maximum concurrent operations
        max_queue_size: Maximum queue size
        timeout: Operation timeout
        use_async_pool: If True, create AsyncResourcePool; otherwise SyncResourcePool

    Returns:
        Bulkhead instance
    """
    with self._lock:
        if name not in self._bulkheads:
            pool: SyncResourcePool | AsyncResourcePool
            if use_async_pool:
                pool = AsyncResourcePool(
                    max_concurrent=max_concurrent,
                    max_queue_size=max_queue_size,
                    timeout=timeout,
                )
            else:
                pool = SyncResourcePool(
                    max_concurrent=max_concurrent,
                    max_queue_size=max_queue_size,
                    timeout=timeout,
                )
            self._bulkheads[name] = Bulkhead(name=name, pool=pool)

        return self._bulkheads[name]
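
Because create_bulkhead is get-or-create, the pool options only take effect the first time a name is used. An async-backed bulkhead is requested with use_async_pool=True (manager as above):

async_bulkhead = manager.create_bulkhead(
    "search-backend",
    max_concurrent=50,
    max_queue_size=500,
    timeout=5.0,
    use_async_pool=True,
)
# Suitable for execute_async(); calling execute() on it would raise TypeError.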
get_all_status
get_all_status() -> dict[str, dict[str, Any]]

Get status of all bulkheads.

Source code in provide/foundation/resilience/bulkhead.py
def get_all_status(self) -> dict[str, dict[str, Any]]:
    """Get status of all bulkheads."""
    with self._lock:
        return {name: bulkhead.get_status() for name, bulkhead in self._bulkheads.items()}
get_bulkhead
get_bulkhead(name: str) -> Bulkhead | None

Get a bulkhead by name.

Source code in provide/foundation/resilience/bulkhead.py
def get_bulkhead(self, name: str) -> Bulkhead | None:
    """Get a bulkhead by name."""
    with self._lock:
        return self._bulkheads.get(name)
list_bulkheads
list_bulkheads() -> list[str]

List all bulkhead names.

Source code in provide/foundation/resilience/bulkhead.py
def list_bulkheads(self) -> list[str]:
    """List all bulkhead names."""
    with self._lock:
        return list(self._bulkheads.keys())
remove_bulkhead
remove_bulkhead(name: str) -> bool

Remove a bulkhead.

Parameters:

    name (str): Bulkhead name. Required

Returns:

    bool: True if removed, False if not found

Source code in provide/foundation/resilience/bulkhead.py
def remove_bulkhead(self, name: str) -> bool:
    """Remove a bulkhead.

    Args:
        name: Bulkhead name

    Returns:
        True if removed, False if not found
    """
    with self._lock:
        if name in self._bulkheads:
            del self._bulkheads[name]
            return True
        return False

SyncResourcePool

Synchronous resource pool with limited capacity for isolation.

Thread-safe implementation using threading.Lock and threading.Event. For async contexts, use AsyncResourcePool instead.
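
A minimal threaded sketch (import path assumed), showing the blocking acquire/release pair around work running in worker threads:

import threading

from provide.foundation.resilience.bulkhead_sync import SyncResourcePool  # import path assumed

pool = SyncResourcePool(max_concurrent=2, max_queue_size=10, timeout=5.0)

def worker(task_id: int) -> None:
    if not pool.acquire():
        return  # timed out waiting for a slot
    try:
        print(f"task {task_id} running, {pool.active_count()} active")
    finally:
        pool.release()

threads = [threading.Thread(target=worker, args=(i,)) for i in range(5)]
for t in threads:
    t.start()
for t in threads:
    t.join()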

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize internal state.

Source code in provide/foundation/resilience/bulkhead_sync.py
def __attrs_post_init__(self) -> None:
    """Initialize internal state."""
    pass
acquire
acquire(timeout: float | None = None) -> bool

Acquire a resource slot (blocking).

Parameters:

    timeout (float | None): Maximum time to wait (defaults to pool timeout). Default: None

Returns:

    bool: True if acquired, False if timeout

Raises:

    RuntimeError: If queue is full

Source code in provide/foundation/resilience/bulkhead_sync.py
def acquire(self, timeout: float | None = None) -> bool:
    """Acquire a resource slot (blocking).

    Args:
        timeout: Maximum time to wait (defaults to pool timeout)

    Returns:
        True if acquired, False if timeout

    Raises:
        RuntimeError: If queue is full
    """
    actual_timeout = timeout if timeout is not None else self.timeout

    # Try to acquire immediately
    with self._counter_lock:
        if self._active_count < self.max_concurrent:
            self._active_count += 1
            return True

        # Check queue limit
        if self._waiting_count >= self.max_queue_size:
            raise RuntimeError(f"Queue is full (max: {self.max_queue_size})")

        # Add to wait queue
        self._waiting_count += 1
        waiter = threading.Event()
        self._waiters.append(waiter)

    # Wait for signal from release
    try:
        if waiter.wait(timeout=actual_timeout):
            # Successfully signaled, we now have the slot
            return True
        # Timeout - remove from queue
        with self._counter_lock, contextlib.suppress(ValueError):
            # Remove from queue if still present (already removed by signal if not found)
            self._waiters.remove(waiter)
        return False
    finally:
        with self._counter_lock:
            self._waiting_count -= 1
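
As with the async pool, a per-call timeout can override the pool default; a False return means the wait timed out rather than an error (pool as in the threaded sketch above):

if pool.acquire(timeout=0.5):  # wait at most 0.5 seconds for this call
    try:
        ...  # protected work
    finally:
        pool.release()
else:
    ...  # timed out: shed load or retry later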
active_count
active_count() -> int

Number of currently active operations.

Source code in provide/foundation/resilience/bulkhead_sync.py
def active_count(self) -> int:
    """Number of currently active operations."""
    with self._counter_lock:
        return self._active_count
available_capacity
available_capacity() -> int

Number of available slots.

Source code in provide/foundation/resilience/bulkhead_sync.py
def available_capacity(self) -> int:
    """Number of available slots."""
    with self._counter_lock:
        return max(0, self.max_concurrent - self._active_count)
get_stats
get_stats() -> dict[str, Any]

Get pool statistics.

Source code in provide/foundation/resilience/bulkhead_sync.py
def get_stats(self) -> dict[str, Any]:
    """Get pool statistics."""
    with self._counter_lock:
        return {
            "max_concurrent": self.max_concurrent,
            "active_count": self._active_count,
            "available_capacity": self.max_concurrent - self._active_count,
            "waiting_count": self._waiting_count,
            "max_queue_size": self.max_queue_size,
            "utilization": self._active_count / self.max_concurrent if self.max_concurrent > 0 else 0.0,
        }
queue_size
queue_size() -> int

Current number of waiting operations.

Source code in provide/foundation/resilience/bulkhead_sync.py
def queue_size(self) -> int:
    """Current number of waiting operations."""
    with self._counter_lock:
        return self._waiting_count
release
release() -> None

Release a resource slot.

Source code in provide/foundation/resilience/bulkhead_sync.py
def release(self) -> None:
    """Release a resource slot."""
    with self._counter_lock:
        if self._active_count > 0:
            self._active_count -= 1

        # Signal next waiter in FIFO order
        if self._waiters:
            waiter_event = self._waiters.popleft()
            self._active_count += 1
            waiter_event.set()

Functions

get_bulkhead_manager

get_bulkhead_manager() -> BulkheadManager

Get the global bulkhead manager.

Source code in provide/foundation/resilience/bulkhead.py
def get_bulkhead_manager() -> BulkheadManager:
    """Get the global bulkhead manager."""
    return _bulkhead_manager
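
A typical entry point is this module-level manager; a short sketch using only the calls documented above:

from provide.foundation.resilience.bulkhead import get_bulkhead_manager  # import path assumed

manager = get_bulkhead_manager()
reports = manager.create_bulkhead("report-generation", max_concurrent=2)
result = reports.execute(lambda: "generated")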