Coverage for src/drive_package_processing.py: 100%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""Package processing utilities. 

2 

3This module provides package extraction and processing functionality, 

4separating archive handling logic from sync operations per SRP. 

5""" 

6 

7__author__ = "Mandar Patil (mandarons@pm.me)" 

8 

9import gzip 

10import os 

11import unicodedata 

12import zipfile 

13from shutil import copyfileobj 

14 

15import magic 

16 

17from src import configure_icloudpy_logging, get_logger 

18 

19# Configure icloudpy logging immediately after import 

20configure_icloudpy_logging() 

21 

22LOGGER = get_logger() 

23 

24 

def process_package(local_file: str) -> str | None:
    """Unpack a downloaded package according to its detected MIME type.

    The MIME type is sniffed from the file content with ``magic``. ZIP
    archives are handed to ``_process_zip_package`` and gzip streams to
    ``_process_gzip_package``; any other type is logged as an error.

    Args:
        local_file: Path to the downloaded package file.

    Returns:
        The path returned by the matching handler, or ``None`` when the
        file type is not one this module knows how to unpack.
    """
    detected_type = magic.Magic(mime=True).from_file(filename=local_file)

    # The archive path starts out identical to the download path; each
    # handler appends its own extension to the second argument.
    if detected_type == "application/zip":
        return _process_zip_package(local_file, local_file)
    if detected_type == "application/gzip":
        return _process_gzip_package(local_file, local_file)

    LOGGER.error(
        f"Unhandled file type - cannot unpack the package {detected_type}.",
    )
    return None

51 

52 

def _process_zip_package(local_file: str, archive_file: str) -> str:
    """Extract a ZIP package into the directory it was downloaded to.

    The download is renamed to ``<archive_file>.zip``, unpacked into the
    archive's own directory, and the archive is then deleted.

    Args:
        local_file: Path of the downloaded package as it exists on disk.
        archive_file: Base path for the temporary archive; ``.zip`` is
            appended to it.

    Returns:
        ``local_file``, or its NFD-normalized form when the two differ.

    Raises:
        OSError: If a rename or the final removal fails.
        zipfile.BadZipFile: If the file cannot be read as a ZIP archive.
    """
    archive_file += ".zip"
    os.rename(local_file, archive_file)
    target_dir = os.path.dirname(archive_file)
    LOGGER.info(f"Unpacking {archive_file} to {target_dir}")

    # Use a context manager so the archive handle is closed deterministically
    # (also on extraction errors) before the archive file is removed below.
    with zipfile.ZipFile(archive_file) as archive:
        archive.extractall(path=target_dir)

    # Handle Unicode normalization for cross-platform compatibility: if the
    # NFD form of the path differs, rename the extracted entry to that form.
    normalized_path = unicodedata.normalize("NFD", local_file)
    if normalized_path != local_file:
        os.rename(local_file, normalized_path)
        local_file = normalized_path

    os.remove(archive_file)
    LOGGER.info(f"Successfully unpacked the package {archive_file}.")
    return local_file

77 

78 

def _process_gzip_package(local_file: str, archive_file: str) -> str | None:
    """Decompress a gzip package and process whatever it contained.

    The download is renamed to ``<archive_file>.gz``, decompressed back to
    ``local_file``, and the compressed copy is deleted. The decompressed
    file is then passed to ``process_package`` again, since it may itself
    be an archive.

    Args:
        local_file: Path of the downloaded package; also the destination
            of the decompressed data.
        archive_file: Base path for the temporary archive; ``.gz`` is
            appended to it.

    Returns:
        Whatever ``process_package`` returns for the decompressed file:
        a path, or ``None`` if its type is not handled.
    """
    compressed_path = archive_file + ".gz"
    os.rename(local_file, compressed_path)
    LOGGER.info(f"Unpacking {compressed_path} to {os.path.dirname(local_file)}")

    # Stream the decompressed bytes straight into the original path.
    with (
        gzip.GzipFile(filename=compressed_path, mode="rb") as source,
        open(file=local_file, mode="wb") as destination,
    ):
        copyfileobj(source, destination)

    os.remove(compressed_path)

    # The payload might be another archive, so run it through detection again.
    return process_package(local_file=local_file)
100 return process_package(local_file=local_file)