Performance Optimization¶
Alpha Status
Pyvider is in alpha. This guide covers functionality that is considered stable; see the project status page for details.
Building fast, efficient Terraform providers is essential for a good user experience. This guide covers performance optimization techniques for Pyvider providers.
Table of Contents¶
- Async Best Practices
- Caching Strategies
- Batching Operations
- Connection Pooling
- Schema Optimization
- State Management
- Profiling and Monitoring
- Common Performance Pitfalls
Async Best Practices¶
Always Use Async for I/O¶
Bad - Blocking I/O:
import requests # Synchronous library
async def read(self, ctx: ResourceContext):
    """Anti-pattern: fetch the resource with the synchronous ``requests`` client."""
    url = f"{self.api_url}/resource/{ctx.state.id}"
    # This BLOCKS the entire event loop!
    response = requests.get(url)
    return State(...)
Good - Non-blocking I/O:
import httpx # Async library
async def read(self, ctx: ResourceContext):
    """Fetch the resource through the provider's async HTTP client."""
    url = f"{self.api_url}/resource/{ctx.state.id}"
    # Non-blocking, allows other operations to proceed
    response = await self.http_client.get(url)
    return State(...)
Parallelize Independent Operations¶
When operations don't depend on each other, run them concurrently:
Bad - Sequential:
async def _create_apply(self, ctx: ResourceContext):
    """Anti-pattern: create network, storage and compute one after another."""
    cfg = ctx.config
    # Each await must finish before the next one starts (slow)
    network = await self.create_network(cfg.network)
    storage = await self.create_storage(cfg.storage)
    compute = await self.create_compute(cfg.compute)
    return State(...), None
Good - Parallel:
import asyncio
async def _create_apply(self, ctx: ResourceContext):
    """Create network, storage and compute concurrently with ``asyncio.gather``."""
    cfg = ctx.config
    creations = (
        self.create_network(cfg.network),
        self.create_storage(cfg.storage),
        self.create_compute(cfg.compute),
    )
    # These run concurrently (fast); results come back in argument order
    network, storage, compute = await asyncio.gather(*creations)
    return State(...), None
Use asyncio.gather() Effectively¶
async def read_multiple_resources(self, resource_ids: list[str]):
    """Fetch several resources concurrently, skipping the ones that fail.

    Args:
        resource_ids: IDs passed one by one to ``self.api.get_resource``.

    Returns:
        The successfully fetched resources, in the order of ``resource_ids``.
        A read that raised an ``Exception`` is logged as a warning and left out.
    """
    # return_exceptions=True turns failures into result values instead of
    # aborting the whole gather on the first error.
    outcomes = await asyncio.gather(
        *(self.api.get_resource(rid) for rid in resource_ids),
        return_exceptions=True,
    )
    fetched = []
    for outcome in outcomes:
        if not isinstance(outcome, Exception):
            fetched.append(outcome)
        else:
            logger.warning("Resource read failed", error=str(outcome))
    return fetched
Avoid Blocking Calls¶
import asyncio
from concurrent.futures import ThreadPoolExecutor
# If you MUST use a blocking library
executor = ThreadPoolExecutor(max_workers=4)


async def expensive_blocking_operation(self, data):
    """Run the blocking helper on the shared thread pool and return its result.

    Args:
        data: Value handed unchanged to ``self._blocking_function``.

    Returns:
        Whatever ``self._blocking_function(data)`` returns.
    """
    # Inside a coroutine a loop is always running; get_running_loop() is the
    # preferred accessor there and never creates a loop implicitly.
    loop = asyncio.get_running_loop()
    # Run in separate thread to avoid blocking event loop
    return await loop.run_in_executor(executor, self._blocking_function, data)
def _blocking_function(self, data):
"""Actual blocking operation."""
# Blocking code here
return processed_data
Caching Strategies¶
Cache Expensive Lookups¶
import time
from collections.abc import Awaitable, Callable
from datetime import datetime, timedelta
from functools import lru_cache
from typing import Any
class MyProvider(BaseProvider):
    """Provider with a small in-memory, time-based cache for expensive lookups."""

    def __init__(self):
        super().__init__(...)
        # key -> cached value
        self._cache = {}
        # key -> time.monotonic() reading taken when the value was stored
        self._cache_ttl = {}

    async def get_with_cache(
        self,
        key: str,
        fetcher: Callable[[], Awaitable[Any]],
        ttl: int = 300,
    ):
        """Get data with time-based caching.

        Args:
            key: Cache key.
            fetcher: Zero-argument callable returning an awaitable that
                produces the fresh value; only invoked on a cache miss.
            ttl: Maximum age, in seconds, for a cached value to be reused.

        Returns:
            The cached value if it is younger than ``ttl``, otherwise the
            value freshly produced by ``fetcher``.
        """
        # Monotonic clock: entry age is unaffected by wall-clock adjustments
        # (NTP corrections, manual changes).
        now = time.monotonic()
        cached_at = self._cache_ttl.get(key)
        if cached_at is not None and key in self._cache and now - cached_at < ttl:
            logger.debug("Cache hit", key=key)
            return self._cache[key]

        # Fetch fresh data
        logger.debug("Cache miss", key=key)
        data = await fetcher()

        # Update cache
        self._cache[key] = data
        self._cache_ttl[key] = now
        return data
# Usage
async def read(self, ctx: ResourceContext):
    """Read the resource, taking region info from the provider cache."""
    cache_key = f"region:{self.config.region}"
    # Cache region data for 5 minutes
    region_info = await self.get_with_cache(
        key=cache_key,
        fetcher=lambda: self.api.get_region_info(self.config.region),
        ttl=300,
    )
    return State(...)
Cache Provider Configuration¶
@register_provider("mycloud")
class MyCloudProvider(BaseProvider):
    """Provider that keeps API metadata cached for one hour."""

    def __init__(self):
        super().__init__(...)
        self._api_metadata_cache = None
        # None means nothing has been cached yet; otherwise the datetime
        # after which the cached metadata must be refetched.
        self._cache_expiry = None

    async def get_api_metadata(self):
        """Get API metadata with caching.

        Returns:
            The metadata returned by ``self.api.get_metadata()``, reused for
            up to one hour after it was fetched.
        """
        now = datetime.now()

        # Validity is decided by the expiry stamp alone. Testing the cached
        # value for truthiness would treat an empty-but-valid payload (such
        # as {}) as "not cached" and refetch it on every call.
        if self._cache_expiry is not None and now < self._cache_expiry:
            return self._api_metadata_cache

        # Fetch fresh metadata
        metadata = await self.api.get_metadata()

        # Cache for 1 hour
        self._api_metadata_cache = metadata
        self._cache_expiry = now + timedelta(hours=1)
        return metadata
Invalidate Cache Appropriately¶
class CacheManager:
    """In-memory cache with per-call TTL and explicit invalidation."""

    def __init__(self):
        # key -> cached value
        self._cache = {}
        # key -> time.monotonic() reading taken when the value was stored
        self._ttl = {}

    async def get(
        self,
        key: str,
        fetcher: Callable[[], Awaitable[Any]],
        ttl: int = 300,
    ):
        """Get from cache or fetch.

        Args:
            key: Cache key.
            fetcher: Zero-argument callable returning an awaitable that
                produces the fresh value; only invoked on a miss or expiry.
            ttl: Maximum age, in seconds, for a cached value to be reused.

        Returns:
            The cached value when it is younger than ``ttl``, otherwise the
            freshly fetched value (which is then stored).
        """
        # Monotonic clock: entry age is immune to wall-clock adjustments.
        now = time.monotonic()
        stored_at = self._ttl.get(key)
        if stored_at is not None and key in self._cache and now - stored_at < ttl:
            return self._cache[key]

        data = await fetcher()
        self._cache[key] = data
        self._ttl[key] = now
        return data

    def invalidate(self, key: str):
        """Invalidate specific cache entry; unknown keys are ignored."""
        self._cache.pop(key, None)
        self._ttl.pop(key, None)

    def invalidate_pattern(self, pattern: str):
        """Invalidate all keys that contain ``pattern`` as a substring."""
        # Snapshot the matches first: invalidate() mutates the dict.
        for key in [k for k in self._cache if pattern in k]:
            self.invalidate(key)

    def clear(self):
        """Clear entire cache."""
        self._cache.clear()
        self._ttl.clear()
# Usage
async def _update_apply(self, ctx: ResourceContext):
    """Update the remote resource, then drop its cached copy."""
    resource_id = ctx.state.id
    result = await self.api.update_resource(resource_id, ctx.config)
    # Invalidate cache for this resource so the next read fetches fresh data
    stale_key = f"resource:{resource_id}"
    self.cache_manager.invalidate(stale_key)
    return State(...), None
Batching Operations¶
Batch API Calls¶
Bad - N+1 Query Problem:
async def read_all_servers(self, server_ids: list[str]):
    """Anti-pattern: fetch servers one by one, awaiting each call in turn."""
    # Makes N API calls! Each await completes before the next request starts.
    return [await self.api.get_server(sid) for sid in server_ids]
Good - Single Batch Call:
async def read_all_servers(self, server_ids: list[str]):
    """Fetch all requested servers with one batch request."""
    # Single API call for all servers
    return await self.api.batch_get_servers(server_ids)
Implement Request Batching¶
from collections import defaultdict
import asyncio
class BatchingProvider(BaseProvider):
    """Provider that coalesces concurrent single-resource reads into batch calls."""

    def __init__(self):
        super().__init__(...)
        # resource_id -> futures of the callers waiting for that resource
        self._pending_reads = defaultdict(list)
        self._batch_lock = asyncio.Lock()
        self._batch_delay = 0.1  # 100ms batching window
        # True while a flush task is waiting out the current window
        self._batch_scheduled = False
        # Strong references to in-flight flush tasks: the event loop keeps
        # only weak references, so an unreferenced task may be collected
        # before it finishes.
        self._batch_tasks = set()

    async def get_resource_batched(self, resource_id: str):
        """Get resource with automatic batching.

        Args:
            resource_id: ID of the resource to read.

        Returns:
            The entry for ``resource_id`` from the batch response.

        Raises:
            KeyError: The batch response has no entry for ``resource_id``.
            Exception: Whatever ``self.api.batch_get_resources`` raised.
        """
        # Create future for this request
        future = asyncio.get_running_loop().create_future()

        async with self._batch_lock:
            self._pending_reads[resource_id].append(future)
            # Schedule exactly one flush per window. Counting pending keys is
            # not enough: a repeated request for the same ID leaves the count
            # at 1 and would spawn a second flush.
            if not self._batch_scheduled:
                self._batch_scheduled = True
                task = asyncio.create_task(self._execute_batch())
                self._batch_tasks.add(task)
                task.add_done_callback(self._batch_tasks.discard)

        # Wait for result
        return await future

    async def _execute_batch(self):
        """Execute batched reads after delay."""
        # Wait for batch window
        await asyncio.sleep(self._batch_delay)

        # Snapshot and reset under the lock, then release it so new requests
        # can queue (and schedule the next window) during the network call.
        async with self._batch_lock:
            futures_by_id = dict(self._pending_reads)
            self._pending_reads.clear()
            self._batch_scheduled = False

        if not futures_by_id:
            return

        try:
            results = await self.api.batch_get_resources(list(futures_by_id))
        except Exception as e:
            # Reject all futures that are still waiting
            for futures in futures_by_id.values():
                for future in futures:
                    if not future.done():
                        future.set_exception(e)
            return

        # Resolve futures. Iterate over what was requested, not over what came
        # back, so an ID missing from the response fails instead of leaving
        # its callers waiting forever.
        for resource_id, futures in futures_by_id.items():
            for future in futures:
                if future.done():
                    # Caller was cancelled; resolving a done future raises.
                    continue
                if resource_id in results:
                    future.set_result(results[resource_id])
                else:
                    future.set_exception(KeyError(resource_id))
Connection Pooling¶
Reuse HTTP Connections¶
import httpx
@register_provider("mycloud")
class MyCloudProvider(BaseProvider):
    """Provider that shares one pooled ``httpx.AsyncClient`` across requests."""

    # Class-level default so close() is safe even if configure() never ran.
    http_client = None

    async def configure(self, config: dict) -> None:
        """Configure the provider and create its HTTP connection pool.

        Args:
            config: Must contain ``"api_endpoint"`` and ``"api_key"``.
        """
        await super().configure(config)

        # On re-configuration, release the previous pool instead of leaking it.
        await self.close()

        # Create connection pool
        self.http_client = httpx.AsyncClient(
            base_url=config["api_endpoint"],
            headers={"Authorization": f"Bearer {config['api_key']}"},
            timeout=30.0,
            # Connection pool settings
            limits=httpx.Limits(
                max_connections=100,  # Total connections
                max_keepalive_connections=20,  # Persistent connections
                keepalive_expiry=30.0,  # Keep-alive timeout
            ),
        )

    async def close(self) -> None:
        """Clean up connection pool; a no-op when no client exists."""
        # Detach first so a second close() cannot touch a closed client.
        client, self.http_client = self.http_client, None
        if client is not None:
            await client.aclose()
Configure Pool Sizes Appropriately¶
# Two alternative pool configurations; pick ONE and pass it as
# ``limits=`` when constructing ``httpx.AsyncClient``.

# For high-traffic providers
limits = httpx.Limits(
    max_connections=200,
    max_keepalive_connections=50,
)
# For low-traffic providers
# (as written, this second assignment replaces the value assigned above)
limits = httpx.Limits(
    max_connections=20,
    max_keepalive_connections=5,
)
Schema Optimization¶
Lazy Schema Generation¶
from functools import cached_property
@register_resource("server")
class Server(BaseResource):
    """Server resource whose schema is built once and then reused."""

    # ``cached_property`` cannot be stacked under ``classmethod``: it stores
    # its value in the *instance* ``__dict__``, and wrapping other descriptors
    # in ``classmethod`` was deprecated in Python 3.11 and removed in 3.13.
    # ``lru_cache`` on a classmethod gives the intended build-once behaviour.
    @classmethod
    @lru_cache(maxsize=None)
    def _schema(cls) -> PvsSchema:
        """Build the schema; the cache is keyed on ``cls``, so this runs once per class."""
        return s_resource({
            "name": a_str(required=True),
            "size": a_str(default="medium"),
            # ... large schema definition
        })

    @classmethod
    def get_schema(cls) -> PvsSchema:
        """Return the cached schema for this resource class."""
        return cls._schema()
Simplify Complex Schemas¶
Bad - Over-complicated:
# Deeply nested with many validations
@classmethod
def get_schema(cls):
    """Anti-pattern: schema nested four objects deep (config > network > vpc > subnets)."""
    return s_resource({
        "config": a_obj({
            "network": a_obj({
                "vpc": a_obj({
                    # Every subnet is itself an object with validated fields
                    "subnets": a_list(a_obj({
                        "cidr": a_str(validators=[...]),
                        "az": a_str(validators=[...]),
                        # etc...
                    }))
                })
            })
        })
    })
Good - Flattened when possible:
@classmethod
def get_schema(cls):
    """Return a flat schema: one required string and two lists of strings."""
    attributes = {
        "vpc_id": a_str(required=True),
        "subnet_ids": a_list(a_str()),
        "availability_zones": a_list(a_str()),
    }
    return s_resource(attributes)
State Management¶
Minimize State Size¶
# Bad - Storing unnecessary data
# Each dict field below is carried in state alongside the id.
@attrs.define
class ServerState:
    id: str
    full_config: dict  # Don't store entire config
    api_response: dict  # Don't store raw API response
    internal_metadata: dict  # Don't store internal data
# Good - Only essential data
# Four scalar string fields; no nested dicts.
@attrs.define
class ServerState:
    id: str
    name: str
    status: str
    public_ip: str
Use Private State Sparingly¶
Private state incurs encryption/decryption overhead on every operation, so reserve it for values that are genuinely sensitive:
# Only use for truly sensitive data
async def _create_apply(self, ctx):
    """Return public state, plus private state only when credentials are stored."""
    # Public state - fast
    state = State(id="srv-123", status="running")

    # Private state - slower due to encryption
    # Only for sensitive data
    private = None
    if ctx.config.store_credentials:
        private = {
            "admin_password": "secret123",
            "internal_token": "tok_abc",
        }
    return state, private
Profiling and Monitoring¶
Use Structured Logging with Timing¶
from provide.foundation import logger
import time
async def _create_apply(self, ctx: ResourceContext):
    """Create the resource and log how long the API call took.

    Raises:
        Exception: Any error from ``self.api.create_resource`` is logged with
            its duration and then re-raised unchanged.
    """
    # perf_counter() is monotonic and high-resolution; time.time() follows the
    # wall clock and can jump (even backwards) when the clock is adjusted.
    start = time.perf_counter()
    try:
        result = await self.api.create_resource(ctx.config)
    except Exception as e:
        duration = time.perf_counter() - start
        logger.error(
            "Resource creation failed",
            duration_ms=duration * 1000,
            error=str(e),
        )
        raise

    # Log operation timing
    duration = time.perf_counter() - start
    logger.info(
        "Resource created successfully",
        duration_ms=duration * 1000,
        resource_id=result.id,
    )
    return State(...), None
Profile with py-spy¶
# Install py-spy
pip install py-spy
# Profile running provider (live view of the process found by pgrep)
# NOTE(review): pgrep -f may print several PIDs if more than one process
# matches the pattern — confirm a single match before relying on this.
py-spy top --pid $(pgrep -f terraform-provider-pyvider)
# Generate flame graph (written to profile.svg in the current directory)
py-spy record -o profile.svg --pid $(pgrep -f terraform-provider-pyvider)
Use Context Managers for Timing¶
from contextlib import asynccontextmanager
import time
@asynccontextmanager
async def timed_operation(operation_name: str):
    """Context manager for timing operations.

    Logs a debug record with the elapsed time when the ``async with`` body
    exits, whether it finished normally or raised.

    Args:
        operation_name: Label used in the log message and the ``operation`` field.
    """
    # perf_counter() is monotonic, so the measured duration cannot be skewed
    # by wall-clock adjustments the way time.time() can.
    start = time.perf_counter()
    try:
        yield
    finally:
        duration = time.perf_counter() - start
        logger.debug(
            f"{operation_name} completed",
            operation=operation_name,
            duration_ms=duration * 1000,
        )
# Usage
async def _create_apply(self, ctx):
    """Create network and storage in turn, timing each step separately."""
    cfg = ctx.config

    async with timed_operation("create_network"):
        network = await self.create_network(cfg.network)

    async with timed_operation("create_storage"):
        storage = await self.create_storage(cfg.storage)

    return State(...), None
Common Performance Pitfalls¶
1. Synchronous I/O in Async Functions¶
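Problem: Calling a synchronous client such as requests.get() inside an async def method blocks the event loop, so no other operation can make progress until the call returns (see "Always Use Async for I/O" above).
Solution: Use an async client (for example httpx.AsyncClient) and await the call; if a blocking library is unavoidable, run it in a thread pool with run_in_executor (see "Avoid Blocking Calls" above).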
2. Not Using Connection Pooling¶
Problem:
async def _api_call(self):
    """Anti-pattern: build and tear down an ``httpx.AsyncClient`` for a single GET."""
    # NOTE(review): ``url`` is not defined in this snippet — presumably
    # supplied by the surrounding code.
    # Creates new connection every time
    async with httpx.AsyncClient() as client:
        return await client.get(url)
Solution:
# Reuse client with connection pool (configured in provider)
async def _api_call(self):
    """Issue the GET through the provider's existing ``self.http_client``."""
    # NOTE(review): ``url`` is not defined in this snippet — presumably
    # supplied by the surrounding code.
    return await self.http_client.get(url)
3. Sequential When Could Be Parallel¶
Problem:
async def validate_all(self, resources):
    """Anti-pattern: validate resources one at a time, in order."""
    # Each validation is awaited to completion before the next one starts.
    return [await self.validate_resource(item) for item in resources]
Solution:
async def validate_all(self, resources):
    """Validate all resources concurrently; results follow the input order."""
    pending = (self.validate_resource(item) for item in resources)
    return await asyncio.gather(*pending)
4. Over-fetching Data¶
Problem:
async def read(self, ctx):
    """Anti-pattern: request the full related-object graph but keep only the ID."""
    # Fetches entire object graph
    full_data = await self.api.get_resource_with_all_related(ctx.state.id)
    return State(id=full_data.id)  # Only uses ID!
Solution:
async def read(self, ctx):
    """Read the resource, requesting only the ``id`` and ``status`` fields."""
    # Only fetch what you need
    wanted_fields = ["id", "status"]
    resource = await self.api.get_resource(ctx.state.id, fields=wanted_fields)
    return State(id=resource.id, status=resource.status)
5. Not Setting Timeouts¶
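Problem: Relying on implicit or disabled timeouts (for example timeout=None) lets a single unresponsive endpoint stall an operation indefinitely.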
Solution:
# Always set reasonable timeouts
# One limit per phase of a request, in seconds.
self.http_client = httpx.AsyncClient(
    timeout=httpx.Timeout(
        connect=5.0,  # establishing the connection
        read=30.0,  # waiting for response data
        write=10.0,  # sending request data
        pool=5.0,  # acquiring a connection from the pool
    )
)
Performance Checklist¶
Before releasing your provider:
- All I/O operations use async/await
- No blocking calls (time.sleep, requests, etc.)
- Independent operations run in parallel (asyncio.gather)
- HTTP client connection pooling configured
- Expensive lookups are cached appropriately
- Batch operations when possible (no N+1 queries)
- Timeouts set on all network operations
- State kept minimal (only essential data)
- Schemas are cached/optimized
- Performance logging in place
- Profiled with realistic workload
Benchmarking Example¶
import asyncio
import time
async def benchmark_provider():
    """Benchmark provider operations.

    Configures a ``MyCloudProvider``, creates 100 test resources concurrently,
    prints the elapsed time and throughput, and always closes the provider's
    connection pool afterwards.
    """
    provider = MyCloudProvider()
    await provider.configure({
        "api_endpoint": "https://api.example.com",
        "api_key": "test_key",
    })

    # Benchmark resource creation
    iterations = 100
    try:
        # perf_counter() is monotonic and high-resolution, unlike time.time().
        start = time.perf_counter()
        await asyncio.gather(
            *(create_test_resource(provider, i) for i in range(iterations))
        )
        duration = time.perf_counter() - start
    finally:
        # Release the HTTP connection pool even if a creation failed.
        await provider.close()

    # Guard the division in case the timer reports no elapsed time.
    ops_per_second = iterations / duration if duration > 0 else float("inf")
    print(f"Created {iterations} resources in {duration:.2f}s")
    print(f"Throughput: {ops_per_second:.2f} ops/sec")
async def create_test_resource(provider, i):
    """Create a test resource named ``test-<i>`` and return its state."""
    resource_name = f"test-{i}"
    created = await provider.create_resource(ResourceConfig(name=resource_name))
    # create_resource yields a (state, private) pair; only the state is returned
    state, _ = created
    return state
Related Documentation¶
- Architecture Overview - System design
- Async Patterns Guide - Advanced async usage
- Best Practices - General best practices
- Logging Guide - Performance logging
- Testing Providers - Performance testing