
Connection

pyvider.rpcplugin.client.connection

Client Connection Management.

This module defines the ClientConnection class, responsible for managing the state and I/O operations of a single client connection within the Pyvider RPC Plugin system. It includes metrics tracking and supports dependency injection for I/O functions to facilitate testing.

Classes

ClientConnection

Represents an active client connection with associated metrics and state.

This class wraps the asyncio StreamReader and StreamWriter, adding metrics tracking and connection-state management. Its I/O functions can be injected, so tests or alternative implementations can override the default behavior.

Attributes:

reader (StreamReader): Stream for reading client data.
writer (StreamWriter): Stream for writing responses.
remote_addr (str): Remote address of the client.
bytes_sent (int): Total bytes sent over this connection.
bytes_received (int): Total bytes received over this connection.
send_func (SendFuncType | None): Callable used to send data; defaults to _default_send.
receive_func (ReceiveFuncType | None): Callable used to receive data; defaults to _default_receive.

Attributes
is_closed property
is_closed: bool

Check if the connection is closed.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Post-initialization hook to set default I/O functions if not provided.

Source code in pyvider/rpcplugin/client/connection.py
def __attrs_post_init__(self) -> None:
    """Post-initialization hook to set default I/O functions if not provided."""
    if self.send_func is None:
        self.send_func = self._default_send
    if self.receive_func is None:
        self.receive_func = self._default_receive
__del__
__del__() -> None

Ensure resources are cleaned up.

Note: Raising exceptions in __del__ is generally discouraged; a warning is logged instead.

Source code in pyvider/rpcplugin/client/connection.py
def __del__(self) -> None:
    """
    Ensure resources are cleaned up.

    Note: Raising exceptions in __del__ is generally discouraged; a warning
          is logged instead.
    """
    if not self._closed and hasattr(self, "writer"):
        logger.warning(f"Connection to {self.remote_addr} was not properly closed")
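
Because `__del__` only logs a warning, the caller remains responsible for closing the connection. One way to make that hard to forget is a small async context manager. `closing_connection` and `FakeConnection` below are hypothetical helpers for illustration, not part of the library, and the sketch assumes only that the object exposes an awaitable `close()`.

```python
import asyncio
from collections.abc import AsyncIterator
from contextlib import asynccontextmanager
from typing import Protocol, TypeVar


class SupportsAsyncClose(Protocol):
    async def close(self) -> None: ...


T = TypeVar("T", bound=SupportsAsyncClose)


@asynccontextmanager
async def closing_connection(conn: T) -> AsyncIterator[T]:
    """Yield conn and always await conn.close() on exit, even on error."""
    try:
        yield conn
    finally:
        await conn.close()


class FakeConnection:
    """Hypothetical stand-in that records whether close() was awaited."""

    def __init__(self) -> None:
        self.closed = False

    async def close(self) -> None:
        self.closed = True


async def demo() -> bool:
    conn = FakeConnection()
    try:
        async with closing_connection(conn):
            raise RuntimeError("simulated failure while using the connection")
    except RuntimeError:
        pass
    return conn.closed
```

The standard library's `contextlib.aclosing` awaits `aclose()` rather than `close()`, which is why a custom helper is used here.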
close async
close() -> None

Close the connection and clean up resources.

This method is idempotent and can be safely called multiple times.

Source code in pyvider/rpcplugin/client/connection.py
async def close(self) -> None:
    """
    Close the connection and clean up resources.

    This method is idempotent and can be safely called multiple times.
    """
    if self._closed:
        return

    logger.debug(f"Closing connection to {self.remote_addr}")
    self._closed = True

    if not self.writer.is_closing():
        try:
            self.writer.close()
            await self.writer.wait_closed()
            logger.debug(f"Connection to {self.remote_addr} closed successfully")
        except Exception as e:
            logger.error(
                f"Error while closing connection to {self.remote_addr}",
                error=str(e),
            )
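
The idempotency guard in `close()` can be checked in isolation. This is a sketch under stated assumptions: `CountingWriter` is a hypothetical test double that exposes only the three writer methods used above, and `SketchConnection` reproduces just the guard logic, without logging or error handling.

```python
import asyncio


class CountingWriter:
    """Hypothetical writer double exposing the methods close() relies on."""

    def __init__(self) -> None:
        self.close_calls = 0

    def is_closing(self) -> bool:
        return self.close_calls > 0

    def close(self) -> None:
        self.close_calls += 1

    async def wait_closed(self) -> None:
        return None


class SketchConnection:
    """Stand-in reproducing only the guard logic from close() above."""

    def __init__(self, writer: CountingWriter) -> None:
        self.writer = writer
        self._closed = False

    async def close(self) -> None:
        if self._closed:  # second and later calls return immediately
            return
        self._closed = True
        if not self.writer.is_closing():
            self.writer.close()
            await self.writer.wait_closed()


async def demo() -> int:
    writer = CountingWriter()
    conn = SketchConnection(writer)
    await conn.close()
    await conn.close()  # no-op: the flag was set by the first call
    return writer.close_calls
```

Note that the flag is set before `wait_closed()` is awaited, so a second call that arrives while the first is still waiting also returns early.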
receive_data async
receive_data(size: int | None = None) -> bytes

Receive data from the connection using the injected receive_func.

Parameters:

size (int | None): Maximum number of bytes to receive. Defaults to None, in which case rpcplugin_config.plugin_buffer_size is used.

Returns:

bytes: Received data as bytes.

Raises:

ConnectionError: If the connection is closed.
RuntimeError: If receive_func was never initialized.

Source code in pyvider/rpcplugin/client/connection.py
async def receive_data(self, size: int | None = None) -> bytes:
    """
    Receive data from the connection using the injected receive_func.

    Args:
        size: Maximum number of bytes to receive.

    Returns:
        Received data as bytes.

    Raises:
        ConnectionError: If the connection is closed.
    """
    if self.is_closed:
        raise ConnectionError("Attempted to receive data on closed connection")
    if self.receive_func is None:
        # This should ideally not be reached if __attrs_post_init__ ran.
        raise RuntimeError(
            "receive_func was not initialized. This should not happen if "
            "__attrs_post_init__ ran correctly."
        )
    buffer_size = size if size is not None else rpcplugin_config.plugin_buffer_size
    return await self.receive_func(buffer_size)
send_data async
send_data(data: bytes) -> None

Send data over the connection using the injected send_func.

Parameters:

data (bytes): Bytes to send. Required.

Raises:

ConnectionError: If the connection is closed.
RuntimeError: If send_func was never initialized.

Source code in pyvider/rpcplugin/client/connection.py
async def send_data(self, data: bytes) -> None:
    """
    Send data over the connection using the injected send_func.

    Args:
        data: Bytes to send.

    Raises:
        ConnectionError: If the connection is closed.
    """
    if self.is_closed:
        raise ConnectionError("Attempted to send data on closed connection")
    if self.send_func is None:
        # This should ideally not be reached if __attrs_post_init__ ran.
        raise RuntimeError(
            "send_func was not initialized. This should not happen if __attrs_post_init__ ran correctly."
        )
    await self.send_func(data)
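
From the caller's side, the closed-connection guard means a `ConnectionError` is raised before the injected send function is ever invoked. The following sketch illustrates that ordering. `SketchConnection` and `send_or_report` are hypothetical, and `is_closed` is modelled as a plain attribute here rather than the read-only property the real class exposes.

```python
import asyncio


class SketchConnection:
    """Stand-in reproducing only the closed-connection guard of send_data."""

    def __init__(self) -> None:
        self.is_closed = False
        self.sent: list[bytes] = []

    async def _record(self, data: bytes) -> None:
        self.sent.append(data)

    async def send_data(self, data: bytes) -> None:
        if self.is_closed:
            raise ConnectionError("Attempted to send data on closed connection")
        await self._record(data)


async def send_or_report(conn: SketchConnection, data: bytes) -> bool:
    """Return True if the data reached the send path, False if closed."""
    try:
        await conn.send_data(data)
    except ConnectionError:
        return False
    return True


async def demo() -> tuple[bool, bool, list[bytes]]:
    conn = SketchConnection()
    first = await send_or_report(conn, b"hello")
    conn.is_closed = True  # simulate a connection that has since been closed
    second = await send_or_report(conn, b"late")
    return first, second, conn.sent
```

Only the first payload is recorded; the second attempt fails the guard and never reaches the send path.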
update_metrics
update_metrics(
    bytes_sent: int = 0, bytes_received: int = 0
) -> None

Update connection metrics.

Parameters:

bytes_sent (int): Number of bytes sent. Default: 0.
bytes_received (int): Number of bytes received. Default: 0.

Source code in pyvider/rpcplugin/client/connection.py
def update_metrics(self, bytes_sent: int = 0, bytes_received: int = 0) -> None:
    """
    Update connection metrics.

    Args:
        bytes_sent: Number of bytes sent.
        bytes_received: Number of bytes received.
    """
    self.bytes_sent += bytes_sent
    self.bytes_received += bytes_received
    logger.debug(
        f"Updated metrics for {self.remote_addr}",
        extra={
            "total_sent": self.bytes_sent,
            "total_received": self.bytes_received,
        },
    )
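
The counters are cumulative, which a short example makes concrete. `SketchMetrics` is a hypothetical stand-in that keeps only the two counters and the addition logic from `update_metrics`; logging is omitted.

```python
from dataclasses import dataclass


@dataclass
class SketchMetrics:
    """Stand-in holding only the two counters described above."""

    bytes_sent: int = 0
    bytes_received: int = 0

    def update_metrics(self, bytes_sent: int = 0, bytes_received: int = 0) -> None:
        # Each call adds to the running totals; nothing is reset.
        self.bytes_sent += bytes_sent
        self.bytes_received += bytes_received


metrics = SketchMetrics()
metrics.update_metrics(bytes_sent=len(b"ping"))       # sent: 4
metrics.update_metrics(bytes_received=len(b"pong!"))  # received: 5
metrics.update_metrics(bytes_sent=3, bytes_received=2)
```

After these three calls both totals are 7. The method as shown performs no validation, so a negative argument would decrease a counter.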

Functions