Troubleshooting

This guide helps you diagnose and resolve common issues when developing with the Pyvider RPC Plugin framework.

🤖 AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate.

Common Issues

1. Connection Issues

Problem: "Connection refused" errors

Symptoms:

ConnectionRefusedError: [Errno 111] Connection refused
grpc._channel._InactiveRpcError: status = StatusCode.UNAVAILABLE

Diagnosis:

import asyncio
import socket

async def diagnose_connection(host: str, port: int):
    """Diagnose connection issues."""
    print(f"🔍 Diagnosing connection to {host}:{port}")

    # Test basic network connectivity
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    sock.settimeout(5)

    try:
        result = sock.connect_ex((host, port))
        if result == 0:
            print("✅ Network connection successful")
        else:
            print(f"❌ Network connection failed: {result}")
            return False
    except Exception as e:
        print(f"❌ Network error: {e}")
        return False
    finally:
        sock.close()

    return True
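The diagnosis above prints the raw integer returned by `connect_ex`, which is an errno value. A small helper can turn that number into something readable. This is a minimal sketch using only the standard library; the name `explain_connect_result` is hypothetical and not part of Pyvider.

```python
import errno
import os


def explain_connect_result(code: int) -> str:
    """Turn a socket.connect_ex() return value into a readable message."""
    if code == 0:
        return "connected"
    # errno.errorcode maps numeric codes to symbolic names such as ECONNREFUSED
    name = errno.errorcode.get(code, "UNKNOWN")
    return f"{name} ({code}): {os.strerror(code)}"
```

Passing `result` from the diagnosis function to this helper shows, for example, whether the failure was a refused connection or an unreachable host.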

Solutions:

# Check if server process is running (shell)
ps aux | grep python
netstat -tlnp | grep :50051

# Verify server configuration (Python)
config = ServerConfig.from_env()
print(f"Server should be on {config.transport.host}:{config.transport.port}")

# Check firewall rules (Linux)
sudo iptables -L -n

# Check firewall rules (macOS)
sudo pfctl -sr

# Server binding to all interfaces (Python)
config = ServerConfig(
    transport=TransportConfig(
        host="0.0.0.0",  # Bind to all interfaces
        port=50051
    )
)

Problem: "Address already in use"

Symptoms:

OSError: [Errno 98] Address already in use

Solutions:

# Find the process using the port (shell)
lsof -i :50051
# Stop it; escalate to kill -9 only if it ignores the default SIGTERM
kill <PID>

# Or use a different port (Python)
config = ServerConfig(transport=TransportConfig(port=50052))
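Before picking a replacement port by hand, it can help to check the port from Python or let the operating system choose a free one. The following is a rough standard-library sketch; `is_port_in_use` and `find_free_port` are hypothetical helper names, not Pyvider functions.

```python
import socket


def is_port_in_use(port: int, host: str = "127.0.0.1") -> bool:
    """Return True if something is already accepting connections on host:port."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.settimeout(1)
        return sock.connect_ex((host, port)) == 0


def find_free_port(host: str = "127.0.0.1") -> int:
    """Ask the OS for an unused TCP port by binding to port 0."""
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
        sock.bind((host, 0))
        return sock.getsockname()[1]
```

The port returned by `find_free_port` is released before the server binds to it, so another process could claim it in between. That is usually acceptable for local development and tests.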

2. SSL/TLS Issues

Problem: SSL certificate verification failures

Symptoms:

ssl.SSLCertVerificationError: certificate verify failed
grpc._channel._InactiveRpcError: SSL handshake failed

Diagnosis:

import ssl
import asyncio

async def diagnose_ssl(host: str, port: int, cert_file: str | None = None):
    """Diagnose SSL/TLS issues."""
    print(f"🔍 Diagnosing SSL connection to {host}:{port}")

    context = ssl.create_default_context()
    if cert_file:
        context.load_verify_locations(cert_file)

    try:
        reader, writer = await asyncio.open_connection(
            host, port, ssl=context
        )
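        print("✅ SSL connection successful")
        writer.close()
        await writer.wait_closed()
        return True
    except ssl.SSLError as e:
        print(f"❌ SSL error: {e}")
        return False
    except OSError as e:
        # Refused or unreachable connections are not SSL errors; report them separately
        print(f"❌ Network error: {e}")
        return False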

Solutions:

# Confirm the certificate file exists (Python)
from pathlib import Path

cert_file = Path("certs/server.pem")
if not cert_file.exists():
    print(f"❌ Certificate file not found: {cert_file}")

# Disable SSL verification (development only!)
import ssl

context = ssl.create_default_context()
context.check_hostname = False
context.verify_mode = ssl.CERT_NONE

# Generate self-signed certificate for testing (shell)
openssl req -x509 -newkey rsa:4096 \
  -keyout key.pem -out cert.pem -days 365 -nodes

3. Performance Issues

Problem: Slow RPC calls

Diagnosis:

import time
import statistics
from typing import Any

class PerformanceDiagnostics:
    """Performance diagnostics tools."""

    def __init__(self, client):
        self.client = client
        self.call_times: list[float] = []

    async def benchmark_call(
        self,
        method_name: str,
        request: Any,
        iterations: int = 100
    ):
        """Benchmark RPC call performance."""
        print(f"🏃 Benchmarking {method_name} ({iterations} iterations)")

        for i in range(iterations):
            start_time = time.perf_counter()

            try:
                await self.client.call(method_name, request)
                end_time = time.perf_counter()
                self.call_times.append(end_time - start_time)
            except Exception as e:
                print(f"❌ Call {i+1} failed: {e}")

        self._analyze_performance()

    def _analyze_performance(self):
        """Analyze performance metrics."""
        if not self.call_times:
            return

        mean_time = statistics.mean(self.call_times)
        p95 = sorted(self.call_times)[int(0.95 * len(self.call_times))]

        print("\n📊 Performance Analysis:")
        print(f"  Mean:   {mean_time*1000:.2f}ms")
        print(f"  P95:    {p95*1000:.2f}ms")

        if mean_time > 1.0:
            print("⚠️  Average call time is high (>1s)")
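The class above picks the P95 value by indexing into the sorted list. An alternative, sketched here under the assumption that interpolated percentiles are acceptable, is `statistics.quantiles` from the standard library. The function name `summarize_latencies` is hypothetical.

```python
import statistics


def summarize_latencies(samples: list[float]) -> dict[str, float]:
    """Summarize call durations (in seconds) as millisecond statistics."""
    if len(samples) < 2:
        raise ValueError("need at least two samples")
    # quantiles(n=100) returns 99 cut points; index 94 is the 95th percentile
    cuts = statistics.quantiles(samples, n=100)
    return {
        "mean_ms": statistics.mean(samples) * 1000,
        "median_ms": statistics.median(samples) * 1000,
        "p95_ms": cuts[94] * 1000,
        "p99_ms": cuts[98] * 1000,
    }
```

Comparing the median with P95 and P99 helps separate a uniformly slow service from one with occasional slow outliers, which the mean alone tends to hide.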

Solutions:

import asyncio

from pyvider.client import ConnectionPool, RPCPluginClient

# Use connection pooling to reduce overhead
pool = ConnectionPool(max_size=10)
client = RPCPluginClient(connection_pool=pool)

# Batch multiple requests so they run concurrently
async def batch_requests(client, requests):
    tasks = []
    for request in requests:
        tasks.append(client.call("method", request))

    return await asyncio.gather(*tasks)

# Use streaming for large responses
async def stream_large_data(client, request):
    async for chunk in client.stream_call("get_large_data", request):
        process_chunk(chunk)  # process_chunk is your own handler
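Batching with `asyncio.gather` starts every call at once, which can overwhelm a server when the request list is large. One way to cap concurrency is an `asyncio.Semaphore`. This is a generic sketch: `gather_bounded` is a hypothetical name, and `call` stands in for whatever coroutine function issues a single RPC.

```python
import asyncio
from collections.abc import Awaitable, Callable
from typing import Any


async def gather_bounded(
    call: Callable[[Any], Awaitable[Any]],
    requests: list[Any],
    limit: int = 10,
) -> list[Any]:
    """Run call(request) for every request with at most `limit` in flight."""
    semaphore = asyncio.Semaphore(limit)

    async def guarded(request: Any) -> Any:
        async with semaphore:
            return await call(request)

    # gather preserves input order in its result list
    return await asyncio.gather(*(guarded(r) for r in requests))
```

A reasonable starting point for `limit` is the size of the connection pool, adjusted after measuring.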

Problem: Memory leaks

Diagnosis:

import psutil
import gc

class MemoryMonitor:
    """Monitor memory usage."""

    def __init__(self):
        self.process = psutil.Process()
        self.initial_memory = self.process.memory_info().rss

    def check_memory_leak(self, threshold_mb: int = 100):
        """Check for potential memory leaks."""
        current_memory = self.process.memory_info().rss
        increase = (current_memory - self.initial_memory) / 1024 / 1024

        print(f"💾 Memory usage: {current_memory/1024/1024:.1f}MB")
        print(f"📈 Memory increase: {increase:.1f}MB")

        if increase > threshold_mb:
            print(f"⚠️  Potential memory leak detected (+{increase:.1f}MB)")
            gc.collect()
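Process-level RSS shows that memory grew but not where. The standard library's `tracemalloc` module can attribute growth to source lines. The following is a minimal sketch; `top_allocation_growth` is a hypothetical helper, and `workload` is any callable that exercises the suspect code path.

```python
import tracemalloc
from collections.abc import Callable
from typing import Any


def top_allocation_growth(workload: Callable[[], Any], limit: int = 5) -> list[str]:
    """Run workload() and report the source lines whose allocations grew most."""
    tracemalloc.start()
    try:
        before = tracemalloc.take_snapshot()
        retained = workload()  # keep the result alive until the second snapshot
        after = tracemalloc.take_snapshot()
    finally:
        tracemalloc.stop()
    del retained
    # compare_to returns StatisticDiff entries, largest size change first
    stats = after.compare_to(before, "lineno")
    return [str(stat) for stat in stats[:limit]]
```

Tracing adds overhead, so this is better suited to a test or staging run than to a production server.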

Solutions:

# Always use context managers
async with RPCPluginClient() as client:
    result = await client.call("method", request)

# Or explicit cleanup
client = RPCPluginClient()
try:
    await client.connect()
    result = await client.call("method", request)
finally:
    await client.close()

# Reuse connections instead of creating new ones
pool = ConnectionPool(max_size=10)
# Use pool for multiple requests

4. Serialization Issues

Problem: Protobuf serialization errors

Symptoms:

google.protobuf.message.DecodeError: Error parsing message
TypeError: Couldn't build proto file into descriptor pool

Diagnosis:

from google.protobuf.json_format import MessageToJson
from google.protobuf.message import Message

def diagnose_protobuf_issue(message: Message):
    """Diagnose protobuf serialization issues."""
    print(f"🔍 Diagnosing protobuf message: {type(message).__name__}")

    try:
        # Test JSON serialization
        json_str = MessageToJson(message)
        print("✅ JSON serialization successful")

        # Test binary serialization
        binary_data = message.SerializeToString()
        print(f"✅ Binary serialization successful ({len(binary_data)} bytes)")

        # Test deserialization
        new_message = type(message)()
        new_message.ParseFromString(binary_data)
        print("✅ Binary deserialization successful")

    except Exception as e:
        print(f"❌ Serialization error: {e}")

Solutions:

# Check protobuf versions (shell)
pip list | grep protobuf

# Regenerate protobuf files (shell)
python -m grpc_tools.protoc --python_out=. --grpc_python_out=. *.proto

# Validate a message before sending it (Python)
def validate_message(message):
    """Validate protobuf message."""
    try:
        message.SerializeToString()
        return True
    except Exception as e:
        print(f"Message validation failed: {e}")
        return False

# Ensure correct field types (Python)
import time

request = MyRequest(
    id=int(user_id),              # Ensure integer
    name=str(name),                # Ensure string
    timestamp=int(time.time()),    # Ensure integer timestamp
)

5. Authentication Issues

Problem: JWT token validation failures

Symptoms:

jwt.InvalidTokenError: Invalid token
grpc.RpcError: UNAUTHENTICATED: Invalid token

Diagnosis:

import jwt
import time

def diagnose_jwt_token(token: str, secret: str, algorithm: str = 'HS256'):
    """Diagnose JWT token issues."""
    print("🔍 Diagnosing JWT token")

    try:
        # Decode without verification first
        unverified = jwt.decode(token, options={"verify_signature": False})
        print("✅ Token structure is valid")
        print(f"Payload: {unverified}")

        # Check expiration
        if 'exp' in unverified:
            exp_time = unverified['exp']
            if exp_time < time.time():
                print("❌ Token expired")
            else:
                print(f"✅ Token valid until {time.ctime(exp_time)}")

        # Verify signature
        verified = jwt.decode(token, secret, algorithms=[algorithm])
        print("✅ Token signature is valid")
        return verified

    except jwt.ExpiredSignatureError:
        print("❌ Token has expired")
    except jwt.InvalidSignatureError:
        print("❌ Token signature is invalid")
    except jwt.InvalidTokenError as e:
        print(f"❌ Token is invalid: {e}")

    return None
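When PyJWT is not available on the machine where a token needs inspecting, the claims can still be read with the standard library, because a JWT payload is base64url-encoded JSON. This sketch performs no signature verification and must never be used to authorize a request; `peek_jwt_claims` is a hypothetical helper name.

```python
import base64
import json


def peek_jwt_claims(token: str) -> dict:
    """Decode a JWT payload WITHOUT verifying the signature (inspection only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError(f"expected 3 dot-separated segments, got {len(parts)}")
    payload = parts[1]
    # JWT segments are base64url without padding; restore it before decoding
    payload += "=" * (-len(payload) % 4)
    return json.loads(base64.urlsafe_b64decode(payload))
```

Comparing the `exp` claim from this output against the server's clock is a quick way to spot clock skew between client and server.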

Solutions:

import os
import time

# Generate token with longer expiration
payload = {
    'user_id': 123,
    'exp': int(time.time()) + 7200  # 2 hours
}

# Ensure consistent secret across client and server
SECRET_KEY = os.getenv('PLUGIN_JWT_SECRET')
if not SECRET_KEY:
    raise ValueError("PLUGIN_JWT_SECRET environment variable not set")

Debugging Tools

1. Enable Debug Logging

import logging

# Enable debug logging for pyvider
logging.getLogger('pyvider').setLevel(logging.DEBUG)

# Enable gRPC debug logging
logging.getLogger('grpc').setLevel(logging.DEBUG)

# Format debug output
logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
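Python's `logging` module only captures messages emitted from Python code. The gRPC core library has its own logging, controlled by the `GRPC_VERBOSITY` and `GRPC_TRACE` environment variables. The sketch below sets them from Python. It assumes the variables are set before `grpc` is first imported, and the available tracer names depend on the installed gRPC version. `enable_grpc_core_tracing` is a hypothetical helper name.

```python
import os


def enable_grpc_core_tracing(tracers: str = "http") -> dict[str, str]:
    """Set gRPC core logging variables; call this before `import grpc`."""
    settings = {
        "GRPC_VERBOSITY": "DEBUG",  # core log level
        "GRPC_TRACE": tracers,      # comma-separated tracer names (assumed valid)
    }
    os.environ.update(settings)
    return settings
```

The same variables can be exported in the shell before starting the process, which avoids any import-order concerns.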

2. Network Debugging

import asyncio
import socket

async def debug_network_connection(host: str, port: int):
    """Debug network connectivity."""
    print(f"🌐 Testing connection to {host}:{port}")

    # Test DNS resolution
    try:
        addr_info = socket.getaddrinfo(host, port, socket.AF_INET, socket.SOCK_STREAM)
        print(f"✅ DNS resolution: {addr_info[0][4]}")
    except Exception as e:
        print(f"❌ DNS resolution failed: {e}")
        return

    # Test TCP connection
    try:
        reader, writer = await asyncio.open_connection(host, port)
        print("✅ TCP connection successful")
        writer.close()
        await writer.wait_closed()
    except Exception as e:
        print(f"❌ TCP connection failed: {e}")
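The DNS check above only looks at IPv4. A hostname such as `localhost` may also resolve to an IPv6 address, and a client that tries `::1` first will be refused if the server listens only on `127.0.0.1`. As a sketch, the following lists every address family a host resolves to; `resolve_all` is a hypothetical helper name.

```python
import socket


def resolve_all(host: str, port: int) -> list[tuple[str, str]]:
    """Return (family, address) pairs for every TCP address the host resolves to."""
    labels = {socket.AF_INET: "IPv4", socket.AF_INET6: "IPv6"}
    results = []
    for family, _type, _proto, _canon, sockaddr in socket.getaddrinfo(
        host, port, type=socket.SOCK_STREAM
    ):
        # sockaddr[0] is the address string for both IPv4 and IPv6 entries
        results.append((labels.get(family, str(family)), sockaddr[0]))
    return results
```

If the output contains an IPv6 entry that the server does not listen on, connecting to the explicit IPv4 address is a simple way to confirm the cause.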

3. RPC Call Tracing

import functools
import time

def trace_rpc_calls(func):
    """Decorator to trace RPC calls."""
    @functools.wraps(func)
    async def wrapper(*args, **kwargs):
        start_time = time.perf_counter()
        method_name = getattr(func, '__name__', 'unknown')

        print(f"🚀 Starting RPC call: {method_name}")

        try:
            result = await func(*args, **kwargs)
            duration = time.perf_counter() - start_time

            print(f"✅ RPC call completed: {method_name} ({duration:.3f}s)")
            return result

        except Exception as e:
            duration = time.perf_counter() - start_time

            print(f"❌ RPC call failed: {method_name} ({duration:.3f}s)")
            print(f"   Error: {e}")
            raise

    return wrapper

Getting Help

Communication Channels

  • GitHub Issues - Report bugs and request features
  • GitHub Discussions - Ask questions and share solutions
  • Documentation - Comprehensive guides and API reference

Diagnostic Information Collection

When reporting issues, include this diagnostic information:

import sys
import platform
import pyvider
import grpc
import google.protobuf

def collect_diagnostic_info():
    """Collect system diagnostic information."""
    info = {
        'pyvider_version': pyvider.__version__,
        'python_version': sys.version,
        'platform': platform.platform(),
        'grpc_version': grpc.__version__,
        'protobuf_version': google.protobuf.__version__,
    }

    print("🔧 Diagnostic Information:")
    for key, value in info.items():
        print(f"  {key}: {value}")

    return info
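The function above relies on each package exposing a `__version__` attribute, which not every package does. A more defensive sketch reads versions from installed package metadata via `importlib.metadata`. `installed_versions` is a hypothetical helper, and the distribution names passed to it are the caller's responsibility.

```python
from importlib import metadata


def installed_versions(distributions: list[str]) -> dict[str, str]:
    """Look up installed distribution versions without importing the packages."""
    versions = {}
    for name in distributions:
        try:
            versions[name] = metadata.version(name)
        except metadata.PackageNotFoundError:
            # Report the gap instead of failing the whole diagnostic run
            versions[name] = "not installed"
    return versions
```

For example, `installed_versions(["grpcio", "protobuf"])` covers the gRPC and protobuf runtimes. The distribution name for Pyvider itself is not stated in this guide, so confirm it with `pip list` before adding it to the list.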

Minimal Reproduction Case

When reporting bugs, provide a minimal reproduction case:

import asyncio
from pyvider.server import RPCPluginServer
from pyvider.client import RPCPluginClient

async def reproduce_issue():
    """Minimal case reproducing the issue."""
    # Server setup
    server = RPCPluginServer()
    # ... minimal server configuration

    # Client setup
    client = RPCPluginClient()
    # ... minimal client configuration

    # Reproduce the issue
    try:
        result = await client.call("method", request)
        print(f"Expected error, but got: {result}")
    except Exception as e:
        print(f"Issue reproduced: {e}")

if __name__ == "__main__":
    asyncio.run(reproduce_issue())

Best Practices

  1. Start with the Basics - Check network connectivity, configuration, and resource availability first
  2. Enable Logging - Use debug logging to understand what's happening
  3. Isolate the Problem - Create minimal reproduction cases
  4. Check Documentation - Review relevant guides and API documentation
  5. Search Issues - Look for similar reported issues
  6. Provide Context - Include diagnostic information when reporting issues

Remember: systematic diagnosis is key to effective troubleshooting. Always check the most likely causes first before investigating less common issues.