Skip to content

Middleware

provide.foundation.transport.middleware

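Transport middleware: the abstract Middleware base class, built-in logging, retry, and metrics middleware, a MiddlewarePipeline that runs middleware in order, and helper functions for registering and looking up middleware in the component registry. (TODO: add this summary as the module docstring in the source.)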

Classes

LoggingMiddleware

Bases: Middleware

Built-in telemetry middleware using foundation.logger.

Functions
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Log errors.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Emit an error-level log entry for a failed request.

    Args:
        error: The exception raised while handling the request.
        request: The request that was being processed.

    Returns:
        The same ``error`` object, unmodified.
    """
    raw_uri = str(request.uri)
    # The URI only goes through sanitize_uri when log sanitization is enabled.
    safe_uri = sanitize_uri(raw_uri) if self.sanitize_logs else raw_uri

    log.error(
        f"❌ {request.method} {safe_uri} failed: {error}",
        method=request.method,
        uri=safe_uri,
        error_type=error.__class__.__name__,
        error_message=str(error),
    )
    return error
process_request async
process_request(request: Request) -> Request

Log outgoing request.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Log an outgoing request and, optionally, a truncated copy of its body.

    Nothing is logged unless ``self.log_requests`` is set. The body is only
    logged (at trace level, first 500 characters) when ``self.log_bodies`` is
    set and the request has a non-empty body.

    Args:
        request: The request about to be sent.

    Returns:
        The same ``request`` object, unmodified.
    """
    if not self.log_requests:
        return request

    uri_str = str(request.uri)
    if self.sanitize_logs:
        uri_str = sanitize_uri(uri_str)

    # Requests without a ``headers`` attribute are logged with an empty dict.
    if hasattr(request, "headers"):
        headers = dict(request.headers)
        if self.sanitize_logs:
            headers = sanitize_headers(headers)
    else:
        headers = {}

    log.info(
        f"🚀 {request.method} {uri_str}",
        method=request.method,
        uri=uri_str,
        headers=headers,
    )

    if self.log_bodies and request.body:
        payload = request.body
        # Non-string bodies are logged via their str() representation.
        body_text = payload if isinstance(payload, str) else str(payload)
        log.trace(
            "Request body",
            body=body_text[:500],  # Same 500-character cap as response bodies
            method=request.method,
            uri=uri_str,
        )

    return request
process_response async
process_response(response: Response) -> Response

Log incoming response.

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Log an incoming response and, optionally, a truncated copy of its body.

    Nothing is logged unless ``self.log_responses`` is set. When
    ``self.sanitize_logs`` is set, the URI and the headers are sanitized
    independently of each other, so response headers are still sanitized when
    the response carries no originating request (and therefore no URI).

    Args:
        response: The response that was received.

    Returns:
        The same ``response`` object, unmodified.
    """
    if not self.log_responses:
        return response

    origin = response.request
    uri_str = str(origin.uri) if origin else None
    method = origin.method if origin else None

    if self.sanitize_logs and uri_str:
        uri_str = sanitize_uri(uri_str)

    # Header sanitization must not depend on a URI being available; otherwise
    # a response without a request would have its headers logged raw.
    if hasattr(response, "headers"):
        headers = dict(response.headers)
        if self.sanitize_logs:
            headers = sanitize_headers(headers)
    else:
        headers = {}

    status_emoji = self._get_status_emoji(response.status)
    log.info(
        f"{status_emoji} {response.status} ({response.elapsed_ms:.0f}ms)",
        status_code=response.status,
        elapsed_ms=response.elapsed_ms,
        method=method,
        uri=uri_str,
        headers=headers,
    )

    if self.log_bodies and response.body:
        log.trace(
            "Response body",
            body=response.text[:500],  # Truncate large bodies
            status_code=response.status,
            method=method,
            uri=uri_str,
        )

    return response

MetricsMiddleware

Bases: Middleware

Middleware for collecting transport metrics using foundation.metrics.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize metrics after creation.

Source code in provide/foundation/transport/middleware.py
def __attrs_post_init__(self) -> None:
    """Create the request counter, duration histogram, and error counter.

    The instruments are built with ``self._counter_func`` and
    ``self._histogram_func`` and stored on the instance for the
    ``process_*`` hooks to use.
    """
    make_counter = self._counter_func

    self._request_counter = make_counter(
        "transport_requests_total",
        description="Total number of transport requests",
        unit="requests",
    )

    make_histogram = self._histogram_func
    self._request_duration = make_histogram(
        "transport_request_duration_seconds",
        description="Duration of transport requests",
        unit="seconds",
    )

    self._error_counter = make_counter(
        "transport_errors_total",
        description="Total number of transport errors",
        unit="errors",
    )
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Record error metrics.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Count a transport error, labelled by HTTP method and exception class.

    Args:
        error: The exception raised while handling the request.
        request: The request that failed; its ``method`` becomes a label.

    Returns:
        The same ``error`` object, unmodified.
    """
    labels = {
        "method": request.method,
        "error_type": error.__class__.__name__,
    }
    self._error_counter.inc(1, **labels)
    return error
process_request async
process_request(request: Request) -> Request

Record request start time.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Stamp the request with a start time for later duration measurement.

    The ``time.perf_counter()`` reading is stored under
    ``request.metadata["start_time"]``.

    Args:
        request: The request about to be sent.

    Returns:
        The same ``request`` object, with its metadata updated.
    """
    started_at = time.perf_counter()
    request.metadata["start_time"] = started_at
    return request
process_response async
process_response(response: Response) -> Response

Record response metrics.

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Record the request count and duration for a completed response.

    Metrics are only recorded when the response carries its originating
    request and that request's metadata contains ``"start_time"``.

    Args:
        response: The response that was received.

    Returns:
        The same ``response`` object, unmodified.
    """
    origin = response.request
    if not origin or "start_time" not in origin.metadata:
        return response

    elapsed = time.perf_counter() - origin.metadata["start_time"]
    http_method = origin.method
    # Bucket the status by its hundreds digit, e.g. 404 -> "4xx".
    bucket = f"{response.status // 100}xx"

    self._request_counter.inc(
        1,
        method=http_method,
        status_code=str(response.status),
        status_class=bucket,
    )
    self._request_duration.observe(elapsed, method=http_method, status_class=bucket)

    return response

Middleware

Bases: ABC

Abstract base class for transport middleware.

Functions
process_error abstractmethod async
process_error(
    error: Exception, request: Request
) -> Exception

Process errors during request.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Handle an error raised while a request was being processed.

    Subclasses must implement this hook.

    Args:
        error: The exception that was raised.
        request: The request that was being processed.

    Returns:
        The exception to hand back to the caller.
    """
process_request abstractmethod async
process_request(request: Request) -> Request

Process request before sending.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_request(self, request: Request) -> Request:
    """Handle a request before it is sent.

    Subclasses must implement this hook.

    Args:
        request: The outgoing request.

    Returns:
        The request to hand back to the caller.
    """
process_response abstractmethod async
process_response(response: Response) -> Response

Process response after receiving.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_response(self, response: Response) -> Response:
    """Handle a response after it has been received.

    Subclasses must implement this hook.

    Args:
        response: The incoming response.

    Returns:
        The response to hand back to the caller.
    """

MiddlewarePipeline

Pipeline for executing middleware in order.

Functions
add
add(middleware: Middleware) -> None

Add middleware to the pipeline.

Source code in provide/foundation/transport/middleware.py
def add(self, middleware: Middleware) -> None:
    """Append a middleware instance to the end of the pipeline.

    Args:
        middleware: The middleware instance to append.
    """
    self.middleware.append(middleware)
    added_name = middleware.__class__.__name__
    log.trace(f"Added middleware: {added_name}")
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Process error through all middleware.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Pass an error through every middleware, in pipeline order.

    Each middleware receives the exception returned by the previous one.

    Args:
        error: The exception raised while handling the request.
        request: The request that was being processed.

    Returns:
        The exception returned by the last middleware, or ``error`` itself
        when the pipeline is empty.
    """
    current = error
    for handler in self.middleware:
        current = await handler.process_error(current, request)
    return current
process_request async
process_request(request: Request) -> Request

Process request through all middleware.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Pass a request through every middleware, in pipeline order.

    Each middleware receives the request returned by the previous one.

    Args:
        request: The outgoing request.

    Returns:
        The request returned by the last middleware, or ``request`` itself
        when the pipeline is empty.
    """
    current = request
    for handler in self.middleware:
        current = await handler.process_request(current)
    return current
process_response async
process_response(response: Response) -> Response

Process response through all middleware (in reverse order).

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Pass a response through every middleware, last-added first.

    Each middleware receives the response returned by the one before it in
    this reversed order.

    Args:
        response: The incoming response.

    Returns:
        The response returned by the first-added middleware, or ``response``
        itself when the pipeline is empty.
    """
    current = response
    for handler in reversed(self.middleware):
        current = await handler.process_response(current)
    return current
remove
remove(middleware_class: type[Middleware]) -> bool

Remove middleware by class type.

Source code in provide/foundation/transport/middleware.py
def remove(self, middleware_class: type[Middleware]) -> bool:
    """Remove the first middleware that is an instance of ``middleware_class``.

    Args:
        middleware_class: Class to match with ``isinstance``.

    Returns:
        True if a matching middleware was found and removed, False otherwise.
    """
    position = next(
        (idx for idx, candidate in enumerate(self.middleware) if isinstance(candidate, middleware_class)),
        None,
    )
    if position is None:
        return False

    del self.middleware[position]
    log.trace(f"Removed middleware: {middleware_class.__name__}")
    return True

RetryMiddleware

Bases: Middleware

Automatic retry middleware using unified retry logic.

Functions
execute_with_retry async
execute_with_retry(
    execute_func: Callable[[Request], Awaitable[Response]],
    request: Request,
) -> Response

Execute request with retry logic using unified RetryExecutor.

Source code in provide/foundation/transport/middleware.py
async def execute_with_retry(
    self, execute_func: Callable[[Request], Awaitable[Response]], request: Request
) -> Response:
    """Run ``execute_func(request)`` under the configured retry policy.

    Each attempt awaits ``execute_func(request)``. When the policy flags the
    response as retryable, the attempt raises ``TransportError`` so that the
    ``RetryExecutor`` sees it as a failed attempt.

    Args:
        execute_func: Coroutine function that sends the request and returns
            the response.
        request: The request to send on every attempt.

    Returns:
        The response of the first attempt that the policy does not flag as
        retryable.

    Raises:
        Exception: Whatever ``executor.execute_async`` raises propagates
            unchanged. This includes the synthetic ``TransportError`` for a
            retryable status (presumably once retries are exhausted; confirm
            against ``RetryExecutor``).
    """
    executor = RetryExecutor(
        self.policy,
        time_source=self.time_source,
        async_sleep_func=self.async_sleep_func,
    )

    async def attempt_once() -> Response:
        response = await execute_func(request)

        # NOTE(review): ``attempt`` is always passed as 1, so the policy never
        # sees the real attempt number here. Confirm whether that is intended.
        if self.policy.should_retry_response(response, attempt=1):
            # Convert to an exception for the executor to handle.
            raise TransportError(f"Retryable HTTP status: {response.status}")

        return response

    # The previous ``except TransportError`` handler re-raised in every branch
    # (after matching on the message text), so it was a no-op and is dropped.
    # TODO: return the last response instead of raising once retries for a
    # retryable status are exhausted; this needs more sophisticated handling.
    return await executor.execute_async(attempt_once)
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Handle error, potentially with retries (this is called by client).

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Return the error untouched; no retry is performed in this hook.

    Args:
        error: The exception raised while handling the request.
        request: The request that was being processed (unused).

    Returns:
        The same ``error`` object.
    """
    return error
process_request async
process_request(request: Request) -> Request

No request processing needed.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Return the request untouched; this middleware does no request work.

    Args:
        request: The outgoing request.

    Returns:
        The same ``request`` object.
    """
    return request
process_response async
process_response(response: Response) -> Response

No response processing needed (retries handled in execute).

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Return the response untouched; this middleware does no response work.

    Args:
        response: The incoming response.

    Returns:
        The same ``response`` object.
    """
    return response

Functions

create_default_pipeline

create_default_pipeline(
    enable_retry: bool = True,
    enable_logging: bool = True,
    enable_metrics: bool = True,
) -> MiddlewarePipeline

Create pipeline with default middleware.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| enable_retry | bool | Enable automatic retry middleware (default: True) | True |
| enable_logging | bool | Enable request/response logging middleware (default: True) | True |
| enable_metrics | bool | Enable metrics collection middleware (default: True) | True |

Returns:

| Type | Description |
| --- | --- |
| MiddlewarePipeline | Configured middleware pipeline |

Source code in provide/foundation/transport/middleware.py
def create_default_pipeline(
    enable_retry: bool = True,
    enable_logging: bool = True,
    enable_metrics: bool = True,
) -> MiddlewarePipeline:
    """Build a pipeline holding the built-in middleware that is enabled.

    Middleware is added in a fixed order: retry, then logging, then metrics.

    Args:
        enable_retry: Add the automatic retry middleware (default: True)
        enable_logging: Add the request/response logging middleware (default: True)
        enable_metrics: Add the metrics collection middleware (default: True)

    Returns:
        The configured middleware pipeline

    """
    pipeline = MiddlewarePipeline()

    if enable_retry:
        # Default policy: up to 3 attempts with exponential backoff
        # (base delay 1.0, capped at 10.0), retrying on the listed
        # transient HTTP status codes.
        default_policy = RetryPolicy(
            max_attempts=3,
            backoff=BackoffStrategy.EXPONENTIAL,
            base_delay=1.0,
            max_delay=10.0,
            retryable_status_codes={408, 429, 500, 502, 503, 504},
        )
        retry_stage = RetryMiddleware(policy=default_policy)
        pipeline.add(retry_stage)

    if enable_logging:
        logging_stage = LoggingMiddleware()
        pipeline.add(logging_stage)

    if enable_metrics:
        metrics_stage = MetricsMiddleware()
        pipeline.add(metrics_stage)

    return pipeline

get_middleware_by_category

get_middleware_by_category(
    category: str = "transport.middleware",
) -> list[type[Middleware]]

Get all middleware for a category, sorted by priority.

Source code in provide/foundation/transport/middleware.py
def get_middleware_by_category(
    category: str = "transport.middleware",
) -> list[type[Middleware]]:
    """Return the middleware registered under ``category``, ordered by priority.

    Entries are taken from the component registry when their ``dimension``
    equals ``category``. An entry without a ``"priority"`` metadata key is
    treated as priority 100.

    Args:
        category: Registry dimension to look up.

    Returns:
        The registered values, lowest priority number first.
    """
    matching = [entry for entry in get_component_registry() if entry.dimension == category]
    # sorted() is stable, so entries with equal priority keep registry order.
    ranked = sorted(matching, key=lambda entry: entry.metadata.get("priority", 100))
    return [entry.value for entry in ranked]

register_middleware

register_middleware(
    name: str,
    middleware_class: type[Middleware],
    category: str = "transport.middleware",
    **metadata: str | int | bool | None
) -> None

Register middleware in the Hub.

Source code in provide/foundation/transport/middleware.py
def register_middleware(
    name: str,
    middleware_class: type[Middleware],
    category: str = "transport.middleware",
    **metadata: str | int | bool | None,
) -> None:
    """Register a middleware class in the component registry.

    The entry is registered with ``replace=True`` under ``category`` as its
    dimension. Its metadata holds the category, the priority (100 unless
    supplied), and the class name, followed by any extra ``metadata`` keyword
    arguments, which override those defaults on key collision.

    Args:
        name: Name to register the middleware under.
        middleware_class: The middleware class to register.
        category: Registry dimension for the entry.
        **metadata: Additional metadata stored with the entry.
    """
    registry = get_component_registry()

    entry_metadata = {
        "category": category,
        "priority": metadata.get("priority", 100),
        "class_name": middleware_class.__name__,
    }
    # Caller-supplied keys are applied last so they take precedence.
    entry_metadata.update(metadata)

    registry.register(
        name=name,
        value=middleware_class,
        dimension=category,
        metadata=entry_metadata,
        replace=True,
    )