Skip to content

File

provide.foundation.file

TODO: Add module docstring.

Classes

DetectorConfig

Configuration for operation detection.

FileEvent

Single file system event with rich metadata.

Attributes
sequence property
sequence: int

Convenience accessor for sequence number.

size_delta property
size_delta: int | None

Change in file size, if known.

timestamp property
timestamp: datetime

Convenience accessor for timestamp.

FileEventMetadata

Rich metadata for a file event.

FileLock

FileLock(
    path: Path | str,
    timeout: float = DEFAULT_FILE_LOCK_TIMEOUT,
    check_interval: float = 0.1,
)

File-based lock for concurrent access control.

Uses exclusive file creation as the locking mechanism. The lock file contains the PID of the process holding the lock.

Thread-safe: Multiple threads can safely use the same FileLock instance. The internal thread lock protects instance state while the file lock provides inter-process synchronization.

Example

with FileLock("/tmp/myapp.lock"): # Exclusive access to resource do_something()

Initialize file lock.

Parameters:

Name Type Description Default
path Path | str

Lock file path

required
timeout float

Max seconds to wait for lock

DEFAULT_FILE_LOCK_TIMEOUT
check_interval float

Seconds between lock checks

0.1
Source code in provide/foundation/file/lock.py
def __init__(
    self,
    path: Path | str,
    timeout: float = DEFAULT_FILE_LOCK_TIMEOUT,
    check_interval: float = 0.1,
) -> None:
    """Initialize file lock.

    Args:
        path: Lock file path
        timeout: Max seconds to wait for lock
        check_interval: Seconds between lock checks

    """
    self.path = Path(path)
    self.timeout = timeout
    self.check_interval = check_interval
    self.locked = False
    self.pid = os.getpid()
    self._thread_lock = threading.RLock()  # Protect instance state from concurrent threads
Functions
__enter__
__enter__() -> FileLock

Context manager entry.

Source code in provide/foundation/file/lock.py
def __enter__(self) -> FileLock:
    """Context manager entry."""
    self.acquire()
    return self
__exit__
__exit__(
    exc_type: object, exc_val: object, exc_tb: object
) -> None

Context manager exit.

Source code in provide/foundation/file/lock.py
def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
    """Context manager exit."""
    self.release()
acquire
acquire(blocking: bool = True) -> bool

Acquire the lock.

Parameters:

Name Type Description Default
blocking bool

If True, wait for lock. If False, return immediately.

True

Returns:

Type Description
bool

True if lock acquired, False if not (non-blocking mode only)

Raises:

Type Description
LockError

If timeout exceeded (blocking mode)

Source code in provide/foundation/file/lock.py
def acquire(self, blocking: bool = True) -> bool:  # noqa: C901
    """Acquire the lock.

    Args:
        blocking: If True, wait for lock. If False, return immediately.

    Returns:
        True if lock acquired, False if not (non-blocking mode only)

    Raises:
        LockError: If timeout exceeded (blocking mode)

    """
    with self._thread_lock:
        if self.timeout <= 0:
            raise LockError("Timeout must be positive", code="INVALID_TIMEOUT", path=str(self.path))

        # If already locked by this instance, treat as re-entrant
        if self.locked:
            return True

        # Use a finite loop with hard limits to prevent any possibility of hanging
        start_time = time.time()
        end_time = start_time + self.timeout
        max_iterations = 1000  # Hard limit regardless of timeout
        iteration = 0

        while iteration < max_iterations:
            iteration += 1
            current_time = time.time()

            # Hard timeout check - exit immediately if time is up
            if current_time >= end_time:
                elapsed = current_time - start_time
                raise LockError(
                    f"Failed to acquire lock within {self.timeout}s (elapsed: {elapsed:.3f}s, iterations: {iteration})",
                    code="LOCK_TIMEOUT",
                    path=str(self.path),
                ) from None

            try:
                # Try to create lock file exclusively
                fd = os.open(str(self.path), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
                try:
                    # Write lock metadata as JSON for robust validation
                    lock_info = {
                        "pid": self.pid,
                        "hostname": socket.gethostname(),
                        "created": current_time,
                    }
                    # Add process start time for PID recycling protection (if psutil available)
                    if _HAS_PSUTIL:
                        try:
                            proc = psutil.Process(self.pid)
                            lock_info["start_time"] = proc.create_time()
                        except (psutil.NoSuchProcess, psutil.AccessDenied):
                            pass
                    os.write(fd, json_dumps(lock_info).encode())
                finally:
                    os.close(fd)

                self.locked = True
                elapsed = current_time - start_time
                log.debug(
                    "Acquired lock",
                    path=str(self.path),
                    pid=self.pid,
                    iterations=iteration,
                    elapsed=elapsed,
                )
                return True

            except FileExistsError:
                # Lock file exists, check if holder is still alive
                if self._check_stale_lock():
                    continue  # Retry after removing stale lock

                if not blocking:
                    log.debug("Lock unavailable (non-blocking)", path=str(self.path))
                    return False

                # Calculate remaining time
                remaining = end_time - current_time
                if remaining <= 0:
                    # Time is up
                    break

                # Sleep for a small fixed interval or remaining time, whichever is smaller
                sleep_time = min(0.01, remaining * 0.5)  # Never sleep more than 10ms
                if sleep_time > 0:
                    time.sleep(sleep_time)

        # If we exit the loop without acquiring the lock
        elapsed = time.time() - start_time
        raise LockError(
            f"Failed to acquire lock within {self.timeout}s (elapsed: {elapsed:.3f}s, iterations: {iteration})",
            code="LOCK_TIMEOUT",
            path=str(self.path),
        ) from None
release
release() -> None

Release the lock.

Only removes the lock file if we own it.

Source code in provide/foundation/file/lock.py
def release(self) -> None:
    """Release the lock.

    Only removes the lock file if we own it.
    """
    with self._thread_lock:
        if not self.locked:
            return

        try:
            # Verify we own the lock before removing
            if self.path.exists():
                try:
                    content = self.path.read_text().strip()
                    try:
                        lock_info = json_loads(content)
                        if isinstance(lock_info, dict):
                            owner_pid = lock_info.get("pid")
                        else:
                            owner_pid = lock_info if isinstance(lock_info, int) else None
                    except (ValueError, Exception):
                        owner_pid = int(content) if content.isdigit() else None

                    if owner_pid == self.pid:
                        self.path.unlink()
                        log.debug("Released lock", path=str(self.path), pid=self.pid)
                    else:
                        log.warning(
                            "Lock owned by different process",
                            path=str(self.path),
                            owner_pid=owner_pid,
                            our_pid=self.pid,
                        )
                except Exception as e:
                    log.warning(
                        "Error checking lock ownership",
                        path=str(self.path),
                        error=str(e),
                    )
                    # Still try to remove if we think we own it
                    if self.locked:
                        self.path.unlink()
        except FileNotFoundError:
            pass  # Lock already released
        except (OSError, PermissionError) as e:
            # Failed to unlink lock file due to permission or filesystem error
            log.error("Failed to release lock", path=str(self.path), error=str(e))
        finally:
            self.locked = False

FileOperation

A detected logical file system operation.

Attributes
duration_ms property
duration_ms: float

Total operation duration.

event_count property
event_count: int

Number of events in this operation.

Functions
get_timeline
get_timeline() -> list[tuple[float, FileEvent]]

Get events with relative timestamps (ms from start).

Source code in provide/foundation/file/operations/types.py
def get_timeline(self) -> list[tuple[float, FileEvent]]:
    """Get events with relative timestamps (ms from start)."""
    return [
        ((e.timestamp - self.start_time).total_seconds() * 1000, e)
        for e in sorted(self.events, key=lambda x: x.sequence)
    ]

LockError

LockError(
    message: str,
    *,
    lock_path: str | None = None,
    timeout: float | None = None,
    **kwargs: Any
)

Bases: FoundationError

Raised when file lock operations fail.

Parameters:

Name Type Description Default
message str

Error message describing the lock issue.

required
lock_path str | None

Optional path to the lock file.

None
timeout float | None

Optional timeout that was exceeded.

None
**kwargs Any

Additional context passed to FoundationError.

{}

Examples:

>>> raise LockError("Failed to acquire lock")
>>> raise LockError("Lock timeout", lock_path="/tmp/app.lock", timeout=30)
Source code in provide/foundation/errors/resources.py
def __init__(
    self,
    message: str,
    *,
    lock_path: str | None = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> None:
    if lock_path:
        kwargs.setdefault("context", {})["lock.path"] = lock_path
    if timeout is not None:
        kwargs.setdefault("context", {})["lock.timeout"] = timeout
    super().__init__(message, **kwargs)

OperationDetector

OperationDetector(
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
)

Detects and classifies file operations from events.

Initialize with optional configuration and callback.

Parameters:

Name Type Description Default
config DetectorConfig | None

Detector configuration

None
on_operation_complete Any

Callback function(operation: FileOperation) called when an operation is detected. Used for streaming mode.

None
registry Registry | None

Optional registry for detectors (defaults to global)

None
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def __init__(
    self,
    config: DetectorConfig | None = None,
    on_operation_complete: Any = None,
    registry: Registry | None = None,
) -> None:
    """Initialize with optional configuration and callback.

    Args:
        config: Detector configuration
        on_operation_complete: Callback function(operation: FileOperation) called
                             when an operation is detected. Used for streaming mode.
        registry: Optional registry for detectors (defaults to global)
    """
    self.config = config or DetectorConfig()
    self.on_operation_complete = on_operation_complete
    self.registry = registry or get_detector_registry()
    self._pending_events: list[FileEvent] = []
    self._last_flush = datetime.now()

    # Create auto-flush handler for streaming mode
    self._auto_flush_handler = AutoFlushHandler(
        time_window_ms=self.config.time_window_ms,
        on_operation_complete=on_operation_complete,
        analyze_func=self._analyze_event_group,
    )
Functions
add_event
add_event(event: FileEvent) -> None

Add event with auto-flush and callback support.

This is the recommended method for streaming detection with automatic temp file hiding and callback-based operation reporting.

Parameters:

Name Type Description Default
event FileEvent

File event to process

required
Behavior
  • Hides temp files automatically (no callback until operation completes)
  • Schedules auto-flush timer for pending operations
  • Calls on_operation_complete(operation) when pattern detected
  • Emits non-temp files immediately if no operation pattern found
Source code in provide/foundation/file/operations/detectors/orchestrator.py
def add_event(self, event: FileEvent) -> None:
    """Add event with auto-flush and callback support.

    This is the recommended method for streaming detection with automatic
    temp file hiding and callback-based operation reporting.

    Args:
        event: File event to process

    Behavior:
        - Hides temp files automatically (no callback until operation completes)
        - Schedules auto-flush timer for pending operations
        - Calls on_operation_complete(operation) when pattern detected
        - Emits non-temp files immediately if no operation pattern found
    """
    # Delegate to auto-flush handler
    self._auto_flush_handler.add_event(event)
detect
detect(events: list[FileEvent]) -> list[FileOperation]

Detect all operations from a list of events.

Parameters:

Name Type Description Default
events list[FileEvent]

List of file events to analyze

required

Returns:

Type Description
list[FileOperation]

List of detected operations, ordered by start time

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect(self, events: list[FileEvent]) -> list[FileOperation]:
    """Detect all operations from a list of events.

    Args:
        events: List of file events to analyze

    Returns:
        List of detected operations, ordered by start time
    """
    if not events:
        return []

    # Sort events by timestamp
    sorted_events = sorted(events, key=lambda e: e.timestamp)

    # Group events by time windows
    event_groups = self._group_events_by_time(sorted_events)

    operations = []
    for group in event_groups:
        operation = self._analyze_event_group(group)
        if operation:
            operations.append(operation)

    return operations
detect_streaming
detect_streaming(event: FileEvent) -> FileOperation | None

Process events in streaming fashion.

Parameters:

Name Type Description Default
event FileEvent

Single file event

required

Returns:

Type Description
FileOperation | None

Completed operation if detected, None otherwise

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def detect_streaming(self, event: FileEvent) -> FileOperation | None:
    """Process events in streaming fashion.

    Args:
        event: Single file event

    Returns:
        Completed operation if detected, None otherwise
    """
    self._pending_events.append(event)

    # Check if we should flush based on time window
    now = datetime.now()
    time_since_last = (now - self._last_flush).total_seconds() * 1000

    if time_since_last >= self.config.time_window_ms:
        return self._flush_pending()

    return None
flush
flush() -> list[FileOperation]

Get any pending operations and clear buffer.

Source code in provide/foundation/file/operations/detectors/orchestrator.py
def flush(self) -> list[FileOperation]:
    """Get any pending operations and clear buffer."""
    operations = []
    if self._pending_events:
        operation = self._flush_pending()
        if operation:
            operations.append(operation)
    return operations

OperationType

Bases: Enum

Types of detected file operations.

Functions

align_offset

align_offset(
    offset: int, alignment: int = DEFAULT_ALIGNMENT
) -> int

Align offset to specified boundary.

Aligns an offset up to the next boundary. The alignment must be a power of 2.

Parameters:

Name Type Description Default
offset int

The offset to align (in bytes)

required
alignment int

Alignment boundary in bytes (must be power of 2)

DEFAULT_ALIGNMENT

Returns:

Type Description
int

Aligned offset (>= input offset)

Raises:

Type Description
ValueError

If alignment is not a power of 2 or is <= 0

Examples:

>>> align_offset(10, 16)
16
>>> align_offset(16, 16)
16
>>> align_offset(17, 16)
32
>>> align_offset(0, 16)
0
Notes

Uses bit manipulation for efficiency: aligned = (offset + alignment - 1) & ~(alignment - 1)

Source code in provide/foundation/file/alignment.py
def align_offset(offset: int, alignment: int = DEFAULT_ALIGNMENT) -> int:
    """Align offset to specified boundary.

    Aligns an offset up to the next boundary. The alignment must be a power of 2.

    Args:
        offset: The offset to align (in bytes)
        alignment: Alignment boundary in bytes (must be power of 2)

    Returns:
        Aligned offset (>= input offset)

    Raises:
        ValueError: If alignment is not a power of 2 or is <= 0

    Examples:
        >>> align_offset(10, 16)
        16
        >>> align_offset(16, 16)
        16
        >>> align_offset(17, 16)
        32
        >>> align_offset(0, 16)
        0

    Notes:
        Uses bit manipulation for efficiency:
        aligned = (offset + alignment - 1) & ~(alignment - 1)
    """
    if alignment <= 0 or (alignment & (alignment - 1)) != 0:
        raise ValueError(f"Alignment must be a positive power of 2, got {alignment}")

    return (offset + alignment - 1) & ~(alignment - 1)

align_to_page

align_to_page(
    offset: int, page_size: int = PAGE_SIZE_4K
) -> int

Align offset to page boundary for optimal mmap performance.

Page alignment is required for memory-mapped file operations on most systems. Common page sizes: - 4KB (4096 bytes): Most x86_64 systems, Linux, Windows - 16KB (16384 bytes): Apple Silicon (M1/M2/M3), some ARM64 systems

Parameters:

Name Type Description Default
offset int

The offset to align (in bytes)

required
page_size int

Page size in bytes (default: 4096)

PAGE_SIZE_4K

Returns:

Type Description
int

Page-aligned offset (>= input offset)

Raises:

Type Description
ValueError

If page_size is not a power of 2

Examples:

>>> align_to_page(100)
4096
>>> align_to_page(4096)
4096
>>> align_to_page(4097)
8192
>>> align_to_page(100, page_size=16384)
16384
See Also

get_system_page_size() for detecting the system's page size

Source code in provide/foundation/file/alignment.py
def align_to_page(offset: int, page_size: int = PAGE_SIZE_4K) -> int:
    """Align offset to page boundary for optimal mmap performance.

    Page alignment is required for memory-mapped file operations on most systems.
    Common page sizes:
    - 4KB (4096 bytes): Most x86_64 systems, Linux, Windows
    - 16KB (16384 bytes): Apple Silicon (M1/M2/M3), some ARM64 systems

    Args:
        offset: The offset to align (in bytes)
        page_size: Page size in bytes (default: 4096)

    Returns:
        Page-aligned offset (>= input offset)

    Raises:
        ValueError: If page_size is not a power of 2

    Examples:
        >>> align_to_page(100)
        4096
        >>> align_to_page(4096)
        4096
        >>> align_to_page(4097)
        8192
        >>> align_to_page(100, page_size=16384)
        16384

    See Also:
        get_system_page_size() for detecting the system's page size
    """
    return align_offset(offset, page_size)

atomic_replace

atomic_replace(
    path: Path | str,
    data: bytes,
    preserve_mode: bool = True,
) -> None

Replace existing file atomically, preserving permissions.

Parameters:

Name Type Description Default
path Path | str

Target file path (must exist)

required
data bytes

Binary data to write

required
preserve_mode bool

Whether to preserve file permissions

True

Raises:

Type Description
FileNotFoundError

If file doesn't exist

OSError

If file operation fails

Source code in provide/foundation/file/atomic.py
def atomic_replace(
    path: Path | str,
    data: bytes,
    preserve_mode: bool = True,
) -> None:
    """Replace existing file atomically, preserving permissions.

    Args:
        path: Target file path (must exist)
        data: Binary data to write
        preserve_mode: Whether to preserve file permissions

    Raises:
        FileNotFoundError: If file doesn't exist
        OSError: If file operation fails

    """
    path = Path(path)

    if not path.exists():
        raise FileNotFoundError(f"File does not exist: {path}")

    mode = None
    if preserve_mode:
        with contextlib.suppress(OSError):
            mode = path.stat().st_mode

    # When preserve_mode is False, we explicitly pass preserve_mode=False to atomic_write
    # and let it handle the non-preservation (atomic_write won't preserve even if file exists)
    atomic_write(path, data, mode=mode, backup=False, preserve_mode=preserve_mode)

atomic_write

atomic_write(
    path: Path | str,
    data: bytes,
    mode: int | None = None,
    backup: bool = False,
    preserve_mode: bool = True,
) -> None

Write file atomically using temp file + rename.

This ensures that the file is either fully written or not written at all, preventing partial writes or corruption.

Parameters:

Name Type Description Default
path Path | str

Target file path

required
data bytes

Binary data to write

required
mode int | None

Optional file permissions (e.g., 0o644)

None
backup bool

Create .bak file before overwrite

False
preserve_mode bool

Whether to preserve existing file permissions when mode is None

True

Raises:

Type Description
OSError

If file operation fails

Source code in provide/foundation/file/atomic.py
def atomic_write(
    path: Path | str,
    data: bytes,
    mode: int | None = None,
    backup: bool = False,
    preserve_mode: bool = True,
) -> None:
    """Write file atomically using temp file + rename.

    This ensures that the file is either fully written or not written at all,
    preventing partial writes or corruption.

    Args:
        path: Target file path
        data: Binary data to write
        mode: Optional file permissions (e.g., 0o644)
        backup: Create .bak file before overwrite
        preserve_mode: Whether to preserve existing file permissions when mode is None

    Raises:
        OSError: If file operation fails

    """
    path = Path(path)

    # Create backup if requested and file exists
    if backup and path.exists():
        backup_path = path.with_suffix(path.suffix + ".bak")
        try:
            path.rename(backup_path)
            log.debug("Created backup", backup=str(backup_path))
        except OSError as e:
            log.warning("Failed to create backup", error=str(e))

    # Ensure parent directory exists
    path.parent.mkdir(parents=True, exist_ok=True)

    # Determine final permissions before creating file (avoid race condition)
    final_mode = None
    if mode is not None:
        final_mode = mode
    elif preserve_mode and path.exists():
        # Get existing permissions
        with contextlib.suppress(OSError):
            final_mode = path.stat().st_mode

    if final_mode is None:
        # Default permissions (respecting umask on Unix, simplified on Windows)
        default_mode = 0o666
        if sys.platform == "win32":
            # Windows doesn't support umask; use default mode
            final_mode = default_mode
        else:
            # Unix: Respect umask
            current_umask = os.umask(0)
            os.umask(current_umask)
            final_mode = default_mode & ~current_umask

    # Create temp file with final permissions in a single operation (no race)
    # Use os.open() instead of secure_temp_file for atomic permission setting
    import tempfile

    temp_fd, temp_path = tempfile.mkstemp(
        dir=path.parent,
        prefix=f".{path.name}.",
        suffix=".tmp",
    )

    try:
        # Set permissions immediately on the file descriptor (atomic)
        # On Windows, fchmod has limited effect (only read-only bit)
        if sys.platform != "win32":
            os.fchmod(temp_fd, final_mode)

        # Write data
        with os.fdopen(temp_fd, "wb") as f:
            f.write(data)
            f.flush()
            os.fsync(f.fileno())

        # Atomic rename
        Path(temp_path).replace(path)

        log.debug(
            "Atomically wrote file",
            path=str(path),
            size=len(data),
            mode=oct(mode) if mode else None,
        )
    except (OSError, PermissionError) as e:
        # Clean up temp file on error
        log.error(
            "Atomic write failed, cleaning up temp file",
            path=str(path),
            temp_path=temp_path,
            error=str(e),
        )
        with contextlib.suppress(OSError):
            Path(temp_path).unlink()
        raise

atomic_write_text

atomic_write_text(
    path: Path | str,
    text: str,
    encoding: str = "utf-8",
    mode: int | None = None,
    backup: bool = False,
    preserve_mode: bool = True,
) -> None

Write text file atomically.

Parameters:

Name Type Description Default
path Path | str

Target file path

required
text str

Text content to write

required
encoding str

Text encoding (default: utf-8)

'utf-8'
mode int | None

Optional file permissions

None
backup bool

Create .bak file before overwrite

False
preserve_mode bool

Whether to preserve existing file permissions when mode is None

True

Raises:

Type Description
OSError

If file operation fails

UnicodeEncodeError

If text cannot be encoded

Source code in provide/foundation/file/atomic.py
def atomic_write_text(
    path: Path | str,
    text: str,
    encoding: str = "utf-8",
    mode: int | None = None,
    backup: bool = False,
    preserve_mode: bool = True,
) -> None:
    """Write text file atomically.

    Args:
        path: Target file path
        text: Text content to write
        encoding: Text encoding (default: utf-8)
        mode: Optional file permissions
        backup: Create .bak file before overwrite
        preserve_mode: Whether to preserve existing file permissions when mode is None

    Raises:
        OSError: If file operation fails
        UnicodeEncodeError: If text cannot be encoded

    """
    data = text.encode(encoding)
    atomic_write(path, data, mode=mode, backup=backup, preserve_mode=preserve_mode)

backup_file

backup_file(
    path: Path | str,
    suffix: str = ".bak",
    timestamp: bool = False,
) -> Path | None

Create backup copy of file.

Parameters:

Name Type Description Default
path Path | str

File to backup

required
suffix str

Backup suffix

'.bak'
timestamp bool

If True, add timestamp to backup name

False

Returns:

Type Description
Path | None

Path to backup file, or None if source doesn't exist

Source code in provide/foundation/file/utils.py
def backup_file(
    path: Path | str,
    suffix: str = ".bak",
    timestamp: bool = False,
) -> Path | None:
    """Create backup copy of file.

    Args:
        path: File to backup
        suffix: Backup suffix
        timestamp: If True, add timestamp to backup name

    Returns:
        Path to backup file, or None if source doesn't exist

    """
    path = Path(path)

    if not path.exists():
        log.debug("Source file doesn't exist, no backup created", path=str(path))
        return None

    # Build backup filename
    if timestamp:
        ts = datetime.now().strftime("%Y%m%d_%H%M%S")
        backup_path = path.with_suffix(f".{ts}{suffix}")
    else:
        backup_path = path.with_suffix(path.suffix + suffix)

        # Find unique name if backup already exists
        counter = 1
        while backup_path.exists():
            backup_path = path.with_suffix(f"{path.suffix}{suffix}.{counter}")
            counter += 1

    try:
        shutil.copy2(str(path), str(backup_path))
        log.debug("Created backup", source=str(path), backup=str(backup_path))
        return backup_path
    except Exception as e:
        log.error("Failed to create backup", path=str(path), error=str(e))
        return None

calculate_padding

calculate_padding(
    current_offset: int, alignment: int = DEFAULT_ALIGNMENT
) -> int

Calculate padding bytes needed to align to boundary.

Parameters:

Name Type Description Default
current_offset int

Current offset position (in bytes)

required
alignment int

Desired alignment boundary (in bytes)

DEFAULT_ALIGNMENT

Returns:

Type Description
int

Number of padding bytes needed (0 if already aligned)

Raises:

Type Description
ValueError

If alignment is not a power of 2 or is <= 0

Examples:

>>> calculate_padding(10, 16)
6
>>> calculate_padding(16, 16)
0
>>> calculate_padding(17, 16)
15
>>> calculate_padding(100, 64)
28
Notes

This is useful when writing binary formats where you need to insert padding bytes to maintain alignment.

Source code in provide/foundation/file/alignment.py
def calculate_padding(current_offset: int, alignment: int = DEFAULT_ALIGNMENT) -> int:
    """Calculate padding bytes needed to align to boundary.

    Args:
        current_offset: Current offset position (in bytes)
        alignment: Desired alignment boundary (in bytes)

    Returns:
        Number of padding bytes needed (0 if already aligned)

    Raises:
        ValueError: If alignment is not a power of 2 or is <= 0

    Examples:
        >>> calculate_padding(10, 16)
        6
        >>> calculate_padding(16, 16)
        0
        >>> calculate_padding(17, 16)
        15
        >>> calculate_padding(100, 64)
        28

    Notes:
        This is useful when writing binary formats where you need to insert
        padding bytes to maintain alignment.
    """
    if alignment <= 0 or (alignment & (alignment - 1)) != 0:
        raise ValueError(f"Alignment must be a positive power of 2, got {alignment}")

    aligned = align_offset(current_offset, alignment)
    return aligned - current_offset

check_disk_space

check_disk_space(
    path: Path,
    required_bytes: int,
    raise_on_insufficient: bool = True,
) -> bool

Check if sufficient disk space is available.

Parameters:

Name Type Description Default
path Path

Directory path to check (or parent if it doesn't exist)

required
required_bytes int

Number of bytes required

required
raise_on_insufficient bool

Raise OSError if insufficient space (default: True)

True

Returns:

Type Description
bool

True if sufficient space available, False otherwise

Raises:

Type Description
OSError

If insufficient space and raise_on_insufficient=True

Examples:

>>> from pathlib import Path
>>> # Check if 1GB is available
>>> check_disk_space(Path.home(), 1024**3, raise_on_insufficient=False)
True
>>> # Will raise if insufficient (default behavior)
>>> check_disk_space(Path.home(), 10**15)
Traceback (most recent call last):
    ...
OSError: Insufficient disk space...
Notes

On systems where disk space cannot be determined (e.g., Windows without proper permissions), this function logs a warning but does not fail, returning True to allow the operation to proceed.

Source code in provide/foundation/file/disk.py
def check_disk_space(
    path: Path,
    required_bytes: int,
    raise_on_insufficient: bool = True,
) -> bool:
    """Check if sufficient disk space is available.

    Args:
        path: Directory path to check (or parent if it doesn't exist)
        required_bytes: Number of bytes required
        raise_on_insufficient: Raise OSError if insufficient space (default: True)

    Returns:
        True if sufficient space available, False otherwise

    Raises:
        OSError: If insufficient space and raise_on_insufficient=True

    Examples:
        >>> from pathlib import Path
        >>> # Check if 1GB is available
        >>> check_disk_space(Path.home(), 1024**3, raise_on_insufficient=False)
        True

        >>> # Will raise if insufficient (default behavior)
        >>> check_disk_space(Path.home(), 10**15)  # doctest: +SKIP
        Traceback (most recent call last):
            ...
        OSError: Insufficient disk space...

    Notes:
        On systems where disk space cannot be determined (e.g., Windows
        without proper permissions), this function logs a warning but
        does not fail, returning True to allow the operation to proceed.
    """
    try:
        # Use parent directory if path doesn't exist yet
        check_path = path if path.exists() else path.parent

        # Get available disk space
        available = get_available_space(check_path)

        # If we can't determine space, log warning and allow operation
        if available is None:
            log.warning(
                "Could not determine disk space, operation will proceed",
                path=str(path),
                required_bytes=required_bytes,
            )
            return True

        # Convert to GB for human-readable logging
        required_gb = required_bytes / (1024**3)
        available_gb = available / (1024**3)

        log.debug(
            "Disk space requirement check",
            path=str(check_path),
            required_gb=f"{required_gb:.2f}",
            available_gb=f"{available_gb:.2f}",
            sufficient=available >= required_bytes,
        )

        # Check if sufficient space available
        if available < required_bytes:
            error_msg = (
                f"Insufficient disk space at {check_path}: "
                f"need {required_gb:.2f} GB, have {available_gb:.2f} GB"
            )

            log.error(
                "Insufficient disk space",
                path=str(check_path),
                required_gb=f"{required_gb:.2f}",
                available_gb=f"{available_gb:.2f}",
                shortfall_gb=f"{(required_bytes - available) / (1024**3):.2f}",
            )

            if raise_on_insufficient:
                raise OSError(error_msg)

            return False

        return True

    except OSError:
        # Re-raise OSError from insufficient space check
        raise
    except Exception as e:
        # Unexpected error - log but don't fail
        log.warning(
            "Unexpected error checking disk space, operation will proceed",
            path=str(path),
            error=str(e),
            error_type=type(e).__name__,
        )
        return True

detect_atomic_save

detect_atomic_save(
    events: list[FileEvent],
) -> FileOperation | None

Detect if events represent an atomic save operation.

Source code in provide/foundation/file/operations/utils.py
def detect_atomic_save(events: list[FileEvent]) -> FileOperation | None:
    """Detect if events represent an atomic save operation."""
    from provide.foundation.file.operations.detectors.orchestrator import OperationDetector

    detector = OperationDetector()
    operations = detector.detect(events)
    return next((op for op in operations if op.operation_type == OperationType.ATOMIC_SAVE), None)

ensure_dir

ensure_dir(
    path: Path | str, mode: int = 493, parents: bool = True
) -> Path

Ensure directory exists with proper permissions.

Parameters:

Name Type Description Default
path Path | str

Directory path

required
mode int

Directory permissions

493
parents bool

Create parent directories if needed

True

Returns:

Type Description
Path

Path object for the directory

Source code in provide/foundation/file/directory.py
def ensure_dir(
    path: Path | str,
    mode: int = 0o755,
    parents: bool = True,
) -> Path:
    """Ensure directory exists with proper permissions.

    Args:
        path: Directory path
        mode: Directory permissions
        parents: Create parent directories if needed

    Returns:
        Path object for the directory

    """
    path = Path(path)

    if not path.exists():
        path.mkdir(mode=mode, parents=parents, exist_ok=True)
        log.debug("Created directory", path=str(path), mode=oct(mode))
    elif not path.is_dir():
        raise NotADirectoryError(f"Path exists but is not a directory: {path}")

    return path

ensure_parent_dir

ensure_parent_dir(
    file_path: Path | str, mode: int = 493
) -> Path

Ensure parent directory of file exists.

Parameters:

Name Type Description Default
file_path Path | str

File path whose parent to ensure

required
mode int

Directory permissions

493

Returns:

Type Description
Path

Path object for the parent directory

Source code in provide/foundation/file/directory.py
def ensure_parent_dir(
    file_path: Path | str,
    mode: int = 0o755,
) -> Path:
    """Ensure parent directory of file exists.

    Args:
        file_path: File path whose parent to ensure
        mode: Directory permissions

    Returns:
        Path object for the parent directory

    """
    file_path = Path(file_path)
    parent = file_path.parent

    if parent and parent != Path():
        return ensure_dir(parent, mode=mode, parents=True)

    return parent

ensure_secure_permissions

ensure_secure_permissions(
    path: Path,
    is_executable: bool = False,
    file_mode: int = DEFAULT_FILE_PERMS,
    dir_mode: int = DEFAULT_DIR_PERMS,
    executable_mode: int = DEFAULT_EXECUTABLE_PERMS,
) -> None

Apply secure default permissions to a file or directory.

Automatically determines the appropriate permission mode based on whether the path is a file, directory, or executable.

Parameters:

Name Type Description Default
path Path

Path to file or directory

required
is_executable bool

Whether file should be executable (ignored for directories)

False
file_mode int

Permission mode for regular files

DEFAULT_FILE_PERMS
dir_mode int

Permission mode for directories

DEFAULT_DIR_PERMS
executable_mode int

Permission mode for executable files

DEFAULT_EXECUTABLE_PERMS

Examples:

>>> from pathlib import Path
>>> # Regular file gets 0o644
>>> p = Path("/tmp/file.txt")
>>> p.touch()
>>> ensure_secure_permissions(p)
>>> # Executable gets 0o755
>>> p2 = Path("/tmp/script.sh")
>>> p2.touch()
>>> ensure_secure_permissions(p2, is_executable=True)
>>> # Directory gets 0o755
>>> d = Path("/tmp/mydir")
>>> d.mkdir(exist_ok=True)
>>> ensure_secure_permissions(d)
Source code in provide/foundation/file/permissions.py
def ensure_secure_permissions(
    path: Path,
    is_executable: bool = False,
    file_mode: int = DEFAULT_FILE_PERMS,
    dir_mode: int = DEFAULT_DIR_PERMS,
    executable_mode: int = DEFAULT_EXECUTABLE_PERMS,
) -> None:
    """Apply secure default permissions to a file or directory.

    Automatically determines the appropriate permission mode based on whether
    the path is a file, directory, or executable.

    Args:
        path: Path to file or directory
        is_executable: Whether file should be executable (ignored for directories)
        file_mode: Permission mode for regular files
        dir_mode: Permission mode for directories
        executable_mode: Permission mode for executable files

    Examples:
        >>> from pathlib import Path
        >>> # Regular file gets 0o644
        >>> p = Path("/tmp/file.txt")
        >>> p.touch()
        >>> ensure_secure_permissions(p)

        >>> # Executable gets 0o755
        >>> p2 = Path("/tmp/script.sh")
        >>> p2.touch()
        >>> ensure_secure_permissions(p2, is_executable=True)

        >>> # Directory gets 0o755
        >>> d = Path("/tmp/mydir")
        >>> d.mkdir(exist_ok=True)
        >>> ensure_secure_permissions(d)
    """
    if path.is_dir():
        mode = dir_mode
    elif is_executable:
        mode = executable_mode
    else:
        mode = file_mode

    set_file_permissions(path, mode)
    log.trace(
        "Applied secure permissions",
        path=str(path),
        mode=format_permissions(mode),
        is_dir=path.is_dir(),
        is_executable=is_executable,
    )

extract_original_path

extract_original_path(temp_path: Path) -> Path | None

Extract the original filename from a temp file path.

Source code in provide/foundation/file/operations/utils.py
def extract_original_path(temp_path: Path) -> Path | None:
    """Extract the original filename from a temp file path."""
    from provide.foundation.file.operations.detectors.helpers import extract_base_name

    base_name = extract_base_name(temp_path)
    if base_name:
        return temp_path.parent / base_name
    else:
        # If no temp pattern matches, return the original path
        return temp_path

find_files

find_files(
    pattern: str,
    root: Path | str = ".",
    recursive: bool = True,
) -> list[Path]

Find files matching pattern.

Parameters:

Name Type Description Default
pattern str

Glob pattern (e.g., ".py", "**/.json")

required
root Path | str

Root directory to search from

'.'
recursive bool

If True, search recursively

True

Returns:

Type Description
list[Path]

List of matching file paths

Source code in provide/foundation/file/utils.py
def find_files(
    pattern: str,
    root: Path | str = ".",
    recursive: bool = True,
) -> list[Path]:
    """Find files matching pattern.

    Args:
        pattern: Glob pattern (e.g., "*.py", "**/*.json")
        root: Root directory to search from
        recursive: If True, search recursively

    Returns:
        List of matching file paths

    """
    root = Path(root)

    if not root.exists():
        log.warning("Search root doesn't exist", root=str(root))
        return []

    # Use glob or rglob based on recursive flag
    if recursive and "**" not in pattern:
        pattern = f"**/{pattern}"

    try:
        matches = list(root.glob(pattern)) if recursive else list(root.glob(pattern.lstrip("/")))

        # Filter to files only
        files = [p for p in matches if p.is_file()]

        log.debug("Found files", pattern=pattern, root=str(root), count=len(files))
        return files
    except Exception as e:
        log.error("Failed to find files", pattern=pattern, root=str(root), error=str(e))
        return []

format_bytes

format_bytes(num_bytes: int) -> str

Format bytes as human-readable string.

Parameters:

Name Type Description Default
num_bytes int

Number of bytes

required

Returns:

Type Description
str

Formatted string (e.g., "1.50 GB", "256.00 MB")

Examples:

>>> format_bytes(1024)
'1.00 KB'
>>> format_bytes(1024**2)
'1.00 MB'
>>> format_bytes(1536 * 1024**2)
'1.50 GB'
>>> format_bytes(500)
'500 B'
Source code in provide/foundation/file/disk.py
def format_bytes(num_bytes: int) -> str:
    """Format bytes as human-readable string.

    Args:
        num_bytes: Number of bytes

    Returns:
        Formatted string (e.g., "1.50 GB", "256.00 MB")

    Examples:
        >>> format_bytes(1024)
        '1.00 KB'
        >>> format_bytes(1024**2)
        '1.00 MB'
        >>> format_bytes(1536 * 1024**2)
        '1.50 GB'
        >>> format_bytes(500)
        '500 B'
    """
    num_bytes_float: float = float(num_bytes)
    for unit in ["B", "KB", "MB", "GB", "TB", "PB"]:
        if num_bytes_float < 1024.0 or unit == "PB":
            return f"{num_bytes_float:.2f} {unit}"
        num_bytes_float /= 1024.0
    return f"{num_bytes_float:.2f} PB"

format_permissions

format_permissions(mode: int) -> str

Format permission bits as octal string.

Parameters:

Name Type Description Default
mode int

Permission bits (can include file type bits)

required

Returns:

Type Description
str

Formatted string like "0755" (last 3 octal digits only)

Examples:

>>> format_permissions(0o755)
'0755'
>>> format_permissions(0o644)
'0644'
>>> format_permissions(493)  # 0o755 in decimal
'0755'
Source code in provide/foundation/file/permissions.py
def format_permissions(mode: int) -> str:
    """Format permission bits as octal string.

    Args:
        mode: Permission bits (can include file type bits)

    Returns:
        Formatted string like "0755" (last 3 octal digits only)

    Examples:
        >>> format_permissions(0o755)
        '0755'
        >>> format_permissions(0o644)
        '0644'
        >>> format_permissions(493)  # 0o755 in decimal
        '0755'
    """
    # Mask to only permission bits (last 9 bits = 3 octal digits)
    perms_only = mode & 0o777
    return f"0{perms_only:03o}"

get_available_space

get_available_space(path: Path) -> int | None

Get available disk space in bytes for a path.

Parameters:

Name Type Description Default
path Path

Directory path to check (uses parent if path doesn't exist)

required

Returns:

Type Description
int | None

Available bytes or None if unable to determine

Examples:

>>> from pathlib import Path
>>> space = get_available_space(Path.home())
>>> space is not None and space > 0
True
Notes

Uses os.statvfs on Unix-like systems (Linux, macOS, BSD). Returns None on Windows or if statvfs is unavailable.

Source code in provide/foundation/file/disk.py
def get_available_space(path: Path) -> int | None:
    """Get available disk space in bytes for a path.

    Args:
        path: Directory path to check (uses parent if path doesn't exist)

    Returns:
        Available bytes or None if unable to determine

    Examples:
        >>> from pathlib import Path
        >>> space = get_available_space(Path.home())
        >>> space is not None and space > 0
        True

    Notes:
        Uses os.statvfs on Unix-like systems (Linux, macOS, BSD).
        Returns None on Windows or if statvfs is unavailable.
    """
    try:
        # Use the path if it exists, otherwise use parent directory
        check_path = path if path.exists() else path.parent

        # Get filesystem statistics (Unix-like systems only)
        stat_result = os.statvfs(check_path)

        # Calculate available space: blocks available * block size
        available = stat_result.f_bavail * stat_result.f_frsize

        log.trace(
            "Disk space checked",
            path=str(check_path),
            available_bytes=available,
            available_gb=f"{available / (1024**3):.2f}",
        )

        return available

    except (AttributeError, OSError) as e:
        # AttributeError: statvfs not available (Windows)
        # OSError: permission denied or path issues
        log.debug(
            "Could not check disk space",
            path=str(path),
            error=str(e),
            error_type=type(e).__name__,
        )
        return None

get_disk_usage

get_disk_usage(path: Path) -> tuple[int, int, int] | None

Get total, used, and free disk space for a path.

Parameters:

Name Type Description Default
path Path

Directory path to check

required

Returns:

Type Description
tuple[int, int, int] | None

Tuple of (total, used, free) in bytes, or None if unable to determine

Examples:

>>> from pathlib import Path
>>> usage = get_disk_usage(Path.home())
>>> if usage:
...     total, used, free = usage
...     assert total > 0 and used >= 0 and free > 0
...     assert total >= used + free  # May have reserved space
Notes

Uses os.statvfs on Unix-like systems. Returns None on Windows or if unavailable.

Source code in provide/foundation/file/disk.py
def get_disk_usage(path: Path) -> tuple[int, int, int] | None:
    """Get total, used, and free disk space for a path.

    Args:
        path: Directory path to check

    Returns:
        Tuple of (total, used, free) in bytes, or None if unable to determine

    Examples:
        >>> from pathlib import Path
        >>> usage = get_disk_usage(Path.home())
        >>> if usage:
        ...     total, used, free = usage
        ...     assert total > 0 and used >= 0 and free > 0
        ...     assert total >= used + free  # May have reserved space

    Notes:
        Uses os.statvfs on Unix-like systems.
        Returns None on Windows or if unavailable.
    """
    try:
        check_path = path if path.exists() else path.parent

        stat_result = os.statvfs(check_path)

        # Total space: total blocks * block size
        total = stat_result.f_blocks * stat_result.f_frsize

        # Free space: free blocks * block size
        free = stat_result.f_bfree * stat_result.f_frsize

        # Used space: total - free
        used = total - free

        log.trace(
            "Disk usage retrieved",
            path=str(check_path),
            total_gb=f"{total / (1024**3):.2f}",
            used_gb=f"{used / (1024**3):.2f}",
            free_gb=f"{free / (1024**3):.2f}",
        )

        return (total, used, free)

    except (AttributeError, OSError) as e:
        log.debug(
            "Could not get disk usage",
            path=str(path),
            error=str(e),
        )
        return None

get_mtime

get_mtime(path: Path | str) -> float | None

Get modification time, None if not exists.

Parameters:

Name Type Description Default
path Path | str

File path

required

Returns:

Type Description
float | None

Modification time as timestamp, or None if doesn't exist

Source code in provide/foundation/file/utils.py
def get_mtime(path: Path | str) -> float | None:
    """Get modification time, None if not exists.

    Args:
        path: File path

    Returns:
        Modification time as timestamp, or None if doesn't exist

    """
    path = Path(path)

    try:
        return path.stat().st_mtime
    except FileNotFoundError:
        return None
    except Exception as e:
        log.warning("Failed to get modification time", path=str(path), error=str(e))
        return None

get_permissions

get_permissions(path: Path) -> int

Get current file permissions.

Parameters:

Name Type Description Default
path Path

File or directory path

required

Returns:

Type Description
int

Permission bits as integer (0 if file doesn't exist or error)

Examples:

>>> from pathlib import Path
>>> p = Path("/tmp/test.txt")
>>> p.touch()
>>> p.chmod(0o644)
>>> get_permissions(p)
420
>>> format_permissions(get_permissions(p))
'0644'
Source code in provide/foundation/file/permissions.py
def get_permissions(path: Path) -> int:
    """Get current file permissions.

    Args:
        path: File or directory path

    Returns:
        Permission bits as integer (0 if file doesn't exist or error)

    Examples:
        >>> from pathlib import Path
        >>> p = Path("/tmp/test.txt")
        >>> p.touch()
        >>> p.chmod(0o644)
        >>> get_permissions(p)
        420
        >>> format_permissions(get_permissions(p))
        '0644'
    """
    try:
        return path.stat().st_mode & 0o777
    except OSError as e:
        log.debug(
            "Could not read permissions",
            path=str(path),
            error=str(e),
        )
        return 0

get_size

get_size(path: Path | str) -> int

Get file size in bytes, 0 if not exists.

Parameters:

Name Type Description Default
path Path | str

File path

required

Returns:

Type Description
int

Size in bytes, or 0 if file doesn't exist

Source code in provide/foundation/file/utils.py
def get_size(path: Path | str) -> int:
    """Get file size in bytes, 0 if not exists.

    Args:
        path: File path

    Returns:
        Size in bytes, or 0 if file doesn't exist

    """
    path = Path(path)

    try:
        return path.stat().st_size
    except FileNotFoundError:
        return 0
    except Exception as e:
        log.warning("Failed to get file size", path=str(path), error=str(e))
        return 0

get_system_page_size

get_system_page_size() -> int

Get the system's page size.

Returns:

Type Description
int

Page size in bytes (typically 4096 or 16384)

Examples:

>>> size = get_system_page_size()
>>> size in (4096, 16384, 8192, 65536)
True
Notes

Uses os.sysconf('SC_PAGE_SIZE') on Unix-like systems. Falls back to PAGE_SIZE_4K if detection fails.

Source code in provide/foundation/file/alignment.py
def get_system_page_size() -> int:
    """Get the system's page size.

    Returns:
        Page size in bytes (typically 4096 or 16384)

    Examples:
        >>> size = get_system_page_size()
        >>> size in (4096, 16384, 8192, 65536)
        True

    Notes:
        Uses os.sysconf('SC_PAGE_SIZE') on Unix-like systems.
        Falls back to PAGE_SIZE_4K if detection fails.
    """
    import os

    try:
        # Unix-like systems
        return os.sysconf("SC_PAGE_SIZE")
    except (AttributeError, ValueError, OSError):
        # Fallback to common default
        return PAGE_SIZE_4K
group_related_events(
    events: list[FileEvent], time_window_ms: int = 500
) -> list[list[FileEvent]]

Group events that occur within a time window.

Source code in provide/foundation/file/operations/utils.py
def group_related_events(events: list[FileEvent], time_window_ms: int = 500) -> list[list[FileEvent]]:
    """Group events that occur within a time window."""
    from provide.foundation.file.operations.detectors.orchestrator import OperationDetector

    config = DetectorConfig(time_window_ms=time_window_ms)
    detector = OperationDetector(config)
    return detector._group_events_by_time(sorted(events, key=lambda e: e.timestamp))

is_aligned

is_aligned(
    offset: int, alignment: int = DEFAULT_ALIGNMENT
) -> bool

Check if offset is aligned to boundary.

Parameters:

Name Type Description Default
offset int

The offset to check (in bytes)

required
alignment int

Alignment boundary in bytes

DEFAULT_ALIGNMENT

Returns:

Type Description
bool

True if offset is aligned to the boundary

Raises:

Type Description
ValueError

If alignment is not a power of 2 or is <= 0

Examples:

>>> is_aligned(16, 16)
True
>>> is_aligned(17, 16)
False
>>> is_aligned(0, 16)
True
>>> is_aligned(4096, 4096)
True
Notes

Uses bit manipulation for efficiency: is_aligned = (offset & (alignment - 1)) == 0

Source code in provide/foundation/file/alignment.py
def is_aligned(offset: int, alignment: int = DEFAULT_ALIGNMENT) -> bool:
    """Check if offset is aligned to boundary.

    Args:
        offset: The offset to check (in bytes)
        alignment: Alignment boundary in bytes

    Returns:
        True if offset is aligned to the boundary

    Raises:
        ValueError: If alignment is not a power of 2 or is <= 0

    Examples:
        >>> is_aligned(16, 16)
        True
        >>> is_aligned(17, 16)
        False
        >>> is_aligned(0, 16)
        True
        >>> is_aligned(4096, 4096)
        True

    Notes:
        Uses bit manipulation for efficiency:
        is_aligned = (offset & (alignment - 1)) == 0
    """
    if alignment <= 0 or (alignment & (alignment - 1)) != 0:
        raise ValueError(f"Alignment must be a positive power of 2, got {alignment}")

    return (offset & (alignment - 1)) == 0

is_power_of_two

is_power_of_two(value: int) -> bool

Check if a value is a power of 2.

Parameters:

Name Type Description Default
value int

Value to check

required

Returns:

Type Description
bool

True if value is a power of 2

Examples:

>>> is_power_of_two(16)
True
>>> is_power_of_two(17)
False
>>> is_power_of_two(4096)
True
>>> is_power_of_two(0)
False
Notes

Uses bit manipulation: (value & (value - 1)) == 0

Source code in provide/foundation/file/alignment.py
def is_power_of_two(value: int) -> bool:
    """Check if a value is a power of 2.

    Args:
        value: Value to check

    Returns:
        True if value is a power of 2

    Examples:
        >>> is_power_of_two(16)
        True
        >>> is_power_of_two(17)
        False
        >>> is_power_of_two(4096)
        True
        >>> is_power_of_two(0)
        False

    Notes:
        Uses bit manipulation: (value & (value - 1)) == 0
    """
    return value > 0 and (value & (value - 1)) == 0

is_temp_file

is_temp_file(path: Path) -> bool

Check if a path represents a temporary file.

Source code in provide/foundation/file/operations/utils.py
def is_temp_file(path: Path) -> bool:
    """Check if a path represents a temporary file."""
    from provide.foundation.file.operations.detectors.helpers import is_temp_file as helper_is_temp_file

    return helper_is_temp_file(path)

parse_permissions

parse_permissions(
    perms_str: str | None, default: int = DEFAULT_FILE_PERMS
) -> int

Parse permission string to octal integer.

Accepts various permission string formats: - Octal with prefix: "0o755", "0755" - Octal without prefix: "755" - Integer strings: "493" (decimal for 0o755)

Parameters:

Name Type Description Default
perms_str str | None

Permission string (e.g., "0755", "755", "0o755")

required
default int

Default permissions if parsing fails

DEFAULT_FILE_PERMS

Returns:

Type Description
int

Permission as integer (e.g., 0o755 = 493)

Examples:

>>> parse_permissions("0755")
493
>>> parse_permissions("0o755")
493
>>> parse_permissions("755")
493
>>> parse_permissions(None)
420
>>> parse_permissions("invalid")
420
Source code in provide/foundation/file/permissions.py
def parse_permissions(perms_str: str | None, default: int = DEFAULT_FILE_PERMS) -> int:
    """Parse permission string to octal integer.

    Accepts various permission string formats:
    - Octal with prefix: "0o755", "0755"
    - Octal without prefix: "755"
    - Integer strings: "493" (decimal for 0o755)

    Args:
        perms_str: Permission string (e.g., "0755", "755", "0o755")
        default: Default permissions if parsing fails

    Returns:
        Permission as integer (e.g., 0o755 = 493)

    Examples:
        >>> parse_permissions("0755")
        493
        >>> parse_permissions("0o755")
        493
        >>> parse_permissions("755")
        493
        >>> parse_permissions(None)
        420
        >>> parse_permissions("invalid")
        420
    """
    if not perms_str:
        return default

    try:
        # Remove leading '0o' or '0' prefix if present
        cleaned = perms_str.strip()
        if cleaned.startswith("0o"):
            cleaned = cleaned[2:]
        elif cleaned.startswith("0") and len(cleaned) > 1:
            cleaned = cleaned[1:]

        # Try parsing as octal
        return int(cleaned, 8)
    except (ValueError, TypeError):
        log.warning(
            "Invalid permission string, using default",
            perms_str=perms_str,
            default=oct(default),
        )
        return default

read_json

read_json(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> Any

Read JSON file with error handling.

Parameters:

Name Type Description Default
path Path | str

JSON file path

required
default Any

Default value if file doesn't exist or is invalid

None
encoding str

Text encoding

'utf-8'

Returns:

Type Description
Any

Parsed JSON data or default value

Source code in provide/foundation/file/formats.py
def read_json(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> Any:
    """Read JSON file with error handling.

    Args:
        path: JSON file path
        default: Default value if file doesn't exist or is invalid
        encoding: Text encoding

    Returns:
        Parsed JSON data or default value

    """
    content = safe_read_text(path, default="", encoding=encoding)

    if not content:
        log.debug("Empty or missing JSON file, returning default", path=str(path))
        return default

    try:
        return json_loads(content)
    except Exception as e:
        log.warning("Invalid JSON file", path=str(path), error=str(e))
        return default

read_toml

read_toml(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> dict[str, Any]

Read TOML file with error handling.

Parameters:

Name Type Description Default
path Path | str

TOML file path

required
default Any

Default value if file doesn't exist or is invalid

None
encoding str

Text encoding

'utf-8'

Returns:

Type Description
dict[str, Any]

Parsed TOML data or default value

Source code in provide/foundation/file/formats.py
def read_toml(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> dict[str, Any]:
    """Read TOML file with error handling.

    Args:
        path: TOML file path
        default: Default value if file doesn't exist or is invalid
        encoding: Text encoding

    Returns:
        Parsed TOML data or default value

    """
    content = safe_read_text(path, default="", encoding=encoding)

    if not content:
        log.debug("Empty or missing TOML file, returning default", path=str(path))
        return default if default is not None else {}

    try:
        return toml_loads(content)
    except Exception as e:
        log.warning("Invalid TOML file", path=str(path), error=str(e))
        return default if default is not None else {}

read_yaml

read_yaml(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> Any

Read YAML file with error handling.

Parameters:

Name Type Description Default
path Path | str

YAML file path

required
default Any

Default value if file doesn't exist or is invalid

None
encoding str

Text encoding

'utf-8'

Returns:

Type Description
Any

Parsed YAML data or default value

Source code in provide/foundation/file/formats.py
def read_yaml(
    path: Path | str,
    default: Any = None,
    encoding: str = "utf-8",
) -> Any:
    """Read YAML file with error handling.

    Args:
        path: YAML file path
        default: Default value if file doesn't exist or is invalid
        encoding: Text encoding

    Returns:
        Parsed YAML data or default value

    """
    try:
        import yaml  # noqa: F401
    except ImportError:
        log.warning("PyYAML not installed, returning default")
        return default

    content = safe_read_text(path, default="", encoding=encoding)

    if not content:
        log.debug("Empty or missing YAML file, returning default", path=str(path))
        return default

    try:
        return yaml_loads(content)
    except Exception as e:
        log.warning("Invalid YAML file", path=str(path), error=str(e))
        return default

safe_copy

safe_copy(
    src: Path | str,
    dst: Path | str,
    overwrite: bool = False,
    preserve_mode: bool = True,
) -> None

Copy file safely with metadata preservation.

Parameters:

Name Type Description Default
src Path | str

Source file path

required
dst Path | str

Destination file path

required
overwrite bool

Whether to overwrite existing destination

False
preserve_mode bool

Whether to preserve file permissions

True

Raises:

Type Description
FileNotFoundError

If source doesn't exist

FileExistsError

If destination exists and overwrite=False

OSError

If copy operation fails

Source code in provide/foundation/file/safe.py
def safe_copy(
    src: Path | str,
    dst: Path | str,
    overwrite: bool = False,
    preserve_mode: bool = True,
) -> None:
    """Copy file safely with metadata preservation.

    Args:
        src: Source file path
        dst: Destination file path
        overwrite: Whether to overwrite existing destination
        preserve_mode: Whether to preserve file permissions

    Raises:
        FileNotFoundError: If source doesn't exist
        FileExistsError: If destination exists and overwrite=False
        OSError: If copy operation fails

    """
    src = Path(src)
    dst = Path(dst)

    if not src.exists():
        raise FileNotFoundError(f"Source file does not exist: {src}")

    if dst.exists() and not overwrite:
        raise FileExistsError(f"Destination already exists: {dst}")

    # Ensure destination directory exists
    dst.parent.mkdir(parents=True, exist_ok=True)

    try:
        if preserve_mode:
            shutil.copy2(str(src), str(dst))
        else:
            shutil.copy(str(src), str(dst))
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().debug("Copied file", src=str(src), dst=str(dst))
    except Exception as e:
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().error("Failed to copy file", src=str(src), dst=str(dst), error=str(e))
        raise

safe_delete

safe_delete(
    path: Path | str, missing_ok: bool = True
) -> bool

Delete file safely.

Parameters:

Name Type Description Default
path Path | str

File to delete

required
missing_ok bool

If True, don't raise error if file doesn't exist

True

Returns:

Type Description
bool

True if deleted, False if didn't exist

Raises:

Type Description
OSError

If deletion fails and file exists

Source code in provide/foundation/file/safe.py
def safe_delete(
    path: Path | str,
    missing_ok: bool = True,
) -> bool:
    """Delete file safely.

    Args:
        path: File to delete
        missing_ok: If True, don't raise error if file doesn't exist

    Returns:
        True if deleted, False if didn't exist

    Raises:
        OSError: If deletion fails and file exists

    """
    path = Path(path)

    try:
        path.unlink()
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().debug("Deleted file", path=str(path))
        return True
    except FileNotFoundError:
        if missing_ok:
            from provide.foundation.hub.foundation import get_foundation_logger

            get_foundation_logger().debug("File already absent", path=str(path))
            return False
        raise
    except Exception as e:
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().error("Failed to delete file", path=str(path), error=str(e))
        raise

safe_move

safe_move(
    src: Path | str,
    dst: Path | str,
    overwrite: bool = False,
) -> None

Move file safely with optional overwrite.

Parameters:

Name Type Description Default
src Path | str

Source file path

required
dst Path | str

Destination file path

required
overwrite bool

Whether to overwrite existing destination

False

Raises:

Type Description
FileNotFoundError

If source doesn't exist

FileExistsError

If destination exists and overwrite=False

OSError

If move operation fails

Source code in provide/foundation/file/safe.py
def safe_move(
    src: Path | str,
    dst: Path | str,
    overwrite: bool = False,
) -> None:
    """Move file safely with optional overwrite.

    Args:
        src: Source file path
        dst: Destination file path
        overwrite: Whether to overwrite existing destination

    Raises:
        FileNotFoundError: If source doesn't exist
        FileExistsError: If destination exists and overwrite=False
        OSError: If move operation fails

    """
    src = Path(src)
    dst = Path(dst)

    if not src.exists():
        raise FileNotFoundError(f"Source file does not exist: {src}")

    if dst.exists() and not overwrite:
        raise FileExistsError(f"Destination already exists: {dst}")

    # Ensure destination directory exists
    dst.parent.mkdir(parents=True, exist_ok=True)

    try:
        shutil.move(str(src), str(dst))
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().debug("Moved file", src=str(src), dst=str(dst))
    except Exception as e:
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().error("Failed to move file", src=str(src), dst=str(dst), error=str(e))
        raise

safe_read

safe_read(
    path: Path | str,
    default: bytes | None = None,
    encoding: str | None = None,
) -> bytes | str | None

Read file safely, returning default if not found.

Parameters:

Name Type Description Default
path Path | str

File to read

required
default bytes | None

Value to return if file doesn't exist

None
encoding str | None

If provided, decode bytes to str

None

Returns:

Type Description
bytes | str | None

File contents or default value

Source code in provide/foundation/file/safe.py
def safe_read(
    path: Path | str,
    default: bytes | None = None,
    encoding: str | None = None,
) -> bytes | str | None:
    """Read file safely, returning default if not found.

    Args:
        path: File to read
        default: Value to return if file doesn't exist
        encoding: If provided, decode bytes to str

    Returns:
        File contents or default value

    """
    path = Path(path)

    try:
        data = path.read_bytes()
        if encoding:
            return data.decode(encoding)
        return data
    except FileNotFoundError:
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().debug("File not found, returning default", path=str(path))
        if default is not None and encoding:
            return default.decode(encoding) if isinstance(default, bytes) else default
        return default
    except Exception as e:
        from provide.foundation.hub.foundation import get_foundation_logger

        get_foundation_logger().warning("Failed to read file", path=str(path), error=str(e))
        return default

safe_read_text

safe_read_text(
    path: Path | str,
    default: str = "",
    encoding: str = "utf-8",
) -> str

Read text file safely with default.

Parameters:

Name Type Description Default
path Path | str

File to read

required
default str

Default text if file doesn't exist

''
encoding str

Text encoding

'utf-8'

Returns:

Type Description
str

File contents or default text

Source code in provide/foundation/file/safe.py
def safe_read_text(
    path: Path | str,
    default: str = "",
    encoding: str = "utf-8",
) -> str:
    """Read text file safely with default.

    Args:
        path: File to read
        default: Default text if file doesn't exist
        encoding: Text encoding

    Returns:
        File contents or default text

    """
    result = safe_read(path, default=default.encode(encoding), encoding=encoding)
    return result if isinstance(result, str) else default

safe_rmtree

safe_rmtree(
    path: Path | str, missing_ok: bool = True
) -> bool

Remove directory tree safely.

Parameters:

Name Type Description Default
path Path | str

Directory to remove

required
missing_ok bool

If True, don't raise error if doesn't exist

True

Returns:

Type Description
bool

True if removed, False if didn't exist

Raises:

Type Description
OSError

If removal fails and directory exists

Source code in provide/foundation/file/directory.py
@resilient(fallback=False, suppress=(FileNotFoundError,) if False else ())
def safe_rmtree(
    path: Path | str,
    missing_ok: bool = True,
) -> bool:
    """Remove directory tree safely.

    Args:
        path: Directory to remove
        missing_ok: If True, don't raise error if doesn't exist

    Returns:
        True if removed, False if didn't exist

    Raises:
        OSError: If removal fails and directory exists

    """
    path = Path(path)

    if path.exists():
        shutil.rmtree(path)
        log.debug("Removed directory tree", path=str(path))
        return True
    if missing_ok:
        log.debug("Directory already absent", path=str(path))
        return False
    raise FileNotFoundError(f"Directory does not exist: {path}")

secure_temp_file

secure_temp_file(
    suffix: str = DEFAULT_TEMP_SUFFIX,
    prefix: str = DEFAULT_TEMP_PREFIX,
    dir: Path | str | None = None,
) -> tuple[int, Path]

Create a secure temporary file with restricted permissions.

This is similar to tempfile.mkstemp but uses Foundation's defaults. The file is created with permissions 0o600 (owner read/write only).

Use this when you need: - Direct file descriptor access (for os.fdopen, os.fsync, etc.) - Atomic file operations - Maximum security (restricted permissions)

Parameters:

Name Type Description Default
suffix str

File suffix

DEFAULT_TEMP_SUFFIX
prefix str

File name prefix

DEFAULT_TEMP_PREFIX
dir Path | str | None

Directory for the temp file (None = system temp)

None

Returns:

Type Description
tuple[int, Path]

Tuple of (file_descriptor, Path) - caller must close the fd

Example

fd, path = secure_temp_file(suffix='.tmp') try: ... with os.fdopen(fd, 'wb') as f: ... f.write(b'data') ... os.fsync(f.fileno()) ... finally: ... path.unlink(missing_ok=True)

Source code in provide/foundation/file/temp.py
def secure_temp_file(
    suffix: str = DEFAULT_TEMP_SUFFIX,
    prefix: str = DEFAULT_TEMP_PREFIX,
    dir: Path | str | None = None,
) -> tuple[int, Path]:
    """Create a secure temporary file with restricted permissions.

    This is similar to tempfile.mkstemp but uses Foundation's defaults.
    The file is created with permissions 0o600 (owner read/write only).

    Use this when you need:
    - Direct file descriptor access (for os.fdopen, os.fsync, etc.)
    - Atomic file operations
    - Maximum security (restricted permissions)

    Args:
        suffix: File suffix
        prefix: File name prefix
        dir: Directory for the temp file (None = system temp)

    Returns:
        Tuple of (file_descriptor, Path) - caller must close the fd

    Example:
        >>> fd, path = secure_temp_file(suffix='.tmp')
        >>> try:
        ...     with os.fdopen(fd, 'wb') as f:
        ...         f.write(b'data')
        ...         os.fsync(f.fileno())
        ... finally:
        ...     path.unlink(missing_ok=True)

    """
    if dir and isinstance(dir, Path):
        dir = str(dir)

    fd, temp_path = tempfile.mkstemp(suffix=suffix, prefix=prefix, dir=dir)
    return fd, Path(temp_path)

set_file_permissions

set_file_permissions(path: Path, mode: int) -> None

Set file permissions safely with error handling.

Parameters:

Name Type Description Default
path Path

File or directory path

required
mode int

Unix permission mode (e.g., 0o755)

required

Raises:

Type Description
OSError

If setting permissions fails on the underlying filesystem

Examples:

>>> from pathlib import Path
>>> p = Path("/tmp/test.txt")
>>> p.touch()
>>> set_file_permissions(p, 0o644)
Source code in provide/foundation/file/permissions.py
def set_file_permissions(path: Path, mode: int) -> None:
    """Set file permissions safely with error handling.

    Args:
        path: File or directory path
        mode: Unix permission mode (e.g., 0o755)

    Raises:
        OSError: If setting permissions fails on the underlying filesystem

    Examples:
        >>> from pathlib import Path
        >>> p = Path("/tmp/test.txt")
        >>> p.touch()
        >>> set_file_permissions(p, 0o644)
    """
    try:
        Path(path).chmod(mode)
        log.trace(
            "Set file permissions",
            path=str(path),
            mode=format_permissions(mode),
        )
    except OSError as e:
        log.warning(
            "Could not set permissions",
            path=str(path),
            mode=format_permissions(mode),
            error=str(e),
        )
        raise

system_temp_dir

system_temp_dir() -> Path

Get the operating system's temporary directory.

Returns:

Type Description
Path

Path to the OS temp directory

Example

temp_path = system_temp_dir() print(temp_path) # e.g., /tmp or C:\Users...\Temp

Source code in provide/foundation/file/temp.py
def system_temp_dir() -> Path:
    """Get the operating system's temporary directory.

    Returns:
        Path to the OS temp directory

    Example:
        >>> temp_path = system_temp_dir()
        >>> print(temp_path)  # e.g., /tmp or C:\\Users\\...\\Temp

    """
    return Path(tempfile.gettempdir())

temp_dir

temp_dir(
    prefix: str = DEFAULT_TEMP_PREFIX,
    cleanup: bool = DEFAULT_TEMP_CLEANUP,
) -> Generator[Path, None, None]

Create temporary directory with automatic cleanup.

Parameters:

Name Type Description Default
prefix str

Directory name prefix

DEFAULT_TEMP_PREFIX
cleanup bool

Whether to remove directory on exit

DEFAULT_TEMP_CLEANUP

Yields:

Type Description
Path

Path object for the temporary directory

Example

with temp_dir() as tmpdir: ... (tmpdir / 'data.txt').write_text('content') ... process_directory(tmpdir)

Source code in provide/foundation/file/temp.py
@contextmanager
def temp_dir(
    prefix: str = DEFAULT_TEMP_PREFIX,
    cleanup: bool = DEFAULT_TEMP_CLEANUP,
) -> Generator[Path, None, None]:
    """Create temporary directory with automatic cleanup.

    Args:
        prefix: Directory name prefix
        cleanup: Whether to remove directory on exit

    Yields:
        Path object for the temporary directory

    Example:
        >>> with temp_dir() as tmpdir:
        ...     (tmpdir / 'data.txt').write_text('content')
        ...     process_directory(tmpdir)

    """
    temp_path = None
    try:
        temp_path = Path(tempfile.mkdtemp(prefix=prefix))
        log.debug("Created temp directory", path=str(temp_path))
        yield temp_path
    finally:
        if cleanup and temp_path and temp_path.exists():
            with error_boundary(Exception, reraise=False):
                shutil.rmtree(temp_path)
                # Safe logging - catch ValueError/OSError for closed file streams during test teardown
                with contextlib.suppress(ValueError, OSError):
                    log.debug("Cleaned up temp directory", path=str(temp_path))

temp_file

temp_file(
    suffix: str = DEFAULT_TEMP_SUFFIX,
    prefix: str = DEFAULT_TEMP_PREFIX,
    dir: Path | str | None = None,
    text: bool = DEFAULT_TEMP_TEXT_MODE,
    cleanup: bool = DEFAULT_TEMP_CLEANUP,
) -> Generator[Path, None, None]

Create a temporary file with automatic cleanup.

Parameters:

Name Type Description Default
suffix str

File suffix (e.g., '.txt', '.json')

DEFAULT_TEMP_SUFFIX
prefix str

File name prefix

DEFAULT_TEMP_PREFIX
dir Path | str | None

Directory for the temp file (None = system temp)

None
text bool

Whether to open in text mode

DEFAULT_TEMP_TEXT_MODE
cleanup bool

Whether to remove file on exit

DEFAULT_TEMP_CLEANUP

Yields:

Type Description
Path

Path object for the temporary file

Example

with temp_file(suffix='.json') as tmp: ... tmp.write_text('{"key": "value"}') ... process_file(tmp)

Source code in provide/foundation/file/temp.py
@contextmanager
def temp_file(
    suffix: str = DEFAULT_TEMP_SUFFIX,
    prefix: str = DEFAULT_TEMP_PREFIX,
    dir: Path | str | None = None,
    text: bool = DEFAULT_TEMP_TEXT_MODE,
    cleanup: bool = DEFAULT_TEMP_CLEANUP,
) -> Generator[Path, None, None]:
    """Create a temporary file with automatic cleanup.

    Args:
        suffix: File suffix (e.g., '.txt', '.json')
        prefix: File name prefix
        dir: Directory for the temp file (None = system temp)
        text: Whether to open in text mode
        cleanup: Whether to remove file on exit

    Yields:
        Path object for the temporary file

    Example:
        >>> with temp_file(suffix='.json') as tmp:
        ...     tmp.write_text('{"key": "value"}')
        ...     process_file(tmp)

    """
    temp_path = None
    try:
        if dir and isinstance(dir, Path):
            dir = str(dir)

        # Create temp file and immediately close it
        with tempfile.NamedTemporaryFile(
            suffix=suffix,
            prefix=prefix,
            dir=dir,
            delete=False,
            mode="w" if text else "wb",
        ) as f:
            temp_path = Path(f.name)

        log.debug("Created temp file", path=str(temp_path))
        yield temp_path

    finally:
        if cleanup and temp_path and temp_path.exists():
            with error_boundary(Exception, reraise=False):
                safe_delete(temp_path, missing_ok=True)
                # Safe logging - catch ValueError/OSError for closed file streams during test teardown
                with contextlib.suppress(ValueError, OSError):
                    log.debug("Cleaned up temp file", path=str(temp_path))

touch

touch(
    path: Path | str, mode: int = 420, exist_ok: bool = True
) -> None

Create empty file or update timestamp.

Parameters:

Name Type Description Default
path Path | str

File path

required
mode int

File permissions for new files

420
exist_ok bool

If False, raise error if file exists

True

Raises:

Type Description
FileExistsError

If exist_ok=False and file exists

Source code in provide/foundation/file/utils.py
def touch(
    path: Path | str,
    mode: int = 0o644,
    exist_ok: bool = True,
) -> None:
    """Create empty file or update timestamp.

    Args:
        path: File path
        mode: File permissions for new files
        exist_ok: If False, raise error if file exists

    Raises:
        FileExistsError: If exist_ok=False and file exists

    """
    path = Path(path)

    if path.exists() and not exist_ok:
        raise FileExistsError(f"File already exists: {path}")

    # Ensure parent directory exists
    path.parent.mkdir(parents=True, exist_ok=True)

    # Touch the file
    path.touch(mode=mode, exist_ok=exist_ok)
    log.debug("Touched file", path=str(path))

write_json

write_json(
    path: Path | str,
    data: Any,
    indent: int = 2,
    sort_keys: bool = False,
    atomic: bool = True,
    encoding: str = "utf-8",
) -> None

Write JSON file, optionally atomic.

Parameters:

Name Type Description Default
path Path | str

JSON file path

required
data Any

Data to serialize

required
indent int

Indentation level (None for compact)

2
sort_keys bool

Whether to sort dictionary keys

False
atomic bool

Use atomic write

True
encoding str

Text encoding

'utf-8'
Source code in provide/foundation/file/formats.py
def write_json(
    path: Path | str,
    data: Any,
    indent: int = 2,
    sort_keys: bool = False,
    atomic: bool = True,
    encoding: str = "utf-8",
) -> None:
    """Write JSON file, optionally atomic.

    Args:
        path: JSON file path
        data: Data to serialize
        indent: Indentation level (None for compact)
        sort_keys: Whether to sort dictionary keys
        atomic: Use atomic write
        encoding: Text encoding

    """
    path = Path(path)

    try:
        content = json_dumps(data, indent=indent, sort_keys=sort_keys, ensure_ascii=False)

        if atomic:
            atomic_write_text(path, content, encoding=encoding)
        else:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding=encoding)

        log.debug("Wrote JSON file", path=str(path), atomic=atomic)
    except Exception as e:
        log.error("Failed to write JSON file", path=str(path), error=str(e))
        raise

write_toml

write_toml(
    path: Path | str,
    data: dict[str, Any],
    atomic: bool = True,
    encoding: str = "utf-8",
) -> None

Write TOML file, optionally atomic.

Parameters:

Name Type Description Default
path Path | str

TOML file path

required
data dict[str, Any]

Data to serialize (must be a dictionary)

required
atomic bool

Use atomic write

True
encoding str

Text encoding

'utf-8'
Source code in provide/foundation/file/formats.py
def write_toml(
    path: Path | str,
    data: dict[str, Any],
    atomic: bool = True,
    encoding: str = "utf-8",
) -> None:
    """Write TOML file, optionally atomic.

    Args:
        path: TOML file path
        data: Data to serialize (must be a dictionary)
        atomic: Use atomic write
        encoding: Text encoding

    """
    try:
        import tomli_w  # noqa: F401
    except ImportError as e:
        raise ImportError("tomli-w is required for TOML write operations") from e

    path = Path(path)

    try:
        content = toml_dumps(data)

        if atomic:
            atomic_write_text(path, content, encoding=encoding)
        else:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding=encoding)

        log.debug("Wrote TOML file", path=str(path), atomic=atomic)
    except Exception as e:
        log.error("Failed to write TOML file", path=str(path), error=str(e))
        raise

write_yaml

write_yaml(
    path: Path | str,
    data: Any,
    atomic: bool = True,
    encoding: str = "utf-8",
    default_flow_style: bool = False,
) -> None

Write YAML file, optionally atomic.

Parameters:

Name Type Description Default
path Path | str

YAML file path

required
data Any

Data to serialize

required
atomic bool

Use atomic write

True
encoding str

Text encoding

'utf-8'
default_flow_style bool

Use flow style (JSON-like) instead of block style

False
Source code in provide/foundation/file/formats.py
def write_yaml(
    path: Path | str,
    data: Any,
    atomic: bool = True,
    encoding: str = "utf-8",
    default_flow_style: bool = False,
) -> None:
    """Write YAML file, optionally atomic.

    Args:
        path: YAML file path
        data: Data to serialize
        atomic: Use atomic write
        encoding: Text encoding
        default_flow_style: Use flow style (JSON-like) instead of block style

    """
    try:
        import yaml  # noqa: F401
    except ImportError as e:
        raise ImportError("PyYAML is required for YAML operations") from e

    path = Path(path)

    try:
        content = yaml_dumps(
            data,
            default_flow_style=default_flow_style,
            allow_unicode=True,
            sort_keys=False,
        )

        if atomic:
            atomic_write_text(path, content, encoding=encoding)
        else:
            path.parent.mkdir(parents=True, exist_ok=True)
            path.write_text(content, encoding=encoding)

        log.debug("Wrote YAML file", path=str(path), atomic=atomic)
    except Exception as e:
        log.error("Failed to write YAML file", path=str(path), error=str(e))
        raise