Coverage for src/sync_photos.py: 100%

152 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""Sync photos module. 

2 

3This module provides the main photo synchronization functionality, 

4orchestrating the downloading of photos from iCloud to local storage. 

5""" 

6 

7___author___ = "Mandar Patil <mandarons@pm.me>" 

8 

9import os 

10 

11from src import config_parser, configure_icloudpy_logging, get_logger 

12from src.album_sync_orchestrator import sync_album_photos 

13from src.hardlink_registry import create_hardlink_registry 

14from src.photo_cleanup_utils import remove_obsolete_files 

15 

16# Configure icloudpy logging immediately after import 

17configure_icloudpy_logging() 

18 

19LOGGER = get_logger() 

20 

21 

22# Legacy functions preserved for backward compatibility with existing tests 

23# These functions are now implemented using the new modular architecture 

24 

25 

26def get_max_threads(config): 

27 """Get maximum number of threads for parallel downloads. 

28 

29 Legacy function - now delegates to config_parser. 

30 

31 Args: 

32 config: Configuration dictionary 

33 

34 Returns: 

35 Maximum number of threads to use for downloads 

36 """ 

37 return config_parser.get_app_max_threads(config) 

38 

39 

40def get_name_and_extension(photo, file_size): 

41 """Extract filename and extension. 

42 

43 Legacy function - now delegates to photo_path_utils. 

44 

45 Args: 

46 photo: Photo object from iCloudPy 

47 file_size: File size variant 

48 

49 Returns: 

50 Tuple of (name, extension) 

51 """ 

52 from src.photo_path_utils import get_photo_name_and_extension 

53 

54 return get_photo_name_and_extension(photo, file_size) 

55 

56 

57def photo_wanted(photo, extensions): 

58 """Check if photo is wanted based on extension. 

59 

60 Legacy function - now delegates to photo_filter_utils. 

61 

62 Args: 

63 photo: Photo object from iCloudPy 

64 extensions: List of allowed extensions 

65 

66 Returns: 

67 True if photo should be synced, False otherwise 

68 """ 

69 from src.photo_filter_utils import is_photo_wanted 

70 

71 return is_photo_wanted(photo, extensions) 

72 

73 

74def generate_file_name(photo, file_size, destination_path, folder_format): 

75 """Generate full path to file. 

76 

77 Legacy function - now delegates to photo_download_manager. 

78 

79 Args: 

80 photo: Photo object from iCloudPy 

81 file_size: File size variant 

82 destination_path: Base destination path 

83 folder_format: Folder format string 

84 

85 Returns: 

86 Full file path 

87 """ 

88 from src.photo_download_manager import generate_photo_path 

89 

90 return generate_photo_path(photo, file_size, destination_path, folder_format) 

91 

92 

93def photo_exists(photo, file_size, local_path): 

94 """Check if photo exist locally. 

95 

96 Legacy function - now delegates to photo_file_utils. 

97 

98 Args: 

99 photo: Photo object from iCloudPy 

100 file_size: File size variant 

101 local_path: Local file path to check 

102 

103 Returns: 

104 True if photo exists with correct size, False otherwise 

105 """ 

106 from src.photo_file_utils import check_photo_exists 

107 

108 return check_photo_exists(photo, file_size, local_path) 

109 

110 

111def create_hardlink(source_path, destination_path): 

112 """Create a hard link from source to destination. 

113 

114 Legacy function - now delegates to photo_file_utils. 

115 

116 Args: 

117 source_path: Path to source file 

118 destination_path: Path for new hardlink 

119 

120 Returns: 

121 True if successful, False otherwise 

122 """ 

123 from src.photo_file_utils import create_hardlink as create_hardlink_impl 

124 

125 return create_hardlink_impl(source_path, destination_path) 

126 

127 

128def download_photo(photo, file_size, destination_path): 

129 """Download photo from server. 

130 

131 Legacy function - now delegates to photo_file_utils. 

132 

133 Args: 

134 photo: Photo object from iCloudPy 

135 file_size: File size variant 

136 destination_path: Where to save the photo 

137 

138 Returns: 

139 True if successful, False otherwise 

140 """ 

141 from src.photo_file_utils import download_photo_from_server 

142 

143 return download_photo_from_server(photo, file_size, destination_path) 

144 

145 

146def process_photo(photo, file_size, destination_path, files, folder_format, hardlink_registry=None): 

147 """Process photo details (legacy function for backward compatibility). 

148 

149 Args: 

150 photo: Photo object from iCloudPy 

151 file_size: File size variant 

152 destination_path: Base destination path 

153 files: Set to track downloaded files 

154 folder_format: Folder format string 

155 hardlink_registry: Registry for hardlinks (legacy dict format) 

156 

157 Returns: 

158 True if photo was processed successfully, False otherwise 

159 """ 

160 from src.photo_download_manager import collect_download_task, execute_download_task 

161 

162 # Convert legacy hardlink registry dict to new registry format if needed 

163 converted_registry = None 

164 if hardlink_registry is not None: 

165 from src.hardlink_registry import HardlinkRegistry 

166 

167 converted_registry = HardlinkRegistry() 

168 for key, path in hardlink_registry.items(): 

169 # Legacy format: photo_id_file_size -> path 

170 if "_" in key: 

171 parts = key.rsplit("_", 1) 

172 if len(parts) == 2: 

173 photo_id, file_sz = parts 

174 converted_registry.register_photo_path(photo_id, file_sz, path) 

175 

176 # Collect download task 

177 task_info = collect_download_task( 

178 photo, 

179 file_size, 

180 destination_path, 

181 files, 

182 folder_format, 

183 converted_registry, 

184 ) 

185 

186 if task_info is None: 

187 return False 

188 

189 # Execute task 

190 result = execute_download_task(task_info) 

191 

192 # Update legacy registry if provided 

193 if result and hardlink_registry is not None: 

194 photo_key = f"{photo.id}_{file_size}" 

195 hardlink_registry[photo_key] = task_info.photo_path 

196 

197 return result 

198 

199 

200def collect_photo_for_download(photo, file_size, destination_path, files, folder_format, hardlink_registry=None): 

201 """Collect photo info for parallel download without immediately downloading. 

202 

203 Legacy function - now delegates to photo_download_manager. 

204 

205 Args: 

206 photo: Photo object from iCloudPy 

207 file_size: File size variant 

208 destination_path: Base destination path 

209 files: Set to track downloaded files 

210 folder_format: Folder format string 

211 hardlink_registry: Registry for hardlinks (legacy dict format) 

212 

213 Returns: 

214 Download task info or None 

215 """ 

216 from src.photo_download_manager import collect_download_task 

217 

218 # Convert legacy hardlink registry dict to new registry format if needed 

219 converted_registry = None 

220 if hardlink_registry is not None: 

221 from src.hardlink_registry import HardlinkRegistry 

222 

223 converted_registry = HardlinkRegistry() 

224 for key, path in hardlink_registry.items(): 

225 if "_" in key: 

226 parts = key.rsplit("_", 1) 

227 if len(parts) == 2: 

228 photo_id, file_sz = parts 

229 converted_registry.register_photo_path(photo_id, file_sz, path) 

230 

231 task_info = collect_download_task( 

232 photo, 

233 file_size, 

234 destination_path, 

235 files, 

236 folder_format, 

237 converted_registry, 

238 ) 

239 

240 if task_info is None: 

241 return None 

242 

243 # Convert back to legacy format for compatibility 

244 return { 

245 "photo": task_info.photo, 

246 "file_size": task_info.file_size, 

247 "photo_path": task_info.photo_path, 

248 "hardlink_source": task_info.hardlink_source, 

249 "hardlink_registry": hardlink_registry, 

250 } 

251 

252 

253def download_photo_task(download_info): 

254 """Download a single photo or create hardlink as part of parallel execution. 

255 

256 Legacy function - maintains original implementation for backward compatibility. 

257 

258 Args: 

259 download_info: Dictionary with download task information 

260 

261 Returns: 

262 True if successful, False otherwise 

263 """ 

264 photo = download_info["photo"] 

265 file_size = download_info["file_size"] 

266 photo_path = download_info["photo_path"] 

267 hardlink_source = download_info.get("hardlink_source") 

268 hardlink_registry = download_info.get("hardlink_registry") 

269 

270 LOGGER.debug(f"[Thread] Starting processing of {photo_path}") 

271 

272 try: 

273 # Try hardlink first if source exists 

274 if hardlink_source: 

275 if create_hardlink(hardlink_source, photo_path): 

276 LOGGER.debug(f"[Thread] Created hardlink for {photo_path}") 

277 return True 

278 else: 

279 # Fallback to download if hard link creation fails 

280 LOGGER.warning(f"Hard link creation failed, downloading {photo_path} instead") 

281 

282 # Download the photo - this maintains the original function call for test compatibility 

283 result = download_photo(photo, file_size, photo_path) 

284 if result: 

285 # Register for future hard links if enabled 

286 if hardlink_registry is not None: 

287 photo_key = f"{photo.id}_{file_size}" 

288 hardlink_registry[photo_key] = photo_path 

289 LOGGER.debug(f"[Thread] Completed download of {photo_path}") 

290 return result 

291 except Exception as e: 

292 LOGGER.error(f"[Thread] Failed to process {photo_path}: {e!s}") 

293 return False 

294 

295 

296def sync_album( 

297 album, 

298 destination_path, 

299 file_sizes, 

300 extensions=None, 

301 files=None, 

302 folder_format=None, 

303 hardlink_registry=None, 

304 config=None, 

305): 

306 """Sync given album. 

307 

308 Legacy function - now delegates to album_sync_orchestrator with conversion 

309 for legacy hardlink registry format. 

310 

311 Args: 

312 album: Album object from iCloudPy 

313 destination_path: Path where photos should be saved 

314 file_sizes: List of file size variants to download 

315 extensions: List of allowed file extensions 

316 files: Set to track downloaded files 

317 folder_format: Folder format string 

318 hardlink_registry: Registry for hardlinks (legacy dict format) 

319 config: Configuration dictionary 

320 

321 Returns: 

322 True on success, None on invalid input 

323 """ 

324 # Convert legacy hardlink registry dict to new registry format if needed 

325 converted_registry = None 

326 if hardlink_registry is not None: 

327 from src.hardlink_registry import HardlinkRegistry 

328 

329 converted_registry = HardlinkRegistry() 

330 for key, path in hardlink_registry.items(): 

331 if "_" in key: 

332 parts = key.rsplit("_", 1) 

333 if len(parts) == 2: 

334 photo_id, file_sz = parts 

335 converted_registry.register_photo_path(photo_id, file_sz, path) 

336 

337 result = sync_album_photos( 

338 album=album, 

339 destination_path=destination_path, 

340 file_sizes=file_sizes, 

341 extensions=extensions, 

342 files=files, 

343 folder_format=folder_format, 

344 hardlink_registry=converted_registry, 

345 config=config, 

346 ) 

347 

348 # Update legacy registry if provided and new registry was created 

349 if hardlink_registry is not None and converted_registry is not None: 

350 # This is a simplified approach - in practice, we'd need to track new entries 

351 # But for legacy compatibility, we'll maintain the existing behavior 

352 pass 

353 

354 return result 

355 

356 

357def remove_obsolete(destination_path, files): 

358 """Remove local obsolete file. 

359 

360 Legacy function - now delegates to photo_cleanup_utils. 

361 

362 Args: 

363 destination_path: Path to search for obsolete files 

364 files: Set of files that should be kept 

365 

366 Returns: 

367 Set of removed file paths 

368 """ 

369 return remove_obsolete_files(destination_path, files) 

370 

371 

372def sync_photos(config, photos): 

373 """Sync all photos. 

374 

375 Main orchestration function that coordinates the entire photo sync process. 

376 This function has been refactored to use the new modular architecture while 

377 maintaining backward compatibility. 

378 

379 Args: 

380 config: Configuration dictionary 

381 photos: Photos object from iCloudPy 

382 

383 Returns: 

384 None (maintains legacy behavior) 

385 """ 

386 # Parse configuration using centralized config parser 

387 destination_path = config_parser.prepare_photos_destination(config=config) 

388 filters = config_parser.get_photos_filters(config=config) 

389 files = set() 

390 download_all = config_parser.get_photos_all_albums(config=config) 

391 use_hardlinks = config_parser.get_photos_use_hardlinks(config=config) 

392 libraries = filters["libraries"] if filters["libraries"] is not None else photos.libraries 

393 folder_format = config_parser.get_photos_folder_format(config=config) 

394 

395 # Initialize hard link registry using new modular approach 

396 hardlink_registry = create_hardlink_registry(use_hardlinks) 

397 

398 # Special handling for "All Photos" when hardlinks are enabled 

399 if use_hardlinks and download_all: 

400 _sync_all_photos_first_for_hardlinks( 

401 photos, 

402 libraries, 

403 destination_path, 

404 filters, 

405 files, 

406 folder_format, 

407 hardlink_registry, 

408 config, 

409 ) 

410 

411 # Sync albums based on configuration 

412 _sync_albums_by_configuration( 

413 photos, 

414 libraries, 

415 download_all, 

416 destination_path, 

417 filters, 

418 files, 

419 folder_format, 

420 hardlink_registry, 

421 config, 

422 ) 

423 

424 # Clean up obsolete files if enabled 

425 if config_parser.get_photos_remove_obsolete(config=config): 

426 remove_obsolete_files(destination_path, files) 

427 

428 

429def _sync_all_photos_first_for_hardlinks( 

430 photos, 

431 libraries, 

432 destination_path, 

433 filters, 

434 files, 

435 folder_format, 

436 hardlink_registry, 

437 config, 

438): 

439 """Sync 'All Photos' album first to populate hardlink registry. 

440 

441 Args: 

442 photos: Photos object from iCloudPy 

443 libraries: List of photo libraries to sync 

444 destination_path: Base destination path 

445 filters: Photo filters configuration 

446 files: Set to track downloaded files 

447 folder_format: Folder format string 

448 hardlink_registry: Registry for tracking downloaded files 

449 config: Configuration dictionary 

450 """ 

451 for library in libraries: 

452 if library == "PrimarySync" and "All Photos" in photos.libraries[library].albums: 

453 LOGGER.info("Syncing 'All Photos' album first for hard link reference...") 

454 sync_album_photos( 

455 album=photos.libraries[library].albums["All Photos"], 

456 destination_path=os.path.join(destination_path, "All Photos"), 

457 file_sizes=filters["file_sizes"], 

458 extensions=filters["extensions"], 

459 files=files, 

460 folder_format=folder_format, 

461 hardlink_registry=hardlink_registry, 

462 config=config, 

463 ) 

464 if hardlink_registry: 

465 LOGGER.info( 

466 f"'All Photos' sync complete. Hard link registry populated with " 

467 f"{hardlink_registry.get_registry_size()} reference files.", 

468 ) 

469 break 

470 

471 

472def _sync_albums_by_configuration( 

473 photos, 

474 libraries, 

475 download_all, 

476 destination_path, 

477 filters, 

478 files, 

479 folder_format, 

480 hardlink_registry, 

481 config, 

482): 

483 """Sync albums based on configuration settings. 

484 

485 Args: 

486 photos: Photos object from iCloudPy 

487 libraries: List of photo libraries to sync 

488 download_all: Whether to download all albums 

489 destination_path: Base destination path 

490 filters: Photo filters configuration 

491 files: Set to track downloaded files 

492 folder_format: Folder format string 

493 hardlink_registry: Registry for tracking downloaded files 

494 config: Configuration dictionary 

495 """ 

496 for library in libraries: 

497 if download_all and library == "PrimarySync": 

498 _sync_all_albums_except_filtered( 

499 photos, 

500 library, 

501 filters, 

502 destination_path, 

503 files, 

504 folder_format, 

505 hardlink_registry, 

506 config, 

507 ) 

508 elif filters["albums"] and library == "PrimarySync": 

509 _sync_filtered_albums( 

510 photos, 

511 library, 

512 filters, 

513 destination_path, 

514 files, 

515 folder_format, 

516 hardlink_registry, 

517 config, 

518 ) 

519 elif filters["albums"]: 

520 _sync_filtered_albums_in_library( 

521 photos, 

522 library, 

523 filters, 

524 destination_path, 

525 files, 

526 folder_format, 

527 hardlink_registry, 

528 config, 

529 ) 

530 else: 

531 _sync_all_photos_in_library( 

532 photos, 

533 library, 

534 destination_path, 

535 filters, 

536 files, 

537 folder_format, 

538 hardlink_registry, 

539 config, 

540 ) 

541 

542 

543def _sync_all_albums_except_filtered( 

544 photos, 

545 library, 

546 filters, 

547 destination_path, 

548 files, 

549 folder_format, 

550 hardlink_registry, 

551 config, 

552): 

553 """Sync all albums except those in the filter exclusion list. 

554 

555 Args: 

556 photos: Photos object from iCloudPy 

557 library: Library name to sync 

558 filters: Photo filters configuration 

559 destination_path: Base destination path 

560 files: Set to track downloaded files 

561 folder_format: Folder format string 

562 hardlink_registry: Registry for tracking downloaded files 

563 config: Configuration dictionary 

564 """ 

565 for album in photos.libraries[library].albums.keys(): 

566 # Skip All Photos if we already synced it first 

567 if hardlink_registry and album == "All Photos": 

568 continue 

569 if filters["albums"] and album in iter(filters["albums"]): 

570 continue 

571 sync_album_photos( 

572 album=photos.libraries[library].albums[album], 

573 destination_path=os.path.join(destination_path, album), 

574 file_sizes=filters["file_sizes"], 

575 extensions=filters["extensions"], 

576 files=files, 

577 folder_format=folder_format, 

578 hardlink_registry=hardlink_registry, 

579 config=config, 

580 ) 

581 

582 

583def _sync_filtered_albums( 

584 photos, 

585 library, 

586 filters, 

587 destination_path, 

588 files, 

589 folder_format, 

590 hardlink_registry, 

591 config, 

592): 

593 """Sync only albums specified in filters. 

594 

595 Args: 

596 photos: Photos object from iCloudPy 

597 library: Library name to sync 

598 filters: Photo filters configuration 

599 destination_path: Base destination path 

600 files: Set to track downloaded files 

601 folder_format: Folder format string 

602 hardlink_registry: Registry for tracking downloaded files 

603 config: Configuration dictionary 

604 """ 

605 for album in iter(filters["albums"]): 

606 sync_album_photos( 

607 album=photos.libraries[library].albums[album], 

608 destination_path=os.path.join(destination_path, album), 

609 file_sizes=filters["file_sizes"], 

610 extensions=filters["extensions"], 

611 files=files, 

612 folder_format=folder_format, 

613 hardlink_registry=hardlink_registry, 

614 config=config, 

615 ) 

616 

617 

618def _sync_filtered_albums_in_library( 

619 photos, 

620 library, 

621 filters, 

622 destination_path, 

623 files, 

624 folder_format, 

625 hardlink_registry, 

626 config, 

627): 

628 """Sync filtered albums in a specific library. 

629 

630 Args: 

631 photos: Photos object from iCloudPy 

632 library: Library name to sync 

633 filters: Photo filters configuration 

634 destination_path: Base destination path 

635 files: Set to track downloaded files 

636 folder_format: Folder format string 

637 hardlink_registry: Registry for tracking downloaded files 

638 config: Configuration dictionary 

639 """ 

640 for album in iter(filters["albums"]): 

641 if album in photos.libraries[library].albums: 

642 sync_album_photos( 

643 album=photos.libraries[library].albums[album], 

644 destination_path=os.path.join(destination_path, album), 

645 file_sizes=filters["file_sizes"], 

646 extensions=filters["extensions"], 

647 files=files, 

648 folder_format=folder_format, 

649 hardlink_registry=hardlink_registry, 

650 config=config, 

651 ) 

652 else: 

653 LOGGER.warning(f"Album {album} not found in {library}. Skipping the album {album} ...") 

654 

655 

656def _sync_all_photos_in_library( 

657 photos, 

658 library, 

659 destination_path, 

660 filters, 

661 files, 

662 folder_format, 

663 hardlink_registry, 

664 config, 

665): 

666 """Sync all photos in a library. 

667 

668 Args: 

669 photos: Photos object from iCloudPy 

670 library: Library name to sync 

671 destination_path: Base destination path 

672 filters: Photo filters configuration 

673 files: Set to track downloaded files 

674 folder_format: Folder format string 

675 hardlink_registry: Registry for tracking downloaded files 

676 config: Configuration dictionary 

677 """ 

678 sync_album_photos( 

679 album=photos.libraries[library].all, 

680 destination_path=os.path.join(destination_path, "all"), 

681 file_sizes=filters["file_sizes"], 

682 extensions=filters["extensions"], 

683 files=files, 

684 folder_format=folder_format, 

685 hardlink_registry=hardlink_registry, 

686 config=config, 

687 )