Skip to content

Resilience

provide.foundation.resilience

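Resilience patterns for fault-tolerant code: circuit breakers, bulkhead isolation, fallback chains, and retry policies with configurable backoff.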

Classes

AsyncCircuitBreaker

AsyncCircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
)

Asynchronous circuit breaker for resilience patterns.

Uses asyncio.Lock for async-safe state management. For synchronous code, use SyncCircuitBreaker instead.

Initialize the asynchronous circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
recovery_timeout float

Seconds to wait before attempting recovery

30.0
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) to catch

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
Source code in provide/foundation/resilience/circuit_async.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Set up a closed circuit breaker with the given thresholds.

    Args:
        failure_threshold: Number of failures before opening circuit
        recovery_timeout: Seconds to wait before attempting recovery
        expected_exception: Exception type(s) to catch
        time_source: Optional callable that returns current time (for testing).
                    Falls back to time.time when not supplied.
    """
    # Breaker state: start closed, with no failures on record.
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._last_failure_time: float | None = None

    # The lock is created eagerly; asyncio.Lock() can be created outside an
    # event loop and binds when it is first awaited.
    self._lock = asyncio.Lock()

    # Injectable clock so tests can control time.
    clock = time_source or time.time
    self._time_source = clock

    # Public configuration, stored exactly as given.
    self.expected_exception = expected_exception
    self.recovery_timeout = recovery_timeout
    self.failure_threshold = failure_threshold
Functions
call async
call(func: Callable, *args: Any, **kwargs: Any) -> Any

Execute an asynchronous function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable

Async callable to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
Any

Result from func

Raises:

Type Description
RuntimeError

If circuit is open

Exception

Whatever exception func raises

Source code in provide/foundation/resilience/circuit_async.py
async def call(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute an asynchronous function through the circuit breaker.

    Args:
        func: Async callable to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func

    Raises:
        RuntimeError: If circuit is open and recovery cannot be attempted yet
        Exception: Whatever exception func raises
    """
    async with self._lock:
        # Inspect _state directly instead of awaiting state(): state() acquires
        # this same lock, so calling it here would deadlock.
        if self._state == CircuitState.OPEN and not self._can_attempt_recovery():
            raise RuntimeError("Circuit breaker is open")
        # If HALF_OPEN or recovery possible, we proceed with the call

    # The lock is released before func runs, so calls are not serialized.
    try:
        result = await func(*args, **kwargs)
        await self._on_success()
        return result
    except self.expected_exception:
        # Only the configured exception type(s) are recorded as failures;
        # anything else propagates without touching breaker state.
        await self._on_failure()
        # Bare raise re-raises the active exception without adding a frame.
        raise
failure_count async
failure_count() -> int

Get the current failure count.

Returns:

Type Description
int

Current failure count

Source code in provide/foundation/resilience/circuit_async.py
async def failure_count(self) -> int:
    """Return the number of failures currently recorded.

    Returns:
        The failure counter, read while holding the breaker's lock
    """
    async with self._lock:
        recorded = self._failure_count
    return recorded
reset async
reset() -> None

Reset the circuit breaker to its initial state.

Source code in provide/foundation/resilience/circuit_async.py
async def reset(self) -> None:
    """Put the breaker back into the closed state with no recorded failures."""
    async with self._lock:
        # Clear the failure record, then close the circuit.
        self._last_failure_time = None
        self._failure_count = 0
        self._state = CircuitState.CLOSED
state async
state() -> CircuitState

Get the current state of the circuit breaker.

Returns:

Type Description
CircuitState

Current circuit state

Source code in provide/foundation/resilience/circuit_async.py
async def state(self) -> CircuitState:
    """Report the breaker's current state.

    Returns:
        The stored state, except that an OPEN breaker for which recovery can
        be attempted is reported as HALF_OPEN
    """
    async with self._lock:
        current = self._state
        if current != CircuitState.OPEN:
            return current
        if self._can_attempt_recovery():
            # Reported as a view only; the stored state is not modified here.
            return CircuitState.HALF_OPEN
        return current

BackoffStrategy

Bases: str, Enum

Backoff strategies for retry delays.

Bulkhead

Bulkhead isolation pattern for protecting resources.

Can use either SyncResourcePool or AsyncResourcePool depending on use case.

Functions
execute
execute(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Execute function with bulkhead protection (sync).

Parameters:

Name Type Description Default
func Callable[..., T]

Function to execute

required
*args Any

Function arguments

()
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
T

Function result

Raises:

Type Description
RuntimeError

If resource cannot be acquired

Exception

Any exception from the protected function

Source code in provide/foundation/resilience/bulkhead.py
def execute(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute function with bulkhead protection (sync).

    Emits "acquired" before the call, "completed" or "failed" (with the
    measured execution_time) after it, and "released" once the pool slot has
    been given back.

    Args:
        func: Function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        TypeError: If the bulkhead's pool is not a SyncResourcePool
        RuntimeError: If resource cannot be acquired
        Exception: Any exception from the protected function
    """
    # Must use SyncResourcePool for sync execution
    if not isinstance(self.pool, SyncResourcePool):
        raise TypeError("Sync execution requires SyncResourcePool")

    if not self.pool.acquire():
        raise RuntimeError(f"Bulkhead '{self.name}' is at capacity")

    # Bound before the try block so the failure handler can always compute a
    # duration. Previously an exception from the "acquired" emit reached the
    # handler with start_time unbound and was masked by UnboundLocalError.
    start_time: float | None = None
    try:
        # Emit acquisition event
        self._emit_event("acquired")
        start_time = time.time()

        result = func(*args, **kwargs)

        # Emit success event
        execution_time = time.time() - start_time
        self._emit_event("completed", execution_time=execution_time)

        return result
    except Exception as e:
        # Emit failure event; report 0.0 when the failure happened before the
        # clock was started.
        execution_time = 0.0 if start_time is None else time.time() - start_time
        self._emit_event("failed", error=str(e), execution_time=execution_time)
        raise
    finally:
        # Always give the slot back, whether func succeeded or raised.
        self.pool.release()
        self._emit_event("released")
execute_async async
execute_async(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute async function with bulkhead protection.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

Async function to execute

required
*args Any

Function arguments

()
**kwargs Any

Function keyword arguments

{}

Returns:

Type Description
T

Function result

Raises:

Type Description
RuntimeError

If resource cannot be acquired

Exception

Any exception from the protected function

Source code in provide/foundation/resilience/bulkhead.py
async def execute_async(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
    """Execute async function with bulkhead protection.

    Emits "acquired" before the call, "completed" or "failed" (with the
    measured execution_time) after it, and "released" once the pool slot has
    been given back.

    Args:
        func: Async function to execute
        *args: Function arguments
        **kwargs: Function keyword arguments

    Returns:
        Function result

    Raises:
        TypeError: If the bulkhead's pool is not an AsyncResourcePool
        RuntimeError: If resource cannot be acquired
        Exception: Any exception from the protected function
    """
    # Must use AsyncResourcePool for async execution
    if not isinstance(self.pool, AsyncResourcePool):
        raise TypeError("Async execution requires AsyncResourcePool")

    if not await self.pool.acquire():
        raise RuntimeError(f"Bulkhead '{self.name}' is at capacity")

    # Bound before the try block so the failure handler can always compute a
    # duration. Previously an exception from the "acquired" emit reached the
    # handler with start_time unbound and was masked by UnboundLocalError.
    start_time: float | None = None
    try:
        # Emit acquisition event
        await self._emit_event_async("acquired")
        start_time = time.time()

        result = await func(*args, **kwargs)

        # Emit success event
        execution_time = time.time() - start_time
        await self._emit_event_async("completed", execution_time=execution_time)

        return result
    except Exception as e:
        # Emit failure event; report 0.0 when the failure happened before the
        # clock was started.
        execution_time = 0.0 if start_time is None else time.time() - start_time
        await self._emit_event_async("failed", error=str(e), execution_time=execution_time)
        raise
    finally:
        # Always give the slot back, whether func succeeded or raised.
        await self.pool.release()
        await self._emit_event_async("released")
get_status
get_status() -> dict[str, Any]

Get bulkhead status (sync only).

Source code in provide/foundation/resilience/bulkhead.py
def get_status(self) -> dict[str, Any]:
    """Return the bulkhead's name and pool statistics from synchronous code.

    Returns:
        A dict with "name" and "pool" keys; "pool" is empty unless the
        bulkhead is backed by a SyncResourcePool
    """
    # Only a sync pool can be queried here: the async pool's get_stats() has
    # to be awaited, which is not possible in a plain function.
    pool_stats = self.pool.get_stats() if isinstance(self.pool, SyncResourcePool) else {}
    return {"name": self.name, "pool": pool_stats}
get_status_async async
get_status_async() -> dict[str, Any]

Get bulkhead status (async).

Source code in provide/foundation/resilience/bulkhead.py
async def get_status_async(self) -> dict[str, Any]:
    """Return the bulkhead's name and pool statistics from async code.

    Returns:
        A dict with "name" and "pool" keys; "pool" is empty when the pool is
        neither an AsyncResourcePool nor a SyncResourcePool
    """
    if isinstance(self.pool, AsyncResourcePool):
        pool_stats = await self.pool.get_stats()
    elif isinstance(self.pool, SyncResourcePool):
        # A sync pool's stats are read with a direct, non-awaited call.
        pool_stats = self.pool.get_stats()
    else:
        pool_stats = {}
    return {"name": self.name, "pool": pool_stats}

BulkheadManager

BulkheadManager()

Manager for multiple bulkheads with different resource pools.

Initialize bulkhead manager.

Source code in provide/foundation/resilience/bulkhead.py
def __init__(self) -> None:
    """Create an empty bulkhead registry guarded by a re-entrant lock."""
    # Lock taken by every method that reads or modifies the registry.
    self._lock = threading.RLock()
    # Bulkheads keyed by name.
    self._bulkheads: dict[str, Bulkhead] = {}
Functions
create_bulkhead
create_bulkhead(
    name: str,
    max_concurrent: int = 10,
    max_queue_size: int = 100,
    timeout: float = 30.0,
    use_async_pool: bool = False,
) -> Bulkhead

Create or get a bulkhead.

Parameters:

Name Type Description Default
name str

Bulkhead name

required
max_concurrent int

Maximum concurrent operations

10
max_queue_size int

Maximum queue size

100
timeout float

Operation timeout

30.0
use_async_pool bool

If True, create AsyncResourcePool; otherwise SyncResourcePool

False

Returns:

Type Description
Bulkhead

Bulkhead instance

Source code in provide/foundation/resilience/bulkhead.py
def create_bulkhead(
    self,
    name: str,
    max_concurrent: int = 10,
    max_queue_size: int = 100,
    timeout: float = 30.0,
    use_async_pool: bool = False,
) -> Bulkhead:
    """Return the bulkhead registered under name, creating it if needed.

    When a bulkhead with this name already exists it is returned unchanged
    and the remaining arguments are ignored.

    Args:
        name: Bulkhead name
        max_concurrent: Maximum concurrent operations
        max_queue_size: Maximum queue size
        timeout: Operation timeout
        use_async_pool: If True, create AsyncResourcePool; otherwise SyncResourcePool

    Returns:
        Bulkhead instance
    """
    with self._lock:
        # Fast path: reuse the existing registration.
        if name in self._bulkheads:
            return self._bulkheads[name]

        # Both pool types take the same keyword arguments, so pick the class
        # first and construct it once.
        pool_cls = AsyncResourcePool if use_async_pool else SyncResourcePool
        pool: SyncResourcePool | AsyncResourcePool = pool_cls(
            max_concurrent=max_concurrent,
            max_queue_size=max_queue_size,
            timeout=timeout,
        )
        created = Bulkhead(name=name, pool=pool)
        self._bulkheads[name] = created
        return created
get_all_status
get_all_status() -> dict[str, dict[str, Any]]

Get status of all bulkheads.

Source code in provide/foundation/resilience/bulkhead.py
def get_all_status(self) -> dict[str, dict[str, Any]]:
    """Collect get_status() from every registered bulkhead, keyed by name."""
    with self._lock:
        statuses: dict[str, dict[str, Any]] = {}
        for key, entry in self._bulkheads.items():
            statuses[key] = entry.get_status()
        return statuses
get_bulkhead
get_bulkhead(name: str) -> Bulkhead | None

Get a bulkhead by name.

Source code in provide/foundation/resilience/bulkhead.py
def get_bulkhead(self, name: str) -> Bulkhead | None:
    """Look up a bulkhead by name, returning None when it is not registered."""
    with self._lock:
        found = self._bulkheads.get(name)
    return found
list_bulkheads
list_bulkheads() -> list[str]

List all bulkhead names.

Source code in provide/foundation/resilience/bulkhead.py
def list_bulkheads(self) -> list[str]:
    """Return the names of all registered bulkheads as a new list."""
    with self._lock:
        names = list(self._bulkheads)
    return names
remove_bulkhead
remove_bulkhead(name: str) -> bool

Remove a bulkhead.

Parameters:

Name Type Description Default
name str

Bulkhead name

required

Returns:

Type Description
bool

True if removed, False if not found

Source code in provide/foundation/resilience/bulkhead.py
def remove_bulkhead(self, name: str) -> bool:
    """Unregister the bulkhead stored under name.

    Args:
        name: Bulkhead name

    Returns:
        True if removed, False if not found
    """
    with self._lock:
        try:
            del self._bulkheads[name]
        except KeyError:
            # Nothing was registered under this name.
            return False
        return True

CircuitState

Bases: Enum

Represents the state of the circuit breaker.

FallbackChain

Chain of fallback strategies for graceful degradation.

Executes fallback functions in order when primary function fails.

Functions
add_fallback
add_fallback(fallback_func: Callable[..., T]) -> None

Add a fallback function to the chain.

Source code in provide/foundation/resilience/fallback.py
def add_fallback(self, fallback_func: Callable[..., T]) -> None:
    """Append a fallback to the end of the chain and log the new chain size."""
    self.fallbacks.append(fallback_func)  # type: ignore[arg-type]

    # Callables without a __name__ are reported as "anonymous".
    chain_length = len(self.fallbacks)
    display_name = getattr(fallback_func, "__name__", "anonymous")
    logger.debug(
        "Added fallback to chain",
        fallback_count=chain_length,
        fallback_name=display_name,
    )
execute
execute(
    primary_func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Execute primary function with fallback chain (sync).

Source code in provide/foundation/resilience/fallback.py
def execute(self, primary_func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute primary function with fallback chain (sync).

    The primary function is tried first. If it raises one of
    self.expected_exceptions, each fallback is called in order with the same
    arguments until one returns.

    Args:
        primary_func: Function to try first
        *args: Positional arguments passed to the primary and every fallback
        **kwargs: Keyword arguments passed to the primary and every fallback

    Returns:
        Result of the primary function, or of the first fallback that succeeds

    Raises:
        Exception: The primary's exception, unchanged, when it is not one of
            self.expected_exceptions or when no fallbacks are registered;
            otherwise the last fallback's exception, chained from the
            primary's exception
    """
    # Try primary function first
    primary_exception = None
    try:
        result = primary_func(*args, **kwargs)
        logger.trace(
            "Primary function succeeded",
            func=getattr(primary_func, "__name__", "anonymous"),
        )
        return result
    except Exception as e:
        primary_exception = e
        if not isinstance(e, self.expected_exceptions):
            # Unexpected exception type, don't use fallbacks
            logger.debug(
                "Primary function failed with unexpected exception type",
                exception_type=type(e).__name__,
                expected_types=[t.__name__ for t in self.expected_exceptions],
            )
            raise

        logger.warning(
            "Primary function failed, trying fallbacks",
            func=getattr(primary_func, "__name__", "anonymous"),
            error=str(e),
            fallback_count=len(self.fallbacks),
        )

    # Try fallbacks in order; any exception from a fallback moves on to the next
    last_exception = None
    for i, fallback_func in enumerate(self.fallbacks):
        try:
            result = fallback_func(*args, **kwargs)
            logger.info(
                "Fallback succeeded",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
            )
            return result
        except Exception as e:
            last_exception = e
            logger.warning(
                "Fallback failed",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
                error=str(e),
            )

    # All fallbacks failed
    logger.error(
        "All fallbacks exhausted",
        primary_func=getattr(primary_func, "__name__", "anonymous"),
        fallback_count=len(self.fallbacks),
    )

    # Raise the last exception from fallbacks, or original if no fallbacks.
    # Chaining keeps the primary failure visible in the traceback instead of
    # silently discarding it.
    if last_exception is not None:
        raise last_exception from primary_exception
    if primary_exception is not None:
        raise primary_exception
    # This should never happen but provide fallback
    raise RuntimeError("Fallback chain execution failed with no recorded exceptions")
execute_async async
execute_async(
    primary_func: Callable[..., T],
    *args: Any,
    **kwargs: Any
) -> T

Execute primary function with fallback chain (async).

Source code in provide/foundation/resilience/fallback.py
async def execute_async(self, primary_func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Execute primary function with fallback chain (async).

    The primary function and the fallbacks may be sync or async: each is
    called, and its result is awaited when it is awaitable. If the primary
    raises one of self.expected_exceptions, each fallback is tried in order
    with the same arguments until one returns.

    Args:
        primary_func: Function to try first
        *args: Positional arguments passed to the primary and every fallback
        **kwargs: Keyword arguments passed to the primary and every fallback

    Returns:
        Result of the primary function, or of the first fallback that succeeds

    Raises:
        Exception: The primary's exception, unchanged, when it is not one of
            self.expected_exceptions or when no fallbacks are registered;
            otherwise the last fallback's exception, chained from the
            primary's exception
    """
    import inspect

    # Try primary function first
    primary_exception = None
    try:
        # Decide on the returned value rather than on the callable: this also
        # covers lambdas and callable objects that return a coroutine, which a
        # coroutine-function check does not recognise.
        result = primary_func(*args, **kwargs)
        if inspect.isawaitable(result):
            result = await result
        logger.trace(
            "Primary function succeeded",
            func=getattr(primary_func, "__name__", "anonymous"),
        )
        return result
    except Exception as e:
        primary_exception = e
        if not isinstance(e, self.expected_exceptions):
            # Unexpected exception type, don't use fallbacks
            logger.debug(
                "Primary function failed with unexpected exception type",
                exception_type=type(e).__name__,
                expected_types=[t.__name__ for t in self.expected_exceptions],
            )
            raise

        logger.warning(
            "Primary function failed, trying fallbacks",
            func=getattr(primary_func, "__name__", "anonymous"),
            error=str(e),
            fallback_count=len(self.fallbacks),
        )

    # Try fallbacks in order; any exception from a fallback moves on to the next
    last_exception = None
    for i, fallback_func in enumerate(self.fallbacks):
        try:
            result = fallback_func(*args, **kwargs)
            if inspect.isawaitable(result):
                result = await result
            logger.info(
                "Fallback succeeded",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
            )
            return result
        except Exception as e:
            last_exception = e
            logger.warning(
                "Fallback failed",
                fallback_index=i,
                fallback_name=getattr(fallback_func, "__name__", "anonymous"),
                error=str(e),
            )

    # All fallbacks failed
    logger.error(
        "All fallbacks exhausted",
        primary_func=getattr(primary_func, "__name__", "anonymous"),
        fallback_count=len(self.fallbacks),
    )

    # Raise the last exception from fallbacks, or original if no fallbacks.
    # Chaining keeps the primary failure visible in the traceback instead of
    # silently discarding it.
    if last_exception is not None:
        raise last_exception from primary_exception
    if primary_exception is not None:
        raise primary_exception
    # This should never happen but provide fallback
    raise RuntimeError("Fallback chain execution failed with no recorded exceptions")

RetryExecutor

RetryExecutor(
    policy: RetryPolicy,
    on_retry: (
        Callable[[int, Exception], None] | None
    ) = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: (
        Callable[[float], Awaitable[None]] | None
    ) = None,
)

Unified retry execution engine.

This executor handles the actual retry loop logic for both sync and async functions, using a RetryPolicy for configuration. It's used internally by both the @retry decorator and RetryMiddleware.

Initialize retry executor.

Parameters:

Name Type Description Default
policy RetryPolicy

Retry policy configuration

required
on_retry Callable[[int, Exception], None] | None

Optional callback for retry events (attempt, error)

None
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
sleep_func Callable[[float], None] | None

Optional synchronous sleep function (for testing). Defaults to time.sleep() for production use.

None
async_sleep_func Callable[[float], Awaitable[None]] | None

Optional asynchronous sleep function (for testing). Defaults to asyncio.sleep() for production use.

None
Source code in provide/foundation/resilience/retry.py
def __init__(
    self,
    policy: RetryPolicy,
    on_retry: Callable[[int, Exception], None] | None = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: Callable[[float], Awaitable[None]] | None = None,
) -> None:
    """Store the retry policy, the optional callback and the timing hooks.

    Args:
        policy: Retry policy configuration
        on_retry: Optional callback for retry events (attempt, error)
        time_source: Optional callable that returns current time (for testing).
                    Falls back to time.time when not supplied.
        sleep_func: Optional synchronous sleep function (for testing).
                   Falls back to time.sleep when not supplied.
        async_sleep_func: Optional asynchronous sleep function (for testing).
                         Falls back to asyncio.sleep when not supplied.

    """
    # Timing hooks: each falls back to the real implementation.
    self._async_sleep = async_sleep_func or asyncio.sleep
    self._sleep = sleep_func or time.sleep
    self._time_source = time_source or time.time

    # Retry configuration and the optional per-retry callback.
    self.on_retry = on_retry
    self.policy = policy
Functions
execute_async async
execute_async(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute asynchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

Async function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
async def execute_async(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
    """Execute asynchronous function with retry logic.

    func is awaited up to policy.max_attempts times. After a failed attempt
    that the policy allows to be retried, the retry is logged, on_retry (if
    set) is invoked, and the executor sleeps for the policy's delay.

    Args:
        func: Async function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The last exception raised if all retry attempts are exhausted,
            or the first exception the policy declines to retry
        RuntimeError: If no attempt was made (policy.max_attempts below 1)

    """
    last_exception = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e

            # Don't retry on last attempt - log and raise
            if attempt >= self.policy.max_attempts:
                from provide.foundation.hub.foundation import get_foundation_logger

                get_foundation_logger().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # Check if we should retry this error
            if not self.policy.should_retry(e, attempt):
                raise

            # Calculate delay
            delay = self.policy.calculate_delay(attempt)

            # Log retry attempt. The logger is imported lazily, only once a
            # retry is actually going to happen; the name stays bound for the
            # callback-failure warning below.
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {delay:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=delay,
                error=str(e),
                error_type=type(e).__name__,
            )

            # Call retry callback if provided
            if self.on_retry:
                import inspect

                try:
                    # Await whatever awaitable the callback returns. Checking
                    # the return value also handles lambdas and callable
                    # objects wrapping a coroutine, which would otherwise be
                    # left un-awaited.
                    outcome = self.on_retry(attempt, e)
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception as callback_error:
                    # A failing callback must not abort the retry loop.
                    get_foundation_logger().warning("Retry callback failed", error=str(callback_error))

            # Wait before retry
            await self._async_sleep(delay)

    # Should never reach here, but for safety
    if last_exception is not None:
        raise last_exception
    else:
        raise RuntimeError("No exception captured during async retry attempts")
execute_sync
execute_sync(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Execute synchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
def execute_sync(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a synchronous callable, retrying according to the policy.

    Args:
        func: Function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The last exception raised if all retry attempts are exhausted

    """
    last_error = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            last_error = exc

            # Final attempt: report exhaustion and let the error propagate.
            if attempt >= self.policy.max_attempts:
                from provide.foundation.hub.foundation import get_foundation_logger

                get_foundation_logger().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(exc),
                    error_type=type(exc).__name__,
                )
                raise

            # The policy decides whether this particular error is retryable.
            if not self.policy.should_retry(exc, attempt):
                raise

            # How long to pause before the next attempt.
            delay = self.policy.calculate_delay(attempt)

            # Announce the upcoming retry.
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {delay:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=delay,
                error=str(exc),
                error_type=type(exc).__name__,
            )

            # Notify the optional callback; its failures are logged, not raised.
            if self.on_retry:
                try:
                    self.on_retry(attempt, exc)
                except Exception as hook_error:
                    from provide.foundation.hub.foundation import get_foundation_logger

                    get_foundation_logger().warning("Retry callback failed", error=str(hook_error))

            # Pause before trying again.
            self._sleep(delay)

    # Only reachable when the loop body never ran or never raised out.
    if last_error is None:
        raise RuntimeError("No exception captured during retry attempts")
    raise last_error

RetryPolicy

Configuration for retry behavior.

This policy can be used with both the @retry decorator and transport middleware, providing a unified configuration model for all retry scenarios.

Attributes:

Name Type Description
max_attempts int

Maximum number of attempts, counting the initial call (must be >= 1)

backoff BackoffStrategy

Backoff strategy to use for delays

base_delay float

Base delay in seconds between retries

max_delay float

Maximum delay in seconds (caps exponential growth)

jitter bool

Whether to add random jitter to delays (±25%)

retryable_errors tuple[type[Exception], ...] | None

Tuple of exception types to retry (None = all)

retryable_status_codes set[int] | None

Set of HTTP status codes to retry (for middleware)

Functions
__str__
__str__() -> str

Human-readable string representation.

Source code in provide/foundation/resilience/retry.py
def __str__(self) -> str:
    """Summarize the policy's attempt limit, backoff strategy and base delay."""
    attempts = self.max_attempts
    strategy = self.backoff.value
    base = self.base_delay
    return f"RetryPolicy(max_attempts={attempts}, backoff={strategy}, base_delay={base}s)"
calculate_delay
calculate_delay(attempt: int) -> float

Calculate delay for a given attempt number.

Parameters:

Name Type Description Default
attempt int

Attempt number (1-based)

required

Returns:

Type Description
float

Delay in seconds

Source code in provide/foundation/resilience/retry.py
def calculate_delay(self, attempt: int) -> float:
    """Calculate delay for a given attempt number.

    The raw delay depends on the backoff strategy (fixed, linear,
    exponential or Fibonacci multiples of base_delay), is capped at
    max_delay, and is then optionally scaled by a random jitter factor.

    Args:
        attempt: Attempt number (1-based)

    Returns:
        Delay in seconds; 0.0 when attempt is zero or negative

    """
    if attempt <= 0:
        # Return a float so the result type matches the annotation.
        return 0.0

    if self.backoff == BackoffStrategy.FIXED:
        delay = self.base_delay
    elif self.backoff == BackoffStrategy.LINEAR:
        delay = self.base_delay * attempt
    elif self.backoff == BackoffStrategy.EXPONENTIAL:
        # Doubles each attempt: base, 2*base, 4*base, ...
        delay = self.base_delay * (2 ** (attempt - 1))
    elif self.backoff == BackoffStrategy.FIBONACCI:
        # Calculate fibonacci number for attempt (1, 1, 2, 3, 5, ...)
        a, b = 0, 1
        for _ in range(attempt):
            a, b = b, a + b
        delay = self.base_delay * a
    else:
        # Unrecognised strategy: behave like FIXED.
        delay = self.base_delay

    # Cap at max delay
    delay = min(delay, self.max_delay)

    # Add jitter if configured (±25% random variation). Applied after the
    # cap, so a jittered delay can exceed max_delay by up to 25%.
    if self.jitter:
        jitter_factor = 0.75 + (random.random() * 0.5)  # nosec B311 - Retry jitter timing
        delay *= jitter_factor

    return delay
should_retry
should_retry(error: Exception, attempt: int) -> bool

Determine if an error should be retried.

Parameters:

Name Type Description Default
error Exception

The exception that occurred

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry(self, error: Exception, attempt: int) -> bool:
    """Decide whether a failed attempt should be followed by another one.

    Args:
        error: The exception that occurred
        attempt: Current attempt number (1-based)

    Returns:
        True if should retry, False otherwise

    """
    # Out of attempts: never retry, regardless of the error
    attempts_exhausted = attempt >= self.max_attempts
    if attempts_exhausted:
        return False

    error_filter = self.retryable_errors
    if error_filter is None:
        # No filter configured, so any error is retryable
        return True

    # With a filter, only matching exception types are retried
    return isinstance(error, error_filter)
should_retry_response
should_retry_response(response: Any, attempt: int) -> bool

Check if HTTP response should be retried.

Parameters:

Name Type Description Default
response Any

Response object with status attribute

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry_response(self, response: Any, attempt: int) -> bool:
    """Decide whether a response warrants another attempt.

    Args:
        response: Response object with status attribute
        attempt: Current attempt number (1-based)

    Returns:
        True if should retry, False otherwise

    """
    # Out of attempts: never retry
    if attempt >= self.max_attempts:
        return False

    allowed_codes = self.retryable_status_codes
    if allowed_codes is None:
        # Without configured status codes, responses are not retried
        return False

    # A response lacking a ``status`` attribute is looked up as None
    status = getattr(response, "status", None)
    return status in allowed_codes

SyncCircuitBreaker

SyncCircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
)

Synchronous circuit breaker for resilience patterns.

Uses threading.RLock for thread-safe state management in synchronous code. For async code, use AsyncCircuitBreaker instead.

Initialize the synchronous circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
recovery_timeout float

Seconds to wait before attempting recovery

30.0
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) to catch

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
Source code in provide/foundation/resilience/circuit_sync.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Initialize the synchronous circuit breaker.

    Args:
        failure_threshold: Number of failures before opening circuit
        recovery_timeout: Seconds to wait before attempting recovery
        expected_exception: Exception type(s) to catch
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.time() for production use.
    """
    self.failure_threshold = failure_threshold
    self.recovery_timeout = recovery_timeout
    self.expected_exception = expected_exception
    self._time_source = time_source or time.time
    self._lock = threading.RLock()
    # Initialize state attributes (will be set properly in reset())
    self._state: CircuitState
    self._failure_count: int
    self._last_failure_time: float | None
    self.reset()
Functions
call
call(func: Callable, *args: Any, **kwargs: Any) -> Any

Execute a synchronous function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable

Callable to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
Any

Result from func

Raises:

Type Description
RuntimeError

If circuit is open

Exception

Whatever exception func raises

Source code in provide/foundation/resilience/circuit_sync.py
def call(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute a synchronous function through the circuit breaker.

    The breaker state is checked while holding the lock; the lock is
    released again before ``func`` is invoked.

    Args:
        func: Callable to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func

    Raises:
        RuntimeError: If circuit is open
        Exception: Whatever exception func raises
    """
    with self._lock:
        if self.state() == CircuitState.OPEN:
            raise RuntimeError("Circuit breaker is open")
        # Any other state (including HALF_OPEN) proceeds with the call

    try:
        result = func(*args, **kwargs)
        self._on_success()
        return result
    except self.expected_exception:
        # Record the failure, then re-raise the active exception unchanged.
        # A bare ``raise`` avoids adding a redundant traceback entry.
        self._on_failure()
        raise
failure_count
failure_count() -> int

Get the current failure count.

Source code in provide/foundation/resilience/circuit_sync.py
def failure_count(self) -> int:
    """Return the number of failures currently recorded, read under the lock."""
    with self._lock:
        recorded = self._failure_count
    return recorded
reset
reset() -> None

Reset the circuit breaker to its initial state.

Source code in provide/foundation/resilience/circuit_sync.py
def reset(self) -> None:
    """Put the breaker back into its initial state.

    Under the lock, the state becomes CLOSED, the failure count is zeroed
    and the last failure time is cleared.
    """
    with self._lock:
        # Forget any recorded failures
        self._last_failure_time = None
        self._failure_count = 0
        # Mark the circuit as closed again
        self._state = CircuitState.CLOSED
state
state() -> CircuitState

Get the current state of the circuit breaker.

Source code in provide/foundation/resilience/circuit_sync.py
def state(self) -> CircuitState:
    """Report the breaker's current state.

    An OPEN breaker for which ``_can_attempt_recovery()`` is true is reported
    as HALF_OPEN. This is only a view: the stored state is not modified here.
    """
    with self._lock:
        recovering = self._state == CircuitState.OPEN and self._can_attempt_recovery()
        return CircuitState.HALF_OPEN if recovering else self._state

Functions

circuit_breaker

circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
    registry: Registry | None = None,
) -> Callable[[F], F]

Create a circuit breaker decorator.

Creates a SyncCircuitBreaker for synchronous functions and an AsyncCircuitBreaker for asynchronous functions to avoid locking issues.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit.

5
recovery_timeout float

Seconds to wait before attempting recovery.

DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) that trigger the breaker. Can be a single exception type or a tuple of exception types.

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing).

None
registry Registry | None

Optional registry to register the breaker with (for DI).

None

Returns:

Type Description
Callable[[F], F]

Circuit breaker decorator.

Examples:

>>> @circuit_breaker(failure_threshold=3, recovery_timeout=30)
... def unreliable_service():
...     return external_api_call()
>>> @circuit_breaker(expected_exception=ValueError)
... def parse_data():
...     return risky_parse()
>>> @circuit_breaker(expected_exception=(ValueError, TypeError))
... async def async_unreliable_service():
...     return await async_api_call()
Source code in provide/foundation/resilience/decorators.py
def circuit_breaker(
    failure_threshold: int = 5,
    recovery_timeout: float = DEFAULT_CIRCUIT_BREAKER_RECOVERY_TIMEOUT,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
    registry: Registry | None = None,
) -> Callable[[F], F]:
    """Build a decorator that routes calls through a circuit breaker.

    Coroutine functions get an AsyncCircuitBreaker, all other callables get
    a SyncCircuitBreaker. Each decorated function receives its own breaker,
    which is registered under a unique ``cb_<n>`` name.

    Args:
        failure_threshold: Number of failures before opening circuit.
        recovery_timeout: Seconds to wait before attempting recovery.
        expected_exception: Exception type(s) that trigger the breaker.
            Can be a single exception type or a tuple of exception types.
        time_source: Optional callable that returns current time (for testing).
        registry: Optional registry to register the breaker with (for DI).

    Returns:
        Circuit breaker decorator.

    Examples:
        >>> @circuit_breaker(failure_threshold=3, recovery_timeout=30)
        ... def unreliable_service():
        ...     return external_api_call()

        >>> @circuit_breaker(expected_exception=ValueError)
        ... def parse_data():
        ...     return risky_parse()

        >>> @circuit_breaker(expected_exception=(ValueError, TypeError))
        ... async def async_unreliable_service():
        ...     return await async_api_call()

    """
    # Breakers always receive a tuple of exception types
    expected_exception_tuple: tuple[type[Exception], ...] = (
        expected_exception if isinstance(expected_exception, tuple) else (expected_exception,)
    )

    def decorator(func: F) -> F:
        global _circuit_breaker_counter

        # Explicit registry wins; otherwise use the global one
        reg = registry or _get_circuit_breaker_registry()

        # Pick the breaker flavour that matches the decorated callable
        is_async = asyncio.iscoroutinefunction(func)
        breaker_cls = AsyncCircuitBreaker if is_async else SyncCircuitBreaker
        breaker: SyncCircuitBreaker | AsyncCircuitBreaker = breaker_cls(
            failure_threshold=failure_threshold,
            recovery_timeout=recovery_timeout,
            expected_exception=expected_exception_tuple,
            time_source=time_source,
        )

        if is_async:

            @functools.wraps(func)
            async def async_wrapper(*args: Any, **kwargs: Any) -> Any:
                return await breaker.call(func, *args, **kwargs)

            wrapped = async_wrapper
        else:

            @functools.wraps(func)
            def sync_wrapper(*args: Any, **kwargs: Any) -> Any:
                return breaker.call(func, *args, **kwargs)

            wrapped = sync_wrapper

        # Allocate a unique breaker name (counter is guarded by its lock)
        with _circuit_breaker_counter_lock:
            _circuit_breaker_counter += 1
            breaker_name = f"cb_{_circuit_breaker_counter}"

        # Register under the regular or the test dimension
        dimension = (
            CIRCUIT_BREAKER_DIMENSION
            if _should_register_for_global_reset()
            else CIRCUIT_BREAKER_TEST_DIMENSION
        )
        reg.register(breaker_name, breaker, dimension=dimension)

        return wrapped  # type: ignore[return-value]

    return decorator

get_bulkhead_manager

get_bulkhead_manager() -> BulkheadManager

Get the global bulkhead manager.

Source code in provide/foundation/resilience/bulkhead.py
def get_bulkhead_manager() -> BulkheadManager:
    """Return the module-level ``_bulkhead_manager`` instance."""
    manager = _bulkhead_manager
    return manager