Skip to content

Auto flush

provide.foundation.file.operations.detectors.auto_flush

Auto-flush handler for streaming file operation detection.

Automatically flushes buffered events once a time window elapses. Temp files are filtered out, and emitted operations are passed to callbacks.

Classes

AutoFlushHandler

AutoFlushHandler(
    time_window_ms: float,
    on_operation_complete: Any = None,
    analyze_func: Any = None,
)

Handles automatic flushing of pending events with temp file filtering.

Thread-safe: Uses internal locking to protect shared state from concurrent access. Multiple threads can safely call add_event() simultaneously.

Initialize auto-flush handler.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `time_window_ms` | `float` | Time window in milliseconds for event grouping | *required* |
| `on_operation_complete` | `Any` | Callback function `(operation: FileOperation)` | `None` |
| `analyze_func` | `Any` | Function to analyze event groups and detect operations | `None` |
Source code in provide/foundation/file/operations/detectors/auto_flush.py
def __init__(
    self,
    time_window_ms: float,
    on_operation_complete: Any = None,
    analyze_func: Any = None,
) -> None:
    """Create a handler that buffers events and flushes them after a time window.

    Args:
        time_window_ms: Length of the event-grouping window, in milliseconds.
        on_operation_complete: Optional callback, invoked as
            ``callback(operation: FileOperation)``.
        analyze_func: Optional function that analyzes a group of events and
            detects file operations.
    """
    # Re-entrant lock protecting this instance's shared state from concurrent threads.
    self._lock = threading.RLock()

    # Caller-supplied configuration.
    self.time_window_ms = time_window_ms
    self.on_operation_complete = on_operation_complete
    self.analyze_func = analyze_func

    # Event buffer and flush bookkeeping.
    self._pending_events: list[FileEvent] = []
    self._last_flush = datetime.now()
    # Holds an asyncio.TimerHandle or a threading.Timer once a flush is scheduled.
    self._flush_timer: Any = None

    # Operations whose completion callback failed, queued for a later retry.
    self._failed_operations: list[FileOperation] = []
    # Operations buffered while no event loop is available.
    self._no_loop_buffer: list[FileOperation] = []
    # id()s of operations currently being retried, used to prevent infinite retry loops.
    self._currently_retrying: set[int] = set()
Attributes
failed_operations_count property
failed_operations_count: int

Get count of failed operations awaiting retry.

Thread-safe: Uses internal locking.

pending_events property
pending_events: list[FileEvent]

Get pending events (read-only access).

Thread-safe: Returns a copy to prevent external modification.

Functions
add_event
add_event(event: FileEvent) -> None

Add event and schedule auto-flush.

Thread-safe: Uses internal locking for concurrent add_event() calls.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `event` | `FileEvent` | File event to buffer for processing | *required* |
Source code in provide/foundation/file/operations/detectors/auto_flush.py
def add_event(self, event: FileEvent) -> None:
    """Add event and schedule auto-flush.

    The event is appended to the pending buffer, a trace record is logged,
    and the auto-flush timer is (re)scheduled, all while holding the
    internal lock.

    Thread-safe: Uses internal locking for concurrent add_event() calls.

    Args:
        event: File event to buffer for processing
    """
    with self._lock:
        self._pending_events.append(event)

        # Whether the source or destination path looks like a temp file. In this
        # method the flag is only reported in the trace log. Coerce it to bool:
        # the bare `a or (b and c)` form evaluates to None (not False) when
        # dest_path is None, which made the logged value's type inconsistent.
        is_temp = bool(
            is_temp_file(event.path) or (event.dest_path and is_temp_file(event.dest_path))
        )

        log.trace(
            "Event added to auto-flush buffer",
            path=str(event.path),
            dest_path=str(event.dest_path) if event.dest_path else None,
            is_temp=is_temp,
            pending_count=len(self._pending_events),
        )

        # Schedule auto-flush timer
        self._schedule_auto_flush()
clear
clear() -> None

Clear pending events and cancel timer.

Thread-safe: Uses internal locking.

Source code in provide/foundation/file/operations/detectors/auto_flush.py
def clear(self) -> None:
    """Drop all pending events and cancel any scheduled flush timer.

    Thread-safe: Uses internal locking.
    """
    with self._lock:
        self._pending_events.clear()
        timer = self._flush_timer
        if timer is not None:
            # Both threading.Timer and asyncio.TimerHandle expose cancel(), so the
            # previous isinstance() dispatch (whose branches were identical) is
            # unnecessary.
            timer.cancel()
            self._flush_timer = None
clear_failed_operations
clear_failed_operations() -> int

Clear all failed operations (data loss - use carefully).

Thread-safe: Uses internal locking.

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of operations that were cleared |

Source code in provide/foundation/file/operations/detectors/auto_flush.py
def clear_failed_operations(self) -> int:
    """Discard every queued failed operation without retrying it.

    This is destructive: the discarded operations are lost, and a warning is
    logged whenever at least one was dropped.

    Thread-safe: Uses internal locking.

    Returns:
        How many operations were discarded.
    """
    with self._lock:
        queue = self._failed_operations
        dropped = len(queue)
        queue.clear()
        if dropped:
            log.warning(f"Cleared {dropped} failed operations - data loss!")
    return dropped
get_failed_operations
get_failed_operations() -> list[FileOperation]

Get copy of failed operations list for inspection.

Thread-safe: Returns a copy.

Source code in provide/foundation/file/operations/detectors/auto_flush.py
def get_failed_operations(self) -> list[FileOperation]:
    """Return a snapshot of the failed-operation queue.

    Thread-safe: the copy is taken while holding the internal lock, so callers
    can inspect or modify it without affecting the handler's own queue.
    """
    with self._lock:
        snapshot = list(self._failed_operations)
    return snapshot
retry_failed_operations
retry_failed_operations() -> int

Retry failed operations.

Thread-safe: Uses internal locking.

Returns:

| Type | Description |
|------|-------------|
| `int` | Number of operations successfully retried |

Source code in provide/foundation/file/operations/detectors/auto_flush.py
def retry_failed_operations(self) -> int:
    """Retry failed operations.

    Each queued operation is re-emitted once via ``self._emit_operation_safe``.
    Operations that succeed are removed from the queue. Those that fail again
    stay queued for a later call. Operations added to the queue while this
    pass runs are kept for the next pass rather than being dropped.

    Thread-safe: Uses internal locking.

    Returns:
        Number of operations successfully retried
    """
    with self._lock:
        if not self._failed_operations:
            return 0

        # Iterate over a snapshot. Emission runs under the (re-entrant) lock and
        # may append to ``self._failed_operations``. Iterating the live list
        # would also process those additions in this pass, and the later
        # reassignment would silently discard them.
        batch = list(self._failed_operations)
        batch_ids = {id(op) for op in batch}

        retry_count = 0
        remaining = []

        # Mark all operations as being retried to prevent infinite loop
        self._currently_retrying.update(batch_ids)

        try:
            for operation in batch:
                if self._emit_operation_safe(operation):
                    retry_count += 1
                    log.info(
                        "Retry successful",
                        operation_type=operation.operation_type.value,
                        primary_file=operation.primary_path.name,
                    )
                else:
                    # Still failing, keep for next retry
                    remaining.append(operation)

            # Preserve anything queued while this pass ran (other than members
            # of the batch itself) instead of overwriting it.
            added_during_retry = [op for op in self._failed_operations if id(op) not in batch_ids]
            self._failed_operations = remaining + added_during_retry

            if retry_count > 0:
                log.info(f"Retried {retry_count} failed operations, {len(remaining)} still pending")

            return retry_count
        finally:
            # Clear retry tracking
            self._currently_retrying.clear()
schedule_flush
schedule_flush() -> None

Schedule auto-flush timer (public interface).

Thread-safe: Uses internal locking.

Source code in provide/foundation/file/operations/detectors/auto_flush.py
def schedule_flush(self) -> None:
    """Public entry point for (re)scheduling the auto-flush timer.

    Thread-safe: the scheduling step runs while holding the internal lock.
    """
    lock = self._lock
    with lock:
        self._schedule_auto_flush()

Functions