This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.
defdetect_direct_modification(self,events:list[FileEvent])->FileOperation|None:"""Detect direct file modification (multiple events on same file)."""iflen(events)<2:returnNone# Check if all events are for the same filefirst_event=events[0]ifnotall(event.path==first_event.pathforeventinevents):returnNone# Sort by timestampsorted_events=sorted(events,key=lambdae:e.timestamp)# Check if this is all modifies OR created followed by modifiesevent_types=[e.event_typeforeinsorted_events]is_all_modifies=all(et=="modified"foretinevent_types)is_create_then_modifies=event_types[0]=="created"andall(et=="modified"foretinevent_types[1:])ifnot(is_all_modifiesoris_create_then_modifies):returnNone# Determine operation type based on patternifis_create_then_modifies:op_type=OperationType.BACKUP_CREATEdescription=f"File created and modified: {first_event.path.name}"else:op_type=OperationType.ATOMIC_SAVEdescription=f"Multiple modifications to {first_event.path.name}"returnFileOperation(operation_type=op_type,primary_path=first_event.path,events=sorted_events,confidence=0.80,description=description,start_time=sorted_events[0].timestamp,end_time=sorted_events[-1].timestamp,is_atomic=False,is_safe=True,files_affected=[first_event.path],metadata={"event_count":len(sorted_events),"pattern":"direct_modification"ifis_all_modifieselse"create_modify",},)
defdetect_same_file_delete_create_pattern(self,events:list[FileEvent],window_ms:int=1000)->FileOperation|None:"""Detect delete followed by create of same file (replace pattern)."""iflen(events)<2:returnNone# Group events by pathpath_groups:dict[str,list[FileEvent]]={}foreventinevents:path_str=str(event.path)ifpath_strnotinpath_groups:path_groups[path_str]=[]path_groups[path_str].append(event)forpath_str,path_eventsinpath_groups.items():iflen(path_events)<2:continuepath_events.sort(key=lambdae:e.timestamp)# Look for delete followed by createforiinrange(len(path_events)-1):delete_event=path_events[i]create_event=path_events[i+1]ifdelete_event.event_type=="deleted"andcreate_event.event_type=="created":time_diff=(create_event.timestamp-delete_event.timestamp).total_seconds()*1000iftime_diff<=window_ms:returnFileOperation(operation_type=OperationType.ATOMIC_SAVE,primary_path=Path(path_str),events=[delete_event,create_event],confidence=0.90,description=f"File replaced: {Path(path_str).name}",start_time=delete_event.timestamp,end_time=create_event.timestamp,is_atomic=True,is_safe=True,files_affected=[Path(path_str)],metadata={"pattern":"delete_create_replace",},)returnNone
defdetect_simple_operation(self,events:list[FileEvent])->FileOperation|None:"""Detect simple single-event operations."""iflen(events)!=1:returnNoneevent=events[0]# Map event types to operation typestype_mapping={"created":OperationType.BACKUP_CREATE,"modified":OperationType.ATOMIC_SAVE,"deleted":OperationType.TEMP_CLEANUP,"moved":OperationType.RENAME_SEQUENCE,}ifevent.event_typenotintype_mapping:returnNoneoperation_type=type_mapping[event.event_type]# Special handling for move operationsifevent.event_type=="moved":primary_path=event.dest_pathorevent.pathmetadata={"original_path":str(event.path),"pattern":"simple_move",}else:primary_path=event.pathmetadata={"pattern":f"simple_{event.event_type}",}# Check if this is a backup fileis_backup_path=is_backup_file(primary_path)returnFileOperation(operation_type=operation_type,primary_path=primary_path,events=[event],confidence=0.70,description=f"Simple {event.event_type} on {primary_path.name}",start_time=event.timestamp,end_time=event.timestamp,is_atomic=True,is_safe=True,has_backup=is_backup_path,files_affected=[primary_path],metadata=metadata,)