Testing Guide¶
Comprehensive testing guidance for Pyvider RPC Plugin applications, covering unit testing, integration testing, mocking strategies, and best practices for servers, clients, transports, and configurations.
Overview¶
The Pyvider RPC Plugin system provides: - pytest Framework: Modern async Python testing - Configuration Isolation: Automatic config reset between tests - Transport Testing: Specialized fixtures for Unix/TCP testing - Mock Support: Comprehensive mocking for components - Security Testing: mTLS and authentication testing
Testing philosophy: Test components in isolation with integration testing support.
Test Structure¶
tests/
├── conftest.py # Global fixtures and configuration
├── fixtures/ # Reusable test fixtures
│ ├── mocks.py # Mock implementations
│ └── crypto.py # Certificate fixtures
├── unit/ # Unit tests
├── integration/ # Integration tests
└── transport/ # Transport-specific tests
Dependencies: pytest, pytest-asyncio, pytest-cov, pytest-mock
Configuration Management¶
Automatic Reset and Test Configuration¶
# conftest.py - Automatic config isolation
@pytest.fixture(autouse=True, scope="function")
def reset_rpcplugin_config_singleton():
"""Reset RPCPluginConfig singleton before each test."""
# Handles environment cleanup and state reset
@pytest.fixture
def test_config():
"""Test-specific configuration with cleanup."""
original_env = {}
test_vars = {
'PLUGIN_LOG_LEVEL': 'WARNING',
'PLUGIN_AUTO_MTLS': 'true',
'PLUGIN_HANDSHAKE_TIMEOUT': '5.0',
'PLUGIN_CLIENT_RETRY_ENABLED': 'false',
'PLUGIN_RATE_LIMIT_ENABLED': 'false',
}
for key, value in test_vars.items():
original_env[key] = os.environ.get(key)
os.environ[key] = value
yield
# Restore environment
for key, original_value in original_env.items():
if original_value is None:
os.environ.pop(key, None)
else:
os.environ[key] = original_value
Transport Testing¶
Transport Factory and Core Tests¶
import pytest
import uuid
from pathlib import Path
from pyvider.rpcplugin.transport import UnixSocketTransport, TCPSocketTransport
@pytest.fixture
async def transport_factory(tmp_path: Path):
"""Factory for creating isolated transport instances with cleanup."""
created_transports = []
async def create(transport_type: str, **kwargs):
if transport_type == "unix":
socket_path = kwargs.pop('path', None)
if not socket_path:
socket_name = f"test_{uuid.uuid4().hex[:8]}.sock"
socket_path = str(tmp_path / socket_name)
transport = UnixSocketTransport(path=socket_path)
elif transport_type == "tcp":
port = kwargs.get('port', 0) # 0 = auto-assign
host = kwargs.get('host', '127.0.0.1')
transport = TCPSocketTransport(host=host, port=port)
else:
raise ValueError(f"Unknown transport type: {transport_type}")
created_transports.append(transport)
return transport
yield create
# Cleanup
for transport in created_transports:
try:
await transport.close()
except Exception:
pass
@pytest.fixture
def unused_tcp_port():
"""Find unused TCP port."""
import socket
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
s.bind(("127.0.0.1", 0))
return s.getsockname()[1]
# Unix Socket Tests
@pytest.mark.asyncio
async def test_unix_socket_lifecycle(transport_factory):
"""Test Unix socket creation, connection, and cleanup."""
transport = await transport_factory("unix")
endpoint = await transport.listen()
assert endpoint.endswith(".sock")
# Test connection
client = await transport_factory("unix")
await client.connect(endpoint)
assert client.endpoint == endpoint
@pytest.mark.asyncio
async def test_unix_socket_permissions(transport_factory):
"""Test Unix socket file permissions."""
transport = await transport_factory("unix")
await transport.listen()
import os, stat
stat_result = os.stat(transport.endpoint)
mode = stat_result.st_mode
assert stat.S_ISSOCK(mode)
assert mode & stat.S_IRUSR and mode & stat.S_IWUSR # Owner RW
# TCP Transport Tests
@pytest.mark.asyncio
async def test_tcp_transport_lifecycle(transport_factory, unused_tcp_port):
"""Test TCP transport creation and connection."""
transport = await transport_factory("tcp", port=unused_tcp_port)
endpoint = await transport.listen()
assert endpoint == f"127.0.0.1:{unused_tcp_port}"
client = await transport_factory("tcp")
await client.connect(endpoint)
assert client.endpoint == endpoint
@pytest.mark.asyncio
async def test_tcp_auto_port_assignment(transport_factory):
"""Test automatic port assignment."""
transport = await transport_factory("tcp", port=0)
endpoint = await transport.listen()
host, port_str = endpoint.split(":")
assert host == "127.0.0.1"
assert int(port_str) > 0
Server Testing¶
Mock Components and Server Lifecycle¶
# tests/fixtures/mocks.py
from typing import Any
from pyvider.rpcplugin.protocol.base import RPCPluginProtocol
class MockProtocol(RPCPluginProtocol):
"""Mock protocol for testing."""
def __init__(self, service_name="TestService"):
super().__init__()
self.service_name = service_name
self.add_to_server_called = False
async def get_grpc_descriptors(self):
return None, self.service_name
async def add_to_server(self, server: Any, handler: Any):
self.add_to_server_called = True
def get_method_type(self, method_name: str) -> str:
return "unary_unary"
class MockHandler:
"""Mock handler for testing."""
def __init__(self):
self.method_calls = []
async def test_method(self, request, context):
self.method_calls.append(("test_method", request))
return {"result": "test_response"}
@pytest.fixture
def mock_protocol():
return MockProtocol(service_name="TestService")
@pytest.fixture
def mock_handler():
return MockHandler()
# Server Tests
from pyvider.rpcplugin.server import RPCPluginServer
from pyvider.rpcplugin.factories import plugin_server
@pytest.mark.asyncio
async def test_server_creation_and_config(mock_protocol, mock_handler, transport_factory):
"""Test server creation and configuration."""
transport = await transport_factory("unix")
server = RPCPluginServer(
protocol=mock_protocol,
handler=mock_handler,
transport=transport
)
assert server.protocol == mock_protocol
assert server.handler == mock_handler
assert server.transport == transport
@pytest.mark.asyncio
async def test_server_factory_with_config(mock_protocol, mock_handler):
"""Test server creation via factory with config overrides."""
server = plugin_server(
protocol=mock_protocol,
handler=mock_handler,
transport="tcp",
port=0, # Auto-assign port
config={
"PLUGIN_LOG_LEVEL": "WARNING",
"PLUGIN_HANDSHAKE_TIMEOUT": "5.0",
"PLUGIN_RATE_LIMIT_ENABLED": True,
}
)
assert server.protocol == mock_protocol
assert server.config["PLUGIN_RATE_LIMIT_ENABLED"] == True
@pytest.mark.asyncio
async def test_server_lifecycle(mock_protocol, mock_handler, transport_factory):
"""Test server startup, readiness, and shutdown."""
import asyncio
transport = await transport_factory("unix")
server = RPCPluginServer(mock_protocol, mock_handler, transport)
# Start server
server_task = asyncio.create_task(server.serve())
try:
await server.wait_for_server_ready(timeout=5.0)
assert server._running
assert transport.endpoint is not None
# Test graceful shutdown
await server.stop()
await asyncio.wait_for(server_task, timeout=5.0)
except Exception:
server_task.cancel()
try:
await asyncio.wait_for(server_task, timeout=1.0)
except asyncio.TimeoutError:
pass
raise
Client Testing¶
Mock Client and Integration Testing¶
from unittest.mock import AsyncMock
from pyvider.rpcplugin.factories import plugin_client
@pytest.fixture
def mock_client():
"""Create mock client for testing."""
client = plugin_client(
command=["echo", "test"],
config={"PLUGIN_LOG_LEVEL": "WARNING"}
)
# Mock async methods to avoid real process creation
client.start = AsyncMock()
client.shutdown_plugin = AsyncMock()
client.close = AsyncMock()
client.grpc_channel = AsyncMock()
return client
@pytest.mark.asyncio
async def test_mock_client_lifecycle(mock_client):
"""Test client lifecycle with mocks."""
await mock_client.start()
assert mock_client.start.called
assert mock_client.grpc_channel is not None
await mock_client.shutdown_plugin()
await mock_client.close()
assert mock_client.shutdown_plugin.called
assert mock_client.close.called
@pytest.mark.asyncio
async def test_client_configuration(test_config):
"""Test client with configuration overrides."""
client = plugin_client(
command=["python", "-c", "print('test')"],
config={
"env": {
"PLUGIN_LOG_LEVEL": "DEBUG",
"TEST_VAR": "test_value"
}
}
)
assert client.config["env"]["PLUGIN_LOG_LEVEL"] == "DEBUG"
assert client.config["env"]["TEST_VAR"] == "test_value"
@pytest.mark.asyncio
async def test_client_server_integration_pattern(mock_protocol, mock_handler):
"""Integration test pattern (components tested separately)."""
# For full integration tests:
# 1. Start real server with proper handshake
# 2. Start client subprocess
# 3. Perform gRPC calls
# 4. Verify responses and clean shutdown
server = plugin_server(
protocol=mock_protocol,
handler=mock_handler,
transport="unix"
)
client = plugin_client(command=["echo", "mock_plugin"])
assert server is not None
assert client is not None
# Complex integration tests require proper handshake implementation
Exception Testing¶
Exception Hierarchy and Error Simulation¶
import pytest
from pyvider.rpcplugin.exception import *
def test_exception_hierarchy_and_attributes():
"""Test exception inheritance and attribute handling."""
# Test hierarchy - all inherit from RPCPluginError
config_error = ConfigError("test")
transport_error = TransportError("Connection failed",
hint="Check network",
code="TRANSPORT_001")
handshake_error = HandshakeError("test")
protocol_error = ProtocolError("test")
security_error = SecurityError("test")
for error in [config_error, transport_error, handshake_error,
protocol_error, security_error]:
assert isinstance(error, RPCPluginError)
# Test attributes
assert transport_error.message == "Connection failed"
assert transport_error.hint == "Check network"
assert transport_error.code == "TRANSPORT_001"
# Test string representation contains key information
error_str = str(transport_error)
assert "TransportError" in error_str
assert "Connection failed" in error_str
def test_exception_chaining():
"""Test exception chaining with 'from' clause."""
original_error = OSError("Network unreachable")
try:
raise TransportError("Failed to connect") from original_error
except TransportError as e:
assert e.__cause__ is original_error
@pytest.mark.asyncio
async def test_transport_error_conditions(transport_factory):
"""Test transport error simulation."""
# Test connection to non-existent socket
transport = await transport_factory("unix")
with pytest.raises(TransportError) as exc_info:
await transport.connect("/tmp/nonexistent.sock")
assert "does not exist" in exc_info.value.message.lower()
@pytest.mark.asyncio
async def test_port_conflict_error(transport_factory):
"""Test TCP port conflict handling."""
transport1 = await transport_factory("tcp", port=0)
endpoint = await transport1.listen()
port = int(endpoint.split(":")[1])
transport2 = await transport_factory("tcp", port=port)
with pytest.raises(TransportError):
await transport2.listen()
Performance Testing¶
Load and Memory Testing¶
import asyncio
import time
import gc
import os
@pytest.mark.asyncio
async def test_concurrent_connections(transport_factory):
"""Test multiple concurrent connections."""
server_transport = await transport_factory("unix")
endpoint = await server_transport.listen()
# Create multiple client connections
num_clients = 10
client_tasks = []
for i in range(num_clients):
client_transport = await transport_factory("unix")
task = asyncio.create_task(client_transport.connect(endpoint))
client_tasks.append(task)
start_time = time.time()
await asyncio.gather(*client_tasks)
duration = time.time() - start_time
assert duration < 5.0 # All connections within 5 seconds
print(f"Connected {num_clients} clients in {duration:.2f}s")
@pytest.mark.asyncio
async def test_server_startup_performance(mock_protocol, mock_handler, transport_factory):
"""Test server performance metrics."""
server = plugin_server(protocol=mock_protocol, handler=mock_handler, transport="unix")
start_time = time.time()
server_task = asyncio.create_task(server.serve())
await server.wait_for_server_ready(timeout=10.0)
startup_time = time.time() - start_time
try:
assert startup_time < 2.0 # Start within 2 seconds
# Test shutdown time
shutdown_start = time.time()
await server.stop()
await asyncio.wait_for(server_task, timeout=5.0)
shutdown_time = time.time() - shutdown_start
assert shutdown_time < 1.0 # Shutdown within 1 second
except Exception:
await server.stop()
server_task.cancel()
raise
def test_memory_usage_pattern():
"""Test memory usage with resource cleanup."""
try:
import psutil
process = psutil.Process(os.getpid())
initial_memory = process.memory_info().rss / 1024 / 1024 # MB
except ImportError:
pytest.skip("psutil not available for memory testing")
# Create and destroy many transports
transports = []
for _ in range(100):
transport = UnixSocketTransport()
transports.append(transport)
peak_memory = process.memory_info().rss / 1024 / 1024 # MB
# Clean up
transports.clear()
gc.collect()
final_memory = process.memory_info().rss / 1024 / 1024 # MB
memory_growth = final_memory - initial_memory
assert memory_growth < 10.0 # Less than 10MB growth
Security Testing¶
Certificate and Authentication Testing¶
from provide.foundation.crypto import Certificate
from unittest.mock import patch
@pytest.fixture
def test_certificates():
"""Generate test certificates for security testing."""
ca_cert = Certificate.create_ca(
common_name="Test CA",
organization_name="Test Org",
validity_days=365
)
server_cert = Certificate.create_self_signed_server_cert(
common_name="test-server",
organization_name="Test Org",
validity_days=90,
alt_names=["localhost", "127.0.0.1"]
)
client_cert = Certificate(generate_keypair=True, key_type="ecdsa")
return {
"ca": ca_cert,
"server": server_cert,
"client": client_cert
}
def test_certificate_generation(test_certificates):
"""Test certificate generation and validation."""
certs = test_certificates
# Verify all certificates generated
for cert_type in ["ca", "server", "client"]:
assert certs[cert_type].cert is not None
assert certs[cert_type].key is not None
# Verify certificate format
assert "-----BEGIN CERTIFICATE-----" in certs["server"].cert
assert "-----BEGIN PRIVATE KEY-----" in certs["server"].key
@pytest.mark.asyncio
async def test_mtls_configuration(test_certificates, mock_protocol, mock_handler):
"""Test mTLS configuration."""
certs = test_certificates
server = plugin_server(
protocol=mock_protocol,
handler=mock_handler,
transport="tcp",
port=0,
config={
"PLUGIN_AUTO_MTLS": False,
"PLUGIN_SERVER_CERT": certs["server"].cert,
"PLUGIN_SERVER_KEY": certs["server"].key,
"PLUGIN_CLIENT_ROOT_CERTS": certs["ca"].cert,
}
)
assert server.config["PLUGIN_AUTO_MTLS"] == False
assert certs["server"].cert in server.config["PLUGIN_SERVER_CERT"]
def test_magic_cookie_validation():
"""Test magic cookie authentication."""
with patch.dict(os.environ, {
'PLUGIN_MAGIC_COOKIE_KEY': 'TEST_COOKIE',
'PLUGIN_MAGIC_COOKIE_VALUE': 'valid-cookie-123',
'TEST_COOKIE': 'valid-cookie-123'
}):
from pyvider.rpcplugin.config import rpcplugin_config
assert rpcplugin_config.plugin_magic_cookie_key == 'TEST_COOKIE'
assert rpcplugin_config.plugin_magic_cookie_value == 'valid-cookie-123'
Best Practices¶
Test Organization and Patterns¶
# ✅ Good: Organize tests by component with descriptive names
class TestUnixSocketTransport:
"""Test suite for Unix socket transport."""
@pytest.mark.asyncio
async def test_basic_lifecycle(self, transport_factory):
"""Test Unix socket creation and cleanup."""
pass
@pytest.mark.asyncio
async def test_permission_handling(self, transport_factory):
"""Test file permission validation."""
pass
# ✅ Good: Descriptive test names
@pytest.mark.asyncio
async def test_server_starts_successfully_with_unix_transport():
pass
# ❌ Avoid: Generic test names
def test_server():
pass
Fixture and Assertion Patterns¶
# ✅ Good: Proper fixture cleanup
@pytest.fixture
async def managed_server(mock_protocol, mock_handler):
"""Server fixture with guaranteed cleanup."""
server = plugin_server(protocol=mock_protocol, handler=mock_handler)
server_task = None
try:
server_task = asyncio.create_task(server.serve())
await server.wait_for_server_ready(timeout=5.0)
yield server
finally:
await server.stop()
if server_task:
await asyncio.wait_for(server_task, timeout=5.0)
# ✅ Good: Specific assertions with context
def test_endpoint_format():
transport = TCPSocketTransport(host="127.0.0.1", port=8080)
endpoint = await transport.listen()
assert endpoint == "127.0.0.1:8080", f"Expected format, got: {endpoint}"
host, port = endpoint.split(":")
assert host == "127.0.0.1" and int(port) == 8080
# ✅ Good: Test error conditions explicitly
def test_config_error_handling():
with pytest.raises(ConfigError) as exc_info:
raise ConfigError("Invalid value", hint="Use valid option")
error = exc_info.value
assert "Invalid value" in error.message
assert error.hint == "Use valid option"
Running Tests and Markers¶
# Basic test execution
pytest # Run all tests
pytest --cov=pyvider # With coverage
pytest -v # Verbose output
pytest -k "transport" # Filter by name
pytest -m "not slow" # Skip slow tests
# Parallel execution (with pytest-xdist)
pytest -n auto
# Test markers (define in pyproject.toml)
@pytest.mark.slow
@pytest.mark.asyncio
async def test_large_scale_connections():
"""Test with many concurrent connections."""
pass
@pytest.mark.integration
@pytest.mark.asyncio
async def test_full_client_server_workflow():
"""Full integration test."""
pass
@pytest.mark.unit
def test_config_validation():
"""Fast unit test."""
pass
Related Documentation¶
- Configuration - Configuration testing patterns
- Exception Handling - Testing error conditions
- Server API - Server testing specifics
- Client API - Client testing specifics
- Transport Layer - Transport testing details