Coverage for src/sync_photos.py: 100%

186 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-06 02:49 +0000

1"""Sync photos module. 

2 

3This module provides the main photo synchronization functionality, 

4orchestrating the downloading of photos from iCloud to local storage. 

5""" 

6 

7___author___ = "Mandar Patil <mandarons@pm.me>" 

8 

9import os 

10 

11from src import config_parser, configure_icloudpy_logging, get_logger 

12from src.album_sync_orchestrator import sync_album_photos 

13from src.hardlink_registry import create_hardlink_registry 

14from src.photo_cleanup_utils import remove_obsolete_files 

15 

16# Configure icloudpy logging immediately after import 

17configure_icloudpy_logging() 

18 

19LOGGER = get_logger() 

20 

21 

22# Legacy functions preserved for backward compatibility with existing tests 

23# These functions are now implemented using the new modular architecture 

24 

25 

26def get_max_threads(config): 

27 """Get maximum number of threads for parallel downloads. 

28 

29 Legacy function - now delegates to config_parser. 

30 

31 Args: 

32 config: Configuration dictionary 

33 

34 Returns: 

35 Maximum number of threads to use for downloads 

36 """ 

37 return config_parser.get_app_max_threads(config) 

38 

39 

40def get_name_and_extension(photo, file_size): 

41 """Extract filename and extension. 

42 

43 Legacy function - now delegates to photo_path_utils. 

44 

45 Args: 

46 photo: Photo object from iCloudPy 

47 file_size: File size variant 

48 

49 Returns: 

50 Tuple of (name, extension) 

51 """ 

52 from src.photo_path_utils import get_photo_name_and_extension 

53 

54 return get_photo_name_and_extension(photo, file_size) 

55 

56 

57def photo_wanted(photo, extensions): 

58 """Check if photo is wanted based on extension. 

59 

60 Legacy function - now delegates to photo_filter_utils. 

61 

62 Args: 

63 photo: Photo object from iCloudPy 

64 extensions: List of allowed extensions 

65 

66 Returns: 

67 True if photo should be synced, False otherwise 

68 """ 

69 from src.photo_filter_utils import is_photo_wanted 

70 

71 return is_photo_wanted(photo, extensions) 

72 

73 

74def generate_file_name(photo, file_size, destination_path, folder_format): 

75 """Generate full path to file. 

76 

77 Legacy function - now delegates to photo_download_manager. 

78 

79 Args: 

80 photo: Photo object from iCloudPy 

81 file_size: File size variant 

82 destination_path: Base destination path 

83 folder_format: Folder format string 

84 

85 Returns: 

86 Full file path 

87 """ 

88 from src.photo_download_manager import generate_photo_path 

89 

90 return generate_photo_path(photo, file_size, destination_path, folder_format) 

91 

92 

93def photo_exists(photo, file_size, local_path): 

94 """Check if photo exist locally. 

95 

96 Legacy function - now delegates to photo_file_utils. 

97 

98 Args: 

99 photo: Photo object from iCloudPy 

100 file_size: File size variant 

101 local_path: Local file path to check 

102 

103 Returns: 

104 True if photo exists with correct size, False otherwise 

105 """ 

106 from src.photo_file_utils import check_photo_exists 

107 

108 return check_photo_exists(photo, file_size, local_path) 

109 

110 

111def create_hardlink(source_path, destination_path): 

112 """Create a hard link from source to destination. 

113 

114 Legacy function - now delegates to photo_file_utils. 

115 

116 Args: 

117 source_path: Path to source file 

118 destination_path: Path for new hardlink 

119 

120 Returns: 

121 True if successful, False otherwise 

122 """ 

123 from src.photo_file_utils import create_hardlink as create_hardlink_impl 

124 

125 return create_hardlink_impl(source_path, destination_path) 

126 

127 

128def download_photo(photo, file_size, destination_path): 

129 """Download photo from server. 

130 

131 Legacy function - now delegates to photo_file_utils. 

132 

133 Args: 

134 photo: Photo object from iCloudPy 

135 file_size: File size variant 

136 destination_path: Where to save the photo 

137 

138 Returns: 

139 True if successful, False otherwise 

140 """ 

141 from src.photo_file_utils import download_photo_from_server 

142 

143 return download_photo_from_server(photo, file_size, destination_path) 

144 

145 

146def process_photo(photo, file_size, destination_path, files, folder_format, hardlink_registry=None): 

147 """Process photo details (legacy function for backward compatibility). 

148 

149 Args: 

150 photo: Photo object from iCloudPy 

151 file_size: File size variant 

152 destination_path: Base destination path 

153 files: Set to track downloaded files 

154 folder_format: Folder format string 

155 hardlink_registry: Registry for hardlinks (legacy dict format) 

156 

157 Returns: 

158 True if photo was processed successfully, False otherwise 

159 """ 

160 from src.photo_download_manager import collect_download_task, execute_download_task 

161 

162 # Convert legacy hardlink registry dict to new registry format if needed 

163 converted_registry = None 

164 if hardlink_registry is not None: 

165 from src.hardlink_registry import HardlinkRegistry 

166 

167 converted_registry = HardlinkRegistry() 

168 for key, path in hardlink_registry.items(): 

169 # Legacy format: photo_id_file_size -> path 

170 if "_" in key: 

171 parts = key.rsplit("_", 1) 

172 if len(parts) == 2: 

173 photo_id, file_sz = parts 

174 converted_registry.register_photo_path(photo_id, file_sz, path) 

175 

176 # Collect download task 

177 task_info = collect_download_task( 

178 photo, 

179 file_size, 

180 destination_path, 

181 files, 

182 folder_format, 

183 converted_registry, 

184 ) 

185 

186 if task_info is None: 

187 return False 

188 

189 # Execute task 

190 result = execute_download_task(task_info) 

191 

192 # Update legacy registry if provided 

193 if result and hardlink_registry is not None: 

194 photo_key = f"{photo.id}_{file_size}" 

195 hardlink_registry[photo_key] = task_info.photo_path 

196 

197 return result 

198 

199 

200def collect_photo_for_download(photo, file_size, destination_path, files, folder_format, hardlink_registry=None): 

201 """Collect photo info for parallel download without immediately downloading. 

202 

203 Legacy function - now delegates to photo_download_manager. 

204 

205 Args: 

206 photo: Photo object from iCloudPy 

207 file_size: File size variant 

208 destination_path: Base destination path 

209 files: Set to track downloaded files 

210 folder_format: Folder format string 

211 hardlink_registry: Registry for hardlinks (legacy dict format) 

212 

213 Returns: 

214 Download task info or None 

215 """ 

216 from src.photo_download_manager import collect_download_task 

217 

218 # Convert legacy hardlink registry dict to new registry format if needed 

219 converted_registry = None 

220 if hardlink_registry is not None: 

221 from src.hardlink_registry import HardlinkRegistry 

222 

223 converted_registry = HardlinkRegistry() 

224 for key, path in hardlink_registry.items(): 

225 if "_" in key: 

226 parts = key.rsplit("_", 1) 

227 if len(parts) == 2: 

228 photo_id, file_sz = parts 

229 converted_registry.register_photo_path(photo_id, file_sz, path) 

230 

231 task_info = collect_download_task( 

232 photo, 

233 file_size, 

234 destination_path, 

235 files, 

236 folder_format, 

237 converted_registry, 

238 ) 

239 

240 if task_info is None: 

241 return None 

242 

243 # Convert back to legacy format for compatibility 

244 return { 

245 "photo": task_info.photo, 

246 "file_size": task_info.file_size, 

247 "photo_path": task_info.photo_path, 

248 "hardlink_source": task_info.hardlink_source, 

249 "hardlink_registry": hardlink_registry, 

250 } 

251 

252 

253def download_photo_task(download_info): 

254 """Download a single photo or create hardlink as part of parallel execution. 

255 

256 Legacy function - maintains original implementation for backward compatibility. 

257 

258 Args: 

259 download_info: Dictionary with download task information 

260 

261 Returns: 

262 True if successful, False otherwise 

263 """ 

264 photo = download_info["photo"] 

265 file_size = download_info["file_size"] 

266 photo_path = download_info["photo_path"] 

267 hardlink_source = download_info.get("hardlink_source") 

268 hardlink_registry = download_info.get("hardlink_registry") 

269 

270 LOGGER.debug(f"[Thread] Starting processing of {photo_path}") 

271 

272 try: 

273 # Try hardlink first if source exists 

274 if hardlink_source: 

275 if create_hardlink(hardlink_source, photo_path): 

276 LOGGER.debug(f"[Thread] Created hardlink for {photo_path}") 

277 return True 

278 else: 

279 # Fallback to download if hard link creation fails 

280 LOGGER.warning(f"Hard link creation failed, downloading {photo_path} instead") 

281 

282 # Download the photo - this maintains the original function call for test compatibility 

283 result = download_photo(photo, file_size, photo_path) 

284 if result: 

285 # Register for future hard links if enabled 

286 if hardlink_registry is not None: 

287 photo_key = f"{photo.id}_{file_size}" 

288 hardlink_registry[photo_key] = photo_path 

289 LOGGER.debug(f"[Thread] Completed download of {photo_path}") 

290 return result 

291 except Exception as e: 

292 LOGGER.error(f"[Thread] Failed to process {photo_path}: {e!s}") 

293 return False 

294 

295 

296def sync_album( 

297 album, 

298 destination_path, 

299 file_sizes, 

300 extensions=None, 

301 files=None, 

302 folder_format=None, 

303 hardlink_registry=None, 

304 config=None, 

305): 

306 """Sync given album. 

307 

308 Legacy function - now delegates to album_sync_orchestrator with conversion 

309 for legacy hardlink registry format. 

310 

311 Args: 

312 album: Album object from iCloudPy 

313 destination_path: Path where photos should be saved 

314 file_sizes: List of file size variants to download 

315 extensions: List of allowed file extensions 

316 files: Set to track downloaded files 

317 folder_format: Folder format string 

318 hardlink_registry: Registry for hardlinks (legacy dict format) 

319 config: Configuration dictionary 

320 

321 Returns: 

322 True on success, None on invalid input 

323 """ 

324 # Convert legacy hardlink registry dict to new registry format if needed 

325 converted_registry = None 

326 if hardlink_registry is not None: 

327 from src.hardlink_registry import HardlinkRegistry 

328 

329 converted_registry = HardlinkRegistry() 

330 for key, path in hardlink_registry.items(): 

331 if "_" in key: 

332 parts = key.rsplit("_", 1) 

333 if len(parts) == 2: 

334 photo_id, file_sz = parts 

335 converted_registry.register_photo_path(photo_id, file_sz, path) 

336 

337 result = sync_album_photos( 

338 album=album, 

339 destination_path=destination_path, 

340 file_sizes=file_sizes, 

341 extensions=extensions, 

342 files=files, 

343 folder_format=folder_format, 

344 hardlink_registry=converted_registry, 

345 config=config, 

346 ) 

347 

348 # Update legacy registry if provided and new registry was created 

349 if hardlink_registry is not None and converted_registry is not None: 

350 # This is a simplified approach - in practice, we'd need to track new entries 

351 # But for legacy compatibility, we'll maintain the existing behavior 

352 pass 

353 

354 return result 

355 

356 

357def remove_obsolete(destination_path, files): 

358 """Remove local obsolete file. 

359 

360 Legacy function - now delegates to photo_cleanup_utils. 

361 

362 Args: 

363 destination_path: Path to search for obsolete files 

364 files: Set of files that should be kept 

365 

366 Returns: 

367 Set of removed file paths 

368 """ 

369 return remove_obsolete_files(destination_path, files) 

370 

371 

372def sync_photos(config, photos): 

373 """Sync all photos. 

374 

375 Main orchestration function that coordinates the entire photo sync process. 

376 This function has been refactored to use the new modular architecture while 

377 maintaining backward compatibility. 

378 

379 Args: 

380 config: Configuration dictionary 

381 photos: Photos object from iCloudPy 

382 

383 Returns: 

384 Tuple of (total_successful, total_failed) download counts 

385 """ 

386 # Parse configuration using centralized config parser 

387 destination_path = config_parser.prepare_photos_destination(config=config) 

388 filters = config_parser.get_photos_filters(config=config) 

389 files = set() 

390 download_all = config_parser.get_photos_all_albums(config=config) 

391 use_hardlinks = config_parser.get_photos_use_hardlinks(config=config) 

392 libraries = filters["libraries"] if filters["libraries"] is not None else photos.libraries 

393 folder_format = config_parser.get_photos_folder_format(config=config) 

394 

395 # Initialize hard link registry using new modular approach 

396 hardlink_registry = create_hardlink_registry(use_hardlinks) 

397 

398 total_successful, total_failed = 0, 0 

399 

400 # Special handling for "All Photos" when hardlinks are enabled 

401 if use_hardlinks and download_all: 

402 sub_successful, sub_failed = _sync_all_photos_first_for_hardlinks( 

403 photos, 

404 libraries, 

405 destination_path, 

406 filters, 

407 files, 

408 folder_format, 

409 hardlink_registry, 

410 config, 

411 ) 

412 total_successful += sub_successful 

413 total_failed += sub_failed 

414 

415 # Sync albums based on configuration 

416 sub_successful, sub_failed = _sync_albums_by_configuration( 

417 photos, 

418 libraries, 

419 download_all, 

420 destination_path, 

421 filters, 

422 files, 

423 folder_format, 

424 hardlink_registry, 

425 config, 

426 ) 

427 total_successful += sub_successful 

428 total_failed += sub_failed 

429 

430 # Clean up obsolete files if enabled 

431 if config_parser.get_photos_remove_obsolete(config=config): 

432 remove_obsolete_files(destination_path, files) 

433 

434 return total_successful, total_failed 

435 

436 

437def _sync_all_photos_first_for_hardlinks( 

438 photos, 

439 libraries, 

440 destination_path, 

441 filters, 

442 files, 

443 folder_format, 

444 hardlink_registry, 

445 config, 

446) -> tuple[int, int]: 

447 """Sync 'All Photos' album first to populate hardlink registry. 

448 

449 Args: 

450 photos: Photos object from iCloudPy 

451 libraries: List of photo libraries to sync 

452 destination_path: Base destination path 

453 filters: Photo filters configuration 

454 files: Set to track downloaded files 

455 folder_format: Folder format string 

456 hardlink_registry: Registry for tracking downloaded files 

457 config: Configuration dictionary 

458 

459 Returns: 

460 Tuple of (total_successful, total_failed) download counts 

461 """ 

462 for library in libraries: 

463 if library == "PrimarySync" and "All Photos" in photos.libraries[library].albums: 

464 LOGGER.info("Syncing 'All Photos' album first for hard link reference...") 

465 result = sync_album_photos( 

466 album=photos.libraries[library].albums["All Photos"], 

467 destination_path=os.path.join(destination_path, "All Photos"), 

468 file_sizes=filters["file_sizes"], 

469 extensions=filters["extensions"], 

470 files=files, 

471 folder_format=folder_format, 

472 hardlink_registry=hardlink_registry, 

473 config=config, 

474 ) 

475 if hardlink_registry: 

476 LOGGER.info( 

477 f"'All Photos' sync complete. Hard link registry populated with " 

478 f"{hardlink_registry.get_registry_size()} reference files.", 

479 ) 

480 if result is not None: 

481 return result 

482 break 

483 return 0, 0 

484 

485 

486def _sync_albums_by_configuration( 

487 photos, 

488 libraries, 

489 download_all, 

490 destination_path, 

491 filters, 

492 files, 

493 folder_format, 

494 hardlink_registry, 

495 config, 

496) -> tuple[int, int]: 

497 """Sync albums based on configuration settings. 

498 

499 Args: 

500 photos: Photos object from iCloudPy 

501 libraries: List of photo libraries to sync 

502 download_all: Whether to download all albums 

503 destination_path: Base destination path 

504 filters: Photo filters configuration 

505 files: Set to track downloaded files 

506 folder_format: Folder format string 

507 hardlink_registry: Registry for tracking downloaded files 

508 config: Configuration dictionary 

509 

510 Returns: 

511 Tuple of (total_successful, total_failed) aggregated across all libraries 

512 """ 

513 total_successful, total_failed = 0, 0 

514 for library in libraries: 

515 if download_all and library == "PrimarySync": 

516 sub_successful, sub_failed = _sync_all_albums_except_filtered( 

517 photos, 

518 library, 

519 filters, 

520 destination_path, 

521 files, 

522 folder_format, 

523 hardlink_registry, 

524 config, 

525 ) 

526 elif filters["albums"] and library == "PrimarySync": 

527 sub_successful, sub_failed = _sync_filtered_albums( 

528 photos, 

529 library, 

530 filters, 

531 destination_path, 

532 files, 

533 folder_format, 

534 hardlink_registry, 

535 config, 

536 ) 

537 elif filters["albums"]: 

538 sub_successful, sub_failed = _sync_filtered_albums_in_library( 

539 photos, 

540 library, 

541 filters, 

542 destination_path, 

543 files, 

544 folder_format, 

545 hardlink_registry, 

546 config, 

547 ) 

548 else: 

549 sub_successful, sub_failed = _sync_all_photos_in_library( 

550 photos, 

551 library, 

552 destination_path, 

553 filters, 

554 files, 

555 folder_format, 

556 hardlink_registry, 

557 config, 

558 ) 

559 total_successful += sub_successful 

560 total_failed += sub_failed 

561 return total_successful, total_failed 

562 

563 

564def _sync_all_albums_except_filtered( 

565 photos, 

566 library, 

567 filters, 

568 destination_path, 

569 files, 

570 folder_format, 

571 hardlink_registry, 

572 config, 

573) -> tuple[int, int]: 

574 """Sync all albums except those in the filter exclusion list. 

575 

576 Args: 

577 photos: Photos object from iCloudPy 

578 library: Library name to sync 

579 filters: Photo filters configuration 

580 destination_path: Base destination path 

581 files: Set to track downloaded files 

582 folder_format: Folder format string 

583 hardlink_registry: Registry for tracking downloaded files 

584 config: Configuration dictionary 

585 

586 Returns: 

587 Tuple of (total_successful, total_failed) aggregated across all synced albums 

588 """ 

589 total_successful, total_failed = 0, 0 

590 for album in photos.libraries[library].albums.keys(): 

591 # Skip All Photos if we already synced it first 

592 if hardlink_registry and album == "All Photos": 

593 continue 

594 if filters["albums"] and album in iter(filters["albums"]): 

595 continue 

596 result = sync_album_photos( 

597 album=photos.libraries[library].albums[album], 

598 destination_path=os.path.join(destination_path, album), 

599 file_sizes=filters["file_sizes"], 

600 extensions=filters["extensions"], 

601 files=files, 

602 folder_format=folder_format, 

603 hardlink_registry=hardlink_registry, 

604 config=config, 

605 ) 

606 if result is not None: 

607 sub_successful, sub_failed = result 

608 total_successful += sub_successful 

609 total_failed += sub_failed 

610 return total_successful, total_failed 

611 

612 

613def _sync_filtered_albums( 

614 photos, 

615 library, 

616 filters, 

617 destination_path, 

618 files, 

619 folder_format, 

620 hardlink_registry, 

621 config, 

622) -> tuple[int, int]: 

623 """Sync only albums specified in filters. 

624 

625 Args: 

626 photos: Photos object from iCloudPy 

627 library: Library name to sync 

628 filters: Photo filters configuration 

629 destination_path: Base destination path 

630 files: Set to track downloaded files 

631 folder_format: Folder format string 

632 hardlink_registry: Registry for tracking downloaded files 

633 config: Configuration dictionary 

634 

635 Returns: 

636 Tuple of (total_successful, total_failed) aggregated across all synced albums 

637 """ 

638 total_successful, total_failed = 0, 0 

639 for album in iter(filters["albums"]): 

640 result = sync_album_photos( 

641 album=photos.libraries[library].albums[album], 

642 destination_path=os.path.join(destination_path, album), 

643 file_sizes=filters["file_sizes"], 

644 extensions=filters["extensions"], 

645 files=files, 

646 folder_format=folder_format, 

647 hardlink_registry=hardlink_registry, 

648 config=config, 

649 ) 

650 if result is not None: 

651 sub_successful, sub_failed = result 

652 total_successful += sub_successful 

653 total_failed += sub_failed 

654 return total_successful, total_failed 

655 

656 

657def _sync_filtered_albums_in_library( 

658 photos, 

659 library, 

660 filters, 

661 destination_path, 

662 files, 

663 folder_format, 

664 hardlink_registry, 

665 config, 

666) -> tuple[int, int]: 

667 """Sync filtered albums in a specific library. 

668 

669 Args: 

670 photos: Photos object from iCloudPy 

671 library: Library name to sync 

672 filters: Photo filters configuration 

673 destination_path: Base destination path 

674 files: Set to track downloaded files 

675 folder_format: Folder format string 

676 hardlink_registry: Registry for tracking downloaded files 

677 config: Configuration dictionary 

678 

679 Returns: 

680 Tuple of (total_successful, total_failed) aggregated across all synced albums 

681 """ 

682 total_successful, total_failed = 0, 0 

683 for album in iter(filters["albums"]): 

684 if album in photos.libraries[library].albums: 

685 result = sync_album_photos( 

686 album=photos.libraries[library].albums[album], 

687 destination_path=os.path.join(destination_path, album), 

688 file_sizes=filters["file_sizes"], 

689 extensions=filters["extensions"], 

690 files=files, 

691 folder_format=folder_format, 

692 hardlink_registry=hardlink_registry, 

693 config=config, 

694 ) 

695 if result is not None: 

696 sub_successful, sub_failed = result 

697 total_successful += sub_successful 

698 total_failed += sub_failed 

699 else: 

700 LOGGER.warning(f"Album {album} not found in {library}. Skipping the album {album} ...") 

701 return total_successful, total_failed 

702 

703 

704def _sync_all_photos_in_library( 

705 photos, 

706 library, 

707 destination_path, 

708 filters, 

709 files, 

710 folder_format, 

711 hardlink_registry, 

712 config, 

713) -> tuple[int, int]: 

714 """Sync all photos in a library. 

715 

716 Args: 

717 photos: Photos object from iCloudPy 

718 library: Library name to sync 

719 destination_path: Base destination path 

720 filters: Photo filters configuration 

721 files: Set to track downloaded files 

722 folder_format: Folder format string 

723 hardlink_registry: Registry for tracking downloaded files 

724 config: Configuration dictionary 

725 

726 Returns: 

727 Tuple of (total_successful, total_failed) download counts 

728 """ 

729 result = sync_album_photos( 

730 album=photos.libraries[library].all, 

731 destination_path=os.path.join(destination_path, "all"), 

732 file_sizes=filters["file_sizes"], 

733 extensions=filters["extensions"], 

734 files=files, 

735 folder_format=folder_format, 

736 hardlink_registry=hardlink_registry, 

737 config=config, 

738 ) 

739 if result is not None: 

740 return result 

741 return 0, 0