Circuit Breakers

Learn how to use circuit breakers to prevent cascading failures and protect your system from overload.

Overview

Circuit breakers automatically stop calling failing services to prevent resource exhaustion and cascading failures. When a service fails repeatedly, the circuit "opens" and immediately rejects calls without attempting the operation, giving the failing service time to recover.

This pattern is essential for building resilient distributed systems where one failing service shouldn't bring down the entire application.

Circuit States

The circuit breaker has three states:

┌─────────┐
│ CLOSED  │ ◄─── Normal operation
└────┬────┘      All requests pass through
     │           Count failures
     │ Failure threshold exceeded
┌─────────┐
│  OPEN   │ ◄─── Circuit tripped
└────┬────┘      Requests fail immediately
     │           No calls to service
     │ Timeout expires
┌──────────┐
│HALF-OPEN │ ◄─── Testing recovery
└────┬─────┘     Allow one test request
     ├─── Success: → CLOSED
     └─── Failure: → OPEN

Closed State

  • Normal operation - All requests pass through
  • Monitoring - Counts failures
  • Transition - Opens when failure threshold is exceeded

Open State

  • Protection mode - Requests fail immediately with CircuitBreakerOpen exception
  • No service calls - Gives failing service time to recover
  • Transition - Moves to half-open after timeout period

Half-Open State

  • Recovery testing - Allows limited test requests
  • Evaluation - Monitors if service has recovered
  • Transition - Returns to closed if tests succeed, back to open if they fail
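
The three states and their transitions can be sketched as a small state machine. This is an illustrative sketch only, not the library's implementation: the names `SketchBreaker`, `State`, `CircuitOpenError`, and the injectable `clock` parameter are assumptions made for the example, and it allows a single test request in the half-open state.

```python
import time
from enum import Enum


class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"


class CircuitOpenError(Exception):
    """Raised when a call is rejected because the circuit is open."""


class SketchBreaker:
    """Illustrative three-state breaker (hypothetical, for explanation only)."""

    def __init__(self, failure_threshold=5, timeout=60.0, clock=time.monotonic):
        self.failure_threshold = failure_threshold
        self.timeout = timeout
        self.clock = clock  # injectable so the timeout can be simulated
        self.state = State.CLOSED
        self.failures = 0
        self.opened_at = 0.0

    def call(self, func, *args, **kwargs):
        if self.state is State.OPEN:
            if self.clock() - self.opened_at < self.timeout:
                raise CircuitOpenError("circuit is open")
            # Timeout expired: let one test request through.
            self.state = State.HALF_OPEN
        try:
            result = func(*args, **kwargs)
        except Exception:
            self._on_failure()
            raise
        self._on_success()
        return result

    def _on_failure(self):
        self.failures += 1
        # A failed test request, or reaching the threshold, opens the circuit.
        if self.state is State.HALF_OPEN or self.failures >= self.failure_threshold:
            self.state = State.OPEN
            self.opened_at = self.clock()

    def _on_success(self):
        # A success resets the consecutive-failure count and closes the circuit.
        self.failures = 0
        self.state = State.CLOSED
```

Passing a fake `clock` lets you step through the open, half-open, and closed transitions without waiting for a real timeout.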

Basic Circuit Breaker

import requests

from provide.foundation.resilience import circuit_breaker

@circuit_breaker(failure_threshold=5, timeout=60)
def call_external_api():
    """Call an external API with circuit breaker protection."""
    response = requests.get("https://api.example.com/data")
    return response.json()

How it works:

  • After 5 consecutive failures, the circuit opens
  • The circuit stays open for 60 seconds
  • After the timeout, it allows one test request (half-open)
  • If the test succeeds, the circuit closes; if it fails, the circuit stays open for another 60 seconds

Configuration Options

Failure Threshold

Number of consecutive failures before opening the circuit:

@circuit_breaker(
    failure_threshold=3,  # Open after 3 failures
    timeout=30,
)
def unreliable_service():
    """Service with low failure tolerance."""
    pass

Timeout Duration

How long the circuit stays open before testing recovery:

@circuit_breaker(
    failure_threshold=5,
    timeout=120,  # Stay open for 2 minutes
)
def slow_recovery_service():
    """Service that needs time to recover."""
    pass

Success Threshold (Half-Open)

Number of successful requests needed to close circuit from half-open:

@circuit_breaker(
    failure_threshold=5,
    timeout=60,
    success_threshold=3,  # Need 3 successes to fully close
)
def cautious_recovery():
    """Require multiple successes before trusting service again."""
    pass
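
The success-threshold logic can be pictured as a counter that only runs while the circuit is half-open. The sketch below is a hypothetical helper (`HalfOpenProbe` is not part of the library) and assumes that any failed test request sends the circuit straight back to open.

```python
class HalfOpenProbe:
    """Tracks test requests while a circuit is half-open (illustrative only)."""

    def __init__(self, success_threshold=1):
        self.success_threshold = success_threshold
        self.successes = 0

    def record(self, succeeded):
        """Record one test request and return the next state name."""
        if not succeeded:
            # Assumption: a single failed test request re-opens the circuit.
            self.successes = 0
            return "open"
        self.successes += 1
        if self.successes >= self.success_threshold:
            self.successes = 0
            return "closed"
        return "half_open"
```

With `success_threshold=3`, the first two successful test requests keep the circuit half-open and the third closes it.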

Failure Window

Track failures within a time window instead of consecutively:

@circuit_breaker(
    failure_threshold=10,
    failure_window=60,  # 10 failures within 60 seconds opens circuit
    timeout=120,
)
def rate_based_protection():
    """Open circuit based on failure rate, not consecutive failures."""
    pass
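
One common way to count failures within a time window is to keep a queue of failure timestamps and discard those that have aged out. The sketch below shows that idea under stated assumptions: `FailureWindow` is a hypothetical helper, timestamps are passed in explicitly, and the library may track its window differently.

```python
from collections import deque


class FailureWindow:
    """Counts failures inside a sliding time window (illustrative only)."""

    def __init__(self, threshold, window_seconds):
        self.threshold = threshold
        self.window_seconds = window_seconds
        self._timestamps = deque()

    def record_failure(self, now):
        """Record a failure at time `now`; return True if the circuit should open."""
        self._timestamps.append(now)
        # Drop failures that are older than the window.
        while self._timestamps and now - self._timestamps[0] > self.window_seconds:
            self._timestamps.popleft()
        return len(self._timestamps) >= self.threshold
```

Unlike a consecutive-failure counter, occasional successes between failures do not reset this count; only the passage of time does.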

Custom Failure Predicates

Define custom logic to determine what constitutes a "failure":

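import requests

from provide.foundation.resilience import circuit_breaker

class HTTPError(Exception):
    """Example error type that carries the HTTP status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code

def is_retriable_error(exception):
    """Only count certain errors as circuit breaker failures."""
    # Don't open circuit for client errors (4xx)
    if isinstance(exception, HTTPError):
        return exception.status_code >= 500  # Only server errors
    # Count network errors raised by requests (these do not subclass
    # the built-in ConnectionError or TimeoutError)
    return isinstance(exception, (requests.ConnectionError, requests.Timeout))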

@circuit_breaker(
    failure_threshold=5,
    timeout=60,
    failure_predicate=is_retriable_error,
)
def smart_http_call(url):
    """Circuit breaker that ignores client errors."""
    response = requests.get(url)
    if response.status_code >= 400:
        raise HTTPError(response.status_code)
    return response.json()
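
To see the effect of a predicate in isolation, the sketch below counts which exceptions from a sample list would register as failures. Everything here is hypothetical and exists only for illustration: `count_failures`, the local `HTTPError` class, and the assumption that every exception counts when no predicate is given.

```python
class HTTPError(Exception):
    """Hypothetical HTTP error carrying a status code."""

    def __init__(self, status_code):
        super().__init__(f"HTTP {status_code}")
        self.status_code = status_code


def server_or_network_error(exc):
    """Predicate: count 5xx responses and network errors, ignore the rest."""
    if isinstance(exc, HTTPError):
        return exc.status_code >= 500
    return isinstance(exc, (ConnectionError, TimeoutError))


def count_failures(exceptions, failure_predicate=None):
    """Count how many exceptions would register as circuit failures."""
    if failure_predicate is None:
        return len(exceptions)  # assumed default: every exception counts
    return sum(1 for exc in exceptions if failure_predicate(exc))


errors = [HTTPError(404), HTTPError(503), TimeoutError(), ValueError("bad input")]
counted = count_failures(errors, server_or_network_error)
```

Here only the 503 response and the timeout count toward the threshold; the 404 and the `ValueError` are ignored, so client mistakes cannot trip the circuit.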

Monitoring Circuit State

Check the circuit state programmatically:

import requests

from provide.foundation import logger
from provide.foundation.resilience import CircuitBreaker, CircuitState

# Create a reusable circuit breaker
api_circuit = CircuitBreaker(
    failure_threshold=5,
    timeout=60,
    name="external_api_circuit",
)

@api_circuit.protect
def call_api():
    """Protected by named circuit breaker."""
    return requests.get("https://api.example.com/data").json()

# Check circuit state
if api_circuit.state == CircuitState.OPEN:
    logger.warning("Circuit is open, API calls are being rejected")
elif api_circuit.state == CircuitState.HALF_OPEN:
    logger.info("Circuit is testing recovery")
else:
    logger.info("Circuit is closed, operating normally")

# Get circuit metrics
logger.info(
    "Circuit metrics",
    failure_count=api_circuit.failure_count,
    success_count=api_circuit.success_count,
    last_failure_time=api_circuit.last_failure_time,
)

Handling Circuit Open Exceptions

Handle circuit breaker open exceptions gracefully:

from provide.foundation.resilience import circuit_breaker, CircuitBreakerOpen
from provide.foundation import logger

@circuit_breaker(failure_threshold=3, timeout=30)
def fetch_user_data(user_id):
    """Fetch user data with circuit protection."""
    return api_client.get(f"/users/{user_id}")

def get_user_with_fallback(user_id):
    """Get user data with fallback when circuit is open."""
    try:
        return fetch_user_data(user_id)
    except CircuitBreakerOpen:
        logger.warning(
            "Circuit breaker open, using cached data",
            user_id=user_id,
        )
        # Return cached data or default
        return get_cached_user_data(user_id)
    except Exception as e:
        logger.error("Failed to fetch user data", error=str(e))
        raise

Common Patterns

Database Connection Protection

@circuit_breaker(
    failure_threshold=3,
    timeout=60,
    success_threshold=2,
)
def execute_database_query(query, params):
    """Execute query with circuit breaker protection."""
    logger.debug("Executing database query", query=query[:100])

    connection = get_database_connection()
    cursor = connection.execute(query, params)
    result = cursor.fetchall()

    logger.debug("Query completed", row_count=len(result))
    return result

Microservice Call Protection

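import requests

from provide.foundation import logger
from provide.foundation.resilience import CircuitBreaker, CircuitBreakerOpen

class UserServiceClient:
    """Client for user microservice with circuit protection."""

    # Defined at class level so the decorator below can reference it;
    # all instances of this client share the one circuit.
    circuit = CircuitBreaker(
        failure_threshold=5,
        timeout=120,
        name="user_service",
    )

    def __init__(self, service_url):
        self.service_url = service_url

    @circuit.protect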
    def get_user(self, user_id):
        """Get user from service."""
        response = requests.get(
            f"{self.service_url}/users/{user_id}",
            timeout=5,
        )
        response.raise_for_status()
        return response.json()

    def get_user_safe(self, user_id):
        """Get user with fallback."""
        try:
            return self.get_user(user_id)
        except CircuitBreakerOpen:
            logger.warning("User service circuit open")
            return {"id": user_id, "name": "Unknown"}

External API Protection

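import requests

from provide.foundation import logger
from provide.foundation.resilience import circuit_breaker

class ServiceUnavailable(Exception):
    """Raised when the payment gateway returns a 5xx response."""

@circuit_breaker(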
    failure_threshold=10,
    failure_window=60,  # 10 failures in 60 seconds
    timeout=300,  # Stay open for 5 minutes
)
def call_payment_gateway(transaction_data):
    """Call payment gateway with circuit protection."""
    logger.info(
        "Processing payment",
        amount=transaction_data["amount"],
        currency=transaction_data["currency"],
    )

    response = requests.post(
        "https://payment-gateway.example.com/charge",
        json=transaction_data,
        timeout=10,
    )

    if response.status_code >= 500:
        raise ServiceUnavailable("Payment gateway error")

    response.raise_for_status()
    return response.json()

Circuit Breaker with Caching

Combine circuit breaker with caching for maximum resilience:

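from datetime import datetime, timedelta

import requests

from provide.foundation import logger
from provide.foundation.resilience import CircuitBreaker, CircuitBreakerOpen

class CachedAPIClient:
    """API client with circuit breaker and caching."""

    # Class-level so the @circuit.protect decorator below can reference it
    circuit = CircuitBreaker(
        failure_threshold=5,
        timeout=60,
    )

    def __init__(self):
        self.cache = {}
        self.cache_ttl = timedelta(minutes=5)

    def get_data(self, key):
        """Get data with circuit breaker and cache fallback."""
        # Check cache first (compare against None so falsy values still hit)
        cached = self._get_from_cache(key)
        if cached is not None:
            return cached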

        # Try to fetch from API
        try:
            data = self._fetch_from_api(key)
            self._store_in_cache(key, data)
            return data
        except CircuitBreakerOpen:
            logger.warning(
                "Circuit open, using stale cache",
                key=key,
            )
            # Return stale cache if available
            return self._get_stale_cache(key)

    @circuit.protect
    def _fetch_from_api(self, key):
        """Fetch from API (protected by circuit breaker)."""
        response = requests.get(f"https://api.example.com/data/{key}")
        response.raise_for_status()
        return response.json()

    def _get_from_cache(self, key):
        """Get fresh data from cache."""
        if key in self.cache:
            data, timestamp = self.cache[key]
            if datetime.now() - timestamp < self.cache_ttl:
                return data
        return None

    def _get_stale_cache(self, key):
        """Get stale data from cache as fallback."""
        if key in self.cache:
            data, _ = self.cache[key]
            return data
        return None

    def _store_in_cache(self, key, data):
        """Store data in cache."""
        self.cache[key] = (data, datetime.now())

Best Practices

✅ DO: Set Appropriate Failure Thresholds

# ✅ Good: Threshold based on expected error rate
@circuit_breaker(
    failure_threshold=5,  # Reasonable for most APIs
    timeout=60,
)
def api_call():
    pass

# ❌ Bad: Threshold too low (overly sensitive)
@circuit_breaker(
    failure_threshold=1,  # Opens on first failure!
    timeout=60,
)
def too_sensitive():
    pass

✅ DO: Use Different Circuits for Different Services

# ✅ Good: Separate circuits for different services
payment_circuit = CircuitBreaker(failure_threshold=3, timeout=120)
user_circuit = CircuitBreaker(failure_threshold=5, timeout=60)

@payment_circuit.protect
def process_payment():
    pass

@user_circuit.protect
def get_user():
    pass

✅ DO: Implement Fallbacks

# ✅ Good: Graceful degradation when circuit opens
def get_recommendations(user_id):
    try:
        return fetch_recommendations(user_id)
    except CircuitBreakerOpen:
        # Return default recommendations
        return get_default_recommendations()

✅ DO: Monitor Circuit State

# ✅ Good: Log circuit state changes
from provide.foundation.resilience import CircuitState

def on_state_change(circuit, old_state, new_state):
    """Called when circuit state changes."""
    logger.warning(
        "Circuit state changed",
        circuit=circuit.name,
        old_state=old_state.name,
        new_state=new_state.name,
    )

api_circuit = CircuitBreaker(
    failure_threshold=5,
    timeout=60,
    on_state_change=on_state_change,
)

❌ DON'T: Share Circuits Across Unrelated Services

# ❌ Bad: Single circuit for multiple services
@circuit_breaker(failure_threshold=5, timeout=60)
def call_any_service(service_url):
    # One service failure affects all services!
    return requests.get(service_url).json()

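# ✅ Good: Separate circuits per service
user_service_circuit = CircuitBreaker(failure_threshold=5, timeout=60)
payment_service_circuit = CircuitBreaker(failure_threshold=3, timeout=120)
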
@user_service_circuit.protect
def call_user_service():
    pass

@payment_service_circuit.protect
def call_payment_service():
    pass

❌ DON'T: Use Very Short Timeouts

# ❌ Bad: Circuit opens and closes too quickly
@circuit_breaker(
    failure_threshold=3,
    timeout=1,  # Only 1 second - too short!
)
def flaky_call():
    pass

# ✅ Good: Give service time to recover
@circuit_breaker(
    failure_threshold=3,
    timeout=60,  # 1 minute minimum
)
def stable_call():
    pass

Combining Circuit Breaker with Retry

Use both patterns together for maximum resilience:

from provide.foundation.resilience import retry, circuit_breaker

# Circuit breaker on the outside, retry on the inside
@circuit_breaker(failure_threshold=5, timeout=60)
@retry(
    (NetworkError, TimeoutError),
    max_attempts=3,
    base_delay=1.0,
)
def resilient_api_call():
    """
    Protected by both retry and circuit breaker.

    - Retry handles transient failures (3 attempts)
    - Circuit breaker prevents cascading failures
    - If retries repeatedly fail, circuit opens
    """
    return requests.get("https://api.example.com/data").json()

Order matters:

  • Circuit breaker outside, retry inside (recommended)
      • The circuit counts a failure only after the retry attempts for a call are exhausted
      • If the service is down, the circuit opens and stops further retry attempts
  • Retry outside, circuit breaker inside (not recommended)
      • Retry keeps attempting even when the circuit is open
      • Wastes resources on calls that will fail immediately
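
The effect of decorator order can be reproduced with two toy decorators. This sketch does not use the library: `toy_breaker`, `toy_retry`, `CircuitOpen`, and the `rejected` counter are hypothetical stand-ins, the toy breaker never recovers, and the toy retry has no backoff.

```python
import functools


class CircuitOpen(Exception):
    """Raised by the toy breaker when it rejects a call."""


def toy_breaker(failure_threshold):
    """Toy breaker: opens after N failed calls and never recovers."""
    def decorator(func):
        failures = 0

        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            nonlocal failures
            if failures >= failure_threshold:
                wrapper.rejected += 1  # rejected without calling func
                raise CircuitOpen("circuit is open")
            try:
                return func(*args, **kwargs)
            except Exception:
                failures += 1
                raise

        wrapper.rejected = 0
        return wrapper
    return decorator


def toy_retry(max_attempts):
    """Toy retry: re-invokes immediately on any exception, no backoff."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            for attempt in range(1, max_attempts + 1):
                try:
                    return func(*args, **kwargs)
                except Exception:
                    if attempt == max_attempts:
                        raise
        return wrapper
    return decorator


def run_three_calls(func):
    """Call func three times, swallowing whatever it raises."""
    for _ in range(3):
        try:
            func()
        except Exception:
            pass


# Recommended order: breaker outside, retry inside.
hits_a = []

def service_a():
    hits_a.append(1)
    raise ConnectionError("service down")

breaker_outside = toy_breaker(2)(toy_retry(3)(service_a))
run_three_calls(breaker_outside)

# Reverse order: retry outside, breaker inside.
hits_b = []

def service_b():
    hits_b.append(1)
    raise ConnectionError("service down")

guarded_b = toy_breaker(2)(service_b)
retry_outside = toy_retry(3)(guarded_b)
run_three_calls(retry_outside)
```

In this run, the breaker-outside ordering rejects the third call once and performs no further attempts (`breaker_outside.rejected` is 1). With retry outside, the retry loop keeps hitting the open circuit (`guarded_b.rejected` is 7); with a real backoff delay, each of those rejected attempts would also cost waiting time.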

Testing Circuit Breakers

Test circuit behavior in your tests:

import time

import pytest
from provide.foundation.resilience import (
    CircuitBreaker,
    CircuitBreakerOpen,
    CircuitState,
)

def test_circuit_opens_after_failures():
    """Test circuit opens after threshold failures."""
    circuit = CircuitBreaker(failure_threshold=3, timeout=60)

    @circuit.protect
    def failing_operation():
        raise RuntimeError("Service unavailable")

    # First 3 calls fail and circuit opens
    for i in range(3):
        with pytest.raises(RuntimeError):
            failing_operation()

    # 4th call rejected by open circuit
    with pytest.raises(CircuitBreakerOpen):
        failing_operation()

def test_circuit_half_open_recovery():
    """Test circuit recovery through half-open state."""
    circuit = CircuitBreaker(
        failure_threshold=2,
        timeout=0.1,  # Short timeout for testing
        success_threshold=2,
    )

    call_count = 0

    @circuit.protect
    def sometimes_failing():
        nonlocal call_count
        call_count += 1
        if call_count <= 2:
            raise RuntimeError("Failing")
        return "Success"

    # Open circuit with failures
    for _ in range(2):
        with pytest.raises(RuntimeError):
            sometimes_failing()

    # Wait for timeout
    time.sleep(0.2)

    # Circuit moves to half-open, test succeeds
    assert sometimes_failing() == "Success"
    assert sometimes_failing() == "Success"

    # Circuit should now be closed
    assert circuit.state == CircuitState.CLOSED

Next Steps

Examples

  • See examples/production/02_error_handling.py for circuit breaker examples
  • See examples/transport/01_http_client.py for HTTP circuit protection

Tip: Start with conservative thresholds (5-10 failures) and adjust based on your service's behavior. Always implement fallbacks for when circuits open.