Skip to content

Retry

provide.foundation.resilience.retry

Retry policy configuration and a unified retry executor for synchronous and asynchronous callables.

Classes

RetryExecutor

RetryExecutor(
    policy: RetryPolicy,
    on_retry: (
        Callable[[int, Exception], None] | None
    ) = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: (
        Callable[[float], Awaitable[None]] | None
    ) = None,
)

Unified retry execution engine.

This executor handles the actual retry loop logic for both sync and async functions, using a RetryPolicy for configuration. It's used internally by both the @retry decorator and RetryMiddleware.

Initialize retry executor.

Parameters:

Name Type Description Default
policy RetryPolicy

Retry policy configuration

required
on_retry Callable[[int, Exception], None] | None

Optional callback for retry events (attempt, error)

None
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.time() for production use.

None
sleep_func Callable[[float], None] | None

Optional synchronous sleep function (for testing). Defaults to time.sleep() for production use.

None
async_sleep_func Callable[[float], Awaitable[None]] | None

Optional asynchronous sleep function (for testing). Defaults to asyncio.sleep() for production use.

None
Source code in provide/foundation/resilience/retry.py
def __init__(
    self,
    policy: RetryPolicy,
    on_retry: Callable[[int, Exception], None] | None = None,
    time_source: Callable[[], float] | None = None,
    sleep_func: Callable[[float], None] | None = None,
    async_sleep_func: Callable[[float], Awaitable[None]] | None = None,
) -> None:
    """Store the policy, the retry callback and the timing hooks.

    Args:
        policy: Retry policy configuration.
        on_retry: Optional callback for retry events, called as
            ``on_retry(attempt, error)``.
        time_source: Optional callable returning the current time (for
            testing). ``time.time`` is used when omitted.
        sleep_func: Optional synchronous sleep function (for testing).
            ``time.sleep`` is used when omitted.
        async_sleep_func: Optional asynchronous sleep function (for
            testing). ``asyncio.sleep`` is used when omitted.

    """
    self.policy = policy
    self.on_retry = on_retry
    # Any falsy hook (normally None) selects the standard-library default.
    self._time_source = time_source if time_source else time.time
    self._sleep = sleep_func if sleep_func else time.sleep
    self._async_sleep = async_sleep_func if async_sleep_func else asyncio.sleep
Functions
execute_async async
execute_async(
    func: Callable[..., Awaitable[T]],
    *args: Any,
    **kwargs: Any
) -> T

Execute asynchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., Awaitable[T]]

Async function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
async def execute_async(self, func: Callable[..., Awaitable[T]], *args: Any, **kwargs: Any) -> T:
    """Execute asynchronous function with retry logic.

    ``func`` is awaited up to ``policy.max_attempts`` times. After a failed
    attempt that the policy allows to be retried, the failure is logged, the
    optional ``on_retry`` callback runs, and the executor sleeps for the
    delay computed by the policy before the next attempt.

    Args:
        func: Async function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The exception from the final attempt once all attempts are
            exhausted, or the first exception for which
            ``policy.should_retry`` returns False.
        RuntimeError: If ``policy.max_attempts`` is below 1, in which case
            ``func`` is never called.

    """
    import inspect

    last_exception = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return await func(*args, **kwargs)
        except Exception as e:
            last_exception = e

            # Deferred to the failure path so the success path never pays for it
            # (presumably this also avoids an import cycle - TODO confirm).
            from provide.foundation.hub.foundation import get_foundation_logger

            # Don't retry on last attempt - log and raise
            if attempt >= self.policy.max_attempts:
                get_foundation_logger().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(e),
                    error_type=type(e).__name__,
                )
                raise

            # Errors the policy rejects propagate immediately, without logging.
            if not self.policy.should_retry(e, attempt):
                raise

            delay = self.policy.calculate_delay(attempt)

            get_foundation_logger().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {delay:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=delay,
                error=str(e),
                error_type=type(e).__name__,
            )

            if self.on_retry:
                try:
                    # Inspect the returned value rather than the callback itself:
                    # lambdas, partials and callable objects that hand back a
                    # coroutine are then awaited too instead of being dropped.
                    outcome = self.on_retry(attempt, e)
                    if inspect.isawaitable(outcome):
                        await outcome
                except Exception as callback_error:
                    # A broken callback must not abort the retry loop.
                    get_foundation_logger().warning("Retry callback failed", error=str(callback_error))

            # Wait before retry
            await self._async_sleep(delay)

    # Only reachable when the loop body never ran (max_attempts < 1); the
    # first branch is kept purely as a safety net.
    if last_exception is not None:
        raise last_exception
    else:
        raise RuntimeError("No exception captured during async retry attempts")
execute_sync
execute_sync(
    func: Callable[..., T], *args: Any, **kwargs: Any
) -> T

Execute synchronous function with retry logic.

Parameters:

Name Type Description Default
func Callable[..., T]

Function to execute

required
*args Any

Positional arguments for func

()
**kwargs Any

Keyword arguments for func

{}

Returns:

Type Description
T

Result from successful execution

Raises:

Type Description
Exception

The last exception raised if all retry attempts are exhausted

Source code in provide/foundation/resilience/retry.py
def execute_sync(self, func: Callable[..., T], *args: Any, **kwargs: Any) -> T:
    """Run a synchronous callable, retrying failures as the policy allows.

    Args:
        func: Function to execute
        *args: Positional arguments for func
        **kwargs: Keyword arguments for func

    Returns:
        Result from successful execution

    Raises:
        Exception: The exception from the final attempt once all attempts are
            exhausted, or the first exception for which
            ``policy.should_retry`` returns False.
        RuntimeError: If ``policy.max_attempts`` is below 1, in which case
            ``func`` is never called.

    """

    def _log() -> Any:
        # Resolved on demand so the import only happens when there is
        # something to report.
        from provide.foundation.hub.foundation import get_foundation_logger

        return get_foundation_logger()

    failure = None

    for attempt in range(1, self.policy.max_attempts + 1):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            failure = exc

            if attempt >= self.policy.max_attempts:
                # Final attempt: report the exhaustion and let the error escape.
                _log().error(
                    f"All {self.policy.max_attempts} retry attempts failed",
                    attempts=self.policy.max_attempts,
                    error=str(exc),
                    error_type=type(exc).__name__,
                )
                raise

            if not self.policy.should_retry(exc, attempt):
                # The policy rejects this error; propagate without logging.
                raise

            pause = self.policy.calculate_delay(attempt)

            _log().info(
                f"Retry {attempt}/{self.policy.max_attempts} after {pause:.2f}s",
                attempt=attempt,
                max_attempts=self.policy.max_attempts,
                delay=pause,
                error=str(exc),
                error_type=type(exc).__name__,
            )

            hook = self.on_retry
            if hook:
                try:
                    hook(attempt, exc)
                except Exception as hook_error:
                    # A failing callback is reported but never stops the retries.
                    _log().warning("Retry callback failed", error=str(hook_error))

            self._sleep(pause)

    # Only reachable when the loop body never ran (max_attempts < 1); the
    # re-raise is kept purely as a safety net.
    if failure is None:
        raise RuntimeError("No exception captured during retry attempts")
    raise failure

RetryPolicy

Configuration for retry behavior.

This policy can be used with both the @retry decorator and transport middleware, providing a unified configuration model for all retry scenarios.

Attributes:

Name Type Description
max_attempts int

Maximum number of attempts in total, including the initial call (must be >= 1)

backoff BackoffStrategy

Backoff strategy to use for delays

base_delay float

Base delay in seconds between retries

max_delay float

Maximum delay in seconds (caps exponential growth)

jitter bool

Whether to add random jitter to delays (±25%)

retryable_errors tuple[type[Exception], ...] | None

Tuple of exception types to retry (None = all)

retryable_status_codes set[int] | None

Set of HTTP status codes to retry (for middleware)

Functions
__str__
__str__() -> str

Human-readable string representation.

Source code in provide/foundation/resilience/retry.py
def __str__(self) -> str:
    """Return a short summary of the policy: attempts, backoff and base delay."""
    summary = ", ".join(
        (
            f"max_attempts={self.max_attempts}",
            f"backoff={self.backoff.value}",
            f"base_delay={self.base_delay}s",
        )
    )
    return f"RetryPolicy({summary})"
calculate_delay
calculate_delay(attempt: int) -> float

Calculate delay for a given attempt number.

Parameters:

Name Type Description Default
attempt int

Attempt number (1-based)

required

Returns:

Type Description
float

Delay in seconds

Source code in provide/foundation/resilience/retry.py
def calculate_delay(self, attempt: int) -> float:
    """Calculate delay for a given attempt number.

    The base delay is scaled according to the backoff strategy (constant,
    ``attempt``, ``2 ** (attempt - 1)`` or the ``attempt``-th Fibonacci
    number), capped at ``max_delay`` and then, when ``jitter`` is enabled,
    multiplied by a random factor in ``[0.75, 1.25)``. Because jitter is
    applied after the cap, a jittered delay may exceed ``max_delay`` by up
    to 25%.

    Args:
        attempt: Attempt number (1-based)

    Returns:
        Delay in seconds; ``0.0`` for a non-positive attempt number.

    """
    if attempt <= 0:
        return 0.0

    if self.backoff == BackoffStrategy.LINEAR:
        multiplier = attempt
    elif self.backoff == BackoffStrategy.EXPONENTIAL:
        multiplier = 2 ** (attempt - 1)
    elif self.backoff == BackoffStrategy.FIBONACCI:
        # Iteratively compute the attempt-th Fibonacci number (1, 1, 2, 3, 5, ...).
        a, b = 0, 1
        for _ in range(attempt):
            a, b = b, a + b
        multiplier = a
    else:
        # FIXED, and any strategy not handled above, uses the base delay as is.
        multiplier = 1

    try:
        delay = self.base_delay * multiplier
    except OverflowError:
        # The integer multiplier no longer fits in a float (attempt numbers in
        # the thousands). The uncapped delay is far beyond any cap, so use the
        # cap directly; a zero base delay still yields no wait.
        delay = self.max_delay if self.base_delay > 0 else 0.0

    # Cap at max delay
    delay = min(delay, self.max_delay)

    # Add jitter if configured (±25% random variation)
    if self.jitter:
        jitter_factor = 0.75 + (random.random() * 0.5)  # nosec B311 - Retry jitter timing
        delay *= jitter_factor

    return delay
should_retry
should_retry(error: Exception, attempt: int) -> bool

Determine if an error should be retried.

Parameters:

Name Type Description Default
error Exception

The exception that occurred

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry(self, error: Exception, attempt: int) -> bool:
    """Decide whether a failed attempt may be followed by another one.

    Args:
        error: The exception that occurred
        attempt: Current attempt number (1-based)

    Returns:
        False once ``attempt`` has reached ``max_attempts``. Otherwise True
        when no error filter is configured, or when ``error`` is an instance
        of one of the ``retryable_errors`` types.

    """
    if attempt >= self.max_attempts:
        # Attempt budget is used up.
        return False

    allowed_types = self.retryable_errors
    # With no filter configured, every error is retryable.
    return allowed_types is None or isinstance(error, allowed_types)
should_retry_response
should_retry_response(response: Any, attempt: int) -> bool

Check if HTTP response should be retried.

Parameters:

Name Type Description Default
response Any

Response object with status attribute

required
attempt int

Current attempt number (1-based)

required

Returns:

Type Description
bool

True if should retry, False otherwise

Source code in provide/foundation/resilience/retry.py
def should_retry_response(self, response: Any, attempt: int) -> bool:
    """Decide whether an HTTP response warrants another attempt.

    Args:
        response: Response object; its ``status`` attribute is inspected and
            treated as ``None`` when missing
        attempt: Current attempt number (1-based)

    Returns:
        True only when attempts remain, ``retryable_status_codes`` is
        configured and the response status is one of those codes.

    """
    if attempt >= self.max_attempts:
        # Attempt budget is used up.
        return False

    codes = self.retryable_status_codes
    if codes is None:
        # Responses are never retried unless status codes were configured.
        return False

    status = getattr(response, "status", None)
    return status in codes

Functions