Skip to content

Batch

provide.foundation.file.operations.detectors.batch

Batch operation detectors.

Classes

BatchOperationDetector

Detects batch operations and rename sequences.

Functions
detect_backup_create
detect_backup_create(
    events: list[FileEvent],
) -> FileOperation | None

Detect a backup creation pattern: a file is moved to a backup name and the original path is then created again shortly afterwards.

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_backup_create(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect a backup being made right before a file is re-created.

    Adjacent event pairs are scanned for a "moved" event immediately followed
    by a "created" event where:

    * the move target (``dest_path``, falling back to ``path``) is recognised
      by ``is_backup_file``,
    * the created path equals the moved event's ``path``,
    * the created path is not recognised by ``is_temp_file``, and
    * the creation happens no more than 2 seconds after the move.

    Args:
        events: File events to inspect, examined in the order given.

    Returns:
        A ``FileOperation`` of type ``BACKUP_CREATE`` for the first matching
        pair, or ``None`` when no pair matches.
    """
    if len(events) < 2:
        return None

    # Walk every pair of neighbouring events: (e0, e1), (e1, e2), ...
    for moved, created in zip(events, events[1:]):
        if moved.event_type != "moved":
            continue
        if created.event_type != "created":
            continue

        # The backup is wherever the original file was moved to.
        backup_path = moved.dest_path or moved.path
        if not is_backup_file(backup_path):
            continue
        if moved.path != created.path:
            continue
        if is_temp_file(created.path):
            continue

        # Backup operations usually happen quickly; ignore slow pairs.
        elapsed = (created.timestamp - moved.timestamp).total_seconds()
        if elapsed > 2.0:
            continue

        restored_path = created.path
        return FileOperation(
            operation_type=OperationType.BACKUP_CREATE,
            primary_path=restored_path,
            events=[moved, created],
            confidence=0.90,
            description=f"Backup created for {restored_path.name}",
            start_time=moved.timestamp,
            end_time=created.timestamp,
            is_atomic=True,
            is_safe=True,
            has_backup=True,
            files_affected=[restored_path],
            metadata={
                "backup_file": str(backup_path),
                "pattern": "backup_create",
            },
        )

    return None
detect_batch_update
detect_batch_update(
    events: list[FileEvent],
) -> FileOperation | None

Detect batch update pattern (multiple related files updated together).

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_batch_update(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect several related files in one directory changing together.

    Only "created", "modified" and "deleted" events are considered. They are
    bucketed by parent directory; a bucket qualifies when it holds at least
    three events, all of them fall within a 5 second span, and
    ``self._files_are_related`` accepts them.

    Args:
        events: File events to inspect.

    Returns:
        A ``FileOperation`` of type ``BATCH_UPDATE`` for the first qualifying
        directory (in order of first appearance), with its events sorted by
        timestamp, or ``None`` when no directory qualifies.
    """
    if len(events) < 3:
        return None

    # Bucket the content-changing events by the directory they occurred in.
    by_directory = defaultdict(list)
    for candidate in events:
        if candidate.event_type in {"created", "modified", "deleted"}:
            by_directory[candidate.path.parent].append(candidate)

    for directory, bucket in by_directory.items():
        if len(bucket) < 3:
            continue

        ordered = sorted(bucket, key=lambda e: e.timestamp)
        first, last = ordered[0], ordered[-1]

        # Events must be clustered in time (within 5 seconds).
        time_span = (last.timestamp - first.timestamp).total_seconds()
        if time_span > 5.0:
            continue
        if not self._files_are_related(ordered):
            continue

        file_count = len(ordered)
        return FileOperation(
            operation_type=OperationType.BATCH_UPDATE,
            primary_path=directory,
            events=ordered,
            confidence=0.85,
            description=f"Batch operation on {file_count} files",
            start_time=first.timestamp,
            end_time=last.timestamp,
            is_atomic=False,
            is_safe=True,
            files_affected=[e.path for e in ordered],
            metadata={
                "file_count": file_count,
                "pattern": "batch_update",
            },
        )

    return None
detect_rename_sequence
detect_rename_sequence(
    events: list[FileEvent],
) -> FileOperation | None

Detect a rename sequence pattern: a chain of moves such as A -> B -> C.

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_rename_sequence(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect a rename sequence: a chain of moves such as A -> B -> C.

    Two "moved" events are linked when the ``dest_path`` of one equals the
    ``path`` of the other. Starting from every move in turn, the chain is
    extended backwards and forwards until no further link exists, and the
    longest chain found (first one on ties) is reported.

    Each event appears at most once in a chain, so cyclic renames
    (A -> B, B -> A) are not counted twice, and links are followed until
    exhausted, so the result does not depend on the order of ``events``.

    Args:
        events: File events to inspect; only events whose ``event_type`` is
            "moved" are considered.

    Returns:
        A ``FileOperation`` of type ``RENAME_SEQUENCE`` whose events are the
        longest chain sorted by timestamp, or ``None`` when fewer than two
        moves are linked.
    """
    if len(events) < 2:
        return None

    move_events = [e for e in events if e.event_type == "moved"]
    if len(move_events) < 2:
        return None

    # Index moves by source and by destination so each link lookup is direct
    # instead of a rescan of every move.
    moves_from: dict = {}
    moves_into: dict = {}
    for move in move_events:
        moves_from.setdefault(move.path, []).append(move)
        if move.dest_path is not None:
            moves_into.setdefault(move.dest_path, []).append(move)

    def next_link(index, key, used, start):
        # First not-yet-used move registered under `key`. Moves equal to the
        # starting move (e.g. duplicate notifications) are never linked to it.
        if key is None:
            return None
        for candidate in index.get(key, ()):
            if id(candidate) not in used and candidate != start:
                return candidate
        return None

    chains = []
    for start in move_events:
        # Identity-based bookkeeping guarantees termination on cycles and
        # prevents an event from being added in both directions.
        used = {id(start)}

        # Walk backwards: who moved a file *onto* our current source?
        predecessors = []
        current_src = start.path
        while True:
            previous = next_link(moves_into, current_src, used, start)
            if previous is None:
                break
            used.add(id(previous))
            predecessors.append(previous)
            current_src = previous.path

        # Walk forwards: who moved a file *away from* our current destination?
        successors = []
        current_dest = start.dest_path
        while True:
            following = next_link(moves_from, current_dest, used, start)
            if following is None:
                break
            used.add(id(following))
            successors.append(following)
            current_dest = following.dest_path

        chain = predecessors[::-1] + [start] + successors
        if len(chain) >= 2:
            chains.append(chain)

    if not chains:
        return None

    longest_chain = max(chains, key=len)
    longest_chain.sort(key=lambda e: e.timestamp)

    final_path = longest_chain[-1].dest_path or longest_chain[-1].path
    return FileOperation(
        operation_type=OperationType.RENAME_SEQUENCE,
        primary_path=final_path,
        events=longest_chain,
        confidence=0.90,
        description=f"Rename sequence of {len(longest_chain)} moves",
        start_time=longest_chain[0].timestamp,
        end_time=longest_chain[-1].timestamp,
        is_atomic=True,
        is_safe=True,
        files_affected=[final_path],
        metadata={
            "original_path": str(longest_chain[0].path),
            "chain_length": len(longest_chain),
            "pattern": "rename_sequence",
        },
    )

Functions