# NOTE: This documentation was generated with AI assistance and is still being audited; some of it may be inaccurate.
def detect_backup_create(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect the backup-creation pattern in a stream of file events.

    The pattern is a move of the original file to a backup name,
    immediately followed by a create at the original path, within a
    short time window.

    Args:
        events: File events to inspect, assumed roughly time-ordered.

    Returns:
        A BACKUP_CREATE FileOperation for the first matching pair of
        consecutive events, or None when no pair matches.
    """
    if len(events) < 2:
        return None

    # Scan consecutive event pairs: move-to-backup-name followed by a
    # create at the moved file's original path.
    for moved, created in zip(events, events[1:]):
        if moved.event_type != "moved" or created.event_type != "created":
            continue
        backup_path = moved.dest_path or moved.path
        if not is_backup_file(backup_path):
            continue
        if moved.path != created.path or is_temp_file(created.path):
            continue
        # Backup operations usually happen quickly; enforce a 2 s window.
        elapsed = (created.timestamp - moved.timestamp).total_seconds()
        if elapsed <= 2.0:
            return FileOperation(
                operation_type=OperationType.BACKUP_CREATE,
                primary_path=created.path,
                events=[moved, created],
                confidence=0.90,
                description=f"Backup created for {created.path.name}",
                start_time=moved.timestamp,
                end_time=created.timestamp,
                is_atomic=True,
                is_safe=True,
                has_backup=True,
                files_affected=[created.path],
                metadata={
                    "backup_file": str(backup_path),
                    "pattern": "backup_create",
                },
            )
    return None
def detect_batch_update(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect a batch update (multiple related files updated together).

    Groups create/modify/delete events by parent directory; a directory
    with at least three events clustered within five seconds, whose files
    are judged related, is reported as one batch operation.

    Args:
        events: File events to inspect.

    Returns:
        A BATCH_UPDATE FileOperation for the first qualifying directory,
        or None when no directory qualifies.
    """
    if len(events) < 3:
        return None

    # Bucket relevant events by their parent directory.
    by_directory = defaultdict(list)
    for ev in events:
        if ev.event_type in {"created", "modified", "deleted"}:
            by_directory[ev.path.parent].append(ev)

    for directory, bucket in by_directory.items():
        if len(bucket) < 3:
            continue
        bucket.sort(key=lambda e: e.timestamp)
        # Events must be clustered in time (within 5 seconds) and the
        # affected files must be related to count as one batch.
        span = (bucket[-1].timestamp - bucket[0].timestamp).total_seconds()
        if span <= 5.0 and self._files_are_related(bucket):
            return FileOperation(
                operation_type=OperationType.BATCH_UPDATE,
                primary_path=directory,
                events=bucket,
                confidence=0.85,
                description=f"Batch operation on {len(bucket)} files",
                start_time=bucket[0].timestamp,
                end_time=bucket[-1].timestamp,
                is_atomic=False,
                is_safe=True,
                files_affected=[ev.path for ev in bucket],
                metadata={
                    "file_count": len(bucket),
                    "pattern": "batch_update",
                },
            )
    return None
def detect_rename_sequence(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect a rename sequence (a chain of moves A -> B -> C).

    Indexes moves by source path, walks each chain forward from its head
    (a move whose source is no other move's destination), and reports the
    longest chain found.

    Args:
        events: File events to inspect.

    Returns:
        A RENAME_SEQUENCE FileOperation covering the longest chain of two
        or more moves, or None when no chain exists.
    """
    if len(events) < 2:
        return None
    move_events = [e for e in events if e.event_type == "moved"]
    if len(move_events) < 2:
        return None

    # Index moves by source path so each chain link resolves in O(1).
    # (The previous single-pass scan over move_events could miss links
    # that appeared earlier in the list than the link already found.)
    by_source = {}
    for move in move_events:
        by_source.setdefault(move.path, move)

    # A chain head is a move whose source path is not any other move's
    # destination; walking forward from heads enumerates each chain once.
    destinations = {m.dest_path for m in move_events if m.dest_path is not None}

    chains = []
    for head in move_events:
        if head.path in destinations:
            continue  # interior link; its chain starts at some head
        chain = [head]
        seen = {id(head)}  # guards against cyclic renames (A -> B -> A)
        current = head.dest_path
        while current is not None:
            nxt = by_source.get(current)
            if nxt is None or id(nxt) in seen:
                break
            chain.append(nxt)
            seen.add(id(nxt))
            current = nxt.dest_path
        if len(chain) >= 2:
            chains.append(chain)

    if not chains:
        return None

    longest_chain = max(chains, key=len)
    longest_chain.sort(key=lambda e: e.timestamp)
    final_path = longest_chain[-1].dest_path or longest_chain[-1].path
    return FileOperation(
        operation_type=OperationType.RENAME_SEQUENCE,
        primary_path=final_path,
        events=longest_chain,
        confidence=0.90,
        description=f"Rename sequence of {len(longest_chain)} moves",
        start_time=longest_chain[0].timestamp,
        end_time=longest_chain[-1].timestamp,
        is_atomic=True,
        is_safe=True,
        files_affected=[final_path],
        metadata={
            "original_path": str(longest_chain[0].path),
            "chain_length": len(longest_chain),
            "pattern": "rename_sequence",
        },
    )