🤖 AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate.

pyvider.rpcplugin.client

Pyvider RPC Plugin Client Package.

This package provides the core components for creating RPC plugin clients, including the main RPCPluginClient class, connection handling, and associated types.

Classes

ClientConnection

Represents an active client connection with associated metrics and state.

This class wraps the asyncio StreamReader and StreamWriter with additional functionality for tracking metrics and managing connection state. Its I/O functions are injectable, so tests or alternative implementations can override the default behavior.

Attributes:

Name Type Description
reader StreamReader

Stream for reading client data.

writer StreamWriter

Stream for writing responses.

remote_addr str

Remote address of the client.

bytes_sent int

Total bytes sent over this connection.

bytes_received int

Total bytes received over this connection.

send_func SendFuncType | None

Callable used to send data; defaults to _default_send.

receive_func ReceiveFuncType | None

Callable used to receive data; defaults to _default_receive.
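The injection of send_func and receive_func described above can be sketched roughly as follows. This is a minimal stand-in, not the library class: SketchConnection, make_fake_io, and the SendFunc/ReceiveFunc aliases are hypothetical names, and the sketch uses stdlib dataclasses where the real class appears to use attrs.

```python
import asyncio
from dataclasses import dataclass
from typing import Awaitable, Callable, Optional

# Assumed shapes for the injectable callables (hypothetical aliases).
SendFunc = Callable[[bytes], Awaitable[None]]
ReceiveFunc = Callable[[int], Awaitable[bytes]]


@dataclass
class SketchConnection:
    """Stand-in for ClientConnection; not the library class."""

    send_func: Optional[SendFunc] = None
    receive_func: Optional[ReceiveFunc] = None

    def __post_init__(self) -> None:
        # Fall back to the defaults only when nothing was injected.
        if self.send_func is None:
            self.send_func = self._default_send
        if self.receive_func is None:
            self.receive_func = self._default_receive

    async def _default_send(self, data: bytes) -> None:
        raise NotImplementedError("a real default would write to the StreamWriter")

    async def _default_receive(self, size: int) -> bytes:
        raise NotImplementedError("a real default would read from the StreamReader")


def make_fake_io(incoming: bytes):
    """Build in-memory replacements for the I/O callables (hypothetical helper)."""
    sent = []

    async def fake_send(data: bytes) -> None:
        sent.append(data)  # record instead of touching a socket

    async def fake_receive(size: int) -> bytes:
        return incoming[:size]

    return sent, fake_send, fake_receive


async def demo():
    sent, fake_send, fake_receive = make_fake_io(b"hello world")
    conn = SketchConnection(send_func=fake_send, receive_func=fake_receive)
    await conn.send_func(b"ping")
    data = await conn.receive_func(5)
    return sent, data
```

In a test, the fake callables let the connection logic run without any real socket, which appears to be the use case the class description has in mind.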

Attributes
is_closed property
is_closed: bool

Check if the connection is closed.

Functions
__attrs_post_init__
__attrs_post_init__() -> None

Post-initialization hook to set default I/O functions if not provided.

Source code in pyvider/rpcplugin/client/connection.py
def __attrs_post_init__(self) -> None:
    """Post-initialization hook to set default I/O functions if not provided."""
    if self.send_func is None:
        self.send_func = self._default_send
    if self.receive_func is None:
        self.receive_func = self._default_receive
__del__
__del__() -> None

Ensure resources are cleaned up.

Note: Raising exceptions in __del__ is generally discouraged; a warning is logged instead.

Source code in pyvider/rpcplugin/client/connection.py
def __del__(self) -> None:
    """
    Ensure resources are cleaned up.

    Note: Raising exceptions in __del__ is generally discouraged; a warning
          is logged instead.
    """
    if not self._closed and hasattr(self, "writer"):
        logger.warning(f"Connection to {self.remote_addr} was not properly closed")
close async
close() -> None

Close the connection and clean up resources.

This method is idempotent and can be safely called multiple times.

Source code in pyvider/rpcplugin/client/connection.py
async def close(self) -> None:
    """
    Close the connection and clean up resources.

    This method is idempotent and can be safely called multiple times.
    """
    if self._closed:
        return

    logger.debug(f"Closing connection to {self.remote_addr}")
    self._closed = True

    if not self.writer.is_closing():
        try:
            self.writer.close()
            await self.writer.wait_closed()
            logger.debug(f"Connection to {self.remote_addr} closed successfully")
        except Exception as e:
            logger.error(
                f"Error while closing connection to {self.remote_addr}",
                error=str(e),
            )
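The idempotent close shown above relies on a guard flag that is set before the resource is released. A minimal sketch of that pattern follows, using a hypothetical SketchCloser class whose asyncio.sleep(0) call stands in for the writer shutdown:

```python
import asyncio


class SketchCloser:
    """Stand-in illustrating the guard-flag pattern; not the library class."""

    def __init__(self) -> None:
        self._closed = False
        self.release_count = 0  # how many times the resource was actually released

    @property
    def is_closed(self) -> bool:
        return self._closed

    async def close(self) -> None:
        if self._closed:
            return  # second and later calls are no-ops
        self._closed = True  # set first, so overlapping calls return early
        self.release_count += 1
        await asyncio.sleep(0)  # placeholder for writer.close() / wait_closed()


async def demo() -> int:
    closer = SketchCloser()
    await closer.close()
    await closer.close()  # safe: does nothing the second time
    return closer.release_count
```

Setting the flag before awaiting means a second caller that arrives while the first is still waiting also returns immediately.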
receive_data async
receive_data(size: int | None = None) -> bytes

Receive data from the connection using the injected receive_func.

Parameters:

Name Type Description Default
size int | None

Maximum number of bytes to receive.

None

Returns:

Type Description
bytes

Received data as bytes.

Raises:

Type Description
ConnectionError

If the connection is closed.

Source code in pyvider/rpcplugin/client/connection.py
async def receive_data(self, size: int | None = None) -> bytes:
    """
    Receive data from the connection using the injected receive_func.

    Args:
        size: Maximum number of bytes to receive.

    Returns:
        Received data as bytes.

    Raises:
        ConnectionError: If the connection is closed.
    """
    if self.is_closed:
        raise ConnectionError("Attempted to receive data on closed connection")
    if self.receive_func is None:
        # This should ideally not be reached if __attrs_post_init__ ran.
        raise RuntimeError(
            "receive_func was not initialized. This should not happen if "
            "__attrs_post_init__ ran correctly."
        )
    buffer_size = size if size is not None else rpcplugin_config.plugin_buffer_size
    return await self.receive_func(buffer_size)
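The size fallback and closed-connection check in receive_data can be illustrated with a small stand-in. SketchReceiver and DEFAULT_BUFFER_SIZE are hypothetical; the constant merely plays the role of rpcplugin_config.plugin_buffer_size, whose actual value is not given here.

```python
import asyncio
from typing import Optional

DEFAULT_BUFFER_SIZE = 16  # hypothetical stand-in for rpcplugin_config.plugin_buffer_size


class SketchReceiver:
    """Stand-in for the receive path; not the library class."""

    def __init__(self, payload: bytes) -> None:
        self._payload = payload
        self.is_closed = False

    async def _receive(self, size: int) -> bytes:
        # Hand back at most `size` bytes and keep the rest for the next call.
        chunk, self._payload = self._payload[:size], self._payload[size:]
        return chunk

    async def receive_data(self, size: Optional[int] = None) -> bytes:
        if self.is_closed:
            raise ConnectionError("Attempted to receive data on closed connection")
        buffer_size = size if size is not None else DEFAULT_BUFFER_SIZE
        return await self._receive(buffer_size)


async def demo():
    receiver = SketchReceiver(b"x" * 40)
    first = await receiver.receive_data()    # falls back to DEFAULT_BUFFER_SIZE
    second = await receiver.receive_data(4)  # an explicit size takes precedence
    receiver.is_closed = True
    try:
        await receiver.receive_data()
        raised = False
    except ConnectionError:
        raised = True
    return len(first), len(second), raised
```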
send_data async
send_data(data: bytes) -> None

Send data over the connection using the injected send_func.

Parameters:

Name Type Description Default
data bytes

Bytes to send.

required

Raises:

Type Description
ConnectionError

If the connection is closed.

Source code in pyvider/rpcplugin/client/connection.py
async def send_data(self, data: bytes) -> None:
    """
    Send data over the connection using the injected send_func.

    Args:
        data: Bytes to send.

    Raises:
        ConnectionError: If the connection is closed.
    """
    if self.is_closed:
        raise ConnectionError("Attempted to send data on closed connection")
    if self.send_func is None:
        # This should ideally not be reached if __attrs_post_init__ ran.
        raise RuntimeError(
            "send_func was not initialized. This should not happen if __attrs_post_init__ ran correctly."
        )
    await self.send_func(data)
update_metrics
update_metrics(
    bytes_sent: int = 0, bytes_received: int = 0
) -> None

Update connection metrics.

Parameters:

Name Type Description Default
bytes_sent int

Number of bytes sent.

0
bytes_received int

Number of bytes received.

0
Source code in pyvider/rpcplugin/client/connection.py
def update_metrics(self, bytes_sent: int = 0, bytes_received: int = 0) -> None:
    """
    Update connection metrics.

    Args:
        bytes_sent: Number of bytes sent.
        bytes_received: Number of bytes received.
    """
    self.bytes_sent += bytes_sent
    self.bytes_received += bytes_received
    logger.debug(
        f"Updated metrics for {self.remote_addr}",
        extra={
            "total_sent": self.bytes_sent,
            "total_received": self.bytes_received,
        },
    )
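Because update_metrics adds to the running totals rather than replacing them, callers pass per-operation deltas. The sketch below uses a hypothetical SketchMetrics class to show the accumulation. The source shown here does not indicate whether send_data or receive_data update the counters themselves, so the sketch calls update_metrics explicitly.

```python
from dataclasses import dataclass


@dataclass
class SketchMetrics:
    """Stand-in for the counters on ClientConnection; not the library class."""

    bytes_sent: int = 0
    bytes_received: int = 0

    def update_metrics(self, bytes_sent: int = 0, bytes_received: int = 0) -> None:
        # Arguments are deltas, added to the running totals.
        self.bytes_sent += bytes_sent
        self.bytes_received += bytes_received


metrics = SketchMetrics()
metrics.update_metrics(bytes_sent=len(b"ping"))
metrics.update_metrics(bytes_received=len(b"pong!"))
metrics.update_metrics(bytes_sent=len(b"ping"))
```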

RPCPluginClient

Bases: ClientHandshakeMixin, ClientProcessMixin

Client interface for interacting with Terraform-compatible plugin servers.

The RPCPluginClient handles the complete lifecycle of plugin communication:

1. Launching or attaching to a plugin server subprocess
2. Performing handshake, protocol negotiation, and transport selection
3. Setting up secure TLS/mTLS communication when enabled
4. Creating gRPC channels and service stubs
5. Providing plugin logs (stdout/stderr) streaming
6. Managing broker subchannels for multi-service communication
7. Handling graceful shutdown of plugin processes

The client follows the Terraform go-plugin protocol, which includes a standardized handshake format, negotiated protocol version, and support for Unix socket or TCP transport modes.
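For orientation, the go-plugin handshake is a single pipe-delimited line that the plugin writes to stdout, with the fields core protocol version, application protocol version, network type, network address, protocol, and an optional server certificate. The sketch below parses such a line. HandshakeLine and parse_handshake_line are hypothetical names; how this package parses the line internally is not shown here.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass(frozen=True)
class HandshakeLine:
    """Parsed fields of a go-plugin style handshake line (hypothetical type)."""

    core_version: int
    app_version: int
    network: str
    address: str
    protocol: str
    server_cert: Optional[str] = None


def parse_handshake_line(line: str) -> HandshakeLine:
    parts = line.strip().split("|")
    if len(parts) < 5:
        raise ValueError(f"expected at least 5 fields, got {len(parts)}")
    core, app, network, address, protocol = parts[:5]
    if network not in ("unix", "tcp"):
        raise ValueError(f"unsupported network type: {network!r}")
    # The sixth field, when present and non-empty, carries the server certificate.
    cert = parts[5] if len(parts) > 5 and parts[5] else None
    return HandshakeLine(int(core), int(app), network, address, protocol, cert)


unix_line = parse_handshake_line("1|6|unix|/tmp/plugin.sock|grpc\n")
tcp_line = parse_handshake_line("1|6|tcp|127.0.0.1:1234|grpc|abc")
```

The network field is what drives the choice between Unix socket and TCP transport mentioned above.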

Attributes:

Name Type Description
command list[str]

List containing the plugin executable command and arguments

config dict[str, Any] | None

Optional configuration dictionary for customizing client behavior

Example
import asyncio

from pyvider.rpcplugin.client import RPCPluginClient


async def main() -> None:
    # Create a client for a plugin
    client = RPCPluginClient(
        command=["terraform-provider-example"],
        config={"env": {"TF_LOG": "DEBUG"}},
    )

    # Start the client (launches process, performs handshake, etc.)
    await client.start()

    # Use the created channel with protocol-specific stubs.
    # MyProviderStub and request are placeholders for your generated gRPC code.
    provider_stub = MyProviderStub(client.grpc_channel)
    response = await provider_stub.SomeMethod(request)

    # Graceful shutdown
    await client.shutdown_plugin()
    await client.close()


asyncio.run(main())
Note

The client supports automatic mTLS if enabled in configuration, and can read/generate certificates as needed for secure communication.

Functions
__aenter__ async
__aenter__() -> RPCPluginClient

Async context manager entry.

Source code in pyvider/rpcplugin/client/core.py
async def __aenter__(self) -> RPCPluginClient:
    """Async context manager entry."""
    await self.start()
    return self
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Async context manager exit with cleanup.

Source code in pyvider/rpcplugin/client/core.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Async context manager exit with cleanup."""
    try:
        await self.shutdown_plugin()
    except Exception as e:
        self.logger.warning(f"⚠️ Error during shutdown in context manager: {e}", exc_info=True)
    finally:
        await self.close()
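The effect of the try/except/finally in __aexit__ is that close() runs even when the graceful shutdown fails. The sketch below reproduces that shape with a hypothetical SketchClient stand-in, so the ordering can be observed without a real plugin process.

```python
import asyncio


class SketchClient:
    """Stand-in mirroring the __aenter__/__aexit__ shape; not RPCPluginClient."""

    def __init__(self, fail_shutdown: bool = False) -> None:
        self.fail_shutdown = fail_shutdown
        self.events = []  # records the order of lifecycle calls

    async def start(self) -> None:
        self.events.append("start")

    async def shutdown_plugin(self) -> None:
        self.events.append("shutdown")
        if self.fail_shutdown:
            raise RuntimeError("plugin did not answer")

    async def close(self) -> None:
        self.events.append("close")

    async def __aenter__(self) -> "SketchClient":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        try:
            await self.shutdown_plugin()
        except Exception:
            pass  # the real client logs a warning at this point
        finally:
            await self.close()  # always runs, even if shutdown raised


async def demo(fail_shutdown: bool):
    client = SketchClient(fail_shutdown=fail_shutdown)
    async with client:
        client.events.append("work")
    return client.events
```

With this shape, `async with` replaces the manual start / shutdown_plugin / close sequence and keeps cleanup in place when the body raises.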
__attrs_post_init__
__attrs_post_init__() -> None

Initialize client state after attributes are set.

Source code in pyvider/rpcplugin/client/core.py
def __attrs_post_init__(self) -> None:
    """
    Initialize client state after attributes are set.
    """
    self.logger = logger
close async
close() -> None

Close the client connection and clean up all resources.

This method performs a complete cleanup of the client state, including stopping tasks, closing channels, terminating processes, and cleaning up transport resources.

Source code in pyvider/rpcplugin/client/core.py
async def close(self) -> None:
    """
    Close the client connection and clean up all resources.

    This method performs a complete cleanup of the client state,
    including stopping tasks, closing channels, terminating processes,
    and cleaning up transport resources.
    """
    self.logger.debug("🔒 Closing RPCPluginClient...")

    await self._cancel_tasks()
    await self._close_grpc_channel()
    await self._terminate_process()
    await self._close_transport()
    self._reset_state()
shutdown_plugin async
shutdown_plugin() -> None

Gracefully shutdown the plugin server through gRPC controller.

This method sends a shutdown signal to the plugin server, allowing it to clean up resources before termination.

Source code in pyvider/rpcplugin/client/core.py
async def shutdown_plugin(self) -> None:
    """
    Gracefully shutdown the plugin server through gRPC controller.

    This method sends a shutdown signal to the plugin server, allowing it
    to clean up resources before termination.
    """
    try:
        if self._controller_stub:
            await self._controller_stub.Shutdown(ControllerEmpty())
            self.logger.debug("📤 Shutdown signal sent to plugin.")
        else:
            self.logger.warning("⚠️ No controller stub available for shutdown signal.")
    except grpc.RpcError:
        # Expected behavior when plugin shuts down immediately
        pass
    except Exception as e:
        self.logger.warning(f"⚠️ Error sending shutdown signal to plugin: {e}", exc_info=True)

    # Give the plugin a moment to shut down gracefully
    await asyncio.sleep(DEFAULT_CLEANUP_WAIT_TIME)
start async
start() -> None

Start the plugin client: launch process, perform handshake, create channel.

This is the main entry point for establishing communication with a plugin. It orchestrates the complete connection process.

Raises:

Type Description
HandshakeError

If handshake fails

TransportError

If transport setup fails

ProtocolError

If protocol negotiation fails

Source code in pyvider/rpcplugin/client/core.py
async def start(self) -> None:
    """
    Start the plugin client: launch process, perform handshake, create channel.

    This is the main entry point for establishing communication with a plugin.
    It orchestrates the complete connection process.

    Raises:
        HandshakeError: If handshake fails
        TransportError: If transport setup fails
        ProtocolError: If protocol negotiation fails
    """
    self.logger.debug("🚀 Starting RPCPluginClient...")

    try:
        await self._connect_and_handshake_with_retry()
        self.is_started = True
    except Exception as e:
        self.logger.error(f"❌ Failed to start RPCPluginClient: {e}")
        self._handshake_failed_event.set()
        # Clean up any partial state on start failure
        await self.close()
        raise
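The start method above cleans up partial state and then re-raises, so the caller still sees the original error. A minimal sketch of that clean-up-and-re-raise pattern follows. SketchStarter and its _connect method are hypothetical stand-ins for the client and its connection step.

```python
import asyncio


class SketchStarter:
    """Stand-in for the start/close interplay; not RPCPluginClient."""

    def __init__(self, fail: bool) -> None:
        self.fail = fail
        self.is_started = False
        self.closed = False

    async def _connect(self) -> None:
        # Placeholder for launching the process and performing the handshake.
        if self.fail:
            raise ConnectionError("handshake failed")

    async def close(self) -> None:
        self.closed = True

    async def start(self) -> None:
        try:
            await self._connect()
            self.is_started = True
        except Exception:
            await self.close()  # release whatever was partially set up
            raise  # propagate the original error to the caller


async def demo(fail: bool):
    starter = SketchStarter(fail)
    try:
        await starter.start()
    except ConnectionError:
        pass  # a caller would typically report or retry here
    return starter.is_started, starter.closed
```

A consequence of this design is that a caller catching the error from start() does not need to call close() itself.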

SecureRpcClientT

Bases: Protocol

Protocol for an RPC client supporting secure transport and handshake.