
provide.foundation.file.operations.detectors

File operation detection system with extensible registry.

This module provides a registry-based system for detecting file operation patterns from file system events. Built-in detectors are automatically registered with priorities, and custom detectors can be added via the registry API.

Architecture
  • Protocol-based detector interface (DetectorFunc)
  • Priority-ordered execution (0-100, higher = earlier)
  • Extensible via register_detector()
  • Thread-safe singleton registry
Example - Using built-in detectors

>>> from provide.foundation.file.operations.detectors import OperationDetector
>>> detector = OperationDetector()
>>> operations = detector.detect(events)

Example - Registering custom detector

>>> from provide.foundation.file.operations.detectors import register_detector
>>> def detect_my_pattern(events):
...     # Custom detection logic
...     return FileOperation(...) if pattern_found else None
>>> register_detector(
...     name="detect_custom",
...     func=detect_my_pattern,
...     priority=85,
...     description="Detects custom pattern",
... )

Classes

AtomicOperationDetector

Detects atomic file operations like safe writes and atomic saves.

Functions
detect_atomic_save
detect_atomic_save(
    events: list[FileEvent],
) -> FileOperation | None

Detect atomic save pattern (write to temp file, then rename).

Common pattern: create temp -> write temp -> rename temp to target

Source code in provide/foundation/file/operations/detectors/atomic.py
def detect_atomic_save(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect atomic save pattern (write to temp file, then rename).

    Common pattern: create temp -> write temp -> rename temp to target
    """
    if len(events) < 2:
        return None

    # Look for create/modify temp file followed by rename to target
    for i in range(len(events) - 1):
        current = events[i]
        next_event = events[i + 1]

        if (
            is_temp_file(current.path)
            and current.event_type in {"created", "modified"}
            and next_event.event_type == "moved"
            and next_event.path == current.path
            and next_event.dest_path
            and not is_temp_file(next_event.dest_path)
        ):
            # Found atomic save pattern
            target_path = next_event.dest_path

            # Look for other related events (additional writes to temp)
            related_events = [current, next_event]
            for j, event in enumerate(events):
                if j != i and j != i + 1 and event.path == current.path:
                    related_events.append(event)

            related_events.sort(key=lambda e: e.timestamp)

            return FileOperation(
                operation_type=OperationType.ATOMIC_SAVE,
                primary_path=target_path,
                events=related_events,
                confidence=0.95,
                description=f"Atomic save to {target_path.name}",
                start_time=related_events[0].timestamp,
                end_time=related_events[-1].timestamp,
                is_atomic=True,
                is_safe=True,
                files_affected=[target_path],
                metadata={
                    "temp_file": str(current.path),
                    "pattern": "atomic_save",
                },
            )

    return None
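For orientation, here is a minimal sketch of the two-event sequence this detector matches, run through the public detect() API. The FileEvent constructor and its import path are assumptions inferred from the fields used in the source above (path, event_type, dest_path, timestamp); adjust them to the actual API.

from datetime import datetime, timedelta
from pathlib import Path

from provide.foundation.file.operations import FileEvent  # import path assumed
from provide.foundation.file.operations.detectors import OperationDetector

t0 = datetime.now()
events = [
    # Editor writes to a temp file first...
    FileEvent(path=Path("doc.txt.tmp"), event_type="created", timestamp=t0),
    # ...then atomically renames it over the real target.
    FileEvent(
        path=Path("doc.txt.tmp"),
        event_type="moved",
        dest_path=Path("doc.txt"),
        timestamp=t0 + timedelta(milliseconds=50),
    ),
]

operations = OperationDetector().detect(events)
# Expected: one ATOMIC_SAVE operation with confidence 0.95,
# provided is_temp_file() recognizes the ".tmp" suffix (assumed here).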
detect_safe_write
detect_safe_write(
    events: list[FileEvent],
) -> FileOperation | None

Detect safe write pattern (backup original, write new, cleanup).

Common pattern: create backup -> modify original OR rename original to backup -> create new

Source code in provide/foundation/file/operations/detectors/atomic.py
def detect_safe_write(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect safe write pattern (backup original, write new, cleanup).

    Common pattern: create backup -> modify original OR rename original to backup -> create new
    """
    if len(events) < 2:
        return None

    # Find backup files and match them with original files
    backup_events = []
    regular_events = []

    for event in events:
        if is_backup_file(event.path):
            backup_events.append(event)
        else:
            regular_events.append(event)

    # Try to match backup files with regular files
    for backup_event in backup_events:
        if backup_event.event_type not in {"moved", "created"}:
            continue

        # Extract base name from backup
        base_name = extract_base_name(backup_event.path)
        if not base_name:
            continue

        backup_parent = backup_event.path.parent
        expected_original = backup_parent / base_name

        # Find matching original file events
        matching_events = [
            e
            for e in regular_events
            if e.path == expected_original and e.event_type in {"created", "modified"}
        ]

        if matching_events:
            # Found safe write pattern
            original_event = matching_events[0]
            all_events = [backup_event, original_event]
            all_events.sort(key=lambda e: e.timestamp)

            return FileOperation(
                operation_type=OperationType.SAFE_WRITE,
                primary_path=original_event.path,
                events=all_events,
                confidence=0.95,
                description=f"Safe write to {original_event.path.name}",
                start_time=all_events[0].timestamp,
                end_time=all_events[-1].timestamp,
                is_atomic=False,
                is_safe=True,
                has_backup=True,
                files_affected=[original_event.path],
                metadata={
                    "backup_file": str(backup_event.path),
                    "pattern": "safe_write",
                },
            )

    return None

BatchOperationDetector

Detects batch operations and rename sequences.

Functions
detect_backup_create
detect_backup_create(
    events: list[FileEvent],
) -> FileOperation | None

Detect backup creation pattern.

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_backup_create(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect backup creation pattern."""
    if len(events) < 2:
        return None

    # Look for move to backup name followed by create of original
    for i in range(len(events) - 1):
        move_event = events[i]
        create_event = events[i + 1]

        if (
            move_event.event_type == "moved"
            and create_event.event_type == "created"
            and is_backup_file(move_event.dest_path or move_event.path)
            and move_event.path == create_event.path
            and not is_temp_file(create_event.path)
        ):
            # Time window check (backup operations usually happen quickly)
            time_diff = (create_event.timestamp - move_event.timestamp).total_seconds()
            if time_diff <= 2.0:
                return FileOperation(
                    operation_type=OperationType.BACKUP_CREATE,
                    primary_path=create_event.path,
                    events=[move_event, create_event],
                    confidence=0.90,
                    description=f"Backup created for {create_event.path.name}",
                    start_time=move_event.timestamp,
                    end_time=create_event.timestamp,
                    is_atomic=True,
                    is_safe=True,
                    has_backup=True,
                    files_affected=[create_event.path],
                    metadata={
                        "backup_file": str(move_event.dest_path or move_event.path),
                        "pattern": "backup_create",
                    },
                )

    return None
detect_batch_update
detect_batch_update(
    events: list[FileEvent],
) -> FileOperation | None

Detect batch update pattern (multiple related files updated together).

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_batch_update(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect batch update pattern (multiple related files updated together)."""
    if len(events) < 3:
        return None

    # Group events by directory and time proximity
    directory_groups = defaultdict(list)
    for event in events:
        if event.event_type in {"created", "modified", "deleted"}:
            directory_groups[event.path.parent].append(event)

    for directory, dir_events in directory_groups.items():
        if len(dir_events) < 3:
            continue

        dir_events.sort(key=lambda e: e.timestamp)

        # Check if events are clustered in time (within 5 seconds)
        time_span = (dir_events[-1].timestamp - dir_events[0].timestamp).total_seconds()
        if time_span <= 5.0 and self._files_are_related(dir_events):
            return FileOperation(
                operation_type=OperationType.BATCH_UPDATE,
                primary_path=directory,
                events=dir_events,
                confidence=0.85,
                description=f"Batch operation on {len(dir_events)} files",
                start_time=dir_events[0].timestamp,
                end_time=dir_events[-1].timestamp,
                is_atomic=False,
                is_safe=True,
                files_affected=[e.path for e in dir_events],
                metadata={
                    "file_count": len(dir_events),
                    "pattern": "batch_update",
                },
            )

    return None
detect_rename_sequence
detect_rename_sequence(
    events: list[FileEvent],
) -> FileOperation | None

Detect rename sequence pattern.

Source code in provide/foundation/file/operations/detectors/batch.py
def detect_rename_sequence(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect rename sequence pattern."""
    if len(events) < 2:
        return None

    # Look for chain of moves: A -> B -> C
    move_events = [e for e in events if e.event_type == "moved"]
    if len(move_events) < 2:
        return None

    # Build rename chains
    chains = []
    for move_event in move_events:
        # Find chains where this move's source path is another move's destination
        chain = [move_event]

        # Look backwards
        current_src = move_event.path
        for other_move in move_events:
            if other_move != move_event and other_move.dest_path == current_src:
                chain.insert(0, other_move)
                current_src = other_move.path

        # Look forwards
        current_dest = move_event.dest_path
        for other_move in move_events:
            if other_move != move_event and other_move.path == current_dest:
                chain.append(other_move)
                current_dest = other_move.dest_path

        if len(chain) >= 2:
            chains.append(chain)

    # Find the longest chain
    if chains:
        longest_chain = max(chains, key=len)
        longest_chain.sort(key=lambda e: e.timestamp)

        final_path = longest_chain[-1].dest_path or longest_chain[-1].path
        return FileOperation(
            operation_type=OperationType.RENAME_SEQUENCE,
            primary_path=final_path,
            events=longest_chain,
            confidence=0.90,
            description=f"Rename sequence of {len(longest_chain)} moves",
            start_time=longest_chain[0].timestamp,
            end_time=longest_chain[-1].timestamp,
            is_atomic=True,
            is_safe=True,
            files_affected=[final_path],
            metadata={
                "original_path": str(longest_chain[0].path),
                "chain_length": len(longest_chain),
                "pattern": "rename_sequence",
            },
        )

    return None
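To make the chain-building logic concrete, a sketch with two chained moves (FileEvent construction assumed, as in the earlier atomic-save sketch): a.txt -> b.txt -> c.txt collapses into a single RENAME_SEQUENCE.

# Hypothetical rename chain a.txt -> b.txt -> c.txt
moves = [
    FileEvent(path=Path("a.txt"), event_type="moved",
              dest_path=Path("b.txt"), timestamp=t0),
    FileEvent(path=Path("b.txt"), event_type="moved",
              dest_path=Path("c.txt"), timestamp=t0 + timedelta(milliseconds=10)),
]
ops = OperationDetector().detect(moves)
# Expected: one RENAME_SEQUENCE with primary_path c.txt,
# metadata["original_path"] == "a.txt" and metadata["chain_length"] == 2
# (assuming no higher-priority detector claims the event group first).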

DetectorFunc

Bases: Protocol

Protocol for file operation detector functions.

A detector function analyzes a list of file events and attempts to identify a specific file operation pattern (atomic save, batch update, etc.).

Returns None if the pattern is not detected, or a FileOperation with confidence score if the pattern matches.
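Because DetectorFunc is a Protocol, any plain function with this signature conforms; no subclassing is required. A minimal illustrative detector follows. The pattern it detects and its OperationType choice are examples for this sketch, not part of the library, and the FileEvent/FileOperation import path is assumed.

from provide.foundation.file.operations import (  # import path assumed
    FileEvent,
    FileOperation,
    OperationType,
)

def detect_standalone_delete(events: list[FileEvent]) -> FileOperation | None:
    """Illustrative detector: a lone delete event treated as a cleanup."""
    if len(events) != 1 or events[0].event_type != "deleted":
        return None
    event = events[0]
    return FileOperation(
        operation_type=OperationType.TEMP_CLEANUP,
        primary_path=event.path,
        events=[event],
        confidence=0.60,
        description=f"Standalone delete of {event.path.name}",
        start_time=event.timestamp,
        end_time=event.timestamp,
        is_atomic=True,
        is_safe=False,
        files_affected=[event.path],
        metadata={"pattern": "standalone_delete"},
    )

Such a function can then be plugged in with register_detector(name="detect_standalone_delete", func=detect_standalone_delete, priority=...).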

Functions
__call__
__call__(events: list[FileEvent]) -> FileOperation | None

Detect file operation pattern from events.

Parameters:

    events (list[FileEvent]): List of file events to analyze. Required.

Returns:

    FileOperation | None: FileOperation if pattern detected, None otherwise.

Source code in provide/foundation/file/operations/detectors/protocol.py
def __call__(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect file operation pattern from events.

    Args:
        events: List of file events to analyze

    Returns:
        FileOperation if pattern detected, None otherwise
    """
    ...

OperationDetector

OperationDetector(
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
)

Detects and classifies file operations from events.

Initialize with optional configuration and callback.

Parameters:

    config (DetectorConfig | None, default None): Detector configuration.
    on_operation_complete (Any, default None): Callback function(operation: FileOperation) called when an operation is detected. Used for streaming mode.
    registry (Registry | None, default None): Optional registry for detectors (defaults to the global registry).
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def __init__(
    self,
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
) -> None:
    """Initialize with optional configuration and callback.

    Args:
        config: Detector configuration
        on_operation_complete: Callback function(operation: FileOperation) called
                             when an operation is detected. Used for streaming mode.
        registry: Optional registry for detectors (defaults to global)
    """
    self.config = config or DetectorConfig()
    self.on_operation_complete = on_operation_complete
    self.registry = registry or get_detector_registry()
    self._pending_events: list[FileEvent] = []
    self._last_flush = datetime.now()

    # Create auto-flush handler for streaming mode
    self._auto_flush_handler = AutoFlushHandler(
        time_window_ms=self.config.time_window_ms,
        on_operation_complete=on_operation_complete,
        analyze_func=self._analyze_event_group,
    )
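A callback-driven setup sketch for streaming mode (incoming_events is a placeholder for your file watcher's event stream):

from provide.foundation.file.operations.detectors import OperationDetector

def on_complete(operation):
    # Invoked once per detected operation in streaming mode
    print(f"{operation.operation_type}: {operation.primary_path}")

detector = OperationDetector(on_operation_complete=on_complete)

for event in incoming_events:  # placeholder event source
    detector.add_event(event)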
Functions
add_event
add_event(event: FileEvent) -> None

Add event with auto-flush and callback support.

This is the recommended method for streaming detection with automatic temp file hiding and callback-based operation reporting.

Parameters:

    event (FileEvent): File event to process. Required.
Behavior
  • Hides temp files automatically (no callback until operation completes)
  • Schedules auto-flush timer for pending operations
  • Calls on_operation_complete(operation) when pattern detected
  • Emits non-temp files immediately if no operation pattern found
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def add_event(self, event: FileEvent) -> None:
    """Add event with auto-flush and callback support.

    This is the recommended method for streaming detection with automatic
    temp file hiding and callback-based operation reporting.

    Args:
        event: File event to process

    Behavior:
        - Hides temp files automatically (no callback until operation completes)
        - Schedules auto-flush timer for pending operations
        - Calls on_operation_complete(operation) when pattern detected
        - Emits non-temp files immediately if no operation pattern found
    """
    # Delegate to auto-flush handler
    self._auto_flush_handler.add_event(event)
detect
detect(events: list[FileEvent]) -> list[FileOperation]

Detect all operations from a list of events.

Parameters:

    events (list[FileEvent]): List of file events to analyze. Required.

Returns:

    list[FileOperation]: List of detected operations, ordered by start time.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect(self, events: list[FileEvent]) -> list[FileOperation]:
    """Detect all operations from a list of events.

    Args:
        events: List of file events to analyze

    Returns:
        List of detected operations, ordered by start time
    """
    if not events:
        return []

    # Sort events by timestamp
    sorted_events = sorted(events, key=lambda e: e.timestamp)

    # Group events by time windows
    event_groups = self._group_events_by_time(sorted_events)

    operations = []
    for group in event_groups:
        operation = self._analyze_event_group(group)
        if operation:
            operations.append(operation)

    return operations
detect_streaming
detect_streaming(event: FileEvent) -> FileOperation | None

Process events in streaming fashion.

Parameters:

    event (FileEvent): Single file event. Required.

Returns:

    FileOperation | None: Completed operation if detected, None otherwise.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect_streaming(self, event: FileEvent) -> FileOperation | None:
    """Process events in streaming fashion.

    Args:
        event: Single file event

    Returns:
        Completed operation if detected, None otherwise
    """
    self._pending_events.append(event)

    # Check if we should flush based on time window
    now = datetime.now()
    time_since_last = (now - self._last_flush).total_seconds() * 1000

    if time_since_last >= self.config.time_window_ms:
        return self._flush_pending()

    return None
flush
flush() -> list[FileOperation]

Get any pending operations and clear buffer.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def flush(self) -> list[FileOperation]:
    """Get any pending operations and clear buffer."""
    operations = []
    if self._pending_events:
        operation = self._flush_pending()
        if operation:
            operations.append(operation)
    return operations
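The pull-based alternative to the callback style: poll detect_streaming() per event, then drain whatever the time window has not yet released with flush(). A sketch, with incoming_events again a placeholder:

completed: list[FileOperation] = []
for event in incoming_events:  # placeholder event source
    operation = detector.detect_streaming(event)
    if operation is not None:
        completed.append(operation)

# The tail of the stream may still sit inside the time window
completed.extend(detector.flush())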

SimpleOperationDetector

Detects simple, direct file operations.

Functions
detect_direct_modification
detect_direct_modification(
    events: list[FileEvent],
) -> FileOperation | None

Detect direct file modification (multiple events on same file).

Source code in provide/foundation/file/operations/detectors/simple.py
def detect_direct_modification(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect direct file modification (multiple events on same file)."""
    if len(events) < 2:
        return None

    # Check if all events are for the same file
    first_event = events[0]
    if not all(event.path == first_event.path for event in events):
        return None

    # Sort by timestamp
    sorted_events = sorted(events, key=lambda e: e.timestamp)

    # Check if this is all modifies OR created followed by modifies
    event_types = [e.event_type for e in sorted_events]
    is_all_modifies = all(et == "modified" for et in event_types)
    is_create_then_modifies = event_types[0] == "created" and all(
        et == "modified" for et in event_types[1:]
    )

    if not (is_all_modifies or is_create_then_modifies):
        return None

    # Determine operation type based on pattern
    if is_create_then_modifies:
        op_type = OperationType.BACKUP_CREATE
        description = f"File created and modified: {first_event.path.name}"
    else:
        op_type = OperationType.ATOMIC_SAVE
        description = f"Multiple modifications to {first_event.path.name}"

    return FileOperation(
        operation_type=op_type,
        primary_path=first_event.path,
        events=sorted_events,
        confidence=0.80,
        description=description,
        start_time=sorted_events[0].timestamp,
        end_time=sorted_events[-1].timestamp,
        is_atomic=False,
        is_safe=True,
        files_affected=[first_event.path],
        metadata={
            "event_count": len(sorted_events),
            "pattern": "direct_modification" if is_all_modifies else "create_modify",
        },
    )
detect_same_file_delete_create_pattern
detect_same_file_delete_create_pattern(
    events: list[FileEvent], window_ms: int = 1000
) -> FileOperation | None

Detect delete followed by create of same file (replace pattern).

Source code in provide/foundation/file/operations/detectors/simple.py
def detect_same_file_delete_create_pattern(
    self, events: list[FileEvent], window_ms: int = 1000
) -> FileOperation | None:
    """Detect delete followed by create of same file (replace pattern)."""
    if len(events) < 2:
        return None

    # Group events by path
    path_groups: dict[str, list[FileEvent]] = {}
    for event in events:
        path_str = str(event.path)
        if path_str not in path_groups:
            path_groups[path_str] = []
        path_groups[path_str].append(event)

    for path_str, path_events in path_groups.items():
        if len(path_events) < 2:
            continue

        path_events.sort(key=lambda e: e.timestamp)

        # Look for delete followed by create
        for i in range(len(path_events) - 1):
            delete_event = path_events[i]
            create_event = path_events[i + 1]

            if delete_event.event_type == "deleted" and create_event.event_type == "created":
                time_diff = (create_event.timestamp - delete_event.timestamp).total_seconds() * 1000

                if time_diff <= window_ms:
                    return FileOperation(
                        operation_type=OperationType.ATOMIC_SAVE,
                        primary_path=Path(path_str),
                        events=[delete_event, create_event],
                        confidence=0.90,
                        description=f"File replaced: {Path(path_str).name}",
                        start_time=delete_event.timestamp,
                        end_time=create_event.timestamp,
                        is_atomic=True,
                        is_safe=True,
                        files_affected=[Path(path_str)],
                        metadata={
                            "pattern": "delete_create_replace",
                        },
                    )

    return None
detect_simple_operation
detect_simple_operation(
    events: list[FileEvent],
) -> FileOperation | None

Detect simple single-event operations.

Source code in provide/foundation/file/operations/detectors/simple.py
def detect_simple_operation(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect simple single-event operations."""
    if len(events) != 1:
        return None

    event = events[0]

    # Map event types to operation types
    type_mapping = {
        "created": OperationType.BACKUP_CREATE,
        "modified": OperationType.ATOMIC_SAVE,
        "deleted": OperationType.TEMP_CLEANUP,
        "moved": OperationType.RENAME_SEQUENCE,
    }

    if event.event_type not in type_mapping:
        return None

    operation_type = type_mapping[event.event_type]

    # Special handling for move operations
    if event.event_type == "moved":
        primary_path = event.dest_path or event.path
        metadata = {
            "original_path": str(event.path),
            "pattern": "simple_move",
        }
    else:
        primary_path = event.path
        metadata = {
            "pattern": f"simple_{event.event_type}",
        }

    # Check if this is a backup file
    is_backup_path = is_backup_file(primary_path)

    return FileOperation(
        operation_type=operation_type,
        primary_path=primary_path,
        events=[event],
        confidence=0.70,
        description=f"Simple {event.event_type} on {primary_path.name}",
        start_time=event.timestamp,
        end_time=event.timestamp,
        is_atomic=True,
        is_safe=True,
        has_backup=is_backup_path,
        files_affected=[primary_path],
        metadata=metadata,
    )

TempPatternDetector

Detects patterns involving temporary files.

Functions
detect_delete_temp_pattern
detect_delete_temp_pattern(
    events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None

Detect pattern: delete original -> create temp -> rename temp.

Source code in provide/foundation/file/operations/detectors/temp.py
def detect_delete_temp_pattern(
    self, events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None:
    """Detect pattern: delete original -> create temp -> rename temp."""
    if len(events) < 3:
        return None

    for i in range(len(events) - 2):
        delete_event = events[i]
        temp_create = events[i + 1]
        temp_rename = events[i + 2]

        if (
            delete_event.event_type == "deleted"
            and temp_create.event_type == "created"
            and is_temp_file(temp_create.path)
            and temp_rename.event_type == "moved"
            and temp_rename.path == temp_create.path
            and temp_rename.dest_path == delete_event.path
        ):
            time_span = (temp_rename.timestamp - delete_event.timestamp).total_seconds() * 1000

            if time_span <= temp_window_ms:
                return FileOperation(
                    operation_type=OperationType.ATOMIC_SAVE,
                    primary_path=delete_event.path,
                    events=[delete_event, temp_create, temp_rename],
                    confidence=0.90,
                    description=f"File atomically saved via temp: {delete_event.path.name}",
                    start_time=delete_event.timestamp,
                    end_time=temp_rename.timestamp,
                    is_atomic=True,
                    is_safe=True,
                    files_affected=[delete_event.path],
                    metadata={
                        "temp_file": str(temp_create.path),
                        "pattern": "delete_temp_rename",
                    },
                )

    return None
detect_temp_create_delete_pattern
detect_temp_create_delete_pattern(
    events: list[FileEvent], temp_window_ms: int = 5000
) -> FileOperation | None

Detect pattern: create temp -> delete temp -> create real file.

Note: High complexity is intentional to handle all temp file patterns.

Source code in provide/foundation/file/operations/detectors/temp.py
def detect_temp_create_delete_pattern(  # noqa: C901
    self, events: list[FileEvent], temp_window_ms: int = 5000
) -> FileOperation | None:
    """Detect pattern: create temp -> delete temp -> create real file.

    Note: High complexity is intentional to handle all temp file patterns.
    """
    if len(events) < 2:
        return None

    # Look for temp file creation followed by deletion, then real file creation
    temp_files: dict[str, list[FileEvent]] = {}
    real_files: dict[str, list[FileEvent]] = {}

    for event in events:
        if is_temp_file(event.path):
            path_str = str(event.path)
            if path_str not in temp_files:
                temp_files[path_str] = []
            temp_files[path_str].append(event)
        else:
            path_str = str(event.path)
            if path_str not in real_files:
                real_files[path_str] = []
            real_files[path_str].append(event)

    # Check for create temp -> delete temp -> create real pattern
    for temp_path_str, temp_events in temp_files.items():
        if len(temp_events) < 2:
            continue

        temp_events.sort(key=lambda e: e.timestamp)

        # Check for create -> delete pattern on temp file
        if temp_events[0].event_type == "created" and temp_events[-1].event_type == "deleted":
            # Extract base name from temp file
            temp_path = Path(temp_path_str)
            base_name = extract_base_name(temp_path)

            # Look for real file creation after temp deletion
            if base_name:
                real_path = temp_path.parent / base_name
                real_path_str = str(real_path)

                if real_path_str in real_files:
                    real_events = real_files[real_path_str]
                    # Find create events after temp deletion
                    for real_event in real_events:
                        if (
                            real_event.event_type == "created"
                            and real_event.timestamp >= temp_events[-1].timestamp
                        ):
                            time_span = (
                                real_event.timestamp - temp_events[0].timestamp
                            ).total_seconds() * 1000

                            if time_span <= temp_window_ms:
                                all_events = [*temp_events, real_event]
                                all_events.sort(key=lambda e: e.timestamp)

                                return FileOperation(
                                    operation_type=OperationType.ATOMIC_SAVE,
                                    primary_path=real_path,
                                    events=all_events,
                                    confidence=0.92,
                                    description=f"Atomic save to {real_path.name}",
                                    start_time=all_events[0].timestamp,
                                    end_time=all_events[-1].timestamp,
                                    is_atomic=True,
                                    is_safe=True,
                                    files_affected=[real_path],
                                    metadata={
                                        "temp_file": temp_path_str,
                                        "pattern": "temp_create_delete_create_real",
                                    },
                                )

            # If no real file found, don't return an operation
            # Pure temp file operations (create→delete with no real file) should be
            # filtered by the auto-flush handler, not returned as invalid operations
            log.debug(
                "Temp file created and deleted with no real file - not returning operation",
                temp_file=temp_path_str,
                event_count=len(temp_events),
            )
            # Return None - let auto-flush handler filter these temp-only events

    return None
detect_temp_modify_pattern
detect_temp_modify_pattern(
    events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None

Detect pattern: create temp -> modify temp -> rename to final.

Source code in provide/foundation/file/operations/detectors/temp.py
def detect_temp_modify_pattern(
    self, events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None:
    """Detect pattern: create temp -> modify temp -> rename to final."""
    if len(events) < 3:
        return None

    # Group events by temp files
    temp_groups: dict[str, list[FileEvent]] = {}
    for event in events:
        if is_temp_file(event.path):
            path_str = str(event.path)
            if path_str not in temp_groups:
                temp_groups[path_str] = []
            temp_groups[path_str].append(event)

    for temp_path_str, temp_events in temp_groups.items():
        if len(temp_events) < 2:
            continue

        temp_events.sort(key=lambda e: e.timestamp)

        # Look for create -> modify -> rename sequence
        if temp_events[0].event_type == "created" and any(
            e.event_type == "modified" for e in temp_events[1:]
        ):
            # Find corresponding rename event
            temp_path = Path(temp_path_str)
            rename_events = [e for e in events if e.event_type == "moved" and e.path == temp_path]

            if rename_events:
                rename_event = rename_events[0]
                all_events = [*temp_events, rename_event]
                all_events.sort(key=lambda e: e.timestamp)

                time_span = (all_events[-1].timestamp - all_events[0].timestamp).total_seconds() * 1000

                if time_span <= temp_window_ms:
                    final_path = rename_event.dest_path or rename_event.path
                    return FileOperation(
                        operation_type=OperationType.ATOMIC_SAVE,
                        primary_path=final_path,
                        events=all_events,
                        confidence=0.92,
                        description=f"Atomic save to {final_path.name}",
                        start_time=all_events[0].timestamp,
                        end_time=all_events[-1].timestamp,
                        is_atomic=True,
                        is_safe=True,
                        files_affected=[final_path],
                        metadata={
                            "temp_file": temp_path_str,
                            "pattern": "temp_modify_rename",
                        },
                    )

    return None
detect_temp_rename_pattern
detect_temp_rename_pattern(
    events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None

Detect temp file rename pattern: create temp -> rename to final.

Source code in provide/foundation/file/operations/detectors/temp.py
def detect_temp_rename_pattern(
    self, events: list[FileEvent], temp_window_ms: int = 1000
) -> FileOperation | None:
    """Detect temp file rename pattern: create temp -> rename to final."""
    if len(events) < 2:
        return None

    # Look for temp file creation followed by rename
    for i in range(len(events) - 1):
        current = events[i]
        next_event = events[i + 1]

        time_diff = (next_event.timestamp - current.timestamp).total_seconds() * 1000

        if (
            current.event_type == "created"
            and is_temp_file(current.path)
            and next_event.event_type == "moved"
            and next_event.path == current.path
            and next_event.dest_path
            and time_diff <= temp_window_ms
        ):
            return FileOperation(
                operation_type=OperationType.ATOMIC_SAVE,
                primary_path=next_event.dest_path,
                events=[current, next_event],
                confidence=0.95,
                description=f"Atomic save to {next_event.dest_path.name}",
                start_time=current.timestamp,
                end_time=next_event.timestamp,
                is_atomic=True,
                is_safe=True,
                files_affected=[next_event.dest_path],
                metadata={
                    "temp_file": str(current.path),
                    "pattern": "temp_rename",
                },
            )

    return None

Functions

clear_detector_registry

clear_detector_registry() -> None

Clear all registered detectors (primarily for testing).

Source code in provide/foundation/file/operations/detectors/registry.py
def clear_detector_registry() -> None:
    """Clear all registered detectors (primarily for testing)."""
    registry = get_detector_registry()
    registry.clear("file_operation_detector")
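A plausible testing pattern is a pytest fixture that resets the registry around each test. The fixture name here is hypothetical, and note that clearing also removes the auto-registered built-in detectors, so re-register anything a test relies on.

import pytest

from provide.foundation.file.operations.detectors import clear_detector_registry

@pytest.fixture(autouse=True)
def isolated_detector_registry():
    clear_detector_registry()  # start each test from an empty registry
    yield
    clear_detector_registry()  # drop detectors registered during the test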

get_all_detectors

get_all_detectors() -> list[tuple[str, DetectorFunc, int]]

Get all registered detectors sorted by priority (highest first).

Returns:

    list[tuple[str, DetectorFunc, int]]: List of tuples (name, detector_func, priority).

Source code in provide/foundation/file/operations/detectors/registry.py
def get_all_detectors() -> list[tuple[str, DetectorFunc, int]]:
    """Get all registered detectors sorted by priority (highest first).

    Returns:
        List of tuples: (name, detector_func, priority)
    """
    registry = get_detector_registry()

    # Collect all entries from the file_operation_detector dimension
    entries = [entry for entry in registry if entry.dimension == "file_operation_detector"]

    # Sort by priority (highest first)
    sorted_entries = sorted(
        entries,
        key=lambda e: e.metadata.get("priority", 0),
        reverse=True,
    )

    return [(e.name, e.value, e.metadata.get("priority", 0)) for e in sorted_entries]
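For example, to inspect the effective execution order:

from provide.foundation.file.operations.detectors import get_all_detectors

for name, func, priority in get_all_detectors():
    print(f"{priority:3d}  {name}")
# Highest priority prints first, matching the order detectors run in.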

get_detector_registry

get_detector_registry() -> Registry

Get the global detector registry singleton.

Returns:

    Registry: Registry instance for file operation detectors.

Source code in provide/foundation/file/operations/detectors/registry.py
def get_detector_registry() -> Registry:
    """Get the global detector registry singleton.

    Returns:
        Registry instance for file operation detectors
    """
    global _detector_registry
    if _detector_registry is None:
        _detector_registry = Registry()
    return _detector_registry

register_detector

register_detector(
    name: str,
    func: DetectorFunc,
    priority: int,
    description: str = "",
) -> None

Register a file operation detector function.

Parameters:

    name (str): Unique detector name (e.g., "detect_atomic_save"). Required.
    func (DetectorFunc): Detector function conforming to the DetectorFunc protocol. Required.
    priority (int): Execution priority (0-100, higher = earlier execution). Required.
    description (str, default ""): Human-readable description of the pattern detected.

Raises:

    AlreadyExistsError: If the detector name is already registered.

Example

>>> def detect_custom_pattern(events):
...     # Custom detection logic
...     return FileOperation(...) if pattern_found else None
>>>
>>> register_detector(
...     name="detect_custom",
...     func=detect_custom_pattern,
...     priority=75,
...     description="Detects custom file operation pattern"
... )

Source code in provide/foundation/file/operations/detectors/registry.py
def register_detector(
    name: str,
    func: DetectorFunc,
    priority: int,
    description: str = "",
) -> None:
    """Register a file operation detector function.

    Args:
        name: Unique detector name (e.g., "detect_atomic_save")
        func: Detector function conforming to DetectorFunc protocol
        priority: Execution priority (0-100, higher = earlier execution)
        description: Human-readable description of what pattern is detected

    Raises:
        AlreadyExistsError: If detector name already registered

    Example:
        >>> def detect_custom_pattern(events):
        ...     # Custom detection logic
        ...     return FileOperation(...) if pattern_found else None
        >>>
        >>> register_detector(
        ...     name="detect_custom",
        ...     func=detect_custom_pattern,
        ...     priority=75,
        ...     description="Detects custom file operation pattern"
        ... )
    """
    registry = get_detector_registry()
    registry.register(
        name=name,
        value=func,
        dimension="file_operation_detector",
        metadata={
            "priority": priority,
            "description": description,
        },
    )