RPC Architecture

The Pyvider RPC Plugin system implements an RPC (Remote Procedure Call) architecture built on gRPC, designed for secure, high-performance communication between host applications and plugin processes.

Overview

The RPC architecture follows a client-server model where:

  • Host Application acts as the client, launching and communicating with plugins
  • Plugin Process acts as the server, providing RPC services to the host
  • Communication happens over Unix sockets or TCP with optional mTLS encryption
  • Protocol uses gRPC with automatic service discovery and method routing
┌─────────────────────────────────────────────────────────────────┐
│ Host Application Process                                        │
│                                                                 │
│  ┌─────────────────┐                                           │
│  │ RPCPluginClient │                                           │
│  │                 │                                           │
│  │ ┌─────────────┐ │            gRPC/HTTP2                    │
│  │ │ Service     │ │ ◄─────────────────────────────────────┐  │
│  │ │ Stubs       │ │                                        │  │
│  │ └─────────────┘ │                                        │  │
│  │                 │                                        │  │
│  │ ┌─────────────┐ │                                        │  │
│  │ │ Transport   │ │                                        │  │
│  │ │ Layer       │ │                                        │  │
│  │ └─────────────┘ │                                        │  │
│  └─────────────────┘                                        │  │
└──────────────────────────────────────────────────────────────┼──┘
┌──────────────────────────────────────────────────────────────┼──┐
│ Plugin Process                                               │  │
│                                                              │  │
│  ┌─────────────────┐                                        │  │
│  │ RPCPluginServer │                                        │  │
│  │                 │                                        │  │
│  │ ┌─────────────┐ │                                        │  │
│  │ │ gRPC Server │ │ ◄──────────────────────────────────────┘  │
│  │ │             │ │                                           │
│  │ │ ┌─────────┐ │ │                                           │
│  │ │ │Service  │ │ │                                           │
│  │ │ │Handler  │ │ │                                           │
│  │ │ └─────────┘ │ │                                           │
│  │ └─────────────┘ │                                           │
│  │                 │                                           │
│  │ ┌─────────────┐ │                                           │
│  │ │ Transport   │ │                                           │
│  │ │ Layer       │ │                                           │
│  │ └─────────────┘ │                                           │
│  └─────────────────┘                                           │
└─────────────────────────────────────────────────────────────────┘
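
The transport chosen in the overview determines the target string that is eventually handed to gRPC. As a minimal sketch, assuming a hypothetical helper named `grpc_target` (it is not part of the Pyvider API), the two transports could map onto gRPC's standard `unix:` and `host:port` target syntax like this:

```python
def grpc_target(transport: str, address: str) -> str:
    """Build a gRPC target string for a transport (hypothetical helper)."""
    if transport == "unix":
        # gRPC's name syntax accepts unix:<path> for Unix domain sockets.
        return f"unix:{address}"
    if transport == "tcp":
        # TCP targets are plain host:port pairs.
        host, sep, port = address.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {address!r}")
        return f"{host}:{port}"
    raise ValueError(f"unsupported transport: {transport!r}")
```

For example, `grpc_target("unix", "/tmp/plugin.sock")` yields `unix:/tmp/plugin.sock`, while a TCP address is validated and passed through unchanged.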

Core Components

1. gRPC Foundation

The architecture is built on gRPC, providing:

  • Protocol Buffers for efficient serialization
  • HTTP/2 for multiplexed, bidirectional communication
  • Streaming Support for real-time data flows
  • Built-in Load Balancing and connection management
  • Rich Error Handling with status codes and metadata
// Example service definition (proto file)
syntax = "proto3";

package example;

service DataProcessor {
    // Unary RPC - single request, single response
    rpc ProcessData(DataRequest) returns (DataResponse);

    // Server streaming - single request, multiple responses
    rpc StreamResults(QueryRequest) returns (stream ResultItem);

    // Client streaming - multiple requests, single response
    rpc UploadData(stream DataChunk) returns (UploadResponse);

    // Bidirectional streaming - multiple requests and responses
    rpc InteractiveSession(stream SessionMessage) returns (stream SessionResponse);
}

message DataRequest {
    string data = 1;
    map<string, string> options = 2;
}

message DataResponse {
    string result = 1;
    int32 status_code = 2;
}

2. Service Discovery

The RPC architecture includes automatic service discovery:

# Client automatically discovers available services
async with plugin_client(command=["python", "-m", "my_plugin"]) as client:
    # Available services are automatically exposed as attributes
    response = await client.data_processor.ProcessData(
        data="example",
        options={"format": "json"}
    )
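
The attribute name used above (`client.data_processor` for the `DataProcessor` service) suggests a CamelCase to snake_case mapping. The actual discovery mechanism is not shown in this document, so the following is only a sketch of that assumed naming convention; `to_attr_name` and `ServiceNamespace` are hypothetical names introduced for illustration.

```python
import re


def to_attr_name(service_name: str) -> str:
    """Convert 'example.DataProcessor' to 'data_processor' (assumed convention)."""
    short_name = service_name.rsplit(".", 1)[-1]
    # Insert an underscore before every capital letter except the first.
    return re.sub(r"(?<!^)(?=[A-Z])", "_", short_name).lower()


class ServiceNamespace:
    """Expose registered service stubs as attributes (hypothetical)."""

    def __init__(self, stubs: dict[str, object]) -> None:
        self._stubs = {to_attr_name(name): stub for name, stub in stubs.items()}

    def __getattr__(self, name: str) -> object:
        # Only called when normal attribute lookup fails.
        try:
            return self._stubs[name]
        except KeyError:
            raise AttributeError(f"no service named {name!r}") from None
```

With this convention, a stub registered under `example.MyService` would be reachable as `namespace.my_service`, and an unknown name raises `AttributeError` like any missing attribute.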

3. Method Routing

gRPC provides automatic method routing and dispatching:

import asyncio
import time

from provide.foundation import logger

# Server-side service implementation
class DataProcessorServicer:
    async def ProcessData(self, request, context):
        """Handle ProcessData RPC method."""
        logger.info(f"Processing data: {request.data}")

        # Business logic implementation
        result = await self.process_business_logic(request.data)

        return DataResponse(
            result=result,
            status_code=0
        )

    async def StreamResults(self, request, context):
        """Handle streaming RPC method."""
        for i in range(10):
            yield ResultItem(
                id=i,
                data=f"result_{i}",
                timestamp=time.time()
            )
            await asyncio.sleep(0.1)  # Simulate processing
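
gRPC identifies every method by a path of the form `/<package>.<Service>/<Method>` and dispatches on it. The routing itself is done inside the gRPC runtime; the sketch below only imitates the idea in plain Python so the lookup step is visible. `EchoServicer` and `dispatch` are hypothetical names, and no gRPC library is involved.

```python
import asyncio


class EchoServicer:
    """Stand-in servicer used only for this sketch."""

    async def Echo(self, request, context):
        await asyncio.sleep(0)  # Simulate asynchronous work.
        return f"echo:{request}"


async def dispatch(servicers: dict, path: str, request, context=None):
    """Route '/package.Service/Method' to the matching servicer coroutine."""
    # Splitting '/example.Echo/Echo' gives ['', 'example.Echo', 'Echo'].
    _, service_name, method_name = path.split("/")
    servicer = servicers.get(service_name)
    handler = getattr(servicer, method_name, None)
    if handler is None:
        # A real gRPC server reports this case as StatusCode.UNIMPLEMENTED.
        raise LookupError(f"unimplemented: {path}")
    return await handler(request, context)
```

Calling `await dispatch({"example.Echo": EchoServicer()}, "/example.Echo/Echo", "hi")` resolves the servicer by service name and the coroutine by method name, which is the same two-level lookup the method path encodes.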

4. Foundation Integration

The RPC architecture integrates with Foundation for shared services such as logging, configuration, and cryptography:

import grpc

from pyvider.rpcplugin import plugin_server
from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
from provide.foundation import logger, config
from provide.foundation.crypto import Certificate

class SecureDataProcessor:
    def __init__(self):
        # Foundation configuration
        self.config = config.get_config()

    async def ProcessData(self, request, context):
        # Foundation logging with structured context
        logger.info("Processing secure data", 
                   request_id=request.id, 
                   data_size=len(request.data))

        # Foundation crypto for data validation
        if self.config.validate_signatures:
            cert = Certificate.from_request(context)
            if not cert.is_valid():
                logger.warning("Invalid certificate", cert_id=cert.id)
                # abort() is a coroutine on grpc.aio servicer contexts and must be awaited
                await context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid certificate")

        result = await self.process_data(request.data)
        logger.info("Data processed successfully", result_size=len(result))

        return DataResponse(result=result, status_code=0)

Foundation provides:

  • Structured Logging: Rich context and formatting for debugging and monitoring
  • Configuration Management: Environment-aware settings with validation
  • Cryptography: Certificate management, signing, and validation
  • Error Handling: Comprehensive error boundaries and retry logic
  • Rate Limiting: Token bucket implementation for request throttling
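
The token bucket mentioned in the list above can be illustrated in a few lines. This is a generic sketch of the algorithm, not Foundation's implementation; the class name, parameters, and injectable `clock` are assumptions made so the example is self-contained and testable.

```python
import time


class TokenBucket:
    """Generic token bucket: `rate` tokens per second, up to `capacity`."""

    def __init__(self, rate: float, capacity: float, clock=time.monotonic) -> None:
        self.rate = rate
        self.capacity = capacity
        self._clock = clock
        self._tokens = capacity  # Start full so an initial burst is allowed.
        self._last = clock()

    def allow(self, cost: float = 1.0) -> bool:
        """Return True and consume `cost` tokens if enough are available."""
        now = self._clock()
        # Refill in proportion to elapsed time, never above capacity.
        elapsed = now - self._last
        self._tokens = min(self.capacity, self._tokens + elapsed * self.rate)
        self._last = now
        if self._tokens >= cost:
            self._tokens -= cost
            return True
        return False
```

A servicer could call `allow()` at the top of each RPC method and reject the request (for instance with `RESOURCE_EXHAUSTED`) when it returns `False`. The capacity bounds the burst size, while the rate bounds sustained throughput.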

Communication Patterns

1. Unary RPC (Request-Response)

The most common pattern, used for simple request-response operations:

# Client side
async def simple_request():
    async with plugin_client(command=cmd) as client:
        response = await client.my_service.GetStatus()
        return response.status

# Server side  
class MyServiceServicer:
    async def GetStatus(self, request, context):
        return StatusResponse(
            status="healthy",
            uptime=self.get_uptime(),
            version="1.0.0"
        )

2. Server Streaming

The server sends multiple responses for a single client request:

# Client side - receive stream
async def receive_stream():
    async with plugin_client(command=cmd) as client:
        async for item in client.my_service.StreamData(request):
            print(f"Received: {item.data}")

# Server side - send stream
class MyServiceServicer:
    async def StreamData(self, request, context):
        for i in range(request.count):
            yield DataItem(
                id=i,
                data=f"item_{i}",
                timestamp=time.time()
            )
            await asyncio.sleep(0.1)

3. Client Streaming

The client sends multiple requests and the server replies with a single response:

# Client side - send stream
async def send_stream():
    async with plugin_client(command=cmd) as client:

        async def generate_requests():
            for i in range(100):
                yield DataChunk(
                    sequence=i,
                    data=f"chunk_{i}".encode()
                )

        response = await client.my_service.UploadData(generate_requests())
        return response.total_bytes

# Server side - receive stream
class MyServiceServicer:
    async def UploadData(self, request_iterator, context):
        total_bytes = 0
        async for chunk in request_iterator:
            total_bytes += len(chunk.data)
            # Process chunk

        return UploadResponse(total_bytes=total_bytes)

4. Bidirectional Streaming

Both client and server can send multiple messages:

# Client side - interactive session
async def interactive_session():
    async with plugin_client(command=cmd) as client:

        async def send_messages():
            for i in range(10):
                yield SessionMessage(
                    id=i,
                    content=f"message_{i}"
                )
                await asyncio.sleep(1.0)

        async for response in client.my_service.Chat(send_messages()):
            print(f"Server says: {response.content}")

# Server side - handle interactive session
class MyServiceServicer:
    async def Chat(self, request_iterator, context):
        async for message in request_iterator:
            # Process incoming message
            response_content = await self.process_message(message.content)

            # Send response
            yield SessionResponse(
                id=message.id,
                content=response_content
            )

Protocol Implementation

Protocol Interface

The RPCPluginProtocol interface defines how services integrate with the server:

from pyvider.rpcplugin.protocol.base import RPCPluginProtocol

class MyProtocol(RPCPluginProtocol):
    service_name = "example.MyService"

    async def get_grpc_descriptors(self):
        """Return gRPC module and service name."""
        import my_service_pb2_grpc
        return my_service_pb2_grpc, "example.MyService"

    async def add_to_server(self, server, handler):
        """Register service with gRPC server."""
        from my_service_pb2_grpc import add_MyServiceServicer_to_server
        add_MyServiceServicer_to_server(handler, server)

    def get_method_type(self, method_name):
        """Return RPC method type for given method."""
        method_types = {
            "GetStatus": "unary_unary",
            "StreamData": "unary_stream", 
            "UploadData": "stream_unary",
            "Chat": "stream_stream"
        }
        return method_types.get(method_name, "unary_unary")

Service Integration

Services are integrated through the protocol implementation:

# Complete service setup
class MyServiceHandler:
    """Business logic implementation."""

    async def GetStatus(self, request, context):
        return StatusResponse(status="healthy")

    async def ProcessData(self, request, context):
        result = await self.business_logic(request.data)
        return DataResponse(result=result)

# Create server with service
server = plugin_server(
    protocol=MyProtocol(),
    handler=MyServiceHandler()
)

Error Handling Architecture

gRPC Status Codes

The architecture uses standard gRPC status codes for error handling:

import grpc
from grpc import StatusCode

class MyServiceServicer:
    async def ProcessData(self, request, context):
        try:
            # Validate input
            if not request.data:
                context.set_code(StatusCode.INVALID_ARGUMENT)
                context.set_details("Data field is required")
                return DataResponse()

            # Process data
            result = await self.process(request.data)
            return DataResponse(result=result)

        except ValidationError as e:
            context.set_code(StatusCode.INVALID_ARGUMENT)
            context.set_details(str(e))
            return DataResponse()

        except PermissionError as e:
            context.set_code(StatusCode.PERMISSION_DENIED)
            context.set_details(str(e))
            return DataResponse()

        except Exception:
            logger.error("Unexpected error", exc_info=True)
            context.set_code(StatusCode.INTERNAL)
            context.set_details("Internal server error")
            return DataResponse()
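
The repeated `except` branches above all follow the same shape: pick a status code for an exception type, then set it on the context. One way to centralize that choice is a lookup table. The sketch below is an assumption about how such a table might look, not part of the Pyvider API; it returns status code names as strings so it runs without gRPC installed, and the particular exception-to-status pairs are illustrative.

```python
STATUS_BY_EXCEPTION = {
    ValueError: "INVALID_ARGUMENT",
    PermissionError: "PERMISSION_DENIED",
    TimeoutError: "DEADLINE_EXCEEDED",
    FileNotFoundError: "NOT_FOUND",
}


def status_name_for(exc: BaseException) -> str:
    """Return the gRPC status code name for an exception, defaulting to INTERNAL."""
    # Walk the MRO so subclasses inherit their parent's mapping.
    for cls in type(exc).__mro__:
        if cls in STATUS_BY_EXCEPTION:
            return STATUS_BY_EXCEPTION[cls]
    return "INTERNAL"
```

Inside a servicer, the name can be turned into the real enum member with `getattr(grpc.StatusCode, status_name_for(e))`. Unmapped exceptions fall back to `INTERNAL`, which keeps unexpected error details from leaking to the client.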

Client Error Handling

Clients handle gRPC errors through exceptions:

import grpc

async def handle_rpc_errors():
    try:
        async with plugin_client(command=cmd) as client:
            response = await client.my_service.ProcessData(request)
            return response

    except grpc.aio.AioRpcError as e:
        if e.code() == grpc.StatusCode.INVALID_ARGUMENT:
            logger.error(f"Invalid request: {e.details()}")
        elif e.code() == grpc.StatusCode.PERMISSION_DENIED:
            logger.error(f"Permission denied: {e.details()}")
        elif e.code() == grpc.StatusCode.UNAVAILABLE:
            logger.error("Service unavailable, retrying...")
            # Implement retry logic
        else:
            logger.error(f"RPC error: {e.code()} - {e.details()}")
        raise
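
The "implement retry logic" placeholder above is commonly filled with exponential backoff and jitter. The helper below is a minimal sketch under that assumption; `call_with_retry` and its parameters are hypothetical, and the `sleep` argument exists only so the delay can be replaced in tests.

```python
import asyncio
import random


async def call_with_retry(call, *, is_retryable, attempts: int = 4,
                          base_delay: float = 0.1, sleep=asyncio.sleep):
    """Await call() up to `attempts` times, backing off between retryable failures."""
    for attempt in range(attempts):
        try:
            return await call()
        except Exception as exc:
            last_attempt = attempt == attempts - 1
            if last_attempt or not is_retryable(exc):
                raise
            # Exponential backoff with full jitter: 0..base_delay * 2^attempt.
            await sleep(random.uniform(0, base_delay * 2 ** attempt))
```

For gRPC calls, a suitable predicate would be `lambda e: isinstance(e, grpc.aio.AioRpcError) and e.code() == grpc.StatusCode.UNAVAILABLE`. Retrying is only safe for idempotent methods, so it is worth deciding per RPC rather than wrapping every call.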

Performance Architecture

Connection Multiplexing

gRPC uses HTTP/2 for efficient connection multiplexing:

# Single connection handles multiple concurrent RPCs
async def concurrent_requests():
    async with plugin_client(command=cmd) as client:
        # All requests share the same connection
        tasks = [
            client.my_service.ProcessData(DataRequest(data=f"data_{i}"))
            for i in range(100)
        ]

        # Requests are automatically multiplexed
        results = await asyncio.gather(*tasks)
        return results
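
Multiplexing does not make concurrency unlimited: a single HTTP/2 connection allows a finite number of concurrent streams, and the plugin itself may have limited capacity. A semaphore is a simple way to cap the number of in-flight calls. The sketch below is generic asyncio code; `gather_bounded` is a hypothetical helper, and it takes zero-argument factories so that each coroutine is only created once a slot is free.

```python
import asyncio


async def gather_bounded(coroutine_factories, limit: int):
    """Run zero-argument coroutine factories with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def run_one(factory):
        async with semaphore:
            return await factory()

    # gather preserves input order in its result list.
    return await asyncio.gather(*(run_one(f) for f in coroutine_factories))
```

Applied to the example above, each factory could be `lambda i=i: client.my_service.ProcessData(DataRequest(data=f"data_{i}"))`, with `limit` chosen to match what the plugin can handle.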

Streaming Efficiency

Streaming RPCs provide efficient data transfer:

# Efficient large data transfer
async def stream_large_dataset():
    async with plugin_client(command=cmd) as client:

        # Stream data without loading everything into memory
        async for chunk in client.my_service.StreamLargeData(request):
            await process_chunk_incrementally(chunk)
            # Memory usage remains constant

Connection Pooling

For high-throughput scenarios, implement connection pooling:

import asyncio

class ConnectionPool:
    def __init__(self, command, pool_size=10):
        self.command = command
        self.pool_size = pool_size
        self.connections = asyncio.Queue(maxsize=pool_size)

    async def initialize(self):
        for _ in range(self.pool_size):
            client = plugin_client(command=self.command)
            await client.start()
            await self.connections.put(client)

    async def get_connection(self):
        return await self.connections.get()

    async def return_connection(self, client):
        await self.connections.put(client)

# Usage with connection pooling
pool = ConnectionPool(["python", "-m", "my_plugin"], pool_size=20)
await pool.initialize()
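
With separate `get_connection` and `return_connection` calls, a client is lost from the pool whenever an exception occurs between the two. Wrapping the pair in an async context manager avoids that. The helper below is a sketch that works on any `asyncio.Queue`, including the `connections` queue of the pool above; the name `borrowed` is hypothetical.

```python
import asyncio
from contextlib import asynccontextmanager


@asynccontextmanager
async def borrowed(queue: asyncio.Queue):
    """Borrow one item from the queue and always put it back afterwards."""
    item = await queue.get()
    try:
        yield item
    finally:
        # Runs on success and on error, so the pool never leaks a slot.
        queue.put_nowait(item)
```

Usage would look like `async with borrowed(pool.connections) as client: ...`. Because the item was just removed, `put_nowait` cannot overflow a bounded queue here.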

Security Architecture Integration

Transport Layer Security

The RPC architecture integrates with the security model:

import os

# Automatic mTLS integration
os.environ.update({
    "PLUGIN_AUTO_MTLS": "true",
    "PLUGIN_SERVER_CERT": "file:///etc/ssl/server.pem",
    "PLUGIN_SERVER_KEY": "file:///etc/ssl/server.key"
})

# gRPC automatically uses TLS for all communication
server = plugin_server(protocol=protocol, handler=handler)

Authentication Integration

Magic cookie authentication happens at the transport layer before gRPC:

# Authentication occurs during handshake, before gRPC communication
async def authenticated_rpc():
    # Magic cookie validated during connection establishment
    async with plugin_client(command=cmd) as client:
        # All RPC calls are automatically authenticated
        response = await client.my_service.SecureOperation(request)
        return response
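
This document does not describe how the magic cookie is exchanged. The sketch below assumes it is passed to the plugin through an environment variable and compared against an expected value; the function name and the variable name in the usage note are hypothetical, not the library's actual configuration keys. The point it illustrates is the constant-time comparison.

```python
import hmac


def cookie_is_valid(environ: dict, key: str, expected: str) -> bool:
    """Check that environ[key] matches the expected cookie value."""
    presented = environ.get(key, "")
    # compare_digest avoids leaking the match position through timing.
    return hmac.compare_digest(presented.encode(), expected.encode())
```

A plugin would call something like `cookie_is_valid(os.environ, "EXAMPLE_PLUGIN_COOKIE", expected)` before starting its server and exit early on a mismatch.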

Architecture Benefits

1. Type Safety

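Protocol Buffers give every message a fixed schema. In Python, the generated classes reject wrongly typed field values at runtime, and generated type stubs let static checkers catch mistakes earlier: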

# Types are automatically generated from .proto files
request = DataRequest(
    data="example",
    options={"key": "value"}  # Type-checked
)

response = await client.my_service.ProcessData(request)
# response.result is automatically typed as string

2. Versioning Support

Protocol Buffers support backward-compatible schema evolution, and a service can expose versioned methods side by side:

// Forward/backward compatible service evolution
service DataProcessor {
    rpc ProcessDataV1(DataRequestV1) returns (DataResponseV1);
    rpc ProcessDataV2(DataRequestV2) returns (DataResponseV2);
}

3. Language Interoperability

gRPC enables polyglot plugin development:

# Python client can communicate with any gRPC server
# Server could be implemented in Go, Java, C++, etc.
async with plugin_client(command=["./go-plugin-server"]) as client:
    response = await client.data_service.ProcessData(request)

4. Built-in Observability

gRPC exposes hooks, such as interceptors, through which metrics and tracing can be collected:

# Automatic request/response metrics
# - Request count, latency, error rates
# - Connection health and status
# - Streaming metrics (messages sent/received)
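
To make the listed metrics concrete, the sketch below records call counts, error counts, and cumulative latency per method. It is a plain-Python illustration with hypothetical names (`CallMetrics`, `observe`), not an API of gRPC or Pyvider; in a real deployment the same bookkeeping would typically live in a gRPC interceptor.

```python
import time
from collections import defaultdict


class CallMetrics:
    """Collect per-method call counts, error counts, and total latency."""

    def __init__(self) -> None:
        self.calls = defaultdict(int)
        self.errors = defaultdict(int)
        self.total_seconds = defaultdict(float)

    async def observe(self, method: str, call):
        """Await call() and record its outcome under `method`."""
        started = time.perf_counter()
        try:
            return await call()
        except Exception:
            self.errors[method] += 1
            raise
        finally:
            # Counted for both successful and failed calls.
            self.calls[method] += 1
            self.total_seconds[method] += time.perf_counter() - started
```

A client could wrap a call as `await metrics.observe("ProcessData", lambda: client.my_service.ProcessData(request))`. Average latency per method is then `total_seconds[m] / calls[m]`.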

Next Steps