pyvider.rpcplugin.transport

Transport Layer for Pyvider RPC Plugin

This package provides the network transport abstractions for communication between plugin clients and servers. It handles the low-level socket operations, connection management, and protocol negotiation.

Key components:
  • RPCPluginTransport: Base interface for all transport implementations
  • TCPSocketTransport: TCP socket-based transport implementation
  • UnixSocketTransport: Unix domain socket-based transport implementation

The transport layer is responsible for:
  1. Listening for connections (server-side)
  2. Connecting to endpoints (client-side)
  3. Managing connection lifecycle and cleanup
  4. Ensuring Go-Python interoperability

Classes

RPCPluginTransport

Bases: ABC

Abstract base class defining the interface for all transport implementations.

This class defines the contract that concrete transport implementations must fulfill to provide network communication for plugins. The interface supports both client-side (connect) and server-side (listen) operations.

Implementations must handle:
  • Connection setup and teardown
  • Socket lifecycle management
  • Error handling and reporting
  • Resource cleanup

Custom transports can be implemented by subclassing this class and implementing the required abstract methods.
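
As a rough illustration of the subclassing contract described above, the sketch below defines a toy transport. To stay self-contained it declares local stand-ins named RPCPluginTransport and TransportError that mirror the three abstract methods documented on this page; real code would import them from pyvider.rpcplugin.transport instead. LoopbackTransport and its "loopback:" endpoint scheme are hypothetical and open no sockets.

```python
import abc
import asyncio
from typing import Optional


class TransportError(Exception):
    """Local stand-in for the library's TransportError (assumption)."""


class RPCPluginTransport(abc.ABC):
    """Local stand-in mirroring the abstract interface documented here."""

    @abc.abstractmethod
    async def listen(self) -> str: ...

    @abc.abstractmethod
    async def connect(self, endpoint: str) -> None: ...

    @abc.abstractmethod
    async def close(self) -> None: ...


class LoopbackTransport(RPCPluginTransport):
    """Hypothetical transport that only tracks an endpoint string."""

    def __init__(self, name: str = "demo") -> None:
        self.name = name
        self.endpoint: Optional[str] = None

    async def listen(self) -> str:
        # Server side: decide on an endpoint and report it to the caller.
        self.endpoint = f"loopback:{self.name}"
        return self.endpoint

    async def connect(self, endpoint: str) -> None:
        # Client side: reject endpoints this transport cannot handle.
        if not endpoint.startswith("loopback:"):
            raise TransportError(f"Invalid loopback endpoint: {endpoint}")
        self.endpoint = endpoint

    async def close(self) -> None:
        # Idempotent: a second call finds nothing left to release.
        self.endpoint = None


async def demo() -> None:
    server = LoopbackTransport("a")
    endpoint = await server.listen()
    client = LoopbackTransport()
    await client.connect(endpoint)
    await client.close()
    await server.close()
    await server.close()  # safe to call twice


if __name__ == "__main__":
    asyncio.run(demo())
```

Because the base class is an ABC, instantiating it directly, or a subclass that omits one of the three methods, fails with TypeError at construction time.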

Functions
close abstractmethod async
close() -> None

Close the transport and release any associated resources.

Implementations should ensure that all network resources (like sockets) are properly closed and cleaned up. This method should be idempotent.

Raises:

Type Description
TransportError

If an error occurs during closing.

Source code in pyvider/rpcplugin/transport/base.py
@abc.abstractmethod
async def close(self) -> None:  # pragma: no cover
    """
    Close the transport and release any associated resources.

    Implementations should ensure that all network resources (like sockets)
    are properly closed and cleaned up. This method should be idempotent.

    Raises:
        TransportError: If an error occurs during closing.
    """
    ...
connect abstractmethod async
connect(endpoint: str) -> None

Connect to a remote endpoint.

Implementations should establish a connection to the specified endpoint address. This is typically used by client components.

Parameters:

Name Type Description Default
endpoint str

The target endpoint address string.

required

Raises:

Type Description
TransportError

If the connection cannot be established.

Source code in pyvider/rpcplugin/transport/base.py
@abc.abstractmethod
async def connect(self, endpoint: str) -> None:  # pragma: no cover
    """
    Connect to a remote endpoint.

    Implementations should establish a connection to the specified endpoint
    address. This is typically used by client components.

    Args:
        endpoint: The target endpoint address string.

    Raises:
        TransportError: If the connection cannot be established.
    """
    ...
listen abstractmethod async
listen() -> str

Start listening for connections.

Implementations should bind to an appropriate socket or address and begin accepting connections. This is typically used by server components.

Returns:

Type Description
str

The endpoint address as a string (e.g., "127.0.0.1:50051" or "/tmp/socket.sock")

Raises:

Type Description
TransportError

If binding or listening fails

Source code in pyvider/rpcplugin/transport/base.py
@abc.abstractmethod
async def listen(self) -> str:  # pragma: no cover
    """
    Start listening for connections.

    Implementations should bind to an appropriate socket or address and
    begin accepting connections. This is typically used by server components.

    Returns:
        The endpoint address as a string (e.g., "127.0.0.1:50051" or
        "/tmp/socket.sock")

    Raises:
        TransportError: If binding or listening fails
    """
    ...

TCPSocketTransport

Bases: RPCPluginTransport

TCP socket transport for network-based RPC communication.

This transport implementation provides TCP/IP connectivity for RPC plugins, supporting both server (listen) and client (connect) modes. It's suitable for network communication between processes on the same machine or across a network.

The transport handles:
  • Dynamic port allocation (when port=0)
  • DNS resolution for hostnames
  • Connection lifecycle management
  • Graceful shutdown with timeouts

Attributes:

Name Type Description
host str

IP address or hostname to bind/connect to (default: "127.0.0.1")

port int

Port number to use (0 for dynamic allocation, default: 0)

endpoint str | None

The resolved endpoint string in "host:port" format (set after listen/connect)

Example
# Server mode with dynamic port
transport = TCPSocketTransport()
endpoint = await transport.listen()  # Returns "127.0.0.1:45678"

# Server mode with specific port
transport = TCPSocketTransport(host="0.0.0.0", port=8080)
endpoint = await transport.listen()  # Returns "0.0.0.0:8080"

# Client mode
transport = TCPSocketTransport()
await transport.connect("remote.host:8080")

# Cleanup
await transport.close()
Note

This transport is typically created automatically by the factory functions (plugin_server, plugin_client) rather than instantiated directly. For local IPC on Unix-like systems, prefer UnixSocketTransport for better performance and security.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Initializes locks and events for managing transport state.

Source code in pyvider/rpcplugin/transport/tcp.py
def __attrs_post_init__(self) -> None:
    """Initializes locks and events for managing transport state."""
    self._lock = asyncio.Lock()  # Lock for synchronizing access to shared resources
    self._server_ready = asyncio.Event()  # Event to signal when the server is ready
close async
close() -> None

Closes the TCP transport, including any active server or client connections.

This method is idempotent and ensures that all resources associated with this transport instance are released.

Source code in pyvider/rpcplugin/transport/tcp.py
async def close(self) -> None:
    """
    Closes the TCP transport, including any active server or client connections.

    This method is idempotent and ensures that all resources associated with
    this transport instance are released.
    """

    async with self._lock:
        # Close client connection
        if self._writer:
            try:
                await self._close_writer(self._writer)
            except Exception:
                pass  # Empty except block
            finally:
                self._writer = None
                self._reader = None

        # Close server
        if self._server:
            server_was_serving = self._server.is_serving()  # Store initial state
            try:
                if server_was_serving:
                    self._server.close()  # This is synchronous, initiates closing

                # Only await wait_closed if close was called or it was serving
                if server_was_serving:
                    await asyncio.wait_for(self._server.wait_closed(), timeout=5.0)
                else:  # If it wasn't serving, log that no action was needed.
                    pass
            except TimeoutError:
                endpoint_str = self.endpoint if self.endpoint else "unknown"
                logger.warning(f"Timeout closing TCP server endpoint {endpoint_str}")
            except Exception:
                pass  # Empty except block
            finally:
                self._server = None

        # This should be set regardless of whether self._server (asyncio server)
        # was active, as close() means the transport is shutting down.
        self._running = False

    self.endpoint = None
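
The server half of close() above follows a common asyncio pattern: request shutdown synchronously, then wait for it with an upper bound. The sketch below isolates that pattern using only the standard library. close_server is a hypothetical helper name, and returning a boolean instead of logging a warning is a simplification of this sketch, not the library's behavior.

```python
import asyncio


async def close_server(server: asyncio.AbstractServer, timeout: float = 5.0) -> bool:
    """Close an asyncio server, waiting at most `timeout` seconds.

    Returns True if the server is closed, False if the wait timed out.
    """
    if not server.is_serving():
        # Already closed (or never started): nothing to do.
        return True
    server.close()  # synchronous; only initiates the shutdown
    try:
        await asyncio.wait_for(server.wait_closed(), timeout=timeout)
    except asyncio.TimeoutError:
        return False
    return True
```

Checking is_serving() first is what makes repeated calls harmless, which matches the idempotency promised by the method description.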
connect async
connect(endpoint: str) -> None

Connects to a remote TCP endpoint.

The endpoint string must be in the format 'host:port'. This method parses the endpoint, performs DNS resolution, and establishes a connection.

Parameters:

Name Type Description Default
endpoint str

The target TCP endpoint string (e.g., "127.0.0.1:12345").

required

Raises:

Type Description
TransportError

If the endpoint format is invalid, DNS resolution fails, or the connection cannot be established (e.g., timeout, refused).

Source code in pyvider/rpcplugin/transport/tcp.py
async def connect(self, endpoint: str) -> None:
    """
    Connects to a remote TCP endpoint.

    The endpoint string must be in the format 'host:port'. This method
    parses the endpoint, performs DNS resolution, and establishes a
    connection.

    Args:
        endpoint: The target TCP endpoint string (e.g., "127.0.0.1:12345").

    Raises:
        TransportError: If the endpoint format is invalid, DNS resolution fails,
                        or the connection cannot be established
                        (e.g., timeout, refused).
    """
    if not is_valid_tcp_endpoint(endpoint):
        raise TransportError(f"Invalid TCP endpoint format: {endpoint}")

    try:
        # Parse the endpoint
        parts = endpoint.split(":")
        match parts:
            case [host, port_str] if port_str.isdigit():
                self.host = host
                self.port = int(port_str)
                self.endpoint = f"{self.host}:{self.port}"
            case _:
                raise TransportError(f"Unexpected endpoint format: {endpoint}")

        # Perform DNS resolution to ensure the address is reachable.
        try:
            socket.getaddrinfo(self.host, self.port)
        except socket.gaierror as e:
            raise TransportError(f"Address resolution failed for {self.host}:{self.port}: {e}") from e

        try:
            self._reader, self._writer = await asyncio.wait_for(
                asyncio.open_connection(self.host, self.port), timeout=5.0
            )
        except TimeoutError as e_timeout:
            raise TransportError(f"Connection timed out: {e_timeout}") from e_timeout
        except ConnectionRefusedError as e_refused:
            raise TransportError(f"Connection refused: {e_refused}") from e_refused

    except TransportError:
        # Re-raise TransportError without additional wrapping
        raise
    except Exception as e:
        raise TransportError(f"Failed to connect to TCP endpoint {endpoint}: {e}") from e
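
The endpoint parsing step in connect() can be tried in isolation. The sketch below mirrors the split-on-colon logic shown above; parse_tcp_endpoint is a hypothetical helper, it raises ValueError rather than TransportError to stay self-contained, and the non-empty host check is an addition of this sketch.

```python
from typing import Tuple


def parse_tcp_endpoint(endpoint: str) -> Tuple[str, int]:
    """Split a 'host:port' string into (host, port)."""
    parts = endpoint.split(":")
    # Exactly one colon, a non-empty host, and an all-digit port are required.
    if len(parts) != 2 or not parts[0] or not parts[1].isdigit():
        raise ValueError(f"Unexpected endpoint format: {endpoint}")
    return parts[0], int(parts[1])
```

One consequence of splitting on every colon is that IPv6 literals such as "[::1]:8080" yield more than two parts and are rejected by this logic.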
listen async
listen() -> str

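Determines the TCP endpoint for the server and returns it as a "host:port" string. If port is 0, an ephemeral port is selected first.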

Source code in pyvider/rpcplugin/transport/tcp.py
async def listen(self) -> str:
    """
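    Determines the TCP endpoint for the server and returns it as a
    "host:port" string. If port is 0, an ephemeral port is selected first.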
    """
    async with self._lock:
        if self._running:
            # If gRPC is managing, this might be okay if called multiple times,
            # but for now, let's assume it means endpoint is set.
            if self.endpoint:
                return self.endpoint
            raise TransportError("TCP transport is already configured with an endpoint but it's None.")

        if self.port == 0:
            # Find an ephemeral port
            try:
                temp_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                temp_sock.bind((self.host, 0))
                self.port = temp_sock.getsockname()[1]
                temp_sock.close()
                logger.info(f"Selected ephemeral port {self.port} on {self.host}")
            except OSError as e:
                raise TransportError(f"Failed to find an ephemeral port: {e}") from e

        # If self.port was non-zero, we use it directly.
        self.endpoint = f"{self.host}:{self.port}"
        self._running = True  # Mark as "endpoint determined"
        self._server_ready.set()  # Signal readiness (endpoint is known)

        # self._server remains None as gRPC will handle the actual server lifecycle.
        self._server = None

        logger.info(f"(Host: {self.host}, Port: {self.port})")
        return self.endpoint
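
The ephemeral-port branch of listen() relies on the operating system picking a free port when a socket is bound to port 0. A minimal standalone sketch of that technique, using only the standard library (find_ephemeral_port is a hypothetical helper name):

```python
import socket


def find_ephemeral_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for a currently free TCP port on `host`."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))  # port 0 lets the OS choose
        return sock.getsockname()[1]
```

Since the probing socket is closed before the real server binds, another process could in principle claim the port in between. Callers that need a guarantee would have to keep the socket open and hand it to the server instead.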

UnixSocketTransport

Bases: RPCPluginTransport

Unix domain socket transport for local IPC communication.

This transport provides high-performance local inter-process communication using Unix domain sockets. It's the preferred transport for plugin communication on Linux and macOS systems, offering better security and performance than TCP for local connections.

The implementation is compatible with HashiCorp's go-plugin protocol, handling:
  • Automatic socket path generation when path is None
  • Path normalization for unix:, unix:/, unix:// prefixes
  • Proper file permissions (0660) for cross-process access
  • Socket lifecycle management with cleanup on close
  • Stale socket detection and removal

Attributes:

Name Type Description
path str | None

Unix socket file path. If None, generates temporary path.

endpoint str | None

The normalized endpoint string (e.g., "unix:/tmp/plugin.sock")

Example
# Server with auto-generated path
transport = UnixSocketTransport()
endpoint = await transport.listen()  # Returns "unix:/tmp/pyvider-xxx.sock"

# Server with specific path
transport = UnixSocketTransport(path="/var/run/myplugin.sock")
endpoint = await transport.listen()  # Returns "unix:/var/run/myplugin.sock"

# Client connection
transport = UnixSocketTransport()
await transport.connect("unix:/tmp/server.sock")

# Cleanup (removes socket file)
await transport.close()
Platform Notes
  • Linux/macOS: Full support with optimal performance
  • Windows: Not supported (use TCPSocketTransport instead)
  • Docker: Ensure socket paths are in shared volumes for cross-container IPC
Security Notes
  • Socket files are created with 0660 permissions (user/group read/write)
  • Consider socket file location for security (avoid world-writable directories)
  • Socket files are automatically removed on close()
Note

This transport is typically created automatically by factory functions (plugin_server, plugin_client) when transport="unix" is specified.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Post-initialization hook for UnixSocketTransport.

If a socket path is not provided, it generates an ephemeral path. Otherwise, it normalizes the provided path. Initializes locks and events.

Source code in pyvider/rpcplugin/transport/unix/transport.py
def __attrs_post_init__(self) -> None:
    """
    Post-initialization hook for UnixSocketTransport.

    If a socket path is not provided, it generates an ephemeral path.
    Otherwise, it normalizes the provided path. Initializes locks and events.
    """
    if not self.path:
        # Generate ephemeral path if none provided
        self.path = str(Path(tempfile.gettempdir()) / f"pyvider-{uuid.uuid4().hex[:8]}.sock")
    else:
        # Normalize path if it has a unix: prefix
        self.path = normalize_unix_path(self.path)

    self._server_ready = asyncio.Event()
    self._connections = set()  # Initialize connection set
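
The two path branches of the hook above can be sketched without the library. ephemeral_socket_path repeats the expression from the source listing; strip_unix_prefix is a hypothetical stand-in for normalize_unix_path, whose actual implementation is not shown on this page and may treat edge cases differently.

```python
import tempfile
import uuid
from pathlib import Path


def ephemeral_socket_path() -> str:
    """Build a unique socket path in the system temp directory."""
    return str(Path(tempfile.gettempdir()) / f"pyvider-{uuid.uuid4().hex[:8]}.sock")


def strip_unix_prefix(path: str) -> str:
    """Drop a leading 'unix://' or 'unix:' scheme, if present (assumed rules)."""
    # Check the longer prefix first so 'unix:///tmp/a' becomes '/tmp/a'.
    for prefix in ("unix://", "unix:"):
        if path.startswith(prefix):
            return path[len(prefix):]
    return path
```

With these assumed rules, "unix:/tmp/plugin.sock" and "unix:///tmp/plugin.sock" both reduce to "/tmp/plugin.sock", and a plain absolute path is returned unchanged.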
close async
close() -> None

Closes the Unix socket transport.

This involves closing any active client connections, stopping the server, and removing the socket file from the filesystem. It is designed to be idempotent.

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def close(self) -> None:
    """
    Closes the Unix socket transport.

    This involves closing any active client connections, stopping the server,
    and removing the socket file from the filesystem.
    It is designed to be idempotent.
    """

    if self._closing:
        return

    self._closing = True
    self._running = False

    try:
        await self._close_connections()
        await self._close_client_connection()
        await self._close_server()
        if self.path:
            await self._remove_socket_file(self.path)
    finally:
        # Always reset state even if socket removal fails
        self.endpoint = None
        self._closing = False
connect async
connect(endpoint: str) -> None

Connect to a remote Unix socket with robust path handling.

This method:
  1. Normalizes the endpoint path to handle various formats
  2. Verifies the socket file exists (with retries)
  3. Establishes the connection with timeout handling

Parameters:

Name Type Description Default
endpoint str

The Unix socket path to connect to, which can be in various formats:
  • Absolute path: "/tmp/socket.sock"
  • With prefix: "unix:/tmp/socket.sock"

required

Raises:

Type Description
TransportError

If the socket file doesn't exist, the path is not a socket, or the connection fails or times out (timeouts are wrapped in TransportError)

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def connect(self, endpoint: str) -> None:
    """
    Connect to a remote Unix socket with robust path handling.

    This method:
    1. Normalizes the endpoint path to handle various formats
    2. Verifies the socket file exists (with retries)
    3. Establishes the connection with timeout handling

    Args:
        endpoint: The Unix socket path to connect to, which can be in
                  various formats:
                 - Absolute path: "/tmp/socket.sock"
                 - With prefix: "unix:/tmp/socket.sock"

    Raises:
        TransportError: If the socket file doesn't exist, the path is not a
                        socket, or the connection fails or times out.
    """
    # Normalize endpoint path
    endpoint = normalize_unix_path(endpoint)

    # Verify socket file exists with retries
    retries = 3
    for attempt in range(retries):
        if Path(endpoint).exists():
            break
        if attempt < retries - 1:
            await asyncio.sleep(0.5)  # Short delay between retries

    if not Path(endpoint).exists():
        raise TransportError(f"Socket {endpoint} does not exist")

    # Add validation that it's actually a socket
    try:
        if not stat.S_ISSOCK(Path(endpoint).stat().st_mode):
            raise TransportError(f"Path exists but is not a socket: {endpoint}")
    except OSError as e:
        raise TransportError(f"Error checking socket status: {e}") from e

    try:
        reader_writer = await asyncio.wait_for(asyncio.open_unix_connection(endpoint), timeout=5.0)
        self._reader, self._writer = reader_writer  # Unpack after awaiting
        self.endpoint = endpoint
    except TimeoutError as e_timeout:
        raise TransportError(f"Connection to Unix socket timed out: {e_timeout}") from e_timeout
    except Exception as e:
        raise TransportError(f"Failed to connect to Unix socket: {e}") from e
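
The pre-connection checks in connect() (wait briefly for the file to appear, then confirm it is a socket) can be exercised on their own. The sketch below uses only the standard library; wait_for_socket is a hypothetical helper, and it raises built-in exceptions instead of TransportError to stay self-contained.

```python
import asyncio
import stat
from pathlib import Path


async def wait_for_socket(path: str, retries: int = 3, delay: float = 0.5) -> None:
    """Return once `path` is an existing socket file, else raise."""
    target = Path(path)
    for attempt in range(retries):
        if target.exists():
            break
        if attempt < retries - 1:
            await asyncio.sleep(delay)  # give the server time to bind
    if not target.exists():
        raise FileNotFoundError(f"Socket {path} does not exist")
    # A regular file or directory at this path cannot accept connections.
    if not stat.S_ISSOCK(target.stat().st_mode):
        raise ValueError(f"Path exists but is not a socket: {path}")
```

With the defaults used in the source listing (3 attempts, 0.5 s apart), a missing socket is reported after roughly one second, since no sleep follows the final attempt.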
listen async
listen() -> str

Start listening on Unix socket with cross-platform compatibility.

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def listen(self) -> str:
    """Start listening on Unix socket with cross-platform compatibility."""
    async with self._lock:
        self._raise_if_running()
        await self._ensure_socket_available()
        socket_path = self._require_socket_path()
        self._ensure_socket_directory(socket_path)
        await self._remove_stale_socket_file(socket_path)
        return await self._start_server_at_path(socket_path)
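
The private helpers called by listen() are not shown on this page. As a loose sketch of what the sequence (ensure the directory, remove a stale socket file, start the server, set 0660 permissions) might look like with plain asyncio, consider the following. All function names are hypothetical, and the stale check (a connection attempt that is refused) is an assumption of this sketch rather than the library's documented method.

```python
import asyncio
import os
import stat
from pathlib import Path


async def _handle(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> None:
    # Placeholder handler: close each connection immediately.
    writer.close()


async def is_stale_socket(path: Path) -> bool:
    """True if a socket file exists but nothing accepts connections on it."""
    if not path.exists() or not stat.S_ISSOCK(path.stat().st_mode):
        return False
    try:
        _, writer = await asyncio.open_unix_connection(str(path))
    except ConnectionRefusedError:
        return True  # file left behind by a process that is gone
    writer.close()
    await writer.wait_closed()
    return False


async def listen_unix(path: str) -> asyncio.AbstractServer:
    sock_path = Path(path)
    sock_path.parent.mkdir(parents=True, exist_ok=True)
    if await is_stale_socket(sock_path):
        sock_path.unlink()
    server = await asyncio.start_unix_server(_handle, path=str(sock_path))
    # User/group read-write, matching the 0660 mode noted above.
    os.chmod(sock_path, 0o660)
    return server
```

A socket file that still accepts connections is left alone here, so start_unix_server decides how to handle a path that is in use.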