Direct Connections¶
Connect directly to running plugin servers without launching subprocesses - ideal for microservices, distributed systems, and server-to-server communication.
๐ค AI-Generated Content
This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
Overview¶
Direct connections enable clients to connect to already-running plugin servers via network protocols (TCP or Unix sockets), bypassing subprocess management.
Use Cases: - Microservice architectures and service meshes - Load-balanced plugin pools - Cross-machine/cross-container communication - Development and testing environments - Server-to-server communication patterns
Basic Patterns¶
Connect directly to a TCP server:
from pyvider.rpcplugin import plugin_client
from calculator_pb2_grpc import CalculatorStub
from calculator_pb2 import AddRequest
async def connect_to_tcp_server():
"""Connect directly to a TCP server."""
# Connect to running server
client = await plugin_client(
host="localhost",
port=50051,
skip_subprocess=True # Direct connection
)
try:
# Use gRPC stub as normal
stub = CalculatorStub(client.grpc_channel)
result = await stub.Add(AddRequest(a=5, b=3))
print(f"Result: {result.result}")
finally:
await client.close()
Connect via Unix domain socket (Linux/macOS only):
Configure from environment variables:
import os
from dataclasses import dataclass
@dataclass
class DirectConnectionConfig:
"""Configuration for direct connections."""
host: str = os.environ.get("PLUGIN_SERVER_HOST", "localhost")
port: int = int(os.environ.get("PLUGIN_SERVER_PORT", "50051"))
timeout: float = float(os.environ.get("PLUGIN_TIMEOUT", "30.0"))
# mTLS configuration
client_cert: str | None = os.environ.get("PLUGIN_CLIENT_CERT")
client_key: str | None = os.environ.get("PLUGIN_CLIENT_KEY")
ca_cert: str | None = os.environ.get("PLUGIN_CA_CERT")
# Usage
config = DirectConnectionConfig()
client = await plugin_client(
host=config.host,
port=config.port,
skip_subprocess=True,
timeout=config.timeout
)
Advanced Patterns¶
Distribute requests across multiple servers:
import random
class LoadBalancedClient:
"""Client with load balancing across servers."""
def __init__(self, endpoints: list[dict]):
"""
Args:
endpoints: [{"host": "server1", "port": 50051, "weight": 2}, ...]
"""
self.endpoints = endpoints
self.clients = []
self.weights = []
async def connect_all(self):
"""Connect to all endpoints."""
for endpoint in self.endpoints:
client = await plugin_client(
host=endpoint["host"],
port=endpoint["port"],
skip_subprocess=True
)
self.clients.append(client)
self.weights.append(endpoint.get("weight", 1))
async def call(self, stub_class, method_name: str, request):
"""Make load-balanced RPC call."""
# Weighted random selection
client = random.choices(self.clients, weights=self.weights)[0]
try:
stub = stub_class(client.grpc_channel)
method = getattr(stub, method_name)
return await method(request)
except Exception as e:
# Retry on different server
for other_client in self.clients:
if other_client != client:
try:
stub = stub_class(other_client.grpc_channel)
method = getattr(stub, method_name)
return await method(request)
except:
continue
raise
# Usage
endpoints = [
{"host": "server1", "port": 50051, "weight": 3},
{"host": "server2", "port": 50051, "weight": 2},
{"host": "server3", "port": 50051, "weight": 1}
]
lb_client = LoadBalancedClient(endpoints)
await lb_client.connect_all()
result = await lb_client.call(CalculatorStub, "Add", AddRequest(a=5, b=3))
See Connection Resilience for complete connection pool implementation with health monitoring.
Security¶
Secure direct connections with mTLS:
import ssl
from pathlib import Path
# Create SSL context
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
ssl_context.load_cert_chain("/etc/ssl/client.pem", "/etc/ssl/client.key")
ssl_context.load_verify_locations("/etc/ssl/ca.pem")
ssl_context.check_hostname = True
# Connect with mTLS
client = await plugin_client(
host="secure.example.com",
port=443,
skip_subprocess=True,
ssl_context=ssl_context
)
See Security Guide for comprehensive mTLS setup.
Add authentication to direct connections:
class AuthenticatedClient:
"""Direct client with authentication."""
def __init__(self, host: str, port: int, api_key: str):
self.host = host
self.port = port
self.api_key = api_key
self.token = None
async def connect(self):
"""Connect and authenticate."""
self.client = await plugin_client(
host=self.host,
port=self.port,
skip_subprocess=True
)
# Authenticate to get token
stub = AuthStub(self.client.grpc_channel)
auth_response = await stub.Authenticate(
AuthRequest(api_key=self.api_key)
)
self.token = auth_response.token
async def call(self, stub_class, method_name: str, request):
"""Make authenticated call."""
if not self.token:
await self.connect()
# Add auth token to metadata
metadata = [("authorization", f"Bearer {self.token}")]
stub = stub_class(self.client.grpc_channel)
method = getattr(stub, method_name)
return await method(request, metadata=metadata)
Service Discovery¶
Integrate with service registries for dynamic endpoint discovery:
async def discover_and_connect(service_name: str):
"""Discover service via Consul and connect."""
import consul.aio
# Discover service
c = consul.aio.Consul(host="localhost", port=8500)
_, services = await c.health.service(service_name, passing=True)
if not services:
raise Exception(f"No healthy {service_name} instances found")
# Connect to first available
service = services[0]
client = await plugin_client(
host=service["Service"]["Address"],
port=service["Service"]["Port"],
skip_subprocess=True
)
return client
See Connection Resilience for complete service discovery patterns and health monitoring.
Best Practices¶
- Use Environment Variables - Configure host/port from environment
- Implement Health Checks - Monitor connection health
- Add Retry Logic - Handle transient failures (see Resilience Guide)
- Secure with mTLS - Always use encryption for production
- Pool Connections - Reuse connections efficiently
- Load Balance - Distribute load across multiple servers
- Service Discovery - Integrate with service registries
- Monitor Metrics - Track connection performance
Next Steps¶
- Connection Resilience - Health monitoring, retry logic, connection pools
- Client Development Guide - Basic client setup and configuration
- Security Guide - Comprehensive mTLS setup
- Examples - Complete working examples