Skip to content

Archive

provide.foundation.archive

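Archive utilities: TAR and ZIP archive handling, single-file compressors (GZIP, BZIP2, XZ), chainable archive operations, and extraction limits that guard against decompression bombs.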

Classes

ArchiveError

ArchiveError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: FoundationError

Base exception for archive-related errors.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error with a message and optional diagnostics.

    Args:
        message: Human-readable error description.
        code: Error code; falls back to ``self._default_code()`` when falsy.
        context: Optional mapping of diagnostic key/value pairs. It is
            copied, never modified in place.
        cause: Optional underlying exception; when truthy it is also
            stored as ``__cause__``.
        **extra_context: Extra key/value pairs merged into the stored
            context (they win on key collisions).

    """
    self.message = message
    self.code = code or self._default_code()
    # Merge into a fresh dict so the caller's ``context`` mapping is never
    # mutated and is not shared between the caller and this instance.
    self.context = {**(context or {}), **extra_context}
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)

ArchiveFormatError

ArchiveFormatError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: ArchiveError

Archive format is invalid or corrupted.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error with a message and optional diagnostics.

    Args:
        message: Human-readable error description.
        code: Error code; falls back to ``self._default_code()`` when falsy.
        context: Optional mapping of diagnostic key/value pairs. It is
            copied, never modified in place.
        cause: Optional underlying exception; when truthy it is also
            stored as ``__cause__``.
        **extra_context: Extra key/value pairs merged into the stored
            context (they win on key collisions).

    """
    self.message = message
    self.code = code or self._default_code()
    # Merge into a fresh dict so the caller's ``context`` mapping is never
    # mutated and is not shared between the caller and this instance.
    self.context = {**(context or {}), **extra_context}
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)

ArchiveIOError

ArchiveIOError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: ArchiveError

I/O operation failed during archive processing.

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error with a message and optional diagnostics.

    Args:
        message: Human-readable error description.
        code: Error code; falls back to ``self._default_code()`` when falsy.
        context: Optional mapping of diagnostic key/value pairs. It is
            copied, never modified in place.
        cause: Optional underlying exception; when truthy it is also
            stored as ``__cause__``.
        **extra_context: Extra key/value pairs merged into the stored
            context (they win on key collisions).

    """
    self.message = message
    self.code = code or self._default_code()
    # Merge into a fresh dict so the caller's ``context`` mapping is never
    # mutated and is not shared between the caller and this instance.
    self.context = {**(context or {}), **extra_context}
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)

ArchiveLimits

Configurable limits for archive extraction to prevent decompression bombs.

Attributes:

Name Type Description
max_total_size int

Maximum total extracted size in bytes (default: 1GB)

max_file_count int

Maximum number of files in archive (default: 10,000)

max_compression_ratio float

Maximum compression ratio (default: 100:1)

max_single_file_size int

Maximum size of any single file (default: 100MB)

enabled bool

Whether to enforce limits (default: True)

ArchiveOperation

Bases: IntEnum

Archive operation codes compatible with PSPF/2025 format.

These operation codes match the PSPF/2025 v0 specification to ensure compatibility with flavorpack's package format. The hex values are canonical and must not be changed without updating the PSPF spec.

Values

- NONE: No operation (0x00)
- TAR: POSIX TAR archive bundling (0x01)
- GZIP: GZIP compression (0x10)
- BZIP2: BZIP2 compression (0x13)
- XZ: XZ/LZMA2 compression (0x16)
- ZSTD: Zstandard compression (0x1B)

Example

from provide.foundation.archive.types import ArchiveOperation

op = ArchiveOperation.TAR
assert op == 0x01
assert op.name == "TAR"

Functions
from_string classmethod
from_string(name: str) -> ArchiveOperation

Convert string operation name to enum.

Parameters:

Name Type Description Default
name str

Operation name (case-insensitive)

required

Returns:

Type Description
ArchiveOperation

ArchiveOperation enum value

Raises:

Type Description
ValueError

If operation name is invalid

Example

>>> ArchiveOperation.from_string("tar")
<ArchiveOperation.TAR: 1>
>>> ArchiveOperation.from_string("GZIP")
<ArchiveOperation.GZIP: 16>

Source code in provide/foundation/archive/types.py
@classmethod
def from_string(cls, name: str) -> ArchiveOperation:
    """Look up an operation by its member name, ignoring case.

    Args:
        name: Operation name in any letter case (e.g. ``"tar"``, ``"GZIP"``)

    Returns:
        The enum member whose name matches the upper-cased input

    Raises:
        ValueError: If no member has that name

    Example:
        >>> ArchiveOperation.from_string("tar")
        <ArchiveOperation.TAR: 1>
        >>> ArchiveOperation.from_string("GZIP")
        <ArchiveOperation.GZIP: 16>

    """
    # Member names are upper-case, so normalise before the lookup.
    member = cls.__members__.get(name.upper())
    if member is None:
        # ``from None`` keeps the traceback free of any chained context.
        raise ValueError(f"Unknown archive operation: {name}") from None
    return member
to_string
to_string() -> str

Convert enum to lowercase string name.

Returns:

Type Description
str

Lowercase operation name

Example

>>> ArchiveOperation.TAR.to_string()
'tar'

Source code in provide/foundation/archive/types.py
def to_string(self) -> str:
    """Return this member's name in lowercase.

    Returns:
        The member name converted with ``str.lower``

    Example:
        >>> ArchiveOperation.TAR.to_string()
        'tar'

    """
    member_name = self.name
    return member_name.lower()

ArchiveOperations

Helper class for common archive operation patterns.

Provides convenient methods for common archive formats.

Functions
create_tar_bz2 staticmethod
create_tar_bz2(
    source: Path, output: Path, deterministic: bool = True
) -> Path

Create .tar.bz2 archive in one step.

Parameters:

Name Type Description Default
source Path

Source file or directory

required
output Path

Output path (should end with .tar.bz2)

required
deterministic bool

Create reproducible archive

True

Returns:

Type Description
Path

Path to created archive

Raises:

Type Description
ArchiveError

If creation fails

Source code in provide/foundation/archive/operations.py
@staticmethod
def create_tar_bz2(source: Path, output: Path, deterministic: bool = True) -> Path:
    """Bundle ``source`` with TAR and compress it with BZIP2 in one call.

    Args:
        source: File or directory to archive
        output: Destination path (should end with .tar.bz2)
        deterministic: Forwarded to the TAR step as its ``deterministic`` option

    Returns:
        Whatever path the operation chain reports for the finished archive

    Raises:
        ArchiveError: If creation fails

    """
    ensure_parent_dir(output)

    # Only the TAR step receives configuration; BZIP2 runs with its defaults.
    tar_options = {"deterministic": deterministic}
    pipeline = OperationChain(
        operations=[ArchiveOperation.TAR, ArchiveOperation.BZIP2],
        operation_config={ArchiveOperation.TAR: tar_options},
    )
    archive_path = pipeline.execute(source, output)
    return archive_path
create_tar_gz staticmethod
create_tar_gz(
    source: Path, output: Path, deterministic: bool = True
) -> Path

Create .tar.gz archive in one step.

Parameters:

Name Type Description Default
source Path

Source file or directory

required
output Path

Output path (should end with .tar.gz)

required
deterministic bool

Create reproducible archive

True

Returns:

Type Description
Path

Path to created archive

Raises:

Type Description
ArchiveError

If creation fails

Source code in provide/foundation/archive/operations.py
@staticmethod
def create_tar_gz(source: Path, output: Path, deterministic: bool = True) -> Path:
    """Bundle ``source`` with TAR and compress it with GZIP in one call.

    Args:
        source: File or directory to archive
        output: Destination path (should end with .tar.gz)
        deterministic: Forwarded to the TAR step as its ``deterministic`` option

    Returns:
        Whatever path the operation chain reports for the finished archive

    Raises:
        ArchiveError: If creation fails

    """
    ensure_parent_dir(output)

    # Only the TAR step receives configuration; GZIP runs with its defaults.
    tar_options = {"deterministic": deterministic}
    pipeline = OperationChain(
        operations=[ArchiveOperation.TAR, ArchiveOperation.GZIP],
        operation_config={ArchiveOperation.TAR: tar_options},
    )
    archive_path = pipeline.execute(source, output)
    return archive_path
detect_format staticmethod
detect_format(file: Path) -> list[ArchiveOperation]

Detect archive format and return operation chain.

Parameters:

Name Type Description Default
file Path

File path to analyze

required

Returns:

Type Description
list[ArchiveOperation]

List of operations needed to extract

Raises:

Type Description
ArchiveError

If format cannot be detected

Source code in provide/foundation/archive/operations.py
@staticmethod
def detect_format(file: Path) -> list[ArchiveOperation]:
    """Work out which operation chain produced ``file``.

    The file name is checked first; the magic-number probe runs only when
    the name-based lookup returns ``None``.

    Args:
        file: Path of the file to inspect

    Returns:
        List of operations needed to extract

    Raises:
        ArchiveError: If neither detection strategy recognises the file

    """
    by_extension = ArchiveOperations._detect_format_by_extension(file.name)
    if by_extension is not None:
        return by_extension

    by_magic = ArchiveOperations._detect_format_by_magic(file)
    if by_magic is None:
        raise ArchiveError(f"Cannot detect format of {file}")
    return by_magic
extract_tar_bz2 staticmethod
extract_tar_bz2(archive: Path, output: Path) -> Path

Extract .tar.bz2 archive in one step.

Parameters:

Name Type Description Default
archive Path

Archive path

required
output Path

Output directory

required

Returns:

Type Description
Path

Path to extraction directory

Raises:

Type Description
ArchiveError

If extraction fails

Source code in provide/foundation/archive/operations.py
@staticmethod
def extract_tar_bz2(archive: Path, output: Path) -> Path:
    """Decompress and unpack a .tar.bz2 archive in one call.

    Args:
        archive: Path of the .tar.bz2 file
        output: Directory to extract into (created if missing)

    Returns:
        Whatever path the reversed operation chain reports

    Raises:
        ArchiveError: If extraction fails

    """
    output.mkdir(parents=True, exist_ok=True)

    # The chain is declared in creation order and then run in reverse.
    creation_order = [ArchiveOperation.TAR, ArchiveOperation.BZIP2]
    return OperationChain(operations=creation_order).reverse(archive, output)
extract_tar_gz staticmethod
extract_tar_gz(archive: Path, output: Path) -> Path

Extract .tar.gz archive in one step.

Parameters:

Name Type Description Default
archive Path

Archive path

required
output Path

Output directory

required

Returns:

Type Description
Path

Path to extraction directory

Raises:

Type Description
ArchiveError

If extraction fails

Source code in provide/foundation/archive/operations.py
@staticmethod
def extract_tar_gz(archive: Path, output: Path) -> Path:
    """Decompress and unpack a .tar.gz archive in one call.

    Args:
        archive: Path of the .tar.gz file
        output: Directory to extract into (created if missing)

    Returns:
        Whatever path the reversed operation chain reports

    Raises:
        ArchiveError: If extraction fails

    """
    output.mkdir(parents=True, exist_ok=True)

    # The chain is declared in creation order and then run in reverse.
    creation_order = [ArchiveOperation.TAR, ArchiveOperation.GZIP]
    return OperationChain(operations=creation_order).reverse(archive, output)

ArchiveValidationError

ArchiveValidationError(
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any
)

Bases: ArchiveError

Archive validation failed (security checks, malformed paths, etc).

Source code in provide/foundation/errors/base.py
def __init__(
    self,
    message: str,
    *,
    code: str | None = None,
    context: dict[str, Any] | None = None,
    cause: Exception | None = None,
    **extra_context: Any,
) -> None:
    """Initialize the error with a message and optional diagnostics.

    Args:
        message: Human-readable error description.
        code: Error code; falls back to ``self._default_code()`` when falsy.
        context: Optional mapping of diagnostic key/value pairs. It is
            copied, never modified in place.
        cause: Optional underlying exception; when truthy it is also
            stored as ``__cause__``.
        **extra_context: Extra key/value pairs merged into the stored
            context (they win on key collisions).

    """
    self.message = message
    self.code = code or self._default_code()
    # Merge into a fresh dict so the caller's ``context`` mapping is never
    # mutated and is not shared between the caller and this instance.
    self.context = {**(context or {}), **extra_context}
    self.cause = cause
    if cause:
        self.__cause__ = cause
    super().__init__(message)

BaseArchive

Bases: ABC

Abstract base class for all archive implementations.

This defines the common interface that all archive implementations must follow, ensuring consistency across different archive formats.

Functions
create abstractmethod
create(source: Path, output: Path) -> Path

Create an archive from source path.

Parameters:

Name Type Description Default
source Path

Source file or directory to archive

required
output Path

Output archive file path

required

Returns:

Type Description
Path

Path to the created archive file

Raises:

Type Description
ArchiveError

If archive creation fails

Source code in provide/foundation/archive/base.py
@abstractmethod
def create(self, source: Path, output: Path) -> Path:
    """Build an archive at ``output`` from the contents of ``source``.

    Concrete archive classes must override this method.

    Args:
        source: File or directory whose contents go into the archive
        output: Path where the archive file is written

    Returns:
        Path of the archive that was created

    Raises:
        ArchiveError: If the archive cannot be created

    """
extract abstractmethod
extract(archive: Path, output: Path) -> Path

Extract an archive to output path.

Parameters:

Name Type Description Default
archive Path

Archive file to extract

required
output Path

Output directory for extracted contents

required

Returns:

Type Description
Path

Path to the extraction directory

Raises:

Type Description
ArchiveError

If extraction fails

Source code in provide/foundation/archive/base.py
@abstractmethod
def extract(self, archive: Path, output: Path) -> Path:
    """Unpack ``archive`` into the ``output`` directory.

    Concrete archive classes must override this method.

    Args:
        archive: Archive file to unpack
        output: Directory that receives the extracted contents

    Returns:
        Path of the directory that was extracted into

    Raises:
        ArchiveError: If the archive cannot be extracted

    """
validate abstractmethod
validate(archive: Path) -> bool

Validate that an archive is properly formed.

Parameters:

Name Type Description Default
archive Path

Archive file to validate

required

Returns:

Type Description
bool

True if archive is valid, False otherwise

Raises:

Type Description
ArchiveError

If validation cannot be performed

Source code in provide/foundation/archive/base.py
@abstractmethod
def validate(self, archive: Path) -> bool:
    """Report whether ``archive`` is a well-formed archive.

    Concrete archive classes must override this method.

    Args:
        archive: Archive file to check

    Returns:
        True when the archive is valid, False when it is not

    Raises:
        ArchiveError: If the check itself cannot be carried out

    """

Bzip2Compressor

Bases: BaseCompressor

BZIP2 compression implementation.

Provides BZIP2 compression and decompression for single files. Does not handle bundling - use with TarArchive for .tar.bz2 files.

Attributes
format_name property
format_name: str

Return the name of the compression format.

ExtractionTracker

ExtractionTracker(limits: ArchiveLimits)

Track extraction progress to enforce limits.

Initialize tracker with limits.

Parameters:

Name Type Description Default
limits ArchiveLimits

Archive extraction limits

required
Source code in provide/foundation/archive/limits.py
def __init__(self, limits: ArchiveLimits) -> None:
    """Create a tracker whose counters all start at zero.

    Args:
        limits: Limits the tracker enforces during extraction

    """
    self.limits = limits
    # Running totals: bytes extracted, members seen, and archive size.
    self.total_extracted_size = self.file_count = self.compressed_size = 0
Functions
add_extracted_size
add_extracted_size(size: int) -> None

Track extracted size and check total limit.

Parameters:

Name Type Description Default
size int

Size of extracted content in bytes

required

Raises:

Type Description
ArchiveError

If total extracted size exceeds limit

Source code in provide/foundation/archive/limits.py
def add_extracted_size(self, size: int) -> None:
    """Add ``size`` to the running total and enforce the total-size limit.

    Nothing is recorded when limits are disabled.

    Args:
        size: Number of extracted bytes to add

    Raises:
        ArchiveError: If the running total goes above ``max_total_size``

    """
    if self.limits.enabled:
        self.total_extracted_size += size
        over_budget = self.total_extracted_size > self.limits.max_total_size
        if over_budget:
            raise ArchiveError(
                f"Total extracted size exceeds maximum: {self.total_extracted_size} > {self.limits.max_total_size}",
                code="MAX_TOTAL_SIZE_EXCEEDED",
            )
check_compression_ratio
check_compression_ratio() -> None

Check if compression ratio exceeds limit.

Raises:

Type Description
ArchiveError

If compression ratio exceeds limit

Source code in provide/foundation/archive/limits.py
def check_compression_ratio(self) -> None:
    """Compare extracted bytes against the compressed size.

    The check is skipped when limits are disabled or when no compressed
    size has been recorded (a zero size would divide by zero).

    Raises:
        ArchiveError: If the ratio is above ``max_compression_ratio``

    """
    if self.limits.enabled and self.compressed_size != 0:
        ratio = self.total_extracted_size / self.compressed_size
        if ratio > self.limits.max_compression_ratio:
            raise ArchiveError(
                f"Compression ratio exceeds maximum: {ratio:.1f} > {self.limits.max_compression_ratio}",
                code="MAX_COMPRESSION_RATIO_EXCEEDED",
            )
check_file_count
check_file_count(count: int = 1) -> None

Check if adding files would exceed limit.

Parameters:

Name Type Description Default
count int

Number of files to add

1

Raises:

Type Description
ArchiveError

If file count would exceed limit

Source code in provide/foundation/archive/limits.py
def check_file_count(self, count: int = 1) -> None:
    """Add ``count`` to the file counter and enforce the file-count limit.

    Nothing is recorded when limits are disabled.

    Args:
        count: Number of files to add to the counter

    Raises:
        ArchiveError: If the counter goes above ``max_file_count``

    """
    if self.limits.enabled:
        self.file_count += count
        too_many = self.file_count > self.limits.max_file_count
        if too_many:
            raise ArchiveError(
                f"Archive exceeds maximum file count: {self.file_count} > {self.limits.max_file_count}",
                code="MAX_FILE_COUNT_EXCEEDED",
            )
check_file_size
check_file_size(size: int) -> None

Check if file size exceeds single file limit.

Parameters:

Name Type Description Default
size int

File size in bytes

required

Raises:

Type Description
ArchiveError

If file size exceeds limit

Source code in provide/foundation/archive/limits.py
def check_file_size(self, size: int) -> None:
    """Reject a single file that is larger than the per-file limit.

    No tracker state is changed; the check is skipped when limits are
    disabled.

    Args:
        size: Size of the file in bytes

    Raises:
        ArchiveError: If ``size`` is above ``max_single_file_size``

    """
    limits = self.limits
    if limits.enabled and size > limits.max_single_file_size:
        raise ArchiveError(
            f"File size exceeds maximum: {size} > {self.limits.max_single_file_size}",
            code="MAX_FILE_SIZE_EXCEEDED",
        )
set_compressed_size
set_compressed_size(size: int) -> None

Set the compressed archive size for ratio calculation.

Parameters:

Name Type Description Default
size int

Compressed archive size in bytes

required
Source code in provide/foundation/archive/limits.py
def set_compressed_size(self, size: int) -> None:
    """Record the archive's compressed size, used later for the ratio check.

    Args:
        size: Size of the compressed archive in bytes

    """
    # Stored as-is; a value of zero makes the ratio check a no-op.
    self.compressed_size = size
validate_member_size
validate_member_size(
    member_size: int,
    compressed_member_size: int | None = None,
) -> None

Validate a single archive member before extraction.

Parameters:

Name Type Description Default
member_size int

Uncompressed size of the member

required
compressed_member_size int | None

Optional compressed size for ratio check

None

Raises:

Type Description
ArchiveError

If member violates any limits

Source code in provide/foundation/archive/limits.py
def validate_member_size(self, member_size: int, compressed_member_size: int | None = None) -> None:
    """Check one archive member against the limits before it is extracted.

    The running total is only read here, never updated.

    Args:
        member_size: Uncompressed size of the member
        compressed_member_size: Compressed size of the member, if known;
            the per-member ratio check runs only for a positive value

    Raises:
        ArchiveError: If the member breaks the per-file size limit, would
            push the total over the limit, or has too high a ratio

    """
    # Per-file limit first (delegated).
    self.check_file_size(member_size)

    # Would this member push the running total over the limit?
    if self.limits.enabled:
        projected_total = self.total_extracted_size + member_size
        if projected_total > self.limits.max_total_size:
            raise ArchiveError(
                f"Extracting this file would exceed total size limit: "
                f"{self.total_extracted_size + member_size} > {self.limits.max_total_size}",
                code="MAX_TOTAL_SIZE_EXCEEDED",
            )

    # Without a positive compressed size there is no ratio to compute.
    if not compressed_member_size or compressed_member_size <= 0:
        return

    member_ratio = member_size / compressed_member_size
    if self.limits.enabled and member_ratio > self.limits.max_compression_ratio:
        raise ArchiveError(
            f"File compression ratio exceeds maximum: {member_ratio:.1f} > {self.limits.max_compression_ratio}",
            code="MAX_COMPRESSION_RATIO_EXCEEDED",
        )

GzipCompressor

Bases: BaseCompressor

GZIP compression implementation.

Provides GZIP compression and decompression for single files. Does not handle bundling - use with TarArchive for .tar.gz files.

Attributes
format_name property
format_name: str

Return the name of the compression format.

OperationChain

Chain multiple archive operations together.

Enables complex operations like tar.gz, tar.bz2, etc. Operations are executed in order for creation, reversed for extraction.

Functions
execute
execute(source: Path, output: Path) -> Path

Execute operation chain on source.

Parameters:

Name Type Description Default
source Path

Source file or directory

required
output Path

Final output path

required

Returns:

Type Description
Path

Path to final output

Raises:

Type Description
ArchiveError

If any operation fails

Source code in provide/foundation/archive/operations.py
def execute(self, source: Path, output: Path) -> Path:
    """Execute operation chain on source.

    Each operation consumes the result of the previous one. Every operation
    except the last writes to a temporary path; the last writes to
    ``output``. With an empty chain, ``source`` is returned unchanged.

    Args:
        source: Source file or directory
        output: Final output path

    Returns:
        Path to final output (the value returned by the last operation)

    Raises:
        ArchiveError: If any operation fails; every exception raised inside
            the loop, including ArchiveError subclasses, is re-wrapped as a
            plain ArchiveError with the original attached as its cause

    """
    current = source
    # Intermediate paths created here; removed in the ``finally`` block.
    temp_files = []

    try:
        for i, op in enumerate(self.operations):
            # Determine output for this operation
            if i == len(self.operations) - 1:
                # Last operation, use final output
                next_output = output
            else:
                # Intermediate operation, use temp file
                suffix = self._get_suffix_for_operation(op)
                # Use Foundation's temp_file with cleanup=False so we manage it
                # NOTE(review): the path is used after the ``with`` block has
                # exited; this presumably relies on cleanup=False leaving the
                # path usable - confirm against temp_file's contract.
                with temp_file(suffix=suffix, cleanup=False) as temp_path:
                    next_output = temp_path
                temp_files.append(next_output)

            # Execute operation
            current = self._execute_operation(op, current, next_output)
            log.debug(f"Executed operation '{op}': {current}")

        return current

    except Exception as e:
        # Single error type for callers; the original is kept via ``from e``.
        raise ArchiveError(f"Operation chain failed: {e}") from e
    finally:
        # Clean up temp files using Foundation's safe file operations
        # (runs on both success and failure).
        for temp in temp_files:
            safe_delete(temp, missing_ok=True)
reverse
reverse(source: Path, output: Path) -> Path

Reverse operation chain (extract/decompress).

Parameters:

Name Type Description Default
source Path

Source archive

required
output Path

Final output path

required

Returns:

Type Description
Path

Path to final output

Raises:

Type Description
ArchiveError

If any operation fails

Source code in provide/foundation/archive/operations.py
def reverse(self, source: Path, output: Path) -> Path:
    """Run the chain backwards to extract or decompress ``source``.

    A new chain is built with the same operations in reverse order and the
    same ``operation_config``, then executed.

    Args:
        source: Archive to process
        output: Final output path

    Returns:
        Path reported by the reversed chain's ``execute``

    Raises:
        ArchiveError: If any operation fails

    """
    # The same operation codes are reused; choosing between create and
    # extract is left to _execute_operation (per the original comment).
    backwards = [*reversed(self.operations)]
    undo_chain = OperationChain(operations=backwards, operation_config=self.operation_config)
    return undo_chain.execute(source, output)

TarArchive

Bases: BaseArchive

TAR archive implementation.

Creates and extracts TAR archives with optional metadata preservation and deterministic output for reproducible builds.

Functions
create
create(source: Path, output: Path) -> Path

Create TAR archive from source.

Parameters:

Name Type Description Default
source Path

Source file or directory to archive

required
output Path

Output TAR file path

required

Returns:

Type Description
Path

Path to created archive

Raises:

Type Description
ArchiveError

If archive creation fails

Source code in provide/foundation/archive/tar.py
def create(self, source: Path, output: Path) -> Path:
    """Write a TAR archive of ``source`` to ``output``.

    For a directory, every regular file found recursively is added in
    sorted path order under its path relative to ``source``; entries that
    are not files are skipped. Anything else is added as a single entry
    named after the file.

    Args:
        source: File or directory to archive
        output: Path of the TAR file to write

    Returns:
        ``output``, once the archive has been written

    Raises:
        ArchiveIOError: If an OSError occurs while creating the archive
        ArchiveError: If creation fails for any other reason

    """
    try:
        ensure_parent_dir(output)

        with tarfile.open(output, "w") as tar:
            if not source.is_dir():
                # Single file: stored under its bare file name.
                self._add_file(tar, source, source.name)
            else:
                # Directory: files only (consistent with ZIP behavior).
                for entry in sorted(source.rglob("*")):
                    if not entry.is_file():
                        continue
                    self._add_file(tar, entry, entry.relative_to(source))

        log.debug(f"Created TAR archive: {output}")
        return output

    except OSError as e:
        raise ArchiveIOError(f"Failed to create TAR archive (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to create TAR archive: {e}") from e
extract
extract(
    archive: Path,
    output: Path,
    limits: ArchiveLimits | None = None,
) -> Path

Extract TAR archive to output directory with decompression bomb protection.

Parameters:

Name Type Description Default
archive Path

TAR archive file path

required
output Path

Output directory path

required
limits ArchiveLimits | None

Optional extraction limits (uses DEFAULT_LIMITS if None)

None

Returns:

Type Description
Path

Path to extraction directory

Raises:

Type Description
ArchiveError

If extraction fails, archive contains unsafe paths, or exceeds limits

Source code in provide/foundation/archive/tar.py
def extract(self, archive: Path, output: Path, limits: ArchiveLimits | None = None) -> Path:  # noqa: C901
    """Extract TAR archive to output directory with decompression bomb protection.

    Extraction is two-phase: every member is first checked against the
    limits and the path-safety rules, and only if all members pass is
    anything written to ``output``.

    Args:
        archive: TAR archive file path
        output: Output directory path
        limits: Optional extraction limits (uses DEFAULT_LIMITS if None)

    Returns:
        Path to extraction directory

    Raises:
        ArchiveError: If extraction fails, archive contains unsafe paths, or exceeds limits

    """
    if limits is None:
        limits = DEFAULT_LIMITS

    try:
        output.mkdir(parents=True, exist_ok=True)

        # Initialize extraction tracker
        tracker = ExtractionTracker(limits)
        tracker.set_compressed_size(get_archive_size(archive))

        with tarfile.open(archive, "r") as tar:
            # Enhanced security check - prevent path traversal and validate members
            safe_members = []
            for member in tar.getmembers():
                # Check file count limit
                # (every member counts here, not only regular files)
                tracker.check_file_count(1)

                # Validate member size and compression ratio
                # (no compressed size is passed, so the per-member ratio
                # check inside validate_member_size is skipped)
                tracker.validate_member_size(member.size)

                # Track extracted size
                tracker.add_extracted_size(member.size)

                # Use unified path validation
                if not is_safe_path(output, member.name):
                    raise ArchiveValidationError(
                        f"Unsafe path in archive: {member.name}. "
                        "Archive may contain path traversal, symlinks, or absolute paths."
                    )

                # Additional checks for symlinks and hardlinks
                if member.islnk() or member.issym():
                    # Check that link targets are also safe
                    if not is_safe_path(output, member.linkname):
                        raise ArchiveValidationError(
                            f"Unsafe link target in archive: {member.name} -> {member.linkname}. "
                            "Link target may escape extraction directory."
                        )

                    # Prevent absolute path in link target
                    if Path(member.linkname).is_absolute():
                        raise ArchiveValidationError(
                            f"Absolute path in link target: {member.name} -> {member.linkname}"
                        )

                safe_members.append(member)

            # Check overall compression ratio
            # (total member sizes versus the size set on the tracker above)
            tracker.check_compression_ratio()

            # Extract only validated members (all members have been security-checked above)
            # NOTE(review): newer Python versions accept a ``filter=`` argument
            # on extractall; consider passing one once the supported Python
            # range is confirmed.
            tar.extractall(output, members=safe_members)  # nosec B202

        log.debug(f"Extracted TAR archive to: {output}")
        return output

    except (ArchiveError, ArchiveValidationError):
        # Already a domain error: re-raise untouched so its code and message survive.
        raise
    except tarfile.ReadError as e:
        raise ArchiveFormatError(f"Invalid or corrupted TAR archive: {e}") from e
    except OSError as e:
        raise ArchiveIOError(f"Failed to extract TAR archive (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to extract TAR archive: {e}") from e
list_contents
list_contents(archive: Path) -> list[str]

List contents of TAR archive.

Parameters:

Name Type Description Default
archive Path

TAR archive file path

required

Returns:

Type Description
list[str]

List of file paths in archive

Raises:

Type Description
ArchiveError

If listing fails

Source code in provide/foundation/archive/tar.py
def list_contents(self, archive: Path) -> list[str]:
    """Return the sorted names of the regular files in a TAR archive.

    Members that are not regular files (directories, links, ...) are left
    out.

    Args:
        archive: Path of the TAR archive

    Returns:
        Sorted list of member names

    Raises:
        ArchiveFormatError: If the file cannot be read as a TAR archive
        ArchiveIOError: If an OSError occurs while reading
        ArchiveError: If listing fails for any other reason

    """
    try:
        with tarfile.open(archive, "r") as tar:
            names = [entry.name for entry in tar.getmembers() if entry.isfile()]
        names.sort()
        return names
    except tarfile.ReadError as e:
        raise ArchiveFormatError(f"Invalid or corrupted TAR archive: {e}") from e
    except OSError as e:
        raise ArchiveIOError(f"Failed to list TAR contents (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to list TAR contents: {e}") from e
validate
validate(archive: Path) -> bool

Validate TAR archive integrity.

Parameters:

Name Type Description Default
archive Path

TAR archive file path

required

Returns:

Type Description
bool

True if archive is valid, False otherwise

Note: This method intentionally catches all exceptions and returns False. This is NOT an error suppression case - returning False on any exception is the expected validation behavior. Do NOT replace this with @resilient decorator.

Source code in provide/foundation/archive/tar.py
def validate(self, archive: Path) -> bool:
    """Check whether a TAR archive can be opened and its members enumerated.

    Args:
        archive: TAR archive file path

    Returns:
        True if archive is valid, False otherwise

    Note: Every exception is deliberately converted into a ``False`` result.
    That is the validation contract, not error suppression, so this method
    must NOT be rewritten to use the @resilient decorator.
    """
    try:
        with tarfile.open(archive, "r") as tar:
            # Building the member list is the whole check: we only need to
            # know that the metadata of every member can be read.
            tar.getmembers()
    except Exception:  # nosec B110
        # Intentionally broad: tarfile.ReadError, OSError, PermissionError, etc.
        # all mean the archive is not usable.
        return False
    return True

XzCompressor

Bases: BaseCompressor

XZ/LZMA2 compression implementation.

Provides XZ compression and decompression using Python's stdlib lzma module. Does not handle bundling - use with TarArchive for .tar.xz files.

XZ preset range: 0-9

- 0: Fastest compression, lower ratio
- 6: Default balanced setting
- 9: Best compression, slower

Attributes
format_name property
format_name: str

Return the name of the compression format.

ZipArchive

Bases: BaseArchive

ZIP archive implementation.

Creates and extracts ZIP archives with optional compression. Supports adding files to existing archives.

Security Note - Password Handling

The password parameter only decrypts existing encrypted ZIP files during extraction/reading. It does NOT encrypt new files during creation with stdlib zipfile. To create encrypted ZIP archives, use a third-party library like pyzipper that supports AES encryption. The stdlib zipfile.setpassword() method only enables reading password-protected archives.

Attributes:

Name Type Description
compression_level int

ZIP compression level 0-9 (0=store/no compression, 9=best)

compression_type int

Compression type (zipfile.ZIP_DEFLATED, etc)

password bytes | None

Password for decrypting existing encrypted archives (read-only)

Functions
add_file
add_file(
    archive: Path, file: Path, arcname: str | None = None
) -> None

Add file to existing ZIP archive.

Parameters:

Name Type Description Default
archive Path

ZIP archive file path

required
file Path

File to add

required
arcname str | None

Name in archive (defaults to file name)

None

Raises:

Type Description
ArchiveError

If adding file fails

Source code in provide/foundation/archive/zip.py
def add_file(self, archive: Path, file: Path, arcname: str | None = None) -> None:
    """Add file to existing ZIP archive.

    The archive is opened in append mode with the instance's
    ``compression_type`` and ``compression_level``, the same settings that
    ``create`` passes to ``zipfile.ZipFile``, so appended entries are
    compressed like the ones written at creation time.

    Args:
        archive: ZIP archive file path
        file: File to add
        arcname: Name in archive (defaults to file name)

    Raises:
        ArchiveIOError: If an OSError occurs while writing
        ArchiveError: If adding file fails for any other reason

    """
    try:
        with zipfile.ZipFile(
            archive,
            "a",
            compression=self.compression_type,
            # Honor the configured level; previously only the type was passed.
            compresslevel=self.compression_level,
        ) as zf:
            if self.password:
                zf.setpassword(self.password)

            # An empty/None arcname falls back to the file's base name.
            zf.write(file, arcname or file.name)

        log.debug(f"Added {file} to ZIP archive {archive}")

    except OSError as e:
        raise ArchiveIOError(f"Failed to add file to ZIP (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to add file to ZIP: {e}") from e
create
create(source: Path, output: Path) -> Path

Create ZIP archive from source.

Parameters:

Name Type Description Default
source Path

Source file or directory to archive

required
output Path

Output ZIP file path

required

Returns:

Type Description
Path

Path to created archive

Raises:

Type Description
ArchiveError

If archive creation fails

Note

Files are NOT encrypted during creation even if password is set. The stdlib zipfile module does not support creating encrypted archives. Use pyzipper or similar for AES-encrypted ZIP creation.

Source code in provide/foundation/archive/zip.py
def create(self, source: Path, output: Path) -> Path:
    """Build a ZIP archive from a single file or a directory tree.

    A directory is walked recursively and each regular file is stored under
    its path relative to ``source``; a single file is stored under its own
    name.

    Args:
        source: Source file or directory to archive
        output: Output ZIP file path

    Returns:
        Path to created archive

    Raises:
        ArchiveIOError: If an OSError occurs while writing
        ArchiveError: If archive creation fails for any other reason

    Note:
        Setting a password does NOT encrypt the files written here.
        The stdlib zipfile module cannot create encrypted archives;
        use pyzipper or similar for AES-encrypted ZIP creation.

    """
    try:
        ensure_parent_dir(output)

        writer_options = {
            "compression": self.compression_type,
            "compresslevel": self.compression_level,
        }
        with zipfile.ZipFile(output, "w", **writer_options) as zf:
            if self.password:
                zf.setpassword(self.password)

            if not source.is_dir():
                # Single file: store it under its base name
                zf.write(source, source.name)
            else:
                # Directory: visit entries in sorted order, keeping only files
                regular_files = (entry for entry in sorted(source.rglob("*")) if entry.is_file())
                for entry in regular_files:
                    zf.write(entry, entry.relative_to(source))

        log.debug(f"Created ZIP archive: {output}")
        return output

    except OSError as e:
        raise ArchiveIOError(f"Failed to create ZIP archive (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to create ZIP archive: {e}") from e
extract
extract(
    archive: Path,
    output: Path,
    limits: ArchiveLimits | None = None,
) -> Path

Extract ZIP archive to output directory with decompression bomb protection.

Parameters:

Name Type Description Default
archive Path

ZIP archive file path

required
output Path

Output directory path

required
limits ArchiveLimits | None

Optional extraction limits (uses DEFAULT_LIMITS if None)

None

Returns:

Type Description
Path

Path to extraction directory

Raises:

Type Description
ArchiveError

If extraction fails, archive contains unsafe paths, or exceeds limits

Source code in provide/foundation/archive/zip.py
def extract(self, archive: Path, output: Path, limits: ArchiveLimits | None = None) -> Path:
    """Unpack a ZIP archive into a directory, guarding against decompression bombs.

    All members are validated and the overall compression ratio is checked
    before anything is written to ``output``.

    Args:
        archive: ZIP archive file path
        output: Output directory path
        limits: Optional extraction limits (uses DEFAULT_LIMITS if None)

    Returns:
        Path to extraction directory

    Raises:
        ArchiveError: If extraction fails, archive contains unsafe paths, or exceeds limits

    """
    effective_limits = DEFAULT_LIMITS if limits is None else limits

    try:
        output.mkdir(parents=True, exist_ok=True)

        # The tracker needs the on-disk archive size for the ratio check below
        tracker = ExtractionTracker(effective_limits)
        compressed_bytes = get_archive_size(archive)
        tracker.set_compressed_size(compressed_bytes)

        with zipfile.ZipFile(archive, "r") as bundle:
            if self.password:
                bundle.setpassword(self.password)

            # Security gate 1: every member is validated before extraction
            self._validate_zip_members(bundle, output, tracker)

            # Security gate 2: overall compression ratio
            tracker.check_compression_ratio()

            # Only reached once both gates above have passed
            bundle.extractall(output)

        log.debug(f"Extracted ZIP archive to: {output}")
        return output

    except (ArchiveError, ArchiveValidationError):
        # Already a domain error: propagate untouched instead of re-wrapping
        raise
    except zipfile.BadZipFile as e:
        raise ArchiveFormatError(f"Invalid or corrupted ZIP archive: {e}") from e
    except OSError as e:
        raise ArchiveIOError(f"Failed to extract ZIP archive (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to extract ZIP archive: {e}") from e
extract_file
extract_file(
    archive: Path, member: str, output: Path
) -> Path

Extract single file from ZIP archive.

Parameters:

Name Type Description Default
archive Path

ZIP archive file path

required
member str

Name of file in archive

required
output Path

Output directory or file path

required

Returns:

Type Description
Path

Path to extracted file

Raises:

Type Description
ArchiveError

If extraction fails or member path is unsafe

Source code in provide/foundation/archive/zip.py
def extract_file(self, archive: Path, member: str, output: Path) -> Path:
    """Pull one member out of a ZIP archive.

    When ``output`` is an existing directory the member is extracted beneath
    it under its archive name; otherwise ``output`` is treated as the target
    file path and the member's bytes are written there.

    Args:
        archive: ZIP archive file path
        member: Name of file in archive
        output: Output directory or file path

    Returns:
        Path to extracted file

    Raises:
        ArchiveError: If extraction fails or member path is unsafe

    """
    try:
        with zipfile.ZipFile(archive, "r") as bundle:
            if self.password:
                bundle.setpassword(self.password)

            # The member path is validated against the directory that will receive it
            if output.is_dir():
                extract_base = output
            else:
                extract_base = output.parent
            self._validate_member_path(extract_base, member)

            # Symlink validation only runs for entries with external attributes set
            member_info = bundle.getinfo(member)
            if member_info.external_attr:
                self._validate_symlink_if_present(bundle, extract_base, member_info)

            if not output.is_dir():
                # File target: copy the member's bytes to the requested path
                ensure_parent_dir(output)
                with bundle.open(member) as member_stream:
                    with output.open("wb") as destination:
                        destination.write(member_stream.read())
                return output

            # Directory target: let zipfile place the member under its own name
            bundle.extract(member, output)
            return output / member

    except (ArchiveError, ArchiveValidationError):
        # Already a domain error: propagate untouched instead of re-wrapping
        raise
    except zipfile.BadZipFile as e:
        raise ArchiveFormatError(f"Invalid or corrupted ZIP archive: {e}") from e
    except OSError as e:
        raise ArchiveIOError(f"Failed to extract file from ZIP (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to extract file from ZIP: {e}") from e
list_contents
list_contents(archive: Path) -> list[str]

List contents of ZIP archive.

Parameters:

Name Type Description Default
archive Path

ZIP archive file path

required

Returns:

Type Description
list[str]

List of file paths in archive

Raises:

Type Description
ArchiveError

If listing fails

Source code in provide/foundation/archive/zip.py
def list_contents(self, archive: Path) -> list[str]:
    """Return the sorted entry names of a ZIP archive.

    The names are whatever ``ZipFile.namelist()`` reports, in sorted order.

    Args:
        archive: ZIP archive file path

    Returns:
        Sorted list of file paths in archive

    Raises:
        ArchiveFormatError: If the file is not a valid ZIP archive
        ArchiveIOError: If an OSError occurs while reading the archive
        ArchiveError: If listing fails for any other reason

    """
    try:
        with zipfile.ZipFile(archive, "r") as bundle:
            entry_names = list(bundle.namelist())
        entry_names.sort()
        return entry_names
    except zipfile.BadZipFile as e:
        raise ArchiveFormatError(f"Invalid or corrupted ZIP archive: {e}") from e
    except OSError as e:
        raise ArchiveIOError(f"Failed to list ZIP contents (I/O error): {e}") from e
    except Exception as e:
        raise ArchiveError(f"Failed to list ZIP contents: {e}") from e
validate
validate(archive: Path) -> bool

Validate ZIP archive integrity.

Parameters:

Name Type Description Default
archive Path

ZIP archive file path

required

Returns:

Type Description
bool

True if archive is valid, False otherwise

Note: This method intentionally catches all exceptions and returns False. This is NOT an error suppression case - returning False on any exception is the expected validation behavior. Do NOT replace this with @resilient decorator.

Source code in provide/foundation/archive/zip.py
def validate(self, archive: Path) -> bool:
    """Check the integrity of a ZIP archive.

    Args:
        archive: ZIP archive file path

    Returns:
        True if archive is valid, False otherwise

    Note: Every exception is deliberately converted into a ``False`` result.
    That is the validation contract, not error suppression, so this method
    must NOT be rewritten to use the @resilient decorator.
    """
    try:
        with zipfile.ZipFile(archive, "r") as bundle:
            # testzip() yields the name of the first bad member, or None
            first_bad_member = bundle.testzip()
    except Exception:  # nosec B110
        # Intentionally broad: zipfile.BadZipFile, OSError, PermissionError, etc.
        # all mean the archive is not usable.
        return False
    return first_bad_member is None

ZstdCompressor

Bases: BaseCompressor

Zstandard compression implementation.

Provides ZSTD compression and decompression using the zstandard package. Does not handle bundling - use with TarArchive for .tar.zst files.

ZSTD level range: 1-22

- 1: Fastest compression, lower ratio
- 3: Default balanced setting
- 22: Best compression, much slower

Requires the 'zstandard' package to be installed.

Install with: pip install provide-foundation[compression]

Attributes
format_name property
format_name: str

Return the name of the compression format.

Functions

deterministic_filter

deterministic_filter(tarinfo: TarInfo) -> tarfile.TarInfo

Tarfile filter for deterministic/reproducible archives.

Resets user/group info and modification time to ensure consistent output for reproducible builds.

Parameters:

Name Type Description Default
tarinfo TarInfo

TarInfo object to modify

required

Returns:

Type Description
TarInfo

Modified TarInfo object with deterministic metadata

Examples:

>>> import tarfile
>>> with tarfile.open("archive.tar", "w") as tar:
...     tar.add("myfile.txt", filter=deterministic_filter)
Notes

This filter sets:

- uid/gid to 0 (root)
- uname/gname to empty strings
- mtime to 0 (1970-01-01)

This ensures archives are byte-for-byte identical when created from the same source, regardless of filesystem timestamps or ownership.

Source code in provide/foundation/archive/tar.py
def deterministic_filter(tarinfo: tarfile.TarInfo) -> tarfile.TarInfo:
    """Normalize TarInfo metadata so archives are reproducible.

    Ownership and modification time are overwritten with fixed values; the
    same object is modified in place and returned.

    Args:
        tarinfo: TarInfo object to modify

    Returns:
        Modified TarInfo object with deterministic metadata

    Examples:
        >>> import tarfile
        >>> with tarfile.open("archive.tar", "w") as tar:
        ...     tar.add("myfile.txt", filter=deterministic_filter)

    Notes:
        Fields overwritten by this filter:
        - uid/gid become 0 (root)
        - uname/gname become empty strings
        - mtime becomes 0 (1970-01-01)

        With these fields fixed, filesystem timestamps and ownership no
        longer influence the archive bytes, which is what reproducible
        builds need.
    """
    # Ownership: numeric ids first, then the symbolic names
    tarinfo.uid = tarinfo.gid = 0
    tarinfo.uname = tarinfo.gname = ""

    # Timestamp pinned to the epoch
    tarinfo.mtime = 0

    return tarinfo

get_archive_size

get_archive_size(archive_path: Path) -> int

Get the size of an archive file.

Parameters:

Name Type Description Default
archive_path Path

Path to archive file

required

Returns:

Type Description
int

Size in bytes

Source code in provide/foundation/archive/limits.py
def get_archive_size(archive_path: Path) -> int:
    """Return the on-disk size of an archive file.

    Args:
        archive_path: Path to archive file

    Returns:
        Size in bytes, as reported by ``Path.stat()``

    """
    file_stat = archive_path.stat()
    return file_stat.st_size

get_operation_from_string

get_operation_from_string(
    op_string: str,
) -> ArchiveOperation

Get operation enum from string (supports extraction aliases).

Parameters:

Name Type Description Default
op_string str

Operation string (e.g., "tar", "untar", "gzip", "gunzip")

required

Returns:

Type Description
ArchiveOperation

ArchiveOperation enum value

Raises:

Type Description
ValueError

If operation string is invalid

Example

>>> get_operation_from_string("tar")
<ArchiveOperation.TAR: 1>
>>> get_operation_from_string("untar")  # Same as "tar"
<ArchiveOperation.TAR: 1>

Source code in provide/foundation/archive/types.py
def get_operation_from_string(op_string: str) -> ArchiveOperation:
    """Resolve an operation name to its enum member (supports extraction aliases).

    The lookup is case-insensitive: the input is lower-cased before it is
    matched against ``OPERATION_NAMES``.

    Args:
        op_string: Operation string (e.g., "tar", "untar", "gzip", "gunzip")

    Returns:
        ArchiveOperation enum value

    Raises:
        ValueError: If operation string is invalid

    Example:
        >>> get_operation_from_string("tar")
        <ArchiveOperation.TAR: 1>
        >>> get_operation_from_string("untar")  # Same as "tar"
        <ArchiveOperation.TAR: 1>

    """
    normalized = op_string.lower()
    if normalized in OPERATION_NAMES:
        return OPERATION_NAMES[normalized]
    # Report the caller's original spelling, not the normalized key
    raise ValueError(f"Unknown archive operation: {op_string}")