Skip to content

Locks

provide.foundation.concurrency.locks

TODO: Add module docstring.

Classes

LockInfo

Information about a registered lock.

LockManager

LockManager()

Centralized lock manager to prevent deadlocks.

Enforces lock ordering and provides timeout mechanisms. All locks must be acquired through this manager to prevent deadlocks.

Initialize lock manager.

Source code in provide/foundation/concurrency/locks.py
def __init__(self) -> None:
    """Initialize lock manager."""
    self._locks: dict[str, LockInfo] = {}
    self._manager_lock = threading.RLock()
    self._thread_local = threading.local()
Functions
acquire
acquire(
    *lock_names: str,
    timeout: float = 10.0,
    blocking: bool = True
) -> Generator[None, None, None]

Acquire multiple locks in order to prevent deadlocks.

Parameters:

Name Type Description Default
*lock_names str

Names of locks to acquire

()
timeout float

Timeout in seconds

10.0
blocking bool

Whether to block or raise immediately if locks unavailable

True

Yields:

Type Description
None

None when all locks are acquired

Raises:

Type Description
TimeoutError

If locks cannot be acquired within timeout

RuntimeError

If deadlock would occur or other lock issues

Source code in provide/foundation/concurrency/locks.py
@contextlib.contextmanager
def acquire(
    self, *lock_names: str, timeout: float = 10.0, blocking: bool = True
) -> Generator[None, None, None]:
    """Acquire multiple locks in order to prevent deadlocks.

    Args:
        *lock_names: Names of locks to acquire
        timeout: Timeout in seconds
        blocking: Whether to block or raise immediately if locks unavailable

    Yields:
        None when all locks are acquired

    Raises:
        TimeoutError: If locks cannot be acquired within timeout
        RuntimeError: If deadlock would occur or other lock issues
    """
    if not lock_names:
        yield
        return

    lock_infos = self._prepare_lock_acquisition(lock_names)
    acquired_locks: list[LockInfo] = []
    start_time = time.time()

    try:
        for lock_info in lock_infos:
            # Skip locks already in stack (re-entrant behavior)
            if lock_info in self._thread_local.lock_stack:
                continue

            remaining_timeout = timeout - (time.time() - start_time)
            self._acquire_lock_with_timeout(lock_info, remaining_timeout, blocking)

            acquired_locks.append(lock_info)
            self._thread_local.lock_stack.append(lock_info)

        yield

    finally:
        self._release_acquired_locks(acquired_locks)
detect_potential_deadlocks
detect_potential_deadlocks() -> list[str]

Detect potential deadlock situations.

Returns:

Type Description
list[str]

List of warnings about potential deadlocks

Source code in provide/foundation/concurrency/locks.py
def detect_potential_deadlocks(self) -> list[str]:
    """Detect potential deadlock situations.

    Returns:
        List of warnings about potential deadlocks
    """
    warnings = []

    # Check for lock ordering violations across threads
    # This is a simplified check - real deadlock detection is complex
    with self._manager_lock:
        for name, lock_info in self._locks.items():
            if lock_info.acquired_at and lock_info.owner:
                hold_time = time.time() - lock_info.acquired_at
                if hold_time > 30:  # 30 seconds is a long time to hold a lock
                    warnings.append(
                        f"Lock '{name}' held by {lock_info.owner} for {hold_time:.1f}s - "
                        f"potential deadlock or resource leak"
                    )

    return warnings
get_lock
get_lock(name: str) -> threading.RLock

Get a registered lock by name.

Parameters:

Name Type Description Default
name str

Name of the lock

required

Returns:

Type Description
RLock

The lock instance

Raises:

Type Description
KeyError

If lock is not registered

Source code in provide/foundation/concurrency/locks.py
def get_lock(self, name: str) -> threading.RLock:
    """Get a registered lock by name.

    Args:
        name: Name of the lock

    Returns:
        The lock instance

    Raises:
        KeyError: If lock is not registered
    """
    with self._manager_lock:
        if name not in self._locks:
            raise KeyError(f"Lock '{name}' not registered")
        return self._locks[name].lock
get_lock_status
get_lock_status() -> dict[str, dict[str, Any]]

Get current status of all locks.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary with lock status information

Source code in provide/foundation/concurrency/locks.py
def get_lock_status(self) -> dict[str, dict[str, Any]]:
    """Get current status of all locks.

    Returns:
        Dictionary with lock status information
    """
    with self._manager_lock:
        status = {}
        for name, lock_info in self._locks.items():
            status[name] = {
                "order": lock_info.order,
                "description": lock_info.description,
                "owner": lock_info.owner,
                "acquired_at": lock_info.acquired_at,
                "is_locked": lock_info.lock._is_owned() if hasattr(lock_info.lock, "_is_owned") else None,
            }
        return status
register_lock
register_lock(
    name: str,
    order: int,
    description: str = "",
    lock: RLock | None = None,
) -> threading.RLock

Register a lock with the manager.

Parameters:

Name Type Description Default
name str

Unique name for the lock

required
order int

Order number for deadlock prevention (acquire in ascending order)

required
description str

Human-readable description

''
lock RLock | None

Existing lock to register, or None to create new one

None

Returns:

Type Description
RLock

The registered lock

Raises:

Type Description
ValueError

If lock name already exists or order conflicts

Source code in provide/foundation/concurrency/locks.py
def register_lock(
    self,
    name: str,
    order: int,
    description: str = "",
    lock: threading.RLock | None = None,
) -> threading.RLock:
    """Register a lock with the manager.

    Args:
        name: Unique name for the lock
        order: Order number for deadlock prevention (acquire in ascending order)
        description: Human-readable description
        lock: Existing lock to register, or None to create new one

    Returns:
        The registered lock

    Raises:
        ValueError: If lock name already exists or order conflicts
    """
    with self._manager_lock:
        if name in self._locks:
            raise ValueError(f"Lock '{name}' already registered")

        # Check for order conflicts
        for existing_name, lock_info in self._locks.items():
            if lock_info.order == order:
                raise ValueError(
                    f"Lock order {order} already used by '{existing_name}'. "
                    f"Each lock must have a unique order."
                )

        actual_lock = lock or threading.RLock()
        lock_info = LockInfo(name=name, lock=actual_lock, order=order, description=description)

        self._locks[name] = lock_info
        return actual_lock

Functions

get_lock_manager

get_lock_manager() -> LockManager

Get the global lock manager instance.

Source code in provide/foundation/concurrency/locks.py
def get_lock_manager() -> LockManager:
    """Get the global lock manager instance."""
    global _locks_registered
    with _registration_lock:
        if not _locks_registered:
            register_foundation_locks()
            _locks_registered = True
    return _lock_manager

register_foundation_locks

register_foundation_locks() -> None

Register all foundation locks with proper ordering.

Lock ordering hierarchy (LOWER numbers = MORE fundamental): - 0-99: Orchestration (coordinator, hub initialization) - 100-199: Early subsystems (logger - needed for debugging) - 200-299: Core infrastructure (config, registry, components) - 300+: Reserved for future subsystems

Source code in provide/foundation/concurrency/locks.py
def register_foundation_locks() -> None:
    """Register all foundation locks with proper ordering.

    Lock ordering hierarchy (LOWER numbers = MORE fundamental):
    - 0-99: Orchestration (coordinator, hub initialization)
    - 100-199: Early subsystems (logger - needed for debugging)
    - 200-299: Core infrastructure (config, registry, components)
    - 300+: Reserved for future subsystems
    """
    manager = _lock_manager

    # Orchestration (order 0-99) - most fundamental, acquired first
    manager.register_lock("foundation.hub.init", order=0, description="Hub initialization")
    manager.register_lock(
        "foundation.init.coordinator", order=10, description="Master initialization coordinator"
    )
    manager.register_lock("foundation.stream", order=20, description="Log stream management lock")

    # Early subsystems (order 100-199) - needed early for debugging
    manager.register_lock("foundation.logger.lazy", order=100, description="Lazy logger initialization")
    manager.register_lock("foundation.logger.setup", order=110, description="Logger setup coordination")

    # Core infrastructure (order 200-299)
    manager.register_lock("foundation.config", order=200, description="Configuration system lock")
    manager.register_lock("foundation.registry", order=210, description="Component registry lock")
    manager.register_lock("foundation.hub.components", order=220, description="Hub component management")