Skip to content

Client

provide.foundation.transport.client

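Universal transport client (`UniversalClient`) plus module-level convenience functions (`request`, `get`, `post`, `put`, `patch`, `delete`, `head`, `options`, `stream`) that delegate to a shared default client instance.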

Classes

UniversalClient

Universal client that works with any transport via Hub registry.

The client uses a TransportCache that automatically evicts transports that exceed the failure threshold (default: 3 consecutive failures).

Functions
__aenter__ async
__aenter__() -> UniversalClient

Context manager entry.

Source code in provide/foundation/transport/client.py
async def __aenter__(self) -> UniversalClient:
    """Enter the ``async with`` block and hand back this client unchanged."""
    return self
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None

Context manager exit - cleanup all transports.

Source code in provide/foundation/transport/client.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: Any,
) -> None:
    """Leave the ``async with`` block, disconnecting every cached transport.

    The cache is cleared first; each transport it hands back is then
    disconnected in turn. A failure to disconnect one transport is logged
    and does not stop the remaining transports from being disconnected.

    Args:
        exc_type: Exception class raised inside the block, if any (unused).
        exc_val: Exception instance raised inside the block, if any (unused).
        exc_tb: Traceback of that exception, if any (unused).
    """
    evicted = self._cache.clear()
    for conn in evicted.values():
        try:
            await conn.disconnect()
        except Exception as err:
            # Best-effort cleanup: report the problem and keep going.
            log.error(f"Error disconnecting transport: {err}")
delete async
delete(uri: str, **kwargs: Any) -> Response

DELETE request.

Source code in provide/foundation/transport/client.py
async def delete(self, uri: str, **kwargs: Any) -> Response:
    """Send a DELETE to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.DELETE, **kwargs)
    return response
get async
get(uri: str, **kwargs: Any) -> Response

GET request.

Source code in provide/foundation/transport/client.py
async def get(self, uri: str, **kwargs: Any) -> Response:
    """Send a GET to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.GET, **kwargs)
    return response
head async
head(uri: str, **kwargs: Any) -> Response

HEAD request.

Source code in provide/foundation/transport/client.py
async def head(self, uri: str, **kwargs: Any) -> Response:
    """Send a HEAD to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.HEAD, **kwargs)
    return response
options async
options(uri: str, **kwargs: Any) -> Response

OPTIONS request.

Source code in provide/foundation/transport/client.py
async def options(self, uri: str, **kwargs: Any) -> Response:
    """Send an OPTIONS to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.OPTIONS, **kwargs)
    return response
patch async
patch(uri: str, **kwargs: Any) -> Response

PATCH request.

Source code in provide/foundation/transport/client.py
async def patch(self, uri: str, **kwargs: Any) -> Response:
    """Send a PATCH to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.PATCH, **kwargs)
    return response
post async
post(uri: str, **kwargs: Any) -> Response

POST request.

Source code in provide/foundation/transport/client.py
async def post(self, uri: str, **kwargs: Any) -> Response:
    """Send a POST to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.POST, **kwargs)
    return response
put async
put(uri: str, **kwargs: Any) -> Response

PUT request.

Source code in provide/foundation/transport/client.py
async def put(self, uri: str, **kwargs: Any) -> Response:
    """Send a PUT to ``uri``; extra keyword arguments go to ``request``."""
    response = await self.request(uri, HTTPMethod.PUT, **kwargs)
    return response
request async
request(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    *,
    headers: Headers | None = None,
    params: Params | None = None,
    body: Data = None,
    timeout: float | None = None,
    **kwargs: Any
) -> Response

Make a request using appropriate transport.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `str` | Full URI to make request to | required |
| `method` | `str \| HTTPMethod` | HTTP method or protocol-specific method | `GET` |
| `headers` | `Headers \| None` | Request headers | `None` |
| `params` | `Params \| None` | Query parameters | `None` |
| `body` | `Data` | Request body (dict for JSON, str/bytes for raw) | `None` |
| `timeout` | `float \| None` | Request timeout override | `None` |
| `**kwargs` | `Any` | Additional request metadata | `{}` |

Returns:

| Type | Description |
| --- | --- |
| `Response` | Response from the transport |

Source code in provide/foundation/transport/client.py
async def request(
    self,
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    *,
    headers: Headers | None = None,
    params: Params | None = None,
    body: Data = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> Response:
    """Make a request using appropriate transport.

    The request is built from the client defaults plus the per-call
    arguments, passed through the request middleware, executed on the
    transport looked up from ``request.transport_type``, and the response
    is passed through the response middleware before being returned.

    Args:
        uri: Full URI to make request to
        method: HTTP method or protocol-specific method
        headers: Request headers; merged over the client's default headers
        params: Query parameters
        body: Request body (dict for JSON, str/bytes for raw)
        timeout: Request timeout override. Only ``None`` selects the
            client's default timeout; an explicit ``0`` is passed through.
        **kwargs: Additional request metadata

    Returns:
        Response from the transport

    Raises:
        Exception: Whatever the error middleware returns for a failure
            raised while obtaining the transport, executing the request,
            or processing the response.

    """
    # Normalize method
    if isinstance(method, HTTPMethod):
        method = method.value

    # Merge headers: per-call values win over the client defaults
    request_headers = dict(self.default_headers)
    if headers:
        request_headers.update(headers)

    # Use an identity check rather than truthiness so that an explicit
    # timeout of 0 is not silently replaced by the default.
    effective_timeout = timeout if timeout is not None else self.default_timeout

    # Create request object
    request = Request(
        uri=uri,
        method=method,
        headers=request_headers,
        params=params or {},
        body=body,
        timeout=effective_timeout,
        metadata=kwargs,
    )

    # Process through middleware
    request = await self.middleware.process_request(request)

    try:
        # Get transport for this URI
        transport = await self._get_transport(request.transport_type.value)

        # Execute request
        response = await transport.execute(request)

        # Mark success in cache
        self._cache.mark_success(request.transport_type.value)

        # Process response through middleware
        response = await self.middleware.process_response(response)

        return response

    except Exception as e:
        # Only transport-level errors count towards the cache's failure tracking
        if isinstance(e, TransportError):
            self._cache.mark_failure(request.transport_type.value, e)

        # Process error through middleware (it may substitute the exception)
        e = await self.middleware.process_error(e, request)
        raise e
reset_transport_cache
reset_transport_cache() -> None

Reset the transport cache.

Useful for testing or forcing reconnection after configuration changes.

Source code in provide/foundation/transport/client.py
def reset_transport_cache(self) -> None:
    """Empty the transport cache after logging the reset.

    Handy in tests, or to force reconnection after configuration changes.
    The value returned by the cache's ``clear()`` is discarded here.
    """
    log.info("🔄 Resetting transport cache")
    self._cache.clear()
stream async
stream(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any
) -> AsyncIterator[bytes]

Stream data from URI.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `uri` | `str` | URI to stream from | required |
| `method` | `str \| HTTPMethod` | HTTP method or protocol-specific method | `GET` |
| `**kwargs` | `Any` | Additional request parameters | `{}` |

Yields:

| Type | Description |
| --- | --- |
| `AsyncIterator[bytes]` | Chunks of response data |

Source code in provide/foundation/transport/client.py
async def stream(
    self,
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any,
) -> AsyncIterator[bytes]:
    """Stream data from URI.

    Args:
        uri: URI to stream from
        method: HTTP method or protocol-specific method
        **kwargs: Additional request parameters, forwarded to ``Request``.
            A ``headers`` entry is merged over the client's default
            headers instead of being forwarded as-is.

    Yields:
        Chunks of response data

    """
    # Normalize method
    if isinstance(method, HTTPMethod):
        method = method.value

    # Merge caller-supplied headers over the defaults. Forwarding
    # kwargs["headers"] untouched would collide with the explicit
    # ``headers=`` argument below and raise TypeError.
    request_headers = dict(self.default_headers)
    caller_headers = kwargs.pop("headers", None)
    if caller_headers:
        request_headers.update(caller_headers)

    # Create request
    request = Request(uri=uri, method=method, headers=request_headers, **kwargs)

    # NOTE(review): unlike request(), this path neither runs the middleware
    # nor records success/failure in the transport cache — confirm intended.
    transport = await self._get_transport(request.transport_type.value)

    # Stream response
    async for chunk in transport.stream(request):
        yield chunk

Functions

delete async

delete(uri: str, **kwargs: Any) -> Response

DELETE request using default client.

Source code in provide/foundation/transport/client.py
async def delete(uri: str, **kwargs: Any) -> Response:
    """Issue a DELETE through the shared default client."""
    return await get_default_client().delete(uri, **kwargs)

get async

get(uri: str, **kwargs: Any) -> Response

GET request using default client.

Source code in provide/foundation/transport/client.py
async def get(uri: str, **kwargs: Any) -> Response:
    """Issue a GET through the shared default client."""
    return await get_default_client().get(uri, **kwargs)

get_default_client

get_default_client() -> UniversalClient

Get or create the default client instance.

This function acts as the composition root for the default client, preserving backward compatibility for public convenience functions.

Source code in provide/foundation/transport/client.py
def get_default_client() -> UniversalClient:
    """Return the module-wide default client, building it on first use.

    Serves as the composition root for the default client so that the
    public convenience functions stay backward compatible. The instance is
    created with the hub returned by ``get_hub()`` and stored in the
    module-level ``_default_client`` for later calls.
    """
    global _default_client
    if _default_client is not None:
        return _default_client
    _default_client = UniversalClient(hub=get_hub())
    return _default_client

head async

head(uri: str, **kwargs: Any) -> Response

HEAD request using default client.

Source code in provide/foundation/transport/client.py
async def head(uri: str, **kwargs: Any) -> Response:
    """Issue a HEAD through the shared default client."""
    return await get_default_client().head(uri, **kwargs)

options async

options(uri: str, **kwargs: Any) -> Response

OPTIONS request using default client.

Source code in provide/foundation/transport/client.py
async def options(uri: str, **kwargs: Any) -> Response:
    """Issue an OPTIONS through the shared default client."""
    return await get_default_client().options(uri, **kwargs)

patch async

patch(uri: str, **kwargs: Any) -> Response

PATCH request using default client.

Source code in provide/foundation/transport/client.py
async def patch(uri: str, **kwargs: Any) -> Response:
    """Issue a PATCH through the shared default client."""
    return await get_default_client().patch(uri, **kwargs)

post async

post(uri: str, **kwargs: Any) -> Response

POST request using default client.

Source code in provide/foundation/transport/client.py
async def post(uri: str, **kwargs: Any) -> Response:
    """Issue a POST through the shared default client."""
    return await get_default_client().post(uri, **kwargs)

put async

put(uri: str, **kwargs: Any) -> Response

PUT request using default client.

Source code in provide/foundation/transport/client.py
async def put(uri: str, **kwargs: Any) -> Response:
    """Issue a PUT through the shared default client."""
    return await get_default_client().put(uri, **kwargs)

request async

request(
    uri: str,
    method: str | HTTPMethod = HTTPMethod.GET,
    **kwargs: Any
) -> Response

Make a request using the default client.

Source code in provide/foundation/transport/client.py
async def request(uri: str, method: str | HTTPMethod = HTTPMethod.GET, **kwargs: Any) -> Response:
    """Send a request with the given method through the shared default client."""
    return await get_default_client().request(uri, method, **kwargs)

stream async

stream(uri: str, **kwargs: Any) -> AsyncIterator[bytes]

Stream data using default client.

Source code in provide/foundation/transport/client.py
async def stream(uri: str, **kwargs: Any) -> AsyncIterator[bytes]:
    """Yield response chunks streamed through the shared default client."""
    chunks = get_default_client().stream(uri, **kwargs)
    async for piece in chunks:
        yield piece