Coverage for src/sync_photos.py: 100%
186 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
1"""Sync photos module.
3This module provides the main photo synchronization functionality,
4orchestrating the downloading of photos from iCloud to local storage.
5"""
7___author___ = "Mandar Patil <mandarons@pm.me>"
9import os
11from src import config_parser, configure_icloudpy_logging, get_logger
12from src.album_sync_orchestrator import sync_album_photos
13from src.hardlink_registry import create_hardlink_registry
14from src.photo_cleanup_utils import remove_obsolete_files
16# Configure icloudpy logging immediately after import
17configure_icloudpy_logging()
19LOGGER = get_logger()
22# Legacy functions preserved for backward compatibility with existing tests
23# These functions are now implemented using the new modular architecture
26def get_max_threads(config):
27 """Get maximum number of threads for parallel downloads.
29 Legacy function - now delegates to config_parser.
31 Args:
32 config: Configuration dictionary
34 Returns:
35 Maximum number of threads to use for downloads
36 """
37 return config_parser.get_app_max_threads(config)
40def get_name_and_extension(photo, file_size):
41 """Extract filename and extension.
43 Legacy function - now delegates to photo_path_utils.
45 Args:
46 photo: Photo object from iCloudPy
47 file_size: File size variant
49 Returns:
50 Tuple of (name, extension)
51 """
52 from src.photo_path_utils import get_photo_name_and_extension
54 return get_photo_name_and_extension(photo, file_size)
57def photo_wanted(photo, extensions):
58 """Check if photo is wanted based on extension.
60 Legacy function - now delegates to photo_filter_utils.
62 Args:
63 photo: Photo object from iCloudPy
64 extensions: List of allowed extensions
66 Returns:
67 True if photo should be synced, False otherwise
68 """
69 from src.photo_filter_utils import is_photo_wanted
71 return is_photo_wanted(photo, extensions)
74def generate_file_name(photo, file_size, destination_path, folder_format):
75 """Generate full path to file.
77 Legacy function - now delegates to photo_download_manager.
79 Args:
80 photo: Photo object from iCloudPy
81 file_size: File size variant
82 destination_path: Base destination path
83 folder_format: Folder format string
85 Returns:
86 Full file path
87 """
88 from src.photo_download_manager import generate_photo_path
90 return generate_photo_path(photo, file_size, destination_path, folder_format)
93def photo_exists(photo, file_size, local_path):
94 """Check if photo exist locally.
96 Legacy function - now delegates to photo_file_utils.
98 Args:
99 photo: Photo object from iCloudPy
100 file_size: File size variant
101 local_path: Local file path to check
103 Returns:
104 True if photo exists with correct size, False otherwise
105 """
106 from src.photo_file_utils import check_photo_exists
108 return check_photo_exists(photo, file_size, local_path)
111def create_hardlink(source_path, destination_path):
112 """Create a hard link from source to destination.
114 Legacy function - now delegates to photo_file_utils.
116 Args:
117 source_path: Path to source file
118 destination_path: Path for new hardlink
120 Returns:
121 True if successful, False otherwise
122 """
123 from src.photo_file_utils import create_hardlink as create_hardlink_impl
125 return create_hardlink_impl(source_path, destination_path)
128def download_photo(photo, file_size, destination_path):
129 """Download photo from server.
131 Legacy function - now delegates to photo_file_utils.
133 Args:
134 photo: Photo object from iCloudPy
135 file_size: File size variant
136 destination_path: Where to save the photo
138 Returns:
139 True if successful, False otherwise
140 """
141 from src.photo_file_utils import download_photo_from_server
143 return download_photo_from_server(photo, file_size, destination_path)
146def process_photo(photo, file_size, destination_path, files, folder_format, hardlink_registry=None):
147 """Process photo details (legacy function for backward compatibility).
149 Args:
150 photo: Photo object from iCloudPy
151 file_size: File size variant
152 destination_path: Base destination path
153 files: Set to track downloaded files
154 folder_format: Folder format string
155 hardlink_registry: Registry for hardlinks (legacy dict format)
157 Returns:
158 True if photo was processed successfully, False otherwise
159 """
160 from src.photo_download_manager import collect_download_task, execute_download_task
162 # Convert legacy hardlink registry dict to new registry format if needed
163 converted_registry = None
164 if hardlink_registry is not None:
165 from src.hardlink_registry import HardlinkRegistry
167 converted_registry = HardlinkRegistry()
168 for key, path in hardlink_registry.items():
169 # Legacy format: photo_id_file_size -> path
170 if "_" in key:
171 parts = key.rsplit("_", 1)
172 if len(parts) == 2:
173 photo_id, file_sz = parts
174 converted_registry.register_photo_path(photo_id, file_sz, path)
176 # Collect download task
177 task_info = collect_download_task(
178 photo,
179 file_size,
180 destination_path,
181 files,
182 folder_format,
183 converted_registry,
184 )
186 if task_info is None:
187 return False
189 # Execute task
190 result = execute_download_task(task_info)
192 # Update legacy registry if provided
193 if result and hardlink_registry is not None:
194 photo_key = f"{photo.id}_{file_size}"
195 hardlink_registry[photo_key] = task_info.photo_path
197 return result
200def collect_photo_for_download(photo, file_size, destination_path, files, folder_format, hardlink_registry=None):
201 """Collect photo info for parallel download without immediately downloading.
203 Legacy function - now delegates to photo_download_manager.
205 Args:
206 photo: Photo object from iCloudPy
207 file_size: File size variant
208 destination_path: Base destination path
209 files: Set to track downloaded files
210 folder_format: Folder format string
211 hardlink_registry: Registry for hardlinks (legacy dict format)
213 Returns:
214 Download task info or None
215 """
216 from src.photo_download_manager import collect_download_task
218 # Convert legacy hardlink registry dict to new registry format if needed
219 converted_registry = None
220 if hardlink_registry is not None:
221 from src.hardlink_registry import HardlinkRegistry
223 converted_registry = HardlinkRegistry()
224 for key, path in hardlink_registry.items():
225 if "_" in key:
226 parts = key.rsplit("_", 1)
227 if len(parts) == 2:
228 photo_id, file_sz = parts
229 converted_registry.register_photo_path(photo_id, file_sz, path)
231 task_info = collect_download_task(
232 photo,
233 file_size,
234 destination_path,
235 files,
236 folder_format,
237 converted_registry,
238 )
240 if task_info is None:
241 return None
243 # Convert back to legacy format for compatibility
244 return {
245 "photo": task_info.photo,
246 "file_size": task_info.file_size,
247 "photo_path": task_info.photo_path,
248 "hardlink_source": task_info.hardlink_source,
249 "hardlink_registry": hardlink_registry,
250 }
253def download_photo_task(download_info):
254 """Download a single photo or create hardlink as part of parallel execution.
256 Legacy function - maintains original implementation for backward compatibility.
258 Args:
259 download_info: Dictionary with download task information
261 Returns:
262 True if successful, False otherwise
263 """
264 photo = download_info["photo"]
265 file_size = download_info["file_size"]
266 photo_path = download_info["photo_path"]
267 hardlink_source = download_info.get("hardlink_source")
268 hardlink_registry = download_info.get("hardlink_registry")
270 LOGGER.debug(f"[Thread] Starting processing of {photo_path}")
272 try:
273 # Try hardlink first if source exists
274 if hardlink_source:
275 if create_hardlink(hardlink_source, photo_path):
276 LOGGER.debug(f"[Thread] Created hardlink for {photo_path}")
277 return True
278 else:
279 # Fallback to download if hard link creation fails
280 LOGGER.warning(f"Hard link creation failed, downloading {photo_path} instead")
282 # Download the photo - this maintains the original function call for test compatibility
283 result = download_photo(photo, file_size, photo_path)
284 if result:
285 # Register for future hard links if enabled
286 if hardlink_registry is not None:
287 photo_key = f"{photo.id}_{file_size}"
288 hardlink_registry[photo_key] = photo_path
289 LOGGER.debug(f"[Thread] Completed download of {photo_path}")
290 return result
291 except Exception as e:
292 LOGGER.error(f"[Thread] Failed to process {photo_path}: {e!s}")
293 return False
296def sync_album(
297 album,
298 destination_path,
299 file_sizes,
300 extensions=None,
301 files=None,
302 folder_format=None,
303 hardlink_registry=None,
304 config=None,
305):
306 """Sync given album.
308 Legacy function - now delegates to album_sync_orchestrator with conversion
309 for legacy hardlink registry format.
311 Args:
312 album: Album object from iCloudPy
313 destination_path: Path where photos should be saved
314 file_sizes: List of file size variants to download
315 extensions: List of allowed file extensions
316 files: Set to track downloaded files
317 folder_format: Folder format string
318 hardlink_registry: Registry for hardlinks (legacy dict format)
319 config: Configuration dictionary
321 Returns:
322 True on success, None on invalid input
323 """
324 # Convert legacy hardlink registry dict to new registry format if needed
325 converted_registry = None
326 if hardlink_registry is not None:
327 from src.hardlink_registry import HardlinkRegistry
329 converted_registry = HardlinkRegistry()
330 for key, path in hardlink_registry.items():
331 if "_" in key:
332 parts = key.rsplit("_", 1)
333 if len(parts) == 2:
334 photo_id, file_sz = parts
335 converted_registry.register_photo_path(photo_id, file_sz, path)
337 result = sync_album_photos(
338 album=album,
339 destination_path=destination_path,
340 file_sizes=file_sizes,
341 extensions=extensions,
342 files=files,
343 folder_format=folder_format,
344 hardlink_registry=converted_registry,
345 config=config,
346 )
348 # Update legacy registry if provided and new registry was created
349 if hardlink_registry is not None and converted_registry is not None:
350 # This is a simplified approach - in practice, we'd need to track new entries
351 # But for legacy compatibility, we'll maintain the existing behavior
352 pass
354 return result
357def remove_obsolete(destination_path, files):
358 """Remove local obsolete file.
360 Legacy function - now delegates to photo_cleanup_utils.
362 Args:
363 destination_path: Path to search for obsolete files
364 files: Set of files that should be kept
366 Returns:
367 Set of removed file paths
368 """
369 return remove_obsolete_files(destination_path, files)
372def sync_photos(config, photos):
373 """Sync all photos.
375 Main orchestration function that coordinates the entire photo sync process.
376 This function has been refactored to use the new modular architecture while
377 maintaining backward compatibility.
379 Args:
380 config: Configuration dictionary
381 photos: Photos object from iCloudPy
383 Returns:
384 Tuple of (total_successful, total_failed) download counts
385 """
386 # Parse configuration using centralized config parser
387 destination_path = config_parser.prepare_photos_destination(config=config)
388 filters = config_parser.get_photos_filters(config=config)
389 files = set()
390 download_all = config_parser.get_photos_all_albums(config=config)
391 use_hardlinks = config_parser.get_photos_use_hardlinks(config=config)
392 libraries = filters["libraries"] if filters["libraries"] is not None else photos.libraries
393 folder_format = config_parser.get_photos_folder_format(config=config)
395 # Initialize hard link registry using new modular approach
396 hardlink_registry = create_hardlink_registry(use_hardlinks)
398 total_successful, total_failed = 0, 0
400 # Special handling for "All Photos" when hardlinks are enabled
401 if use_hardlinks and download_all:
402 sub_successful, sub_failed = _sync_all_photos_first_for_hardlinks(
403 photos,
404 libraries,
405 destination_path,
406 filters,
407 files,
408 folder_format,
409 hardlink_registry,
410 config,
411 )
412 total_successful += sub_successful
413 total_failed += sub_failed
415 # Sync albums based on configuration
416 sub_successful, sub_failed = _sync_albums_by_configuration(
417 photos,
418 libraries,
419 download_all,
420 destination_path,
421 filters,
422 files,
423 folder_format,
424 hardlink_registry,
425 config,
426 )
427 total_successful += sub_successful
428 total_failed += sub_failed
430 # Clean up obsolete files if enabled
431 if config_parser.get_photos_remove_obsolete(config=config):
432 remove_obsolete_files(destination_path, files)
434 return total_successful, total_failed
437def _sync_all_photos_first_for_hardlinks(
438 photos,
439 libraries,
440 destination_path,
441 filters,
442 files,
443 folder_format,
444 hardlink_registry,
445 config,
446) -> tuple[int, int]:
447 """Sync 'All Photos' album first to populate hardlink registry.
449 Args:
450 photos: Photos object from iCloudPy
451 libraries: List of photo libraries to sync
452 destination_path: Base destination path
453 filters: Photo filters configuration
454 files: Set to track downloaded files
455 folder_format: Folder format string
456 hardlink_registry: Registry for tracking downloaded files
457 config: Configuration dictionary
459 Returns:
460 Tuple of (total_successful, total_failed) download counts
461 """
462 for library in libraries:
463 if library == "PrimarySync" and "All Photos" in photos.libraries[library].albums:
464 LOGGER.info("Syncing 'All Photos' album first for hard link reference...")
465 result = sync_album_photos(
466 album=photos.libraries[library].albums["All Photos"],
467 destination_path=os.path.join(destination_path, "All Photos"),
468 file_sizes=filters["file_sizes"],
469 extensions=filters["extensions"],
470 files=files,
471 folder_format=folder_format,
472 hardlink_registry=hardlink_registry,
473 config=config,
474 )
475 if hardlink_registry:
476 LOGGER.info(
477 f"'All Photos' sync complete. Hard link registry populated with "
478 f"{hardlink_registry.get_registry_size()} reference files.",
479 )
480 if result is not None:
481 return result
482 break
483 return 0, 0
486def _sync_albums_by_configuration(
487 photos,
488 libraries,
489 download_all,
490 destination_path,
491 filters,
492 files,
493 folder_format,
494 hardlink_registry,
495 config,
496) -> tuple[int, int]:
497 """Sync albums based on configuration settings.
499 Args:
500 photos: Photos object from iCloudPy
501 libraries: List of photo libraries to sync
502 download_all: Whether to download all albums
503 destination_path: Base destination path
504 filters: Photo filters configuration
505 files: Set to track downloaded files
506 folder_format: Folder format string
507 hardlink_registry: Registry for tracking downloaded files
508 config: Configuration dictionary
510 Returns:
511 Tuple of (total_successful, total_failed) aggregated across all libraries
512 """
513 total_successful, total_failed = 0, 0
514 for library in libraries:
515 if download_all and library == "PrimarySync":
516 sub_successful, sub_failed = _sync_all_albums_except_filtered(
517 photos,
518 library,
519 filters,
520 destination_path,
521 files,
522 folder_format,
523 hardlink_registry,
524 config,
525 )
526 elif filters["albums"] and library == "PrimarySync":
527 sub_successful, sub_failed = _sync_filtered_albums(
528 photos,
529 library,
530 filters,
531 destination_path,
532 files,
533 folder_format,
534 hardlink_registry,
535 config,
536 )
537 elif filters["albums"]:
538 sub_successful, sub_failed = _sync_filtered_albums_in_library(
539 photos,
540 library,
541 filters,
542 destination_path,
543 files,
544 folder_format,
545 hardlink_registry,
546 config,
547 )
548 else:
549 sub_successful, sub_failed = _sync_all_photos_in_library(
550 photos,
551 library,
552 destination_path,
553 filters,
554 files,
555 folder_format,
556 hardlink_registry,
557 config,
558 )
559 total_successful += sub_successful
560 total_failed += sub_failed
561 return total_successful, total_failed
564def _sync_all_albums_except_filtered(
565 photos,
566 library,
567 filters,
568 destination_path,
569 files,
570 folder_format,
571 hardlink_registry,
572 config,
573) -> tuple[int, int]:
574 """Sync all albums except those in the filter exclusion list.
576 Args:
577 photos: Photos object from iCloudPy
578 library: Library name to sync
579 filters: Photo filters configuration
580 destination_path: Base destination path
581 files: Set to track downloaded files
582 folder_format: Folder format string
583 hardlink_registry: Registry for tracking downloaded files
584 config: Configuration dictionary
586 Returns:
587 Tuple of (total_successful, total_failed) aggregated across all synced albums
588 """
589 total_successful, total_failed = 0, 0
590 for album in photos.libraries[library].albums.keys():
591 # Skip All Photos if we already synced it first
592 if hardlink_registry and album == "All Photos":
593 continue
594 if filters["albums"] and album in iter(filters["albums"]):
595 continue
596 result = sync_album_photos(
597 album=photos.libraries[library].albums[album],
598 destination_path=os.path.join(destination_path, album),
599 file_sizes=filters["file_sizes"],
600 extensions=filters["extensions"],
601 files=files,
602 folder_format=folder_format,
603 hardlink_registry=hardlink_registry,
604 config=config,
605 )
606 if result is not None:
607 sub_successful, sub_failed = result
608 total_successful += sub_successful
609 total_failed += sub_failed
610 return total_successful, total_failed
613def _sync_filtered_albums(
614 photos,
615 library,
616 filters,
617 destination_path,
618 files,
619 folder_format,
620 hardlink_registry,
621 config,
622) -> tuple[int, int]:
623 """Sync only albums specified in filters.
625 Args:
626 photos: Photos object from iCloudPy
627 library: Library name to sync
628 filters: Photo filters configuration
629 destination_path: Base destination path
630 files: Set to track downloaded files
631 folder_format: Folder format string
632 hardlink_registry: Registry for tracking downloaded files
633 config: Configuration dictionary
635 Returns:
636 Tuple of (total_successful, total_failed) aggregated across all synced albums
637 """
638 total_successful, total_failed = 0, 0
639 for album in iter(filters["albums"]):
640 result = sync_album_photos(
641 album=photos.libraries[library].albums[album],
642 destination_path=os.path.join(destination_path, album),
643 file_sizes=filters["file_sizes"],
644 extensions=filters["extensions"],
645 files=files,
646 folder_format=folder_format,
647 hardlink_registry=hardlink_registry,
648 config=config,
649 )
650 if result is not None:
651 sub_successful, sub_failed = result
652 total_successful += sub_successful
653 total_failed += sub_failed
654 return total_successful, total_failed
657def _sync_filtered_albums_in_library(
658 photos,
659 library,
660 filters,
661 destination_path,
662 files,
663 folder_format,
664 hardlink_registry,
665 config,
666) -> tuple[int, int]:
667 """Sync filtered albums in a specific library.
669 Args:
670 photos: Photos object from iCloudPy
671 library: Library name to sync
672 filters: Photo filters configuration
673 destination_path: Base destination path
674 files: Set to track downloaded files
675 folder_format: Folder format string
676 hardlink_registry: Registry for tracking downloaded files
677 config: Configuration dictionary
679 Returns:
680 Tuple of (total_successful, total_failed) aggregated across all synced albums
681 """
682 total_successful, total_failed = 0, 0
683 for album in iter(filters["albums"]):
684 if album in photos.libraries[library].albums:
685 result = sync_album_photos(
686 album=photos.libraries[library].albums[album],
687 destination_path=os.path.join(destination_path, album),
688 file_sizes=filters["file_sizes"],
689 extensions=filters["extensions"],
690 files=files,
691 folder_format=folder_format,
692 hardlink_registry=hardlink_registry,
693 config=config,
694 )
695 if result is not None:
696 sub_successful, sub_failed = result
697 total_successful += sub_successful
698 total_failed += sub_failed
699 else:
700 LOGGER.warning(f"Album {album} not found in {library}. Skipping the album {album} ...")
701 return total_successful, total_failed
704def _sync_all_photos_in_library(
705 photos,
706 library,
707 destination_path,
708 filters,
709 files,
710 folder_format,
711 hardlink_registry,
712 config,
713) -> tuple[int, int]:
714 """Sync all photos in a library.
716 Args:
717 photos: Photos object from iCloudPy
718 library: Library name to sync
719 destination_path: Base destination path
720 filters: Photo filters configuration
721 files: Set to track downloaded files
722 folder_format: Folder format string
723 hardlink_registry: Registry for tracking downloaded files
724 config: Configuration dictionary
726 Returns:
727 Tuple of (total_successful, total_failed) download counts
728 """
729 result = sync_album_photos(
730 album=photos.libraries[library].all,
731 destination_path=os.path.join(destination_path, "all"),
732 file_sizes=filters["file_sizes"],
733 extensions=filters["extensions"],
734 files=files,
735 folder_format=folder_format,
736 hardlink_registry=hardlink_registry,
737 config=config,
738 )
739 if result is not None:
740 return result
741 return 0, 0