This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
defget_config_chain()->list[RegistryEntry]:"""Get configuration sources ordered by priority."""registry,ComponentCategory=_get_registry_and_lock()# Get all config sourcesall_entries=list(registry)config_sources=[entryforentryinall_entriesifentry.dimension==ComponentCategory.CONFIG_SOURCE.value]# Sort by priority (highest first)config_sources.sort(key=lambdae:e.metadata.get("priority",0),reverse=True)returnconfig_sources
@resilient(fallback={},context_provider=lambda:{"function":"load_all_configs"})asyncdefload_all_configs()->dict[str,Any]:"""Load configurations from all registered sources."""configs={}chain=get_config_chain()forentryinchain:source=entry.valueifhasattr(source,"load_config"):try:ifinspect.iscoroutinefunction(source.load_config):source_config=awaitsource.load_config()else:source_config=source.load_config()ifsource_config:configs.update(source_config)exceptExceptionase:get_foundation_logger().warning("Config source failed to load",source=entry.name,error=str(e))returnconfigs
defload_config_from_registry(config_class:type[T])->T:"""Load configuration from registry sources. Args: config_class: Configuration class to instantiate Returns: Configuration instance loaded from registry sources """_registry,_ComponentCategory=_get_registry_and_lock()# Get configuration data from registryconfig_data={}# Load from all config sourceschain=get_config_chain()forentryinchain:source=entry.valueifhasattr(source,"load_config"):try:# Skip async sources in sync contextifinspect.iscoroutinefunction(source.load_config):get_foundation_logger().debug("Skipping async config source in sync context",source=entry.name,)continuesource_data=source.load_config()ifsource_data:config_data.update(source_data)exceptExceptionase:get_foundation_logger().warning("Failed to load config from source",source=entry.name,error=str(e),)# Create config instancereturnconfig_class.from_dict(config_data)
@resilient(fallback=None,suppress=(Exception,))defresolve_config_value(key:str)->Any:"""Resolve configuration value using priority-ordered sources."""registry,ComponentCategory=_get_registry_and_lock()# Get all config sourcesall_entries=list(registry)config_sources=[entryforentryinall_entriesifentry.dimension==ComponentCategory.CONFIG_SOURCE.value]# Sort by priority (highest first)config_sources.sort(key=lambdae:e.metadata.get("priority",0),reverse=True)# Try each sourceforentryinconfig_sources:source=entry.valueifhasattr(source,"get_value"):# Try to get value, continue on errortry:value=source.get_value(key)ifvalueisnotNone:returnvalueexceptExceptionase:# Log but continue - config sources may legitimately not have this keyget_foundation_logger().debug("Config source failed to get value",source=entry.name,key=key,error=str(e),)continuereturnNone