Skip to content

Circuit async

provide.foundation.resilience.circuit_async

TODO: Add module docstring.

Classes

AsyncCircuitBreaker

AsyncCircuitBreaker(
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: (
        type[Exception] | tuple[type[Exception], ...]
    ) = Exception,
    time_source: Callable[[], float] | None = None,
)

Asynchronous circuit breaker for resilience patterns.

Uses asyncio.Lock for async-safe state management. For synchronous code, use SyncCircuitBreaker instead.

Initialize the asynchronous circuit breaker.

Parameters:

Name Type Description Default
failure_threshold int

Number of failures before opening circuit

5
recovery_timeout float

Seconds to wait before attempting recovery

30.0
expected_exception type[Exception] | tuple[type[Exception], ...]

Exception type(s) to catch

Exception
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
Source code in provide/foundation/resilience/circuit_async.py
def __init__(
    self,
    failure_threshold: int = 5,
    recovery_timeout: float = 30.0,
    expected_exception: type[Exception] | tuple[type[Exception], ...] = Exception,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Initialize the asynchronous circuit breaker.

    Args:
        failure_threshold: Number of failures before opening circuit
        recovery_timeout: Seconds to wait before attempting recovery
        expected_exception: Exception type(s) to catch
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.time() for production use.
    """
    self.failure_threshold = failure_threshold
    self.recovery_timeout = recovery_timeout
    self.expected_exception = expected_exception
    self._time_source = time_source or time.time
    # Create lock directly - asyncio.Lock() can be created outside event loop
    # and will bind when first awaited
    self._lock = asyncio.Lock()
    # Initialize state
    self._state = CircuitState.CLOSED
    self._failure_count = 0
    self._last_failure_time: float | None = None
Functions
call async
call(func: Callable, *args: Any, **kwargs: Any) -> Any

Execute an asynchronous function through the circuit breaker.

Parameters:

Name Type Description Default
func Callable

Async callable to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
Any

Result from func

Raises:

Type Description
RuntimeError

If circuit is open

Exception

Whatever exception func raises

Source code in provide/foundation/resilience/circuit_async.py
async def call(self, func: Callable, *args: Any, **kwargs: Any) -> Any:
    """Execute an asynchronous function through the circuit breaker.

    Args:
        func: Async callable to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from func

    Raises:
        RuntimeError: If circuit is open
        Exception: Whatever exception func raises
    """
    async with self._lock:
        # Check state directly to avoid deadlock
        if self._state == CircuitState.OPEN and not self._can_attempt_recovery():
            raise RuntimeError("Circuit breaker is open")
        # If HALF_OPEN or recovery possible, we proceed with the call

    try:
        result = await func(*args, **kwargs)
        await self._on_success()
        return result
    except self.expected_exception as e:
        await self._on_failure()
        raise e
failure_count async
failure_count() -> int

Get the current failure count.

Returns:

Type Description
int

Current failure count

Source code in provide/foundation/resilience/circuit_async.py
async def failure_count(self) -> int:
    """Get the current failure count.

    Returns:
        Current failure count
    """
    async with self._lock:
        return self._failure_count
reset async
reset() -> None

Reset the circuit breaker to its initial state.

Source code in provide/foundation/resilience/circuit_async.py
async def reset(self) -> None:
    """Reset the circuit breaker to its initial state."""
    async with self._lock:
        self._state = CircuitState.CLOSED
        self._failure_count = 0
        self._last_failure_time = None
state async
state() -> CircuitState

Get the current state of the circuit breaker.

Returns:

Type Description
CircuitState

Current circuit state

Source code in provide/foundation/resilience/circuit_async.py
async def state(self) -> CircuitState:
    """Get the current state of the circuit breaker.

    Returns:
        Current circuit state
    """
    async with self._lock:
        if self._state == CircuitState.OPEN and self._can_attempt_recovery():
            # This is a view of the state; the actual transition happens in call()
            return CircuitState.HALF_OPEN
        return self._state