Skip to content

Index

pyvider.rpcplugin.transport.unix

Unix Domain Socket Transport Package.

This package provides Unix domain socket transport implementation and utilities for the Pyvider RPC Plugin system.

Classes

UnixSocketTransport

Bases: RPCPluginTransport

Unix domain socket transport for local IPC communication.

This transport provides high-performance local inter-process communication using Unix domain sockets. It's the preferred transport for plugin communication on Linux and macOS systems, offering better security and performance than TCP for local connections.

The implementation is compatible with HashiCorp's go-plugin protocol, handling: - Automatic socket path generation when path is None - Path normalization for unix:, unix:/, unix:// prefixes - Proper file permissions (0660) for cross-process access - Socket lifecycle management with cleanup on close - Stale socket detection and removal

Attributes:

Name Type Description
path str | None

Unix socket file path. If None, generates temporary path.

endpoint str | None

The normalized endpoint string (e.g., "unix:/tmp/plugin.sock")

Example
# Server with auto-generated path
transport = UnixSocketTransport()
endpoint = await transport.listen()  # Returns "unix:/tmp/pyvider-xxx.sock"

# Server with specific path
transport = UnixSocketTransport(path="/var/run/myplugin.sock")
endpoint = await transport.listen()  # Returns "unix:/var/run/myplugin.sock"

# Client connection
transport = UnixSocketTransport()
await transport.connect("unix:/tmp/server.sock")

# Cleanup (removes socket file)
await transport.close()
Platform Notes
  • Linux/macOS: Full support with optimal performance
  • Windows: Not supported (use TCPSocketTransport instead)
  • Docker: Ensure socket paths are in shared volumes for cross-container IPC
Security Notes
  • Socket files are created with 0660 permissions (user/group read/write)
  • Consider socket file location for security (avoid world-writable directories)
  • Socket files are automatically removed on close()
Note

This transport is typically created automatically by factory functions (plugin_server, plugin_client) when transport="unix" is specified.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Post-initialization hook for UnixSocketTransport.

If a socket path is not provided, it generates an ephemeral path. Otherwise, it normalizes the provided path. Initializes locks and events.

Source code in pyvider/rpcplugin/transport/unix/transport.py
def __attrs_post_init__(self) -> None:
    """
    Post-initialization hook for UnixSocketTransport.

    If a socket path is not provided, it generates an ephemeral path.
    Otherwise, it normalizes the provided path. Initializes locks and events.
    """
    if not self.path:
        # Generate ephemeral path if none provided
        self.path = str(Path(tempfile.gettempdir()) / f"pyvider-{uuid.uuid4().hex[:8]}.sock")
    else:
        # Normalize path if it has a unix: prefix
        self.path = normalize_unix_path(self.path)

    self._server_ready = asyncio.Event()
    self._connections = set()  # Initialize connection set
close async
close() -> None

Closes the Unix socket transport.

This involves closing any active client connections, stopping the server, and removing the socket file from the filesystem. It is designed to be idempotent.

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def close(self) -> None:
    """
    Closes the Unix socket transport.

    This involves closing any active client connections, stopping the server,
    and removing the socket file from the filesystem.
    It is designed to be idempotent.
    """

    if self._closing:
        return

    self._closing = True
    self._running = False

    try:
        await self._close_connections()
        await self._close_client_connection()
        await self._close_server()
        if self.path:
            await self._remove_socket_file(self.path)
    finally:
        # Always reset state even if socket removal fails
        self.endpoint = None
        self._closing = False
connect async
connect(endpoint: str) -> None

Connect to a remote Unix socket with robust path handling.

This method: 1. Normalizes the endpoint path to handle various formats 2. Verifies the socket file exists (with retries) 3. Establishes the connection with timeout handling

Parameters:

Name Type Description Default
endpoint str

The Unix socket path to connect to, which can be in various formats: - Absolute path: "/tmp/socket.sock" - With prefix: "unix:/tmp/socket.sock"

required

Raises:

Type Description
TransportError

If the socket file doesn't exist or connection fails

TimeoutError

If the connection attempt times out

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def connect(self, endpoint: str) -> None:
    """
    Connect to a remote Unix socket with robust path handling.

    This method:
    1. Normalizes the endpoint path to handle various formats
    2. Verifies the socket file exists (with retries)
    3. Establishes the connection with timeout handling

    Args:
        endpoint: The Unix socket path to connect to, which can be in
                  various formats:
                 - Absolute path: "/tmp/socket.sock"
                 - With prefix: "unix:/tmp/socket.sock"

    Raises:
        TransportError: If the socket file doesn't exist or connection fails
        TimeoutError: If the connection attempt times out
    """
    # Normalize endpoint path
    endpoint = normalize_unix_path(endpoint)

    # Verify socket file exists with retries
    retries = 3
    for attempt in range(retries):
        if Path(endpoint).exists():
            break
        if attempt < retries - 1:
            await asyncio.sleep(0.5)  # Short delay between retries

    if not Path(endpoint).exists():
        raise TransportError(f"Socket {endpoint} does not exist")

    # Add validation that it's actually a socket
    try:
        if not stat.S_ISSOCK(Path(endpoint).stat().st_mode):
            raise TransportError(f"Path exists but is not a socket: {endpoint}")
    except OSError as e:
        raise TransportError(f"Error checking socket status: {e}") from e

    try:
        reader_writer = await asyncio.wait_for(asyncio.open_unix_connection(endpoint), timeout=5.0)
        self._reader, self._writer = reader_writer  # Unpack after awaiting
        self.endpoint = endpoint
    except TimeoutError as e_timeout:
        raise TransportError(f"Connection to Unix socket timed out: {e_timeout}") from e_timeout
    except Exception as e:
        raise TransportError(f"Failed to connect to Unix socket: {e}") from e
listen async
listen() -> str

Start listening on Unix socket with cross-platform compatibility.

Source code in pyvider/rpcplugin/transport/unix/transport.py
async def listen(self) -> str:
    """Start listening on Unix socket with cross-platform compatibility."""
    async with self._lock:
        self._raise_if_running()
        await self._ensure_socket_available()
        socket_path = self._require_socket_path()
        self._ensure_socket_directory(socket_path)
        await self._remove_stale_socket_file(socket_path)
        return await self._start_server_at_path(socket_path)

Functions

normalize_unix_path

normalize_unix_path(path: str) -> str

Standardized Unix socket path normalization, handling: - unix: prefix - unix:/ prefix - unix:// prefix - Multiple leading slashes

Returns a clean path suitable for socket operations.

Source code in pyvider/rpcplugin/transport/unix/utils.py
def normalize_unix_path(path: str) -> str:
    """
    Standardized Unix socket path normalization, handling:
    - unix: prefix
    - unix:/ prefix
    - unix:// prefix
    - Multiple leading slashes

    Returns a clean path suitable for socket operations.
    """

    # Handle unix: prefix formats
    if path.startswith("unix:"):
        path = path[5:]  # Remove 'unix:'

    # Handle multiple leading slashes
    if path.startswith("//"):
        # Split by / and rebuild with single leading slash
        parts = [p for p in path.split("/") if p]
        path = "/" + "/".join(parts)
    elif path.startswith("/"):
        # Keep absolute paths as-is
        pass
    # Relative paths remain unchanged

    return path