Skip to content

raw_to_cty

pyvider.cty.conversion.raw_to_cty

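Type inference for raw Python values: this module provides `infer_cty_type_from_raw`, which maps a plain Python value (scalars, dicts, lists, tuples, sets, attrs instances) to the most specific matching `CtyType`.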

Classes

Functions

infer_cty_type_from_raw

infer_cty_type_from_raw(value: Any) -> CtyType[Any]

Infers the most specific CtyType from a raw Python value. This function uses an iterative approach with a work stack to avoid recursion limits and leverages a context-aware cache for performance and thread-safety.

Source code in pyvider/cty/conversion/raw_to_cty.py
@with_inference_cache
def infer_cty_type_from_raw(value: Any) -> CtyType[Any]:  # noqa: C901
    """
    Infers the most specific CtyType from a raw Python value.

    This function uses an iterative approach with a work stack to avoid recursion limits
    and leverages a context-aware cache for performance and thread-safety.

    Mapping applied to the value and, for containers, to every nested element:

    * ``bool`` -> ``CtyBool``; ``int`` / ``float`` / ``Decimal`` -> ``CtyNumber``;
      ``str`` / ``bytes`` -> ``CtyString``.
    * ``dict`` whose keys are all strings (including the empty dict) -> ``CtyObject``
      with NFC-normalized attribute names; any other ``dict`` -> ``CtyMap`` of the
      unified value type.
    * ``tuple`` -> ``CtyTuple``; ``list`` -> ``CtyList``; ``set`` -> ``CtySet``
      (element types are unified via ``_unify_types``).
    * attrs instances are converted with ``_attrs_to_dict_safe`` and then treated
      like the resulting value; nested instances whose conversion raises
      ``TypeError`` become ``CtyDynamic``.
    * A nested ``CtyValue`` contributes its own ``.type``.
    * Anything else (including ``None``) -> ``CtyDynamic``.

    Args:
        value: The raw Python value to inspect.

    Returns:
        The inferred type. ``CtyDynamic`` is returned immediately when ``value`` is
        ``None``, a ``CtyValue`` or a ``CtyType``.
    """
    with error_boundary(
        context={
            "operation": "cty_type_inference",
            "value_type": type(value).__name__,
            "is_attrs_class": attrs.has(type(value)) if hasattr(value, "__class__") else False,
            "value_repr": str(value)[:100] if value is not None else "None",  # Truncated for safety
        }
    ):
        from pyvider.cty.types import (
            CtyBool,
            CtyDynamic,
            CtyList,
            CtyMap,
            CtyNumber,
            CtyObject,
            CtySet,
            CtyString,
            CtyTuple,
            CtyType,
        )

        if isinstance(value, CtyValue) or value is None:
            return CtyDynamic()

        if isinstance(value, CtyType):
            return CtyDynamic()

        if attrs.has(type(value)):
            value = _attrs_to_dict_safe(value)

    container_cache = get_container_schema_cache()

    # If no cache is available (e.g., in worker threads for thread safety),
    # proceed without caching
    root_key = None
    if container_cache is not None:
        root_key = _get_structural_cache_key(value)
        if root_key in container_cache:
            return container_cache[root_key]

    # Sentinel pushed right above a container on the stack: by the time it is
    # popped, every child pushed after it has been resolved.
    POST_PROCESS = object()
    work_stack: list[Any] = [value]
    results: dict[int, CtyType[Any]] = {}
    processing: set[int] = set()

    # `results` is keyed by id(), which is only unique while the object is alive.
    # Dicts produced from nested attrs instances are temporaries, so they are kept
    # alive here for the duration of the walk to stop their ids being recycled.
    converted_keepalive: list[Any] = []
    # id(converted value) -> id(original attrs instance). Parents look children up
    # by the id of the object they actually hold, i.e. the attrs instance.
    attrs_origin: dict[int, int] = {}

    def _record(item_id: int, schema: CtyType[Any]) -> None:
        """Store a resolved type and mirror it onto the attrs instance it came from."""
        results[item_id] = schema
        origin_id = attrs_origin.get(item_id)
        if origin_id is not None:
            results[origin_id] = schema
            processing.discard(origin_id)

    while work_stack:
        current_item = work_stack.pop()

        if current_item is POST_PROCESS:
            container = work_stack.pop()
            container_id = id(container)
            processing.remove(container_id)

            is_mapping = isinstance(container, dict)
            str_keyed = is_mapping and all(isinstance(k, str) for k in container)
            if str_keyed:
                # Normalization yields a new dict; container_id above still refers
                # to the original object, which is what parents will look up.
                container = {unicodedata.normalize("NFC", k): v for k, v in container.items()}

            child_values = container.values() if is_mapping else container
            # Children skipped during the walk (None, cycle back-references) have
            # no entry in `results` and fall back to CtyDynamic.
            child_types = [
                (v.type if isinstance(v, CtyValue) else results.get(id(v), CtyDynamic())) for v in child_values
            ]

            inferred_schema: CtyType[Any]
            if is_mapping:
                if not container:
                    inferred_schema = CtyObject({})
                elif not str_keyed:
                    unified = _unify_types(set(child_types))
                    inferred_schema = CtyMap(element_type=unified)
                else:
                    attr_types = dict(zip(container.keys(), child_types, strict=True))
                    inferred_schema = CtyObject(attribute_types=attr_types)
            elif isinstance(container, tuple):
                inferred_schema = CtyTuple(element_types=tuple(child_types))
            elif isinstance(container, list | set):
                unified = _unify_types(set(child_types))
                inferred_schema = (
                    CtyList(element_type=unified)
                    if isinstance(container, list)
                    else CtySet(element_type=unified)
                )
            else:
                inferred_schema = CtyDynamic()

            _record(container_id, inferred_schema)
            continue

        if attrs.has(type(current_item)) and not isinstance(current_item, CtyType):
            origin_id = id(current_item)
            # Already resolved, or currently being resolved further down the stack
            # (a cycle through this instance): do not convert it again.
            if origin_id in results or origin_id in processing:
                continue
            try:
                converted = _attrs_to_dict_safe(current_item)
            except TypeError:
                results[origin_id] = CtyDynamic()
                continue
            converted_keepalive.append(converted)
            attrs_origin[id(converted)] = origin_id
            processing.add(origin_id)
            current_item = converted

        if current_item is None:
            continue
        item_id = id(current_item)
        if item_id in results or item_id in processing:
            continue
        if isinstance(current_item, CtyValue):
            _record(item_id, current_item.type)
            continue

        if not isinstance(current_item, dict | list | tuple | set):
            scalar_type: CtyType[Any]
            # bool must be tested before int: bool is a subclass of int.
            if isinstance(current_item, bool):
                scalar_type = CtyBool()
            elif isinstance(current_item, int | float | Decimal):
                scalar_type = CtyNumber()
            elif isinstance(current_item, str | bytes):
                scalar_type = CtyString()
            else:
                scalar_type = CtyDynamic()
            _record(item_id, scalar_type)
            continue

        # Only pay for the structural key when there is a cache to consult.
        if container_cache is not None:
            nested_key = _get_structural_cache_key(current_item)
            if nested_key in container_cache:
                _record(item_id, container_cache[nested_key])
                continue

        processing.add(item_id)
        work_stack.extend([current_item, POST_PROCESS])
        # Reversed so that children are popped (and resolved) in their original order.
        work_stack.extend(
            reversed(list(current_item.values() if isinstance(current_item, dict) else current_item))
        )

    final_type = results.get(id(value), CtyDynamic())

    # Cache the result if caching is available. `value` is not mutated by the walk,
    # so the key computed before the walk is still the right one.
    if container_cache is not None:
        container_cache[root_key] = final_type

    return final_type