Skip to content

Transport

provide.foundation.transport

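Protocol-agnostic transport layer: `Request`/`Response` types, an httpx-backed HTTP transport, a middleware pipeline (logging, retry, metrics), the `UniversalClient`, and module-level convenience functions that use a shared default client.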

Classes

HTTPConfig

Bases: TransportConfig

HTTP-specific configuration.

HTTPMethod

Bases: str, Enum

HTTP methods.

HTTPResponseError

HTTPResponseError(
    message: str,
    *,
    status_code: int,
    response: Response,
    **kwargs: Any
)

Bases: TransportError

HTTP response error (4xx/5xx status codes).

Source code in provide/foundation/transport/errors.py
def __init__(self, message: str, *, status_code: int, response: Response, **kwargs: Any) -> None:
    """Create an error describing a response with a failing status.

    Args:
        message: Human-readable error description.
        status_code: Status code of the response that triggered the error.
        response: The response object itself, kept for later inspection.
        **kwargs: Forwarded unchanged to the parent error's initializer.
    """
    super().__init__(message, **kwargs)
    # Expose the status and the full response to exception handlers.
    self.status_code = status_code
    self.response = response

HTTPTransport

HTTPTransport()

Bases: TransportBase

HTTP/HTTPS transport using httpx backend.

Source code in provide/foundation/transport/base.py
def __init__(self) -> None:
    """Attach a logger named after the concrete transport class."""
    self._logger = get_logger(self.__class__.__name__)
Functions
connect async
connect() -> None

Initialize httpx client with configuration.

Source code in provide/foundation/transport/http.py
async def connect(self) -> None:
    """Create the httpx async client from ``self.config`` if none exists.

    When a client is already present the call returns without doing
    anything, so it is safe to invoke before every request.
    """
    if self._client is None:
        cfg = self.config

        # Pool sizing and timeout objects are built first, then handed to
        # the client together with the TLS / redirect / HTTP2 switches.
        pool_limits = httpx.Limits(
            max_connections=cfg.pool_connections,
            max_keepalive_connections=cfg.pool_maxsize,
        )
        client_timeout = httpx.Timeout(cfg.timeout)

        self._client = httpx.AsyncClient(
            limits=pool_limits,
            timeout=client_timeout,
            verify=cfg.verify_ssl,
            follow_redirects=cfg.follow_redirects,
            max_redirects=cfg.max_redirects,
            http2=cfg.http2,
        )

        log.trace(
            "HTTP transport connected",
            pool_connections=cfg.pool_connections,
            http2=cfg.http2,
        )
disconnect async
disconnect() -> None

Close httpx client.

Source code in provide/foundation/transport/http.py
async def disconnect(self) -> None:
    """Close the httpx client, if one is open, and forget it."""
    if self._client is None:
        # Nothing was ever connected (or it was already closed).
        return

    await self._client.aclose()
    self._client = None
    log.trace("HTTP transport disconnected")
execute async
execute(request: Request) -> Response

Execute HTTP request.

Source code in provide/foundation/transport/http.py
async def execute(self, request: Request) -> Response:
    """Send ``request`` through the httpx client and wrap the result.

    The client is created on demand via ``connect()``. A ``str``/``bytes``
    body is sent verbatim as raw content; any other non-``None`` body
    (dicts included) is handed to httpx for JSON encoding.

    Args:
        request: The request to send. ``request.timeout`` overrides the
            configured timeout when it is not ``None``.

    Returns:
        A ``Response`` carrying the status, headers, raw body bytes, a small
        metadata dict and the elapsed time in milliseconds.

    Raises:
        TransportTimeoutError: If httpx reports a timeout.
        TransportConnectionError: If the client is missing, the connection
            fails, httpx reports any other request error, or any other
            exception is raised while sending or building the response.
    """
    await self.connect()

    if self._client is None:
        raise TransportConnectionError("HTTP client not connected")

    # Log request with sanitized URI (redacts sensitive query params)
    sanitized_uri = sanitize_uri(request.uri)
    log.info(f"🚀 {request.method} {sanitized_uri}")

    start_time = time.perf_counter()

    try:
        # Determine request body format.
        json_data = None
        content = None

        if request.body is not None:
            if isinstance(request.body, (str, bytes)):
                # Raw payloads go through ``content=``; httpx deprecates
                # passing str/bytes via ``data=``.
                content = request.body
            else:
                # Dicts and any other object: let httpx serialize as JSON.
                json_data = request.body

        # Make the request
        # Only pass params if explicitly set (empty dict would override URI query params)
        request_kwargs = {
            "method": request.method,
            "url": request.uri,
            "headers": request.headers,
            "json": json_data,
            "content": content,
            "timeout": request.timeout if request.timeout is not None else self.config.timeout,
        }
        if request.params:
            request_kwargs["params"] = request.params

        httpx_response = await self._client.request(**request_kwargs)  # type: ignore[arg-type]

        elapsed_ms = (time.perf_counter() - start_time) * 1000

        # Log response with status emoji
        status_emoji = self._get_status_emoji(httpx_response.status_code)
        log.info(f"{status_emoji} {httpx_response.status_code} ({elapsed_ms:.0f}ms)")

        # Create response object
        response = Response(
            status=httpx_response.status_code,
            headers=dict(httpx_response.headers),
            body=httpx_response.content,
            metadata={
                "http_version": str(httpx_response.http_version),
                "reason_phrase": httpx_response.reason_phrase,
                "encoding": httpx_response.encoding,
                "is_redirect": httpx_response.is_redirect,
                "url": str(httpx_response.url),
            },
            elapsed_ms=elapsed_ms,
            request=request,
        )

        return response

    except httpx.ConnectError as e:
        log.error(f"❌ Connection failed: {e}")
        raise TransportConnectionError(f"Failed to connect: {e}", request=request) from e

    except httpx.TimeoutException as e:
        elapsed_ms = (time.perf_counter() - start_time) * 1000
        log.error(f"⏱️ Request timed out ({elapsed_ms:.0f}ms)")
        raise TransportTimeoutError(f"Request timed out: {e}", request=request) from e

    except httpx.RequestError as e:
        log.error(f"❌ Request failed: {e}")
        raise TransportConnectionError(f"Request failed: {e}", request=request) from e

    except Exception as e:
        # Catch-all boundary: anything unexpected is surfaced as a transport
        # error with the original exception chained as the cause.
        log.error(f"❌ Unexpected error: {e}", exc_info=True)
        raise TransportConnectionError(f"Unexpected error: {e}", request=request) from e
stream async
stream(request: Request) -> AsyncIterator[bytes]

Stream HTTP response.

Source code in provide/foundation/transport/http.py
async def stream(self, request: Request) -> AsyncIterator[bytes]:  # type: ignore[override,misc]
    """Send ``request`` and yield the response body chunk by chunk.

    The client is created on demand via ``connect()``. A request body, when
    present, is forwarded: ``str``/``bytes`` as raw content and anything
    else as JSON. Previously the body was not passed to httpx at all.

    Args:
        request: The request to send. ``request.timeout`` overrides the
            configured timeout when it is not ``None``.

    Yields:
        Byte chunks produced by the httpx response's ``aiter_bytes()``.

    Raises:
        TransportTimeoutError: If httpx reports a timeout.
        TransportConnectionError: If the client is missing, the connection
            fails, or httpx reports any other request error.
    """
    await self.connect()

    if self._client is None:
        raise TransportConnectionError("HTTP client not connected")

    # Log streaming request with sanitized URI
    sanitized_uri = sanitize_uri(request.uri)
    log.info(f"🚀 {request.method} {sanitized_uri} (streaming)")

    try:
        # Only pass params if explicitly set (empty dict would override URI query params)
        stream_kwargs = {
            "method": request.method,
            "url": request.uri,
            "headers": request.headers,
            "timeout": request.timeout if request.timeout is not None else self.config.timeout,
        }
        if request.params:
            stream_kwargs["params"] = request.params

        # Forward the body only when there is one, so body-less requests
        # are issued with exactly the same arguments as before.
        if request.body is not None:
            if isinstance(request.body, (str, bytes)):
                stream_kwargs["content"] = request.body
            else:
                stream_kwargs["json"] = request.body

        async with self._client.stream(**stream_kwargs) as response:  # type: ignore[arg-type]
            # Log response start
            status_emoji = self._get_status_emoji(response.status_code)
            log.info(f"{status_emoji} {response.status_code} (streaming)")

            # Stream the response
            async for chunk in response.aiter_bytes():
                yield chunk

    except httpx.ConnectError as e:
        raise TransportConnectionError(f"Failed to connect: {e}", request=request) from e

    except httpx.TimeoutException as e:
        raise TransportTimeoutError(f"Stream timed out: {e}", request=request) from e

    except httpx.RequestError as e:
        raise TransportConnectionError(f"Stream failed: {e}", request=request) from e
supports
supports(transport_type: TransportType) -> bool

Check if this transport supports the given type.

Source code in provide/foundation/transport/http.py
def supports(self, transport_type: TransportType) -> bool:
    """Report whether ``transport_type``'s value is one of ``self.SCHEMES``."""
    scheme = transport_type.value
    return scheme in self.SCHEMES

LoggingMiddleware

Bases: Middleware

Built-in telemetry middleware using foundation.logger.

Functions
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Log errors.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Emit an error log entry for a failed request and pass the error on.

    The URI is run through ``sanitize_uri`` first when ``sanitize_logs`` is
    enabled. The error object itself is returned untouched.
    """
    raw_uri = str(request.uri)
    shown_uri = sanitize_uri(raw_uri) if self.sanitize_logs else raw_uri

    log.error(
        f"❌ {request.method} {shown_uri} failed: {error}",
        method=request.method,
        uri=shown_uri,
        error_type=error.__class__.__name__,
        error_message=str(error),
    )
    return error
process_request async
process_request(request: Request) -> Request

Log outgoing request.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Log an outgoing request and return it unchanged.

    Nothing is logged unless ``log_requests`` is set. With ``sanitize_logs``
    the URI and headers pass through the sanitizers before logging. When
    ``log_bodies`` is also set and the body is truthy, the first 500
    characters of its string form are logged at trace level.
    """
    if not self.log_requests:
        return request

    shown_uri = str(request.uri)
    has_headers = hasattr(request, "headers")
    if self.sanitize_logs:
        shown_uri = sanitize_uri(shown_uri)
        shown_headers = sanitize_headers(dict(request.headers)) if has_headers else {}
    else:
        shown_headers = dict(request.headers) if has_headers else {}

    log.info(
        f"🚀 {request.method} {shown_uri}",
        method=request.method,
        uri=shown_uri,
        headers=shown_headers,
    )

    if self.log_bodies and request.body:
        # Non-string bodies are logged via str(); only a 500-character
        # prefix is emitted, mirroring the response-side truncation.
        body_text = request.body if isinstance(request.body, str) else str(request.body)
        log.trace(
            "Request body",
            body=body_text[:500],
            method=request.method,
            uri=shown_uri,
        )

    return request
process_response async
process_response(response: Response) -> Response

Log incoming response.

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Log an incoming response and return it unchanged.

    Nothing is logged unless ``log_responses`` is set. With ``sanitize_logs``
    the headers are always sanitized, and the URI is sanitized whenever one
    is available. Header sanitization used to be skipped when the response
    had no originating request, which could log raw headers. When
    ``log_bodies`` is set and the body is truthy, the first 500 characters
    of ``response.text`` are logged at trace level.
    """
    if self.log_responses:
        request = response.request
        uri_str = str(request.uri) if request else None
        has_headers = hasattr(response, "headers")

        if self.sanitize_logs:
            # URI sanitization needs a URI; header sanitization must not
            # depend on one.
            if uri_str:
                uri_str = sanitize_uri(uri_str)
            headers = sanitize_headers(dict(response.headers)) if has_headers else {}
        else:
            headers = dict(response.headers) if has_headers else {}

        status_emoji = self._get_status_emoji(response.status)
        log.info(
            f"{status_emoji} {response.status} ({response.elapsed_ms:.0f}ms)",
            status_code=response.status,
            elapsed_ms=response.elapsed_ms,
            method=request.method if request else None,
            uri=uri_str,
            headers=headers,
        )

        if self.log_bodies and response.body:
            log.trace(
                "Response body",
                body=response.text[:500],  # Truncate large bodies
                status_code=response.status,
                method=request.method if request else None,
                uri=uri_str,
            )

    return response

MetricsMiddleware

Bases: Middleware

Middleware for collecting transport metrics using foundation.metrics.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initialize metrics after creation.

Source code in provide/foundation/transport/middleware.py
def __attrs_post_init__(self) -> None:
    """Create the request counter, duration histogram and error counter.

    The instruments are built with the instance's ``_counter_func`` and
    ``_histogram_func`` factories and stored on private attributes.
    """
    make_counter = self._counter_func
    make_histogram = self._histogram_func

    self._request_counter = make_counter(
        "transport_requests_total",
        description="Total number of transport requests",
        unit="requests",
    )
    self._request_duration = make_histogram(
        "transport_request_duration_seconds",
        description="Duration of transport requests",
        unit="seconds",
    )
    self._error_counter = make_counter(
        "transport_errors_total",
        description="Total number of transport errors",
        unit="errors",
    )
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Record error metrics.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Count the error, labelled by request method and exception class.

    The error is returned unchanged.
    """
    self._error_counter.inc(
        1,
        method=request.method,
        error_type=error.__class__.__name__,
    )
    return error
process_request async
process_request(request: Request) -> Request

Record request start time.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Stamp the request with a ``perf_counter`` start time.

    The value is stored under ``request.metadata["start_time"]`` so the
    response hook can compute the duration later.
    """
    started_at = time.perf_counter()
    request.metadata["start_time"] = started_at
    return request
process_response async
process_response(response: Response) -> Response

Record response metrics.

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Record the request count and duration for a completed request.

    Metrics are only recorded when the response carries its request and
    that request was stamped with ``start_time``. The response is returned
    unchanged either way.
    """
    origin = response.request
    if not origin or "start_time" not in origin.metadata:
        return response

    elapsed = time.perf_counter() - origin.metadata["start_time"]
    verb = origin.method
    # Bucket the status into "2xx", "4xx", ... for lower-cardinality labels.
    family = f"{response.status // 100}xx"

    self._request_counter.inc(
        1,
        method=verb,
        status_code=str(response.status),
        status_class=family,
    )
    self._request_duration.observe(elapsed, method=verb, status_class=family)

    return response

Middleware

Bases: ABC

Abstract base class for transport middleware.

Functions
process_error abstractmethod async
process_error(
    error: Exception, request: Request
) -> Exception

Process errors during request.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Process an error raised while handling ``request``.

    Args:
        error: The exception that occurred.
        request: The request that was being processed.

    Returns:
        The exception to propagate (the same one or a replacement).
    """
process_request abstractmethod async
process_request(request: Request) -> Request

Process request before sending.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_request(self, request: Request) -> Request:
    """Process a request before it is sent.

    Args:
        request: The outgoing request.

    Returns:
        The request to pass on (the same one or a replacement).
    """
process_response abstractmethod async
process_response(response: Response) -> Response

Process response after receiving.

Source code in provide/foundation/transport/middleware.py
@abstractmethod
async def process_response(self, response: Response) -> Response:
    """Process a response after it has been received.

    Args:
        response: The incoming response.

    Returns:
        The response to pass on (the same one or a replacement).
    """

MiddlewarePipeline

Pipeline for executing middleware in order.

Functions
add
add(middleware: Middleware) -> None

Add middleware to the pipeline.

Source code in provide/foundation/transport/middleware.py
def add(self, middleware: Middleware) -> None:
    """Append ``middleware`` to the end of the pipeline and trace-log it."""
    added_name = middleware.__class__.__name__
    self.middleware.append(middleware)
    log.trace(f"Added middleware: {added_name}")
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Process error through all middleware.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Pass ``error`` through every middleware in registration order.

    Each middleware receives the error returned by the previous one; the
    final result is returned.
    """
    current = error
    for handler in self.middleware:
        current = await handler.process_error(current, request)
    return current
process_request async
process_request(request: Request) -> Request

Process request through all middleware.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Pass ``request`` through every middleware in registration order.

    Each middleware receives the request returned by the previous one; the
    final result is returned.
    """
    current = request
    for handler in self.middleware:
        current = await handler.process_request(current)
    return current
process_response async
process_response(response: Response) -> Response

Process response through all middleware (in reverse order).

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Pass ``response`` through every middleware, last-registered first.

    Each middleware receives the response returned by the previous one; the
    final result is returned.
    """
    current = response
    for handler in reversed(self.middleware):
        current = await handler.process_response(current)
    return current
remove
remove(middleware_class: type[Middleware]) -> bool

Remove middleware by class type.

Source code in provide/foundation/transport/middleware.py
def remove(self, middleware_class: type[Middleware]) -> bool:
    """Delete the first middleware that is an instance of ``middleware_class``.

    Returns:
        ``True`` if a middleware was removed, ``False`` if none matched.
    """
    for position, candidate in enumerate(self.middleware):
        if not isinstance(candidate, middleware_class):
            continue
        # Only the first match is dropped; the loop ends immediately after.
        del self.middleware[position]
        log.trace(f"Removed middleware: {middleware_class.__name__}")
        return True
    return False

Request

Protocol-agnostic request.

Attributes
base_url property
base_url: str

Extract base URL from URI.

transport_type property
transport_type: TransportType

Infer transport type from URI scheme.

Response

Protocol-agnostic response.

Attributes
text property
text: str

Get response body as text.

Functions
is_success
is_success() -> bool

Check if response indicates success.

Source code in provide/foundation/transport/base.py
def is_success(self) -> bool:
    """Return ``True`` when the status lies in the 200–299 range."""
    code = self.status
    return code >= 200 and code < 300
json
json() -> Any

Parse response body as JSON.

Source code in provide/foundation/transport/base.py
def json(self) -> Any:
    """Decode the response body as JSON.

    ``bytes`` bodies are decoded as UTF-8 first; ``str`` bodies are parsed
    directly.

    Raises:
        ValueError: If the body is neither ``bytes`` nor ``str``.
    """
    payload = self.body
    if isinstance(payload, bytes):
        payload = payload.decode("utf-8")
    if isinstance(payload, str):
        return json_loads(payload)
    raise ValueError("Response body is not JSON-parseable")
raise_for_status
raise_for_status() -> None

Raise error if response status indicates failure.

Source code in provide/foundation/transport/base.py
def raise_for_status(self) -> None:
    """Raise ``HTTPResponseError`` unless ``is_success()`` is true.

    The raised error carries the status code and this response.
    """
    if self.is_success():
        return

    # Imported lazily, at the point of failure, as in the original layout.
    from provide.foundation.transport.errors import HTTPResponseError

    raise HTTPResponseError(
        f"Request failed with status {self.status}",
        status_code=self.status,
        response=self,
    )

RetryMiddleware

Bases: Middleware

Automatic retry middleware using unified retry logic.

Functions
execute_with_retry async
execute_with_retry(
    execute_func: Callable[[Request], Awaitable[Response]],
    request: Request,
) -> Response

Execute request with retry logic using unified RetryExecutor.

Source code in provide/foundation/transport/middleware.py
async def execute_with_retry(
    self, execute_func: Callable[[Request], Awaitable[Response]], request: Request
) -> Response:
    """Run ``execute_func(request)`` under the configured retry policy.

    A ``RetryExecutor`` built from ``self.policy`` drives the attempts. A
    response the policy marks as retryable is turned into a
    ``TransportError`` so the executor treats it like any other failure.

    Args:
        execute_func: Coroutine function that performs one attempt.
        request: The request handed to ``execute_func`` on every attempt.

    Returns:
        The first response the policy does not mark as retryable.

    Raises:
        TransportError: Propagated from the executor, including the
            synthetic "Retryable HTTP status" error, which now carries the
            originating request.
    """
    executor = RetryExecutor(
        self.policy,
        time_source=self.time_source,
        async_sleep_func=self.async_sleep_func,
    )

    async def wrapped() -> Response:
        response = await execute_func(request)

        # NOTE(review): ``attempt`` is always reported as 1, so the policy
        # cannot take the real attempt number into account here — confirm
        # whether that is intended before changing it.
        if self.policy.should_retry_response(response, attempt=1):
            # Convert to exception for executor to handle; attach the
            # request like every other transport error raise site does.
            raise TransportError(
                f"Retryable HTTP status: {response.status}",
                request=request,
            )

        return response

    # The former try/except re-raised on every path, so errors simply
    # propagate from here.
    return await executor.execute_async(wrapped)
process_error async
process_error(
    error: Exception, request: Request
) -> Exception

Return the error unchanged. This hook performs no retries itself; retrying is done by execute_with_retry.

Source code in provide/foundation/transport/middleware.py
async def process_error(self, error: Exception, request: Request) -> Exception:
    """Return ``error`` unchanged.

    No retry is attempted here; retrying is implemented by
    ``execute_with_retry``.
    """
    return error
process_request async
process_request(request: Request) -> Request

No request processing needed.

Source code in provide/foundation/transport/middleware.py
async def process_request(self, request: Request) -> Request:
    """Return ``request`` unchanged; this middleware does not alter requests."""
    return request
process_response async
process_response(response: Response) -> Response

No response processing needed (retries handled in execute).

Source code in provide/foundation/transport/middleware.py
async def process_response(self, response: Response) -> Response:
    """Return ``response`` unchanged.

    Retryable responses are handled in ``execute_with_retry``, not in this
    hook.
    """
    return response

TransportConfig

Bases: RuntimeConfig

Base configuration for all transports.

TransportConnectionError

TransportConnectionError(
    message: str,
    *,
    request: Request | None = None,
    **kwargs: Any
)

Bases: TransportError

Transport connection failed.

Source code in provide/foundation/transport/errors.py
def __init__(self, message: str, *, request: Request | None = None, **kwargs: Any) -> None:
    """Create a connection error, optionally tied to a request.

    Args:
        message: Human-readable error description.
        request: The request being processed when the failure occurred.
        **kwargs: Forwarded unchanged to the parent error's initializer.
    """
    # ``request`` is not forwarded to the parent; it is stored here instead.
    super().__init__(message, **kwargs)
    self.request = request

TransportError

TransportError(
    message: str,
    *,
    request: Request | None = None,
    **kwargs: Any
)

Bases: FoundationError

Base transport error.

Source code in provide/foundation/transport/errors.py
def __init__(self, message: str, *, request: Request | None = None, **kwargs: Any) -> None:
    """Create a transport error, optionally tied to a request.

    Args:
        message: Human-readable error description.
        request: The request being processed when the failure occurred.
        **kwargs: Forwarded unchanged to the parent error's initializer.
    """
    super().__init__(message, **kwargs)
    # Keep the originating request available to exception handlers.
    self.request = request

TransportNotFoundError

TransportNotFoundError(
    message: str, *, scheme: str, **kwargs: Any
)

Bases: TransportError

No transport found for the given URI scheme.

Source code in provide/foundation/transport/errors.py
def __init__(self, message: str, *, scheme: str, **kwargs: Any) -> None:
    """Create an error for a URI scheme that has no transport.

    Args:
        message: Human-readable error description.
        scheme: The URI scheme that could not be resolved.
        **kwargs: Forwarded unchanged to the parent error's initializer.
    """
    super().__init__(message, **kwargs)
    self.scheme = scheme

TransportTimeoutError

TransportTimeoutError(
    message: str,
    *,
    request: Request | None = None,
    **kwargs: Any
)

Bases: TransportError

Transport request timed out.

Source code in provide/foundation/transport/errors.py
def __init__(self, message: str, *, request: Request | None = None, **kwargs: Any) -> None:
    """Create a timeout error, optionally tied to a request.

    Args:
        message: Human-readable error description.
        request: The request that timed out.
        **kwargs: Forwarded unchanged to the parent error's initializer.
    """
    # ``request`` is not forwarded to the parent; it is stored here instead.
    super().__init__(message, **kwargs)
    self.request = request

TransportType

Bases: str, Enum

Supported transport types.

UniversalClient

Universal client that works with any transport via Hub registry.

The client uses a TransportCache that automatically evicts transports that exceed the failure threshold (default: 3 consecutive failures).

Functions
__aenter__ async
__aenter__() -> UniversalClient

Context manager entry.

Source code in provide/foundation/transport/client.py
async def __aenter__(self) -> UniversalClient:
    """Enter the async context; returns the client itself without any setup."""
    return self
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None

Context manager exit - cleanup all transports.

Source code in provide/foundation/transport/client.py
async def __aexit__(
    self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
) -> None:
    """Empty the transport cache and disconnect every transport it held.

    A failure to disconnect one transport is logged and does not prevent
    the remaining transports from being disconnected.
    """
    evicted = self._cache.clear()
    for cached_transport in evicted.values():
        try:
            await cached_transport.disconnect()
        except Exception as e:
            # Best-effort cleanup: report the problem and keep going.
            log.error(f"Error disconnecting transport: {e}")
delete async
delete(uri: str, **kwargs: Any) -> Response

DELETE request.

Source code in provide/foundation/transport/client.py
async def delete(self, uri: str, **kwargs: Any) -> Response:
    """Issue a DELETE request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.DELETE, **kwargs)
    return response
get async
get(uri: str, **kwargs: Any) -> Response

GET request.

Source code in provide/foundation/transport/client.py
async def get(self, uri: str, **kwargs: Any) -> Response:
    """Issue a GET request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.GET, **kwargs)
    return response
head async
head(uri: str, **kwargs: Any) -> Response

HEAD request.

Source code in provide/foundation/transport/client.py
async def head(self, uri: str, **kwargs: Any) -> Response:
    """Issue a HEAD request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.HEAD, **kwargs)
    return response
options async
options(uri: str, **kwargs: Any) -> Response

OPTIONS request.

Source code in provide/foundation/transport/client.py
async def options(self, uri: str, **kwargs: Any) -> Response:
    """Issue an OPTIONS request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.OPTIONS, **kwargs)
    return response
patch async
patch(uri: str, **kwargs: Any) -> Response

PATCH request.

Source code in provide/foundation/transport/client.py
async def patch(self, uri: str, **kwargs: Any) -> Response:
    """Issue a PATCH request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.PATCH, **kwargs)
    return response
post async
post(uri: str, **kwargs: Any) -> Response

POST request.

Source code in provide/foundation/transport/client.py
async def post(self, uri: str, **kwargs: Any) -> Response:
    """Issue a POST request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.POST, **kwargs)
    return response
put async
put(uri: str, **kwargs: Any) -> Response

PUT request.

Source code in provide/foundation/transport/client.py
async def put(self, uri: str, **kwargs: Any) -> Response:
    """Issue a PUT request; ``kwargs`` are forwarded to ``request()``."""
    response = await self.request(uri, HTTPMethod.PUT, **kwargs)
    return response
request async
request(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    *,
    headers: Headers | None = None,
    params: Params | None = None,
    body: Data = None,
    timeout: float | None = None,
    **kwargs: Any
) -> Response

Make a request using appropriate transport.

Parameters:

Name Type Description Default
uri str

Full URI to make request to

required
method str | HTTPMethod

HTTP method or protocol-specific method

GET
headers Headers | None

Request headers

None
params Params | None

Query parameters

None
body Data

Request body (dict for JSON, str/bytes for raw)

None
timeout float | None

Request timeout override

None
**kwargs Any

Additional request metadata

{}

Returns:

Type Description
Response

Response from the transport

Source code in provide/foundation/transport/client.py
async def request(
    self,
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    *,
    headers: Headers | None = None,
    params: Params | None = None,
    body: Data = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> Response:
    """Make a request using appropriate transport.

    The request passes through the middleware pipeline before it is sent,
    and the response (or the raised error) passes through it afterwards.
    The transport cache is told about the outcome: success after a
    completed ``execute``, failure for any ``TransportError``.

    Args:
        uri: Full URI to make request to
        method: HTTP method or protocol-specific method
        headers: Request headers; merged over the client's default headers
        params: Query parameters
        body: Request body (dict for JSON, str/bytes for raw)
        timeout: Request timeout override. Only ``None`` falls back to the
            client's default timeout; an explicit ``0`` is passed through.
        **kwargs: Additional request metadata

    Returns:
        Response from the transport

    Raises:
        Exception: Whatever the error middleware returns for an exception
            raised while resolving the transport, executing the request or
            processing the response.

    """
    # Normalize method
    if isinstance(method, HTTPMethod):
        method = method.value

    # Merge headers
    request_headers = dict(self.default_headers)
    if headers:
        request_headers.update(headers)

    # Create request object
    request = Request(
        uri=uri,
        method=method,
        headers=request_headers,
        params=params or {},
        body=body,
        # ``timeout or default`` would silently discard an explicit 0 / 0.0.
        timeout=timeout if timeout is not None else self.default_timeout,
        metadata=kwargs,
    )

    # Process through middleware
    request = await self.middleware.process_request(request)

    try:
        # Get transport for this URI
        transport = await self._get_transport(request.transport_type.value)

        # Execute request
        response = await transport.execute(request)

        # Mark success in cache
        self._cache.mark_success(request.transport_type.value)

        # Process response through middleware
        response = await self.middleware.process_response(response)

        return response

    except Exception as e:
        # Mark failure if it's a transport error
        if isinstance(e, TransportError):
            self._cache.mark_failure(request.transport_type.value, e)

        # Process error through middleware
        e = await self.middleware.process_error(e, request)
        raise e
reset_transport_cache
reset_transport_cache() -> None

Reset the transport cache.

Useful for testing or forcing reconnection after configuration changes.

Source code in provide/foundation/transport/client.py
def reset_transport_cache(self) -> None:
    """Reset the transport cache.

    Useful for testing or forcing reconnection after configuration changes.
    """
    log.info("🔄 Resetting transport cache")
    # NOTE(review): the value returned by clear() is discarded here, whereas
    # __aexit__ disconnects the transports that clear() returns. Evicted
    # transports are therefore not disconnected by this method — confirm
    # that is intended.
    self._cache.clear()
stream async
stream(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any
) -> AsyncIterator[bytes]

Stream data from URI.

Parameters:

Name Type Description Default
uri str

URI to stream from

required
method str | HTTPMethod

HTTP method or protocol-specific method

GET
**kwargs Any

Additional request parameters

{}

Yields:

Type Description
AsyncIterator[bytes]

Chunks of response data

Source code in provide/foundation/transport/client.py
async def stream(
    self,
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any,
) -> AsyncIterator[bytes]:
    """Stream data from URI.

    The request is handed straight to the transport's ``stream()``; the
    middleware pipeline is not invoked on this path.

    Args:
        uri: URI to stream from
        method: HTTP method or protocol-specific method
        **kwargs: Additional request parameters, passed to ``Request``.
            A ``headers`` entry is merged over the client's default
            headers instead of colliding with them.

    Yields:
        Chunks of response data

    """
    # Normalize method
    if isinstance(method, HTTPMethod):
        method = method.value

    # Merge per-call headers over the defaults. Passing ``headers`` through
    # ``**kwargs`` unchanged would raise TypeError (duplicate keyword).
    request_headers = dict(self.default_headers)
    extra_headers = kwargs.pop("headers", None)
    if extra_headers:
        request_headers.update(extra_headers)

    # Create request
    request = Request(uri=uri, method=method, headers=request_headers, **kwargs)

    # Get transport
    transport = await self._get_transport(request.transport_type.value)

    # Stream response
    async for chunk in transport.stream(request):
        yield chunk

Functions

create_default_pipeline

create_default_pipeline(
    enable_retry: bool = True,
    enable_logging: bool = True,
    enable_metrics: bool = True,
) -> MiddlewarePipeline

Create pipeline with default middleware.

Parameters:

Name Type Description Default
enable_retry bool

Enable automatic retry middleware (default: True)

True
enable_logging bool

Enable request/response logging middleware (default: True)

True
enable_metrics bool

Enable metrics collection middleware (default: True)

True

Returns:

Type Description
MiddlewarePipeline

Configured middleware pipeline

Source code in provide/foundation/transport/middleware.py
def create_default_pipeline(
    enable_retry: bool = True,
    enable_logging: bool = True,
    enable_metrics: bool = True,
) -> MiddlewarePipeline:
    """Build a middleware pipeline from the built-in middleware.

    Middleware is added in a fixed order: retry, then logging, then metrics.
    Each one is included only when its flag is true.

    Args:
        enable_retry: Add the retry middleware (default: True)
        enable_logging: Add the request/response logging middleware (default: True)
        enable_metrics: Add the metrics collection middleware (default: True)

    Returns:
        The assembled middleware pipeline

    """
    pipeline = MiddlewarePipeline()

    if enable_retry:
        # Retry goes in first. Defaults: three attempts with exponential
        # backoff between 1s and 10s, retrying the listed status codes.
        pipeline.add(
            RetryMiddleware(
                policy=RetryPolicy(
                    max_attempts=3,
                    backoff=BackoffStrategy.EXPONENTIAL,
                    base_delay=1.0,
                    max_delay=10.0,
                    retryable_status_codes={408, 429, 500, 502, 503, 504},
                )
            )
        )

    # The remaining built-ins take no arguments; add them in order.
    for wanted, factory in (
        (enable_logging, LoggingMiddleware),
        (enable_metrics, MetricsMiddleware),
    ):
        if wanted:
            pipeline.add(factory())

    return pipeline

delete async

delete(uri: str, **kwargs: Any) -> Response

DELETE request using default client.

Source code in provide/foundation/transport/client.py
async def delete(uri: str, **kwargs: Any) -> Response:
    """Send a DELETE request through the shared default client."""
    return await get_default_client().delete(uri, **kwargs)

get async

get(uri: str, **kwargs: Any) -> Response

GET request using default client.

Source code in provide/foundation/transport/client.py
async def get(uri: str, **kwargs: Any) -> Response:
    """Send a GET request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``get``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().get(uri, **kwargs)

get_default_client

get_default_client() -> UniversalClient

Get or create the default client instance.

This function acts as the composition root for the default client, preserving backward compatibility for public convenience functions.

Source code in provide/foundation/transport/client.py
def get_default_client() -> UniversalClient:
    """Return the module-level default client, creating it on first use.

    This is the composition root for the default client: it keeps the public
    convenience functions backward compatible by supplying the client for them.
    """
    global _default_client
    if _default_client is not None:
        return _default_client

    # First call: build the client against the hub returned by get_hub() and
    # keep it in the module global so later calls reuse the same instance.
    _default_client = UniversalClient(hub=get_hub())
    return _default_client

get_transport

get_transport(uri: str) -> Transport

Get transport instance for a URI.

Parameters:

Name Type Description Default
uri str

Full URI to get transport for

required

Returns:

Type Description
Transport

Transport instance ready to use

Raises:

Type Description
TransportNotFoundError

If no transport supports the URI scheme

Source code in provide/foundation/transport/registry.py
def get_transport(uri: str) -> Transport:
    """Instantiate the transport registered for a URI's scheme.

    Args:
        uri: Full URI to get transport for

    Returns:
        New instance of the transport class registered for the scheme

    Raises:
        TransportNotFoundError: If no transport supports the URI scheme

    """
    # The scheme is whatever precedes the first "://", lowercased for lookup.
    # When the URI contains no "://", the whole string is used as the scheme.
    prefix, _, _ = uri.partition("://")
    return get_transport_for_scheme(prefix.lower())()

get_transport_info

get_transport_info(
    scheme_or_name: str,
) -> dict[str, Any] | None

Get detailed information about a transport.

Parameters:

Name Type Description Default
scheme_or_name str

URI scheme or transport name

required

Returns:

Type Description
dict[str, Any] | None

Transport information or None if not found

Source code in provide/foundation/transport/registry.py
def get_transport_info(scheme_or_name: str) -> dict[str, Any] | None:
    """Look up a registered transport by name or by URI scheme.

    The first transport entry in the component registry whose name equals
    ``scheme_or_name`` exactly, or whose registered schemes contain the
    lowercased ``scheme_or_name``, is reported.

    Args:
        scheme_or_name: URI scheme or transport name

    Returns:
        Transport information or None if not found

    """
    for candidate in get_component_registry():
        # Only entries registered under the transport dimension are relevant.
        if candidate.dimension != ComponentCategory.TRANSPORT.value:
            continue

        supported = candidate.metadata.get("schemes", [])
        # The name comparison is exact; the scheme comparison lowercases the input.
        if candidate.name == scheme_or_name or scheme_or_name.lower() in supported:
            return {
                "name": candidate.name,
                "class": candidate.value,
                "schemes": supported,
                "transport_type": candidate.metadata.get("transport_type"),
                "metadata": candidate.metadata,
            }

    return None

head async

head(uri: str, **kwargs: Any) -> Response

HEAD request using default client.

Source code in provide/foundation/transport/client.py
async def head(uri: str, **kwargs: Any) -> Response:
    """Send a HEAD request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``head``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().head(uri, **kwargs)

list_registered_transports

list_registered_transports() -> dict[str, dict[str, Any]]

List all registered transports.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary mapping transport names to their info

Source code in provide/foundation/transport/registry.py
def list_registered_transports() -> dict[str, dict[str, Any]]:
    """Collect every transport entry from the component registry.

    Returns:
        Dictionary mapping transport names to their info (class, schemes,
        transport type and the full metadata mapping)

    """
    # Keep only entries registered under the transport dimension, keyed by name.
    return {
        item.name: {
            "class": item.value,
            "schemes": item.metadata.get("schemes", []),
            "transport_type": item.metadata.get("transport_type"),
            "metadata": item.metadata,
        }
        for item in get_component_registry()
        if item.dimension == ComponentCategory.TRANSPORT.value
    }

options async

options(uri: str, **kwargs: Any) -> Response

OPTIONS request using default client.

Source code in provide/foundation/transport/client.py
async def options(uri: str, **kwargs: Any) -> Response:
    """Send an OPTIONS request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``options``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().options(uri, **kwargs)

patch async

patch(uri: str, **kwargs: Any) -> Response

PATCH request using default client.

Source code in provide/foundation/transport/client.py
async def patch(uri: str, **kwargs: Any) -> Response:
    """Send a PATCH request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``patch``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().patch(uri, **kwargs)

post async

post(uri: str, **kwargs: Any) -> Response

POST request using default client.

Source code in provide/foundation/transport/client.py
async def post(uri: str, **kwargs: Any) -> Response:
    """Send a POST request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``post``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().post(uri, **kwargs)

put async

put(uri: str, **kwargs: Any) -> Response

PUT request using default client.

Source code in provide/foundation/transport/client.py
async def put(uri: str, **kwargs: Any) -> Response:
    """Send a PUT request through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``put``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().put(uri, **kwargs)

register_transport

register_transport(
    transport_type: TransportType,
    transport_class: type[Transport],
    schemes: list[str] | None = None,
    **metadata: Any
) -> None

Register a transport implementation in the Hub.

Parameters:

Name Type Description Default
transport_type TransportType

The primary transport type

required
transport_class type[Transport]

Transport implementation class

required
schemes list[str] | None

List of URI schemes this transport handles

None
**metadata Any

Additional metadata for the transport

{}
Source code in provide/foundation/transport/registry.py
def register_transport(
    transport_type: TransportType,
    transport_class: type[Transport],
    schemes: list[str] | None = None,
    **metadata: Any,
) -> None:
    """Add a transport implementation to the component registry.

    The entry is stored under the transport dimension, named after
    ``transport_type.value``, and replaces any existing entry of that name.

    Args:
        transport_type: The primary transport type
        transport_class: Transport implementation class
        schemes: List of URI schemes this transport handles; when None, the
            transport type's own value is used as the only scheme
        **metadata: Additional metadata for the transport; keys given here
            override the built-in ``transport_type``, ``schemes`` and
            ``class_name`` metadata entries

    """
    component_registry = get_component_registry()

    # With no explicit schemes, the transport handles only its own type name.
    handled_schemes = [transport_type.value] if schemes is None else schemes

    # Caller-supplied metadata is unpacked last, so it wins on key clashes.
    entry_metadata = {
        "transport_type": transport_type,
        "schemes": handled_schemes,
        "class_name": transport_class.__name__,
        **metadata,
    }

    component_registry.register(
        name=transport_type.value,
        value=transport_class,
        dimension=ComponentCategory.TRANSPORT.value,
        metadata=entry_metadata,
        replace=True,  # Re-registering the same transport type is permitted
    )

request async

request(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any
) -> Response

Make a request using the default client.

Source code in provide/foundation/transport/client.py
async def request(uri: str, method: str | HTTPMethod = HTTPMethod.GET, **kwargs: Any) -> Response:
    """Send a request with an arbitrary method through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        method: Request method as a string or ``HTTPMethod`` (default: GET).
        **kwargs: Extra keyword arguments forwarded to the client's ``request``.

    Returns:
        The response returned by the default client.

    """
    return await get_default_client().request(uri, method, **kwargs)

stream async

stream(uri: str, **kwargs: Any) -> AsyncIterator[bytes]

Stream data using default client.

Source code in provide/foundation/transport/client.py
async def stream(uri: str, **kwargs: Any) -> AsyncIterator[bytes]:
    """Stream data from a URI through the default client.

    Args:
        uri: Target URI, passed to the client unchanged.
        **kwargs: Extra keyword arguments forwarded to the client's ``stream``.

    Yields:
        Each chunk produced by the default client's stream, in order.

    """
    # Re-yield each chunk so this function is itself an async generator.
    async for piece in get_default_client().stream(uri, **kwargs):
        yield piece