This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
@resilient(fallback=None,context_provider=lambda:{"function":"execute_error_handlers","module":"hub.handlers",},)defexecute_error_handlers(exception:Exception,context:dict[str,Any])->dict[str,Any]|None:"""Execute error handlers until one handles the exception."""handlers=get_handlers_for_exception(exception)forentryinhandlers:handler=entry.valuetry:result=handler(exception,context)ifresultisnotNone:returnresultexceptExceptionashandler_error:get_foundation_logger().error("Error handler failed",handler=entry.name,error=str(handler_error))returnNone
defget_handlers_for_exception(exception:Exception)->list[RegistryEntry]:"""Get error handlers that can handle the given exception type."""registry,ComponentCategory=_get_registry_and_lock()# Get all error handlersall_entries=list(registry)handlers=[entryforentryinall_entriesifentry.dimension==ComponentCategory.ERROR_HANDLER.value]# Filter by exception typeexception_type_name=type(exception).__name__matching_handlers=[]forentryinhandlers:exception_types=entry.metadata.get("exception_types",[])ifany(exc_typeinexception_type_nameorexception_type_nameinexc_typeforexc_typeinexception_types):matching_handlers.append(entry)# Sort by priority (highest first)matching_handlers.sort(key=lambdae:e.metadata.get("priority",0),reverse=True)returnmatching_handlers