Advanced Provider Features¶
This guide covers advanced features and patterns for building production-ready Pyvider providers, including error handling, retry logic, rate limiting, caching, and testing.
For basic provider creation, see Creating Providers.
Table of Contents¶
Error Handling¶
Implement comprehensive error handling:
from pyvider.exceptions import (
ProviderError,
ProviderConfigurationError,
ResourceNotFoundError,
APIError,
)
class MyCloudProvider(BaseProvider):
    """Provider whose configuration step fails fast with specific errors."""

    async def configure(self, config: dict) -> None:
        """Configure the provider, validate the API key and verify connectivity.

        Args:
            config: Raw provider configuration; must contain ``api_key``.

        Raises:
            ProviderConfigurationError: If the API key is missing or rejected,
                or the API cannot be reached.
            ProviderError: If configuration fails for any other reason.
        """
        try:
            await super().configure(config)

            # The client must exist before the key can be validated, because
            # validation performs a request through it.
            self.api_client = self._create_client(config)

            # A missing key is treated like an invalid one instead of leaking
            # a KeyError message to the user.
            api_key = config.get("api_key") or ""
            if not await self._validate_api_key(api_key):
                raise ProviderConfigurationError("Invalid API key")

            # Test connection
            await self._test_connection()
        except (ProviderConfigurationError, ProviderError):
            # Already specific: do not re-wrap in the generic handler below.
            raise
        except httpx.HTTPError as e:
            raise ProviderConfigurationError(f"Failed to connect: {e}") from e
        except Exception as e:
            raise ProviderError(f"Provider configuration failed: {e}") from e

    async def _validate_api_key(self, api_key: str) -> bool:
        """Validate API key format and permissions.

        Returns:
            ``True`` when the key has the ``mck_`` prefix and the validation
            endpoint answers 200, ``False`` otherwise.

        Raises:
            httpx.HTTPError: If the validation request itself fails, so that
                a network problem is not misreported as a bad key.
        """
        # Offline format check first: no request is made for malformed keys.
        if not api_key.startswith("mck_"):
            return False

        response = await self.api_client.get("/auth/validate")
        return response.status_code == 200

    async def _test_connection(self) -> None:
        """Test API connectivity.

        Raises:
            ProviderConfigurationError: If the health endpoint cannot be
                reached or answers with an error status.
        """
        try:
            response = await self.api_client.get("/health")
            response.raise_for_status()
        except httpx.HTTPError as e:
            raise ProviderConfigurationError(
                f"Health check failed: {e}. "
                "Verify API endpoint and network connectivity."
            ) from e
Retry Logic¶
Add automatic retries for transient failures:
from tenacity import (
retry,
stop_after_attempt,
wait_exponential,
retry_if_exception_type,
)
class MyCloudProvider(BaseProvider):
    """Provider that retries transient API failures with exponential backoff."""

    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=2, max=10),
        # Server errors surface as HTTPStatusError (raised below for 5xx only);
        # network problems surface as TransportError.
        retry=retry_if_exception_type(
            (httpx.HTTPStatusError, httpx.TransportError)
        ),
    )
    async def _send_with_retry(self, method: str, path: str, **kwargs):
        """Send one request; raise only for failures that are worth retrying."""
        response = await self.api_client.request(method, path, **kwargs)

        # Retry server errors (5xx). Client errors are returned untouched so
        # they never enter the retry loop.
        if response.status_code >= 500:
            response.raise_for_status()

        return response

    async def _api_request(self, method: str, path: str, **kwargs):
        """Make API request with automatic retry.

        Server errors (5xx) and network failures are retried up to three
        times. Client errors (4xx) are raised immediately without a retry.

        Raises:
            httpx.HTTPStatusError: For a 4xx response.
            tenacity.RetryError: When every retry attempt failed.
        """
        response = await self._send_with_retry(method, path, **kwargs)

        # Don't retry client errors (4xx): raise outside the retried call.
        if 400 <= response.status_code < 500:
            response.raise_for_status()

        return response
Rate Limiting¶
Implement rate limiting:
import asyncio
from datetime import datetime, timedelta
class MyCloudProvider(BaseProvider):
    """Provider that throttles outgoing API calls through a RateLimiter."""

    def __init__(self):
        super().__init__(...)
        # At most 100 requests per one-minute window.
        one_minute = timedelta(minutes=1)
        self.rate_limiter = RateLimiter(max_requests=100, time_window=one_minute)

    async def _api_request(self, method: str, path: str, **kwargs):
        """Wait for a rate-limit slot, then forward the request to the client."""
        await self.rate_limiter.acquire()
        response = await self.api_client.request(method, path, **kwargs)
        return response
class RateLimiter:
    """Sliding-window rate limiter for asyncio code.

    Allows at most ``max_requests`` acquisitions within any ``time_window``.
    Timestamps of granted requests are kept in ``self.requests``.
    """

    def __init__(self, max_requests: int, time_window: timedelta):
        """Create a limiter.

        Args:
            max_requests: Maximum acquisitions per window; must be >= 1.
            time_window: Length of the sliding window.

        Raises:
            ValueError: If ``max_requests`` is smaller than 1.
        """
        if max_requests < 1:
            raise ValueError("max_requests must be at least 1")
        self.max_requests = max_requests
        self.time_window = time_window
        self.requests: list[datetime] = []

    async def acquire(self):
        """Wait if rate limit reached, then record the request."""
        while True:
            # Re-read the clock on every pass so the recorded timestamp is
            # the moment the slot was granted, not the moment waiting began.
            now = datetime.now()

            # Remove old requests
            self.requests = [
                granted_at for granted_at in self.requests
                if now - granted_at < self.time_window
            ]

            # There is no await between this check and the append, so no
            # other task can take the slot in between.
            if len(self.requests) < self.max_requests:
                self.requests.append(now)
                return

            # Sleep until the oldest request leaves the window, then loop to
            # re-check: other waiters may have taken the freed slot, or the
            # sleep may have ended slightly early.
            oldest = min(self.requests)
            wait_time = (oldest + self.time_window - now).total_seconds()
            await asyncio.sleep(max(wait_time, 0))
Caching¶
Add response caching:
from functools import lru_cache
import time
class MyCloudProvider(BaseProvider):
    """Provider with a simple in-memory TTL cache for GET responses."""

    def __init__(self):
        super().__init__(...)
        # Maps cache key -> (monotonic timestamp, decoded JSON payload).
        self.cache = {}

    async def get_with_cache(
        self,
        path: str,
        ttl: int = 300,
        **kwargs
    ):
        """Get resource with caching.

        Args:
            path: API path to fetch.
            ttl: Maximum age in seconds of a cached entry that may be reused.
            **kwargs: Extra arguments forwarded to the client's ``get``.

        Returns:
            The decoded JSON body, from cache when a fresh entry exists.

        Raises:
            httpx.HTTPStatusError: If the API answers with an error status;
                such responses are never cached.
        """
        # Sort by argument name so the key does not depend on the order in
        # which keyword arguments were passed.
        cache_key = f"{path}:{sorted(kwargs.items())!r}"

        # Monotonic clock: the TTL is unaffected by wall-clock adjustments.
        now = time.monotonic()

        # Check cache
        cached = self.cache.get(cache_key)
        if cached is not None:
            cached_at, data = cached
            if now - cached_at < ttl:
                return data

        # Fetch fresh data
        response = await self.api_client.get(path, **kwargs)
        # Fail before caching so an error payload is not served for `ttl`
        # seconds afterwards.
        response.raise_for_status()
        data = response.json()

        # Cache result
        self.cache[cache_key] = (time.monotonic(), data)
        return data
Logging¶
Add structured logging:
from provide.foundation import get_logger
class MyCloudProvider(BaseProvider):
    """Provider that emits structured log events for configuration and API calls."""

    def __init__(self):
        super().__init__(...)
        self.logger = get_logger(__name__)

    async def configure(self, config: dict) -> None:
        """Log the target region/endpoint, delegate to the base class, log success."""
        region = config.get("region")
        endpoint = config.get("api_endpoint")
        self.logger.info(
            "Configuring MyCloud provider", region=region, endpoint=endpoint
        )

        await super().configure(config)

        self.logger.info("Provider configured successfully")

    async def _api_request(self, method: str, path: str, **kwargs):
        """Send a request, logging the call, its status, and any failure."""
        # Fields shared by every log event for this request.
        request_fields = {"method": method, "path": path}
        self.logger.debug("API request", **request_fields)

        try:
            resp = await self.api_client.request(method, path, **kwargs)
            self.logger.debug(
                "API response", **request_fields, status=resp.status_code
            )
            return resp
        except Exception as exc:
            self.logger.error(
                "API request failed", **request_fields, error=str(exc)
            )
            raise
Testing Your Provider¶
Unit Tests¶
# tests/test_provider.py
import pytest
from my_provider.provider import MyCloudProvider
@pytest.fixture
def provider():
    """Provide a freshly constructed MyCloudProvider."""
    instance = MyCloudProvider()
    return instance
@pytest.mark.asyncio
async def test_provider_configuration(provider):
    """Configuring stores the given values and creates an API client."""
    config = dict(
        api_key="mck_test_key_1234567890abcdefghijklmnop",
        region="us-east-1",
    )

    await provider.configure(config)

    # Every supplied option must be readable from the typed config object.
    for option, expected in config.items():
        assert getattr(provider.provider_config, option) == expected
    assert provider.api_client is not None
@pytest.mark.asyncio
async def test_provider_validation(provider):
    """Test configuration validation.

    Uses the shared ``provider`` fixture rather than building its own
    instance, so provider construction lives in a single place.
    """
    # Invalid API key
    errors = await provider.validate_config({
        "api_key": "invalid_key",
    })

    assert errors, "an invalid API key must produce at least one error"
    assert any("API key" in err for err in errors)
Integration Tests¶
# tests/test_integration.py
import pytest
import os
@pytest.mark.skipif(
    not os.getenv("MYCLOUD_API_KEY"),
    reason="Requires MYCLOUD_API_KEY environment variable"
)
@pytest.mark.asyncio
async def test_real_api_connection():
    """Test connection to real API.

    Skipped unless ``MYCLOUD_API_KEY`` is set. The provider is always closed
    afterwards so the HTTP client does not leak between tests.
    """
    # Imported here because this test module does not import the provider at
    # file level; without it the test fails with a NameError.
    from my_provider.provider import MyCloudProvider

    provider = MyCloudProvider()

    config = {
        "api_key": os.getenv("MYCLOUD_API_KEY"),
        "region": "us-east-1",
    }

    try:
        await provider.configure(config)

        # Test health check
        response = await provider.api_client.get("/health")
        assert response.status_code == 200
    finally:
        await provider.close()
Best Practices¶
1. Version Your Provider¶
metadata=ProviderMetadata(
name="mycloud",
version="1.0.0", # Semantic versioning
protocol_version="6",
)
2. Document Configuration Options¶
"api_key": a_str(
required=True,
sensitive=True,
description=(
"API key for MyCloud authentication. "
"Get your key from https://mycloud.com/settings/api"
)
)
3. Validate Early¶
async def validate_config(self, config: dict) -> list[str]:
    """Validate before attempting to use config.

    Args:
        config: Raw provider configuration.

    Returns:
        Human-readable error messages; an empty list means the config is valid.
    """
    errors: list[str] = []

    # Check all requirements
    api_key = config.get("api_key")
    if not api_key:
        errors.append("api_key is required")
    elif not isinstance(api_key, str) or not api_key.startswith("mck_"):
        # Same prefix rule the provider applies when it validates the key
        # during configuration; reporting it here avoids a network round trip.
        errors.append("Invalid API key: expected a string starting with 'mck_'")

    return errors
4. Handle Cleanup¶
async def close(self) -> None:
    """Always cleanup resources.

    Safe to call when the provider was never configured, and safe to call
    more than once.
    """
    # The client attribute may not exist yet if configuration never ran.
    client = getattr(self, "api_client", None)
    if not client:
        return

    # Drop the reference before awaiting so a second close() finds nothing
    # left to close.
    self.api_client = None
    await client.aclose()
5. Use Type Hints¶
async def configure(self, config: dict) -> None:
    """Build the typed configuration object from the raw config mapping."""
    typed_config: MyCloudConfig = MyCloudConfig(**config)
    self.provider_config: MyCloudConfig = typed_config
Complete Example¶
See the full example provider on GitHub: pyvider-components/providers/mycloud
Related Documentation¶
- Creating Providers - Provider basics
- Creating Resources - Resource implementation
- Creating Data Sources - Data source implementation
- Creating Functions - Function implementation
- Testing Providers - Comprehensive testing strategies
- Production Readiness - Production best practices
- Best Practices - General best practices
Remember: These advanced features help you build robust, production-ready providers that can handle real-world challenges like network failures, rate limits, and scale requirements.