Skip to content

Lock

provide.foundation.file.lock

TODO: Add module docstring.

Classes

FileLock

FileLock(
    path: Path | str,
    timeout: float = DEFAULT_FILE_LOCK_TIMEOUT,
    check_interval: float = 0.1,
)

File-based lock for concurrent access control.

Uses exclusive file creation as the locking mechanism. The lock file contains JSON metadata describing the holder: its PID, hostname, and creation time, plus the process start time when psutil is available.

Thread-safe: Multiple threads can safely use the same FileLock instance. The internal thread lock protects instance state while the file lock provides inter-process synchronization.

Example

with FileLock("/tmp/myapp.lock"):
    # Exclusive access to resource
    do_something()

Initialize file lock.

Parameters:

Name Type Description Default
path Path | str

Lock file path

required
timeout float

Max seconds to wait for lock

DEFAULT_FILE_LOCK_TIMEOUT
check_interval float

Seconds between lock checks

0.1
Source code in provide/foundation/file/lock.py
def __init__(
    self,
    path: Path | str,
    timeout: float = DEFAULT_FILE_LOCK_TIMEOUT,
    check_interval: float = 0.1,
) -> None:
    """Set up a lock bound to ``path``; nothing is acquired yet.

    Args:
        path: Location of the lock file; stored as a ``Path``.
        timeout: Maximum number of seconds to wait for the lock.
        check_interval: Number of seconds between lock checks.

    """
    # Normalise the path first so a bad ``path`` fails before any state is stored.
    lock_path = Path(path)
    owner_pid = os.getpid()

    self.path = lock_path
    self.timeout = timeout
    self.check_interval = check_interval
    self.locked = False  # Becomes True once acquire() succeeds
    self.pid = owner_pid  # PID written into the lock file and checked on release
    # Re-entrant lock guarding instance state against concurrent threads
    self._thread_lock = threading.RLock()
Functions
__enter__
__enter__() -> FileLock

Context manager entry.

Source code in provide/foundation/file/lock.py
def __enter__(self) -> FileLock:
    """Acquire the lock on entry to a ``with`` block and return this instance."""
    self.acquire()  # Default (blocking) acquire; the boolean result is not used
    return self
__exit__
__exit__(
    exc_type: object, exc_val: object, exc_tb: object
) -> None

Context manager exit.

Source code in provide/foundation/file/lock.py
def __exit__(self, exc_type: object, exc_val: object, exc_tb: object) -> None:
    """Release the lock when the ``with`` block ends.

    The exception details are ignored and ``None`` is returned, so an
    exception raised inside the block keeps propagating.
    """
    self.release()
acquire
acquire(blocking: bool = True) -> bool

Acquire the lock.

Parameters:

Name Type Description Default
blocking bool

If True, wait for lock. If False, return immediately.

True

Returns:

Type Description
bool

True if lock acquired, False if not (non-blocking mode only)

Raises:

Type Description
LockError

If the configured timeout is not positive, or if the lock is not acquired before the timeout expires (blocking mode)

Source code in provide/foundation/file/lock.py
def acquire(self, blocking: bool = True) -> bool:  # noqa: C901
    """Acquire the lock.

    Repeatedly tries to create the lock file exclusively. On success the
    holder's metadata (PID, hostname, creation time and, when psutil is
    available, process start time) is written to the file as JSON.

    Args:
        blocking: If True, wait for lock. If False, return immediately.

    Returns:
        True if lock acquired, False if not (non-blocking mode only)

    Raises:
        LockError: If the configured timeout is not positive, or if the
            lock could not be acquired within the timeout (blocking mode)

    """
    with self._thread_lock:
        if self.timeout <= 0:
            raise LockError("Timeout must be positive", code="INVALID_TIMEOUT", path=str(self.path))

        # If already locked by this instance, treat as re-entrant
        if self.locked:
            return True

        # NOTE(review): self.check_interval is not consulted here; polling uses
        # a fixed sleep of at most ``max_sleep`` seconds between attempts.
        max_sleep = 0.01

        start_time = time.time()
        end_time = start_time + self.timeout

        # Keep a finite iteration cap as a safety net against hanging, but
        # scale it with the timeout. A fixed cap of 1000 iterations at <=10ms
        # per sleep would abort after roughly 10s even when a longer timeout
        # was requested.
        try:
            max_iterations = max(1000, int(self.timeout / max_sleep) * 2 + 1000)
        except (OverflowError, ValueError):
            # Non-finite timeout (inf/nan): fall back to the fixed cap
            max_iterations = 1000
        iteration = 0

        while iteration < max_iterations:
            iteration += 1
            current_time = time.time()

            # Hard timeout check - exit immediately if time is up
            if current_time >= end_time:
                elapsed = current_time - start_time
                raise LockError(
                    f"Failed to acquire lock within {self.timeout}s (elapsed: {elapsed:.3f}s, iterations: {iteration})",
                    code="LOCK_TIMEOUT",
                    path=str(self.path),
                ) from None

            try:
                # Try to create lock file exclusively
                fd = os.open(str(self.path), os.O_CREAT | os.O_EXCL | os.O_WRONLY, 0o644)
                try:
                    try:
                        # Write lock metadata as JSON for robust validation
                        lock_info = {
                            "pid": self.pid,
                            "hostname": socket.gethostname(),
                            "created": current_time,
                        }
                        # Add process start time for PID recycling protection (if psutil available)
                        if _HAS_PSUTIL:
                            try:
                                proc = psutil.Process(self.pid)
                                lock_info["start_time"] = proc.create_time()
                            except (psutil.NoSuchProcess, psutil.AccessDenied):
                                pass
                        os.write(fd, json_dumps(lock_info).encode())
                    finally:
                        os.close(fd)
                except BaseException:
                    # We created the file but could not record ownership.
                    # Remove it so an empty lock file does not block other
                    # callers, then surface the original error.
                    try:
                        os.unlink(str(self.path))
                    except OSError:
                        pass  # Best-effort cleanup; the original error matters more
                    raise

                self.locked = True
                elapsed = current_time - start_time
                log.debug(
                    "Acquired lock",
                    path=str(self.path),
                    pid=self.pid,
                    iterations=iteration,
                    elapsed=elapsed,
                )
                return True

            except FileExistsError:
                # Lock file exists, check if holder is still alive
                if self._check_stale_lock():
                    continue  # Retry after removing stale lock

                if not blocking:
                    log.debug("Lock unavailable (non-blocking)", path=str(self.path))
                    return False

                # Calculate remaining time
                remaining = end_time - current_time
                if remaining <= 0:
                    # Time is up
                    break

                # Sleep for a small fixed interval or half the remaining time, whichever is smaller
                sleep_time = min(max_sleep, remaining * 0.5)
                if sleep_time > 0:
                    time.sleep(sleep_time)

        # If we exit the loop without acquiring the lock
        elapsed = time.time() - start_time
        raise LockError(
            f"Failed to acquire lock within {self.timeout}s (elapsed: {elapsed:.3f}s, iterations: {iteration})",
            code="LOCK_TIMEOUT",
            path=str(self.path),
        ) from None
release
release() -> None

Release the lock.

Only removes the lock file if we own it.

Source code in provide/foundation/file/lock.py
def release(self) -> None:
    """Release the lock.

    Only removes the lock file if we own it, i.e. the PID recorded in the
    file matches ``self.pid``. If the file cannot be read at all, removal is
    still attempted because ``self.locked`` says this instance holds the
    lock. ``self.locked`` is always reset to False, even when removal fails.
    Does nothing if the lock is not currently held by this instance.
    """
    with self._thread_lock:
        if not self.locked:
            return

        try:
            if self.path.exists():
                owner_pid = None
                ownership_known = True

                # Only the ownership check lives inside this try. A failed
                # unlink must not be reported as an ownership-check error
                # and retried.
                try:
                    content = self.path.read_text().strip()
                    try:
                        lock_info = json_loads(content)
                    except Exception:
                        # Not parseable by json_loads: fall back to a bare PID
                        owner_pid = int(content) if content.isdigit() else None
                    else:
                        if isinstance(lock_info, dict):
                            owner_pid = lock_info.get("pid")
                        elif isinstance(lock_info, int):
                            owner_pid = lock_info
                except Exception as e:
                    ownership_known = False
                    log.warning(
                        "Error checking lock ownership",
                        path=str(self.path),
                        error=str(e),
                    )

                if not ownership_known:
                    # Still try to remove: self.locked says we hold it
                    self.path.unlink()
                elif owner_pid == self.pid:
                    self.path.unlink()
                    log.debug("Released lock", path=str(self.path), pid=self.pid)
                else:
                    log.warning(
                        "Lock owned by different process",
                        path=str(self.path),
                        owner_pid=owner_pid,
                        our_pid=self.pid,
                    )
        except FileNotFoundError:
            pass  # Lock already released
        except OSError as e:
            # Failed to unlink lock file due to permission or filesystem error
            log.error("Failed to release lock", path=str(self.path), error=str(e))
        finally:
            self.locked = False

LockError

LockError(
    message: str,
    *,
    lock_path: str | None = None,
    timeout: float | None = None,
    **kwargs: Any
)

Bases: FoundationError

Raised when file lock operations fail.

Parameters:

Name Type Description Default
message str

Error message describing the lock issue.

required
lock_path str | None

Optional path to the lock file.

None
timeout float | None

Optional timeout that was exceeded.

None
**kwargs Any

Additional context passed to FoundationError.

{}

Examples:

>>> raise LockError("Failed to acquire lock")
>>> raise LockError("Lock timeout", lock_path="/tmp/app.lock", timeout=30)
Source code in provide/foundation/errors/resources.py
def __init__(
    self,
    message: str,
    *,
    lock_path: str | None = None,
    timeout: float | None = None,
    **kwargs: Any,
) -> None:
    """Build the error, recording lock details in the ``context`` kwarg.

    Args:
        message: Error message describing the lock issue.
        lock_path: Optional path to the lock file; stored under
            ``"lock.path"`` when truthy.
        timeout: Optional timeout that was exceeded; stored under
            ``"lock.timeout"`` when not None.
        **kwargs: Additional keyword arguments forwarded to the parent
            initializer.

    """
    # A falsy path (None or "") is skipped; a timeout of 0 is still recorded.
    if lock_path:
        context = kwargs.setdefault("context", {})
        context["lock.path"] = lock_path
    if timeout is not None:
        context = kwargs.setdefault("context", {})
        context["lock.timeout"] = timeout
    super().__init__(message, **kwargs)

Functions