Coverage for src/drive_parallel_download.py: 100%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""Parallel download utilities. 

2 

3This module provides parallel download coordination for iCloud Drive sync, 

4separating parallel execution logic from sync operations per SRP. 

5""" 

6 

7__author__ = "Mandar Patil (mandarons@pm.me)" 

8 

9import os 

10import unicodedata 

11from concurrent.futures import ThreadPoolExecutor, as_completed 

12from pathlib import Path 

13from threading import Lock 

14from typing import Any 

15 

16from src import configure_icloudpy_logging, get_logger 

17from src.drive_file_download import download_file 

18from src.drive_file_existence import file_exists, is_package, package_exists 

19from src.drive_filtering import wanted_file 

20 

# Configure icloudpy logging immediately after import so third-party log
# output is set up before any sync work begins.
configure_icloudpy_logging()

# Shared module logger used by all download helpers in this module.
LOGGER = get_logger()

# Thread-safe lock for file set operations: guards every mutation of the
# shared `files` set performed by the worker threads below.
files_lock = Lock()

28 

29 

30def collect_file_for_download( 

31 item: Any, 

32 destination_path: str, 

33 filters: list[str] | None, 

34 ignore: list[str] | None, 

35 files: set[str], 

36) -> dict[str, Any] | None: 

37 """Collect file information for parallel download without immediately downloading. 

38 

39 Args: 

40 item: iCloud file item 

41 destination_path: Local destination directory 

42 filters: File extension filters 

43 ignore: Ignore patterns 

44 files: Set to track processed files (thread-safe updates) 

45 

46 Returns: 

47 Download task info dict, or None if file should be skipped 

48 """ 

49 if not (item and destination_path and files is not None): 

50 return None 

51 

52 local_file = os.path.join(destination_path, item.name) 

53 local_file = unicodedata.normalize("NFC", local_file) 

54 

55 if not wanted_file(filters=filters, ignore=ignore, file_path=local_file): 

56 return None 

57 

58 # Thread-safe file set update 

59 with files_lock: 

60 files.add(local_file) 

61 

62 item_is_package = is_package(item=item) 

63 if item_is_package: 

64 if package_exists(item=item, local_package_path=local_file): 

65 with files_lock: 

66 for f in Path(local_file).glob("**/*"): 

67 files.add(str(f)) 

68 return None 

69 elif file_exists(item=item, local_file=local_file): 

70 return None 

71 

72 # Return download task info 

73 return { 

74 "item": item, 

75 "local_file": local_file, 

76 "is_package": item_is_package, 

77 "files": files, 

78 } 

79 

80 

def download_file_task(download_info: dict[str, Any]) -> bool:
    """Download a single file as part of parallel execution.

    Args:
        download_info: Dictionary containing download task information

    Returns:
        True if download succeeded, False otherwise
    """
    # Pull every field up front; a malformed task dict fails fast with KeyError.
    item = download_info["item"]
    target = download_info["local_file"]
    treat_as_package = download_info["is_package"]
    tracked = download_info["files"]

    LOGGER.debug(f"[Thread] Starting download of {target}")

    try:
        result_path = download_file(item=item, local_file=target)
        if not result_path:
            return False

        if treat_as_package:
            # Rename each extracted package member to its NFD form on disk
            # and record it in the shared set, all under the lock.
            with files_lock:
                for member in Path(result_path).glob("**/*"):
                    original = str(member)
                    normalized = unicodedata.normalize("NFD", original)
                    if os.path.exists(original):
                        os.rename(original, normalized)
                    tracked.add(normalized)

        LOGGER.debug(f"[Thread] Completed download of {target}")
        return True
    except Exception as err:
        # Task boundary: log and report failure instead of killing the worker.
        LOGGER.error(f"[Thread] Failed to download {target}: {err!s}")
        return False

116 

117 

def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
    """Execute multiple file downloads in parallel.

    Args:
        download_tasks: List of download task dictionaries
        max_threads: Maximum number of concurrent threads

    Returns:
        Tuple of (successful_downloads, failed_downloads) counts
    """
    # Nothing to do: report zero successes and zero failures.
    if not download_tasks:
        return 0, 0

    LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")

    ok_count = 0
    fail_count = 0

    with ThreadPoolExecutor(max_workers=max_threads) as pool:
        # Fan out every task; tally results as each future finishes.
        pending = [pool.submit(download_file_task, task) for task in download_tasks]
        for finished in as_completed(pending):
            try:
                succeeded = finished.result()
            except Exception as err:  # noqa: PERF203
                LOGGER.error(f"Download task failed with exception: {err!s}")
                fail_count += 1
            else:
                if succeeded:
                    ok_count += 1
                else:
                    fail_count += 1

    LOGGER.info(f"Parallel downloads completed: {ok_count} successful, {fail_count} failed")
    return ok_count, fail_count