Coverage for src/drive_parallel_download.py: 100%

80 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-06 02:49 +0000

1"""Parallel download utilities. 

2 

3This module provides parallel download coordination for iCloud Drive sync, 

4separating parallel execution logic from sync operations per SRP. 

5""" 

6 

7__author__ = "Mandar Patil (mandarons@pm.me)" 

8 

9import os 

10import unicodedata 

11from concurrent.futures import ThreadPoolExecutor, as_completed 

12from pathlib import Path 

13from threading import Lock 

14from typing import Any 

15from urllib.parse import unquote 

16 

17from src import config_parser, configure_icloudpy_logging, get_logger 

18from src.drive_file_download import download_file 

19from src.drive_file_existence import file_exists, is_package, package_exists 

20from src.drive_filtering import wanted_file 

21 

# Configure icloudpy logging immediately after import. This runs at import time
# and before the module logger below is obtained.
configure_icloudpy_logging()

# Module-level logger used by every function in this module, including code
# executed on ThreadPoolExecutor worker threads.
LOGGER = get_logger()

# Lock guarding updates to the shared ``files`` set, which may be mutated from
# multiple threads (collect_file_for_download() and download_file_task()).
files_lock = Lock()

29 

30 

31def collect_file_for_download( 

32 item: Any, 

33 destination_path: str, 

34 filters: list[str] | None, 

35 ignore: list[str] | None, 

36 files: set[str], 

37 config: dict | None = None, 

38) -> dict[str, Any] | None: 

39 """Collect file information for parallel download without immediately downloading. 

40 

41 Args: 

42 item: iCloud file item 

43 destination_path: Local destination directory 

44 filters: File extension filters 

45 ignore: Ignore patterns 

46 files: Set to track processed files (thread-safe updates) 

47 config: Configuration dictionary (used to resolve request timeout) 

48 

49 Returns: 

50 Download task info dict, or None if file should be skipped 

51 """ 

52 if not (item and destination_path and files is not None): 

53 return None 

54 

55 # Decode URL-encoded filename from iCloud API 

56 # This handles special characters like %CC%88 (combining diacritical marks) 

57 decoded_name = unquote(item.name) 

58 local_file = os.path.join(destination_path, decoded_name) 

59 local_file = unicodedata.normalize("NFC", local_file) 

60 

61 if not wanted_file(filters=filters, ignore=ignore, file_path=local_file): 

62 return None 

63 

64 # Thread-safe file set update 

65 with files_lock: 

66 files.add(local_file) 

67 

68 # Check local existence FIRST to avoid unnecessary network requests. 

69 # is_package() makes an HTTP call for every file, which is very slow 

70 # when syncing thousands of already-up-to-date files. 

71 if os.path.isfile(local_file): 

72 # It's a regular file locally — check if it's up-to-date (no network call) 

73 if file_exists(item=item, local_file=local_file): 

74 return None 

75 # File is outdated; still need to determine its type for re-download 

76 elif os.path.isdir(local_file): 

77 # A directory at this path means the item was previously downloaded as a 

78 # package. iCloud Drive items do not change type between file and package, 

79 # so package_exists() is the correct check here (no is_package() needed). 

80 # Note: package_exists() deletes the directory if it is outdated. 

81 if package_exists(item=item, local_package_path=local_file): 

82 with files_lock: 

83 for f in Path(local_file).glob("**/*"): 

84 files.add(str(f)) 

85 return None 

86 # Directory was deleted by package_exists(); schedule re-download as a package 

87 return { 

88 "item": item, 

89 "local_file": local_file, 

90 "is_package": True, 

91 "files": files, 

92 } 

93 

94 # File/directory doesn't exist locally (or was an outdated regular file that needs 

95 # re-download) — make the network call to determine the item type 

96 timeout = config_parser.get_drive_request_timeout(config) 

97 item_is_package = is_package(item=item, timeout=timeout) 

98 

99 # Return download task info 

100 return { 

101 "item": item, 

102 "local_file": local_file, 

103 "is_package": item_is_package, 

104 "files": files, 

105 } 

106 

107 

def download_file_task(download_info: dict[str, Any]) -> bool:
    """Download a single file as part of parallel execution.

    Intended to run on a worker thread. For packages, every path found under the
    downloaded directory is renamed to its NFD-normalized form. The normalized
    path is recorded in the shared ``files`` set.

    Args:
        download_info: Task dict, as built by ``collect_file_for_download``, with
            keys ``item``, ``local_file``, ``is_package`` and ``files``.

    Returns:
        True if download succeeded, False otherwise. Exceptions raised during the
        download or package post-processing are logged and reported as False.
        A missing key in ``download_info`` raises KeyError, because the lookups
        happen before the guarded section.
    """
    item = download_info["item"]
    local_file = download_info["local_file"]
    # Named item_is_package so the imported is_package() helper is not shadowed.
    item_is_package = download_info["is_package"]
    files = download_info["files"]

    LOGGER.debug(f"[Thread] Starting download of {local_file}")

    try:
        downloaded_file = download_file(item=item, local_file=local_file)
        if not downloaded_file:
            return False

        if item_is_package:
            # The glob/rename I/O runs without files_lock. The lock is held only
            # for each set insertion, so other workers are not blocked while a
            # large package is processed.
            # NOTE(review): the whole absolute path is NFD-normalized, including
            # the destination directory and the package root, neither of which is
            # renamed here. On a normalization-sensitive filesystem, a non-ASCII
            # parent component would name a non-existent directory and make
            # os.rename raise. Paths are also renamed while glob() is still
            # iterating the tree. Confirm both behaviours are intended.
            for path in Path(downloaded_file).glob("**/*"):
                f = str(path)
                f_normalized = unicodedata.normalize("NFD", f)
                if os.path.exists(f):
                    os.rename(f, f_normalized)
                with files_lock:
                    files.add(f_normalized)

        LOGGER.debug(f"[Thread] Completed download of {local_file}")
        return True
    except Exception as e:
        LOGGER.error(f"[Thread] Failed to download {local_file}: {e!s}")
        return False

143 

144 

def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
    """Execute multiple file downloads in parallel.

    Each task is run through ``download_file_task`` on a ThreadPoolExecutor with
    ``max_threads`` workers. Results are tallied in completion order.

    Args:
        download_tasks: List of download task dictionaries.
        max_threads: Maximum number of concurrent threads (``max_workers``).

    Returns:
        Tuple of (successful_downloads, failed_downloads) counts. An empty task
        list returns (0, 0) without creating a thread pool.
    """
    if not download_tasks:
        return 0, 0

    LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")

    successful_downloads = 0
    failed_downloads = 0

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all download tasks; keep the mapping so failures can be attributed.
        future_to_task = {executor.submit(download_file_task, task): task for task in download_tasks}

        # Process completed downloads
        for future in as_completed(future_to_task):
            try:
                result = future.result()
            except Exception as e:  # noqa: PERF203
                # download_file_task logs and swallows download errors itself, so an
                # exception here comes from its unguarded key lookups (a malformed
                # task). Use .get() so reporting the failure cannot raise again.
                task = future_to_task[future]
                local_file = task.get("local_file", "<unknown>") if isinstance(task, dict) else "<unknown>"
                LOGGER.error(f"Download task for {local_file} failed with exception: {e!s}")
                failed_downloads += 1
                continue

            if result:
                successful_downloads += 1
            else:
                failed_downloads += 1

    LOGGER.info(f"Parallel downloads completed: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads