Skip to content

Async locks

provide.foundation.concurrency.async_locks

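Async-native lock management: a centralized manager that enforces lock ordering and acquisition timeouts to prevent deadlocks in async code, plus helpers for obtaining the global manager and registering the foundation locks.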

Classes

AsyncLockInfo

Information about a registered async lock.

AsyncLockManager

AsyncLockManager()

Async-native centralized lock manager to prevent deadlocks.

Enforces lock ordering and provides timeout mechanisms for async code. All async locks should be acquired through this manager to prevent deadlocks.

Initialize async lock manager.

Source code in provide/foundation/concurrency/async_locks.py
def __init__(self) -> None:
    """Create a manager with an empty registry and no per-task bookkeeping."""
    # Lock records currently tracked for each asyncio task, keyed by the task object.
    self._task_local: dict[asyncio.Task[Any], list[AsyncLockInfo]] = {}
    # Held while the registry below is read or modified.
    self._manager_lock = asyncio.Lock()
    # Registered lock records, keyed by lock name.
    self._locks: dict[str, AsyncLockInfo] = {}
Functions
acquire async
acquire(
    *lock_names: str, timeout: float = 10.0
) -> AsyncGenerator[None, None]

Acquire multiple locks in order to prevent deadlocks.

Parameters:

Name Type Description Default
*lock_names str

Names of locks to acquire

()
timeout float

Timeout in seconds

10.0

Yields:

Type Description
AsyncGenerator[None, None]

None when all locks are acquired

Raises:

Type Description
TimeoutError

If locks cannot be acquired within timeout

RuntimeError

If deadlock would occur or other lock issues

Source code in provide/foundation/concurrency/async_locks.py
@contextlib.asynccontextmanager
async def acquire(self, *lock_names: str, timeout: float = 10.0) -> AsyncGenerator[None, None]:
    """Acquire the named locks for the duration of an ``async with`` block.

    The lock records to take, and the order to take them in, come from
    ``self._prepare_lock_acquisition`` (presumably it validates the names and
    sorts by registered order - confirm against that helper). Locks the
    current task is already tracking in ``self._task_local`` are skipped
    rather than acquired a second time. ``timeout`` is a single budget shared
    by all acquisitions: each lock is given whatever is left of it.

    Args:
        *lock_names: Names of locks to acquire. With no names the context
            manager yields immediately and touches no locks.
        timeout: Total time budget in seconds for acquiring all locks.

    Yields:
        None when all locks are acquired

    Raises:
        TimeoutError: If locks cannot be acquired within timeout
        RuntimeError: If deadlock would occur or other lock issues
    """
    if not lock_names:
        yield
        return

    lock_infos = await self._prepare_lock_acquisition(lock_names)
    acquired_locks: list[AsyncLockInfo] = []
    # Measure the budget with a monotonic clock: wall-clock time can jump
    # (NTP step, manual change), which would corrupt the remaining timeout.
    start_time = time.monotonic()

    try:
        current_task = asyncio.current_task()
    except RuntimeError:
        # No running event loop; proceed without per-task tracking.
        current_task = None

    try:
        for lock_info in lock_infos:
            # Skip locks this task already tracks as held.
            if current_task and lock_info in self._task_local.get(current_task, []):
                continue

            remaining_timeout = timeout - (time.monotonic() - start_time)
            await self._acquire_lock_with_timeout(lock_info, remaining_timeout)

            # Record only locks taken by this call so the finally clause
            # releases exactly those, even if a later acquisition fails.
            acquired_locks.append(lock_info)
            if current_task:
                self._task_local.setdefault(current_task, []).append(lock_info)

        yield

    finally:
        await self._release_acquired_locks(acquired_locks)
detect_potential_deadlocks async
detect_potential_deadlocks() -> list[str]

Detect potential deadlock situations.

Returns:

Type Description
list[str]

List of warnings about potential deadlocks

Source code in provide/foundation/concurrency/async_locks.py
async def detect_potential_deadlocks(self, *, hold_threshold: float = 30.0) -> list[str]:
    """Report registered locks that have been held longer than a threshold.

    A lock is considered only when its record has both ``acquired_at`` and
    ``owner`` set. The scan runs while holding ``self._manager_lock``.

    Args:
        hold_threshold: Hold time in seconds above which a lock is reported.
            Defaults to 30 seconds.

    Returns:
        List of warnings about potential deadlocks
    """
    findings: list[str] = []

    async with self._manager_lock:
        # Read the clock once so every lock is measured against the same instant.
        # Wall-clock time is used because acquired_at is presumably recorded with
        # time.time() by the acquisition code - confirm where it is set.
        now = time.time()
        for name, lock_info in self._locks.items():
            if not (lock_info.acquired_at and lock_info.owner):
                continue
            hold_time = now - lock_info.acquired_at
            if hold_time > hold_threshold:
                findings.append(
                    f"Lock '{name}' held by {lock_info.owner} for {hold_time:.1f}s - "
                    f"potential deadlock or resource leak"
                )

    return findings
get_lock async
get_lock(name: str) -> asyncio.Lock

Get a registered lock by name.

Parameters:

Name Type Description Default
name str

Name of the lock

required

Returns:

Type Description
Lock

The lock instance

Raises:

Type Description
KeyError

If lock is not registered

Source code in provide/foundation/concurrency/async_locks.py
async def get_lock(self, name: str) -> asyncio.Lock:
    """Look up the lock object registered under ``name``.

    The registry is consulted while holding the manager lock.

    Args:
        name: Name the lock was registered with

    Returns:
        The ``lock`` attribute of the matching registry record

    Raises:
        KeyError: If no lock is registered under ``name``
    """
    async with self._manager_lock:
        record = self._locks.get(name)
        if record is None:
            raise KeyError(f"Lock '{name}' not registered")
        return record.lock
get_lock_status async
get_lock_status() -> dict[str, dict[str, Any]]

Get current status of all locks.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary with lock status information

Source code in provide/foundation/concurrency/async_locks.py
async def get_lock_status(self) -> dict[str, dict[str, Any]]:
    """Snapshot every registered lock as a plain dictionary.

    The snapshot is built while holding the manager lock.

    Returns:
        Mapping of lock name to a dict with the keys ``order``,
        ``description``, ``owner``, ``acquired_at`` and ``is_locked``
        (the last taken from ``lock.locked()`` at snapshot time).
    """
    async with self._manager_lock:
        return {
            lock_name: {
                "order": record.order,
                "description": record.description,
                "owner": record.owner,
                "acquired_at": record.acquired_at,
                "is_locked": record.lock.locked(),
            }
            for lock_name, record in self._locks.items()
        }
register_lock async
register_lock(
    name: str,
    order: int,
    description: str = "",
    lock: Lock | None = None,
) -> asyncio.Lock

Register a lock with the manager.

Parameters:

Name Type Description Default
name str

Unique name for the lock

required
order int

Order number for deadlock prevention (acquire in ascending order)

required
description str

Human-readable description

''
lock Lock | None

Existing lock to register, or None to create new one

None

Returns:

Type Description
Lock

The registered lock

Raises:

Type Description
ValueError

If lock name already exists or order conflicts

Source code in provide/foundation/concurrency/async_locks.py
async def register_lock(
    self,
    name: str,
    order: int,
    description: str = "",
    lock: asyncio.Lock | None = None,
) -> asyncio.Lock:
    """Register a lock with the manager.

    Args:
        name: Unique name for the lock
        order: Order number for deadlock prevention (acquire in ascending order)
        description: Human-readable description
        lock: Existing lock to register, or None to create new one

    Returns:
        The registered lock

    Raises:
        ValueError: If lock name already exists or order conflicts
    """
    async with self._manager_lock:
        if name in self._locks:
            raise ValueError(f"Lock '{name}' already registered")

        # Check for order conflicts
        for existing_name, lock_info in self._locks.items():
            if lock_info.order == order:
                raise ValueError(
                    f"Lock order {order} already used by '{existing_name}'. "
                    f"Each lock must have a unique order."
                )

        actual_lock = lock or asyncio.Lock()
        lock_info = AsyncLockInfo(name=name, lock=actual_lock, order=order, description=description)

        self._locks[name] = lock_info
        return actual_lock

Functions

get_async_lock_manager async

get_async_lock_manager() -> AsyncLockManager

Get the global async lock manager instance.

Source code in provide/foundation/concurrency/async_locks.py
async def get_async_lock_manager() -> AsyncLockManager:
    """Get the global async lock manager instance.

    Creates the module-level ``AsyncLockManager`` on first use and makes sure
    ``register_foundation_async_locks()`` has completed before returning it.
    Exactly one caller performs the registration; callers that arrive while
    it is in progress wait on a ``threading.Event`` (via ``asyncio.to_thread``)
    and, if the registration turned out to have failed, retry from the top.

    Returns:
        The global manager, with the foundation locks registered.

    Raises:
        BaseException: Whatever ``register_foundation_async_locks()`` raises
            is re-raised to the registering caller after the manager's
            ``_locks`` registry has been cleared.
    """
    global _async_lock_manager, _async_locks_registered, _async_locks_registration_event

    # Lazily create the singleton; this happens before any lock is taken.
    if _async_lock_manager is None:
        _async_lock_manager = AsyncLockManager()

    # Fast path: registration already complete
    if _async_locks_registered:
        return _async_lock_manager

    # Decide this caller's role (waiter or registrant) under the module-level
    # registration lock. No await occurs while this lock is held.
    with _async_locks_registration_lock:
        # Re-check after acquiring lock (another task may have completed it)
        if _async_locks_registered:
            return _async_lock_manager

        # If registration is in progress by another task, get the event
        if _async_locks_registration_event is not None:
            event = _async_locks_registration_event
        else:
            # This task will perform registration - create threading.Event (loop-agnostic)
            # event stays None locally to mark this caller as the registrant.
            _async_locks_registration_event = threading.Event()
            event = None

    # If we're waiting for another task/thread's registration
    if event is not None:
        # Wait on threading.Event in async-friendly way (works across event loops)
        # Use to_thread to avoid blocking the event loop
        await asyncio.to_thread(event.wait)

        # After waking, check if registration succeeded
        if _async_locks_registered:
            return _async_lock_manager
        # Registration failed, retry: the recursive call re-runs the role
        # decision above, so this caller may become the registrant.
        return await get_async_lock_manager()

    # This task performs registration
    try:
        await register_foundation_async_locks()
        _async_locks_registered = True
    except BaseException:
        # Clean up partial registration on failure. BaseException is caught so
        # the cleanup also runs for non-Exception errors; the error is re-raised.
        if _async_lock_manager is not None:
            _async_lock_manager._locks.clear()
        raise
    finally:
        # Always unblock waiting tasks/threads and clear event
        # NOTE(review): these globals are updated here without holding
        # _async_locks_registration_lock - confirm that is intentional.
        if _async_locks_registration_event is not None:
            _async_locks_registration_event.set()
        _async_locks_registration_event = None

    return _async_lock_manager

register_foundation_async_locks async

register_foundation_async_locks() -> None

Register all foundation async locks with proper ordering.

Lock ordering hierarchy (LOWER numbers = MORE fundamental):

- 0-99: Orchestration (coordinator, hub initialization)
- 100-199: Early subsystems (logger - needed for debugging)
- 200-299: Core infrastructure (config, registry, components)
- 300+: Reserved for future subsystems

Source code in provide/foundation/concurrency/async_locks.py
async def register_foundation_async_locks() -> None:
    """Register the built-in foundation locks on the global manager.

    Lock ordering hierarchy (LOWER numbers = MORE fundamental):
    - 0-99: Orchestration (coordinator, hub initialization)
    - 100-199: Early subsystems (logger - needed for debugging)
    - 200-299: Core infrastructure (config, registry, components)
    - 300+: Reserved for future subsystems

    Raises:
        RuntimeError: If the global manager has not been created yet.
    """
    # The global is read directly: get_async_lock_manager() creates the
    # manager before it calls this function.
    manager = _async_lock_manager
    if manager is None:
        raise RuntimeError("AsyncLockManager not initialized. Call get_async_lock_manager() first.")

    # (name, order, description), listed in registration order.
    foundation_locks = (
        # Orchestration (order 0-99) - most fundamental, acquired first
        ("foundation.async.hub.init", 0, "Async hub initialization"),
        ("foundation.async.init.coordinator", 10, "Async initialization coordinator"),
        ("foundation.async.stream", 20, "Async log stream management"),
        # Early subsystems (order 100-199) - needed early for debugging
        ("foundation.async.logger.lazy", 100, "Async lazy logger initialization"),
        ("foundation.async.logger.setup", 110, "Async logger setup coordination"),
        # Core infrastructure (order 200-299)
        ("foundation.async.config", 200, "Async configuration system"),
        ("foundation.async.registry", 210, "Async component registry"),
        ("foundation.async.hub.components", 220, "Async hub component management"),
    )

    for lock_name, lock_order, lock_description in foundation_locks:
        await manager.register_lock(lock_name, order=lock_order, description=lock_description)