Skip to content

Protocols

Protocol definitions are the foundation of the Pyvider RPC Plugin system. They define the interface between clients and servers, specify available services and methods, and ensure type-safe communication across process boundaries.

Overview

The protocol system provides:

  • gRPC Integration - Built on Protocol Buffers for efficient, type-safe communication
  • Service Definition - Clear specification of available RPC methods
  • Type Safety - Schema-defined, strongly typed messages with automatically generated code
  • Versioning Support - Backward and forward compatibility for API evolution
  • Multi-language Support - Protocol definitions work across programming languages

Protocol Components

1. Protocol Buffers (.proto files)

Protocol definitions start with .proto files that define services and messages:

// example.proto
syntax = "proto3";

package example;

// Service definition
service DataProcessor {
    // Unary RPC - single request, single response
    rpc ProcessData(ProcessRequest) returns (ProcessResponse);

    // Server streaming - single request, multiple responses
    rpc StreamResults(StreamRequest) returns (stream ResultItem);

    // Health check method
    rpc GetStatus(StatusRequest) returns (StatusResponse);
}

// Message definitions
message ProcessRequest {
    string data = 1;                   // payload; the example handler rejects it when empty
    map<string, string> options = 2;   // string options, e.g. {"uppercase": "true"}
    repeated string tags = 3;          // prepended to the result by the example handler
}

message ProcessResponse {
    string result = 1;
    int32 status_code = 2;             // example handler: 0 = ok, 1 = invalid input, 2 = internal error
    double processing_time = 3;        // seconds
}

message StreamRequest {
    string query = 1;
    int32 limit = 2;                   // max items to stream (the example handler also caps it)
}

message ResultItem {
    string id = 1;
    string data = 2;
    int64 timestamp = 3;               // Unix time in seconds
}

// Takes no parameters.
message StatusRequest {}

message StatusResponse {
    string status = 1;                 // e.g. "healthy"
    string version = 2;
    int64 uptime = 3;                  // seconds since the handler was created
}

2. Generated Code

The Protocol Buffers compiler (protoc) generates Python code from .proto files:

# Generated files (automatic)
# example_pb2.py - Message classes
# example_pb2_grpc.py - Service classes and stubs

# Example generated message class
class ProcessRequest:
    def __init__(self, data: str = "", options: dict = None, tags: list = None):
        self.data = data
        self.options = options or {}
        self.tags = tags or []

# Example generated service stub
class DataProcessorStub:
    """Sketch of the client stub protoc generates for ``example.DataProcessor``.

    One callable attribute is created per RPC in the service definition, using
    the channel factory that matches the RPC's shape (unary vs. streaming).

    Args:
        channel: A gRPC channel exposing ``unary_unary`` / ``unary_stream``.
    """

    def __init__(self, channel):
        # ProcessData: single request -> single response.
        self.ProcessData = channel.unary_unary(
            '/example.DataProcessor/ProcessData',
            request_serializer=ProcessRequest.SerializeToString,
            response_deserializer=ProcessResponse.FromString,
        )
        # StreamResults: single request -> stream of ResultItem messages.
        self.StreamResults = channel.unary_stream(
            '/example.DataProcessor/StreamResults',
            request_serializer=StreamRequest.SerializeToString,
            response_deserializer=ResultItem.FromString,
        )
        # GetStatus: single request -> single response.
        self.GetStatus = channel.unary_unary(
            '/example.DataProcessor/GetStatus',
            request_serializer=StatusRequest.SerializeToString,
            response_deserializer=StatusResponse.FromString,
        )

3. Protocol Implementation

The RPCPluginProtocol class integrates Protocol Buffers with the plugin system:

from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
import example_pb2_grpc

class DataProcessorProtocol(RPCPluginProtocol):
    """Protocol implementation for DataProcessor service.

    Connects the generated ``example_pb2_grpc`` module to the plugin system:
    it names the service to expose and registers a handler for it.
    """

    # Fully-qualified gRPC service name: "<proto package>.<Service>".
    service_name = "example.DataProcessor"

    # RPC shape of each method, mirroring ``service DataProcessor`` in
    # example.proto. Defined once instead of being rebuilt on every call.
    _METHOD_TYPES = {
        "ProcessData": "unary_unary",
        "StreamResults": "unary_stream",
        "GetStatus": "unary_unary",
    }

    async def get_grpc_descriptors(self):
        """Return gRPC module and service name for registration."""
        # Reuse service_name so the two values cannot drift apart.
        return example_pb2_grpc, self.service_name

    async def add_to_server(self, server, handler):
        """Register service implementation with gRPC server."""
        example_pb2_grpc.add_DataProcessorServicer_to_server(handler, server)

    def get_method_type(self, method_name: str) -> str:
        """Return the RPC method type for *method_name*.

        Unknown method names fall back to ``"unary_unary"``.
        """
        return self._METHOD_TYPES.get(method_name, "unary_unary")

4. Service Handler Implementation

Service handlers implement the actual business logic:

import asyncio
import time

import grpc
import example_pb2
import example_pb2_grpc
from provide.foundation import logger


class DataProcessorHandler(example_pb2_grpc.DataProcessorServicer):
    """Implementation of the ``example.DataProcessor`` service.

    ``ProcessResponse.status_code`` convention used below: 0 = success,
    1 = invalid input, 2 = internal error.
    """

    # Hard cap on the number of items StreamResults returns.
    MAX_RESULTS = 10

    # String spellings that switch a boolean-style option (e.g. "uppercase") on.
    TRUTHY_OPTION_VALUES = frozenset({"1", "true", "yes", "on"})

    def __init__(self):
        # Monotonic clock so the reported uptime is unaffected by wall-clock
        # adjustments (NTP, manual changes).
        self.start_time = time.monotonic()
        logger.info("DataProcessor handler initialized")

    async def ProcessData(self, request, context):
        """Handle the unary ProcessData RPC.

        Returns a ProcessResponse carrying the transformed data and the
        processing time in seconds. On failure the gRPC code/details are set
        and a response with a non-zero ``status_code`` is returned.
        """
        logger.info(f"Processing data: {request.data}")

        try:
            # Validate request
            if not request.data:
                context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
                context.set_details("Data field is required")
                # Same status_code as the ValueError path below; a bare
                # ProcessResponse() would report 0 (success).
                return example_pb2.ProcessResponse(status_code=1)

            # perf_counter is monotonic and high-resolution, which suits durations.
            start = time.perf_counter()
            result = await self._process_business_logic(
                data=request.data,
                options=dict(request.options),
                tags=list(request.tags),
            )
            processing_time = time.perf_counter() - start

            return example_pb2.ProcessResponse(
                result=result,
                status_code=0,
                processing_time=processing_time,
            )

        except ValueError as e:
            # ValueError from the business logic is reported as bad input,
            # including its message.
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details(str(e))
            return example_pb2.ProcessResponse(status_code=1)

        except Exception as e:
            # Unexpected failure: full traceback in the log, generic text to the client.
            logger.error(f"Processing error: {e}", exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("Internal processing error")
            return example_pb2.ProcessResponse(status_code=2)

    async def StreamResults(self, request, context):
        """Handle the server-streaming StreamResults RPC.

        Yields one ResultItem per row returned by ``_query_data`` and stops
        early if the client cancels the call.
        """
        logger.info(f"Streaming results for query: {request.query}")

        try:
            results = await self._query_data(request.query, request.limit)

            for result in results:
                yield example_pb2.ResultItem(
                    id=result["id"],
                    data=result["data"],
                    timestamp=int(time.time()),  # wall-clock Unix seconds
                )

                # Allow for graceful cancellation
                if context.cancelled():
                    logger.info("Stream cancelled by client")
                    break

        except Exception as e:
            # Like ProcessData: log the traceback server-side, but do not leak
            # internal error text to the client.
            logger.error(f"Streaming error: {e}", exc_info=True)
            context.set_code(grpc.StatusCode.INTERNAL)
            context.set_details("Internal streaming error")

    async def GetStatus(self, request, context):
        """Handle GetStatus: report health, version and uptime in whole seconds."""
        uptime = int(time.monotonic() - self.start_time)

        return example_pb2.StatusResponse(
            status="healthy",
            version="1.0.0",
            uptime=uptime,
        )

    async def _process_business_logic(self, data, options, tags):
        """Apply the demo transformation to *data*.

        Args:
            data: Input text.
            options: String options. ``"uppercase"`` upper-cases the text when
                set to a truthy spelling ("true", "1", "yes", "on"; case-insensitive).
            tags: Optional tags, prepended as ``[tag1,tag2] ``.

        Returns:
            The transformed string.
        """
        # Simulate processing
        await asyncio.sleep(0.1)

        # Option values are strings, so check the spelling instead of relying on
        # truthiness: "false" is a non-empty (truthy) string.
        uppercase = (
            options.get("uppercase", "").strip().lower() in self.TRUTHY_OPTION_VALUES
        )
        result = data.upper() if uppercase else data

        # Add tags
        if tags:
            result = f"[{','.join(tags)}] {result}"

        return result

    async def _query_data(self, query, limit):
        """Return fake result rows for *query* (simulated database query).

        At most ``MAX_RESULTS`` rows are returned. proto3 delivers an unset
        ``limit`` as 0, so a non-positive limit means "no client limit" and
        yields ``MAX_RESULTS`` rows rather than none.
        """
        count = self.MAX_RESULTS if limit <= 0 else min(limit, self.MAX_RESULTS)
        results = []
        for i in range(count):
            results.append({
                "id": f"item_{i}",
                "data": f"Result for '{query}' #{i}",
            })
            await asyncio.sleep(0.05)  # Simulate processing delay

        return results

Protocol Usage Patterns

1. Basic Server Setup

#!/usr/bin/env python3
import asyncio

from provide.foundation import logger
from pyvider.rpcplugin import plugin_server

# DataProcessorProtocol and DataProcessorHandler are the classes shown in the
# sections above; import them from the module(s) where you defined them.


async def main():
    """Create the DataProcessor plugin server and serve requests."""
    # Create server with custom protocol
    server = plugin_server(
        protocol=DataProcessorProtocol(),
        handler=DataProcessorHandler(),
    )

    logger.info("DataProcessor server starting...")
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())

2. Client Usage

#!/usr/bin/env python3
import asyncio
import sys

import example_pb2
from provide.foundation import logger
from pyvider.rpcplugin import plugin_client


async def main():
    """Start the DataProcessor plugin and call its unary and streaming RPCs."""
    # sys.executable launches the server with this same interpreter (and
    # virtualenv) instead of whatever "python" resolves to on PATH; on many
    # systems only "python3" exists.
    client = plugin_client(command=[sys.executable, "data_processor_server.py"])

    async with client:
        # Create typed request
        request = example_pb2.ProcessRequest(
            data="Hello, World!",
            options={"uppercase": "true"},
            tags=["greeting", "example"],
        )

        # Unary RPC call
        response = await client.data_processor.ProcessData(request)

        logger.info(f"Result: {response.result}")
        logger.info(f"Processing time: {response.processing_time:.3f}s")

        # Server-streaming RPC call
        stream_request = example_pb2.StreamRequest(
            query="test data",
            limit=5,
        )

        logger.info("Streaming results...")
        async for item in client.data_processor.StreamResults(stream_request):
            logger.info(f"Received: {item.id} - {item.data}")


if __name__ == "__main__":
    asyncio.run(main())

Advanced Protocol Patterns

1. Multiple Services

A single protocol can register multiple services, all implemented by one handler:

// multi_service.proto
syntax = "proto3";

// The package becomes the prefix of each fully-qualified service name
// (multi.UserService, multi.DataService), matching the "multi." prefix used
// by MultiServiceProtocol.
package multi;

service UserService {
    rpc CreateUser(CreateUserRequest) returns (User);
    rpc GetUser(GetUserRequest) returns (User);
}

service DataService {
    rpc StoreData(StoreRequest) returns (StoreResponse);
    rpc QueryData(QueryRequest) returns (QueryResponse);
}

// Message definitions (CreateUserRequest, User, StoreRequest, ...) omitted for brevity.
class MultiServiceProtocol(RPCPluginProtocol):
    """Protocol supporting multiple services.

    ``add_to_server`` registers both UserService and DataService with the same
    handler object, so one plugin process serves both.
    """

    # NOTE(review): a logical name for the bundle, not either concrete gRPC
    # service (UserService / DataService); confirm how the framework uses it.
    service_name = "multi.Services"

    async def get_grpc_descriptors(self):
        """Return the generated gRPC module and this protocol's service name."""
        # Reuse service_name instead of repeating the literal.
        return multi_service_pb2_grpc, self.service_name

    async def add_to_server(self, server, handler):
        """Register every service from multi_service.proto on *server*."""
        # Register multiple services with the same handler
        multi_service_pb2_grpc.add_UserServiceServicer_to_server(handler, server)
        multi_service_pb2_grpc.add_DataServiceServicer_to_server(handler, server)

class MultiServiceHandler(
    multi_service_pb2_grpc.UserServiceServicer,
    multi_service_pb2_grpc.DataServiceServicer
):
    """Handler implementing multiple services.

    Inherits from both generated servicer base classes so one object can be
    passed to both ``add_*Servicer_to_server`` calls. Methods not overridden
    here (GetUser, QueryData) fall back to the generated base-class versions.
    """

    async def CreateUser(self, request, context):
        # User service implementation goes here; it must return a User message.
        # Raise explicitly rather than `pass`: returning None from a unary RPC
        # is not a valid response message.
        raise NotImplementedError("CreateUser is not implemented yet")

    async def StoreData(self, request, context):
        # Data service implementation goes here; it must return a StoreResponse.
        raise NotImplementedError("StoreData is not implemented yet")

2. Versioned APIs

Support multiple API versions with a single protocol class; each instance serves one version:

// versioned_api.proto
syntax = "proto3";

// VersionedProtocol builds service names as "example.DataProcessor<VERSION>",
// so the package must be "example".
package example;

service DataProcessorV1 {
    rpc Process(ProcessRequestV1) returns (ProcessResponseV1);
}

service DataProcessorV2 {
    rpc Process(ProcessRequestV2) returns (ProcessResponseV2);
    rpc BatchProcess(BatchProcessRequest) returns (BatchProcessResponse);
}

// Request/response message definitions omitted for brevity.
class VersionedProtocol(RPCPluginProtocol):
    """Protocol supporting multiple API versions.

    Each instance serves exactly one version of the DataProcessor API.

    Args:
        version: ``"v1"`` or ``"v2"`` (case-insensitive); defaults to ``"v2"``.

    Raises:
        ValueError: If *version* is not one of ``SUPPORTED_VERSIONS``.
    """

    # Versions this protocol knows how to register.
    SUPPORTED_VERSIONS = ("v1", "v2")

    def __init__(self, version="v2"):
        normalized = version.lower()
        # Fail fast instead of later registering no service at all.
        if normalized not in self.SUPPORTED_VERSIONS:
            raise ValueError(
                f"Unsupported API version {version!r}; "
                f"expected one of {self.SUPPORTED_VERSIONS}"
            )
        self.version = normalized
        # e.g. "v2" -> "example.DataProcessorV2"
        self.service_name = f"example.DataProcessor{normalized.upper()}"

    async def get_grpc_descriptors(self):
        """Return the generated gRPC module and the versioned service name."""
        return versioned_api_pb2_grpc, self.service_name

    async def add_to_server(self, server, handler):
        """Register *handler* for this instance's API version on *server*."""
        if self.version == "v1":
            versioned_api_pb2_grpc.add_DataProcessorV1Servicer_to_server(handler, server)
        elif self.version == "v2":
            versioned_api_pb2_grpc.add_DataProcessorV2Servicer_to_server(handler, server)

3. Streaming Patterns

Implement the three gRPC streaming patterns (server, client, and bidirectional):

// Presumably streaming.proto (generating streaming_pb2 / streaming_pb2_grpc).
// Message definitions are omitted; the handler below uses these fields:
//   DownloadRequest{file_path}, FileChunk{data, sequence, is_last},
//   UploadResponse{file_path, total_bytes, total_chunks},
//   ChatMessage{user, text, timestamp}.
service StreamingService {
    // Server streaming: single request, multiple responses
    rpc DownloadFile(DownloadRequest) returns (stream FileChunk);

    // Client streaming: multiple requests, single response
    rpc UploadFile(stream FileChunk) returns (UploadResponse);

    // Bidirectional streaming: multiple requests and responses
    rpc Chat(stream ChatMessage) returns (stream ChatMessage);
}
import os
import tempfile
import time

import grpc
import streaming_pb2
import streaming_pb2_grpc


class StreamingHandler(streaming_pb2_grpc.StreamingServiceServicer):
    """Handler with streaming implementations.

    Demonstrates server streaming (DownloadFile), client streaming
    (UploadFile) and bidirectional streaming (Chat).
    """

    # Payload size of each FileChunk sent by DownloadFile.
    CHUNK_SIZE = 8192  # 8 KiB

    async def DownloadFile(self, request, context):
        """Server streaming - send file in chunks.

        Chunks are numbered from 0 in ``sequence``, and exactly the final chunk
        has ``is_last=True``. An empty file produces no chunks. Replies
        NOT_FOUND if the file does not exist.
        """
        file_path = request.file_path
        # SECURITY: file_path comes from the client. In real code, resolve it
        # against an allowed base directory and reject anything outside it;
        # otherwise any file readable by the server process can be downloaded.
        # NOTE: open()/read() block the event loop. That is fine for a demo;
        # consider asyncio.to_thread for large files.

        chunk_num = 0
        try:
            with open(file_path, 'rb') as f:
                # Read one chunk ahead so the last chunk is flagged even when
                # the file size is an exact multiple of CHUNK_SIZE.
                chunk = f.read(self.CHUNK_SIZE)
                while chunk:
                    next_chunk = f.read(self.CHUNK_SIZE)
                    yield streaming_pb2.FileChunk(
                        data=chunk,
                        sequence=chunk_num,
                        is_last=not next_chunk,
                    )
                    chunk_num += 1
                    chunk = next_chunk

        except FileNotFoundError:
            context.set_code(grpc.StatusCode.NOT_FOUND)
            context.set_details(f"File not found: {file_path}")

    async def UploadFile(self, request_iterator, context):
        """Client streaming - receive file in chunks.

        Each chunk is written straight to a uniquely named temporary file. This
        keeps memory bounded (no quadratic ``bytes +=``), and concurrent
        uploads cannot overwrite each other. A partial file is removed if the
        stream fails.
        """
        total_chunks = 0
        total_bytes = 0

        # delete=False: the file must outlive this call, since its path is returned.
        with tempfile.NamedTemporaryFile(
            mode="wb", prefix="upload_", suffix=".bin", delete=False
        ) as f:
            file_path = f.name
            try:
                async for chunk in request_iterator:
                    f.write(chunk.data)
                    total_bytes += len(chunk.data)
                    total_chunks += 1
            except BaseException:
                # Don't leave a half-written upload behind.
                f.close()
                os.unlink(file_path)
                raise

        return streaming_pb2.UploadResponse(
            file_path=file_path,
            total_bytes=total_bytes,
            total_chunks=total_chunks,
        )

    async def Chat(self, request_iterator, context):
        """Bidirectional streaming - reply to each incoming message."""
        async for message in request_iterator:
            # Process incoming message
            response_text = await self.process_chat_message(message.text)

            # Send response
            yield streaming_pb2.ChatMessage(
                user="assistant",
                text=response_text,
                timestamp=int(time.time()),
            )

    async def process_chat_message(self, text):
        """Produce the reply text for one chat message.

        Placeholder that echoes *text* back so the example runs; replace it
        with real chat logic.
        """
        return text

Protocol Development Workflow

1. Define Protocol

# 1. Write .proto file
# Everything between "<< EOF" and the closing EOF line is written to
# my_service.proto. EOF is unquoted, so the shell would expand any "$..."
# in the body; quote it ('EOF') if your schema ever contains "$".
cat > my_service.proto << EOF
syntax = "proto3";
package myservice;

service MyService {
    rpc DoSomething(Request) returns (Response);
}

message Request {
    string input = 1;
}

message Response {
    string output = 1;
}
EOF

2. Generate Code

# 2. Generate Python code from .proto file
# --proto_path:      directory protoc searches for my_service.proto and its imports
# --python_out:      output directory for message classes (my_service_pb2.py)
# --grpc_python_out: output directory for gRPC stubs/servicers (my_service_pb2_grpc.py)
# The grpc_tools module is provided by the grpcio-tools package.
python -m grpc_tools.protoc \
    --proto_path=. \
    --python_out=. \
    --grpc_python_out=. \
    my_service.proto

# Generated files:
# - my_service_pb2.py (message classes)
# - my_service_pb2_grpc.py (service classes)

3. Implement Protocol

# 3. Implement protocol class
from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
import my_service_pb2_grpc

class MyServiceProtocol(RPCPluginProtocol):
    """Plugin protocol for the ``myservice.MyService`` gRPC service."""

    # "<package>.<Service>" as declared in my_service.proto.
    service_name = "myservice.MyService"

    async def get_grpc_descriptors(self):
        """Return the generated gRPC module and the service name."""
        # Reuse service_name instead of repeating the literal.
        return my_service_pb2_grpc, self.service_name

    async def add_to_server(self, server, handler):
        """Register *handler* as the MyService servicer on *server*."""
        my_service_pb2_grpc.add_MyServiceServicer_to_server(handler, server)

4. Implement Handler

# 4. Implement service handler
import my_service_pb2
import my_service_pb2_grpc

class MyServiceHandler(my_service_pb2_grpc.MyServiceServicer):
    """Handler for the ``myservice.MyService`` gRPC service."""

    async def DoSomething(self, request, context):
        """Handle DoSomething: map ``request.input`` to ``Response.output``."""
        # Business logic
        result = self.process(request.input)

        return my_service_pb2.Response(output=result)

    def process(self, value: str) -> str:
        """Business-logic hook used by DoSomething.

        Placeholder that returns *value* unchanged so the example runs;
        replace it with real logic.
        """
        return value

5. Create Server

# 5. Create server with protocol and handler
import asyncio

from pyvider.rpcplugin import plugin_server


async def main():
    """Build the MyService plugin server and start serving."""
    server = plugin_server(
        protocol=MyServiceProtocol(),
        handler=MyServiceHandler(),
    )
    # Creating the server is not enough; awaiting serve() starts it,
    # as in the Basic Server Setup example.
    await server.serve()


if __name__ == "__main__":
    asyncio.run(main())

Protocol Best Practices

1. Message Design

// Well-known types must be imported explicitly.
import "google/protobuf/timestamp.proto";

// Good: Clear, extensible message design
message ProcessRequest {
    string data = 1;                    // Payload (proto3 has no "required": validate in the handler)
    map<string, string> options = 2;    // Extensible options
    repeated string tags = 3;           // List of tags
    google.protobuf.Timestamp created = 4;  // Use well-known types
}

// Bad: Unclear, hard to extend
// (An alternative to the message above, not meant to coexist with it in one
// file: the two names would clash.)
message ProcessRequest {
    string data = 1;
    string option1 = 2;  // Hard to extend
    string option2 = 3;  // Limited flexibility
}

2. Error Handling

async def ProcessData(self, request, context):
    """Validate the request, process it, and map failures to gRPC status codes.

    Every failure path sets a gRPC code/details *and* returns a response whose
    ``status`` is ERROR, so clients that inspect only the payload still see
    the failure.

    NOTE: this ProcessResponse assumes a ``Status`` enum field, unlike the
    ``status_code`` integer in example.proto.
    """
    try:
        # Validate request
        if not request.data:
            context.set_code(grpc.StatusCode.INVALID_ARGUMENT)
            context.set_details("Missing required field: data")
            # A bare ProcessResponse() carries the enum's zero value, which may
            # well be SUCCESS, so set the error status explicitly.
            return ProcessResponse(status=ProcessResponse.Status.ERROR)

        # Business logic
        result = await self.process(request.data)

        return ProcessResponse(
            result=result,
            status=ProcessResponse.Status.SUCCESS
        )

    except PermissionError:
        context.set_code(grpc.StatusCode.PERMISSION_DENIED)
        context.set_details("Insufficient permissions")

    except Exception:
        # Keep the traceback in the server log; send only a generic message.
        logger.error("Processing failed", exc_info=True)
        context.set_code(grpc.StatusCode.INTERNAL)
        context.set_details("Internal error occurred")

    # Reached only from the except branches above.
    return ProcessResponse(status=ProcessResponse.Status.ERROR)

3. Service Documentation

class DocumentedProtocol(RPCPluginProtocol):
    """
    Well-documented protocol implementation.

    This protocol provides data processing services with the following capabilities:
    - Request/response (unary) data processing
    - Streaming result delivery
    - Health status monitoring

    Usage:
        protocol = DocumentedProtocol()
        handler = DocumentedHandler()
        server = plugin_server(protocol=protocol, handler=handler)
        await server.serve()
    """

    # Fully-qualified gRPC service name: "<proto package>.<Service>".
    service_name = "documented.DataProcessor"

    async def get_grpc_descriptors(self):
        """Return gRPC descriptors for the DataProcessor service."""
        return documented_pb2_grpc, self.service_name

    async def add_to_server(self, server, handler):
        """Register *handler* as the DataProcessor servicer on *server*.

        Mirrors the add_to_server hooks of the other protocol examples.
        """
        documented_pb2_grpc.add_DataProcessorServicer_to_server(handler, server)

Next Steps