mTLS Configuration¶
Configure mutual TLS (mTLS) authentication for secure plugin communication using Foundation's certificate management capabilities.
Overview¶
mTLS (mutual TLS) ensures both client and server authenticate each other using X.509 certificates. Foundation handles certificate management, validation, and rotation.
Benefits: - Mutual Authentication - Both sides verify each other's identity - Encrypted Communication - All data encrypted in transit using TLS - Certificate-Based Identity - Cryptographic identity verification - Foundation Integration - Automatic certificate management and rotation
Quick Setup¶
Automatic mTLS (Recommended)¶
Let Foundation handle certificate generation and management:
from pyvider.rpcplugin import plugin_server, plugin_client
from provide.foundation import logger
# Server with automatic mTLS
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
auto_mtls=True # Foundation generates certificates automatically
)
# Client connects with automatic certificate discovery
async with plugin_client(auto_mtls=True) as client:
response = await client.my_service.secure_method(data="sensitive")
logger.info("🔒 Secure communication established")
Manual Certificate Configuration¶
For production environments with existing PKI:
from provide.foundation.crypto import Certificate
# Load certificates via Foundation
# Note: from_pem() expects PEM content strings, not file paths
from pathlib import Path
server_cert_content = Path("server.pem").read_text()
server_key_content = Path("server.key").read_text()
server_cert = Certificate.from_pem(
cert_pem=server_cert_content,
key_pem=server_key_content
)
ca_cert_content = Path("ca.pem").read_text()
ca_cert = Certificate.from_pem(cert_pem=ca_cert_content)
# Server with manual certificates
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
tls_certificate=server_cert,
tls_ca_certificate=ca_cert,
require_client_certificate=True
)
# Client with certificates
client_cert_content = Path("client.pem").read_text()
client_key_content = Path("client.key").read_text()
client_cert = Certificate.from_pem(
cert_pem=client_cert_content,
key_pem=client_key_content
)
async with plugin_client(
tls_client_certificate=client_cert,
tls_ca_certificate=ca_cert,
verify_server_certificate=True
) as client:
result = await client.my_service.process(data="example")
Certificate Management¶
Environment Configuration¶
Configure mTLS via environment variables:
# Enable mTLS
export PLUGIN_AUTO_MTLS=true
# Manual certificate paths
export PLUGIN_SERVER_CERT=file:///etc/ssl/certs/server.pem
export PLUGIN_SERVER_KEY=file:///etc/ssl/private/server.key
export PLUGIN_SERVER_ROOT_CERTS=file:///etc/ssl/certs/ca.pem
# Client certificates
export PLUGIN_CLIENT_CERT=file:///etc/ssl/certs/client.pem
export PLUGIN_CLIENT_KEY=file:///etc/ssl/private/client.key
export PLUGIN_CLIENT_ROOT_CERTS=file:///etc/ssl/certs/ca.pem
Foundation Certificate Generation¶
Use Foundation to generate development certificates:
from pathlib import Path
from provide.foundation.crypto import Certificate
from provide.foundation import logger
# Create Certificate Authority
ca_cert = Certificate.create_ca(
common_name="Plugin CA",
organization_name="My Company",
validity_days=3650
)
# Generate server certificate
server_cert = Certificate.create_self_signed_server_cert(
common_name="plugin-server.local",
organization_name="My Company",
alt_names=["DNS:localhost", "IP:127.0.0.1"],
validity_days=365
)
# Save certificates (write PEM strings to files)
Path("ca.pem").write_text(ca_cert.cert_pem)
Path("server.pem").write_text(server_cert.cert_pem)
Path("server.key").write_text(server_cert.key_pem)
logger.info(f"✅ CA certificate generated: {ca_cert.common_name}")
logger.info(f"✅ Server certificate generated: {server_cert.common_name}")
logger.info(f"✅ Certificates are valid: CA={ca_cert.is_valid}, Server={server_cert.is_valid}")
Production Deployment¶
Certificate Authority Setup¶
from provide.foundation.crypto import Certificate
# Production CA hierarchy
root_ca = Certificate.generate_ca(
common_name="Production Plugin Root CA",
organization="My Company",
country="US",
validity_days=3650,
key_strength=4096
)
# Intermediate CA for plugin certificates
intermediate_ca = root_ca.generate_intermediate_ca(
common_name="Plugin Intermediate CA",
validity_days=1095,
path_length_constraint=0
)
Server Certificates¶
# Production server certificate
server_cert = intermediate_ca.generate_server_certificate(
common_name="plugin-api.company.com",
subject_alternative_names=[
"DNS:plugin-api.company.com",
"DNS:plugin-api.internal",
"IP:10.0.1.100"
],
validity_days=90,
extended_key_usage=["server_auth"],
key_usage=["digital_signature", "key_encipherment"]
)
# Client certificate for service authentication
client_cert = intermediate_ca.generate_client_certificate(
common_name="plugin-client-001",
email_address="[email protected]",
validity_days=30,
extended_key_usage=["client_auth"],
key_usage=["digital_signature"]
)
Certificate Rotation¶
Automatic Rotation¶
Implement automatic certificate rotation with Foundation:
import asyncio
from pathlib import Path
from provide.foundation.crypto import Certificate
from provide.foundation import logger
async def check_and_rotate_certificates(
cert_path: str,
key_path: str,
ca_cert: Certificate,
check_interval: int = 3600
):
"""Periodically check and rotate certificates."""
while True:
await asyncio.sleep(check_interval)
# Load current certificate
cert = Certificate.from_pem(
cert_pem=f"file://{cert_path}",
key_pem=f"file://{key_path}"
)
# Check if rotation needed (based on validity)
if not cert.is_valid:
logger.warning("Certificate is invalid, rotating now")
# Generate new certificate
new_cert = Certificate.create_self_signed_server_cert(
common_name=cert.common_name,
organization_name=cert.organization_name,
validity_days=90
)
# Save new certificate
Path(cert_path).write_text(new_cert.cert_pem)
Path(key_path).write_text(new_cert.key_pem)
logger.info(f"✅ Certificate rotated for {new_cert.common_name}")
# Start rotation service
asyncio.create_task(check_and_rotate_certificates(
"server.pem",
"server.key",
ca_cert
))
Manual Rotation¶
from pathlib import Path
from provide.foundation.crypto import Certificate
from provide.foundation import logger
# Load current certificate
cert = Certificate.from_pem(
cert_pem="file://server.pem",
key_pem="file://server.key"
)
# Check validity and rotate if needed
if not cert.is_valid:
logger.warning("Certificate is invalid, rotating")
# Generate new certificate
new_cert = Certificate.create_self_signed_server_cert(
common_name=cert.common_name,
organization_name=cert.organization_name,
validity_days=90
)
# Save new certificate
Path("server.pem").write_text(new_cert.cert_pem)
Path("server.key").write_text(new_cert.key_pem)
# Note: Hot-swapping requires server restart or custom reload mechanism
logger.info("✅ Certificate rotated - restart server to apply")
Validation and Debugging¶
Certificate Validation¶
from provide.foundation.crypto import Certificate
from provide.foundation import logger
from pathlib import Path
# Load certificate - read PEM content from files
cert_pem_content = Path("server.pem").read_text()
key_pem_content = Path("server.key").read_text()
cert = Certificate.from_pem(
cert_pem=cert_pem_content,
key_pem=key_pem_content
)
# Basic validation
if cert.is_valid:
logger.info("✅ Certificate is valid")
logger.info(f" Common Name: {cert.common_name}")
logger.info(f" Organization: {cert.organization_name}")
else:
logger.error("❌ Certificate validation failed")
# Check certificate against CA (using verify_trust method)
ca_cert_content = Path("ca.pem").read_text()
ca_cert = Certificate.from_pem(cert_pem=ca_cert_content)
try:
# verify_trust checks if cert is signed by CA
if cert.verify_trust(ca_cert):
logger.info("✅ Certificate trust chain validated")
else:
logger.error("❌ Certificate trust verification failed")
except Exception as e:
logger.error(f"❌ Trust verification error: {e}")
Connection Debugging¶
# Enable TLS debugging
import os
os.environ["PLUGIN_LOG_LEVEL"] = "DEBUG"
os.environ["GRPC_VERBOSITY"] = "DEBUG"
os.environ["GRPC_TRACE"] = "tls,secure_endpoint"
# Test mTLS connection
try:
async with plugin_client(
auto_mtls=True,
verify_server_certificate=True
) as client:
await client.health.check()
logger.info("✅ mTLS connection successful")
except Exception as e:
logger.error(f"❌ mTLS connection failed: {e}")
# Check certificate paths, permissions, validity
Security Best Practices¶
Certificate Security¶
- Strong Key Lengths: Use RSA 4096+ or ECDSA P-384
- Short Validity Periods: 90 days maximum for server certificates
- Proper Key Usage: Set appropriate key usage extensions
- Secure Storage: Protect private keys with 600 permissions
- Regular Rotation: Automate certificate renewal
Network Security¶
# Production server configuration
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
# Strong TLS configuration
tls_certificate=server_cert,
tls_ca_certificate=ca_cert,
require_client_certificate=True,
# Additional security
allowed_client_cns=["plugin-client-*"], # Restrict client CNs
cipher_suites=["ECDHE-RSA-AES256-GCM-SHA384"], # Strong ciphers
min_tls_version="1.3" # Require TLS 1.3
)
Monitoring¶
from provide.foundation.crypto import Certificate
from provide.foundation import logger
from pathlib import Path
# Monitor certificate health
cert_paths = ["server.pem", "client.pem", "ca.pem"]
for cert_path in cert_paths:
try:
# Read PEM content from file
cert_content = Path(cert_path).read_text()
cert = Certificate.from_pem(cert_pem=cert_content)
if cert.is_valid:
logger.info(f"✅ {cert_path}: Valid")
logger.info(f" CN: {cert.common_name}")
else:
logger.error(f"❌ {cert_path}: Invalid certificate")
except Exception as e:
logger.error(f"❌ {cert_path}: Error loading - {e}")
Common Patterns¶
Development Environment¶
# Development with self-signed certificates
from provide.foundation.crypto import Certificate
from pyvider.rpcplugin import plugin_server
# Create CA and server certificate for development
ca_cert = Certificate.create_ca(
common_name="Dev Plugin CA",
organization_name="Development",
validity_days=90
)
server_cert = Certificate.create_self_signed_server_cert(
common_name="localhost",
organization_name="Development",
alt_names=["DNS:localhost", "IP:127.0.0.1"],
validity_days=90
)
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
tls_certificate=server_cert,
tls_ca_certificate=ca_cert
)
Staging Environment¶
# Staging with intermediate CA
from provide.foundation.crypto import Certificate
from pyvider.rpcplugin import plugin_server
# Load staging CA
staging_ca = Certificate.from_pem(
cert_pem="file://staging-ca.pem",
key_pem="file://staging-ca.key"
)
# Create server certificate for staging
server_cert = Certificate.create_self_signed_server_cert(
common_name="staging-plugin.company.internal",
organization_name=staging_ca.organization_name,
validity_days=30
)
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
tls_certificate=server_cert,
tls_ca_certificate=staging_ca
)
Load Balancer Integration¶
# Behind load balancer with SSL termination
server = plugin_server(
protocol=my_protocol,
handler=my_handler,
# Internal mTLS for backend security
auto_mtls=True,
bind_address="0.0.0.0:8080", # Internal port
# Trust load balancer headers
trust_forwarded_headers=True,
allowed_forwarded_ips=["10.0.0.0/8"]
)
Troubleshooting¶
Common Issues¶
Certificate Not Found:
# Check certificate paths and permissions
ls -la /etc/ssl/certs/server.pem
openssl x509 -in /etc/ssl/certs/server.pem -text -noout
Certificate Expired:
# Check expiration
openssl x509 -in server.pem -noout -dates
# Verify certificate chain
openssl verify -CAfile ca.pem server.pem
Connection Refused:
# Enable detailed TLS logging
import logging
from pathlib import Path
from provide.foundation.crypto import Certificate
from provide.foundation import logger
logging.getLogger('grpc').setLevel(logging.DEBUG)
# Check certificate validation - read PEM content from file
cert_content = Path("server.pem").read_text()
cert = Certificate.from_pem(cert_pem=cert_content)
logger.info(f"Certificate valid: {cert.is_valid}")
logger.info(f"Common Name: {cert.common_name}")
logger.info(f"Organization: {cert.organization_name}")
logger.info(f"Certificate type: {cert.key_type}")
Next Steps¶
- Certificate API Reference - Quick reference for Certificate.from_pem() usage
- Certificate Management - Detailed certificate operations
- Process Isolation - Additional security layers
- Production Configuration - Production deployment and security setup
For enterprise certificate management and PKI integration, consult Foundation's security documentation and consider professional PKI services.