Skip to content

Orchestrator

provide.foundation.file.operations.detectors.orchestrator

File operation detector orchestrator.

Coordinates detector functions via registry to identify the best match for file events.

Classes

OperationDetector

OperationDetector(
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
)

Detects and classifies file operations from events.

Initialize with optional configuration and callback.

Parameters:

Name Type Description Default
config DetectorConfig | None

Detector configuration

None
on_operation_complete Any

Callback function(operation: FileOperation) called when an operation is detected. Used for streaming mode.

None
registry Registry | None

Optional registry for detectors (defaults to global)

None
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def __init__(
    self,
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
) -> None:
    """Initialize with optional configuration and callback.

    Args:
        config: Detector configuration. A default ``DetectorConfig()`` is
            created only when this is ``None``.
        on_operation_complete: Callback function(operation: FileOperation) called
            when an operation is detected. Used for streaming mode.
        registry: Optional registry for detectors. The global registry is
            used only when this is ``None``.
    """
    # Test against None explicitly rather than relying on truthiness: a
    # caller-supplied object that happens to be falsy (for example an empty
    # container-like registry) must not be silently swapped for the default.
    self.config = config if config is not None else DetectorConfig()
    self.on_operation_complete = on_operation_complete
    self.registry = registry if registry is not None else get_detector_registry()

    # Buffer and reference timestamp consumed by detect_streaming().
    self._pending_events: list[FileEvent] = []
    self._last_flush = datetime.now()

    # Create auto-flush handler for streaming mode; add_event() delegates to it.
    self._auto_flush_handler = AutoFlushHandler(
        time_window_ms=self.config.time_window_ms,
        on_operation_complete=on_operation_complete,
        analyze_func=self._analyze_event_group,
    )
Functions
add_event
add_event(event: FileEvent) -> None

Add event with auto-flush and callback support.

This is the recommended method for streaming detection with automatic temp file hiding and callback-based operation reporting.

Parameters:

Name Type Description Default
event FileEvent

File event to process

required
Behavior
  • Hides temp files automatically (no callback until operation completes)
  • Schedules auto-flush timer for pending operations
  • Calls on_operation_complete(operation) when pattern detected
  • Emits non-temp files immediately if no operation pattern found
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def add_event(self, event: FileEvent) -> None:
    """Feed a single event into the streaming (auto-flush) pipeline.

    Recommended entry point for streaming detection: temp files are hidden
    automatically and operations are reported through the callback.

    Args:
        event: File event to process

    Behavior:
        - Temp files stay hidden (no callback until the operation completes)
        - An auto-flush timer is scheduled for pending operations
        - on_operation_complete(operation) is called when a pattern is detected
        - Non-temp files are emitted immediately if no operation pattern is found
    """
    # All of the work is done by the auto-flush handler built in __init__.
    handler = self._auto_flush_handler
    handler.add_event(event)
detect
detect(events: list[FileEvent]) -> list[FileOperation]

Detect all operations from a list of events.

Parameters:

Name Type Description Default
events list[FileEvent]

List of file events to analyze

required

Returns:

Type Description
list[FileOperation]

List of detected operations, ordered by start time

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect(self, events: list[FileEvent]) -> list[FileOperation]:
    """Run batch detection over a collection of events.

    Args:
        events: List of file events to analyze

    Returns:
        List of detected operations, ordered by start time
    """
    # Nothing to analyze: skip sorting and grouping entirely.
    if not events:
        return []

    # Work on a timestamp-ordered copy so the caller's list is left untouched.
    chronological = list(events)
    chronological.sort(key=lambda ev: ev.timestamp)

    # Split into time-window groups, then keep only groups that produced
    # an operation (falsy analysis results are dropped).
    return [
        detected
        for window in self._group_events_by_time(chronological)
        if (detected := self._analyze_event_group(window))
    ]
detect_streaming
detect_streaming(event: FileEvent) -> FileOperation | None

Process events in streaming fashion.

Parameters:

Name Type Description Default
event FileEvent

Single file event

required

Returns:

Type Description
FileOperation | None

Completed operation if detected, None otherwise

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect_streaming(self, event: FileEvent) -> FileOperation | None:
    """Buffer one event and flush once the time window has elapsed.

    Args:
        event: Single file event

    Returns:
        Completed operation if detected, None otherwise
    """
    # The event is always buffered, whether or not a flush happens now.
    self._pending_events.append(event)

    # Milliseconds elapsed since the last recorded flush.
    elapsed_ms = (datetime.now() - self._last_flush).total_seconds() * 1000

    # Flush only when at least one full time window has passed.
    window_elapsed = elapsed_ms >= self.config.time_window_ms
    return self._flush_pending() if window_elapsed else None
flush
flush() -> list[FileOperation]

Get any pending operations and clear buffer.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def flush(self) -> list[FileOperation]:
    """Drain the pending-event buffer and return what was detected.

    Returns:
        A list holding the operation produced by flushing the pending
        events, or an empty list when nothing is pending or the flush
        yields no operation.
    """
    # Nothing is buffered, so no flush is attempted.
    if not self._pending_events:
        return []

    flushed = self._flush_pending()
    return [flushed] if flushed else []

Functions