Skip to content

File Watching

Learn how to monitor files and directories for changes in real-time.

๐Ÿค– AI-Generated Content

This documentation was generated with AI assistance and is still being audited. Some, or potentially a lot, of this information may be inaccurate. Learn more.

Overview

Foundation provides file watching capabilities to detect when files are created, modified, or deleted.

Basic File Watching

from provide.foundation.file.operations import FileOperationDetector
from provide.foundation import logger

async def watch_config():
    """Watch configuration file for changes."""
    detector = FileOperationDetector()

    async for event in detector.watch("config.yaml"):
        logger.info(
            "config_changed",
            file=event.file_path,
            operation=event.operation_type
        )
        # Reload configuration
        reload_config()

Watch Multiple Files

files_to_watch = [
    "config.yaml",
    "secrets.env",
    "database.json"
]

async for event in detector.watch_multiple(files_to_watch):
    logger.info("file_changed", file=event.file_path)
    handle_change(event)

Watch Directory

async def watch_directory():
    """Watch all files in a directory."""
    detector = FileOperationDetector()

    async for event in detector.watch_directory("./config"):
        if event.operation_type == "created":
            logger.info("new_file", file=event.file_path)
        elif event.operation_type == "modified":
            logger.info("file_updated", file=event.file_path)
        elif event.operation_type == "deleted":
            logger.warning("file_removed", file=event.file_path)

Filter by File Type

from pathlib import Path

async def watch_yaml_files():
    """Watch only YAML files."""
    detector = FileOperationDetector()

    async for event in detector.watch_directory("./"):
        path = Path(event.file_path)
        if path.suffix in [".yaml", ".yml"]:
            logger.info("yaml_changed", file=path.name)
            process_yaml(path)

Debouncing Changes

Handle rapid successive changes:

import asyncio
from collections import defaultdict

async def watch_with_debounce():
    """Debounce file changes to avoid processing too frequently."""
    detector = FileOperationDetector()
    pending_changes = defaultdict(asyncio.Event)

    async def process_after_delay(file_path: str):
        """Process file after 500ms of no changes."""
        await asyncio.sleep(0.5)
        logger.info("processing_file", file=file_path)
        process_file(file_path)

    async for event in detector.watch_directory("./watched"):
        # Cancel pending task if exists
        if event.file_path in pending_changes:
            pending_changes[event.file_path].set()

        # Start new debounced task
        event = asyncio.Event()
        pending_changes[event.file_path] = event
        asyncio.create_task(process_after_delay(event.file_path))

Streaming File Detection

Detect when files are being actively written:

async def detect_streaming():
    """Detect when files are being streamed/written."""
    detector = FileOperationDetector()

    async for event in detector.detect_streaming_operations("logfile.txt"):
        if event.is_streaming:
            logger.debug("file_being_written", file=event.file_path)
        else:
            logger.info("file_write_complete", file=event.file_path)
            # Safe to process now
            process_complete_file(event.file_path)

Hot Reload Pattern

Automatically reload configuration on changes:

from provide.foundation.file.operations import FileOperationDetector
from provide.foundation.serialization import provide_loads
from pathlib import Path

class ConfigWatcher:
    def __init__(self, config_file: str):
        self.config_file = Path(config_file)
        self.config = {}
        self.detector = FileOperationDetector()

    async def start(self):
        """Start watching and reload on changes."""
        # Load initial config
        self.reload()

        # Watch for changes
        async for event in self.detector.watch(self.config_file):
            logger.info("config_reloaded")
            self.reload()

    def reload(self):
        """Reload configuration from file."""
        content = self.config_file.read_text()
        self.config = provide_loads(content)

# Usage
watcher = ConfigWatcher("config.json")
asyncio.run(watcher.start())

Stop Watching

detector = FileOperationDetector()

# Watch in background task
watch_task = asyncio.create_task(
    watch_files(detector)
)

# Later, stop watching
watch_task.cancel()
await detector.stop()

Next Steps


See also: examples/file_operations/02_streaming_detection.py