Error Handling

Robust plugin systems require comprehensive error handling. Pyvider RPC Plugin provides a structured exception hierarchy and patterns for building resilient applications.

Exception Hierarchy

Core Exceptions

All plugin-related exceptions inherit from the base RPCPluginError:

from pyvider.rpcplugin.exception import (
    RPCPluginError,      # Base exception
    TransportError,      # Network/transport issues  
    HandshakeError,      # Connection establishment failures
    ProtocolError,       # Protocol violations
    SecurityError,       # Authentication/encryption issues
    ConfigError,         # Configuration problems
)
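
Because each specific exception derives from RPCPluginError, a single except RPCPluginError clause catches all of them. The quick check below illustrates the relationship:

# Every concrete error type shares RPCPluginError as its base class,
# so one `except RPCPluginError` handler covers them all.
for exc in (TransportError, HandshakeError, ProtocolError, SecurityError, ConfigError):
    assert issubclass(exc, RPCPluginError)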

Exception Details

Each exception provides helpful context:

try:
    await client.start()
except RPCPluginError as e:
    print(f"Error: {e.message}")     # Human-readable description
    print(f"Hint: {e.hint}")         # Resolution suggestion
    print(f"Code: {e.code}")         # Optional error code

Specific Exception Types

TransportError

When it occurs: Network connection issues, socket errors, transport setup failures

Common scenarios:

- Plugin process fails to start
- Network connectivity problems
- Port conflicts or binding issues
- Unix socket permission problems

try:
    async with plugin_client(command=["python", "my_plugin.py"]) as client:
        await client.start()
        # Use client...
except TransportError as e:
    logger.error(f"❌ Connection failed: {e.message}")
    if "port" in str(e).lower():
        # Transport errors usually require recreating the server
        # Port is a factory parameter, not a config option
        logger.info("Recreate server with: plugin_server(port=0)  # Auto-assign")
    elif "permission" in str(e).lower():
        # Socket permission issues require OS-level fixes
        logger.info("Fix socket permissions at OS level or recreate with different path")

HandshakeError

When it occurs: Authentication and protocol negotiation failures

Common scenarios:

- Magic cookie mismatch
- Protocol version incompatibility
- mTLS certificate validation failures
- Handshake timeout

try:
    await client.start()
except HandshakeError as e:
    logger.error(f"🤝 Handshake failed: {e.message}")
    if "magic cookie" in str(e).lower():
        # Check magic cookie configuration
        cookie_key = rpcplugin_config.plugin_magic_cookie_key
        logger.error(f"Expected cookie key: {cookie_key}")
    elif "certificate" in str(e).lower():
        # Check mTLS configuration
        logger.error("Verify certificate paths and validity")

SecurityError

When it occurs: Security and certificate-related issues

Common scenarios:

- Invalid certificate files
- Expired certificates
- Key/certificate mismatches
- CA trust issues

from pyvider.rpcplugin.exception import SecurityError

try:
    configure(
        auto_mtls=True,
        server_cert="file:///path/to/cert.pem",
        server_key="file:///path/to/key.pem"
    )
except SecurityError as e:
    logger.error(f"🔐 Security/Certificate error: {e.message}")
    # Check certificate validity
    # openssl x509 -in cert.pem -text -noout
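
Before passing certificate paths to configure(), it can help to inspect the files programmatically. A minimal sketch of an expiry check, assuming the third-party cryptography package is installed (roughly equivalent to the openssl command above):

from datetime import datetime
from pathlib import Path

from cryptography import x509  # third-party dependency, assumed for this check

cert = x509.load_pem_x509_certificate(Path("/path/to/cert.pem").read_bytes())
print("Subject:    ", cert.subject.rfc4514_string())
print("Valid until:", cert.not_valid_after)
if cert.not_valid_after < datetime.utcnow():
    print("⚠️ Certificate has expired; regenerate it before enabling mTLS")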

ConfigError

When it occurs: Configuration validation and loading issues

Common scenarios:

- Missing required configuration
- Invalid configuration values
- Environment variable parsing errors

try:
    server = plugin_server(protocol=my_protocol, handler=my_handler)
except ConfigError as e:
    logger.error(f"⚙️ Configuration error: {e.message}")
    if e.hint:
        logger.error(f"💡 Hint: {e.hint}")

gRPC Errors

After successful connection, RPC method calls may raise gRPC errors:

import grpc
from my_service_pb2_grpc import MyServiceStub

async def make_rpc_call():
    try:
        stub = MyServiceStub(client.grpc_channel)
        response = await stub.MyMethod(request)
        return response
    except grpc.aio.AioRpcError as e:
        status_code = e.code()
        error_details = e.details()

        if status_code == grpc.StatusCode.UNAVAILABLE:
            logger.warning("🔄 Service temporarily unavailable")
            # Implement retry logic
        elif status_code == grpc.StatusCode.UNIMPLEMENTED:
            logger.error(f"❌ Method not implemented: {error_details}")
            # Handle unsupported operation
        elif status_code == grpc.StatusCode.DEADLINE_EXCEEDED:
            logger.error("⏰ Request timeout")
            # Increase timeout or optimize request
        else:
            logger.error(f"🚨 RPC error: {status_code} - {error_details}")

Error Handling Patterns

1. Graceful Degradation

Provide fallback functionality when plugins fail:

async def resilient_plugin_call():
    """Call plugin with fallback to local implementation."""
    try:
        # Try plugin first
        result = await plugin_method()
        return result
    except (TransportError, HandshakeError) as e:
        logger.warning(f"Plugin unavailable: {e}, using fallback")
        # Fallback to local implementation
        return local_fallback_method()
    except grpc.aio.AioRpcError as e:
        if e.code() == grpc.StatusCode.UNAVAILABLE:
            logger.warning("Plugin service unavailable, using cache")
            return cached_result()
        raise  # Re-raise unexpected gRPC errors

2. Retry with Exponential Backoff

Handle transient failures with intelligent retry:

import asyncio
import random
from typing import Callable, Any

async def retry_with_backoff(
    func: Callable,
    max_retries: int = 3,
    base_delay: float = 1.0,
    max_delay: float = 60.0,
    backoff_factor: float = 2.0,
    jitter: bool = True
) -> Any:
    """Retry function with exponential backoff."""

    for attempt in range(max_retries):
        try:
            return await func()
        except (TransportError, grpc.aio.AioRpcError) as e:
            # Check if error is retryable
            if isinstance(e, grpc.aio.AioRpcError):
                if e.code() not in [grpc.StatusCode.UNAVAILABLE, 
                                   grpc.StatusCode.RESOURCE_EXHAUSTED,
                                   grpc.StatusCode.DEADLINE_EXCEEDED]:
                    raise  # Don't retry non-transient errors

            if attempt == max_retries - 1:
                logger.error(f"❌ All {max_retries} attempts failed")
                raise

            # Calculate delay with exponential backoff
            delay = min(base_delay * (backoff_factor ** attempt), max_delay)
            if jitter:
                delay *= (0.5 + random.random() * 0.5)  # Add jitter

            logger.warning(f"🔄 Attempt {attempt + 1} failed: {e}")
            logger.info(f"⏰ Retrying in {delay:.1f}s...")
            await asyncio.sleep(delay)

# Usage
async def make_reliable_call():
    return await retry_with_backoff(
        lambda: stub.MyMethod(request),
        max_retries=3
    )

3. Circuit Breaker Pattern

Prevent cascading failures with circuit breaker:

import time
from enum import Enum

class CircuitState(Enum):
    CLOSED = "closed"      # Normal operation
    OPEN = "open"         # Failing, reject calls
    HALF_OPEN = "half_open"  # Testing recovery

class CircuitBreaker:
    def __init__(
        self,
        failure_threshold: int = 5,
        recovery_timeout: float = 30.0,
        success_threshold: int = 2
    ):
        self.failure_threshold = failure_threshold
        self.recovery_timeout = recovery_timeout  
        self.success_threshold = success_threshold

        self.failure_count = 0
        self.success_count = 0
        self.last_failure_time = 0
        self.state = CircuitState.CLOSED

    async def call(self, func: Callable) -> Any:
        """Execute function with circuit breaker protection."""

        # Check if we should attempt recovery
        if (self.state == CircuitState.OPEN and 
            time.time() - self.last_failure_time >= self.recovery_timeout):
            self.state = CircuitState.HALF_OPEN
            self.success_count = 0
            logger.info("🔄 Circuit breaker: HALF_OPEN")

        # Reject calls if circuit is open
        if self.state == CircuitState.OPEN:
            raise TransportError("Circuit breaker is OPEN")

        try:
            result = await func()

            # Success handling
            if self.state == CircuitState.HALF_OPEN:
                self.success_count += 1
                if self.success_count >= self.success_threshold:
                    self._reset()
                    logger.info("✅ Circuit breaker: CLOSED")

            return result

        except Exception:
            self._record_failure()
            raise

    def _record_failure(self):
        """Record a failure."""
        self.failure_count += 1
        self.last_failure_time = time.time()

        if self.failure_count >= self.failure_threshold:
            self.state = CircuitState.OPEN
            logger.warning(f"🚫 Circuit breaker: OPEN after {self.failure_count} failures")

    def _reset(self):
        """Reset circuit to closed state."""
        self.failure_count = 0
        self.success_count = 0
        self.state = CircuitState.CLOSED

# Usage
circuit_breaker = CircuitBreaker(failure_threshold=3)

async def protected_plugin_call():
    return await circuit_breaker.call(
        lambda: stub.MyMethod(request)
    )
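
The circuit breaker composes well with the retry helper from pattern 2: backoff absorbs brief transient failures, while the breaker stops hammering a plugin that stays down. A sketch combining both, reusing retry_with_backoff and circuit_breaker defined above:

async def protected_and_retried_call():
    # Outer layer: the circuit breaker fails fast while the plugin is down.
    # Inner layer: exponential backoff absorbs short transient failures.
    return await circuit_breaker.call(
        lambda: retry_with_backoff(
            lambda: stub.MyMethod(request),
            max_retries=2,
        )
    )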

4. Health Monitoring

Proactively monitor plugin health:

async def monitor_plugin_health(client: RPCPluginClient):
    """Monitor plugin health and restart if needed."""

    while True:
        try:
            # Check if client is connected
            if not client.is_started:
                logger.warning("🔄 Plugin disconnected, reconnecting...")
                await client.start()
                continue

            # Optional: Make health check RPC
            # health_response = await health_stub.Check(HealthCheckRequest())

            # Check gRPC channel state
            channel_state = client.grpc_channel.get_state()
            if channel_state not in [grpc.ChannelConnectivity.READY,
                                   grpc.ChannelConnectivity.IDLE]:
                logger.warning(f"⚠️ Channel state: {channel_state}")
                # Consider reconnection

        except Exception as e:
            logger.error(f"❌ Health check failed: {e}")

        await asyncio.sleep(30)  # Check every 30 seconds
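
The monitor is typically started as a background task alongside the application and cancelled on shutdown so it does not outlive the client. A minimal sketch using plain asyncio (run_application is a stand-in for your own application logic):

import asyncio
import contextlib

async def main():
    async with plugin_client(command=["python", "my_plugin.py"]) as client:
        await client.start()

        # Run the health monitor for the lifetime of the client.
        monitor_task = asyncio.create_task(monitor_plugin_health(client))
        try:
            await run_application(client)  # hypothetical application entry point
        finally:
            # Stop the monitor cleanly on shutdown.
            monitor_task.cancel()
            with contextlib.suppress(asyncio.CancelledError):
                await monitor_task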

Error Recovery Strategies

Automatic Reconnection

class ResilientPluginClient:
    def __init__(self, plugin_command: list[str]):
        self.plugin_command = plugin_command
        self.client = None
        self.reconnect_attempts = 0
        self.max_reconnect_attempts = 5

    async def ensure_connected(self):
        """Ensure plugin is connected, reconnect if needed."""
        if self.client and self.client.is_started:
            return

        if self.reconnect_attempts >= self.max_reconnect_attempts:
            raise TransportError("Max reconnection attempts exceeded")

        try:
            if self.client:
                await self.client.close()

            self.client = plugin_client(command=self.plugin_command)
            await self.client.start()
            self.reconnect_attempts = 0
            logger.info("✅ Plugin reconnected successfully")

        except Exception as e:
            self.reconnect_attempts += 1
            logger.error(f"🔄 Reconnection attempt {self.reconnect_attempts} failed: {e}")
            raise

    async def call_with_retry(self, func: Callable) -> Any:
        """Make RPC call with automatic reconnection."""
        try:
            await self.ensure_connected()
            return await func(self.client)
        except (TransportError, HandshakeError):
            logger.warning("Connection lost, attempting reconnection...")
            self.client = None  # Force reconnection
            await self.ensure_connected()
            return await func(self.client)
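
Usage mirrors the plain client: pass a coroutine function that receives the currently connected client, so a reconnect transparently swaps in a fresh gRPC channel. For example:

resilient = ResilientPluginClient(["python", "my_plugin.py"])

async def fetch_result(client: RPCPluginClient):
    stub = MyServiceStub(client.grpc_channel)
    return await stub.MyMethod(request)

async def do_work():
    # call_with_retry reconnects automatically if the transport drops.
    return await resilient.call_with_retry(fetch_result)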

Resource Cleanup

Always ensure proper cleanup using async context manager:

async def robust_plugin_usage():
    """Example of robust plugin resource management with async context manager."""
    try:
        # The async context manager closes the client automatically on exit,
        # even when an exception is raised inside the block.
        async with plugin_client(command=["python", "my_plugin.py"]) as client:
            # Setup
            await client.start()

            # Use plugin
            stub = MyServiceStub(client.grpc_channel)
            result = await stub.MyMethod(request)

            return result

    except TransportError as e:
        logger.error(f"❌ Transport error: {e}")
        # Maybe try fallback or retry
        raise
    except HandshakeError as e:
        logger.error(f"🤝 Handshake error: {e}")
        # Check configuration
        raise
    except grpc.aio.AioRpcError as e:
        logger.error(f"📡 RPC error: {e.code()} - {e.details()}")
        raise
    except Exception as e:
        logger.error(f"💥 Unexpected error: {e}", exc_info=True)
        raise

Best Practices

1. Error Classification

Categorize errors for appropriate handling:

def classify_error(error: Exception) -> str:
    """Classify errors for appropriate handling."""
    if isinstance(error, HandshakeError):
        return "authentication"
    elif isinstance(error, TransportError):
        return "connectivity"
    elif isinstance(error, SecurityError):
        return "security"
    elif isinstance(error, grpc.aio.AioRpcError):
        if error.code() == grpc.StatusCode.UNAVAILABLE:
            return "temporary"
        elif error.code() in [grpc.StatusCode.UNIMPLEMENTED,
                             grpc.StatusCode.NOT_FOUND]:
            return "permanent"
    return "unknown"

async def handle_classified_error(func: Callable):
    try:
        return await func()
    except Exception as e:
        error_type = classify_error(e)

        if error_type == "temporary":
            # Retry with backoff
            return await retry_with_backoff(func)
        elif error_type == "connectivity":
            # Try fallback
            return await fallback_method()
        elif error_type == "permanent":
            # Log and fail fast
            logger.error(f"Permanent error: {e}")
            raise
        else:
            # Unknown error, be cautious
            logger.error(f"Unknown error: {e}", exc_info=True)
            raise

2. Structured Error Logging

Use structured logging for better observability:

from typing import Any

from provide.foundation import logger

async def log_plugin_error(error: Exception, context: dict[str, Any]):
    """Log plugin errors with structured context."""

    error_data = {
        "error_type": type(error).__name__,
        "error_message": str(error),
        "plugin_command": context.get("command"),
        "transport": context.get("transport"),
        "mtls_enabled": context.get("mtls_enabled"),
    }

    if isinstance(error, RPCPluginError):
        error_data.update({
            "error_code": error.code,
            "error_hint": error.hint,
        })

    if isinstance(error, grpc.aio.AioRpcError):
        error_data.update({
            "grpc_code": error.code().name,
            "grpc_details": error.details(),
        })

    logger.error("❌ Plugin operation failed", **error_data)

3. User-Friendly Error Messages

Translate technical errors to user-friendly messages:

def get_user_friendly_message(error: Exception) -> str:
    """Convert technical errors to user-friendly messages."""

    if isinstance(error, HandshakeError):
        if "magic cookie" in str(error).lower():
            return "Plugin authentication failed. Please check plugin configuration."
        elif "timeout" in str(error).lower():
            return "Plugin took too long to start. Please try again."

    elif isinstance(error, TransportError):
        if "connection refused" in str(error).lower():
            return "Could not connect to plugin. The plugin may not be running."
        elif "permission denied" in str(error).lower():
            return "Permission denied. Please check file permissions."

    elif isinstance(error, SecurityError):
        return "Security certificate error. Please check SSL/TLS configuration."

    elif isinstance(error, grpc.aio.AioRpcError):
        if error.code() == grpc.StatusCode.UNAVAILABLE:
            return "Service is temporarily unavailable. Please try again later."
        elif error.code() == grpc.StatusCode.UNIMPLEMENTED:
            return "The requested operation is not supported."

    return "An unexpected error occurred. Please contact support."

What's Next?

Now that you understand error handling: