provide.foundation.file.operations

File operation detection and analysis.

This module detects related file system events and groups them into logical operations (e.g., atomic saves, batch updates, rename sequences).

For most use cases, use the simple functional API:

>>> from provide.foundation.file.operations import detect, Event, Operation
>>>
>>> events = [Event(...), Event(...)]
>>> operation = detect(events)
>>> if operation:
...     print(f"{operation.operation_type}: {operation.primary_path}")

Advanced API

For streaming detection or custom configuration:

>>> from provide.foundation.file.operations import create_detector, DetectorConfig
>>>
>>> config = DetectorConfig(time_window_ms=1000)
>>> detector = create_detector(config)
>>>
>>> for event in event_stream:
...     if operation := detector.detect_streaming(event):
...         handle_operation(operation)

Classes

DetectorConfig

Configuration for operation detection.

Event

Single file system event with rich metadata.

Attributes
sequence property
sequence: int

Convenience accessor for sequence number.

size_delta property
size_delta: int | None

Change in file size, if known.

timestamp property
timestamp: datetime

Convenience accessor for timestamp.

FileEvent

Single file system event with rich metadata.

Attributes
sequence property
sequence: int

Convenience accessor for sequence number.

size_delta property
size_delta: int | None

Change in file size, if known.

timestamp property
timestamp: datetime

Convenience accessor for timestamp.

FileEventMetadata

Rich metadata for a file event.

FileOperation

A detected logical file system operation.

Attributes
duration_ms property
duration_ms: float

Total operation duration.

event_count property
event_count: int

Number of events in this operation.

Functions
get_timeline
get_timeline() -> list[tuple[float, FileEvent]]

Get events with relative timestamps (ms from start).

Source code in provide/foundation/file/operations/types.py
def get_timeline(self) -> list[tuple[float, FileEvent]]:
    """Get events with relative timestamps (ms from start)."""
    return [
        ((e.timestamp - self.start_time).total_seconds() * 1000, e)
        for e in sorted(self.events, key=lambda x: x.sequence)
    ]
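
The offset arithmetic in get_timeline can be tried in isolation. The following is a minimal sketch, not the library's code: StubEvent and timeline are hypothetical stand-ins that carry only the two fields the method reads (sequence and timestamp), and start_time is passed in explicitly instead of being read from the operation.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta


@dataclass
class StubEvent:
    """Hypothetical stand-in for FileEvent: only the fields get_timeline reads."""

    sequence: int
    timestamp: datetime


def timeline(events: list[StubEvent], start_time: datetime) -> list[tuple[float, StubEvent]]:
    # Same expression as get_timeline: order by sequence number, then report
    # each timestamp as a millisecond offset from start_time.
    return [
        ((e.timestamp - start_time).total_seconds() * 1000, e)
        for e in sorted(events, key=lambda x: x.sequence)
    ]


start = datetime(2024, 1, 1, 12, 0, 0)
events = [
    StubEvent(sequence=2, timestamp=start + timedelta(milliseconds=250)),
    StubEvent(sequence=1, timestamp=start),
]

offsets = [offset for offset, _ in timeline(events, start)]
print(offsets)  # [0.0, 250.0]
```

Note that the ordering comes from sequence, not timestamp, so an event with an earlier sequence number but a later timestamp would still be listed first.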

Operation

A detected logical file system operation.

Attributes
duration_ms property
duration_ms: float

Total operation duration.

event_count property
event_count: int

Number of events in this operation.

Functions
get_timeline
get_timeline() -> list[tuple[float, FileEvent]]

Get events with relative timestamps (ms from start).

Source code in provide/foundation/file/operations/types.py
def get_timeline(self) -> list[tuple[float, FileEvent]]:
    """Get events with relative timestamps (ms from start)."""
    return [
        ((e.timestamp - self.start_time).total_seconds() * 1000, e)
        for e in sorted(self.events, key=lambda x: x.sequence)
    ]

OperationDetector

OperationDetector(
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
)

Detects and classifies file operations from events.

Initialize with optional configuration and callback.

Parameters:

  • config (DetectorConfig | None, default: None): Detector configuration
  • on_operation_complete (Any, default: None): Callback function(operation: FileOperation) called when an operation is detected. Used for streaming mode.
  • registry (Registry | None, default: None): Optional registry for detectors (defaults to global)

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def __init__(
    self,
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
) -> None:
    """Initialize with optional configuration and callback.

    Args:
        config: Detector configuration
        on_operation_complete: Callback function(operation: FileOperation) called
                             when an operation is detected. Used for streaming mode.
        registry: Optional registry for detectors (defaults to global)
    """
    self.config = config or DetectorConfig()
    self.on_operation_complete = on_operation_complete
    self.registry = registry or get_detector_registry()
    self._pending_events: list[FileEvent] = []
    self._last_flush = datetime.now()

    # Create auto-flush handler for streaming mode
    self._auto_flush_handler = AutoFlushHandler(
        time_window_ms=self.config.time_window_ms,
        on_operation_complete=on_operation_complete,
        analyze_func=self._analyze_event_group,
    )
Functions
add_event
add_event(event: FileEvent) -> None

Add event with auto-flush and callback support.

This is the recommended method for streaming detection with automatic temp file hiding and callback-based operation reporting.

Parameters:

  • event (FileEvent, required): File event to process

Behavior
  • Hides temp files automatically (no callback until operation completes)
  • Schedules auto-flush timer for pending operations
  • Calls on_operation_complete(operation) when pattern detected
  • Emits non-temp files immediately if no operation pattern found
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def add_event(self, event: FileEvent) -> None:
    """Add event with auto-flush and callback support.

    This is the recommended method for streaming detection with automatic
    temp file hiding and callback-based operation reporting.

    Args:
        event: File event to process

    Behavior:
        - Hides temp files automatically (no callback until operation completes)
        - Schedules auto-flush timer for pending operations
        - Calls on_operation_complete(operation) when pattern detected
        - Emits non-temp files immediately if no operation pattern found
    """
    # Delegate to auto-flush handler
    self._auto_flush_handler.add_event(event)
detect
detect(events: list[FileEvent]) -> list[FileOperation]

Detect all operations from a list of events.

Parameters:

  • events (list[FileEvent], required): List of file events to analyze

Returns:

  • list[FileOperation]: List of detected operations, ordered by start time

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect(self, events: list[FileEvent]) -> list[FileOperation]:
    """Detect all operations from a list of events.

    Args:
        events: List of file events to analyze

    Returns:
        List of detected operations, ordered by start time
    """
    if not events:
        return []

    # Sort events by timestamp
    sorted_events = sorted(events, key=lambda e: e.timestamp)

    # Group events by time windows
    event_groups = self._group_events_by_time(sorted_events)

    operations = []
    for group in event_groups:
        operation = self._analyze_event_group(group)
        if operation:
            operations.append(operation)

    return operations
detect_streaming
detect_streaming(event: FileEvent) -> FileOperation | None

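Process events in streaming fashion: buffer a single event and return a completed operation once the configured time window has elapsed since the last flush.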

Parameters:

  • event (FileEvent, required): Single file event

Returns:

  • FileOperation | None: Completed operation if detected, None otherwise

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect_streaming(self, event: FileEvent) -> FileOperation | None:
    """Process events in streaming fashion.

    Args:
        event: Single file event

    Returns:
        Completed operation if detected, None otherwise
    """
    self._pending_events.append(event)

    # Check if we should flush based on time window
    now = datetime.now()
    time_since_last = (now - self._last_flush).total_seconds() * 1000

    if time_since_last >= self.config.time_window_ms:
        return self._flush_pending()

    return None
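
The buffering rule in detect_streaming (append the event, then flush only if at least time_window_ms has passed since the last flush) can be sketched without the library. Everything below is hypothetical: WindowBuffer and FakeClock are illustrative names, events are plain strings, and the sketch assumes that a flush empties the buffer and restarts the window, which the listing above does not show.

```python
from datetime import datetime, timedelta
from typing import Callable, Optional


class FakeClock:
    """Hypothetical controllable clock so the example is deterministic."""

    def __init__(self, now: datetime) -> None:
        self.now = now

    def __call__(self) -> datetime:
        return self.now

    def advance(self, ms: int) -> None:
        self.now += timedelta(milliseconds=ms)


class WindowBuffer:
    """Hypothetical stand-in for the buffering done by detect_streaming."""

    def __init__(self, time_window_ms: int, clock: Callable[[], datetime]) -> None:
        self.time_window_ms = time_window_ms
        self.clock = clock
        self.pending: list[str] = []
        self.last_flush = clock()

    def push(self, event: str) -> Optional[list[str]]:
        self.pending.append(event)
        elapsed_ms = (self.clock() - self.last_flush).total_seconds() * 1000
        if elapsed_ms >= self.time_window_ms:
            return self.flush()
        return None

    def flush(self) -> list[str]:
        # Assumption: a flush empties the buffer and restarts the window.
        batch, self.pending = self.pending, []
        self.last_flush = self.clock()
        return batch


clock = FakeClock(datetime(2024, 1, 1))
buffer = WindowBuffer(time_window_ms=500, clock=clock)

first = buffer.push("created report.txt.tmp")  # 0 ms since last flush: buffered
clock.advance(600)
second = buffer.push("moved report.txt.tmp")   # 600 ms since last flush: flushed
```

Because the check only runs when an event arrives, events that are still buffered when the stream ends stay pending until flush is called explicitly.
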
flush
flush() -> list[FileOperation]

Get any pending operations and clear buffer.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def flush(self) -> list[FileOperation]:
    """Get any pending operations and clear buffer."""
    operations = []
    if self._pending_events:
        operation = self._flush_pending()
        if operation:
            operations.append(operation)
    return operations

OperationType

Bases: Enum

Types of detected file operations.

Functions

create_detector

create_detector(
    config: DetectorConfig | None = None,
) -> OperationDetector

Create a new operation detector instance.

Use this when you need a persistent detector for streaming detection or want custom configuration.

Parameters:

  • config (DetectorConfig | None, default: None): Optional detector configuration

Returns:

  • OperationDetector: New OperationDetector instance

Examples:

>>> from provide.foundation.file.operations import create_detector, DetectorConfig
>>>
>>> # Custom configuration
>>> config = DetectorConfig(time_window_ms=1000, min_confidence=0.8)
>>> detector = create_detector(config)
>>>
>>> # Use for streaming
>>> for event in events:
...     operation = detector.detect_streaming(event)
Source code in provide/foundation/file/operations/detect.py
def create_detector(config: DetectorConfig | None = None) -> OperationDetector:
    """Create a new operation detector instance.

    Use this when you need a persistent detector for streaming detection
    or want custom configuration.

    Args:
        config: Optional detector configuration

    Returns:
        New OperationDetector instance

    Examples:
        >>> from provide.foundation.file.operations import create_detector, DetectorConfig
        >>>
        >>> # Custom configuration
        >>> config = DetectorConfig(time_window_ms=1000, min_confidence=0.8)
        >>> detector = create_detector(config)
        >>>
        >>> # Use for streaming
        >>> for event in events:
        ...     operation = detector.detect_streaming(event)
    """
    return OperationDetector(config)

detect_all

detect_all(
    events: list[FileEvent],
    config: DetectorConfig | None = None,
) -> list[FileOperation]

Detect all operations from a list of events.

Explicit function for when you always want a list result, even for single events.

Parameters:

  • events (list[FileEvent], required): List of events to analyze
  • config (DetectorConfig | None, default: None): Optional detector configuration

Returns:

  • list[FileOperation]: List of detected operations (may be empty)

Examples:

>>> operations = detect_all(events)
>>> for op in operations:
...     print(f"{op.operation_type}: {op.primary_path}")
Source code in provide/foundation/file/operations/detect.py
def detect_all(events: list[FileEvent], config: DetectorConfig | None = None) -> list[FileOperation]:
    """Detect all operations from a list of events.

    Explicit function for when you always want a list result, even for single events.

    Args:
        events: List of events to analyze
        config: Optional detector configuration

    Returns:
        List of detected operations (may be empty)

    Examples:
        >>> operations = detect_all(events)
        >>> for op in operations:
        ...     print(f"{op.operation_type}: {op.primary_path}")
    """
    detector = _get_default_detector() if config is None else OperationDetector(config)
    return detector.detect(events)

detect_atomic_save

detect_atomic_save(
    events: list[FileEvent],
) -> FileOperation | None

Detect if events represent an atomic save operation.

Source code in provide/foundation/file/operations/utils.py
def detect_atomic_save(events: list[FileEvent]) -> FileOperation | None:
    """Detect if events represent an atomic save operation."""
    from provide.foundation.file.operations.detectors.orchestrator import OperationDetector

    detector = OperationDetector()
    operations = detector.detect(events)
    return next((op for op in operations if op.operation_type == OperationType.ATOMIC_SAVE), None)
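
The selection step in detect_atomic_save is the standard next(generator, default) idiom: take the first matching operation, or None when nothing matches. A minimal sketch with hypothetical stand-ins (StubType, StubOperation, and first_atomic_save are not library names; only the ATOMIC_SAVE member appears in the source above):

```python
from dataclasses import dataclass
from enum import Enum


class StubType(Enum):
    """Hypothetical subset of OperationType; OTHER is invented for illustration."""

    ATOMIC_SAVE = "atomic_save"
    OTHER = "other"


@dataclass
class StubOperation:
    operation_type: StubType


def first_atomic_save(operations: list[StubOperation]) -> "StubOperation | None":
    # next() with a default returns the first match, or None if there is none,
    # instead of raising StopIteration.
    return next((op for op in operations if op.operation_type == StubType.ATOMIC_SAVE), None)


ops = [StubOperation(StubType.OTHER), StubOperation(StubType.ATOMIC_SAVE)]
match = first_atomic_save(ops)
no_match = first_atomic_save([StubOperation(StubType.OTHER)])
```

Only the first atomic save is returned; if a batch of events contains several, the batch functions that return a list are the better fit.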

detect_streaming

detect_streaming(
    event: FileEvent,
    detector: OperationDetector | None = None,
) -> FileOperation | None

Process a single event in streaming mode.

For real-time detection, use this with a persistent OperationDetector instance. Operations are returned when patterns are detected based on time windows.

Parameters:

  • event (FileEvent, required): Single file event to process
  • detector (OperationDetector | None, default: None): Optional persistent detector instance (required for stateful detection)

Returns:

  • FileOperation | None: Completed operation if detected, None otherwise

Examples:

>>> # Create persistent detector for streaming
>>> from provide.foundation.file.operations import OperationDetector
>>> detector = OperationDetector()
>>>
>>> # Feed events as they arrive
>>> for event in event_stream:
...     operation = detect_streaming(event, detector)
...     if operation:
...         print(f"Operation detected: {operation.operation_type}")
>>>
>>> # Flush at end
>>> remaining = detector.flush()
Note

This is a lower-level API. For most use cases, the batch detect() function is simpler and sufficient.

Source code in provide/foundation/file/operations/detect.py
def detect_streaming(
    event: FileEvent,
    detector: OperationDetector | None = None,
) -> FileOperation | None:
    """Process a single event in streaming mode.

    For real-time detection, use this with a persistent OperationDetector instance.
    Operations are returned when patterns are detected based on time windows.

    Args:
        event: Single file event to process
        detector: Optional persistent detector instance (required for stateful detection)

    Returns:
        Completed operation if detected, None otherwise

    Examples:
        >>> # Create persistent detector for streaming
        >>> from provide.foundation.file.operations import OperationDetector
        >>> detector = OperationDetector()
        >>>
        >>> # Feed events as they arrive
        >>> for event in event_stream:
        ...     operation = detect_streaming(event, detector)
        ...     if operation:
        ...         print(f"Operation detected: {operation.operation_type}")
        >>>
        >>> # Flush at end
        >>> remaining = detector.flush()

    Note:
        This is a lower-level API. For most use cases, the batch `detect()` function
        is simpler and sufficient.
    """
    if detector is None:
        detector = _get_default_detector()

    return detector.detect_streaming(event)

extract_original_path

extract_original_path(temp_path: Path) -> Path | None

Extract the original filename from a temp file path. If no temp-file pattern matches, the input path is returned unchanged.

Source code in provide/foundation/file/operations/utils.py
def extract_original_path(temp_path: Path) -> Path | None:
    """Extract the original filename from a temp file path."""
    from provide.foundation.file.operations.detectors.helpers import extract_base_name

    base_name = extract_base_name(temp_path)
    if base_name:
        return temp_path.parent / base_name
    else:
        # If no temp pattern matches, return the original path
        return temp_path
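
The fallback behavior can be illustrated with a toy helper. This is a sketch under stated assumptions: stub_extract_base_name is a hypothetical replacement for the library's extract_base_name and recognizes only a trailing ".tmp" suffix, which is chosen for illustration and is not a statement about the patterns the library actually matches.

```python
from pathlib import Path
from typing import Optional


def stub_extract_base_name(path: Path) -> Optional[str]:
    """Hypothetical helper: strips a trailing '.tmp', else reports no match."""
    name = path.name
    if name.endswith(".tmp") and len(name) > len(".tmp"):
        return name[: -len(".tmp")]
    return None


def original_path(temp_path: Path) -> Path:
    # Mirrors the control flow above: rebuild the path next to the temp file
    # when a base name is found, otherwise hand back the input unchanged.
    base_name = stub_extract_base_name(temp_path)
    return temp_path.parent / base_name if base_name else temp_path


recovered = original_path(Path("docs/report.txt.tmp"))
unchanged = original_path(Path("docs/report.txt"))
```
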

group_related_events

group_related_events(
    events: list[FileEvent], time_window_ms: int = 500
) -> list[list[FileEvent]]

Group events that occur within a time window (500 ms by default). Events are sorted by timestamp before grouping.

Source code in provide/foundation/file/operations/utils.py
def group_related_events(events: list[FileEvent], time_window_ms: int = 500) -> list[list[FileEvent]]:
    """Group events that occur within a time window."""
    from provide.foundation.file.operations.detectors.orchestrator import OperationDetector

    config = DetectorConfig(time_window_ms=time_window_ms)
    detector = OperationDetector(config)
    return detector._group_events_by_time(sorted(events, key=lambda e: e.timestamp))
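
The grouping itself happens in the private _group_events_by_time method, which is not shown here. One plausible strategy, offered only as an assumption, is gap-based grouping: start a new group whenever the gap to the previous event exceeds the window. The sketch below uses bare timestamps in place of events, and group_by_gap is a hypothetical name.

```python
from datetime import datetime, timedelta


def group_by_gap(timestamps: list[datetime], time_window_ms: int = 500) -> list[list[datetime]]:
    # Assumed rule: an event joins the current group if it arrives within
    # time_window_ms of the previous event; otherwise it opens a new group.
    groups: list[list[datetime]] = []
    for ts in sorted(timestamps):
        if groups and (ts - groups[-1][-1]).total_seconds() * 1000 <= time_window_ms:
            groups[-1].append(ts)
        else:
            groups.append([ts])
    return groups


t0 = datetime(2024, 1, 1)
stamps = [
    t0 + timedelta(milliseconds=2000),
    t0,
    t0 + timedelta(milliseconds=100),
]

sizes = [len(group) for group in group_by_gap(stamps)]
print(sizes)  # [2, 1]
```

The library may instead measure the window from the first event of each group or apply extra rules; check the orchestrator source before relying on exact boundaries.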

is_temp_file

is_temp_file(path: Path) -> bool

Check if a path represents a temporary file.

Source code in provide/foundation/file/operations/utils.py
def is_temp_file(path: Path) -> bool:
    """Check if a path represents a temporary file."""
    from provide.foundation.file.operations.detectors.helpers import is_temp_file as helper_is_temp_file

    return helper_is_temp_file(path)