Skip to content

Concurrency

provide.foundation.concurrency

TODO: Add module docstring.

Classes

AsyncLockInfo

Information about a registered async lock.

AsyncLockManager

AsyncLockManager()

Async-native centralized lock manager to prevent deadlocks.

Enforces lock ordering and provides timeout mechanisms for async code. All async locks should be acquired through this manager to prevent deadlocks.

Initialize async lock manager.

Source code in provide/foundation/concurrency/async_locks.py
def __init__(self) -> None:
    """Initialize async lock manager."""
    self._locks: dict[str, AsyncLockInfo] = {}
    self._manager_lock = asyncio.Lock()
    self._task_local: dict[asyncio.Task[Any], list[AsyncLockInfo]] = {}
Functions
acquire async
acquire(
    *lock_names: str, timeout: float = 10.0
) -> AsyncGenerator[None, None]

Acquire multiple locks in order to prevent deadlocks.

Parameters:

Name Type Description Default
*lock_names str

Names of locks to acquire

()
timeout float

Timeout in seconds

10.0

Yields:

Type Description
AsyncGenerator[None, None]

None when all locks are acquired

Raises:

Type Description
TimeoutError

If locks cannot be acquired within timeout

RuntimeError

If deadlock would occur or other lock issues

Source code in provide/foundation/concurrency/async_locks.py
@contextlib.asynccontextmanager
async def acquire(self, *lock_names: str, timeout: float = 10.0) -> AsyncGenerator[None, None]:
    """Acquire multiple locks in order to prevent deadlocks.

    Args:
        *lock_names: Names of locks to acquire
        timeout: Timeout in seconds

    Yields:
        None when all locks are acquired

    Raises:
        TimeoutError: If locks cannot be acquired within timeout
        RuntimeError: If deadlock would occur or other lock issues
    """
    if not lock_names:
        yield
        return

    lock_infos = await self._prepare_lock_acquisition(lock_names)
    acquired_locks: list[AsyncLockInfo] = []
    start_time = time.time()

    try:
        current_task = asyncio.current_task()
    except RuntimeError:
        current_task = None

    try:
        for lock_info in lock_infos:
            # Skip locks already in stack
            if current_task and lock_info in self._task_local.get(current_task, []):
                continue

            remaining_timeout = timeout - (time.time() - start_time)
            await self._acquire_lock_with_timeout(lock_info, remaining_timeout)

            acquired_locks.append(lock_info)
            if current_task:
                if current_task not in self._task_local:
                    self._task_local[current_task] = []
                self._task_local[current_task].append(lock_info)

        yield

    finally:
        await self._release_acquired_locks(acquired_locks)
detect_potential_deadlocks async
detect_potential_deadlocks() -> list[str]

Detect potential deadlock situations.

Returns:

Type Description
list[str]

List of warnings about potential deadlocks

Source code in provide/foundation/concurrency/async_locks.py
async def detect_potential_deadlocks(self) -> list[str]:
    """Detect potential deadlock situations.

    Returns:
        List of warnings about potential deadlocks
    """
    warnings = []

    async with self._manager_lock:
        for name, lock_info in self._locks.items():
            if lock_info.acquired_at and lock_info.owner:
                hold_time = time.time() - lock_info.acquired_at
                if hold_time > 30:  # 30 seconds is a long time to hold a lock
                    warnings.append(
                        f"Lock '{name}' held by {lock_info.owner} for {hold_time:.1f}s - "
                        f"potential deadlock or resource leak"
                    )

    return warnings
get_lock async
get_lock(name: str) -> asyncio.Lock

Get a registered lock by name.

Parameters:

Name Type Description Default
name str

Name of the lock

required

Returns:

Type Description
Lock

The lock instance

Raises:

Type Description
KeyError

If lock is not registered

Source code in provide/foundation/concurrency/async_locks.py
async def get_lock(self, name: str) -> asyncio.Lock:
    """Get a registered lock by name.

    Args:
        name: Name of the lock

    Returns:
        The lock instance

    Raises:
        KeyError: If lock is not registered
    """
    async with self._manager_lock:
        if name not in self._locks:
            raise KeyError(f"Lock '{name}' not registered")
        return self._locks[name].lock
get_lock_status async
get_lock_status() -> dict[str, dict[str, Any]]

Get current status of all locks.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary with lock status information

Source code in provide/foundation/concurrency/async_locks.py
async def get_lock_status(self) -> dict[str, dict[str, Any]]:
    """Get current status of all locks.

    Returns:
        Dictionary with lock status information
    """
    async with self._manager_lock:
        status = {}
        for name, lock_info in self._locks.items():
            status[name] = {
                "order": lock_info.order,
                "description": lock_info.description,
                "owner": lock_info.owner,
                "acquired_at": lock_info.acquired_at,
                "is_locked": lock_info.lock.locked(),
            }
        return status
register_lock async
register_lock(
    name: str,
    order: int,
    description: str = "",
    lock: Lock | None = None,
) -> asyncio.Lock

Register a lock with the manager.

Parameters:

Name Type Description Default
name str

Unique name for the lock

required
order int

Order number for deadlock prevention (acquire in ascending order)

required
description str

Human-readable description

''
lock Lock | None

Existing lock to register, or None to create new one

None

Returns:

Type Description
Lock

The registered lock

Raises:

Type Description
ValueError

If lock name already exists or order conflicts

Source code in provide/foundation/concurrency/async_locks.py
async def register_lock(
    self,
    name: str,
    order: int,
    description: str = "",
    lock: asyncio.Lock | None = None,
) -> asyncio.Lock:
    """Register a lock with the manager.

    Args:
        name: Unique name for the lock
        order: Order number for deadlock prevention (acquire in ascending order)
        description: Human-readable description
        lock: Existing lock to register, or None to create new one

    Returns:
        The registered lock

    Raises:
        ValueError: If lock name already exists or order conflicts
    """
    async with self._manager_lock:
        if name in self._locks:
            raise ValueError(f"Lock '{name}' already registered")

        # Check for order conflicts
        for existing_name, lock_info in self._locks.items():
            if lock_info.order == order:
                raise ValueError(
                    f"Lock order {order} already used by '{existing_name}'. "
                    f"Each lock must have a unique order."
                )

        actual_lock = lock or asyncio.Lock()
        lock_info = AsyncLockInfo(name=name, lock=actual_lock, order=order, description=description)

        self._locks[name] = lock_info
        return actual_lock

LockInfo

Information about a registered lock.

LockManager

LockManager()

Centralized lock manager to prevent deadlocks.

Enforces lock ordering and provides timeout mechanisms. All locks must be acquired through this manager to prevent deadlocks.

Initialize lock manager.

Source code in provide/foundation/concurrency/locks.py
def __init__(self) -> None:
    """Initialize lock manager."""
    self._locks: dict[str, LockInfo] = {}
    self._manager_lock = threading.RLock()
    self._thread_local = threading.local()
Functions
acquire
acquire(
    *lock_names: str,
    timeout: float = 10.0,
    blocking: bool = True
) -> Generator[None, None, None]

Acquire multiple locks in order to prevent deadlocks.

Parameters:

Name Type Description Default
*lock_names str

Names of locks to acquire

()
timeout float

Timeout in seconds

10.0
blocking bool

Whether to block or raise immediately if locks unavailable

True

Yields:

Type Description
None

None when all locks are acquired

Raises:

Type Description
TimeoutError

If locks cannot be acquired within timeout

RuntimeError

If deadlock would occur or other lock issues

Source code in provide/foundation/concurrency/locks.py
@contextlib.contextmanager
def acquire(
    self, *lock_names: str, timeout: float = 10.0, blocking: bool = True
) -> Generator[None, None, None]:
    """Acquire multiple locks in order to prevent deadlocks.

    Args:
        *lock_names: Names of locks to acquire
        timeout: Timeout in seconds
        blocking: Whether to block or raise immediately if locks unavailable

    Yields:
        None when all locks are acquired

    Raises:
        TimeoutError: If locks cannot be acquired within timeout
        RuntimeError: If deadlock would occur or other lock issues
    """
    if not lock_names:
        yield
        return

    lock_infos = self._prepare_lock_acquisition(lock_names)
    acquired_locks: list[LockInfo] = []
    start_time = time.time()

    try:
        for lock_info in lock_infos:
            # Skip locks already in stack (re-entrant behavior)
            if lock_info in self._thread_local.lock_stack:
                continue

            remaining_timeout = timeout - (time.time() - start_time)
            self._acquire_lock_with_timeout(lock_info, remaining_timeout, blocking)

            acquired_locks.append(lock_info)
            self._thread_local.lock_stack.append(lock_info)

        yield

    finally:
        self._release_acquired_locks(acquired_locks)
detect_potential_deadlocks
detect_potential_deadlocks() -> list[str]

Detect potential deadlock situations.

Returns:

Type Description
list[str]

List of warnings about potential deadlocks

Source code in provide/foundation/concurrency/locks.py
def detect_potential_deadlocks(self) -> list[str]:
    """Detect potential deadlock situations.

    Returns:
        List of warnings about potential deadlocks
    """
    warnings = []

    # Check for lock ordering violations across threads
    # This is a simplified check - real deadlock detection is complex
    with self._manager_lock:
        for name, lock_info in self._locks.items():
            if lock_info.acquired_at and lock_info.owner:
                hold_time = time.time() - lock_info.acquired_at
                if hold_time > 30:  # 30 seconds is a long time to hold a lock
                    warnings.append(
                        f"Lock '{name}' held by {lock_info.owner} for {hold_time:.1f}s - "
                        f"potential deadlock or resource leak"
                    )

    return warnings
get_lock
get_lock(name: str) -> threading.RLock

Get a registered lock by name.

Parameters:

Name Type Description Default
name str

Name of the lock

required

Returns:

Type Description
RLock

The lock instance

Raises:

Type Description
KeyError

If lock is not registered

Source code in provide/foundation/concurrency/locks.py
def get_lock(self, name: str) -> threading.RLock:
    """Get a registered lock by name.

    Args:
        name: Name of the lock

    Returns:
        The lock instance

    Raises:
        KeyError: If lock is not registered
    """
    with self._manager_lock:
        if name not in self._locks:
            raise KeyError(f"Lock '{name}' not registered")
        return self._locks[name].lock
get_lock_status
get_lock_status() -> dict[str, dict[str, Any]]

Get current status of all locks.

Returns:

Type Description
dict[str, dict[str, Any]]

Dictionary with lock status information

Source code in provide/foundation/concurrency/locks.py
def get_lock_status(self) -> dict[str, dict[str, Any]]:
    """Get current status of all locks.

    Returns:
        Dictionary with lock status information
    """
    with self._manager_lock:
        status = {}
        for name, lock_info in self._locks.items():
            status[name] = {
                "order": lock_info.order,
                "description": lock_info.description,
                "owner": lock_info.owner,
                "acquired_at": lock_info.acquired_at,
                "is_locked": lock_info.lock._is_owned() if hasattr(lock_info.lock, "_is_owned") else None,
            }
        return status
register_lock
register_lock(
    name: str,
    order: int,
    description: str = "",
    lock: RLock | None = None,
) -> threading.RLock

Register a lock with the manager.

Parameters:

Name Type Description Default
name str

Unique name for the lock

required
order int

Order number for deadlock prevention (acquire in ascending order)

required
description str

Human-readable description

''
lock RLock | None

Existing lock to register, or None to create new one

None

Returns:

Type Description
RLock

The registered lock

Raises:

Type Description
ValueError

If lock name already exists or order conflicts

Source code in provide/foundation/concurrency/locks.py
def register_lock(
    self,
    name: str,
    order: int,
    description: str = "",
    lock: threading.RLock | None = None,
) -> threading.RLock:
    """Register a lock with the manager.

    Args:
        name: Unique name for the lock
        order: Order number for deadlock prevention (acquire in ascending order)
        description: Human-readable description
        lock: Existing lock to register, or None to create new one

    Returns:
        The registered lock

    Raises:
        ValueError: If lock name already exists or order conflicts
    """
    with self._manager_lock:
        if name in self._locks:
            raise ValueError(f"Lock '{name}' already registered")

        # Check for order conflicts
        for existing_name, lock_info in self._locks.items():
            if lock_info.order == order:
                raise ValueError(
                    f"Lock order {order} already used by '{existing_name}'. "
                    f"Each lock must have a unique order."
                )

        actual_lock = lock or threading.RLock()
        lock_info = LockInfo(name=name, lock=actual_lock, order=order, description=description)

        self._locks[name] = lock_info
        return actual_lock

Functions

async_gather async

async_gather(
    *awaitables: Awaitable[Any],
    return_exceptions: bool = False
) -> list[Any]

Run awaitables concurrently with Foundation tracking.

Parameters:

Name Type Description Default
*awaitables Awaitable[Any]

Awaitable objects to run concurrently

()
return_exceptions bool

If True, exceptions are returned as results

False

Returns:

Type Description
list[Any]

List of results in the same order as input awaitables

Raises:

Type Description
ValidationError

If no awaitables provided

Example

import asyncio async def fetch_data(n): ... await async_sleep(0.1) ... return n * 2 async def main(): ... results = await async_gather( ... fetch_data(1), fetch_data(2), fetch_data(3) ... ) ... return results asyncio.run(main()) [2, 4, 6]

Source code in provide/foundation/concurrency/core.py
async def async_gather(*awaitables: Awaitable[Any], return_exceptions: bool = False) -> list[Any]:
    """Run awaitables concurrently with Foundation tracking.

    Args:
        *awaitables: Awaitable objects to run concurrently
        return_exceptions: If True, exceptions are returned as results

    Returns:
        List of results in the same order as input awaitables

    Raises:
        ValidationError: If no awaitables provided

    Example:
        >>> import asyncio
        >>> async def fetch_data(n):
        ...     await async_sleep(0.1)
        ...     return n * 2
        >>> async def main():
        ...     results = await async_gather(
        ...         fetch_data(1), fetch_data(2), fetch_data(3)
        ...     )
        ...     return results
        >>> asyncio.run(main())
        [2, 4, 6]

    """
    if not awaitables:
        raise ValidationError("At least one awaitable must be provided")

    return await asyncio.gather(*awaitables, return_exceptions=return_exceptions)

async_run

async_run(
    main: Callable[[], Awaitable[Any]],
    *,
    debug: bool = False
) -> Any

Run async function with Foundation tracking.

Parameters:

Name Type Description Default
main Callable[[], Awaitable[Any]]

Async function to run

required
debug bool

Whether to run in debug mode

False

Returns:

Type Description
Any

Result of the main function

Raises:

Type Description
ValidationError

If main is not callable

Example

async def main(): ... await async_sleep(0.1) ... return "hello" result = async_run(main) result 'hello'

Source code in provide/foundation/concurrency/core.py
def async_run(main: Callable[[], Awaitable[Any]], *, debug: bool = False) -> Any:
    """Run async function with Foundation tracking.

    Args:
        main: Async function to run
        debug: Whether to run in debug mode

    Returns:
        Result of the main function

    Raises:
        ValidationError: If main is not callable

    Example:
        >>> async def main():
        ...     await async_sleep(0.1)
        ...     return "hello"
        >>> result = async_run(main)
        >>> result
        'hello'

    """
    if not callable(main):
        raise ValidationError("Main must be callable")

    return asyncio.run(main(), debug=debug)  # type: ignore[arg-type]

async_sleep async

async_sleep(delay: float) -> None

Async sleep with Foundation tracking and cancellation support.

Parameters:

Name Type Description Default
delay float

Number of seconds to sleep

required

Raises:

Type Description
ValidationError

If delay is negative

Example

import asyncio async def main(): ... await async_sleep(0.1) asyncio.run(main())

Source code in provide/foundation/concurrency/core.py
async def async_sleep(delay: float) -> None:
    """Async sleep with Foundation tracking and cancellation support.

    Args:
        delay: Number of seconds to sleep

    Raises:
        ValidationError: If delay is negative

    Example:
        >>> import asyncio
        >>> async def main():
        ...     await async_sleep(0.1)
        >>> asyncio.run(main())

    """
    if delay < 0:
        raise ValidationError("Sleep delay must be non-negative")
    await asyncio.sleep(delay)

async_wait_for async

async_wait_for(
    awaitable: Awaitable[Any], timeout: float | None
) -> Any

Wait for an awaitable with optional timeout.

Parameters:

Name Type Description Default
awaitable Awaitable[Any]

The awaitable to wait for

required
timeout float | None

Timeout in seconds (None for no timeout)

required

Returns:

Type Description
Any

Result of the awaitable

Raises:

Type Description
ValidationError

If timeout is negative

TimeoutError

If timeout is exceeded

Example

import asyncio async def slow_task(): ... await async_sleep(0.2) ... return "done" async def main(): ... try: ... result = await async_wait_for(slow_task(), timeout=0.1) ... except asyncio.TimeoutError: ... result = "timed out" ... return result asyncio.run(main()) 'timed out'

Source code in provide/foundation/concurrency/core.py
async def async_wait_for(awaitable: Awaitable[Any], timeout: float | None) -> Any:
    """Wait for an awaitable with optional timeout.

    Args:
        awaitable: The awaitable to wait for
        timeout: Timeout in seconds (None for no timeout)

    Returns:
        Result of the awaitable

    Raises:
        ValidationError: If timeout is negative
        asyncio.TimeoutError: If timeout is exceeded

    Example:
        >>> import asyncio
        >>> async def slow_task():
        ...     await async_sleep(0.2)
        ...     return "done"
        >>> async def main():
        ...     try:
        ...         result = await async_wait_for(slow_task(), timeout=0.1)
        ...     except asyncio.TimeoutError:
        ...         result = "timed out"
        ...     return result
        >>> asyncio.run(main())
        'timed out'

    """
    if timeout is not None and timeout < 0:
        raise ValidationError("Timeout must be non-negative")

    return await asyncio.wait_for(awaitable, timeout=timeout)

get_async_lock_manager async

get_async_lock_manager() -> AsyncLockManager

Get the global async lock manager instance.

Source code in provide/foundation/concurrency/async_locks.py
async def get_async_lock_manager() -> AsyncLockManager:
    """Get the global async lock manager instance."""
    global _async_lock_manager, _async_locks_registered, _async_locks_registration_event

    if _async_lock_manager is None:
        _async_lock_manager = AsyncLockManager()

    # Fast path: registration already complete
    if _async_locks_registered:
        return _async_lock_manager

    # Coordinate registration with threading lock for state machine
    with _async_locks_registration_lock:
        # Re-check after acquiring lock (another task may have completed it)
        if _async_locks_registered:
            return _async_lock_manager

        # If registration is in progress by another task, get the event
        if _async_locks_registration_event is not None:
            event = _async_locks_registration_event
        else:
            # This task will perform registration - create threading.Event (loop-agnostic)
            _async_locks_registration_event = threading.Event()
            event = None

    # If we're waiting for another task/thread's registration
    if event is not None:
        # Wait on threading.Event in async-friendly way (works across event loops)
        # Use to_thread to avoid blocking the event loop
        await asyncio.to_thread(event.wait)

        # After waking, check if registration succeeded
        if _async_locks_registered:
            return _async_lock_manager
        # Registration failed, retry
        return await get_async_lock_manager()

    # This task performs registration
    try:
        await register_foundation_async_locks()
        _async_locks_registered = True
    except BaseException:
        # Clean up partial registration on failure
        if _async_lock_manager is not None:
            _async_lock_manager._locks.clear()
        raise
    finally:
        # Always unblock waiting tasks/threads and clear event
        if _async_locks_registration_event is not None:
            _async_locks_registration_event.set()
        _async_locks_registration_event = None

    return _async_lock_manager

get_lock_manager

get_lock_manager() -> LockManager

Get the global lock manager instance.

Source code in provide/foundation/concurrency/locks.py
def get_lock_manager() -> LockManager:
    """Get the global lock manager instance."""
    global _locks_registered
    with _registration_lock:
        if not _locks_registered:
            register_foundation_locks()
            _locks_registered = True
    return _lock_manager

register_foundation_async_locks async

register_foundation_async_locks() -> None

Register all foundation async locks with proper ordering.

Lock ordering hierarchy (LOWER numbers = MORE fundamental): - 0-99: Orchestration (coordinator, hub initialization) - 100-199: Early subsystems (logger - needed for debugging) - 200-299: Core infrastructure (config, registry, components) - 300+: Reserved for future subsystems

Source code in provide/foundation/concurrency/async_locks.py
async def register_foundation_async_locks() -> None:
    """Register all foundation async locks with proper ordering.

    Lock ordering hierarchy (LOWER numbers = MORE fundamental):
    - 0-99: Orchestration (coordinator, hub initialization)
    - 100-199: Early subsystems (logger - needed for debugging)
    - 200-299: Core infrastructure (config, registry, components)
    - 300+: Reserved for future subsystems
    """
    global _async_lock_manager

    # Use global directly - manager is guaranteed to exist because
    # get_async_lock_manager() creates it before calling this function
    if _async_lock_manager is None:
        raise RuntimeError("AsyncLockManager not initialized. Call get_async_lock_manager() first.")

    manager = _async_lock_manager

    # Orchestration (order 0-99) - most fundamental, acquired first
    await manager.register_lock("foundation.async.hub.init", order=0, description="Async hub initialization")
    await manager.register_lock(
        "foundation.async.init.coordinator", order=10, description="Async initialization coordinator"
    )
    await manager.register_lock("foundation.async.stream", order=20, description="Async log stream management")

    # Early subsystems (order 100-199) - needed early for debugging
    await manager.register_lock(
        "foundation.async.logger.lazy", order=100, description="Async lazy logger initialization"
    )
    await manager.register_lock(
        "foundation.async.logger.setup", order=110, description="Async logger setup coordination"
    )

    # Core infrastructure (order 200-299)
    await manager.register_lock("foundation.async.config", order=200, description="Async configuration system")
    await manager.register_lock("foundation.async.registry", order=210, description="Async component registry")
    await manager.register_lock(
        "foundation.async.hub.components", order=220, description="Async hub component management"
    )

register_foundation_locks

register_foundation_locks() -> None

Register all foundation locks with proper ordering.

Lock ordering hierarchy (LOWER numbers = MORE fundamental): - 0-99: Orchestration (coordinator, hub initialization) - 100-199: Early subsystems (logger - needed for debugging) - 200-299: Core infrastructure (config, registry, components) - 300+: Reserved for future subsystems

Source code in provide/foundation/concurrency/locks.py
def register_foundation_locks() -> None:
    """Register all foundation locks with proper ordering.

    Lock ordering hierarchy (LOWER numbers = MORE fundamental):
    - 0-99: Orchestration (coordinator, hub initialization)
    - 100-199: Early subsystems (logger - needed for debugging)
    - 200-299: Core infrastructure (config, registry, components)
    - 300+: Reserved for future subsystems
    """
    manager = _lock_manager

    # Orchestration (order 0-99) - most fundamental, acquired first
    manager.register_lock("foundation.hub.init", order=0, description="Hub initialization")
    manager.register_lock(
        "foundation.init.coordinator", order=10, description="Master initialization coordinator"
    )
    manager.register_lock("foundation.stream", order=20, description="Log stream management lock")

    # Early subsystems (order 100-199) - needed early for debugging
    manager.register_lock("foundation.logger.lazy", order=100, description="Lazy logger initialization")
    manager.register_lock("foundation.logger.setup", order=110, description="Logger setup coordination")

    # Core infrastructure (order 200-299)
    manager.register_lock("foundation.config", order=200, description="Configuration system lock")
    manager.register_lock("foundation.registry", order=210, description="Component registry lock")
    manager.register_lock("foundation.hub.components", order=220, description="Hub component management")