Detect the safe-write pattern (back up the original, write the new file, clean up).
Common pattern: create a backup then modify the original, OR rename the original to a backup then create a new file.
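For illustration, the helpers referenced in the source, `is_backup_file` and `extract_base_name`, can be thought of as recognizing common backup suffixes and stripping them to recover the original name. The sketch below is an assumption about their behavior for readability only; the suffix list and helper bodies are hypothetical, not the library's actual implementation.

```python
from pathlib import Path

# Hypothetical sketch: the real is_backup_file/extract_base_name live elsewhere
# in provide.foundation; the suffixes below are assumed for illustration only.
_BACKUP_SUFFIXES = (".bak", ".backup", ".orig", "~")


def is_backup_file(path: Path) -> bool:
    """Return True if the path looks like a backup of another file."""
    return path.name.endswith(_BACKUP_SUFFIXES)


def extract_base_name(path: Path) -> str | None:
    """Strip a backup suffix to recover the original file name, if any."""
    for suffix in _BACKUP_SUFFIXES:
        if path.name.endswith(suffix):
            return path.name[: -len(suffix)] or None
    return None
```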
Source code in `provide/foundation/file/operations/detectors/atomic.py`:
```python
def detect_safe_write(self, events: list[FileEvent]) -> FileOperation | None:
    """Detect safe write pattern (backup original, write new, cleanup).

    Common pattern: create backup -> modify original OR rename original to backup -> create new
    """
    if len(events) < 2:
        return None

    # Find backup files and match them with original files
    backup_events = []
    regular_events = []
    for event in events:
        if is_backup_file(event.path):
            backup_events.append(event)
        else:
            regular_events.append(event)

    # Try to match backup files with regular files
    for backup_event in backup_events:
        if backup_event.event_type not in {"moved", "created"}:
            continue

        # Extract base name from backup
        base_name = extract_base_name(backup_event.path)
        if not base_name:
            continue

        backup_parent = backup_event.path.parent
        expected_original = backup_parent / base_name

        # Find matching original file events
        matching_events = [
            e
            for e in regular_events
            if e.path == expected_original and e.event_type in {"created", "modified"}
        ]

        if matching_events:
            # Found safe write pattern
            original_event = matching_events[0]
            all_events = [backup_event, original_event]
            all_events.sort(key=lambda e: e.timestamp)

            return FileOperation(
                operation_type=OperationType.SAFE_WRITE,
                primary_path=original_event.path,
                events=all_events,
                confidence=0.95,
                description=f"Safe write to {original_event.path.name}",
                start_time=all_events[0].timestamp,
                end_time=all_events[-1].timestamp,
                is_atomic=False,
                is_safe=True,
                has_backup=True,
                files_affected=[original_event.path],
                metadata={
                    "backup_file": str(backup_event.path),
                    "pattern": "safe_write",
                },
            )

    return None
```
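As a usage sketch, the event pair below should match the pattern. Only the attributes `path`, `event_type`, and `timestamp` are taken from the code above; the stand-in `FileEvent` dataclass and the way a detector instance is obtained are assumptions, not the library's actual API.

```python
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path


# Stand-in event type for illustration; the real FileEvent in
# provide.foundation exposes at least path, event_type, and timestamp
# (used by detect_safe_write above), but its constructor may differ.
@dataclass
class FileEvent:
    path: Path
    event_type: str
    timestamp: datetime


now = datetime.now()
events = [
    # Original renamed (or copied) to a backup first...
    FileEvent(Path("/tmp/config.yaml.bak"), "created", now),
    # ...then the original path is rewritten with new content.
    FileEvent(Path("/tmp/config.yaml"), "modified", now + timedelta(milliseconds=5)),
]

# Given a detector instance, this pair should yield a FileOperation with
# operation_type=SAFE_WRITE and has_backup=True (instantiation assumed):
# operation = detector.detect_safe_write(events)
```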