Skip to content

Index

wrknv.wenv.operations

wrknv Operations

Core operation modules for workenv functionality.

Functions

download_file

download_file(
    url: str,
    output_path: Path,
    show_progress: bool = True,
    headers: dict[str, str] | None = None,
    checksum: str | None = None,
) -> None

Synchronous wrapper for download_file_async.

Parameters:

Name Type Description Default
url str

URL to download from

required
output_path Path

Where to save the file

required
show_progress bool

Whether to log progress

True
headers dict[str, str] | None

Optional custom headers

None
checksum str | None

Optional checksum for verification

None
Source code in wrknv/wenv/operations/download.py
def download_file(
    url: str,
    output_path: pathlib.Path,
    show_progress: bool = True,
    headers: dict[str, str] | None = None,
    checksum: str | None = None,
) -> None:
    """Synchronous wrapper for download_file_async.

    Args:
        url: URL to download from
        output_path: Where to save the file
        show_progress: Whether to log progress
        headers: Optional custom headers
        checksum: Optional checksum for verification
    """
    asyncio.run(download_file_async(url, output_path, show_progress, headers, checksum=checksum))

extract_archive

extract_archive(
    archive_path: Path, extract_dir: Path
) -> None

Extract archive to specified directory.

Source code in wrknv/wenv/operations/install.py
@resilient
def extract_archive(archive_path: pathlib.Path, extract_dir: pathlib.Path) -> None:
    """Extract archive to specified directory."""

    if not archive_path.exists():
        raise ResourceError(f"Archive not found: {archive_path}")

    # Create extraction directory
    extract_dir.mkdir(parents=True, exist_ok=True)

    logger.debug(f"Extracting {archive_path} to {extract_dir}")

    archive_name = archive_path.name.lower()

    try:
        if archive_name.endswith(".tar.gz") or archive_name.endswith(".tgz"):
            # Use foundation's extract_tar_gz with built-in path traversal protection
            ArchiveOperations.extract_tar_gz(archive_path, extract_dir)
        elif archive_name.endswith(".tar"):
            # Use foundation's TarArchive with built-in path traversal protection
            TarArchive().extract(archive_path, extract_dir)
        elif archive_name.endswith(".zip"):
            # Use foundation's ZipArchive with built-in path traversal protection
            ZipArchive().extract(archive_path, extract_dir)
        else:
            raise ValidationError(f"Unsupported archive format: {archive_path}")

        logger.debug(f"Successfully extracted {archive_path}")

    except Exception as e:
        raise ResourceError(f"Failed to extract {archive_path}: {e}") from e

get_architecture

get_architecture() -> str

Get normalized architecture name for tool downloads.

Returns:

Type Description
str

Architecture string suitable for tool downloads (amd64, arm64, etc).

Source code in wrknv/wenv/operations/platform.py
def get_architecture() -> str:
    """Get normalized architecture name for tool downloads.

    Returns:
        Architecture string suitable for tool downloads (amd64, arm64, etc).
    """
    return get_arch_name()

get_os_name

get_os_name() -> str

Get normalized OS name for tool downloads.

DEPRECATED: Use provide.foundation.platform.get_os_name directly.

Source code in wrknv/wenv/operations/platform.py
def get_os_name() -> str:
    """Get normalized OS name for tool downloads.

    DEPRECATED: Use provide.foundation.platform.get_os_name directly.
    """
    return foundation_get_os_name()

get_platform_info

get_platform_info() -> dict[str, str]

Get platform information for tool downloads.

Returns:

Type Description
dict[str, str]

Dictionary with platform details for tool compatibility.

Source code in wrknv/wenv/operations/platform.py
def get_platform_info() -> dict[str, str]:
    """Get platform information for tool downloads.

    Returns:
        Dictionary with platform details for tool compatibility.
    """
    # Get comprehensive system info from foundation
    sys_info = get_system_info()

    return {
        "os": sys_info.os_name,
        "arch": sys_info.arch,
        "platform": sys_info.platform,
        "python_platform": sys_info.platform,
        "machine": sys_info.arch,
        "system": sys_info.os_name,
    }

make_executable

make_executable(file_path: Path) -> None

Make file executable on Unix-like systems.

Source code in wrknv/wenv/operations/install.py
@resilient
def make_executable(file_path: pathlib.Path) -> None:
    """Make file executable on Unix-like systems."""

    if not file_path.exists():
        raise ResourceError(f"File not found: {file_path}")

    # Only needed on Unix-like systems
    import platform

    if platform.system().lower() == "windows":
        logger.debug(f"Skipping chmod on Windows for {file_path}")
        return

    try:
        # Get current permissions
        current_mode = file_path.stat().st_mode

        # Add execute permissions for owner, group, and others
        new_mode = current_mode | stat.S_IXUSR | stat.S_IXGRP | stat.S_IXOTH

        # Set new permissions
        file_path.chmod(new_mode)

        logger.debug(f"Made {file_path} executable (mode: {oct(new_mode)})")

    except Exception as e:
        logger.warning(f"Failed to make {file_path} executable: {e}")

run_version_check

run_version_check(
    binary_path: Path, tool_name: str, timeout: int = 10
) -> str | None

Run version check command for a tool and return output.

Includes automatic retry with exponential backoff for transient failures.

Source code in wrknv/wenv/operations/verify.py
@retry(ProcessError, OSError, max_attempts=3, base_delay=1.0)
def run_version_check(binary_path: pathlib.Path, tool_name: str, timeout: int = 10) -> str | None:
    """Run version check command for a tool and return output.

    Includes automatic retry with exponential backoff for transient failures.
    """

    if not binary_path.exists():
        return None

    # Determine version command for different tools
    version_args = get_version_command_args(tool_name)

    cmd = [str(binary_path), *version_args]

    try:
        logger.debug(f"Running version check: {' '.join(cmd)}")

        result = run(cmd, timeout=timeout)

        if result.returncode == 0:
            return result.stdout.strip()
        else:
            logger.error(f"Version command failed for {tool_name}: {result.stderr}")
            return None

    except ProcessError as e:
        if e.timeout:
            logger.error(f"Version check timed out for {tool_name}")
        else:
            logger.error(f"Version check failed for {tool_name}: {e}")
        raise  # Re-raise to trigger retry
    except Exception as e:
        logger.error(f"Version check failed for {tool_name}: {e}")
        return None

verify_checksum

verify_checksum(
    file_path: Path,
    expected_checksum: str,
    algorithm: str = "sha256",
) -> bool

Verify file checksum using specified algorithm.

Source code in wrknv/wenv/operations/download.py
def verify_checksum(file_path: pathlib.Path, expected_checksum: str, algorithm: str = "sha256") -> bool:
    """Verify file checksum using specified algorithm."""

    # Use foundation's verify_file which handles all the details
    return verify_file(file_path, expected_checksum, algorithm)

verify_tool_installation

verify_tool_installation(
    binary_path: Path, expected_version: str, tool_name: str
) -> bool

Verify that tool installation works and version matches.

Source code in wrknv/wenv/operations/verify.py
def verify_tool_installation(binary_path: pathlib.Path, expected_version: str, tool_name: str) -> bool:
    """Verify that tool installation works and version matches."""

    if not binary_path.exists():
        logger.error(f"{tool_name} binary not found at {binary_path}")
        return False

    try:
        version_output = run_version_check(binary_path, tool_name)

        if version_output is None:
            logger.error(f"{tool_name} version check failed")
            return False

        # Check if expected version is in the output
        if expected_version in version_output:
            logger.debug(f"{tool_name} {expected_version} verification successful")
            return True
        else:
            logger.error(
                f"Version mismatch for {tool_name}: expected {expected_version}, got: {version_output}"
            )
            return False

    except Exception as e:
        logger.error(f"Failed to verify {tool_name} installation: {e}")
        return False