Direct Connections¶
Connect directly to running plugin servers without launching subprocess, ideal for microservices, distributed systems, and server-to-server communication.
Overview¶
Direct connections enable clients to connect to already-running plugin servers via network protocols (TCP or Unix sockets), bypassing subprocess management.
Use Cases: - Microservice architectures - Load-balanced plugin pools - Cross-machine communication - Containerized deployments - Development and testing
Basic Direct Connection¶
TCP Connection¶
from pyvider.rpcplugin import plugin_client
from provide.foundation import logger
async def connect_to_server():
"""Connect directly to a TCP server."""
# Connect to running server
client = await plugin_client(
host="localhost",
port=50051,
skip_subprocess=True # Direct connection
)
try:
logger.info("Connected to server")
# Make RPC calls
response = await client.call("Echo", message="Hello")
logger.info(f"Response: {response.message}")
finally:
await client.close()
# Run
await connect_to_server()
Unix Socket Connection¶
async def connect_unix_socket():
"""Connect via Unix domain socket."""
client = await plugin_client(
unix_socket="/tmp/plugin.sock",
skip_subprocess=True
)
try:
response = await client.call("Process", data={"key": "value"})
return response
finally:
await client.close()
Configuration¶
Environment-Based Setup¶
import os
from dataclasses import dataclass
@dataclass
class DirectConnectionConfig:
"""Configuration for direct connections."""
host: str = os.environ.get("PLUGIN_SERVER_HOST", "localhost")
port: int = int(os.environ.get("PLUGIN_SERVER_PORT", "50051"))
use_tls: bool = os.environ.get("PLUGIN_USE_TLS", "false").lower() == "true"
timeout: float = float(os.environ.get("PLUGIN_TIMEOUT", "30.0"))
# mTLS configuration
client_cert: str | None = os.environ.get("PLUGIN_CLIENT_CERT")
client_key: str | None = os.environ.get("PLUGIN_CLIENT_KEY")
ca_cert: str | None = os.environ.get("PLUGIN_CA_CERT")
# Usage
config = DirectConnectionConfig()
client = await plugin_client(
host=config.host,
port=config.port,
skip_subprocess=True,
timeout=config.timeout,
client_cert=config.client_cert,
client_key=config.client_key
)
Advanced Patterns¶
Connection Pool¶
import asyncio
from collections import deque
from provide.foundation import logger
class ConnectionPool:
"""Manage pool of direct connections."""
def __init__(self, host: str, port: int, size: int = 5):
self.host = host
self.port = port
self.size = size
self.available = deque()
self.in_use = set()
self._lock = asyncio.Lock()
async def initialize(self):
"""Create initial connections."""
for _ in range(self.size):
client = await plugin_client(
host=self.host,
port=self.port,
skip_subprocess=True
)
self.available.append(client)
logger.info(f"Pool initialized with {self.size} connections")
async def acquire(self):
"""Get connection from pool."""
async with self._lock:
if self.available:
client = self.available.popleft()
self.in_use.add(client)
return client
# Create new connection if under limit
if len(self.in_use) < self.size * 2:
client = await plugin_client(
host=self.host,
port=self.port,
skip_subprocess=True
)
self.in_use.add(client)
return client
# Wait for available connection
while not self.available:
await asyncio.sleep(0.1)
client = self.available.popleft()
self.in_use.add(client)
return client
async def release(self, client):
"""Return connection to pool."""
async with self._lock:
self.in_use.discard(client)
# Check health before returning to pool
try:
await client.health_check()
self.available.append(client)
except:
await client.close()
logger.warning("Unhealthy connection removed from pool")
async def close_all(self):
"""Close all connections."""
all_clients = list(self.available) + list(self.in_use)
for client in all_clients:
await client.close()
self.available.clear()
self.in_use.clear()
# Usage
pool = ConnectionPool("localhost", 50051, size=10)
await pool.initialize()
# Use connections
client = await pool.acquire()
try:
result = await client.call("Process", data={"test": "data"})
finally:
await pool.release(client)
Load Balancing¶
import random
from typing import Any
class LoadBalancedClient:
"""Client with load balancing across servers."""
def __init__(self, endpoints: list[dict[str, Any]]):
"""
Args:
endpoints: List of server configurations
[{"host": "server1", "port": 50051, "weight": 2},
{"host": "server2", "port": 50052, "weight": 1}]
"""
self.endpoints = endpoints
self.clients = []
self.weights = []
async def connect_all(self):
"""Connect to all endpoints."""
for endpoint in self.endpoints:
client = await plugin_client(
host=endpoint["host"],
port=endpoint["port"],
skip_subprocess=True
)
self.clients.append(client)
self.weights.append(endpoint.get("weight", 1))
logger.info(f"Connected to {len(self.clients)} servers")
def select_client(self, strategy: str = "weighted"):
"""Select client based on strategy."""
if strategy == "random":
return random.choice(self.clients)
elif strategy == "weighted":
return random.choices(self.clients, weights=self.weights)[0]
elif strategy == "round_robin":
# Rotate through clients
client = self.clients.pop(0)
self.clients.append(client)
return client
else:
return self.clients[0]
async def call(self, method: str, **kwargs):
"""Make load-balanced RPC call."""
client = self.select_client()
try:
return await client.call(method, **kwargs)
except Exception as e:
logger.error(f"Call failed on {client.host}:{client.port}: {e}")
# Retry on different server
for other_client in self.clients:
if other_client != client:
try:
return await other_client.call(method, **kwargs)
except:
continue
raise Exception("All servers failed")
async def close_all(self):
"""Close all client connections."""
for client in self.clients:
await client.close()
# Usage
endpoints = [
{"host": "server1.internal", "port": 50051, "weight": 3},
{"host": "server2.internal", "port": 50051, "weight": 2},
{"host": "server3.internal", "port": 50051, "weight": 1}
]
lb_client = LoadBalancedClient(endpoints)
await lb_client.connect_all()
result = await lb_client.call("Process", data={"request": "data"})
Secure Connections¶
mTLS Direct Connection¶
from provide.foundation.crypto import Certificate
import ssl
import tempfile
async def secure_direct_connection():
"""Establish mTLS direct connection."""
from pathlib import Path
# Load certificates using Foundation - read PEM content from files
client_cert_content = Path("/etc/ssl/client.pem").read_text()
client_key_content = Path("/etc/ssl/client.key").read_text()
client_cert = Certificate.from_pem(
cert_pem=client_cert_content,
key_pem=client_key_content
)
ca_cert_content = Path("/etc/ssl/ca.pem").read_text()
ca_cert = Certificate.from_pem(cert_pem=ca_cert_content)
# Create SSL context with certificate PEM strings
# Note: ssl.load_cert_chain requires file paths, so write to temp files
ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
with tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as cert_file, \
tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.key') as key_file, \
tempfile.NamedTemporaryFile(mode='w', delete=False, suffix='.pem') as ca_file:
cert_file.write(client_cert.cert_pem)
key_file.write(client_cert.key_pem)
ca_file.write(ca_cert.cert_pem)
cert_file.flush()
key_file.flush()
ca_file.flush()
ssl_context.load_cert_chain(cert_file.name, key_file.name)
ssl_context.load_verify_locations(ca_file.name)
ssl_context.check_hostname = True
# Connect with mTLS
client = await plugin_client(
host="secure.example.com",
port=443,
skip_subprocess=True,
ssl_context=ssl_context
)
try:
response = await client.call("SecureMethod", sensitive_data="...")
return response
finally:
await client.close()
Authentication¶
class AuthenticatedDirectClient:
"""Direct client with authentication."""
def __init__(self, host: str, port: int, api_key: str):
self.host = host
self.port = port
self.api_key = api_key
self.client = None
self.token = None
async def connect(self):
"""Connect and authenticate."""
self.client = await plugin_client(
host=self.host,
port=self.port,
skip_subprocess=True
)
# Authenticate
auth_response = await self.client.call(
"auth.Authenticate",
api_key=self.api_key
)
self.token = auth_response.token
logger.info("Authenticated successfully")
async def call(self, method: str, **kwargs):
"""Make authenticated call."""
if not self.token:
await self.connect()
# Add auth token to metadata
metadata = [("authorization", f"Bearer {self.token}")]
return await self.client.call(
method,
metadata=metadata,
**kwargs
)
async def close(self):
"""Close connection."""
if self.client:
await self.client.close()
Service Discovery¶
Consul Integration¶
import consul.aio
class ConsulServiceDiscovery:
"""Discover plugin servers via Consul."""
def __init__(self, consul_host: str = "localhost", consul_port: int = 8500):
self.consul = consul.aio.Consul(host=consul_host, port=consul_port)
self.services = {}
async def discover_service(self, service_name: str):
"""Discover service endpoints from Consul."""
_, services = await self.consul.health.service(
service_name,
passing=True # Only healthy instances
)
endpoints = []
for service in services:
endpoints.append({
"host": service["Service"]["Address"],
"port": service["Service"]["Port"],
"id": service["Service"]["ID"]
})
self.services[service_name] = endpoints
logger.info(f"Discovered {len(endpoints)} {service_name} instances")
return endpoints
async def connect_to_service(self, service_name: str):
"""Connect to discovered service."""
if service_name not in self.services:
await self.discover_service(service_name)
endpoints = self.services[service_name]
if not endpoints:
raise Exception(f"No healthy {service_name} instances found")
# Connect to first available
for endpoint in endpoints:
try:
client = await plugin_client(
host=endpoint["host"],
port=endpoint["port"],
skip_subprocess=True
)
logger.info(f"Connected to {service_name} at {endpoint['host']}:{endpoint['port']}")
return client
except Exception as e:
logger.warning(f"Failed to connect to {endpoint['id']}: {e}")
continue
raise Exception(f"Could not connect to any {service_name} instance")
# Usage
discovery = ConsulServiceDiscovery()
client = await discovery.connect_to_service("payment-processor")
Health Monitoring¶
Health Check Client¶
class HealthMonitoringClient:
"""Client with health monitoring."""
def __init__(self, host: str, port: int):
self.host = host
self.port = port
self.client = None
self.healthy = False
self.last_health_check = 0
async def ensure_connected(self):
"""Ensure healthy connection."""
current_time = time.time()
# Check health periodically
if current_time - self.last_health_check > 30:
await self.health_check()
if not self.healthy:
await self.reconnect()
async def health_check(self):
"""Check server health."""
try:
if self.client:
response = await self.client.call("health.Check", service="")
self.healthy = response.status == "SERVING"
self.last_health_check = time.time()
if not self.healthy:
logger.warning(f"Server unhealthy: {response.status}")
except Exception as e:
logger.error(f"Health check failed: {e}")
self.healthy = False
async def reconnect(self):
"""Reconnect to server."""
if self.client:
await self.client.close()
try:
self.client = await plugin_client(
host=self.host,
port=self.port,
skip_subprocess=True
)
await self.health_check()
logger.info("Reconnected successfully")
except Exception as e:
logger.error(f"Reconnection failed: {e}")
self.healthy = False
async def call(self, method: str, **kwargs):
"""Make monitored call."""
await self.ensure_connected()
if not self.healthy:
raise Exception("Server is not healthy")
return await self.client.call(method, **kwargs)
Testing¶
Mock Direct Server¶
import pytest
from unittest.mock import AsyncMock
@pytest.fixture
async def mock_direct_server():
"""Create mock server for testing."""
class MockServer:
def __init__(self):
self.host = "localhost"
self.port = 50051
self.calls = []
async def handle_call(self, method: str, **kwargs):
self.calls.append((method, kwargs))
return {"success": True, "method": method}
server = MockServer()
# Start mock server (implementation depends on framework)
# ...
yield server
# Cleanup
# ...
@pytest.mark.asyncio
async def test_direct_connection(mock_direct_server):
"""Test direct connection."""
client = await plugin_client(
host=mock_direct_server.host,
port=mock_direct_server.port,
skip_subprocess=True
)
result = await client.call("TestMethod", param="value")
assert result["success"] == True
assert len(mock_direct_server.calls) == 1
assert mock_direct_server.calls[0] == ("TestMethod", {"param": "value"})
Best Practices¶
- Connection Management: Always use connection pools for production
- Health Checks: Implement regular health monitoring
- Retry Logic: Add retry with exponential backoff for resilience
- Load Balancing: Distribute load across multiple servers
- Security: Use mTLS for secure connections
- Service Discovery: Integrate with service registries
- Monitoring: Track connection metrics and performance
- Graceful Shutdown: Close connections properly