@with_inference_cache
def infer_cty_type_from_raw(value: Any) -> CtyType[Any]:  # noqa: C901
    """
    Infer the most specific CtyType from a raw Python value.

    The value is walked iteratively with an explicit work stack instead of
    recursion, so deeply nested input does not hit the interpreter's
    recursion limit. Container schemas are looked up in, and stored to, the
    cache returned by ``get_container_schema_cache()`` when that call returns
    a cache; when it returns ``None`` inference proceeds without caching.

    Inference rules, as implemented below:

    * ``bool`` -> bool; ``int`` / ``float`` / ``Decimal`` -> number;
      ``str`` / ``bytes`` -> string.
    * ``None``, a top-level ``CtyValue`` and ``CtyType`` instances -> dynamic.
    * A ``CtyValue`` nested inside a container contributes its own ``.type``.
    * attrs instances are converted with ``_attrs_to_dict_safe`` and inferred
      from the converted value.
    * ``dict`` -> ``CtyObject`` when every key is a ``str`` (keys are NFC
      normalized; an empty dict gives an empty object), otherwise ``CtyMap``
      of the unified value type.
    * ``tuple`` -> ``CtyTuple``; ``list`` -> ``CtyList`` and ``set`` ->
      ``CtySet`` of the unified element type.
    * Anything else -> dynamic.

    Args:
        value: The raw Python value to inspect.

    Returns:
        The inferred CtyType.

    NOTE(review): the top-level attrs conversion is not wrapped in
    ``try/except TypeError`` the way the nested conversion is, so an
    exception raised there propagates to the caller - confirm this is
    intended.
    """
    from pyvider.cty.types import (
        CtyList,
        CtyMap,
        CtyObject,
        CtySet,
        CtyTuple,
        CtyType,
    )

    # Fast path for primitives — avoid cache lookups and work stack allocation.
    # Uses singleton instances to avoid repeated allocation of parameterless types.
    # bool is tested before int because bool is a subclass of int.
    if isinstance(value, bool):
        return _get_singleton("bool")
    if isinstance(value, int | float | Decimal):
        return _get_singleton("number")
    if isinstance(value, str | bytes):
        return _get_singleton("string")
    if isinstance(value, CtyValue) or value is None:
        return _get_singleton("dynamic")
    if isinstance(value, CtyType):
        return _get_singleton("dynamic")

    if attrs.has(type(value)):
        value = _attrs_to_dict_safe(value)

    container_cache = get_container_schema_cache()

    # If no cache is available (e.g., in worker threads for thread safety),
    # proceed without caching.
    root_key = None
    if container_cache is not None:
        root_key = _get_structural_cache_key(value)
        if root_key in container_cache:
            return container_cache[root_key]

    POST_PROCESS = _POST_PROCESS
    dynamic = _get_singleton("dynamic")

    work_stack: list[Any] = [value]
    # Inferred type of every visited object, keyed by id().
    results: dict[int, CtyType[Any]] = {}
    # ids of containers whose children are still being inferred (cycle guard).
    processing: set[int] = set()
    # id(converted attrs value) -> id(original attrs instance). Parents look
    # their children up by the id of the object they actually hold, so the
    # result of a converted value must also be recorded under the original id.
    aliases: dict[int, int] = {}
    # Converted values are held here for the whole traversal so their ids
    # cannot be reused by a later allocation while `results` is keyed on them.
    converted_keepalive: list[Any] = []

    while work_stack:
        current_item = work_stack.pop()

        if current_item is POST_PROCESS:
            # All children of this container have been resolved; build its type.
            container = work_stack.pop()
            container_id = id(container)
            processing.remove(container_id)

            is_dict = isinstance(container, dict)
            str_keyed = is_dict and all(isinstance(k, str) for k in container)
            if str_keyed:
                # Normalization keeps every key a str, so `str_keyed` stays valid.
                container = {unicodedata.normalize("NFC", k): v for k, v in container.items()}

            child_values = container.values() if is_dict else container
            child_types = [
                (v.type if isinstance(v, CtyValue) else results.get(id(v), dynamic))
                for v in child_values
            ]

            inferred_schema: CtyType[Any]
            if is_dict:
                if not container:
                    inferred_schema = CtyObject({})
                elif not str_keyed:
                    unified = _unify_types(set(child_types))
                    inferred_schema = CtyMap(element_type=unified)
                else:
                    attr_types = dict(zip(container.keys(), child_types, strict=True))
                    inferred_schema = CtyObject(attribute_types=attr_types)
            elif isinstance(container, tuple):
                inferred_schema = CtyTuple(element_types=tuple(child_types))
            elif isinstance(container, list | set):
                unified = _unify_types(set(child_types))
                inferred_schema = (
                    CtyList(element_type=unified)
                    if isinstance(container, list)
                    else CtySet(element_type=unified)
                )
            else:
                inferred_schema = dynamic

            results[container_id] = inferred_schema
            if aliases:
                source_id = aliases.pop(container_id, None)
                if source_id is not None:
                    # Publish the type under the original attrs instance's id.
                    results[source_id] = inferred_schema
            continue

        origin_id: int | None = None
        if attrs.has(type(current_item)) and not isinstance(current_item, CtyType):
            origin_id = id(current_item)
            if origin_id in results:
                # This exact instance was already inferred via another reference.
                continue
            try:
                current_item = _attrs_to_dict_safe(current_item)
            except TypeError:
                results[origin_id] = dynamic
                continue
            converted_keepalive.append(current_item)

        if current_item is None:
            # No entry is recorded; the parent's lookup falls back to dynamic.
            continue

        item_id = id(current_item)
        if item_id in processing:
            # Already on the stack (cyclic reference); the parent falls back
            # to dynamic for this child.
            continue

        resolved = results.get(item_id)
        if resolved is None:
            if isinstance(current_item, CtyValue):
                resolved = current_item.type
            elif not isinstance(current_item, dict | list | tuple | set):
                if isinstance(current_item, bool):
                    resolved = _get_singleton("bool")
                elif isinstance(current_item, int | float | Decimal):
                    resolved = _get_singleton("number")
                elif isinstance(current_item, str | bytes):
                    resolved = _get_singleton("string")
                else:
                    resolved = dynamic
            elif container_cache is not None:
                # Only pay for the structural key when there is a cache to consult.
                item_key = _get_structural_cache_key(current_item)
                if item_key in container_cache:
                    resolved = container_cache[item_key]

        if resolved is not None:
            results[item_id] = resolved
            if origin_id is not None:
                results[origin_id] = resolved
            continue

        # Unresolved container: schedule its children, then a post-process step.
        processing.add(item_id)
        if origin_id is not None:
            aliases[item_id] = origin_id

        # Two appends avoid allocating a temporary 2-element list
        work_stack.append(current_item)
        work_stack.append(POST_PROCESS)

        # Avoid intermediate list allocation: reversed() works directly on
        # dict.values(), list, and tuple. Sets don't need ordering for inference.
        if isinstance(current_item, dict):
            work_stack.extend(reversed(current_item.values()))
        elif isinstance(current_item, list | tuple):
            work_stack.extend(reversed(current_item))
        else:
            work_stack.extend(current_item)

    final_type = results.get(id(value), dynamic)

    # Cache the result if caching is available. `value` is not mutated during
    # the traversal, so the key computed before the loop is reused.
    if container_cache is not None:
        container_cache[root_key] = final_type

    return final_type