Core

pyvider.rpcplugin.server.core

Core RPCPluginServer class definition and lifecycle management.

This module contains the main RPCPluginServer class with its attributes, initialization, configuration, and core server lifecycle methods.

Classes

RPCPluginServer

Bases: Generic[ServerT, HandlerT, TransportT], ServerNetworkMixin

Server interface for hosting Terraform-compatible plugin services.

The RPCPluginServer handles the complete lifecycle of plugin hosting:

1. Transport setup (Unix socket or TCP) with optional mTLS
2. Handshake protocol negotiation with clients
3. gRPC server initialization and service registration
4. Rate limiting and health check services
5. Signal handling for graceful shutdown
6. Optional shutdown file monitoring

The server follows the Terraform go-plugin protocol, which includes a standardized handshake format, negotiated protocol version, and support for Unix socket or TCP transport modes.
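
To make the handshake concrete, here is an illustrative sketch of the pipe-delimited handshake line that the go-plugin protocol defines (the field values below are hypothetical, not output captured from this class):

# Illustrative shape of a go-plugin handshake line written to stdout:
#   CORE-VERSION | PLUGIN-PROTOCOL-VERSION | NETWORK | ADDRESS | PROTOCOL | TLS-CERT
handshake = "|".join([
    "1",                 # go-plugin core protocol version
    "1",                 # negotiated application protocol version
    "unix",              # transport mode: "unix" or "tcp"
    "/tmp/plugin.sock",  # socket path, or host:port for TCP
    "grpc",              # wire protocol
    "",                  # base64 server certificate when mTLS is enabled
])
print(handshake)  # -> 1|1|unix|/tmp/plugin.sock|grpc|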

Attributes:

    protocol (RPCPluginProtocol[ServerT, HandlerT]): Protocol implementation for the plugin service.
    handler (HandlerT): Service handler instance for the protocol.
    config (dict[str, Any] | None): Optional configuration dictionary for customizing server behavior.
    transport (TransportT | None): Optional pre-configured transport instance.

Example
# Create a server for a plugin
server = RPCPluginServer(
    protocol=MyProtocol(),
    handler=MyServiceHandler(),
    config={"PLUGIN_AUTO_MTLS": True}
)

# Start the server (setup transport, handshake, serve)
await server.serve()
Note

The server supports automatic mTLS if enabled in configuration, and can generate certificates as needed for secure communication.

Functions
serve async
serve() -> None

Start the plugin server and serve until shutdown.

This is the main entry point for running the server. It orchestrates the complete server lifecycle including transport setup, handshake, gRPC server initialization, and serving until shutdown.

Raises:

    TransportError: If transport setup fails.
    ProtocolError: If handshake or protocol setup fails.

Source code in pyvider/rpcplugin/server/core.py
async def serve(self) -> None:
    """
    Start the plugin server and serve until shutdown.

    This is the main entry point for running the server. It orchestrates
    the complete server lifecycle including transport setup, handshake,
    gRPC server initialization, and serving until shutdown.

    Raises:
        TransportError: If transport setup fails
        ProtocolError: If handshake or protocol setup fails
    """
    if _tracer:
        with _tracer.start_as_current_span("rpc.server.serve") as span:
            span.set_attribute("component", "server")
            await self._serve_impl()
    else:
        await self._serve_impl()
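
A minimal entry-point sketch, assuming RPCPluginServer is imported from this module; MyProtocol and MyServiceHandler are hypothetical placeholders for your own implementations:

import asyncio

from pyvider.rpcplugin.server.core import RPCPluginServer

async def main() -> None:
    server = RPCPluginServer(
        protocol=MyProtocol(),       # hypothetical protocol implementation
        handler=MyServiceHandler(),  # hypothetical service handler
        config={"PLUGIN_AUTO_MTLS": True},
    )
    # serve() blocks until shutdown (signal, stop(), or shutdown file).
    await server.serve()

if __name__ == "__main__":
    asyncio.run(main())
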
stop async
stop() -> None

Stop the server and clean up resources.

This method performs graceful shutdown of the server including stopping the gRPC server, cleaning up transport resources, and canceling background tasks.

Source code in pyvider/rpcplugin/server/core.py
async def stop(self) -> None:
    """
    Stop the server and clean up resources.

    This method performs graceful shutdown of the server including
    stopping the gRPC server, cleaning up transport resources,
    and canceling background tasks.
    """
    logger.info("🔒 Stopping RPCPluginServer...")

    # Cancel shutdown watcher task
    if self._shutdown_watcher_task and not self._shutdown_watcher_task.done():
        self._shutdown_watcher_task.cancel()
        with contextlib.suppress(asyncio.CancelledError):
            await self._shutdown_watcher_task

    # Stop gRPC server
    if self._server is not None:
        logger.debug("Stopping gRPC server...")
        server_to_stop = cast(grpc.aio.Server, self._server)
        await server_to_stop.stop(grace=0.5)
        self._server = None

    # Clean up transport
    if self._transport is not None:
        logger.debug("Closing transport...")
        transport_to_close = cast(RPCPluginTransportType, self._transport)
        await transport_to_close.close()
        self._transport = None

    # Complete the serving future if not already done
    if not self._serving_future.done():
        self._serving_future.set_result(None)

    # Exit if configured to do so (only in non-test environments)
    if self._exit_on_stop and not os.environ.get("PYTEST_CURRENT_TEST"):
        logger.info("âš¡ Exiting process...")
        sys.exit(0)
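
A hedged sketch of driving shutdown programmatically: run serve() as a background task, then call stop() when the host is done. do_host_work is a hypothetical placeholder, and note that stop() may exit the process outside of test environments if the server was configured to do so:

import asyncio

async def run_plugin(server) -> None:
    serve_task = asyncio.create_task(server.serve())
    try:
        await do_host_work()  # hypothetical application logic
    finally:
        # stop() cancels the shutdown watcher, stops the gRPC server with a
        # short grace period, closes the transport, and resolves the serving future.
        await server.stop()
        await serve_task
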
wait_for_server_ready async
wait_for_server_ready(timeout: float | None = None) -> None

Wait for the server to be ready to accept connections.

Parameters:

    timeout (float | None): Maximum time to wait for server readiness. Defaults to None, which falls back to the configured server-ready timeout.

Raises:

    TimeoutError: If the server doesn't become ready within the timeout.
    TransportError: If server setup fails.

Source code in pyvider/rpcplugin/server/core.py
async def wait_for_server_ready(self, timeout: float | None = None) -> None:
    """
    Wait for the server to be ready to accept connections.

    Args:
        timeout: Maximum time to wait for server readiness

    Raises:
        TimeoutError: If server doesn't become ready within timeout
        TransportError: If server setup fails
    """
    if timeout is None:
        timeout = rpcplugin_config.plugin_server_ready_timeout

    logger.debug(f"Waiting for server to be ready (timeout: {timeout}s)")

    try:
        await asyncio.wait_for(self._serving_event.wait(), timeout=timeout)
        logger.debug("Server serving event is set")

        # Perform transport-specific readiness checks
        await self._verify_transport_readiness()

        logger.debug("Server is ready")
    except TimeoutError as e:
        error_msg = f"Server failed to become ready within {timeout} seconds"
        logger.error(error_msg)
        raise TimeoutError(error_msg) from e
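
A hedged readiness sketch (test-harness style; everything except the RPCPluginServer methods is a placeholder): start serve() in the background, then block until the server accepts connections before pointing a client at it.

import asyncio

async def start_and_wait(server) -> None:
    serve_task = asyncio.create_task(server.serve())  # keep a reference to the task
    # Blocks until the transport is verified ready, or raises TimeoutError.
    await server.wait_for_server_ready(timeout=10.0)
    # A client can now connect to the negotiated endpoint.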

RateLimitingInterceptor

RateLimitingInterceptor(limiter: TokenBucketRateLimiter)

Bases: ServerInterceptor

gRPC server interceptor for request rate limiting.

This interceptor uses a token bucket algorithm to limit the rate of incoming requests. When the rate limit is exceeded, requests are rejected with a RESOURCE_EXHAUSTED status code.

The interceptor integrates with Foundation's TokenBucketRateLimiter to provide configurable rate limiting with burst capacity support.

Attributes:

    _limiter: Token bucket rate limiter instance.

Example
from provide.foundation.utils.rate_limiting import TokenBucketRateLimiter

# Create rate limiter: 100 requests per second, burst of 200
limiter = TokenBucketRateLimiter(
    tokens_per_second=100,
    bucket_size=200
)

# Add interceptor to server
interceptor = RateLimitingInterceptor(limiter)
server = grpc.aio.server(interceptors=[interceptor])
Note

The rate limiter is shared across all requests to the server. For per-method or per-client rate limiting, implement a custom interceptor with multiple limiters.
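
A hedged sketch of the per-method variant described above (this class is not part of the library; the method map and fallback limiter are illustrative):

from collections.abc import Awaitable, Callable

import grpc

from provide.foundation.utils.rate_limiting import TokenBucketRateLimiter

class PerMethodRateLimitingInterceptor(grpc.aio.ServerInterceptor):
    """Applies a separate token bucket to each RPC method."""

    def __init__(self, limiters: dict[str, TokenBucketRateLimiter], default: TokenBucketRateLimiter) -> None:
        self._limiters = limiters  # e.g. {"/pkg.Service/Method": limiter}
        self._default = default    # fallback for methods without a dedicated limiter

    async def intercept_service(
        self,
        continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler]],
        handler_call_details: grpc.HandlerCallDetails,
    ) -> grpc.RpcMethodHandler:
        limiter = self._limiters.get(handler_call_details.method, self._default)
        if not await limiter.is_allowed():
            raise grpc.aio.AbortError(grpc.StatusCode.RESOURCE_EXHAUSTED, "Rate limit exceeded.")
        return await continuation(handler_call_details)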

Initialize the rate limiting interceptor.

Parameters:

    limiter (TokenBucketRateLimiter): Token bucket rate limiter to use for request throttling. Controls both the sustained rate (tokens per second) and burst capacity (bucket size). Required.
Source code in pyvider/rpcplugin/server/core.py
def __init__(self, limiter: TokenBucketRateLimiter) -> None:
    """
    Initialize the rate limiting interceptor.

    Args:
        limiter: Token bucket rate limiter to use for request throttling.
                 Controls both the sustained rate (tokens per second) and
                 burst capacity (bucket size).
    """
    self._limiter = limiter
Functions
intercept_service async
intercept_service(
    continuation: Callable[
        [HandlerCallDetails],
        Awaitable[RpcMethodHandler[Any, Any]],
    ],
    handler_call_details: HandlerCallDetails,
) -> grpc.RpcMethodHandler[Any, Any]

Intercept incoming RPC calls for rate limiting.

This method is called for each incoming RPC request. It checks if the request can proceed based on the rate limiter's token availability. If tokens are available, the request continues; otherwise, it's rejected.

Parameters:

    continuation (Callable[[HandlerCallDetails], Awaitable[RpcMethodHandler[Any, Any]]]): Callable to continue processing the request if allowed. Required.
    handler_call_details (HandlerCallDetails): Details about the incoming RPC call, including the method name and invocation metadata. Required.

Returns:

    RpcMethodHandler[Any, Any]: The RPC method handler if the request is allowed.

Raises:

    AbortError: Raised with RESOURCE_EXHAUSTED status when the rate limit is exceeded. Clients should implement exponential backoff when receiving this error.
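
Since callers are expected to back off, here is a hedged client-side sketch (the stub and its SomeMethod RPC are hypothetical placeholders, not part of this library):

import asyncio

import grpc

async def call_with_backoff(stub, request, max_attempts: int = 5):
    delay = 0.1
    for attempt in range(max_attempts):
        try:
            return await stub.SomeMethod(request)  # hypothetical unary RPC
        except grpc.aio.AioRpcError as exc:
            if exc.code() is not grpc.StatusCode.RESOURCE_EXHAUSTED or attempt == max_attempts - 1:
                raise
            await asyncio.sleep(delay)
            delay *= 2  # exponential backoff between retries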

Source code in pyvider/rpcplugin/server/core.py
async def intercept_service(
    self,
    continuation: Callable[[grpc.HandlerCallDetails], Awaitable[grpc.RpcMethodHandler[Any, Any]]],
    handler_call_details: grpc.HandlerCallDetails,
) -> grpc.RpcMethodHandler[Any, Any]:
    """
    Intercept incoming RPC calls for rate limiting.

    This method is called for each incoming RPC request. It checks if the
    request can proceed based on the rate limiter's token availability.
    If tokens are available, the request continues; otherwise, it's rejected.

    Args:
        continuation: Callable to continue processing the request if allowed.
        handler_call_details: Details about the incoming RPC call, including
                            the method name and invocation metadata.

    Returns:
        The RPC method handler if the request is allowed.

    Raises:
        grpc.aio.AbortError: With RESOURCE_EXHAUSTED status when rate limit
                            is exceeded. Clients should implement exponential
                            backoff when receiving this error.
    """
    if not await self._limiter.is_allowed():
        raise grpc.aio.AbortError(grpc.StatusCode.RESOURCE_EXHAUSTED, "Rate limit exceeded.")
    return await continuation(handler_call_details)

Functions