Coverage for src/photo_download_manager.py: 100%
89 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
1"""Photo download task management module.
3This module contains utilities for managing photo download tasks
4and parallel execution during photo synchronization.
5"""
7___author___ = "Mandar Patil <mandarons@pm.me>"
9import os
10from concurrent.futures import ThreadPoolExecutor, as_completed
11from threading import Lock
13from src import config_parser, get_logger
14from src.hardlink_registry import HardlinkRegistry
15from src.photo_file_utils import create_hardlink, download_photo_from_server
16from src.photo_path_utils import (
17 create_folder_path_if_needed,
18 generate_photo_filename_with_metadata,
19 normalize_file_path,
20 rename_legacy_file_if_exists,
21)
23LOGGER = get_logger()
25# Thread-safe lock for file set operations
26files_lock = Lock()
class DownloadTaskInfo:
    """Plain record describing one pending photo download or hard-link job."""

    def __init__(self, photo, file_size: str, photo_path: str,
                 hardlink_source: str | None = None,
                 hardlink_registry: HardlinkRegistry | None = None):
        """Store everything needed to process a single photo later.

        Args:
            photo: Photo object from iCloudPy
            file_size: File size variant (original, medium, thumb, etc.)
            photo_path: Target path for photo download
            hardlink_source: Path to an existing file to hard-link from, if any
            hardlink_registry: Registry for tracking downloaded files, if any
        """
        # What to fetch and where to put it.
        self.photo = photo
        self.file_size: str = file_size
        self.photo_path: str = photo_path

        # Optional hard-link bookkeeping; both default to None.
        self.hardlink_source: str | None = hardlink_source
        self.hardlink_registry: HardlinkRegistry | None = hardlink_registry
def get_max_threads_for_download(config) -> int:
    """Look up the worker-thread limit used for parallel photo downloads.

    Delegates to ``config_parser.get_app_max_threads``.

    Args:
        config: Configuration dictionary

    Returns:
        Maximum number of threads to use for downloads
    """
    max_threads = config_parser.get_app_max_threads(config)
    return max_threads
def generate_photo_path(photo, file_size: str, destination_path: str,
                        folder_format: str | None) -> str:
    """Build the normalized target path for a photo and migrate legacy files.

    Besides computing the path, this asks ``create_folder_path_if_needed`` for
    the destination folder and hands every legacy-named candidate to
    ``rename_legacy_file_if_exists`` so that older naming schemes end up at
    the normalized path.

    Args:
        photo: Photo object from iCloudPy
        file_size: File size variant (original, medium, thumb, etc.)
        destination_path: Base destination path
        folder_format: strftime format string for folder creation

    Returns:
        Normalized full path where photo should be saved
    """
    # Current naming scheme and the folder it lives in.
    filename_with_metadata = generate_photo_filename_with_metadata(photo, file_size)
    final_destination = create_folder_path_if_needed(destination_path, folder_format, photo)

    # Split the server-side filename on its last dot (if it has one).
    original_name = photo.filename
    if "." in original_name:
        stem, extension = original_name.rsplit(".", 1)
    else:
        stem, extension = original_name, ""

    # Older scheme: "<stem>__<file_size>" with the extension re-attached.
    sized_name = "__".join([stem, file_size])
    if extension != "":
        sized_name = f"{sized_name}.{extension}"

    # Both legacy names were stored directly under the base destination.
    legacy_candidates = (
        os.path.join(destination_path, original_name),
        os.path.join(destination_path, sized_name),
    )

    unnormalized_target = os.path.join(final_destination, filename_with_metadata)
    target = normalize_file_path(unnormalized_target)

    for legacy_path in legacy_candidates:
        rename_legacy_file_if_exists(legacy_path, target)

    # A file saved under the un-normalized spelling of the same name also
    # counts as legacy.
    if os.path.isfile(unnormalized_target) and unnormalized_target != target:
        rename_legacy_file_if_exists(unnormalized_target, target)

    return target
def collect_download_task(photo, file_size: str, destination_path: str,
                          files: set[str] | None, folder_format: str | None,
                          hardlink_registry: HardlinkRegistry | None) -> DownloadTaskInfo | None:
    """Decide whether a photo needs processing and describe the work if so.

    Nothing is downloaded here; the returned task is meant to be executed
    later.

    Args:
        photo: Photo object from iCloudPy
        file_size: File size variant (original, medium, thumb, etc.)
        destination_path: Base destination path
        files: Set that receives the computed photo path (updated under
            ``files_lock``); may be None
        folder_format: strftime format string for folder creation
        hardlink_registry: Registry for tracking downloaded files

    Returns:
        DownloadTaskInfo if photo needs to be processed, None if the requested
        size is missing on the server or ``check_photo_exists`` reports the
        file as already present
    """
    # Evaluate availability first, then compute the path: the path is needed
    # on both the skip and the normal route.
    version_available = file_size in photo.versions
    photo_path = generate_photo_path(photo, file_size, destination_path, folder_format)

    if not version_available:
        LOGGER.warning(f"File size {file_size} not found on server. Skipping the photo {photo_path} ...")
        return None

    # Record the path before the existence check so it is tracked even when
    # no download turns out to be necessary.
    if files is not None:
        with files_lock:
            files.add(photo_path)

    from src.photo_file_utils import check_photo_exists
    already_present = check_photo_exists(photo, file_size, photo_path)
    if already_present:
        return None

    # Ask the registry (when there is one) for a file to hard-link from.
    hardlink_source = (
        hardlink_registry.get_existing_path(photo.id, file_size)
        if hardlink_registry is not None
        else None
    )

    return DownloadTaskInfo(
        photo=photo,
        file_size=file_size,
        photo_path=photo_path,
        hardlink_source=hardlink_source,
        hardlink_registry=hardlink_registry,
    )
def execute_download_task(task_info: DownloadTaskInfo) -> bool:
    """Download a single photo or create hardlink as part of parallel execution.

    A hard link is attempted first when ``task_info.hardlink_source`` is set;
    if that fails the photo is downloaded instead. A successful download is
    registered with ``task_info.hardlink_registry`` when one is provided.

    Any exception raised while processing is logged and reported as a failure
    rather than propagated, so one bad photo cannot take down the worker.

    Args:
        task_info: Download task information

    Returns:
        True if task completed successfully, False otherwise
    """
    LOGGER.debug(f"[Thread] Starting processing of {task_info.photo_path}")

    try:
        # Try hardlink first if source exists
        if task_info.hardlink_source:
            if create_hardlink(task_info.hardlink_source, task_info.photo_path):
                LOGGER.debug(f"[Thread] Created hardlink for {task_info.photo_path}")
                return True
            # Fallback to download if hard link creation fails
            LOGGER.warning(f"Hard link creation failed, downloading {task_info.photo_path} instead")

        # Download the photo
        result = download_photo_from_server(task_info.photo, task_info.file_size, task_info.photo_path)
        if result:
            if task_info.hardlink_registry is not None:
                # Register for future hard links if enabled
                task_info.hardlink_registry.register_photo_path(
                    task_info.photo.id, task_info.file_size, task_info.photo_path,
                )
            # Report completion exactly when the download succeeded: never for
            # a failed download, and regardless of whether a registry is used.
            LOGGER.debug(f"[Thread] Completed download of {task_info.photo_path}")

        return result

    except Exception as e:
        # Thread boundary: convert any failure into a False result.
        LOGGER.error(f"[Thread] Failed to process {task_info.photo_path}: {e!s}")
        return False
def execute_parallel_downloads(download_tasks: list[DownloadTaskInfo], config) -> tuple[int, int]:
    """Run all collected tasks on a thread pool and tally the outcomes.

    The pool size comes from ``get_max_threads_for_download(config)``. A task
    counts as failed when it returns a falsy value or when fetching its result
    raises.

    Args:
        download_tasks: List of download tasks to execute
        config: Configuration dictionary for thread settings

    Returns:
        Tuple of (successful_downloads, failed_downloads)
    """
    # Nothing to do: skip the pool and the log lines entirely.
    if not download_tasks:
        return 0, 0

    max_threads = get_max_threads_for_download(config)

    # Split the workload into hard-link jobs and plain downloads for logging.
    total_tasks = len(download_tasks)
    hardlink_tasks = len([task for task in download_tasks if task.hardlink_source])
    download_only_tasks = total_tasks - hardlink_tasks

    if hardlink_tasks > 0:
        start_message = (
            f"Starting parallel processing with {max_threads} threads: "
            f"{hardlink_tasks} hard links, {download_only_tasks} downloads..."
        )
    else:
        start_message = (
            f"Starting parallel photo downloads with {max_threads} threads for {total_tasks} photos..."
        )
    LOGGER.info(start_message)

    # tally[True] counts successes, tally[False] counts failures.
    tally = {True: 0, False: 0}

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Queue every task up front, then consume results as they finish.
        future_to_task = {executor.submit(execute_download_task, task): task for task in download_tasks}

        for future in as_completed(future_to_task):
            try:
                succeeded = bool(future.result())
            except Exception as e:  # noqa: PERF203
                LOGGER.error(f"Unexpected error during photo download: {e!s}")
                succeeded = False
            tally[succeeded] += 1

    successful_downloads = tally[True]
    failed_downloads = tally[False]

    LOGGER.info(f"Photo processing complete: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads