Coverage for src/drive_parallel_download.py: 100%

77 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-12 17:18 +0000

1"""Parallel download utilities. 

2 

3This module provides parallel download coordination for iCloud Drive sync, 

4separating parallel execution logic from sync operations per SRP. 

5""" 

6 

7__author__ = "Mandar Patil (mandarons@pm.me)" 

8 

9import os 

10import unicodedata 

11from concurrent.futures import ThreadPoolExecutor, as_completed 

12from pathlib import Path 

13from threading import Lock 

14from typing import Any 

15from urllib.parse import unquote 

16 

17from src import configure_icloudpy_logging, get_logger 

18from src.drive_file_download import download_file 

19from src.drive_file_existence import file_exists, is_package, package_exists 

20from src.drive_filtering import wanted_file 

21 

# Configure icloudpy logging immediately after import so that any logging
# performed by icloudpy during module setup is already routed correctly.
configure_icloudpy_logging()

# Module-level logger shared by all functions in this module.
LOGGER = get_logger()

# Thread-safe lock guarding every mutation of the shared `files` set that
# download tasks update concurrently from worker threads.
files_lock = Lock()

29 

30 

def collect_file_for_download(
    item: Any,
    destination_path: str,
    filters: list[str] | None,
    ignore: list[str] | None,
    files: set[str],
) -> dict[str, Any] | None:
    """Build a download task for ``item`` without performing the download.

    Args:
        item: iCloud file item
        destination_path: Local destination directory
        filters: File extension filters
        ignore: Ignore patterns
        files: Set to track processed files (thread-safe updates)

    Returns:
        Download task info dict, or None if file should be skipped
    """
    # Guard clauses: nothing to do without an item, a destination, or a set.
    if not item or not destination_path or files is None:
        return None

    # iCloud's API URL-encodes names (e.g. %CC%88 combining diacritics);
    # decode, then NFC-normalize the full local path.
    local_file = unicodedata.normalize(
        "NFC",
        os.path.join(destination_path, unquote(item.name)),
    )

    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return None

    # Record the file in the shared set under the lock.
    with files_lock:
        files.add(local_file)

    treat_as_package = is_package(item=item)
    if treat_as_package:
        if package_exists(item=item, local_package_path=local_file):
            # Package already present locally: track its contents and skip.
            with files_lock:
                files.update(str(entry) for entry in Path(local_file).glob("**/*"))
            return None
    elif file_exists(item=item, local_file=local_file):
        # Plain file already present locally: skip.
        return None

    # Describe the pending download for the parallel executor.
    return {
        "item": item,
        "local_file": local_file,
        "is_package": treat_as_package,
        "files": files,
    }

83 

84 

def download_file_task(download_info: dict[str, Any]) -> bool:
    """Download one file described by a task dict from the worker pool.

    Args:
        download_info: Dictionary containing download task information

    Returns:
        True if download succeeded, False otherwise
    """
    item = download_info["item"]
    local_file = download_info["local_file"]
    # Renamed locally to avoid shadowing the imported ``is_package`` helper.
    treat_as_package = download_info["is_package"]
    files = download_info["files"]

    LOGGER.debug(f"[Thread] Starting download of {local_file}")

    try:
        downloaded_file = download_file(item=item, local_file=local_file)
        if not downloaded_file:
            return False

        if treat_as_package:
            # Packages expand into a directory tree; rename each entry to its
            # NFD form and record the normalized paths in the shared set.
            with files_lock:
                for entry in Path(downloaded_file).glob("**/*"):
                    original_path = str(entry)
                    nfd_path = unicodedata.normalize("NFD", original_path)
                    if os.path.exists(original_path):
                        os.rename(original_path, nfd_path)
                    files.add(nfd_path)

        LOGGER.debug(f"[Thread] Completed download of {local_file}")
        return True
    except Exception as e:
        # Task boundary: report the failure and let the caller count it.
        LOGGER.error(f"[Thread] Failed to download {local_file}: {e!s}")
        return False

120 

121 

def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
    """Run every download task on a thread pool and tally the outcomes.

    Args:
        download_tasks: List of download task dictionaries
        max_threads: Maximum number of concurrent threads

    Returns:
        Tuple of (successful_downloads, failed_downloads) counts
    """
    if not download_tasks:
        return 0, 0

    LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")

    successful_downloads = 0
    failed_downloads = 0

    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # Fan out every task, then tally results as they finish.
        futures = [pool.submit(download_file_task, task) for task in download_tasks]

        for future in as_completed(futures):
            try:
                succeeded = future.result()
            except Exception as e:  # noqa: PERF203
                LOGGER.error(f"Download task failed with exception: {e!s}")
                failed_downloads += 1
            else:
                if succeeded:
                    successful_downloads += 1
                else:
                    failed_downloads += 1

    LOGGER.info(f"Parallel downloads completed: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads