Source code for dmf.io.compress


from pathlib import Path
from typing import Optional, Union, Callable

from ..utils.decorators import register

COMPRESSORS = {}

[docs] def compress( input_file: Union[str, Path], compression: Optional[str] = None, output_file: Optional[Union[str, Path]] = None, password: Optional[str] = None, **kwargs, ) -> Path: """ Compress a file or directory into a specified format. This function compresses a file or directory using various compression formats such as gzip, bzip2, xz, zip, 7z, and tar-based formats. The format can either be specified directly or inferred from the output file extension. Parameters ---------- input_file : Union[str, Path] The path to the input file or directory to be compressed. compression : Optional[str], default=None The compression format to use. If not provided, it will be inferred from the output file extension or defaults to "zip". output_file : Optional[Union[str, Path]], default=None The path for the output compressed file. If not provided, it will be derived from the input file path by appending the appropriate file extension. password : Optional[str], default=None Password for the archive, supported only for ZIP and 7z formats. kwargs : dict Additional keyword arguments to pass to the compression function. Returns ------- Path The path to the compressed output file. Raises ------ FileExistsError If the input file does not exist. ValueError If both `compression` and `output_file` are provided, or if the compression format is unsupported. NotImplementedError If password protection is specified for a format that does not support it. Notes ----- Either `compression` or `output_file` must be provided, but not both. If neither is provided, the default compression format "zip" is used. - If `output_file` is provided without a specified `compression`, the format will be inferred from the file extension. - If `compression` is provided without an `output_file`, the output file path is derived by appending the appropriate file extension to the `input_file` path. - Password protection is only supported for ZIP and 7z formats. Examples -------- Compressing a directory into a zip file .. code-block:: python compress("my_folder", compression="zip") Compressing a file into a gzip file .. code-block:: python compress("data.txt", output_file="data.txt.gz") Compressing a directory with a specified output file and inferred format .. code-block:: python compress("my_folder", output_file="my_folder.tar.gz") Compressing a directory into a password-protected file .. code-block:: python compress("my_folder", compression="7z", password="mypassword") """ input_file = Path(input_file) if not input_file.exists(): raise FileExistsError(f"Input file {input_file} does not exist.") # Check that compression or output_file is provided. Only one. if compression and output_file: raise ValueError("Only one of compression or output_file must be provided.") if output_file: output_file = Path(output_file) output_file.parent.mkdir(parents=True, exist_ok=True) compression = next( (ext for ext in COMPRESSORS if output_file.name.endswith(f".{ext}")), None, ) if not output_file: compression = compression or "zip" compression = compression.lower().lstrip(".") output_file = input_file.with_suffix(input_file.suffix + f".{compression}") compressor_func = COMPRESSORS.get(compression) if not compressor_func: raise ValueError( f"Compression format {compression} is not supported. " f"Use one of {list(COMPRESSORS.keys())}." ) compressor_func(input_file, output_file, password=password, **kwargs) return output_file
@register(COMPRESSORS, ["gz", "gzip"]) def compress_gzip( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file using gzip.""" import gzip _check_no_folder(input_file) _check_password_none(password) _generic_compressor(input_file, output_file, gzip.open, **kwargs) @register(COMPRESSORS, ["bz2", "bzip2"]) def compress_bzip2( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file using bzip2.""" import bz2 _check_no_folder(input_file) _check_password_none(password) _generic_compressor(input_file, output_file, bz2.open, **kwargs) @register(COMPRESSORS, ["xz"]) def compress_xz( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file using xz.""" import lzma _check_no_folder(input_file) _check_password_none(password) _generic_compressor(input_file, output_file, lzma.open, **kwargs) @register(COMPRESSORS, ["zip"]) def compress_zip( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file or directory using zip.""" import os import zipfile # To support password protection we would need to use library like pyzipper _check_password_none(password) with zipfile.ZipFile(output_file, "w", zipfile.ZIP_DEFLATED, **kwargs) as zipf: if input_file.is_dir(): root_folder_name = input_file.name # Get the name of the directory for root, _, files in os.walk(input_file): for file in files: # Add the root folder name to the path within the zip zipf.write( Path(root) / file, Path(root_folder_name) / Path(root).relative_to(input_file) / file ) else: zipf.write(input_file, input_file.name) @register(COMPRESSORS, ["7z"]) def compress_7z( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file or directory using 7z.""" try: import py7zr except ImportError: raise ImportError( "py7zr package is required for 7z compression. " "Install it using `pip install py7zr`." ) with py7zr.SevenZipFile(output_file, "w", password=password, **kwargs) as archive: if input_file.is_dir(): archive.writeall(input_file, arcname=input_file.name) else: archive.write(input_file, arcname=input_file.name) @register(COMPRESSORS, ["tar", "tgz", "tar.gz", "tar.bz2", "tar.xz"]) def compress_tar( input_file: Path, output_file: Path, password: Optional[str] = None, **kwargs ) -> None: """Compress a file or directory using tar.""" import tarfile _check_password_none(password) mode_mapping = { "tgz": "w:gz", "tar.gz": "w:gz", "tar.bz2": "w:bz2", "tar.xz": "w:xz", "tar": "w", } # Compression is first match that ends with the input file suffix compression = next( (comp for comp in mode_mapping if input_file.suffix.endswith(comp)), "tar" ) if input_file.is_dir() or input_file.is_file(): with tarfile.open(output_file, mode_mapping[compression], **kwargs) as tar: tar.add(input_file, arcname=input_file.name) else: raise ValueError("Invalid input for tar compression.") def _generic_compressor( input_file: Path, output_file: Path, compressor: Callable, **kwargs, ) -> None: """Generic compressor function.""" import shutil with open(input_file, "rb") as f_in, compressor( output_file, "wb", **kwargs ) as f_out: shutil.copyfileobj(f_in, f_out) def _check_password_none(password: Optional[str]) -> None: """Check if password is None.""" if password: raise NotImplementedError( "Password protection is not supported for the compression format. " "Supported only for ZIP and 7z formats." ) def _check_no_folder(input_file: Path) -> None: """Check if the input file is a folder.""" if input_file.is_dir(): raise ValueError( "Compressing folders is not supported. " "Use tar format instead, zip or 7z for exampel." )