Type System and Custom Protocols¶
Extend the framework with custom type safety patterns and specialized protocols for domain-specific requirements.
🤖 AI-Generated Content
This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
Overview¶
The pyvider.rpcplugin framework provides comprehensive customization capabilities through modern Python typing features and pluggable protocol architecture. This enables type-safe, domain-optimized communication for specialized use cases.
Type System Features:
- Static type checking with mypy/pyre support
- Runtime-checkable protocols
- Type guards and generic types
- Modern Python 3.11+ typing syntax
Custom Protocol Capabilities:
- Binary data protocols for high performance
- Streaming protocols for continuous data
- Legacy system integration
- Domain-specific optimizations
Type System¶
from typing import TypeVar, Protocol, runtime_checkable
# Generic type variables: unconstrained placeholders used to parameterize
# the generic RPCPluginServer class shown later on this page.
ServerT = TypeVar('ServerT')
HandlerT = TypeVar('HandlerT')
TransportT = TypeVar('TransportT')
# Type aliases
# NOTE(review): the two gRPC aliases reference `grpc.aio`, which this snippet
# never imports -- presumably `import grpc` is expected; confirm.
GrpcServerType = grpc.aio.Server
GrpcChannelType = grpc.aio.Channel
# A two-item tuple of str and int -- presumably (host, port); confirm.
AddressType = tuple[str, int]
# Modern union types (Python 3.11+)
ConfigType = dict[str, any] | None
EndpointType = str | tuple[str, int]
from typing import Any


@runtime_checkable
class RPCPluginHandler(Protocol):
    """Base protocol for RPC handlers.

    Marked ``@runtime_checkable`` so it can be used with ``isinstance``.
    """

    # `Any` (from typing) is used here; the lowercase builtin `any` is a
    # function, not a type, and static checkers reject it as an annotation.
    async def handle_request(self, request: Any, context: Any) -> Any:
        """Handle an RPC request.

        Args:
            request: The incoming request object (type left open).
            context: The call context object (type left open).

        Returns:
            The handler's response (type left open).
        """
        ...
@runtime_checkable
class RPCPluginTransport(Protocol):
"""Transport protocol for network communication."""
endpoint: str | None
async def listen(self) -> str:
"""Start listening and return endpoint."""
...
async def connect(self, endpoint: str) -> None:
"""Connect to endpoint."""
...
async def close(self) -> None:
"""Close transport."""
...
from typing import TypeGuard
def is_valid_handler(obj: any) -> TypeGuard[RPCPluginHandler]:
"""Check if object implements handler protocol."""
return (
hasattr(obj, 'handle_request') and
callable(getattr(obj, 'handle_request'))
)
def is_valid_transport(obj: any) -> TypeGuard[RPCPluginTransport]:
"""Check if object implements transport protocol."""
return (
hasattr(obj, 'endpoint') and
hasattr(obj, 'listen') and
hasattr(obj, 'connect') and
hasattr(obj, 'close')
)
# Usage with type narrowing
def setup_server(handler: object, transport: object) -> None:
    """Validate both collaborators, then construct the server.

    Args:
        handler: Candidate handler; must pass ``is_valid_handler``.
        transport: Candidate transport; must pass ``is_valid_transport``.

    Raises:
        TypeError: If either argument fails its type guard.
    """
    if not is_valid_handler(handler):
        raise TypeError("Invalid handler")
    if not is_valid_transport(transport):
        raise TypeError("Invalid transport")
    # Type checker knows types are correct here
    # NOTE(review): the RPCPluginServer example on this page declares a
    # required `protocol` argument that is not supplied here -- confirm the
    # intended constructor signature.
    server = RPCPluginServer(handler=handler, transport=transport)
Generic Classes¶
Build type-safe components with generics:
from typing import Generic
class RPCPluginServer(Generic[ServerT, HandlerT, TransportT]):
    """RPC server parameterized over its collaborating types.

    Type parameters:
        ServerT: The gRPC server type.
        HandlerT: The handler implementation type.
        TransportT: The transport implementation type.
    """

    def __init__(
        self,
        protocol: RPCPluginProtocol[ServerT, HandlerT],
        handler: HandlerT,
        transport: TransportT | None = None,
    ) -> None:
        """Store the protocol, handler and optional transport on the instance."""
        self.protocol, self.handler, self.transport = protocol, handler, transport
# Usage with specific types: the annotation pins all three type parameters.
server: RPCPluginServer[grpc.aio.Server, MyHandler, MyTransport] = RPCPluginServer(
    protocol=my_protocol,
    handler=MyHandler(),
    transport=MyTransport(),
)
Attrs Integration¶
Type-safe data classes with validation:
from attrs import define, field
from typing import Any


@define
class RPCPluginClient:
    """Client with attrs type annotations.

    ``command`` and ``config`` are constructor arguments; the remaining
    fields are declared with ``init=False`` and default to ``None``.
    """

    command: list[str] = field()
    # `Any` from typing -- the lowercase builtin `any` is a function, not a type.
    config: dict[str, Any] | None = field(default=None)
    # Internal typed fields
    _process: ManagedProcess | None = field(init=False, default=None)
    _transport: TransportType | None = field(init=False, default=None)
    grpc_channel: grpc.aio.Channel | None = field(init=False, default=None)

    @command.validator
    def _validate_command(self, attribute, value):
        """Reject an empty command or one containing non-string items.

        Raises:
            TypeError: If ``value`` is a bare string, is empty, or holds
                any element that is not a ``str``.
        """
        # A bare str is itself an iterable of one-character strings, so it
        # would slip through the per-item check; reject it explicitly.
        if (
            isinstance(value, str)
            or not value
            or not all(isinstance(v, str) for v in value)
        ):
            raise TypeError("Command must be non-empty list of strings")
Factory Functions with Overloads¶
from typing import Any, overload


@overload
def plugin_client(
    command: list[str],
    config: None = None,
) -> RPCPluginClient: ...


@overload
def plugin_client(
    command: list[str],
    config: dict[str, Any],
) -> RPCPluginClient: ...


def plugin_client(
    command: list[str],
    config: dict[str, Any] | None = None,
) -> RPCPluginClient:
    """Create client with proper typing.

    Args:
        command: Command list forwarded to ``RPCPluginClient``.
        config: Optional configuration mapping; ``None`` when omitted.

    Returns:
        A new ``RPCPluginClient`` built from the two arguments.
    """
    # `Any` replaces the builtin function `any`, which is not a valid type.
    return RPCPluginClient(command=command, config=config)
Runtime Type Validation¶
from typing import get_type_hints
import inspect
def validate_types(func):
"""Decorator for runtime type validation."""
hints = get_type_hints(func)
def wrapper(*args, **kwargs):
bound = inspect.signature(func).bind(*args, **kwargs)
for name, value in bound.arguments.items():
if name in hints:
expected_type = hints[name]
if not isinstance(value, expected_type):
raise TypeError(
f"{name} must be {expected_type}, got {type(value)}"
)
result = func(*args, **kwargs)
if 'return' in hints:
expected_return = hints['return']
if not isinstance(result, expected_return):
raise TypeError(
f"Return must be {expected_return}, got {type(result)}"
)
return result
return wrapper
# Usage
@validate_types
def add_numbers(a: int, b: int) -> int:
return a + b
Custom Protocols¶
When to Use¶
Consider custom protocols for:
- Ultra-low latency: Financial trading, real-time gaming, IoT
- Specialized formats: Binary protocols, legacy integration
- Domain optimization: Custom compression or serialization
- Cross-language needs: Beyond gRPC ecosystems
Protocol Architecture¶
from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
from provide.foundation import logger
class CustomProtocol(RPCPluginProtocol):
    """Named, versioned protocol that registers gRPC services with a server."""

    def __init__(self, name: str, version: str) -> None:
        """Record the protocol name and version, then log the initialization."""
        self.name = name
        self.version = version
        # The emoji was mis-encoded in the original text; restored here.
        logger.info(f"🔧 Initializing {name} protocol v{version}")

    async def get_grpc_descriptors(self):
        """Return gRPC service descriptors."""
        # NOTE(review): `service_descriptor` and `service_name` are never
        # assigned in this class -- presumably set by a subclass; confirm.
        return self.service_descriptor, self.service_name

    async def add_to_server(self, server, handler):
        """Register protocol services with server."""
        # NOTE(review): `service_implementation` is likewise not assigned
        # here -- confirm where it comes from.
        self.service_implementation.add_to_server(server, handler)
Protocol Patterns¶
import struct
from dataclasses import dataclass
# Header layout: little-endian, 1-byte unsigned type + 4-byte unsigned size.
_HEADER = struct.Struct('<BI')


@dataclass
class BinaryMessage:
    """A framed binary message: fixed 5-byte header followed by the payload."""

    message_type: int
    payload_size: int
    data: bytes

    def serialize(self) -> bytes:
        """Return the packed header immediately followed by ``data``."""
        return _HEADER.pack(self.message_type, self.payload_size) + self.data

    @classmethod
    def deserialize(cls, data: bytes) -> 'BinaryMessage':
        """Rebuild a message from bytes produced by ``serialize``.

        The payload is the slice of at most ``payload_size`` bytes that
        follows the header.
        """
        header_len = _HEADER.size
        message_type, payload_size = _HEADER.unpack(data[:header_len])
        body = data[header_len:header_len + payload_size]
        return cls(message_type, payload_size, body)
class BinaryProtocol(RPCPluginProtocol):
    """Protocol that decodes raw stream chunks into ``BinaryMessage`` objects."""

    async def process_binary_stream(self, stream):
        """Decode each chunk of ``stream`` and yield the handled result.

        Each chunk is deserialized, passed to ``self.handle_message``, and
        the awaited result is yielded in arrival order.
        """
        async for chunk in stream:
            decoded = BinaryMessage.deserialize(chunk)
            outcome = await self.handle_message(decoded)
            yield outcome
import asyncio
import time


class StreamingProtocol(RPCPluginProtocol):
    """Protocol that pushes batches to the client while the call is active."""

    async def start_data_stream(self, request, context):
        """Provide continuous data stream.

        Yields one ``StreamingResponse`` per batch for as long as
        ``context.is_active()`` returns true, pausing briefly between
        batches.
        """
        while context.is_active():
            data = await self.get_next_data_batch()
            yield StreamingResponse(
                timestamp=time.time_ns(),
                data=data,
                sequence=self.next_sequence(),
            )
            await asyncio.sleep(0.001)  # 1ms intervals
class LegacyProtocol(RPCPluginProtocol):
    """Bridge that translates between gRPC messages and a legacy wire format."""

    def __init__(self, legacy_format: str):
        """Remember the legacy format name and initialize the base protocol."""
        self.format = legacy_format
        super().__init__("legacy-bridge", "1.0")

    async def translate_request(self, grpc_request):
        """Convert gRPC request to legacy format.

        Raises:
            ValueError: If ``self.format`` is neither ``"xml"`` nor
                ``"fixed-width"``.
        """
        if self.format == "xml":
            return self.to_xml(grpc_request)
        if self.format == "fixed-width":
            return self.to_fixed_width(grpc_request)
        # Fail loudly: falling through would hand the caller None in place
        # of a translated request.
        raise ValueError(f"Unsupported legacy format: {self.format!r}")

    async def translate_response(self, legacy_response):
        """Convert legacy response to gRPC format."""
        return self.parse_legacy_response(legacy_response)
Protocol Buffer Definition¶
// trading_protocol.proto
// NOTE(review): no `service` is declared in this file, yet the Python
// implementation shown on this page looks up 'MarketDataService' -- confirm
// where that service is defined.
syntax = "proto3";
package trading;

// Request for market data on a set of symbols within a time window.
message MarketDataRequest {
  repeated string symbols = 1;
  int64 start_time = 2;  // time unit not specified here -- confirm
  int64 end_time = 3;    // time unit not specified here -- confirm
  DataFormat format = 4;
}

// A single market data record for one symbol.
message MarketDataResponse {
  string symbol = 1;
  double price = 2;
  int64 volume = 3;
  int64 timestamp = 4;   // time unit not specified here -- confirm
  bytes custom_data = 5;
}

// Format selector carried in MarketDataRequest.format.
enum DataFormat {
  STANDARD = 0;  // zero value, so this is the proto3 default
  COMPRESSED = 1;
  BINARY = 2;
}
Protocol Implementation¶
import trading_pb2
import trading_pb2_grpc as trading_grpc
from trading_pb2 import MarketDataRequest, MarketDataResponse


class TradingProtocol(RPCPluginProtocol):
    """Protocol binding for the ``MarketDataService`` gRPC service."""

    async def get_grpc_descriptors(self):
        """Return the service descriptor and its name."""
        # The file-level DESCRIPTOR (with services_by_name) lives in the
        # generated *_pb2 module; *_pb2_grpc only carries the stub, the
        # servicer base class and the add_*_to_server helper.
        return (
            trading_pb2.DESCRIPTOR.services_by_name['MarketDataService'],
            "MarketDataService",
        )

    async def add_to_server(self, server, handler):
        """Register ``handler`` as the MarketDataService servicer on ``server``."""
        trading_grpc.add_MarketDataServiceServicer_to_server(
            handler, server
        )
Advanced Features¶
Message Compression¶
import zstandard as zstd
class CompressedProtocol(RPCPluginProtocol):
    """Protocol that compresses and decompresses message bytes with Zstandard."""

    def __init__(self) -> None:
        """Create one level-3 compressor and one decompressor for reuse."""
        self.compressor = zstd.ZstdCompressor(level=3)
        self.decompressor = zstd.ZstdDecompressor()
        super().__init__("compressed", "1.0")

    def compress_message(self, data: bytes) -> bytes:
        """Return ``data`` compressed by the instance's compressor."""
        compressed = self.compressor.compress(data)
        return compressed

    def decompress_message(self, data: bytes) -> bytes:
        """Return ``data`` decompressed by the instance's decompressor."""
        restored = self.decompressor.decompress(data)
        return restored
Protocol Versioning¶
class VersionedProtocol(RPCPluginProtocol):
    """Protocol that negotiates a mutually supported version with clients."""

    def __init__(self) -> None:
        """Declare the supported versions and register as version 2.0."""
        self.supported_versions = ["1.0", "1.1", "2.0"]
        super().__init__("versioned", "2.0")

    async def negotiate_version(self, client_version: str) -> str:
        """Negotiate protocol version with client.

        A version that is directly supported is returned as-is; otherwise
        the result of ``find_compatible_version`` is logged and returned.
        """
        if client_version not in self.supported_versions:
            compatible = self.find_compatible_version(client_version)
            logger.info(f"Negotiated version {compatible} for client {client_version}")
            return compatible
        return client_version
Custom Authentication¶
import grpc


class AuthenticatedProtocol(RPCPluginProtocol):
    """Protocol that rejects requests lacking a valid ``auth-token``."""

    def __init__(self, auth_provider):
        """Store the auth provider and initialize the base protocol."""
        self.auth = auth_provider
        super().__init__("authenticated", "1.0")

    async def validate_request(self, context):
        """Validate incoming request authentication.

        Aborts the call with ``UNAUTHENTICATED`` when the provider does not
        accept the ``auth-token`` metadata value (``None`` if absent).
        """
        # invocation_metadata() yields (key, value) pairs and may be None,
        # so build a dict before looking the token up.
        metadata = dict(context.invocation_metadata() or ())
        token = metadata.get('auth-token')
        if not await self.auth.validate_token(token):
            # On grpc.aio contexts abort() is a coroutine; without the
            # await the call would never actually be aborted.
            await context.abort(grpc.StatusCode.UNAUTHENTICATED, "Invalid token")
Testing¶
Type Testing¶
# Configure mypy for strict checking
# pyproject.toml
[tool.mypy]
# Turn on mypy's strict option set.
strict = true
# Type-check against Python 3.11 semantics.
python_version = "3.11"
# Report `# type: ignore` comments that are no longer needed.
warn_unused_ignores = true

# Per-module override matching the module pattern below.
[[tool.mypy.overrides]]
module = "pyvider.rpcplugin.protocol.grpc_*_pb2*"
ignore_errors = true # Ignore generated files
Protocol Testing¶
import pytest
from pyvider.rpcplugin.testing import ProtocolTestCase
class TestCustomProtocol(ProtocolTestCase):
    """Serialization and integration tests for ``CustomProtocol``."""

    def setup_protocol(self):
        """Return the protocol instance under test."""
        return CustomProtocol("test", "1.0")

    async def test_message_serialization(self):
        """A message survives a serialize/deserialize round trip."""
        payload = b"test data"
        # payload_size must equal the real payload length, otherwise the
        # header written by serialize() describes bytes that do not exist.
        message = BinaryMessage(1, len(payload), payload)
        serialized = message.serialize()
        deserialized = BinaryMessage.deserialize(serialized)
        assert deserialized.message_type == 1
        assert deserialized.payload_size == len(payload)
        assert deserialized.data == payload

    async def test_protocol_integration(self):
        """A test client call through the protocol reports success."""
        async with self.create_test_client() as client:
            response = await client.test_method(test_data="example")
            assert response.success
Performance Benchmarking¶
import time
async def benchmark_protocol(message_count: int = 1000):
    """Benchmark custom protocol performance.

    Args:
        message_count: Number of messages to send; defaults to 1000.

    Sends the messages sequentially and logs the measured throughput.
    """
    protocol = CustomProtocol("benchmark", "1.0")
    start_time = time.perf_counter()
    for i in range(message_count):
        await protocol.send_message(f"Message {i}")
    duration = time.perf_counter() - start_time
    # Guard against a zero-length interval, which would divide by zero.
    messages_per_second = (
        message_count / duration if duration > 0 else float("inf")
    )
    logger.info(f"Protocol throughput: {messages_per_second:.0f} msg/sec")
Deployment¶
Protocol Registry¶
class ProtocolRegistry:
    """Name-indexed store of protocol instances."""

    def __init__(self) -> None:
        """Start with an empty name-to-protocol mapping."""
        self.protocols: dict[str, RPCPluginProtocol] = {}

    def register(self, protocol: RPCPluginProtocol) -> None:
        """Store ``protocol`` under its ``name``, replacing any earlier entry."""
        self.protocols[protocol.name] = protocol
        logger.info(f"Registered protocol: {protocol.name}")

    def get_protocol(self, name: str) -> RPCPluginProtocol | None:
        """Return the protocol registered as ``name``, or None if absent."""
        # dict.get() yields None on a miss, so the return type is optional.
        return self.protocols.get(name)


# Global registry
protocol_registry = ProtocolRegistry()
Configuration-Based Selection¶
import os
from pyvider.rpcplugin.config import rpcplugin_config
def create_protocol():
protocol_type = os.getenv("PLUGIN_PROTOCOL_TYPE", "grpc")
if protocol_type == "binary":
return BinaryProtocol()
elif protocol_type == "streaming":
return StreamingProtocol()
else:
return plugin_protocol() # Default gRPC
Monitoring¶
from provide.foundation import logger
class MonitoredProtocol(RPCPluginProtocol):
    """Wrapper that counts successful and failed sends of another protocol."""

    def __init__(self, base_protocol):
        """Wrap ``base_protocol`` and zero both counters."""
        self.base = base_protocol
        self.message_count = 0
        self.error_count = 0
        super().__init__(f"monitored-{base_protocol.name}", base_protocol.version)

    async def send_message(self, message):
        """Delegate to the wrapped protocol, updating the counters.

        A failure increments ``error_count``, is logged, and is re-raised
        unchanged; a success increments ``message_count``.
        """
        try:
            result = await self.base.send_message(message)
        except Exception as e:
            self.error_count += 1
            logger.error(f"Protocol error: {e}")
            raise
        else:
            self.message_count += 1
            return result
Best Practices¶
Type System¶
- Use type hints everywhere for IDE support and static analysis
- Define custom types for domain-specific values (NewType)
- Use protocols for interfaces instead of abstract base classes
- Leverage type guards for runtime type narrowing
- Document type constraints in docstrings
Custom Protocols¶
- Start simple - begin with gRPC, customize only when needed
- Version management - plan for protocol evolution
- Error handling - implement robust error recovery
- Performance testing - benchmark against gRPC baseline
- Security - consider authentication and validation
- Documentation - document protocol specifications
Advanced Patterns¶
Conditional Types¶
from typing import Literal, overload
@overload
def get_value(return_type: Literal["str"]) -> str: ...
@overload
def get_value(return_type: Literal["int"]) -> int: ...
def get_value(return_type: Literal["str", "int"]) -> str | int:
if return_type == "str":
return "hello"
else:
return 42
Type-Safe Builder¶
from typing import Self
class ServerBuilder:
"""Type-safe builder for server configuration."""
def with_port(self, port: int) -> Self:
self.port = port
return self
def with_handler(self, handler: HandlerT) -> Self:
self.handler = handler
return self
def build(self) -> RPCPluginServer[any, HandlerT, any]:
return RPCPluginServer(
handler=self.handler,
config={"port": self.port}
)
# Method chaining with type preservation
# Each with_* call returns the builder itself (annotated as Self), so the
# calls can be chained until build() produces the server.
server = (
    ServerBuilder()
    .with_port(8080)
    .with_handler(MyHandler())
    .build()
)
Protocol Factory¶
def create_protocol(protocol_type: str, config: dict) -> RPCPluginProtocol:
"""Factory function for protocol creation."""
if protocol_type == "trading":
return TradingProtocol(**config)
elif protocol_type == "streaming":
return StreamingProtocol(**config)
elif protocol_type == "binary":
return BinaryProtocol(**config)
else:
return plugin_protocol() # Default
Related Topics¶
- API Reference - Complete API with type annotations
- Architecture - System design with types
- Testing - Type testing strategies
- Observability - Performance monitoring