Coverage for src/photo_download_manager.py: 100%

89 statements  

coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""Photo download task management module. 

2 

3This module contains utilities for managing photo download tasks 

4and parallel execution during photo synchronization. 

5""" 

6 

7___author___ = "Mandar Patil <mandarons@pm.me>" 

8 

9import os 

10from concurrent.futures import ThreadPoolExecutor, as_completed 

11from threading import Lock 

12 

13from src import config_parser, get_logger 

14from src.hardlink_registry import HardlinkRegistry 

15from src.photo_file_utils import create_hardlink, download_photo_from_server 

16from src.photo_path_utils import ( 

17 create_folder_path_if_needed, 

18 generate_photo_filename_with_metadata, 

19 normalize_file_path, 

20 rename_legacy_file_if_exists, 

21) 

22 

23LOGGER = get_logger() 

24 

25# Thread-safe lock for file set operations 

26files_lock = Lock() 

27 

28 

class DownloadTaskInfo:
    """Information about a photo download task."""

    def __init__(self, photo, file_size: str, photo_path: str,
                 hardlink_source: str | None = None,
                 hardlink_registry: HardlinkRegistry | None = None):
        """Initialize download task info.

        Args:
            photo: Photo object from iCloudPy
            file_size: File size variant (original, medium, thumb, etc.)
            photo_path: Target path for photo download
            hardlink_source: Path to existing file for hardlink creation
            hardlink_registry: Registry for tracking downloaded files
        """
        self.photo = photo
        self.file_size = file_size
        self.photo_path = photo_path
        self.hardlink_source = hardlink_source
        self.hardlink_registry = hardlink_registry
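
For reference, the class above is a plain data container; the following is a minimal construction sketch, with the photo object and path being hypothetical placeholders rather than values taken from this module:

# Illustrative only; not part of photo_download_manager.py.
# Assumes `photo` is an iCloudPy photo object fetched elsewhere.
task = DownloadTaskInfo(
    photo=photo,
    file_size="original",
    photo_path="/photos/2025/IMG_0001__original.HEIC",
    hardlink_source=None,       # no previously downloaded copy to link against
    hardlink_registry=None,     # optional; pass a HardlinkRegistry to enable hard links
)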

def get_max_threads_for_download(config) -> int:
    """Get maximum number of threads for parallel downloads.

    Args:
        config: Configuration dictionary

    Returns:
        Maximum number of threads to use for downloads
    """
    return config_parser.get_app_max_threads(config)

def generate_photo_path(photo, file_size: str, destination_path: str,
                        folder_format: str | None) -> str:
    """Generate full file path for photo with legacy file renaming.

    This function combines path generation, folder creation, and legacy
    file renaming into a single operation to maintain backward compatibility.

    Args:
        photo: Photo object from iCloudPy
        file_size: File size variant (original, medium, thumb, etc.)
        destination_path: Base destination path
        folder_format: strftime format string for folder creation

    Returns:
        Normalized full path where photo should be saved
    """
    # Generate filename with metadata
    filename_with_metadata = generate_photo_filename_with_metadata(photo, file_size)

    # Create folder path if needed
    final_destination = create_folder_path_if_needed(destination_path, folder_format, photo)

    # Generate paths for legacy file format handling
    filename = photo.filename
    name, extension = filename.rsplit(".", 1) if "." in filename else [filename, ""]

    # Legacy file paths that need to be renamed
    file_path = os.path.join(destination_path, filename)
    file_size_path = os.path.join(
        destination_path,
        f"{'__'.join([name, file_size])}" if extension == "" else f"{'__'.join([name, file_size])}.{extension}",
    )

    # Final path with normalization
    final_file_path = os.path.join(final_destination, filename_with_metadata)
    normalized_path = normalize_file_path(final_file_path)

    # Rename legacy files if they exist
    rename_legacy_file_if_exists(file_path, normalized_path)
    rename_legacy_file_if_exists(file_size_path, normalized_path)

    # Handle existing file with different normalization
    if os.path.isfile(final_file_path) and final_file_path != normalized_path:
        rename_legacy_file_if_exists(final_file_path, normalized_path)

    return normalized_path
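
To make the legacy-name handling above concrete, here is a hedged worked example; the filenames are invented for illustration and the final normalized target depends on photo_path_utils:

# Illustrative only; hypothetical filenames.
# With photo.filename == "IMG_0001.HEIC" and file_size == "original",
# the legacy candidates renamed to the normalized target are:
#   file_path      -> <destination_path>/IMG_0001.HEIC
#   file_size_path -> <destination_path>/IMG_0001__original.HEIC
# For an extension-less filename such as "IMG_0001":
#   file_size_path -> <destination_path>/IMG_0001__original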

def collect_download_task(photo, file_size: str, destination_path: str,
                          files: set[str] | None, folder_format: str | None,
                          hardlink_registry: HardlinkRegistry | None) -> DownloadTaskInfo | None:
    """Collect photo info for parallel download without immediately downloading.

    Args:
        photo: Photo object from iCloudPy
        file_size: File size variant (original, medium, thumb, etc.)
        destination_path: Base destination path
        files: Set to track downloaded files (thread-safe updates)
        folder_format: strftime format string for folder creation
        hardlink_registry: Registry for tracking downloaded files

    Returns:
        DownloadTaskInfo if photo needs to be processed, None if skipped
    """
    # Check if file size exists on server
    if file_size not in photo.versions:
        photo_path = generate_photo_path(photo, file_size, destination_path, folder_format)
        LOGGER.warning(f"File size {file_size} not found on server. Skipping the photo {photo_path} ...")
        return None

    # Generate photo path
    photo_path = generate_photo_path(photo, file_size, destination_path, folder_format)

    # Thread-safe file set update
    if files is not None:
        with files_lock:
            files.add(photo_path)

    # Check if photo already exists with correct size
    from src.photo_file_utils import check_photo_exists
    if check_photo_exists(photo, file_size, photo_path):
        return None

    # Check for existing hardlink source
    hardlink_source = None
    if hardlink_registry is not None:
        hardlink_source = hardlink_registry.get_existing_path(photo.id, file_size)

    return DownloadTaskInfo(
        photo=photo,
        file_size=file_size,
        photo_path=photo_path,
        hardlink_source=hardlink_source,
        hardlink_registry=hardlink_registry,
    )
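
A minimal sketch of collecting tasks for one photo across size variants; the photo object, destination path, folder format, and registry constructor are all assumptions for illustration, not names defined in this module:

# Illustrative only; not part of the module under coverage.
files: set[str] = set()
registry = HardlinkRegistry()          # assumed no-argument constructor
tasks: list[DownloadTaskInfo] = []
for size in ("original", "medium"):    # size variants requested by the caller
    task = collect_download_task(
        photo,                         # hypothetical iCloudPy photo object
        size,
        "/data/photos",
        files,
        "%Y/%m",
        registry,
    )
    if task is not None:               # None means skipped: size missing or file already present
        tasks.append(task)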

def execute_download_task(task_info: DownloadTaskInfo) -> bool:
    """Download a single photo or create hardlink as part of parallel execution.

    Args:
        task_info: Download task information

    Returns:
        True if task completed successfully, False otherwise
    """
    LOGGER.debug(f"[Thread] Starting processing of {task_info.photo_path}")

    try:
        # Try hardlink first if source exists
        if task_info.hardlink_source:
            if create_hardlink(task_info.hardlink_source, task_info.photo_path):
                LOGGER.debug(f"[Thread] Created hardlink for {task_info.photo_path}")
                return True
            else:
                # Fallback to download if hard link creation fails
                LOGGER.warning(f"Hard link creation failed, downloading {task_info.photo_path} instead")

        # Download the photo
        result = download_photo_from_server(task_info.photo, task_info.file_size, task_info.photo_path)
        if result and task_info.hardlink_registry is not None:
            # Register for future hard links if enabled
            task_info.hardlink_registry.register_photo_path(
                task_info.photo.id, task_info.file_size, task_info.photo_path,
            )
        LOGGER.debug(f"[Thread] Completed download of {task_info.photo_path}")

        return result

    except Exception as e:
        LOGGER.error(f"[Thread] Failed to process {task_info.photo_path}: {e!s}")
        return False

def execute_parallel_downloads(download_tasks: list[DownloadTaskInfo], config) -> tuple[int, int]:
    """Execute download tasks in parallel using thread pool.

    Args:
        download_tasks: List of download tasks to execute
        config: Configuration dictionary for thread settings

    Returns:
        Tuple of (successful_downloads, failed_downloads)
    """
    if not download_tasks:
        return 0, 0

    max_threads = get_max_threads_for_download(config)

    # Count hardlink tasks vs download tasks for logging
    hardlink_tasks = sum(1 for task in download_tasks if task.hardlink_source)
    download_only_tasks = len(download_tasks) - hardlink_tasks

    if hardlink_tasks > 0:
        LOGGER.info(
            f"Starting parallel processing with {max_threads} threads: "
            f"{hardlink_tasks} hard links, {download_only_tasks} downloads...",
        )
    else:
        LOGGER.info(
            f"Starting parallel photo downloads with {max_threads} threads for {len(download_tasks)} photos...",
        )

    successful_downloads = 0
    failed_downloads = 0

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all download tasks
        future_to_task = {executor.submit(execute_download_task, task): task for task in download_tasks}

        # Process completed downloads
        for future in as_completed(future_to_task):
            try:
                result = future.result()
                if result:
                    successful_downloads += 1
                else:
                    failed_downloads += 1
            except Exception as e:  # noqa: PERF203
                LOGGER.error(f"Unexpected error during photo download: {e!s}")
                failed_downloads += 1

    LOGGER.info(f"Photo processing complete: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads
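
Putting the pieces together, a hedged end-to-end sketch of how a caller might drive this module; `album_photos` and `config` are assumed inputs from the synchronization layer, not names defined here:

# Illustrative only; assumes the caller supplies `album_photos` and `config`.
download_tasks: list[DownloadTaskInfo] = []
files: set[str] = set()
registry = HardlinkRegistry()      # assumed no-argument constructor
for photo in album_photos:
    task = collect_download_task(photo, "original", "/data/photos", files, "%Y/%m", registry)
    if task is not None:
        download_tasks.append(task)
ok, failed = execute_parallel_downloads(download_tasks, config)
LOGGER.info(f"Synced {ok} photos, {failed} failures")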