Custom Protocols¶
Build domain-specific communication protocols for specialized plugin requirements when standard gRPC isn't optimal for your use case.
When to Use Custom Protocols¶
Consider custom protocols for: - Ultra-low latency: Financial trading, real-time gaming, IoT sensor data - Specialized formats: Binary protocols, legacy system integration - Domain optimization: Custom compression, message framing, or serialization - Cross-language needs: Protocols that work beyond gRPC ecosystems
Protocol Architecture¶
Protocol Interface¶
All custom protocols implement the base RPCPluginProtocol:
from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
from provide.foundation import logger
class CustomProtocol(RPCPluginProtocol):
def __init__(self, name: str, version: str):
self.name = name
self.version = version
logger.info(f"🔧 Initializing {name} protocol v{version}")
async def get_grpc_descriptors(self):
"""Return gRPC service descriptors for this protocol."""
return self.service_descriptor, self.service_name
async def add_to_server(self, server, handler):
"""Register protocol services with gRPC server."""
self.service_implementation.add_to_server(server, handler)
Protocol Registration¶
from pyvider.rpcplugin import plugin_server
# Create custom protocol instance
my_protocol = CustomProtocol(
name="trading-data",
version="2.1.0"
)
# Register with server
server = plugin_server(
protocol=my_protocol,
handler=MyProtocolHandler()
)
Common Custom Protocol Patterns¶
1. Binary Data Protocol¶
For high-performance binary data exchange:
import struct
from dataclasses import dataclass
@dataclass
class BinaryMessage:
message_type: int
payload_size: int
data: bytes
def serialize(self) -> bytes:
"""Convert to binary format."""
header = struct.pack('<BI', self.message_type, self.payload_size)
return header + self.data
@classmethod
def deserialize(cls, data: bytes) -> 'BinaryMessage':
"""Parse from binary format."""
message_type, payload_size = struct.unpack('<BI', data[:5])
payload = data[5:5+payload_size]
return cls(message_type, payload_size, payload)
class BinaryProtocol(RPCPluginProtocol):
async def process_binary_stream(self, stream):
"""Handle binary message stream."""
async for raw_data in stream:
message = BinaryMessage.deserialize(raw_data)
yield await self.handle_message(message)
2. Streaming Data Protocol¶
For continuous data feeds:
class StreamingProtocol(RPCPluginProtocol):
async def start_data_stream(self, request, context):
"""Provide continuous data stream."""
while context.is_active():
# Generate or fetch data
data = await self.get_next_data_batch()
# Send to client
yield StreamingResponse(
timestamp=time.time_ns(),
data=data,
sequence=self.next_sequence()
)
await asyncio.sleep(0.001) # 1ms intervals
3. Legacy System Integration¶
For connecting to existing systems:
class LegacyProtocol(RPCPluginProtocol):
def __init__(self, legacy_format: str):
self.format = legacy_format
super().__init__("legacy-bridge", "1.0")
async def translate_request(self, grpc_request):
"""Convert gRPC request to legacy format."""
if self.format == "xml":
return self.to_xml(grpc_request)
elif self.format == "fixed-width":
return self.to_fixed_width(grpc_request)
async def translate_response(self, legacy_response):
"""Convert legacy response to gRPC format."""
return self.parse_legacy_response(legacy_response)
Protocol Buffer Integration¶
Custom Message Types¶
Define specialized message formats:
// trading_protocol.proto
syntax = "proto3";
package trading;
message MarketDataRequest {
repeated string symbols = 1;
int64 start_time = 2;
int64 end_time = 3;
DataFormat format = 4;
}
message MarketDataResponse {
string symbol = 1;
double price = 2;
int64 volume = 3;
int64 timestamp = 4;
bytes custom_data = 5; // Binary-encoded custom fields
}
enum DataFormat {
STANDARD = 0;
COMPRESSED = 1;
BINARY = 2;
}
Protocol Implementation¶
import trading_pb2_grpc as trading_grpc
from trading_pb2 import MarketDataRequest, MarketDataResponse
class TradingProtocol(RPCPluginProtocol):
async def get_grpc_descriptors(self):
return (
trading_grpc.DESCRIPTOR.services_by_name['MarketDataService'],
"MarketDataService"
)
async def add_to_server(self, server, handler):
trading_grpc.add_MarketDataServiceServicer_to_server(
handler, server
)
Advanced Features¶
Message Compression¶
import zstandard as zstd
class CompressedProtocol(RPCPluginProtocol):
def __init__(self):
self.compressor = zstd.ZstdCompressor(level=3)
self.decompressor = zstd.ZstdDecompressor()
super().__init__("compressed", "1.0")
def compress_message(self, data: bytes) -> bytes:
return self.compressor.compress(data)
def decompress_message(self, data: bytes) -> bytes:
return self.decompressor.decompress(data)
Custom Authentication¶
class AuthenticatedProtocol(RPCPluginProtocol):
def __init__(self, auth_provider):
self.auth = auth_provider
super().__init__("authenticated", "1.0")
async def validate_request(self, context):
"""Validate incoming request authentication."""
token = context.invocation_metadata().get('auth-token')
if not await self.auth.validate_token(token):
context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
Protocol Versioning¶
class VersionedProtocol(RPCPluginProtocol):
def __init__(self):
self.supported_versions = ["1.0", "1.1", "2.0"]
super().__init__("versioned", "2.0")
async def negotiate_version(self, client_version: str) -> str:
"""Negotiate protocol version with client."""
if client_version in self.supported_versions:
return client_version
# Find best compatible version
compatible = self.find_compatible_version(client_version)
logger.info(f"Negotiated version {compatible} for client {client_version}")
return compatible
Testing Custom Protocols¶
Protocol Testing¶
import pytest
from pyvider.rpcplugin.testing import ProtocolTestCase
class TestCustomProtocol(ProtocolTestCase):
def setup_protocol(self):
return CustomProtocol("test", "1.0")
async def test_message_serialization(self):
message = BinaryMessage(1, 100, b"test data")
serialized = message.serialize()
deserialized = BinaryMessage.deserialize(serialized)
assert deserialized.message_type == 1
assert deserialized.data == b"test data"
async def test_protocol_integration(self):
async with self.create_test_client() as client:
response = await client.test_method(test_data="example")
assert response.success
Performance Testing¶
import time
import asyncio
async def benchmark_protocol():
"""Benchmark custom protocol performance."""
protocol = CustomProtocol("benchmark", "1.0")
start_time = time.perf_counter()
# Send 1000 messages
for i in range(1000):
await protocol.send_message(f"Message {i}")
duration = time.perf_counter() - start_time
messages_per_second = 1000 / duration
logger.info(f"Protocol throughput: {messages_per_second:.0f} msg/sec")
Deployment Considerations¶
Protocol Discovery¶
class ProtocolRegistry:
def __init__(self):
self.protocols = {}
def register(self, protocol: RPCPluginProtocol):
self.protocols[protocol.name] = protocol
logger.info(f"Registered protocol: {protocol.name}")
def get_protocol(self, name: str) -> RPCPluginProtocol:
return self.protocols.get(name)
# Global registry
protocol_registry = ProtocolRegistry()
Configuration¶
# Environment-based protocol selection
import os
from pyvider.rpcplugin.config import rpcplugin_config
def create_protocol():
protocol_type = os.getenv("PLUGIN_PROTOCOL_TYPE", "grpc")
if protocol_type == "binary":
return BinaryProtocol()
elif protocol_type == "streaming":
return StreamingProtocol()
else:
return plugin_protocol() # Default gRPC
Monitoring¶
from provide.foundation import logger
class MonitoredProtocol(RPCPluginProtocol):
def __init__(self, base_protocol):
self.base = base_protocol
self.message_count = 0
self.error_count = 0
super().__init__(f"monitored-{base_protocol.name}", base_protocol.version)
async def send_message(self, message):
try:
result = await self.base.send_message(message)
self.message_count += 1
return result
except Exception as e:
self.error_count += 1
logger.error(f"Protocol error: {e}")
raise
Best Practices¶
- Start Simple: Begin with standard gRPC and only customize when needed
- Version Management: Plan for protocol evolution and backwards compatibility
- Error Handling: Implement robust error recovery and logging
- Performance Testing: Benchmark custom protocols against gRPC baseline
- Documentation: Document protocol specifications and wire formats
- Security: Consider authentication, authorization, and data validation
- Monitoring: Add metrics and observability to custom protocols
Common Patterns¶
Protocol Factory¶
def create_protocol(protocol_type: str, config: dict) -> RPCPluginProtocol:
"""Factory function for protocol creation."""
if protocol_type == "trading":
return TradingProtocol(**config)
elif protocol_type == "streaming":
return StreamingProtocol(**config)
elif protocol_type == "binary":
return BinaryProtocol(**config)
else:
return plugin_protocol() # Default
Protocol Middleware¶
class ProtocolMiddleware:
def __init__(self, protocol: RPCPluginProtocol):
self.protocol = protocol
async def before_request(self, request):
"""Pre-process requests."""
logger.debug(f"Processing request: {request}")
return request
async def after_response(self, response):
"""Post-process responses."""
logger.debug(f"Sending response: {response}")
return response
Next Steps¶
- Performance Optimization - Optimize protocol performance
- Testing Guide - Test custom protocols
- Production Configuration - Deploy protocols to production
For complex protocol requirements, consider consulting with the Foundation team for architecture guidance and optimization strategies.