Skip to content

Utils

provide.foundation.utils

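Utility helpers for provide.foundation: context-scoped caching, prefixed environment-variable access, typed environment getters, token-bucket rate limiting, and optional-dependency checks and stubs.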

Classes

ContextScopedCache

ContextScopedCache(name: str = 'cache')

Bases: Generic[K, V]

Thread-safe, async-safe cache scoped to context managers.

Unlike global LRU caches (for memoization), this provides isolated cache instances per execution context - ideal for recursive operations that need temporary storage without memory leaks.

The cache uses ContextVar for automatic thread/async isolation, and context managers for automatic cleanup. Nested contexts reuse the parent's cache to maintain consistency within an operation.

Examples:

>>> cache = ContextScopedCache[str, int]("user_ids")
>>>
>>> with cache.scope():
...     cache.set("alice", 1)
...     cache.set("bob", 2)
...     print(cache.get("alice"))  # 1
...
>>> # Cache is automatically cleared when exiting scope
>>> with cache.scope():
...     print(cache.get("alice"))  # None (fresh scope)

Nested contexts reuse parent cache:

>>> with cache.scope():
...     cache.set("key", "outer")
...     with cache.scope():
...         print(cache.get("key"))  # "outer" (same cache)
...         cache.set("key", "inner")
...     print(cache.get("key"))  # "inner" (modified in nested scope)

Initialize a context-scoped cache.

Parameters:

Name Type Description Default
name str

Identifier for the cache (used in ContextVar name)

'cache'
Source code in provide/foundation/utils/scoped_cache.py
def __init__(self, name: str = "cache") -> None:
    """Set up the per-context storage slot for this cache.

    The backing ``ContextVar`` defaults to ``None``; the accessor methods
    interpret that value as "no scope is currently active".

    Args:
        name: Label for the cache; also used as the ContextVar's name.
    """
    self.name = name
    self._context_var: ContextVar[dict[K, V] | None] = ContextVar(name, default=None)
Functions
clear
clear() -> None

Clear current context's cache.

Raises:

Type Description
RuntimeError

If called outside a cache scope

Source code in provide/foundation/utils/scoped_cache.py
def clear(self) -> None:
    """Remove every entry from the cache of the active scope.

    The dictionary object itself is kept and emptied in place.

    Raises:
        RuntimeError: If no cache scope is active in the current context.
    """
    store = self._context_var.get()
    if store is not None:
        store.clear()
        return
    raise RuntimeError(f"Cache '{self.name}' accessed outside scope context")
contains
contains(key: K) -> bool

Check if key exists in current context's cache.

Parameters:

Name Type Description Default
key K

Cache key to check

required

Returns:

Type Description
bool

True if key exists in cache

Raises:

Type Description
RuntimeError

If called outside a cache scope

Source code in provide/foundation/utils/scoped_cache.py
def contains(self, key: K) -> bool:
    """Report whether ``key`` is stored in the active scope's cache.

    Args:
        key: Cache key to look for.

    Returns:
        True when the key is present, False otherwise.

    Raises:
        RuntimeError: If no cache scope is active in the current context.
    """
    store = self._context_var.get()
    if store is not None:
        return key in store
    raise RuntimeError(f"Cache '{self.name}' accessed outside scope context")
get
get(key: K, default: V | None = None) -> V | None

Get value from current context's cache.

Parameters:

Name Type Description Default
key K

Cache key

required
default V | None

Value to return if key not found

None

Returns:

Type Description
V | None

Cached value or default

Raises:

Type Description
RuntimeError

If called outside a cache scope

Source code in provide/foundation/utils/scoped_cache.py
def get(self, key: K, default: V | None = None) -> V | None:
    """Get value from current context's cache.

    Args:
        key: Cache key
        default: Value to return if key not found

    Returns:
        Cached value or default

    Raises:
        RuntimeError: If called outside a cache scope
    """
    cache = self._context_var.get()
    if cache is None:
        raise RuntimeError(f"Cache '{self.name}' accessed outside scope context")
    return cache.get(key, default)
is_active
is_active() -> bool

Check if cache context is currently active.

Returns:

Type Description
bool

True if inside a cache scope, False otherwise

Source code in provide/foundation/utils/scoped_cache.py
def is_active(self) -> bool:
    """Tell whether a cache scope is open in the current context.

    Returns:
        True when the context variable holds a cache dictionary,
        False when it still holds its ``None`` default.
    """
    current = self._context_var.get()
    return current is not None
scope
scope() -> Generator[None]

Create an isolated cache scope.

If a cache context already exists (nested call), reuses the existing cache. Otherwise, creates a new cache and cleans it up on exit.

Yields:

Type Description
Generator[None]

None (use cache methods within the context)

Note

Cleanup is guaranteed even on errors.

Source code in provide/foundation/utils/scoped_cache.py
@contextmanager
def scope(self) -> Generator[None]:
    """Open a cache scope for the duration of a ``with`` block.

    The outermost call installs a fresh, empty cache and restores the
    previous context-variable state when the block ends. A call made
    while a cache is already installed is a pass-through: the body runs
    against the existing cache and nothing is torn down on exit.

    Yields:
        None; use the cache's accessor methods inside the block.

    Note:
        The reset happens in a ``finally`` clause, so it also runs when
        the body raises.
    """
    if self._context_var.get() is not None:
        # Nested call: the enclosing scope owns the cache and its cleanup.
        yield
        return

    # Outermost call: install an empty cache and remember how to undo it.
    token = self._context_var.set({})
    try:
        yield
    finally:
        self._context_var.reset(token)
set
set(key: K, value: V) -> None

Set value in current context's cache.

Parameters:

Name Type Description Default
key K

Cache key

required
value V

Value to cache

required

Raises:

Type Description
RuntimeError

If called outside a cache scope

Source code in provide/foundation/utils/scoped_cache.py
def set(self, key: K, value: V) -> None:
    """Store ``value`` under ``key`` in the active scope's cache.

    An existing entry for the same key is overwritten.

    Args:
        key: Cache key to write.
        value: Value to associate with the key.

    Raises:
        RuntimeError: If no cache scope is active in the current context.
    """
    store = self._context_var.get()
    if store is not None:
        store[key] = value
        return
    raise RuntimeError(f"Cache '{self.name}' accessed outside scope context")
size
size() -> int

Get number of items in current context's cache.

Returns:

Type Description
int

Number of cached items

Raises:

Type Description
RuntimeError

If called outside a cache scope

Source code in provide/foundation/utils/scoped_cache.py
def size(self) -> int:
    """Count the entries held by the active scope's cache.

    Returns:
        Number of key/value pairs currently stored.

    Raises:
        RuntimeError: If no cache scope is active in the current context.
    """
    store = self._context_var.get()
    if store is not None:
        return len(store)
    raise RuntimeError(f"Cache '{self.name}' accessed outside scope context")

DependencyStatus

Status of an optional dependency.

EnvPrefix

EnvPrefix(prefix: str, separator: str = '_')

Environment variable reader with prefix support.

Provides convenient access to environment variables with a common prefix, useful for application-specific configuration namespacing.

Uses caching to improve performance for repeated name lookups.

Examples:

>>> app_env = EnvPrefix('MYAPP')
>>> app_env.get_bool('DEBUG')  # Reads MYAPP_DEBUG
>>> app_env['database_url']  # Reads MYAPP_DATABASE_URL

Initialize with prefix.

Parameters:

Name Type Description Default
prefix str

Prefix for all environment variables

required
separator str

Separator between prefix and variable name

'_'
Source code in provide/foundation/utils/environment/prefix.py
def __init__(self, prefix: str, separator: str = "_") -> None:
    """Remember the prefix and separator used to build variable names.

    Args:
        prefix: Common prefix of the environment variables; it is stored
            upper-cased.
        separator: Text placed between the prefix and the variable name.

    """
    upper_prefix = prefix.upper()
    self.prefix = upper_prefix
    self.separator = separator
    # Bounded cache (128 entries); presumably consulted when building
    # full variable names - confirm against _make_name.
    self._name_cache = LRUCache(maxsize=128)
Functions
__contains__
__contains__(name: str) -> bool

Check if environment variable exists.

Source code in provide/foundation/utils/environment/prefix.py
def __contains__(self, name: str) -> bool:
    """Report whether the prefixed form of ``name`` is set in ``os.environ``."""
    env_key = self._make_name(name)
    return env_key in os.environ
__getitem__
__getitem__(name: str) -> str | None

Get environment variable using subscript notation.

Source code in provide/foundation/utils/environment/prefix.py
def __getitem__(self, name: str) -> str | None:
    """Get environment variable using subscript notation."""
    return self.get_str(name)
all_with_prefix
all_with_prefix() -> dict[str, str]

Get all environment variables with this prefix.

Returns:

Type Description
dict[str, str]

Dictionary of variable names (without prefix) to values

Source code in provide/foundation/utils/environment/prefix.py
def all_with_prefix(self) -> dict[str, str]:
    """Collect every environment variable that starts with prefix + separator.

    Returns:
        Mapping from the variable name with the prefix and separator
        stripped off to the variable's value.

    """
    marker = f"{self.prefix}{self.separator}"
    cut = len(marker)
    # Keep only matching variables and key them by what follows the marker.
    return {
        env_key[cut:]: env_value
        for env_key, env_value in os.environ.items()
        if env_key.startswith(marker)
    }
get_bool
get_bool(
    name: str, default: bool | None = None
) -> bool | None

Get boolean with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_bool(self, name: str, default: bool | None = None) -> bool | None:
    """Read the prefixed variable as a boolean via the module-level ``get_bool``."""
    full_name = self._make_name(name)
    return get_bool(full_name, default)
get_dict
get_dict(
    name: str,
    default: dict[str, str] | None = None,
    item_separator: str = ",",
    key_value_separator: str = "=",
) -> dict[str, str]

Get dictionary with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_dict(
    self,
    name: str,
    default: dict[str, str] | None = None,
    item_separator: str = ",",
    key_value_separator: str = "=",
) -> dict[str, str]:
    """Read the prefixed variable as a dictionary via the module-level ``get_dict``."""
    full_name = self._make_name(name)
    return get_dict(full_name, default, item_separator, key_value_separator)
get_float
get_float(
    name: str, default: float | None = None
) -> float | None

Get float with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_float(self, name: str, default: float | None = None) -> float | None:
    """Read the prefixed variable as a float via the module-level ``get_float``."""
    full_name = self._make_name(name)
    return get_float(full_name, default)
get_int
get_int(
    name: str, default: int | None = None
) -> int | None

Get integer with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_int(self, name: str, default: int | None = None) -> int | None:
    """Read the prefixed variable as an integer via the module-level ``get_int``."""
    full_name = self._make_name(name)
    return get_int(full_name, default)
get_list
get_list(
    name: str,
    default: list[str] | None = None,
    separator: str = ",",
) -> list[str]

Get list with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_list(self, name: str, default: list[str] | None = None, separator: str = ",") -> list[str]:
    """Read the prefixed variable as a list via the module-level ``get_list``."""
    full_name = self._make_name(name)
    return get_list(full_name, default, separator)
get_path
get_path(
    name: str, default: Path | str | None = None
) -> Path | None

Get path with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_path(self, name: str, default: Path | str | None = None) -> Path | None:
    """Read the prefixed variable as a path via the module-level ``get_path``."""
    full_name = self._make_name(name)
    return get_path(full_name, default)
get_str
get_str(
    name: str, default: str | None = None
) -> str | None

Get string with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def get_str(self, name: str, default: str | None = None) -> str | None:
    """Read the prefixed variable as a string via the module-level ``get_str``."""
    full_name = self._make_name(name)
    return get_str(full_name, default)
require
require(name: str, type_hint: type[T] | None = None) -> Any

Require variable with prefix.

Source code in provide/foundation/utils/environment/prefix.py
def require(self, name: str, type_hint: type[T] | None = None) -> Any:
    """Forward the prefixed variable name and ``type_hint`` to the module-level ``require``."""
    full_name = self._make_name(name)
    return require(full_name, type_hint)

TokenBucketRateLimiter

TokenBucketRateLimiter(
    capacity: float,
    refill_rate: float,
    time_source: Callable[[], float] | None = None,
)

A Token Bucket rate limiter for asyncio applications.

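This limiter allows for bursts up to a specified capacity and refills tokens at a constant rate. It serializes access with an asyncio.Lock, which makes it safe to share between coroutines on a single event loop; note that asyncio.Lock does not protect against concurrent use from multiple OS threads.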

Initialize the TokenBucketRateLimiter.

Parameters:

Name Type Description Default
capacity float

The maximum number of tokens the bucket can hold (burst capacity).

required
refill_rate float

The rate at which tokens are refilled per second.

required
time_source Callable[[], float] | None

Optional callable that returns current time (for testing). Defaults to time.monotonic.

None
Source code in provide/foundation/utils/rate_limiting.py
def __init__(
    self,
    capacity: float,
    refill_rate: float,
    time_source: Callable[[], float] | None = None,
) -> None:
    """Initialize the TokenBucketRateLimiter.

    Args:
        capacity: The maximum number of tokens the bucket can hold
                  (burst capacity).
        refill_rate: The rate at which tokens are refilled per second.
        time_source: Optional callable that returns current time (for testing).
                    Defaults to time.monotonic.

    """
    if capacity <= 0:
        raise ValueError("Capacity must be positive.")
    if refill_rate <= 0:
        raise ValueError("Refill rate must be positive.")

    self._capacity: float = float(capacity)
    self._refill_rate: float = float(refill_rate)
    self._tokens: float = float(capacity)  # Start with a full bucket
    self._time_source = time_source if time_source is not None else time.monotonic
    self._last_refill_timestamp: float = self._time_source()
    self._lock = asyncio.Lock()

    # Cache logger instance to avoid repeated imports
    self._logger = None
    try:
        from provide.foundation.logger import get_logger

        self._logger = get_logger(__name__)
        self._logger.debug(
            f"🔩🗑️ TokenBucketRateLimiter initialized: capacity={capacity}, refill_rate={refill_rate}",
        )
    except ImportError:
        # Fallback if logger not available
        pass
Functions
get_current_tokens async
get_current_tokens() -> float

Returns the current number of tokens, for testing/monitoring.

Source code in provide/foundation/utils/rate_limiting.py
async def get_current_tokens(self) -> float:
    """Returns the current number of tokens, for testing/monitoring."""
    async with self._lock:
        # It might be useful to refill before getting, to get the most
        # up-to-date count
        # await self._refill_tokens()
        return self._tokens
is_allowed async
is_allowed() -> bool

Check if a request is allowed based on available tokens.

This method is asynchronous and coroutine-safe (it runs under the limiter's asyncio.Lock). It refills tokens based on elapsed time and then attempts to consume a token.

Returns:

Type Description
bool

True if the request is allowed, False otherwise.

Source code in provide/foundation/utils/rate_limiting.py
async def is_allowed(self) -> bool:
    """Check if a request is allowed based on available tokens.

    This method is asynchronous and thread-safe. It refills tokens
    based on elapsed time and then attempts to consume a token.

    Returns:
        True if the request is allowed, False otherwise.

    """
    async with self._lock:
        await self._refill_tokens()  # Refill before checking

        if self._tokens >= 1.0:
            self._tokens -= 1.0
            if self._logger:
                self._logger.debug(
                    f"🔩✅ Request allowed. Tokens remaining: {self._tokens:.2f}/{self._capacity:.2f}",
                )
            return True
        if self._logger:
            self._logger.warning(
                "🔩🗑️❌ Request denied. No tokens available. Tokens: "
                f"{self._tokens:.2f}/{self._capacity:.2f}",
            )
        return False

Functions

__getattr__

__getattr__(name: str) -> Any

Lazy import for modules.

Source code in provide/foundation/utils/__init__.py
def __getattr__(name: str) -> Any:
    """Resolve module attributes lazily.

    Only ``environment`` is supported; it is imported on first access.
    Any other name raises ``AttributeError``.
    """
    if name != "environment":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    from provide.foundation.utils import environment as env_module

    return env_module

auto_parse

auto_parse(attr: Any, value: str) -> Any

Automatically parse value based on an attrs field's type and metadata.

This function first tries the field's own converter, then a converter stored in the field's metadata, and finally falls back to type-based parsing.

Parameters:

Name Type Description Default
attr Any

attrs field (from fields(Class))

required
value str

String value to parse

required

Returns:

Type Description
Any

Parsed value based on field type or converter

Examples:

>>> from attrs import define, field, fields
>>> @define
... class Config:
...     count: int = field()
...     enabled: bool = field()
...     custom: str = field(converter=lambda x: x.upper())
>>> c = Config(count=0, enabled=False, custom="")
>>> auto_parse(fields(Config).count, "42")
42
>>> auto_parse(fields(Config).enabled, "true")
True
>>> auto_parse(fields(Config).custom, "hello")
'HELLO'
Source code in provide/foundation/parsers/attrs_integration.py
def auto_parse(attr: Any, value: str) -> Any:
    """Parse a string according to an attrs field's converter or type.

    Three strategies are tried in order, and the first that succeeds wins:

    1. the field's own ``converter`` attribute,
    2. a ``"converter"`` entry in the field's metadata,
    3. type-based parsing using the field's extracted type.

    If none applies, the string is returned unchanged.

    Args:
        attr: attrs field (from fields(Class))
        value: String value to parse

    Returns:
        Parsed value based on field converter or type, or ``value`` itself
        when no type information is available.

    Examples:
        >>> from attrs import define, field, fields
        >>> @define
        ... class Config:
        ...     count: int = field()
        ...     enabled: bool = field()
        ...     custom: str = field(converter=lambda x: x.upper())
        >>> c = Config(count=0, enabled=False, custom="")
        >>> auto_parse(fields(Config).count, "42")
        42
        >>> auto_parse(fields(Config).enabled, "true")
        True
        >>> auto_parse(fields(Config).custom, "hello")
        'HELLO'

    """
    # Strategy 1: converter declared directly on the field.
    if hasattr(attr, "converter"):
        converted, outcome = _try_converter(attr.converter, value)
        if converted:
            return outcome

    # Strategy 2: converter stashed in non-empty field metadata.
    if hasattr(attr, "metadata") and attr.metadata:
        converted, outcome = _try_converter(attr.metadata.get("converter"), value)
        if converted:
            return outcome

    # Strategy 3: parse by declared type; with no type, hand back the raw string.
    declared_type = _extract_field_type(attr)
    if declared_type is None:
        return value
    return parse_typed_value(value, declared_type)

check_optional_deps

check_optional_deps(
    *, quiet: bool = False, return_status: bool = False
) -> list[DependencyStatus] | None

Check and display optional dependency status.

Parameters:

Name Type Description Default
quiet bool

If True, don't log the status report (the list is only returned when return_status=True)

False
return_status bool

If True, return the status list

False

Returns:

Type Description
list[DependencyStatus] | None

Optional list of dependency statuses if return_status=True

Source code in provide/foundation/utils/deps.py
def check_optional_deps(*, quiet: bool = False, return_status: bool = False) -> list[DependencyStatus] | None:
    """Report which optional dependencies are installed.

    Unless ``quiet`` is set, a per-dependency line, an install hint for
    each missing one, and a summary are written through the foundation
    logger.

    Args:
        quiet: If True, skip logging the report entirely
        return_status: If True, return the status list

    Returns:
        The dependency status list when ``return_status`` is True,
        otherwise None

    """
    deps = get_optional_dependencies()

    if not quiet:
        from provide.foundation.hub.foundation import get_foundation_logger

        report = get_foundation_logger()
        report.info("=" * 50)

        n_total = len(deps)
        n_ready = sum(1 for entry in deps if entry.available)

        # One status line plus description per dependency.
        for entry in deps:
            icon = "✅" if entry.available else "❌"
            suffix = f" (v{entry.version})" if entry.version else ""
            report.info(f"  {icon} {entry.name}{suffix}")
            report.info(f"     {entry.description}")
            if not entry.available:
                report.info(f"     Install with: pip install 'provide-foundation[{entry.name}]'")

        report.info(f"📊 Summary: {n_ready}/{n_total} optional dependencies available")

        # Closing hint depends on whether all, none or some are present.
        if n_ready == n_total:
            report.info("🎉 All optional features are available!")
        elif n_ready == 0:
            report.info("💡 Install optional features with: pip install 'provide-foundation[all]'")
        else:
            absent = [entry.name for entry in deps if not entry.available]
            report.info(f"💡 Missing features: {', '.join(absent)}")

    return deps if return_status else None

create_dependency_stub

create_dependency_stub(package: str, feature: str) -> type

Create a stub class that raises DependencyError on instantiation or use.

Parameters:

Name Type Description Default
package str

Name of the missing package (e.g., "httpx", "cryptography")

required
feature str

Foundation feature name (e.g., "transport", "crypto")

required

Returns:

Type Description
type

A stub class that raises DependencyError when instantiated or used

Example

HTTPTransport = create_dependency_stub("httpx", "transport")
transport = HTTPTransport()  # Raises DependencyError with install instructions

Source code in provide/foundation/utils/stubs.py
def create_dependency_stub(package: str, feature: str) -> type:
    """Create a stub class that raises DependencyError on instantiation or use.

    Args:
        package: Name of the missing package (e.g., "httpx", "cryptography")
        feature: Foundation feature name (e.g., "transport", "crypto")

    Returns:
        A stub class that raises DependencyError when instantiated or used

    Example:
        >>> HTTPTransport = create_dependency_stub("httpx", "transport")
        >>> transport = HTTPTransport()  # Raises DependencyError with install instructions
    """

    class DependencyStub:
        """Stub class for missing optional dependency."""

        def __init__(self, *args: Any, **kwargs: Any) -> None:
            raise DependencyError(package, feature=feature)

        def __new__(cls, *args: Any, **kwargs: Any) -> Never:
            raise DependencyError(package, feature=feature)

        def __call__(self, *args: Any, **kwargs: Any) -> Never:
            raise DependencyError(package, feature=feature)

        def __getattr__(self, name: str) -> Never:
            raise DependencyError(package, feature=feature)

        @classmethod
        def __class_getitem__(cls, item: Any) -> Never:
            raise DependencyError(package, feature=feature)

    DependencyStub.__name__ = f"{feature.capitalize()}Stub"
    DependencyStub.__qualname__ = f"{feature.capitalize()}Stub"

    return DependencyStub

create_function_stub

create_function_stub(package: str, feature: str) -> Any

Create a stub function that raises DependencyError when called.

Parameters:

Name Type Description Default
package str

Name of the missing package (e.g., "httpx", "mkdocs")

required
feature str

Foundation feature name (e.g., "transport", "docs")

required

Returns:

Type Description
Any

A stub function that raises DependencyError when called

Example

generate_docs = create_function_stub("mkdocs", "docs")
generate_docs()  # Raises DependencyError with install instructions

Source code in provide/foundation/utils/stubs.py
def create_function_stub(package: str, feature: str) -> Any:
    """Create a stub function that raises DependencyError when called.

    Args:
        package: Name of the missing package (e.g., "httpx", "mkdocs")
        feature: Foundation feature name (e.g., "transport", "docs")

    Returns:
        A stub function that raises DependencyError when called

    Example:
        >>> generate_docs = create_function_stub("mkdocs", "docs")
        >>> generate_docs()  # Raises DependencyError with install instructions
    """

    def stub_function(*args: Any, **kwargs: Any) -> Never:
        raise DependencyError(package, feature=feature)

    stub_function.__name__ = f"{feature}_stub"
    stub_function.__qualname__ = f"{feature}_stub"

    return stub_function

create_module_stub

create_module_stub(package: str, feature: str) -> Any

Create a stub module-like object that raises DependencyError on attribute access.

Parameters:

Name Type Description Default
package str

Name of the missing package (e.g., "httpx")

required
feature str

Foundation feature name (e.g., "transport")

required

Returns:

Type Description
Any

A stub object that raises DependencyError on any attribute access

Example

httpx = create_module_stub("httpx", "transport")
httpx.AsyncClient()  # Raises DependencyError with install instructions

Source code in provide/foundation/utils/stubs.py
def create_module_stub(package: str, feature: str) -> Any:
    """Create a stub module-like object that raises DependencyError on attribute access.

    Args:
        package: Name of the missing package (e.g., "httpx")
        feature: Foundation feature name (e.g., "transport")

    Returns:
        A stub object that raises DependencyError on any attribute access

    Example:
        >>> httpx = create_module_stub("httpx", "transport")
        >>> httpx.AsyncClient()  # Raises DependencyError with install instructions
    """

    class ModuleStub:
        """Stub module for missing optional dependency."""

        def __getattr__(self, name: str) -> Never:
            raise DependencyError(package, feature=feature)

        def __call__(self, *args: Any, **kwargs: Any) -> Never:
            raise DependencyError(package, feature=feature)

    ModuleStub.__name__ = f"{package}_stub"
    ModuleStub.__qualname__ = f"{package}_stub"

    return ModuleStub()

get_available_features

get_available_features() -> dict[str, bool]

Get a dictionary of available optional features.

Returns:

Type Description
dict[str, bool]

Dictionary mapping feature names to availability

Source code in provide/foundation/utils/deps.py
def get_available_features() -> dict[str, bool]:
    """Summarize optional-dependency availability by name.

    Returns:
        Mapping from each optional dependency's ``name`` to its
        ``available`` flag

    """
    return {entry.name: entry.available for entry in get_optional_dependencies()}

get_bool

get_bool(
    name: str, default: bool | None = None
) -> bool | None

Get boolean environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default bool | None

Default value if not set

None

Returns:

Type Description
bool | None

Boolean value, None (if set but empty), or default (if unset)

Note

Empty string is treated as ambiguous and returns None with a warning. Unset variable returns the default value.

Examples:

>>> os.environ['DEBUG'] = 'true'
>>> get_bool('DEBUG')
True
>>> get_bool('MISSING', False)
False
Source code in provide/foundation/utils/environment/getters.py
def get_bool(name: str, default: bool | None = None) -> bool | None:
    """Read an environment variable and interpret it as a boolean.

    Args:
        name: Environment variable name
        default: Value returned when the variable is not set at all

    Returns:
        The parsed boolean; ``default`` when the variable is unset; or
        None when it is set to an empty or whitespace-only string

    Raises:
        ValidationError: If the value cannot be parsed as a boolean

    Note:
        A set-but-blank variable is considered ambiguous: a warning is
        logged and None is returned instead of the default.

    Examples:
        >>> os.environ['DEBUG'] = 'true'
        >>> get_bool('DEBUG')
        True
        >>> get_bool('MISSING', False)
        False

    """
    from provide.foundation.logger import get_logger

    raw = os.environ.get(name)
    if raw is None:
        return default

    if raw.strip():
        # Non-blank value: parse it, translating parse failures.
        try:
            return parse_bool(raw)
        except ValueError as exc:
            raise ValidationError(
                f"Invalid boolean value for {name}: {raw}",
                field=name,
                value=raw,
                rule="boolean",
            ) from exc

    # Blank value: neither true nor false, so warn and report None.
    get_logger(__name__).warning(
        f"Environment variable {name} is set but empty - treating as None. "
        f"Either provide a value or unset the variable to use default."
    )
    return None

get_dict

get_dict(
    name: str,
    default: dict[str, str] | None = None,
    item_separator: str = ",",
    key_value_separator: str = "=",
) -> dict[str, str]

Get dictionary from environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default dict[str, str] | None

Default dict if not set

None
item_separator str

Separator between items

','
key_value_separator str

Separator between key and value

'='

Returns:

Type Description
dict[str, str]

Dictionary of string key-value pairs

Examples:

>>> os.environ['CONFIG'] = 'key1=val1,key2=val2'
>>> get_dict('CONFIG')
{'key1': 'val1', 'key2': 'val2'}
Source code in provide/foundation/utils/environment/getters.py
def get_dict(
    name: str,
    default: dict[str, str] | None = None,
    item_separator: str = ",",
    key_value_separator: str = "=",
) -> dict[str, str]:
    """Get dictionary from environment variable.

    Args:
        name: Environment variable name
        default: Default dict if not set
        item_separator: Separator between items
        key_value_separator: Separator between key and value

    Returns:
        Dictionary of string key-value pairs

    Examples:
        >>> os.environ['CONFIG'] = 'key1=val1,key2=val2'
        >>> get_dict('CONFIG')
        {'key1': 'val1', 'key2': 'val2'}

    """
    value = os.environ.get(name)
    if value is None:
        return default or {}

    try:
        return parse_dict(
            value,
            item_separator=item_separator,
            key_separator=key_value_separator,
            strip=True,
        )
    except ValueError as e:
        # parse_dict raises on invalid format, log warning and return partial result
        _get_logger().warning(
            "Invalid dictionary format in environment variable",
            var=name,
            value=value,
            error=str(e),
        )
        # Try to parse what we can, skipping invalid items
        result = {}
        items = value.split(item_separator)
        for item in items:
            item = item.strip()
            if not item:
                continue
            if key_value_separator not in item:
                continue
            key, val = item.split(key_value_separator, 1)
            result[key.strip()] = val.strip()
        return result

get_float

get_float(
    name: str, default: float | None = None
) -> float | None

Get float environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default float | None

Default value if not set

None

Returns:

Type Description
float | None

Float value or default

Raises:

Type Description
ValidationError

If value cannot be parsed as float

Source code in provide/foundation/utils/environment/getters.py
def get_float(name: str, default: float | None = None) -> float | None:
    """Get float environment variable.

    Args:
        name: Environment variable name
        default: Default value if not set

    Returns:
        Float value or default

    Raises:
        ValidationError: If value cannot be parsed as float

    """
    value = os.environ.get(name)
    if value is None:
        return default

    try:
        return float(value)
    except ValueError as e:
        raise ValidationError(
            f"Invalid float value for {name}: {value}",
            field=name,
            value=value,
            rule="float",
        ) from e

get_int

get_int(
    name: str, default: int | None = None
) -> int | None

Get integer environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default int | None

Default value if not set

None

Returns:

Type Description
int | None

Integer value or default

Raises:

Type Description
ValidationError

If value cannot be parsed as integer

Source code in provide/foundation/utils/environment/getters.py
def get_int(name: str, default: int | None = None) -> int | None:
    """Get integer environment variable.

    Args:
        name: Environment variable name
        default: Default value if not set

    Returns:
        Integer value or default

    Raises:
        ValidationError: If value cannot be parsed as integer

    """
    value = os.environ.get(name)
    if value is None:
        return default

    try:
        return int(value)
    except ValueError as e:
        raise ValidationError(
            f"Invalid integer value for {name}: {value}",
            field=name,
            value=value,
            rule="integer",
        ) from e

get_list

get_list(
    name: str,
    default: list[str] | None = None,
    separator: str = ",",
) -> list[str]

Get list from environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default list[str] | None

Default list if not set

None
separator str

String separator (default: comma)

','

Returns:

Type Description
list[str]

List of strings

Examples:

>>> os.environ['ITEMS'] = 'a,b,c'
>>> get_list('ITEMS')
['a', 'b', 'c']
Source code in provide/foundation/utils/environment/getters.py
def get_list(name: str, default: list[str] | None = None, separator: str = ",") -> list[str]:
    """Get list from environment variable.

    Args:
        name: Environment variable name
        default: Default list if not set
        separator: String separator (default: comma)

    Returns:
        List of strings

    Examples:
        >>> os.environ['ITEMS'] = 'a,b,c'
        >>> get_list('ITEMS')
        ['a', 'b', 'c']

    """
    value = os.environ.get(name)
    if value is None:
        return default or []

    # Use existing parse_list which handles empty strings and stripping
    items = parse_list(value, separator=separator, strip=True)
    # Filter empty strings (parse_list doesn't do this by default)
    return [item for item in items if item]

get_optional_dependencies

get_optional_dependencies() -> list[DependencyStatus]

Get status of all optional dependencies.

Returns:

Type Description
list[DependencyStatus]

List of dependency status objects

Source code in provide/foundation/utils/deps.py
def get_optional_dependencies() -> list[DependencyStatus]:
    """Collect the status of every optional dependency.

    Returns:
        One dependency status object per check, in the order the checks
        are listed below

    """
    checks = (
        _check_click,
        _check_cryptography,
        _check_httpx,
        _check_mkdocs,
        _check_opentelemetry,
    )
    return [check() for check in checks]

get_path

get_path(
    name: str, default: Path | str | None = None
) -> Path | None

Get path environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default Path | str | None

Default path if not set

None

Returns:

Type Description
Path | None

Path object or None

Source code in provide/foundation/utils/environment/getters.py
def get_path(name: str, default: Path | str | None = None) -> Path | None:
    """Get path environment variable.

    Args:
        name: Environment variable name
        default: Default path if not set

    Returns:
        Path object or None

    """
    value = os.environ.get(name)
    if value is None:
        if default is None:
            return None
        return Path(default) if not isinstance(default, Path) else default

    # Expand user and environment variables
    expanded = os.path.expandvars(value)
    return Path(expanded).expanduser()

get_str

get_str(
    name: str, default: str | None = None
) -> str | None

Get string environment variable.

Parameters:

Name Type Description Default
name str

Environment variable name

required
default str | None

Default value if not set

None

Returns:

Type Description
str | None

String value or default

Source code in provide/foundation/utils/environment/getters.py
def get_str(name: str, default: str | None = None) -> str | None:
    """Get string environment variable.

    Args:
        name: Environment variable name
        default: Default value if not set

    Returns:
        String value or default

    """
    return os.environ.get(name, default)

get_version

get_version(
    package_name: str, caller_file: str | Path | None = None
) -> str

Get the version for a package.

Reads from VERSION file if it exists, otherwise falls back to package metadata, then to default development version.

This function is thread-safe and caches results after the first call per package.

Parameters:

Name Type Description Default
package_name str

The package name as it appears in PyPI (e.g., "provide-foundation")

required
caller_file str | Path | None

Path to the calling module's `__file__`, used to find VERSION file. If None, uses the calling context.

None

Returns:

Type Description
str

The current version string

Source code in provide/foundation/utils/versioning.py
def get_version(package_name: str, caller_file: str | Path | None = None) -> str:
    """Get the version for a package.

    Reads from VERSION file if it exists, otherwise falls back to package metadata,
    then to default development version ("0.0.0-dev").

    Results are cached per ``package_name`` after the first call. The cache key
    does not include ``caller_file``, so once a version has been resolved for a
    package, the ``caller_file`` argument of later calls has no effect. The cache
    is populated while holding ``_version_lock``.

    Args:
        package_name: The package name as it appears in PyPI (e.g., "provide-foundation")
        caller_file: Path to the calling module's __file__, used to find VERSION file.
                    If None, the file of the immediate caller's stack frame is used,
                    falling back to the current working directory when no frame
                    is available.

    Returns:
        The current version string
    """
    global _cached_versions

    # Fast path: return cached version if available
    # (this first check is done without taking the lock)
    if package_name in _cached_versions:
        return _cached_versions[package_name]

    # Slow path: load version with thread-safe locking
    with _version_lock:
        # Double-check after acquiring lock
        # (another thread may have filled the cache while we waited)
        if package_name in _cached_versions:
            return _cached_versions[package_name]

        # Determine start path for searching
        if caller_file is not None:
            start_path = Path(caller_file).parent
        else:
            # Try to infer from the call stack
            import inspect

            # f_back is the frame that called get_version(); this only works
            # because the lookup happens directly in this function body.
            frame = inspect.currentframe()
            if frame and frame.f_back:
                caller_frame = frame.f_back
                start_path = Path(caller_frame.f_code.co_filename).parent
            else:
                start_path = Path.cwd()

        # Try VERSION file first (single source of truth)
        project_root = _find_project_root(start_path)
        if project_root:
            version_file = project_root / "VERSION"
            if version_file.exists():
                try:
                    # Surrounding whitespace/newlines are stripped; the content
                    # is otherwise used verbatim.
                    version_str = version_file.read_text().strip()
                    _cached_versions[package_name] = version_str
                    return version_str
                except OSError:
                    # Fall back to metadata if VERSION file can't be read
                    pass

        # Fallback to package metadata
        try:
            from importlib.metadata import PackageNotFoundError, version as get_metadata_version

            version_str = get_metadata_version(package_name)
            _cached_versions[package_name] = version_str
            return version_str
        except PackageNotFoundError:
            # Package is not installed; use the development default below
            pass

        # Final fallback
        # (cached as well, so later calls for this package return it directly)
        version_str = "0.0.0-dev"
        _cached_versions[package_name] = version_str
        return version_str

has_dependency

has_dependency(name: str) -> bool

Check if a specific optional dependency is available.

Parameters:

Name Type Description Default
name str

Name of the dependency to check

required

Returns:

Type Description
bool

True if dependency is available

Source code in provide/foundation/utils/deps.py
def has_dependency(name: str) -> bool:
    """Report whether a named optional dependency is available.

    Args:
        name: Name of the dependency to check

    Returns:
        The ``available`` flag of the first dependency status whose name
        matches, or False when no status has that name

    """
    return next(
        (status.available for status in get_optional_dependencies() if status.name == name),
        False,
    )

lazy_import

lazy_import(parent_module: str, name: str) -> object

Import a module lazily with comprehensive safety checks.

This function provides thread-safe lazy loading with protection against:

- Circular imports (tracks import chains)
- Stack overflow (enforces maximum depth)
- Corrupted module states (validates sys.modules)

Commonly lazy-loaded modules:

- cli: Requires optional 'click' dependency
- crypto: Cryptographic utilities
- docs: Documentation generation
- formatting: Text formatting utilities
- metrics: Metrics collection
- observability: Observability features

Parameters:

Name Type Description Default
parent_module str

The parent module name (e.g., "provide.foundation")

required
name str

Module name to lazy-load (e.g., "cli")

required

Returns:

Type Description
object

The imported module

Raises:

Type Description
AttributeError

If module is not allowed for lazy loading or circular import detected

ImportError

If module import fails

RecursionError

If import depth exceeds safe limits

Note

Complexity is intentionally high to handle all edge cases in this critical import hook (recursion, corruption, depth limits).

Example

>>> from provide.foundation.utils.importer import lazy_import
>>> cli = lazy_import("provide.foundation", "cli")

Source code in provide/foundation/utils/importer.py
def lazy_import(parent_module: str, name: str) -> object:
    """Import a module lazily with comprehensive safety checks.

    This function provides thread-safe lazy loading with protection against:
    - Circular imports (tracks import chains)
    - Stack overflow (enforces maximum depth)
    - Corrupted module states (validates sys.modules)

    The recursion guards (in-progress set, depth counter, import chain) are
    stored on ``_thread_local`` and are always reset in a ``finally`` block,
    whether the import succeeds or fails.

    Commonly lazy-loaded modules:
    - cli: Requires optional 'click' dependency
    - crypto: Cryptographic utilities
    - docs: Documentation generation
    - formatting: Text formatting utilities
    - metrics: Metrics collection
    - observability: Observability features

    Args:
        parent_module: The parent module name (e.g., "provide.foundation")
        name: Module name to lazy-load (e.g., "cli")

    Returns:
        The imported module

    Raises:
        AttributeError: If module is not allowed for lazy loading or circular import detected
        ImportError: If module import fails
        RecursionError: If import depth exceeds safe limits

    Note:
        Complexity is intentionally high to handle all edge cases
        in this critical import hook (recursion, corruption, depth limits).

        NOTE(review): within this function body, AttributeError is raised only
        for the circular-import case; no allow-list check is visible here, so
        the "not allowed" case is presumably enforced by the caller - confirm.

    Example:
        >>> from provide.foundation.utils.importer import lazy_import
        >>> cli = lazy_import("provide.foundation", "cli")
    """
    # Build the full module name
    module_name = f"{parent_module}.{name}"

    # Initialize thread-local state if needed
    # (all three attributes are created together, so checking one is enough)
    if not hasattr(_thread_local, "getattr_in_progress"):
        _thread_local.getattr_in_progress = set()
        _thread_local.import_depth = 0
        _thread_local.import_chain = []

    # Check recursion depth to prevent stack overflow from complex import chains
    if _thread_local.import_depth >= MAX_LAZY_IMPORT_DEPTH:
        chain_str = " -> ".join([*_thread_local.import_chain, name])
        raise RecursionError(
            f"Lazy import depth limit ({MAX_LAZY_IMPORT_DEPTH}) exceeded. "
            f"Import chain: {chain_str}. This indicates a complex nested import "
            f"that should be refactored or imported eagerly."
        )

    # Check if we've already entered recursion for this specific module
    # This prevents infinite loops when a module has been corrupted
    # (the guard is keyed by the short name, not the full dotted module name)
    if name in _thread_local.getattr_in_progress:
        chain_str = " -> ".join([*_thread_local.import_chain, name])
        raise AttributeError(
            f"module '{parent_module}' has no attribute '{name}' "
            f"(circular import detected in chain: {chain_str}). "
            f"Module may be corrupted in sys.modules."
        )

    # Set recursion guards
    # (both early raises above happen before this point, so they leave the
    # guards untouched and need no cleanup)
    _thread_local.getattr_in_progress.add(name)
    _thread_local.import_depth += 1
    _thread_local.import_chain.append(name)

    try:
        # Check if module is already in sys.modules but corrupted
        if module_name in sys.modules:
            existing_module = sys.modules[module_name]
            # If it exists and is valid, return it
            if existing_module is not None:
                return existing_module
            # If it's None or invalid, remove it so we can re-import
            del sys.modules[module_name]

        # Import the submodule with appropriate error handling
        try:
            # A non-empty fromlist makes __import__ return the named submodule
            # itself rather than the top-level package.
            mod = __import__(module_name, fromlist=[""])
            # Explicitly (re-)register the module under its full dotted name
            sys.modules[module_name] = mod
            return mod
        except ImportError as e:
            # Provide helpful error messages for known optional dependencies
            if name in SPECIAL_MODULES:
                error_str = str(e)
                # Check if error is about missing dependency for this feature
                if (
                    (name == "cli" and "click" in error_str)
                    or (name == "transport" and "httpx" in error_str)
                    or (name == "docs" and ("mkdocs" in error_str or "mkdocstrings" in error_str))
                ):
                    raise ImportError(SPECIAL_MODULES[name]) from e
            # Any other ImportError propagates unchanged
            raise
    finally:
        # Always clear recursion guards in reverse order
        _thread_local.getattr_in_progress.discard(name)
        _thread_local.import_depth -= 1
        # Only pop when this call's entry is still on top of the chain
        if _thread_local.import_chain and _thread_local.import_chain[-1] == name:
            _thread_local.import_chain.pop()

parse_bool

parse_bool(value: Any, strict: bool = False) -> bool

Parse a boolean value from string or other types.

Accepts: true/false, yes/no, 1/0, on/off (case-insensitive)

Parameters:

Name Type Description Default
value Any

Value to parse as boolean

required
strict bool

If True, only accept bool or string types (raise TypeError otherwise)

False

Returns:

Type Description
bool

Boolean value

Raises:

Type Description
TypeError

If strict=True and value is not bool or string, or if value is not bool/str

ValueError

If value cannot be parsed as boolean

Source code in provide/foundation/parsers/primitives.py
def parse_bool(value: Any, strict: bool = False) -> bool:
    """Parse a boolean value from string or other types.

    The actual parsing is delegated to ``parse_bool_strict``; this wrapper only
    adds an up-front type check when ``strict`` is set.

    Accepts: true/false, yes/no, 1/0, on/off (case-insensitive)

    Args:
        value: Value to parse as boolean
        strict: If True, reject anything that is not a bool or string before parsing

    Returns:
        Boolean value

    Raises:
        TypeError: If strict=True and value is not bool or string, or if value is not bool/str
        ValueError: If value cannot be parsed as boolean

    """
    if strict:
        if not isinstance(value, (bool, str)):
            raise TypeError(f"Cannot convert {type(value).__name__} to bool: {value!r}")

    return parse_bool_strict(value)

parse_dict

parse_dict(
    value: str | dict[str, str],
    item_separator: str = ",",
    key_separator: str = "=",
    strip: bool = True,
) -> dict[str, str]

Parse a dictionary from a string.

Format: "key1=value1,key2=value2"

Parameters:

Name Type Description Default
value str | dict[str, str]

String or dict to parse

required
item_separator str

Separator between items

','
key_separator str

Separator between key and value

'='
strip bool

Whether to strip whitespace

True

Returns:

Type Description
dict[str, str]

Dictionary of string keys and values

Raises:

Type Description
ValueError

If format is invalid

Source code in provide/foundation/parsers/collections.py
def parse_dict(
    value: str | dict[str, str],
    item_separator: str = ",",
    key_separator: str = "=",
    strip: bool = True,
) -> dict[str, str]:
    """Parse a dictionary from a string.

    Format: "key1=value1,key2=value2"

    Args:
        value: String or dict to parse
        item_separator: Separator between items
        key_separator: Separator between key and value
        strip: Whether to strip whitespace

    Returns:
        Dictionary of string keys and values

    Raises:
        ValueError: If format is invalid

    """
    if isinstance(value, dict):
        return value

    if not value:
        return {}

    result = {}
    items = value.split(item_separator)

    for item in items:
        if not item:
            continue

        if key_separator not in item:
            raise ValueError(f"Invalid dict format: '{item}' missing '{key_separator}'")

        key, val = item.split(key_separator, 1)

        if strip:
            key = key.strip()
            val = val.strip()

        result[key] = val

    return result

parse_duration

parse_duration(value: str) -> int

Parse duration string to seconds.

Supports formats like: 30s, 5m, 2h, 1d, 1h30m, etc.

Results are cached for performance on repeated calls.

Parameters:

Name Type Description Default
value str

Duration string

required

Returns:

Type Description
int

Duration in seconds

Examples:

>>> parse_duration('30s')
30
>>> parse_duration('1h30m')
5400
>>> parse_duration('2d')
172800
Source code in provide/foundation/utils/environment/parsers.py
@cached(maxsize=256)
def parse_duration(value: str) -> int:
    """Convert a duration string into a number of seconds.

    Supports formats like: 30s, 5m, 2h, 1d, 1h30m, etc. A string made up
    only of digits is returned as that integer.

    Results are cached for performance on repeated calls.

    Args:
        value: Duration string

    Returns:
        Duration in seconds

    Raises:
        ValidationError: If nothing in the string matches the duration
            pattern, or a matched unit is not one of s, m, h, d

    Examples:
        >>> parse_duration('30s')
        30
        >>> parse_duration('1h30m')
        5400
        >>> parse_duration('2d')
        172800

    """
    if value.isdigit():
        return int(value)

    # Use pre-compiled pattern; each match is an (amount, unit) pair
    pairs = _DURATION_PATTERN.findall(value.lower())
    if not pairs:
        raise ValidationError(f"Invalid duration format: {value}", value=value, rule="duration")

    seconds_per_unit = {"s": 1, "m": 60, "h": 3600, "d": 86400}

    total_seconds = 0
    for amount, unit in pairs:
        if unit not in seconds_per_unit:
            raise ValidationError(f"Unknown duration unit: {unit}", value=value, rule="duration_unit")
        total_seconds += int(amount) * seconds_per_unit[unit]

    return total_seconds

parse_list

parse_list(
    value: str | list[str],
    separator: str = ",",
    strip: bool = True,
) -> list[str]

Parse a list from a string.

Parameters:

Name Type Description Default
value str | list[str]

String or list to parse

required
separator str

Separator character

','
strip bool

Whether to strip whitespace from items

True

Returns:

Type Description
list[str]

List of strings

Source code in provide/foundation/parsers/collections.py
def parse_list(
    value: str | list[str],
    separator: str = ",",
    strip: bool = True,
) -> list[str]:
    """Parse a list from a string.

    Args:
        value: String or list to parse
        separator: Separator character
        strip: Whether to strip whitespace from items

    Returns:
        List of strings

    """
    if isinstance(value, list):
        return value

    if not value:
        return []

    items = value.split(separator)

    if strip:
        items = [item.strip() for item in items]

    return items

parse_size

parse_size(value: str) -> int

Parse size string to bytes.

Supports formats like: 1024, 1KB, 10MB, 1.5GB, etc.

Results are cached for performance on repeated calls.

Parameters:

Name Type Description Default
value str

Size string

required

Returns:

Type Description
int

Size in bytes

Examples:

>>> parse_size('1024')
1024
>>> parse_size('10MB')
10485760
>>> parse_size('1.5GB')
1610612736
Source code in provide/foundation/utils/environment/parsers.py
@cached(maxsize=256)
def parse_size(value: str) -> int:
    """Convert a size string into a number of bytes.

    Supports formats like: 1024, 1KB, 10MB, 1.5GB, etc. Units are matched
    case-insensitively and use binary multiples (1KB = 1024 bytes). A string
    made up only of digits is returned as that integer.

    Results are cached for performance on repeated calls.

    Args:
        value: Size string

    Returns:
        Size in bytes (fractional results are truncated by ``int``)

    Raises:
        ValidationError: If the string does not match the size pattern or
            its unit is not recognised

    Examples:
        >>> parse_size('1024')
        1024
        >>> parse_size('10MB')
        10485760
        >>> parse_size('1.5GB')
        1610612736

    """
    if value.isdigit():
        return int(value)

    # Use pre-compiled pattern; input is upper-cased so units compare uniformly
    parsed = _SIZE_PATTERN.match(value.upper())
    if not parsed:
        raise ValidationError(f"Invalid size format: {value}", value=value, rule="size")

    amount = float(parsed.group(1))
    # A missing unit group means plain bytes
    unit = parsed.group(2) or "B"

    bytes_per_unit = {
        "B": 1,
        "KB": 1024,
        "K": 1024,
        "MB": 1024**2,
        "M": 1024**2,
        "GB": 1024**3,
        "G": 1024**3,
        "TB": 1024**4,
        "T": 1024**4,
    }

    if unit in bytes_per_unit:
        return int(amount * bytes_per_unit[unit])

    raise ValidationError(f"Unknown size unit: {unit}", value=value, rule="size_unit")

parse_typed_value

parse_typed_value(value: str, target_type: type) -> Any

Parse a string value to a specific type.

Handles basic types (int, float, bool, str) and generic types (list, dict). For attrs fields, pass field.type as target_type.

Parameters:

Name Type Description Default
value str

String value to parse

required
target_type type

Target type to convert to

required

Returns:

Type Description
Any

Parsed value of the target type

Examples:

>>> parse_typed_value("42", int)
42
>>> parse_typed_value("true", bool)
True
>>> parse_typed_value("a,b,c", list)
['a', 'b', 'c']
Source code in provide/foundation/parsers/typed.py
def parse_typed_value(value: str, target_type: type) -> Any:
    """Convert a string value to the requested type.

    Handles basic types (int, float, bool, str) and generic types (list, dict).
    For attrs fields, pass field.type as target_type.

    Args:
        value: String value to parse
        target_type: Target type to convert to

    Returns:
        Parsed value of the target type; None when ``value`` is None, and the
        original string when neither the basic nor the generic parser
        produces a result

    Examples:
        >>> parse_typed_value("42", int)
        42
        >>> parse_typed_value("true", bool)
        True
        >>> parse_typed_value("a,b,c", list)
        ['a', 'b', 'c']

    """
    if value is None:
        return None

    # Basic types first: for bool/int/float/str the basic parser's answer is
    # final, even when that answer is None
    basic = _parse_basic_type(value, target_type)
    if basic is not None or target_type in (bool, int, float, str):
        return basic

    # Then generic types, falling back to the untouched string
    generic = _parse_generic_type(value, target_type)
    return value if generic is None else generic

require

require(name: str, type_hint: type[T] | None = None) -> Any

Require an environment variable to be set.

Parameters:

Name Type Description Default
name str

Environment variable name

required
type_hint type[T] | None

Optional type hint for parsing

None

Returns:

Type Description
Any

Parsed value

Raises:

Type Description
ValidationError

If variable is not set

Source code in provide/foundation/utils/environment/getters.py
def require(name: str, type_hint: type[T] | None = None) -> Any:
    """Fetch an environment variable that must be set.

    Args:
        name: Environment variable name
        type_hint: Optional type hint for parsing; without it the raw
            string is returned

    Returns:
        Parsed value

    Raises:
        ValidationError: If variable is not set

    """
    if name not in os.environ:
        raise ValidationError(
            f"Required environment variable not set: {name}",
            field=name,
            rule="required",
        )

    if type_hint is None:
        return os.environ[name]

    # Parameterized hints have an origin and go to the complex parser with
    # that origin; plain types go to the simple one
    origin = get_origin(type_hint)
    if origin is not None:
        return _parse_complex_type(name, origin)
    return _parse_simple_type(name, type_hint)

require_dependency

require_dependency(name: str) -> None

Require a specific optional dependency, raise ImportError if missing.

Parameters:

Name Type Description Default
name str

Name of the dependency to require

required

Raises:

Type Description
ImportError

If dependency is not available

Source code in provide/foundation/utils/deps.py
def require_dependency(name: str) -> None:
    """Ensure an optional dependency is available, failing loudly otherwise.

    Args:
        name: Name of the dependency to require

    Raises:
        ImportError: If dependency is not available; the message includes
            the pip command for the matching extra

    """
    if has_dependency(name):
        return

    raise ImportError(
        f"Optional dependency '{name}' is required for this feature. "
        f"Install with: pip install 'provide-foundation[{name}]'",
    )

reset_version_cache

reset_version_cache(
    package_name: str | None = None,
) -> None

Reset the cached version for testing.

Parameters:

Name Type Description Default
package_name str | None

Specific package to reset, or None to reset all

None
Warning

This should only be called from test code or test fixtures.

Source code in provide/foundation/utils/versioning.py
def reset_version_cache(package_name: str | None = None) -> None:
    """Drop cached version entries (intended for tests).

    Args:
        package_name: Specific package to reset, or None to reset all.
            Resetting a package that is not cached is a no-op.

    Warning:
        This should only be called from test code or test fixtures.
    """
    with _version_lock:
        if package_name is not None:
            _cached_versions.pop(package_name, None)
            return
        _cached_versions.clear()

timed_block

timed_block(
    logger_instance: FoundationLogger,
    event_name: str,
    layer_keys: dict[str, Any] | None = None,
    initial_kvs: dict[str, Any] | None = None,
    **extra_kvs: Any
) -> Generator[dict[str, Any], None, None]

Context manager that logs the duration of a code block.

Logs at DEBUG when entering, INFO on success, ERROR on exception.

Parameters:

Name Type Description Default
logger_instance FoundationLogger

Logger to use for output

required
event_name str

Name of the operation being timed

required
layer_keys dict[str, Any] | None

Semantic layer keys (e.g., llm-specific keys)

None
initial_kvs dict[str, Any] | None

Initial key-value pairs to include in logs

None
**extra_kvs Any

Additional key-value pairs

{}

Yields:

Type Description
dict[str, Any]

A mutable dict that can be updated with additional context

Example

>>> with timed_block(logger, "database_query") as ctx:
...     ctx["query"] = "SELECT * FROM users"
...     result = db.query("SELECT * FROM users")
...     ctx["rows"] = len(result)

Source code in provide/foundation/utils/timing.py
@contextmanager
def timed_block(
    logger_instance: FoundationLogger,
    event_name: str,
    layer_keys: dict[str, Any] | None = None,
    initial_kvs: dict[str, Any] | None = None,
    **extra_kvs: Any,
) -> Generator[dict[str, Any], None, None]:
    """Context manager that logs the duration of a code block.

    Logs at DEBUG when entering, INFO on success, ERROR on exception. An
    exception raised inside the block is logged and then re-raised unchanged.
    Only ``Exception`` subclasses are logged as failures.

    Key-value pairs are merged in the order ``layer_keys``, ``initial_kvs``,
    ``extra_kvs``, then the yielded context dict; later sources override
    earlier ones. Completion logs additionally carry ``duration_seconds``
    (rounded to 3 decimals) and ``outcome``; failure logs also carry
    ``error.message`` and ``error.type``.

    Args:
        logger_instance: Logger to use for output
        event_name: Name of the operation being timed
        layer_keys: Semantic layer keys (e.g., llm-specific keys)
        initial_kvs: Initial key-value pairs to include in logs
        **extra_kvs: Additional key-value pairs

    Yields:
        A mutable dict that can be updated with additional context

    Example:
        >>> with timed_block(logger, "database_query") as ctx:
        ...     ctx["query"] = "SELECT * FROM users"
        ...     result = db.query("SELECT * FROM users")
        ...     ctx["rows"] = len(result)

    """
    # Combine all key-value pairs
    all_kvs: dict[str, Any] = {}
    if layer_keys:
        all_kvs.update(layer_keys)
    if initial_kvs:
        all_kvs.update(initial_kvs)
    all_kvs.update(extra_kvs)

    # Try to get trace_id from context; an explicitly supplied one wins
    trace_id = _PROVIDE_CONTEXT_TRACE_ID.get()
    if trace_id and "trace_id" not in all_kvs:
        all_kvs["trace_id"] = trace_id

    # Create context dict that can be modified by the caller inside the block
    context: dict[str, Any] = {}

    # Log start
    logger_instance.debug(f"{event_name} started", **all_kvs)

    start_time = time.perf_counter()
    try:
        yield context
    except Exception as e:
        # Error - calculate duration and log with exception
        duration = time.perf_counter() - start_time
        all_kvs.update(context)
        all_kvs["duration_seconds"] = round(duration, 3)
        all_kvs["outcome"] = "error"
        all_kvs["error.message"] = str(e)
        all_kvs["error.type"] = type(e).__name__

        logger_instance.error(f"{event_name} failed", exc_info=True, **all_kvs)
        raise
    else:
        # Success - calculate duration and log. This lives in ``else`` rather
        # than inside the ``try`` so that a failure while emitting the success
        # log is not caught above and misreported as a failure of the block.
        duration = time.perf_counter() - start_time
        all_kvs.update(context)
        all_kvs["duration_seconds"] = round(duration, 3)
        all_kvs["outcome"] = "success"

        logger_instance.info(f"{event_name} completed", **all_kvs)