Skip to content

Tools

provide.foundation.tools

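Tool management utilities: an abstract base manager for downloading, verifying, and installing development tools, plus the cache, downloader, and installer components it relies on. (TODO: add this summary as the module docstring.)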

Classes

BaseToolManager

BaseToolManager(config: BaseConfig)

Bases: ABC

Abstract base class for tool managers.

Provides common functionality for downloading, verifying, and installing development tools. Subclasses must implement platform-specific logic.

Attributes:

Name Type Description
config

Configuration object.

tool_name str

Name of the tool being managed.

executable_name str

Name of the executable file.

supported_platforms list[str]

List of supported platforms.

Initialize the tool manager.

Parameters:

Name Type Description Default
config BaseConfig

Configuration object containing settings.

required
Source code in provide/foundation/tools/base.py
def __init__(self, config: BaseConfig) -> None:
    """Set up a tool manager bound to ``config``.

    Construction is refused unless the instance exposes non-empty
    ``tool_name`` and ``executable_name`` attributes.

    Args:
        config: Configuration object containing settings.

    Raises:
        ToolError: If ``tool_name`` or ``executable_name`` is empty.

    """
    # Fail fast when the identifying attributes have not been provided.
    if not self.tool_name:
        raise ToolError("Subclass must define tool_name")
    if not self.executable_name:
        raise ToolError("Subclass must define executable_name")

    # Collaborators start out unset and are created on first use
    # (lazy to avoid circular imports).
    self._resolver: VersionResolver | None = None
    self._installer: ToolInstaller | None = None
    self._verifier: ToolVerifier | None = None
    self._downloader: ToolDownloader | None = None
    self._cache: ToolCache | None = None

    self.config = config

    log.debug(f"Initialized {self.tool_name} manager")
Attributes
cache property
cache: ToolCache

Get or create cache instance.

downloader property
downloader: ToolDownloader

Get or create downloader instance.

installer property
installer: ToolInstaller

Get or create installer instance.

resolver property
resolver: VersionResolver

Get or create version resolver instance.

verifier property
verifier: ToolVerifier

Get or create verifier instance.

Functions
get_available_versions abstractmethod
get_available_versions() -> list[str]

Get list of available versions from upstream.

Returns:

Type Description
list[str]

List of version strings available for download.

Source code in provide/foundation/tools/base.py
@abstractmethod
def get_available_versions(self) -> list[str]:
    """Get list of available versions from upstream.

    Abstract hook: the body is intentionally empty here and must be
    provided by each concrete tool manager.

    Returns:
        List of version strings available for download.

    """
get_install_path
get_install_path(version: str) -> Path

Get the installation path for a version.

Parameters:

Name Type Description Default
version str

Version string.

required

Returns:

Type Description
Path

Path where the version is/will be installed.

Source code in provide/foundation/tools/base.py
def get_install_path(self, version: str) -> Path:
    """Compute where a given version of this tool lives on disk.

    The location is ``~/.provide-foundation/tools/<tool_name>/<version>``.
    Nothing is created; the path may or may not exist yet.

    Args:
        version: Version string.

    Returns:
        Path where the version is/will be installed.

    """
    return Path.home().joinpath(".provide-foundation", "tools", self.tool_name, version)
get_metadata abstractmethod
get_metadata(version: str) -> ToolMetadata

Get metadata for a specific version.

Parameters:

Name Type Description Default
version str

Version string to get metadata for.

required

Returns:

Type Description
ToolMetadata

ToolMetadata object with download URLs and checksums.

Source code in provide/foundation/tools/base.py
@abstractmethod
def get_metadata(self, version: str) -> ToolMetadata:
    """Get metadata for a specific version.

    Abstract hook: the body is intentionally empty here and must be
    provided by each concrete tool manager.

    Args:
        version: Version string to get metadata for.

    Returns:
        ToolMetadata object with download URLs and checksums.

    """
get_platform_info
get_platform_info() -> dict[str, str]

Get current platform information.

Returns:

Type Description
dict[str, str]

Dictionary with platform and arch keys.

Source code in provide/foundation/tools/base.py
def get_platform_info(self) -> dict[str, str]:
    """Get current platform information.

    The platform is the lower-cased ``platform.system()`` value (for example
    ``"darwin"``, ``"linux"`` or ``"windows"``). The architecture is the
    lower-cased ``platform.machine()`` value, with the common 64-bit aliases
    folded to ``"amd64"`` and ``"arm64"``; anything else is passed through.

    Returns:
        Dictionary with ``platform`` and ``arch`` keys.

    """
    import platform

    # Several machine names describe the same architecture; map them to one.
    arch_aliases = {
        "x86_64": "amd64",
        "amd64": "amd64",
        "aarch64": "arm64",
        "arm64": "arm64",
    }

    system = platform.system().lower()
    machine = platform.machine().lower()

    return {"platform": system, "arch": arch_aliases.get(machine, machine)}
install async
install(
    version: str = "latest", force: bool = False
) -> Path

Install a specific version of the tool.

Parameters:

Name Type Description Default
version str

Version to install (default: "latest").

'latest'
force bool

Force reinstall even if cached.

False

Returns:

Type Description
Path

Path to the installed tool.

Raises:

Type Description
ToolInstallError

If installation fails.

Source code in provide/foundation/tools/base.py
async def install(self, version: str = "latest", force: bool = False) -> Path:
    """Install a specific version of the tool.

    Symbolic specs (``"latest"``, ``"stable"``, ``"dev"``, or anything starting
    with ``~`` or ``^``) are resolved to a concrete version first. Unless
    ``force`` is set, a valid cache entry short-circuits the whole process.
    Otherwise the artifact is downloaded, optionally checksum-verified,
    installed, and recorded in the cache. The downloaded artifact is always
    removed afterwards, whether the installation succeeded or not.

    Args:
        version: Version to install (default: "latest").
        force: Force reinstall even if cached.

    Returns:
        Path to the installed tool.

    Raises:
        ToolInstallError: If no download URL is available for the version.
        ToolVerificationError: If the downloaded artifact fails checksum
            verification.

    """
    # Resolve version
    if version in ("latest", "stable", "dev") or version.startswith(("~", "^")):
        version = self.resolve_version(version)

    # Check cache unless forced
    if not force and (cached_path := self.cache.get(self.tool_name, version)):
        log.info(f"Using cached {self.tool_name} {version}")
        return cached_path

    log.info(f"Installing {self.tool_name} {version}")

    # Get metadata
    metadata = self.get_metadata(version)
    if not metadata.download_url:
        raise ToolInstallError(f"No download URL for {self.tool_name} {version}")

    # Download to secure temporary directory
    from provide.foundation.file.temp import system_temp_dir

    # NOTE(review): this file name carries no archive extension, while the
    # installer appears to choose the extraction method from the artifact's
    # suffix — confirm archives are still recognised.
    download_path = system_temp_dir() / f"{self.tool_name}-{version}"
    artifact_path = await self.downloader.download_with_progress(
        metadata.download_url,
        download_path,
        metadata.checksum,
    )

    try:
        # Verify if checksum provided
        if metadata.checksum and not self.verifier.verify_checksum(artifact_path, metadata.checksum):
            raise ToolVerificationError(f"Checksum verification failed for {self.tool_name} {version}")

        # Install
        install_path = self.installer.install(artifact_path, metadata)

        # Cache the installation
        self.cache.store(self.tool_name, version, install_path)
    finally:
        # Clean up the download on every exit path so a failed verification,
        # installation, or cache write does not leave the artifact behind.
        artifact_path.unlink(missing_ok=True)

    log.info(f"Successfully installed {self.tool_name} {version} to {install_path}")
    return install_path
is_installed
is_installed(version: str) -> bool

Check if a version is installed.

Parameters:

Name Type Description Default
version str

Version to check.

required

Returns:

Type Description
bool

True if installed, False otherwise.

Source code in provide/foundation/tools/base.py
def is_installed(self, version: str) -> bool:
    """Report whether the executable for ``version`` is present on disk.

    Looks for ``<install path>/bin/<executable_name>``.

    Args:
        version: Version to check.

    Returns:
        True if installed, False otherwise.

    """
    version_root = self.get_install_path(version)
    return (version_root / "bin" / self.executable_name).exists()
resolve_version
resolve_version(spec: str) -> str

Resolve a version specification to a concrete version.

Parameters:

Name Type Description Default
spec str

Version specification (e.g., "latest", "~1.5.0").

required

Returns:

Type Description
str

Concrete version string.

Raises:

Type Description
ToolNotFoundError

If version cannot be resolved.

Source code in provide/foundation/tools/base.py
def resolve_version(self, spec: str) -> str:
    """Turn a version specification into one concrete version.

    The upstream version list is fetched first and handed, together with
    ``spec``, to the resolver.

    Args:
        spec: Version specification (e.g., "latest", "~1.5.0").

    Returns:
        Concrete version string.

    Raises:
        ToolNotFoundError: If upstream lists no versions, or the resolver
            finds no match for ``spec``.

    """
    candidates = self.get_available_versions()
    if not candidates:
        raise ToolNotFoundError(f"No versions available for {self.tool_name}")

    concrete = self.resolver.resolve(spec, candidates)
    if concrete:
        log.debug(f"Resolved {self.tool_name} version {spec} to {concrete}")
        return concrete

    raise ToolNotFoundError(f"Cannot resolve version '{spec}' for {self.tool_name}")
uninstall
uninstall(version: str) -> bool

Uninstall a specific version.

Parameters:

Name Type Description Default
version str

Version to uninstall.

required

Returns:

Type Description
bool

True if uninstalled, False if not found.

Source code in provide/foundation/tools/base.py
def uninstall(self, version: str) -> bool:
    """Remove an installed version and drop its cache entry.

    The cache entry is invalidated unconditionally; the installation
    directory is deleted recursively when it exists.

    Args:
        version: Version to uninstall.

    Returns:
        True if uninstalled, False if not found.

    """
    # Forget the cached location first, whether or not files are present.
    self.cache.invalidate(self.tool_name, version)

    target_dir = self.get_install_path(version)
    if not target_dir.exists():
        return False

    import shutil

    shutil.rmtree(target_dir)
    log.info(f"Uninstalled {self.tool_name} {version}")
    return True

ToolCache

ToolCache(cache_dir: Path | None = None)

Cache for installed tools with TTL support.

Tracks installed tool locations and expiration times to avoid unnecessary re-downloads and installations.

Initialize the cache.

Parameters:

Name Type Description Default
cache_dir Path | None

Cache directory (defaults to ~/.provide-foundation/cache).

None
Source code in provide/foundation/tools/cache.py
def __init__(self, cache_dir: Path | None = None) -> None:
    """Prepare the cache directory and load any stored metadata.

    Args:
        cache_dir: Cache directory (defaults to ~/.provide-foundation/cache).

    """
    if not cache_dir:
        cache_dir = Path.home() / ".provide-foundation" / "cache"
    self.cache_dir = cache_dir
    # Make sure the directory is there before the metadata file is read.
    self.cache_dir.mkdir(parents=True, exist_ok=True)

    self.metadata_file = self.cache_dir / "metadata.json"
    self.metadata = self._load_metadata()
Functions
clear
clear() -> None

Clear all cache entries.

Source code in provide/foundation/tools/cache.py
def clear(self) -> None:
    """Drop every cache entry and persist the now-empty metadata."""
    self.metadata = dict()
    self._save_metadata()
    log.info("Cleared tool cache")
get
get(tool: str, version: str) -> Path | None

Get cached tool path if valid.

Parameters:

Name Type Description Default
tool str

Tool name.

required
version str

Tool version.

required

Returns:

Type Description
Path | None

Path to cached tool if valid, None otherwise.

Source code in provide/foundation/tools/cache.py
def get(self, tool: str, version: str) -> Path | None:
    """Look up a cached installation and return it only if still usable.

    An entry whose path no longer exists, or which has expired, is
    invalidated as a side effect and treated as a miss.

    Args:
        tool: Tool name.
        version: Tool version.

    Returns:
        Path to cached tool if valid, None otherwise.

    """
    key = f"{tool}:{version}"

    entry = self.metadata.get(key)
    if not entry:
        log.debug(f"Cache miss: {key} not in cache")
        return None

    cached_location = Path(entry["path"])

    # The recorded location must still be present on disk.
    if not cached_location.exists():
        log.debug(f"Cache miss: {key} path doesn't exist")
        self.invalidate(tool, version)
        return None

    # Entries past their TTL are discarded.
    if self._is_expired(entry):
        log.debug(f"Cache miss: {key} expired")
        self.invalidate(tool, version)
        return None

    log.debug(f"Cache hit: {key}")
    return cached_location
get_size
get_size() -> int

Get total size of cached tools in bytes.

Returns:

Type Description
int

Total size in bytes.

Source code in provide/foundation/tools/cache.py
def get_size(self) -> int:
    """Add up the on-disk size of every cached tool.

    Directories are walked recursively and only regular files are counted.
    Failures are logged at debug level and skipped, so the result is a
    best-effort figure.

    Returns:
        Total size in bytes.

    """
    total = 0

    for entry in self.metadata.values():
        location = Path(entry["path"])
        try:
            if not location.exists():
                continue

            if not location.is_dir():
                # Single-file entry.
                total += location.stat().st_size
                continue

            # Directory entry: sum every regular file beneath it.
            for item in location.rglob("*"):
                if not item.is_file():
                    continue
                try:
                    total += item.stat().st_size
                except Exception as e:
                    log.debug(f"Failed to get size of file {item}: {e}")
        except Exception as e:
            log.debug(f"Failed to get size of {location}: {e}")

    return total
invalidate
invalidate(tool: str, version: str | None = None) -> None

Invalidate cache entries.

Parameters:

Name Type Description Default
tool str

Tool name.

required
version str | None

Specific version, or None for all versions.

None
Source code in provide/foundation/tools/cache.py
def invalidate(self, tool: str, version: str | None = None) -> None:
    """Invalidate cache entries.

    Args:
        tool: Tool name.
        version: Specific version, or None for all versions.

    """
    if version:
        # Invalidate specific version
        key = f"{tool}:{version}"
        if key in self.metadata:
            del self.metadata[key]
            log.debug(f"Invalidated cache for {key}")
    else:
        # Invalidate all versions of tool
        keys_to_remove = [k for k in self.metadata if self.metadata[k].get("tool") == tool]
        for key in keys_to_remove:
            del self.metadata[key]
            log.debug(f"Invalidated cache for {key}")

    self._save_metadata()
list_cached
list_cached() -> list[dict]

List all cached tools.

Returns:

Type Description
list[dict]

List of cache entries with metadata.

Source code in provide/foundation/tools/cache.py
def list_cached(self) -> list[dict]:
    """Describe every cache entry, annotated with expiry information.

    Each returned dict is a copy of the stored entry with three extra keys:
    ``key``, ``expired`` and ``days_until_expiry`` (``-1`` when the TTL is
    not positive, ``0`` when the timestamp cannot be evaluated).

    Returns:
        List of cache entries with metadata.

    """
    listing = []

    for cache_key, stored in self.metadata.items():
        # Work on a copy so the stored entry is left untouched.
        record = stored.copy()
        record["key"] = cache_key
        record["expired"] = self._is_expired(record)

        try:
            created = datetime.fromisoformat(record["cached_at"])
            lifetime = record.get("ttl_days", 7)
            if not lifetime > 0:
                remaining = -1  # Never expires
            else:
                deadline = created + timedelta(days=lifetime)
                remaining = max(0, (deadline - datetime.now()).days)
        except Exception:
            # Missing or malformed timestamp/TTL.
            remaining = 0
        record["days_until_expiry"] = remaining

        listing.append(record)

    return listing
prune_expired
prune_expired() -> int

Remove expired entries from cache.

Returns:

Type Description
int

Number of entries removed.

Source code in provide/foundation/tools/cache.py
def prune_expired(self) -> int:
    """Delete every expired entry from the cache metadata.

    The metadata is saved (and a summary logged) only when at least one
    entry was removed.

    Returns:
        Number of entries removed.

    """
    # Collect first, delete afterwards, so the dict is not mutated mid-iteration.
    stale = [name for name, record in self.metadata.items() if self._is_expired(record)]
    removed = len(stale)

    for name in stale:
        self.metadata.pop(name)

    if removed:
        self._save_metadata()
        log.info(f"Pruned {removed} expired cache entries")

    return removed
store
store(
    tool: str, version: str, path: Path, ttl_days: int = 7
) -> None

Store tool in cache.

Parameters:

Name Type Description Default
tool str

Tool name.

required
version str

Tool version.

required
path Path

Path to installed tool.

required
ttl_days int

Time-to-live in days.

7
Source code in provide/foundation/tools/cache.py
def store(self, tool: str, version: str, path: Path, ttl_days: int = 7) -> None:
    """Record an installed tool in the cache and persist the metadata.

    The entry is keyed by ``"<tool>:<version>"`` and timestamped with the
    current local time; an existing entry under the same key is replaced.

    Args:
        tool: Tool name.
        version: Tool version.
        path: Path to installed tool.
        ttl_days: Time-to-live in days.

    """
    key = f"{tool}:{version}"
    record = {
        "path": str(path),
        "tool": tool,
        "version": version,
        "cached_at": datetime.now().isoformat(),
        "ttl_days": ttl_days,
    }
    self.metadata[key] = record

    self._save_metadata()
    log.debug(f"Cached {key} at {path} (TTL: {ttl_days} days)")

ToolDownloader

ToolDownloader(
    client: UniversalClient,
    time_source: Callable[[], float] | None = None,
    async_sleep_func: (
        Callable[[float], Awaitable[None]] | None
    ) = None,
)

Advanced download capabilities for tools.

Features:

- Progress reporting with callbacks
- Parallel downloads for multiple files
- Mirror fallback support
- Checksum verification

Attributes:

Name Type Description
client

Transport client for HTTP requests.

progress_callbacks list[Callable[[int, int], None]]

List of progress callback functions.

retry_policy

Policy for retry behavior on downloads.

Initialize the downloader.

Parameters:

Name Type Description Default
client UniversalClient

Universal client for making HTTP requests.

required
time_source Callable[[], float] | None

Optional time source for testing (defaults to time.time).

None
async_sleep_func Callable[[float], Awaitable[None]] | None

Optional async sleep function for testing (defaults to asyncio.sleep).

None
Source code in provide/foundation/tools/downloader.py
def __init__(
    self,
    client: UniversalClient,
    time_source: Callable[[], float] | None = None,
    async_sleep_func: Callable[[float], Awaitable[None]] | None = None,
) -> None:
    """Create a downloader around an HTTP client.

    Downloads are retried according to a policy of up to 3 attempts with a
    base delay of 1.0; the optional hooks are forwarded to the retry
    executor.

    Args:
        client: Universal client for making HTTP requests.
        time_source: Optional time source for testing (defaults to time.time).
        async_sleep_func: Optional async sleep function for testing (defaults to asyncio.sleep).

    """
    self.client = client
    self.progress_callbacks: list[Callable[[int, int], None]] = []

    # Retry behaviour shared by every download issued through this instance.
    policy = RetryPolicy(max_attempts=3, base_delay=1.0)
    self.retry_policy = policy
    self._retry_executor = RetryExecutor(
        policy,
        time_source=time_source,
        async_sleep_func=async_sleep_func,
    )
Functions
add_progress_callback
add_progress_callback(
    callback: Callable[[int, int], None],
) -> None

Add a progress callback.

Parameters:

Name Type Description Default
callback Callable[[int, int], None]

Function that receives (downloaded_bytes, total_bytes).

required
Source code in provide/foundation/tools/downloader.py
def add_progress_callback(self, callback: Callable[[int, int], None]) -> None:
    """Register a callback to be notified of download progress.

    Callbacks are kept in registration order.

    Args:
        callback: Function that receives (downloaded_bytes, total_bytes).

    """
    registered = self.progress_callbacks
    registered.append(callback)
download_parallel async
download_parallel(
    urls: list[tuple[str, Path]],
) -> list[Path]

Download multiple files in parallel.

Parameters:

Name Type Description Default
urls list[tuple[str, Path]]

List of (url, destination) tuples.

required

Returns:

Type Description
list[Path]

List of downloaded file paths in the same order as input.

Raises:

Type Description
DownloadError

If any download fails.

Source code in provide/foundation/tools/downloader.py
async def download_parallel(self, urls: list[tuple[str, Path]]) -> list[Path]:
    """Download multiple files in parallel.

    All downloads are started together and awaited as a group. Every
    download is allowed to finish (or fail) before the outcome is reported,
    so one failure does not cut the others short.

    Args:
        urls: List of (url, destination) tuples.

    Returns:
        List of downloaded file paths in the same order as input.

    Raises:
        DownloadError: If any download fails.

    """
    import asyncio

    # Execute downloads concurrently; failures come back as values.
    outcomes = await asyncio.gather(
        *(self.download_with_progress(url, dest) for url, dest in urls),
        return_exceptions=True,
    )

    results: list[Path] = []
    errors: list[tuple[str, BaseException]] = []

    for (url, _dest), outcome in zip(urls, outcomes):
        # Check BaseException, not Exception: a cancelled download yields a
        # CancelledError instance, which must not be mistaken for a path.
        if isinstance(outcome, BaseException):
            errors.append((url, outcome))
            log.error(f"Failed to download {url}: {outcome}")
        else:
            results.append(outcome)

    if errors:
        raise DownloadError(f"Some downloads failed: {errors}")

    return results
download_with_mirrors async
download_with_mirrors(
    mirrors: list[str], dest: Path
) -> Path

Try multiple mirrors until one succeeds using fallback pattern.

Parameters:

Name Type Description Default
mirrors list[str]

List of mirror URLs to try.

required
dest Path

Destination file path.

required

Returns:

Type Description
Path

Path to downloaded file.

Raises:

Type Description
DownloadError

If all mirrors fail.

Source code in provide/foundation/tools/downloader.py
async def download_with_mirrors(self, mirrors: list[str], dest: Path) -> Path:
    """Download from the first mirror that works.

    Mirrors are attempted one at a time in the given order. After each
    failure any partially written ``dest`` is removed before moving on.

    Args:
        mirrors: List of mirror URLs to try.
        dest: Destination file path.

    Returns:
        Path to downloaded file.

    Raises:
        DownloadError: If no mirrors were given, or all mirrors fail.

    """
    if not mirrors:
        raise DownloadError("No mirrors provided")

    most_recent_failure = None

    for mirror_url in mirrors:
        log.debug(f"Trying mirror: {mirror_url}")
        try:
            fetched = await self.download_with_progress(mirror_url, dest)
        except Exception as exc:
            most_recent_failure = exc
            log.warning(f"Mirror {mirror_url} failed: {exc}")
            # Do not leave a half-written file for the next attempt.
            if dest.exists():
                dest.unlink()
        else:
            return fetched

    # Every mirror was tried and none succeeded.
    raise DownloadError(f"All mirrors failed: {most_recent_failure}") from most_recent_failure
download_with_progress async
download_with_progress(
    url: str, dest: Path, checksum: str | None = None
) -> Path

Download a file with progress reporting.

Parameters:

Name Type Description Default
url str

URL to download from.

required
dest Path

Destination file path.

required
checksum str | None

Optional checksum for verification.

None

Returns:

Type Description
Path

Path to the downloaded file.

Raises:

Type Description
DownloadError

If download or verification fails.

Source code in provide/foundation/tools/downloader.py
async def download_with_progress(self, url: str, dest: Path, checksum: str | None = None) -> Path:
    """Download a file with progress reporting.

    The actual work happens in an inner coroutine that is handed to the
    retry executor, so a failed attempt may be repeated. Each attempt
    creates the destination's parent directory, checks the HTTP status,
    streams the body to ``dest`` while reporting progress, and finally
    verifies the checksum when one is given. On any failure the partially
    written ``dest`` is removed.

    Args:
        url: URL to download from.
        dest: Destination file path.
        checksum: Optional checksum for verification.

    Returns:
        Path to the downloaded file.

    Raises:
        DownloadError: If download or verification fails.

    """

    async def _download() -> Path:
        """Inner download function that will be retried."""
        log.debug(f"Downloading {url} to {dest}")

        # Ensure parent directory exists
        dest.parent.mkdir(parents=True, exist_ok=True)

        # Stream download with progress
        total_size = 0
        downloaded = 0

        try:
            # Use the client to make a request first to get headers
            # NOTE(review): this looks like a full GET followed by a second
            # streamed GET of the same URL below — confirm whether the body
            # is transferred twice.
            response = await self.client.request(url, "GET")

            # Check for HTTP errors (4xx/5xx status codes)
            if not response.is_success():
                raise DownloadError(f"HTTP {response.status} error for {url}")

            # A missing content-length header is reported as a total of 0.
            total_size = int(response.headers.get("content-length", 0))

            # Write to file and report progress
            with dest.open("wb") as f:
                async for chunk in self.client.stream(url, "GET"):
                    f.write(chunk)
                    downloaded += len(chunk)
                    self._report_progress(downloaded, total_size)

        except Exception as e:
            # Remove any partial file, then surface every failure (including
            # the HTTP-status DownloadError raised above) as a DownloadError.
            if dest.exists():
                dest.unlink()
            raise DownloadError(f"Failed to download {url}: {e}") from e

        # Verify checksum if provided
        # NOTE(review): a mismatch raises from inside the retried function, so
        # it is presumably retried like a transport error — confirm intended.
        if checksum and not self.verify_checksum(dest, checksum):
            dest.unlink()
            raise DownloadError(f"Checksum mismatch for {url}")

        log.info(f"Downloaded {url} successfully")
        return dest

    # Execute with retry
    return await self._retry_executor.execute_async(_download)
verify_checksum
verify_checksum(file_path: Path, expected: str) -> bool

Verify file checksum.

Uses Foundation's hash_file() for consistent hashing behavior.

Parameters:

Name Type Description Default
file_path Path

Path to file to verify.

required
expected str

Expected checksum (hex string).

required

Returns:

Type Description
bool

True if checksum matches, False otherwise.

Source code in provide/foundation/tools/downloader.py
def verify_checksum(self, file_path: Path, expected: str) -> bool:
    """Verify file checksum.

    Uses Foundation's hash_file() for consistent hashing behavior. The
    comparison ignores letter case and surrounding whitespace in
    ``expected``, since hex digests are published in both upper and lower
    case and are often copied with a trailing newline.

    Args:
        file_path: Path to file to verify.
        expected: Expected checksum (hex string).

    Returns:
        True if checksum matches, False otherwise.

    """
    # Hash the file with SHA256.
    actual = hash_file(file_path, algorithm="sha256")
    # Hex digits are case-insensitive; normalise both sides before comparing.
    return actual.lower() == expected.strip().lower()

ToolError

ToolError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: FoundationError

Base exception for tool-related errors.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Build the error with a message, an error code, and diagnostic context.

    Args:
        message: Human-readable error message.
        code: Error code; when omitted (or empty) the value of
            ``self._default_code()`` is used.
        context: Optional mapping of diagnostic details. It is copied, so
            the caller's dict is never modified.
        cause: Optional underlying exception; when given it is also set as
            ``__cause__``.
        **extra_context: Additional context entries, merged over ``context``.

    """
    self.message = message
    self.code = code or self._default_code()
    # Merge into a fresh dict: updating ``context`` in place would leak the
    # extra keys back into the caller's mapping.
    self.context = {**(context or {}), **extra_context}
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)

ToolInstaller

Handle tool installation from various artifact formats.

Supports:

- ZIP archives
- TAR archives (with compression)
- Single binary files
- Platform-specific installation patterns

Functions
create_symlinks(
    install_dir: Path, metadata: ToolMetadata
) -> None

Create symlinks for easier access.

Parameters:

Name Type Description Default
install_dir Path

Installation directory.

required
metadata ToolMetadata

Tool metadata.

required
Source code in provide/foundation/tools/installer.py
def create_symlinks(self, install_dir: Path, metadata: ToolMetadata) -> None:
    """Point a ``latest`` symlink, next to ``install_dir``, at ``install_dir``.

    Nothing is done on Windows, or when the metadata lacks a name or a
    version. An existing ``latest`` entry is removed before the new link is
    created.

    Args:
        install_dir: Installation directory.
        metadata: Tool metadata.

    """
    import platform

    if platform.system() == "Windows":
        return  # Windows doesn't support symlinks easily

    if not (metadata.name and metadata.version):
        return

    # Version-less alias living alongside the versioned directory.
    alias = install_dir.parent / "latest"

    # is_symlink() also catches a dangling link, for which exists() is False.
    if alias.exists() or alias.is_symlink():
        alias.unlink()

    alias.symlink_to(install_dir)
    log.debug(f"Created symlink {alias} -> {install_dir}")
extract_tar
extract_tar(archive: Path, dest: Path) -> None

Extract tar archive (with optional compression).

Parameters:

Name Type Description Default
archive Path

Path to tar file.

required
dest Path

Destination directory.

required
Source code in provide/foundation/tools/installer.py
def extract_tar(self, archive: Path, dest: Path) -> None:
    """Extract tar archive (with optional compression).

    Every member is validated before anything is written: absolute paths,
    names containing ``..``, members that would land outside ``dest``, and
    links whose target lies outside ``dest`` all abort the extraction.

    Args:
        archive: Path to tar file.
        dest: Destination directory.

    Raises:
        InstallError: If the archive contains an unsafe path or link.

    """
    log.debug(f"Extracting tar {archive} to {dest}")

    dest.mkdir(parents=True, exist_ok=True)
    # Resolved once; every containment check below is made against it.
    dest_root = Path(dest).resolve()

    # Determine mode based on extension
    mode = "r"
    if archive.suffix in [".gz", ".tgz"]:
        mode = "r:gz"
    elif archive.suffix == ".bz2":
        mode = "r:bz2"
    elif archive.suffix == ".xz":
        mode = "r:xz"

    with tarfile.open(archive, mode) as tf:  # type: ignore[call-overload]
        # Check for unsafe paths and validate members
        safe_members = []
        for member in tf.getmembers():
            if member.name.startswith("/") or ".." in member.name:
                raise InstallError(f"Unsafe path in archive: {member.name}")

            # Additional security checks for links
            if member.islnk() or member.issym():
                target = Path(member.linkname)
                if not target.is_absolute():
                    # A symlink target is relative to the directory holding
                    # the link, but a hard-link target is relative to the
                    # archive root. Using the link's directory for hard links
                    # would let e.g. "a/b/link -> ../x" pass although it
                    # resolves outside the extraction directory.
                    if member.issym():
                        base = (Path(dest) / member.name).parent
                    else:
                        base = Path(dest)
                    target = base / target
                try:
                    target.resolve().relative_to(dest_root)
                except ValueError:
                    raise InstallError(
                        f"Unsafe symlink in archive: {member.name} -> {member.linkname}"
                    ) from None

            # Path traversal check
            member_path = Path(dest) / member.name
            try:
                member_path.resolve().relative_to(dest_root)
            except ValueError:
                raise InstallError(f"Path traversal detected in archive: {member.name}") from None

            safe_members.append(member)

        # Extract only validated members (all members have been security-checked above)
        tf.extractall(dest, members=safe_members)  # nosec B202
extract_zip
extract_zip(archive: Path, dest: Path) -> None

Extract ZIP archive.

Parameters:

Name Type Description Default
archive Path

Path to ZIP file.

required
dest Path

Destination directory.

required
Source code in provide/foundation/tools/installer.py
def extract_zip(self, archive: Path, dest: Path) -> None:
    """Unpack a ZIP archive into ``dest`` after vetting every member name.

    Absolute names, names containing ``..``, and names that would resolve
    outside ``dest`` abort the extraction before anything is written.

    Args:
        archive: Path to ZIP file.
        dest: Destination directory.

    Raises:
        InstallError: If the archive contains an unsafe member path.

    """
    log.debug(f"Extracting ZIP {archive} to {dest}")

    dest.mkdir(parents=True, exist_ok=True)

    with zipfile.ZipFile(archive, "r") as bundle:
        vetted = []
        for entry_name in bundle.namelist():
            # Cheap textual screen first.
            looks_unsafe = entry_name.startswith("/") or ".." in entry_name
            if looks_unsafe:
                raise InstallError(f"Unsafe path in archive: {entry_name}")

            # Then confirm the resolved location stays inside the destination.
            landing = Path(dest) / entry_name
            try:
                landing.resolve().relative_to(dest.resolve())
            except ValueError:
                raise InstallError(f"Path traversal detected in archive: {entry_name}") from None

            vetted.append(entry_name)

        # Only members that passed both checks are written out.
        bundle.extractall(dest, members=vetted)  # nosec B202
get_install_dir
get_install_dir(metadata: ToolMetadata) -> Path

Get installation directory for tool.

Parameters:

Name Type Description Default
metadata ToolMetadata

Tool metadata.

required

Returns:

Type Description
Path

Installation directory path.

Source code in provide/foundation/tools/installer.py
def get_install_dir(self, metadata: ToolMetadata) -> Path:
    """Decide where a tool should be installed.

    An explicit ``metadata.install_path`` wins; otherwise the location is
    ``~/.provide-foundation/tools/<name>/<version>``.

    Args:
        metadata: Tool metadata.

    Returns:
        Installation directory path.

    """
    explicit = metadata.install_path
    if explicit:
        return explicit

    tools_root = Path.home() / ".provide-foundation" / "tools"
    return tools_root.joinpath(metadata.name, metadata.version)
install
install(artifact: Path, metadata: ToolMetadata) -> Path

Install tool from artifact.

Parameters:

Name Type Description Default
artifact Path

Path to downloaded artifact.

required
metadata ToolMetadata

Tool metadata with installation info.

required

Returns:

Type Description
Path

Path to installed tool directory.

Raises:

Type Description
InstallError

If installation fails.

Source code in provide/foundation/tools/installer.py
def install(self, artifact: Path, metadata: ToolMetadata) -> Path:
    """Install a tool from a downloaded artifact.

    The artifact is unpacked (zip or tar archive) or copied (bare binary)
    into the directory chosen by ``get_install_dir``; afterwards file
    permissions are set and symlinks are created.

    Args:
        artifact: Path to downloaded artifact.
        metadata: Tool metadata with installation info.

    Returns:
        Path to installed tool directory.

    Raises:
        InstallError: If the artifact is missing or its type is not recognised.

    """
    if not artifact.exists():
        raise InstallError(f"Artifact not found: {artifact}")

    target_dir = self.get_install_dir(metadata)
    log.info(f"Installing {metadata.name} {metadata.version} to {target_dir}")

    # Only the final suffix is inspected, so "x.tar.gz" is seen as ".gz".
    extension = artifact.suffix.lower()
    tar_extensions = (".tar", ".gz", ".tgz", ".bz2", ".xz")

    if extension == ".zip":
        self.extract_zip(artifact, target_dir)
    elif extension in tar_extensions:
        self.extract_tar(artifact, target_dir)
    elif self.is_binary(artifact):
        # Not an archive: treat it as a single executable.
        self.install_binary(artifact, target_dir, metadata)
    else:
        raise InstallError(f"Unknown artifact type: {extension}")

    # Post-install steps: permissions first, then symlinks.
    self.set_permissions(target_dir, metadata)
    self.create_symlinks(target_dir, metadata)

    log.info(f"Successfully installed {metadata.name} to {target_dir}")
    return target_dir
install_binary
install_binary(
    binary: Path, dest: Path, metadata: ToolMetadata
) -> None

Install single binary file.

Parameters:

Name Type Description Default
binary Path

Path to binary file.

required
dest Path

Destination directory.

required
metadata ToolMetadata

Tool metadata.

required
Source code in provide/foundation/tools/installer.py
def install_binary(self, binary: Path, dest: Path, metadata: ToolMetadata) -> None:
    """Install a single binary file into ``<dest>/bin``.

    The file is copied (with its metadata) under the name given by
    ``metadata.executable_name``, or under its own file name when no
    executable name is set.

    Args:
        binary: Path to binary file.
        dest: Destination directory.
        metadata: Tool metadata.

    """
    import platform

    log.debug(f"Installing binary {binary} to {dest}")

    # Create <dest> (with parents) and the bin/ directory inside it.
    dest.mkdir(parents=True, exist_ok=True)
    bin_dir = dest / "bin"
    bin_dir.mkdir(exist_ok=True)

    # Prefer the configured executable name over the artifact's file name.
    executable = metadata.executable_name
    if not executable:
        executable = binary.name
    installed = bin_dir / executable

    shutil.copy2(binary, installed)

    # Executable bit is only set off Windows (Windows uses file extension).
    on_windows = platform.system() == "Windows"
    if not on_windows:
        installed.chmod(0o755)
is_binary
is_binary(file_path: Path) -> bool

Check if file is a binary executable.

Parameters:

Name Type Description Default
file_path Path

Path to check.

required

Returns:

Type Description
bool

True if file appears to be binary.

Source code in provide/foundation/tools/installer.py
def is_binary(self, file_path: Path) -> bool:
    """Check if file is a binary executable.

    Only files with no extension, or with a ``.exe``/``.bin`` extension
    (compared case-insensitively), are inspected. The first four bytes are
    then matched against known executable magic numbers.

    Args:
        file_path: Path to check.

    Returns:
        True if the file starts with a recognised executable signature;
        False otherwise, including when the file cannot be read.

    """
    if file_path.suffix.lower() not in ("", ".exe", ".bin"):
        return False

    signatures = (
        b"\x7fELF",  # Linux ELF
        b"MZ",  # Windows PE
        b"\xfe\xed\xfa",  # Mach-O, big-endian (0xfeedface / 0xfeedfacf)
        b"\xce\xfa\xed\xfe",  # Mach-O 32-bit, little-endian on disk
        b"\xcf\xfa\xed\xfe",  # Mach-O 64-bit, little-endian on disk (x86_64 / arm64)
        b"\xca\xfe\xba\xbe",  # macOS universal (fat) binary
    )

    try:
        with file_path.open("rb") as f:
            header = f.read(4)
    except (OSError, ValueError):
        # Best effort: an unreadable or invalid path is simply "not a binary".
        return False

    # bytes.startswith accepts a tuple and tests each signature in turn.
    return header.startswith(signatures)
set_permissions
set_permissions(
    install_dir: Path, metadata: ToolMetadata
) -> None

Set appropriate permissions on installed files.

Parameters:

Name Type Description Default
install_dir Path

Installation directory.

required
metadata ToolMetadata

Tool metadata.

required
Source code in provide/foundation/tools/installer.py
def set_permissions(self, install_dir: Path, metadata: ToolMetadata) -> None:
    """Mark installed executables as executable (mode 0o755).

    Every regular file directly inside ``<install_dir>/bin`` is updated, as
    is ``<install_dir>/<executable_name>`` when that file exists. Nothing is
    done on Windows.

    Args:
        install_dir: Installation directory.
        metadata: Tool metadata.

    """
    import platform

    if platform.system() == "Windows":
        return  # Windows handles permissions differently

    executable_mode = 0o755

    # Everything that is a plain file in bin/ is treated as an executable.
    bin_dir = install_dir / "bin"
    if bin_dir.exists():
        for entry in (item for item in bin_dir.iterdir() if item.is_file()):
            entry.chmod(executable_mode)

    # The executable may also sit directly in the install root.
    exe_name = metadata.executable_name
    if not exe_name:
        return

    root_executable = install_dir / exe_name
    if root_executable.exists():
        root_executable.chmod(executable_mode)

ToolMetadata

Metadata about a tool version.

Attributes:

Name Type Description
name str

Tool name (e.g., "terraform").

version str

Version string (e.g., "1.5.0").

platform str

Platform identifier (e.g., "linux", "darwin").

arch str

Architecture (e.g., "amd64", "arm64").

checksum str | None

Optional checksum for verification.

signature str | None

Optional GPG/PGP signature.

download_url str | None

URL to download the tool.

checksum_url str | None

URL to download checksums file.

install_path Path | None

Where the tool is/will be installed.

env_vars dict[str, str]

Environment variables to set.

dependencies list[str]

Other tools this depends on.

executable_name str | None

Name of the executable file.

VersionResolver

VersionResolver()

Resolve version specifications to concrete versions.

Supports: "latest" (most recent stable version); "latest-beta" (most recent pre-release); "~1.2.3" (patch version range); "^1.2.3" (minor version range); "1.2.*" (wildcard matching); and exact versions.

Initialize version resolver with pattern cache.

Source code in provide/foundation/tools/resolver.py
def __init__(self) -> None:
    """Initialize version resolver with pattern cache.

    The cache starts empty; it maps a regex source string to its compiled
    pattern.
    """
    # Keyed by regex source text; values are the compiled patterns.
    self._pattern_cache: dict[str, re.Pattern[str]] = {}
Functions
compare_versions
compare_versions(v1: str, v2: str) -> int

Compare two versions.

Parameters:

Name Type Description Default
v1 str

First version.

required
v2 str

Second version.

required

Returns:

Type Description
int

-1 if v1 < v2, 0 if equal, 1 if v1 > v2.

Source code in provide/foundation/tools/resolver.py
def compare_versions(self, v1: str, v2: str) -> int:
    """Compare two version strings by their numeric components.

    Both versions are parsed with ``parse_version``; the shorter component
    list is padded with zeros so that "1.2" and "1.2.0" compare equal.

    Args:
        v1: First version.
        v2: Second version.

    Returns:
        -1 if v1 < v2, 0 if equal, 1 if v1 > v2.

    """
    left = self.parse_version(v1)
    right = self.parse_version(v2)

    # Right-pad the shorter list with zeros to a common width.
    width = max(len(left), len(right))
    left.extend([0] * (width - len(left)))
    right.extend([0] * (width - len(right)))

    # Equal-length lists compare component by component, left to right.
    if left == right:
        return 0
    return -1 if left < right else 1
get_latest_any
get_latest_any(versions: list[str]) -> str | None

Get latest version (including pre-releases).

Parameters:

Name Type Description Default
versions list[str]

List of available versions.

required

Returns:

Type Description
str | None

Latest version, or None if list is empty.

Source code in provide/foundation/tools/resolver.py
def get_latest_any(self, versions: list[str]) -> str | None:
    """Return the highest version, pre-releases included.

    Args:
        versions: List of available versions.

    Returns:
        Latest version, or None if list is empty.

    """
    # The last element of the ascending sort is the latest version.
    return self.sort_versions(versions)[-1] if versions else None
get_latest_prerelease
get_latest_prerelease(versions: list[str]) -> str | None

Get latest pre-release version.

Parameters:

Name Type Description Default
versions list[str]

List of available versions.

required

Returns:

Type Description
str | None

Latest pre-release version, or None if no pre-releases.

Source code in provide/foundation/tools/resolver.py
def get_latest_prerelease(self, versions: list[str]) -> str | None:
    """Return the highest version that ``is_prerelease`` accepts.

    Args:
        versions: List of available versions.

    Returns:
        Latest pre-release version, or None if no pre-releases.

    """
    candidates = list(filter(self.is_prerelease, versions))
    # The last element of the ascending sort is the latest pre-release.
    return self.sort_versions(candidates)[-1] if candidates else None
get_latest_stable
get_latest_stable(versions: list[str]) -> str | None

Get latest stable version (no pre-release).

Parameters:

Name Type Description Default
versions list[str]

List of available versions.

required

Returns:

Type Description
str | None

Latest stable version, or None if no stable versions.

Source code in provide/foundation/tools/resolver.py
def get_latest_stable(self, versions: list[str]) -> str | None:
    """Return the highest version that is not a pre-release.

    Args:
        versions: List of available versions.

    Returns:
        Latest stable version, or None if no stable versions.

    """
    stable_versions: list[str] = []
    for candidate in versions:
        if self.is_prerelease(candidate):
            continue
        stable_versions.append(candidate)

    if stable_versions:
        # The last element of the ascending sort is the latest stable release.
        return self.sort_versions(stable_versions)[-1]
    return None
is_prerelease
is_prerelease(version: str) -> bool

Check if version is a pre-release.

Parameters:

Name Type Description Default
version str

Version string.

required

Returns:

Type Description
bool

True if version appears to be pre-release.

Source code in provide/foundation/tools/resolver.py
def is_prerelease(self, version: str) -> bool:
    """Tell whether a version string carries a pre-release marker.

    The check is case-insensitive and purely textual: the version is
    searched for a fixed set of common pre-release markers.

    Args:
        version: Version string.

    Returns:
        True if version appears to be pre-release.

    """
    markers = (
        r"-alpha",
        r"-beta",
        r"-rc",
        r"-dev",
        r"-preview",
        r"-pre",
        r"-snapshot",
        r"\.dev\d+",
        r"a\d+$",  # 1.0a1
        r"b\d+$",  # 1.0b2
        r"rc\d+$",  # 1.0rc3
    )

    lowered = version.lower()
    for marker in markers:
        if re.search(marker, lowered):
            return True
    return False
parse_version
parse_version(version: str) -> list[int]

Parse version string into numeric components.

Parameters:

Name Type Description Default
version str

Version string.

required

Returns:

Type Description
list[int]

List of numeric version components.

Source code in provide/foundation/tools/resolver.py
def parse_version(self, version: str) -> list[int]:
    """Extract the leading dotted-number part of a version as integers.

    An optional leading "v" is skipped and anything after the numeric
    prefix (such as "-beta") is ignored.

    Args:
        version: Version string.

    Returns:
        List of numeric version components; empty if the string does not
        start with a number.

    """
    found = re.match(r"^v?(\d+(?:\.\d+)*)", version)
    if found is None:
        return []

    numbers: list[int] = []
    for token in found.group(1).split("."):
        try:
            value = int(token)
        except ValueError:
            # Stop at the first component that cannot be converted.
            break
        numbers.append(value)

    return numbers
resolve
resolve(spec: str, available: list[str]) -> str | None

Resolve a version specification to a concrete version.

Parameters:

Name Type Description Default
spec str

Version specification.

required
available list[str]

List of available versions.

required

Returns:

Type Description
str | None

Resolved version string, or None if not found.

Source code in provide/foundation/tools/resolver.py
def resolve(self, spec: str, available: list[str]) -> str | None:
    """Turn a version specification into one of the available versions.

    The spec is stripped of surrounding whitespace and then tried, in
    order, as a keyword ("latest", "latest-beta", "latest-prerelease",
    "latest-any"), a "~" or "^" range, a "*" wildcard pattern, and finally
    an exact version.

    Args:
        spec: Version specification.
        available: List of available versions.

    Returns:
        Resolved version string, or None if not found.

    """
    if not available:
        return None

    spec = spec.strip()

    # Special keywords map straight onto a "latest" selector.
    keyword_handlers = {
        "latest": self.get_latest_stable,
        "latest-beta": self.get_latest_prerelease,
        "latest-prerelease": self.get_latest_prerelease,
        "latest-any": self.get_latest_any,
    }
    keyword_handler = keyword_handlers.get(spec)
    if keyword_handler is not None:
        return keyword_handler(available)

    # Range operators are identified by the first character of the spec.
    range_handlers = {
        "~": self.resolve_tilde,
        "^": self.resolve_caret,
    }
    operator = spec[:1]
    if operator in range_handlers:
        return range_handlers[operator](spec[1:], available)

    if "*" in spec:
        return self.resolve_wildcard(spec, available)

    # Anything else must match an available version exactly.
    return spec if spec in available else None
resolve_caret
resolve_caret(
    base: str, available: list[str]
) -> str | None

Resolve caret range (^1.2.3 means >=1.2.3 <2.0.0).

Parameters:

Name Type Description Default
base str

Base version without caret.

required
available list[str]

List of available versions.

required

Returns:

Type Description
str | None

Best matching version, or None if no match.

Source code in provide/foundation/tools/resolver.py
def resolve_caret(self, base: str, available: list[str]) -> str | None:
    """Resolve caret range (^1.2.3 means >=1.2.3 <2.0.0).

    A candidate matches when it has the same major component as ``base``
    and does not compare lower than ``base``. Any error during resolution
    is logged at debug level and reported as "no match".

    Args:
        base: Base version without caret.
        available: List of available versions.

    Returns:
        Best matching version, or None if no match.

    """
    try:
        base_parts = self.parse_version(base)
        if not base_parts:
            return None

        wanted_major = base_parts[0]

        def satisfies(candidate: str) -> bool:
            # Same major version, and not older than the base version.
            numbers = self.parse_version(candidate)
            return (
                bool(numbers)
                and numbers[0] == wanted_major
                and self.compare_versions(candidate, base) >= 0
            )

        candidates = [v for v in available if satisfies(v)]
        if candidates:
            # Highest matching version wins.
            return self.sort_versions(candidates)[-1]
    except Exception as exc:
        log.debug(f"Failed to resolve caret range {base}: {exc}")

    return None
resolve_tilde
resolve_tilde(
    base: str, available: list[str]
) -> str | None

Resolve tilde range (~1.2.3 means >=1.2.3 <1.3.0).

Parameters:

Name Type Description Default
base str

Base version without tilde.

required
available list[str]

List of available versions.

required

Returns:

Type Description
str | None

Best matching version, or None if no match.

Source code in provide/foundation/tools/resolver.py
def resolve_tilde(self, base: str, available: list[str]) -> str | None:
    """Resolve tilde range (~1.2.3 means >=1.2.3 <1.3.0).

    A candidate matches when it shares the base's major and minor
    components and does not compare lower than the base. The lower bound
    is checked with ``compare_versions`` (the same check ``resolve_caret``
    uses), so missing trailing components count as zero ("1.2" satisfies
    "~1.2.0") and components beyond the patch level are honoured. Any error
    during resolution is logged at debug level and reported as "no match".

    Args:
        base: Base version without tilde. Needs at least major and minor
            components.
        available: List of available versions.

    Returns:
        Best matching version, or None if no match.

    """
    try:
        parts = self.parse_version(base)
        if len(parts) < 2:
            return None

        major, minor = parts[0], parts[1]

        # Filter versions that match the constraint
        matches = []
        for v in available:
            v_parts = self.parse_version(v)
            # Upper bound: must stay inside the same major.minor series.
            if len(v_parts) < 2 or v_parts[0] != major or v_parts[1] != minor:
                continue
            # Lower bound: must be >= the full base version.
            if self.compare_versions(v, base) >= 0:
                matches.append(v)

        if matches:
            return self.sort_versions(matches)[-1]
    except Exception as e:
        log.debug(f"Failed to resolve tilde range {base}: {e}")

    return None
resolve_wildcard
resolve_wildcard(
    pattern: str, available: list[str]
) -> str | None

Resolve wildcard pattern (1.2.* matches any 1.2.x).

Parameters:

Name Type Description Default
pattern str

Version pattern with wildcards.

required
available list[str]

List of available versions.

required

Returns:

Type Description
str | None

Best matching version, or None if no match.

Source code in provide/foundation/tools/resolver.py
def resolve_wildcard(self, pattern: str, available: list[str]) -> str | None:
    """Resolve wildcard pattern (1.2.* matches any 1.2.x).

    Only "*" is special: it matches any run of characters. Every other
    character of the pattern, including regex metacharacters such as "."
    or "+", matches itself literally. Compiled patterns are kept in the
    instance's pattern cache. Any error during resolution is logged at
    debug level and reported as "no match".

    Args:
        pattern: Version pattern with wildcards.
        available: List of available versions.

    Returns:
        Best matching version, or None if no match.

    """
    # Escape each literal piece and join the pieces with ".*", so that "*"
    # is the only character carrying regex meaning.
    literal_pieces = [re.escape(piece) for piece in pattern.split("*")]
    regex_pattern = f"^{'.*'.join(literal_pieces)}$"

    try:
        # Check cache first
        if regex_pattern not in self._pattern_cache:
            self._pattern_cache[regex_pattern] = re.compile(regex_pattern)

        regex = self._pattern_cache[regex_pattern]
        matches = [v for v in available if regex.match(v)]

        if matches:
            # Return latest matching version
            return self.sort_versions(matches)[-1]
    except Exception as e:
        log.debug(f"Failed to resolve wildcard {pattern}: {e}")

    return None
sort_versions
sort_versions(versions: list[str]) -> list[str]

Sort versions in ascending order.

Parameters:

Name Type Description Default
versions list[str]

List of version strings.

required

Returns:

Type Description
list[str]

Sorted list of versions.

Source code in provide/foundation/tools/resolver.py
def sort_versions(self, versions: list[str]) -> list[str]:
    """Sort versions in ascending order.

    Versions are ordered by their numeric components first. Among versions
    with identical numeric components, pre-releases (as judged by
    ``is_prerelease``) come before the others, so "1.0.0-rc1" sorts below
    "1.0.0"; remaining ties are ordered by plain string comparison.

    Args:
        versions: List of version strings.

    Returns:
        Sorted list of versions.

    """
    return sorted(
        versions,
        key=lambda v: (
            self.parse_version(v),
            not self.is_prerelease(v),  # False sorts first: pre-release < release
            v,  # Tie-break by string (orders pre-releases among themselves)
        ),
    )

Functions

get_tool_manager

get_tool_manager(
    name: str, config: BaseConfig
) -> BaseToolManager | None

Get a tool manager instance from the global registry.

Parameters:

Name Type Description Default
name str

Tool name or alias.

required
config BaseConfig

Configuration object.

required

Returns:

Type Description
BaseToolManager | None

Tool manager instance, or None if not found.

Source code in provide/foundation/tools/registry.py
def get_tool_manager(name: str, config: BaseConfig) -> BaseToolManager | None:
    """Create a tool manager for ``name`` via the global registry.

    Args:
        name: Tool name or alias.
        config: Configuration object.

    Returns:
        Tool manager instance, or None if not found.

    """
    # Delegate to the registry returned by get_tool_registry().
    return get_tool_registry().create_tool_manager(name, config)

get_tool_registry

get_tool_registry() -> ToolRegistry

Get the global tool registry instance.

Returns:

Type Description
ToolRegistry

Tool registry instance.

Source code in provide/foundation/tools/registry.py
def get_tool_registry() -> ToolRegistry:
    """Return the shared module-level tool registry, creating it on first use.

    Returns:
        Tool registry instance.

    """
    global _tool_registry
    if _tool_registry is not None:
        return _tool_registry

    # First call: build the registry and remember it for later callers.
    _tool_registry = ToolRegistry()
    return _tool_registry

register_tool_manager

register_tool_manager(
    name: str,
    manager_class: type[BaseToolManager],
    aliases: list[str] | None = None,
) -> None

Register a tool manager with the global registry.

Parameters:

Name Type Description Default
name str

Tool name.

required
manager_class type[BaseToolManager]

Tool manager class.

required
aliases list[str] | None

Optional aliases.

None
Source code in provide/foundation/tools/registry.py
def register_tool_manager(
    name: str,
    manager_class: type[BaseToolManager],
    aliases: list[str] | None = None,
) -> None:
    """Add a tool manager class to the global registry.

    Args:
        name: Tool name.
        manager_class: Tool manager class.
        aliases: Optional aliases.

    """
    # Delegate to the registry returned by get_tool_registry().
    get_tool_registry().register_tool_manager(name, manager_class, aliases)