File Watching¶
Learn how to monitor files and directories for changes in real-time.
Overview¶
Foundation provides file watching capabilities to detect when files are created, modified, or deleted.
Basic File Watching¶
from provide.foundation.file.operations import FileOperationDetector
from provide.foundation import logger
async def watch_config():
"""Watch configuration file for changes."""
detector = FileOperationDetector()
async for event in detector.watch("config.yaml"):
logger.info(
"config_changed",
file=event.file_path,
operation=event.operation_type
)
# Reload configuration
reload_config()
Watch Multiple Files¶
files_to_watch = [
"config.yaml",
"secrets.env",
"database.json"
]
async for event in detector.watch_multiple(files_to_watch):
logger.info("file_changed", file=event.file_path)
handle_change(event)
Watch Directory¶
async def watch_directory():
"""Watch all files in a directory."""
detector = FileOperationDetector()
async for event in detector.watch_directory("./config"):
if event.operation_type == "created":
logger.info("new_file", file=event.file_path)
elif event.operation_type == "modified":
logger.info("file_updated", file=event.file_path)
elif event.operation_type == "deleted":
logger.warning("file_removed", file=event.file_path)
Filter by File Type¶
from pathlib import Path
async def watch_yaml_files():
"""Watch only YAML files."""
detector = FileOperationDetector()
async for event in detector.watch_directory("./"):
path = Path(event.file_path)
if path.suffix in [".yaml", ".yml"]:
logger.info("yaml_changed", file=path.name)
process_yaml(path)
Debouncing Changes¶
Handle rapid successive changes:
import asyncio
from collections import defaultdict
async def watch_with_debounce():
"""Debounce file changes to avoid processing too frequently."""
detector = FileOperationDetector()
pending_changes = defaultdict(asyncio.Event)
async def process_after_delay(file_path: str):
"""Process file after 500ms of no changes."""
await asyncio.sleep(0.5)
logger.info("processing_file", file=file_path)
process_file(file_path)
async for event in detector.watch_directory("./watched"):
# Cancel pending task if exists
if event.file_path in pending_changes:
pending_changes[event.file_path].set()
# Start new debounced task
event = asyncio.Event()
pending_changes[event.file_path] = event
asyncio.create_task(process_after_delay(event.file_path))
Streaming File Detection¶
Detect when files are being actively written:
async def detect_streaming():
"""Detect when files are being streamed/written."""
detector = FileOperationDetector()
async for event in detector.detect_streaming_operations("logfile.txt"):
if event.is_streaming:
logger.debug("file_being_written", file=event.file_path)
else:
logger.info("file_write_complete", file=event.file_path)
# Safe to process now
process_complete_file(event.file_path)
Hot Reload Pattern¶
Automatically reload configuration on changes:
from provide.foundation.file.operations import FileOperationDetector
from provide.foundation.serialization import provide_loads
from pathlib import Path
class ConfigWatcher:
def __init__(self, config_file: str):
self.config_file = Path(config_file)
self.config = {}
self.detector = FileOperationDetector()
async def start(self):
"""Start watching and reload on changes."""
# Load initial config
self.reload()
# Watch for changes
async for event in self.detector.watch(self.config_file):
logger.info("config_reloaded")
self.reload()
def reload(self):
"""Reload configuration from file."""
content = self.config_file.read_text()
self.config = provide_loads(content)
# Usage
watcher = ConfigWatcher("config.json")
asyncio.run(watcher.start())
Stop Watching¶
detector = FileOperationDetector()
# Watch in background task
watch_task = asyncio.create_task(
watch_files(detector)
)
# Later, stop watching
watch_task.cancel()
await detector.stop()
Next Steps¶
- Atomic Writes - Safe file writes
- API Reference: File Operations - Complete file operations API
See also: examples/file_operations/02_streaming_detection.py