Coverage for src/drive_package_processing.py: 100%
41 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
1"""Package processing utilities.
3This module provides package extraction and processing functionality,
4separating archive handling logic from sync operations per SRP.
5"""
7__author__ = "Mandar Patil (mandarons@pm.me)"
9import gzip
10import os
11import unicodedata
12import zipfile
13from shutil import copyfileobj
15import magic
17from src import configure_icloudpy_logging, get_logger
19# Configure icloudpy logging immediately after import
20configure_icloudpy_logging()
22LOGGER = get_logger()
25def process_package(local_file: str) -> str | None:
26 """Process and extract a downloaded package file.
28 This function handles different archive types (ZIP, gzip) and extracts them
29 to the appropriate location. It also handles Unicode normalization for
30 cross-platform compatibility.
32 Args:
33 local_file: Path to the downloaded package file
35 Returns:
36 Path to the processed file/directory, or False if processing failed
37 """
38 archive_file = local_file
39 magic_object = magic.Magic(mime=True)
40 file_mime_type = magic_object.from_file(filename=local_file)
42 if file_mime_type == "application/zip":
43 return _process_zip_package(local_file, archive_file)
44 elif file_mime_type == "application/gzip":
45 return _process_gzip_package(local_file, archive_file)
46 else:
47 LOGGER.error(
48 f"Unhandled file type - cannot unpack the package {file_mime_type}.",
49 )
50 return None
53def _process_zip_package(local_file: str, archive_file: str) -> str:
54 """Process a ZIP package file.
56 Args:
57 local_file: Original file path
58 archive_file: Archive file path
60 Returns:
61 Path to the processed file
62 """
63 archive_file += ".zip"
64 os.rename(local_file, archive_file)
65 LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(archive_file)}")
66 zipfile.ZipFile(archive_file).extractall(path=os.path.dirname(archive_file))
68 # Handle Unicode normalization for cross-platform compatibility
69 normalized_path = unicodedata.normalize("NFD", local_file)
70 if normalized_path != local_file:
71 os.rename(local_file, normalized_path)
72 local_file = normalized_path
74 os.remove(archive_file)
75 LOGGER.info(f"Successfully unpacked the package {archive_file}.")
76 return local_file
79def _process_gzip_package(local_file: str, archive_file: str) -> str | None:
80 """Process a gzip package file.
82 Args:
83 local_file: Original file path
84 archive_file: Archive file path
86 Returns:
87 Path to the processed file, or None if processing failed
88 """
89 archive_file += ".gz"
90 os.rename(local_file, archive_file)
91 LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(local_file)}")
93 with gzip.GzipFile(filename=archive_file, mode="rb") as gz_file:
94 with open(file=local_file, mode="wb") as package_file:
95 copyfileobj(gz_file, package_file)
97 os.remove(archive_file)
99 # Recursively process the extracted file (might be another archive)
100 return process_package(local_file=local_file)