Core

pyvider.rpcplugin.client.core

Core RPCPluginClient class definition and lifecycle management.

This module contains the main RPCPluginClient class with its attributes, initialization, and core lifecycle methods like start, close, and shutdown.

Classes

RPCPluginClient

Bases: ClientHandshakeMixin, ClientProcessMixin

Client interface for interacting with Terraform-compatible plugin servers.

The RPCPluginClient handles the complete lifecycle of plugin communication:

1. Launching or attaching to a plugin server subprocess
2. Performing handshake, protocol negotiation, and transport selection
3. Setting up secure TLS/mTLS communication when enabled
4. Creating gRPC channels and service stubs
5. Providing plugin logs (stdout/stderr) streaming
6. Managing broker subchannels for multi-service communication
7. Handling graceful shutdown of plugin processes

The client follows the Terraform go-plugin protocol, which includes a standardized handshake format, negotiated protocol version, and support for Unix socket or TCP transport modes.
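To make the handshake format concrete, here is a minimal sketch of parsing a go-plugin style handshake line. It assumes the pipe-separated layout `CORE-PROTOCOL-VERSION|APP-PROTOCOL-VERSION|NETWORK-TYPE|NETWORK-ADDR|PROTOCOL`, with an optional sixth field carrying the server certificate. The names `HandshakeLine` and `parse_handshake_line` are hypothetical and are not part of pyvider.rpcplugin; the library's own parser may behave differently.

```python
from __future__ import annotations

from dataclasses import dataclass


@dataclass(frozen=True)
class HandshakeLine:
    """Hypothetical container for the fields of one handshake line."""

    core_version: int
    app_version: int
    network: str  # "unix" or "tcp"
    address: str  # socket path or host:port
    protocol: str  # e.g. "grpc"
    server_cert: str | None = None  # optional sixth field, if present


def parse_handshake_line(line: str) -> HandshakeLine:
    """Split a pipe-separated handshake line into typed fields (illustrative only)."""
    parts = line.strip().split("|")
    if len(parts) not in (5, 6):
        raise ValueError(f"expected 5 or 6 fields, got {len(parts)}")

    core, app, network, address, protocol = parts[:5]
    if network not in ("unix", "tcp"):
        raise ValueError(f"unsupported network type: {network!r}")

    # Treat a missing or empty sixth field as "no certificate supplied".
    cert = parts[5] if len(parts) == 6 and parts[5] else None
    return HandshakeLine(int(core), int(app), network, address, protocol, cert)


handshake = parse_handshake_line("1|6|unix|/tmp/plugin.sock|grpc\n")
print(handshake.network, handshake.address)
```

The sample line and socket path are made up for illustration; a real plugin prints its own values on stdout.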

Attributes:

command (list[str]): List containing the plugin executable command and arguments

config (dict[str, Any] | None): Optional configuration dictionary for customizing client behavior

Example
# Create a client for a plugin (run this inside a coroutine; the awaits below need an event loop)
client = RPCPluginClient(
    command=["terraform-provider-example"],
    config={"env": {"TF_LOG": "DEBUG"}}
)

# Start the client (launches process, performs handshake, etc.)
await client.start()

# Use the created channel with protocol-specific stubs
# (MyProviderStub stands in for your generated gRPC stub class)
provider_stub = MyProviderStub(client.grpc_channel)
response = await provider_stub.SomeMethod(request)

# Graceful shutdown
await client.shutdown_plugin()
await client.close()
Note

The client supports automatic mTLS if enabled in configuration, and can read/generate certificates as needed for secure communication.

Functions
__aenter__ async
__aenter__() -> RPCPluginClient

Async context manager entry.

Source code in pyvider/rpcplugin/client/core.py
async def __aenter__(self) -> RPCPluginClient:
    """Async context manager entry."""
    await self.start()
    return self
__aexit__ async
__aexit__(
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None

Async context manager exit with cleanup.

Source code in pyvider/rpcplugin/client/core.py
async def __aexit__(
    self,
    exc_type: type[BaseException] | None,
    exc_val: BaseException | None,
    exc_tb: TracebackType | None,
) -> None:
    """Async context manager exit with cleanup."""
    try:
        await self.shutdown_plugin()
    except Exception as e:
        self.logger.warning(f"⚠️ Error during shutdown in context manager: {e}", exc_info=True)
    finally:
        await self.close()
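The context manager methods above imply a fixed call order: start on entry, then shutdown_plugin and close on exit, with close running even if the shutdown signal fails. The following sketch illustrates that order with a hypothetical stand-in class, `LifecycleDemoClient`, which copies only the method names and does not launch a process or use gRPC.

```python
import asyncio


class LifecycleDemoClient:
    """Hypothetical stand-in that records lifecycle calls; not the real client."""

    def __init__(self) -> None:
        self.calls: list[str] = []

    async def start(self) -> None:
        self.calls.append("start")

    async def shutdown_plugin(self) -> None:
        self.calls.append("shutdown_plugin")
        # Simulate a plugin that is already gone when the signal is sent.
        raise RuntimeError("plugin already exited")

    async def close(self) -> None:
        self.calls.append("close")

    async def __aenter__(self) -> "LifecycleDemoClient":
        await self.start()
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb) -> None:
        try:
            await self.shutdown_plugin()
        except Exception:
            pass  # the documented method logs a warning at this point
        finally:
            await self.close()


async def main() -> list[str]:
    async with LifecycleDemoClient() as client:
        client.calls.append("work")
    return client.calls


calls = asyncio.run(main())
print(calls)  # ['start', 'work', 'shutdown_plugin', 'close']
```

With the real class, the equivalent usage would be `async with RPCPluginClient(command=[...]) as client:`, which removes the need to call shutdown_plugin and close by hand.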
__attrs_post_init__
__attrs_post_init__() -> None

Initialize client state after attributes are set.

Source code in pyvider/rpcplugin/client/core.py
def __attrs_post_init__(self) -> None:
    """
    Initialize client state after attributes are set.
    """
    self.logger = logger
close async
close() -> None

Close the client connection and clean up all resources.

This method performs a complete cleanup of the client state, including stopping tasks, closing channels, terminating processes, and cleaning up transport resources.

Source code in pyvider/rpcplugin/client/core.py
async def close(self) -> None:
    """
    Close the client connection and clean up all resources.

    This method performs a complete cleanup of the client state,
    including stopping tasks, closing channels, terminating processes,
    and cleaning up transport resources.
    """
    self.logger.debug("🔒 Closing RPCPluginClient...")

    await self._cancel_tasks()
    await self._close_grpc_channel()
    await self._terminate_process()
    await self._close_transport()
    self._reset_state()
shutdown_plugin async
shutdown_plugin() -> None

Gracefully shut down the plugin server through the gRPC controller.

This method sends a shutdown signal to the plugin server, allowing it to clean up resources before termination.

Source code in pyvider/rpcplugin/client/core.py
async def shutdown_plugin(self) -> None:
    """
    Gracefully shutdown the plugin server through gRPC controller.

    This method sends a shutdown signal to the plugin server, allowing it
    to clean up resources before termination.
    """
    try:
        if self._controller_stub:
            await self._controller_stub.Shutdown(ControllerEmpty())
            self.logger.debug("📤 Shutdown signal sent to plugin.")
        else:
            self.logger.warning("⚠️ No controller stub available for shutdown signal.")
    except grpc.RpcError:
        # Expected behavior when plugin shuts down immediately
        pass
    except Exception as e:
        self.logger.warning(f"⚠️ Error sending shutdown signal to plugin: {e}", exc_info=True)

    # Give the plugin a moment to shut down gracefully
    await asyncio.sleep(DEFAULT_CLEANUP_WAIT_TIME)
start async
start() -> None

Start the plugin client: launch process, perform handshake, create channel.

This is the main entry point for establishing communication with a plugin. It orchestrates the complete connection process.

Raises:

HandshakeError: If handshake fails

TransportError: If transport setup fails

ProtocolError: If protocol negotiation fails

Source code in pyvider/rpcplugin/client/core.py
async def start(self) -> None:
    """
    Start the plugin client: launch process, perform handshake, create channel.

    This is the main entry point for establishing communication with a plugin.
    It orchestrates the complete connection process.

    Raises:
        HandshakeError: If handshake fails
        TransportError: If transport setup fails
        ProtocolError: If protocol negotiation fails
    """
    self.logger.debug("🚀 Starting RPCPluginClient...")

    try:
        await self._connect_and_handshake_with_retry()
        self.is_started = True
    except Exception as e:
        self.logger.error(f"❌ Failed to start RPCPluginClient: {e}")
        self._handshake_failed_event.set()
        # Clean up any partial state on start failure
        await self.close()
        raise
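Because start closes the client before re-raising, a caller that catches the exception does not need to clean up partial state. The sketch below mimics that behavior with a hypothetical `FailingStartClient`; the `_connect` method and the use of `ConnectionError` are assumptions for illustration and stand in for the real handshake and its HandshakeError, TransportError, or ProtocolError.

```python
import asyncio


class FailingStartClient:
    """Hypothetical stand-in whose connection step always fails."""

    def __init__(self) -> None:
        self.is_started = False
        self.closed = False

    async def _connect(self) -> None:
        # Stand-in for the real connect-and-handshake step.
        raise ConnectionError("handshake timed out")

    async def close(self) -> None:
        self.closed = True

    async def start(self) -> None:
        try:
            await self._connect()
            self.is_started = True
        except Exception:
            # Mirror the documented pattern: clean up, then re-raise.
            await self.close()
            raise


async def try_start(client: FailingStartClient) -> str:
    try:
        await client.start()
    except ConnectionError as exc:
        return f"start failed: {exc}"
    return "started"


client = FailingStartClient()
outcome = asyncio.run(try_start(client))
print(outcome, client.closed, client.is_started)
```

The caller only decides how to report or retry the failure; the stand-in is already closed by the time the exception arrives.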

Functions