Coverage for src/sync.py: 100%
259 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
1"""Sync module."""
3__author__ = "Mandar Patil <mandarons@pm.me>"
4import datetime
5import os
6from time import sleep
8from icloudpy import ICloudPyService, exceptions, utils
10from src import (
11 DEFAULT_CONFIG_FILE_PATH,
12 ENV_CONFIG_FILE_PATH_KEY,
13 ENV_ICLOUD_PASSWORD_KEY,
14 config_parser,
15 configure_icloudpy_logging,
16 get_logger,
17 notify,
18 read_config,
19 sync_drive,
20 sync_photos,
21)
22from src.sync_stats import SyncSummary
23from src.usage import alive
# icloudpy logging is configured as soon as the imports above have run,
# before the module-level logger used by the sync helpers is created.
configure_icloudpy_logging()

LOGGER = get_logger()
def get_api_instance(
    username: str,
    password: str,
    cookie_directory: str | None = None,
    server_region: str = "global",
) -> ICloudPyService:
    """
    Build an iCloud API client for the given account and region.

    Args:
        username: iCloud username/Apple ID
        password: iCloud password
        cookie_directory: Directory to store authentication cookies.
            When ``None`` (the default) the value is looked up at call
            time from ``src.DEFAULT_COOKIE_DIRECTORY``, so replacing that
            constant at runtime (e.g. from test fixtures) takes effect.
        server_region: ``"china"`` selects the icloud.com.cn endpoints;
            any other value leaves the endpoints at the library defaults.

    Returns:
        The constructed ICloudPyService instance
    """
    if cookie_directory is None:
        # Resolve late through the already-imported ``src`` package rather
        # than capturing the constant as a default argument.
        import sys

        cookie_directory = sys.modules["src"].DEFAULT_COOKIE_DIRECTORY

    client_kwargs = {
        "apple_id": username,
        "password": password,
        "cookie_directory": cookie_directory,
    }
    if server_region == "china":
        # The China region needs explicit home/setup endpoints.
        client_kwargs["home_endpoint"] = "https://www.icloud.com.cn"
        client_kwargs["setup_endpoint"] = "https://setup.icloud.com.cn/setup/ws/1"

    return ICloudPyService(**client_kwargs)
class SyncState:
    """
    Mutable scheduling state shared by the sync helpers.

    Bundles the per-service countdown timers, the per-service enable flags
    and the last notification marker into one object so helpers can accept
    a single argument instead of several loose variables.
    """

    def __init__(self):
        """Start with zeroed timers, both services enabled and no notification sent."""
        # Countdown values; the sync helpers reset them to the configured interval.
        self.drive_time_remaining = self.photos_time_remaining = 0
        # Whether each service should run in the next cycle.
        self.enable_sync_drive = self.enable_sync_photos = True
        # Value returned by the last notify.send() call; None until one is made.
        self.last_send = None
def _load_configuration():
    """
    Read the configuration file.

    The path comes from the environment variable named by
    ``ENV_CONFIG_FILE_PATH_KEY`` and falls back to
    ``DEFAULT_CONFIG_FILE_PATH`` when that variable is not set.

    Returns:
        Whatever ``read_config`` returns for that path
    """
    return read_config(
        config_path=os.environ.get(ENV_CONFIG_FILE_PATH_KEY, DEFAULT_CONFIG_FILE_PATH),
    )
def _extract_sync_intervals(config, log_messages: bool = False):
    """
    Look up the drive and photos sync intervals for the configured services.

    Args:
        config: Configuration dictionary (may be empty or None)
        log_messages: Forwarded to the config parser getters (default: False for loop usage)

    Returns:
        tuple: (drive_sync_interval, photos_sync_interval); a service that
        is not present in the configuration contributes 0
    """
    if not config:
        return 0, 0

    drive_interval = (
        config_parser.get_drive_sync_interval(config=config, log_messages=log_messages)
        if "drive" in config
        else 0
    )
    photos_interval = (
        config_parser.get_photos_sync_interval(config=config, log_messages=log_messages)
        if "photos" in config
        else 0
    )
    return drive_interval, photos_interval
def _retrieve_password(username: str):
    """
    Get the iCloud password from the environment, falling back to the keyring.

    When the environment variable named by ``ENV_ICLOUD_PASSWORD_KEY`` is
    set, its value is also written to the keyring for ``username`` before
    being returned.

    Args:
        username: iCloud username

    Returns:
        The environment password when set, otherwise the result of the
        keyring lookup

    Raises:
        Whatever the keyring helpers raise; ``sync()`` handles
        ``ICloudPyNoStoredPasswordAvailableException`` coming from this path.
    """
    env_password = os.environ.get(ENV_ICLOUD_PASSWORD_KEY)
    if env_password is None:
        return utils.get_password_from_keyring(username=username)

    utils.store_password_in_keyring(username=username, password=env_password)
    return env_password
def _authenticate_and_get_api(config, username: str):
    """
    Create the iCloud API client for ``username``.

    Args:
        config: Configuration dictionary
        username: iCloud username

    Returns:
        ICloudPyService instance built by ``get_api_instance``

    Raises:
        ICloudPyNoStoredPasswordAvailableException: propagated from the
        password lookup (per the handling in ``sync()``)
    """
    # Region is resolved first, then the password, matching the call order
    # callers have always observed.
    region = config_parser.get_region(config=config)
    return get_api_instance(
        username=username,
        password=_retrieve_password(username),
        server_region=region,
    )
def _list_drive_destination_files(destination_path):
    """Return the set of file paths currently under ``destination_path`` (best effort)."""
    found = set()
    if os.path.exists(destination_path):
        try:
            for root, _dirs, names in os.walk(destination_path):
                found.update(os.path.join(root, name) for name in names)
        except Exception:
            # Statistics only: keep whatever was collected before the failure.
            pass
    return found


def _perform_drive_sync(config, api, sync_state: SyncState, drive_sync_interval: int):
    """
    Run the drive sync when it is configured and enabled, collecting statistics.

    Args:
        config: Configuration dictionary
        api: iCloud API instance
        sync_state: Current sync state
        drive_sync_interval: Drive sync interval in seconds

    Returns:
        DriveStats object if sync was performed, None otherwise
    """
    if not (config and "drive" in config and sync_state.enable_sync_drive):
        return None

    import time

    from src.sync_stats import DriveStats

    started_at = time.time()
    stats = DriveStats()

    destination = config_parser.prepare_drive_destination(config=config)

    # Snapshot of the destination before syncing, used to classify files afterwards.
    before = _list_drive_destination_files(destination)

    LOGGER.info("Syncing drive...")
    after = sync_drive.sync_drive(config=config, drive=api.drive)
    LOGGER.info("Drive synced")

    stats.duration_seconds = time.time() - started_at

    # sync_drive may return None (e.g., in tests); then only the duration is recorded.
    if after is not None:
        added = after - before
        stats.files_downloaded = len(added)
        stats.files_skipped = len(before & after)

        if config_parser.get_drive_remove_obsolete(config=config):
            stats.files_removed = len(before - after)

        # Byte total of the newly present files; failures leave a partial total.
        try:
            for path in added:
                if os.path.exists(path) and os.path.isfile(path):
                    stats.bytes_downloaded += os.path.getsize(path)
        except Exception:
            pass

    # Restart the countdown for the next drive sync.
    sync_state.drive_time_remaining = drive_sync_interval
    return stats
def _snapshot_photo_files(destination_path):
    """Return the set of file paths found under ``destination_path`` (best effort)."""
    collected = set()
    if os.path.exists(destination_path):
        try:
            for root, _dirs, names in os.walk(destination_path):
                collected.update(os.path.join(root, name) for name in names)
        except Exception as error:
            # Statistics only: keep whatever was collected before the failure.
            LOGGER.debug(f"Failed to list photo files: {error!s}")
    return collected


def _find_hardlinked_photos(new_files, files_before):
    """
    Return the members of ``new_files`` that are hard links to another photo.

    A new file counts as a hard link when its ``(st_dev, st_ino)`` identity
    is already used by a file that existed before the sync or by another
    new file. The first new file seen for an identity is treated as the
    download and the remaining ones as links.
    """
    candidates = {}
    for path in new_files:
        try:
            info = os.stat(path)
        except OSError:
            continue
        # Only files with more than one link and a usable inode can be hard links.
        if info.st_nlink > 1 and info.st_ino:
            candidates[path] = (info.st_dev, info.st_ino)
    if not candidates:
        return set()

    # Pre-existing files are only inspected when there is something to match.
    wanted = set(candidates.values())
    seen = set()
    for path in files_before:
        try:
            info = os.stat(path)
        except OSError:
            continue
        identity = (info.st_dev, info.st_ino)
        if identity in wanted:
            seen.add(identity)

    hardlinked = set()
    for path, identity in candidates.items():
        if identity in seen:
            hardlinked.add(path)
        else:
            seen.add(identity)
    return hardlinked


def _perform_photos_sync(config, api, sync_state: SyncState, photos_sync_interval: int):
    """
    Run the photos sync when it is configured and enabled, collecting statistics.

    Hard links are detected from file identity. The previous estimate,
    ``len(after) - len(before) - len(after - before)``, can never be
    positive, so it always reported zero hard links and never accumulated
    ``bytes_saved_by_hardlinks``. With hard links enabled, new files are
    now split into downloads and links, and their sizes go to
    ``bytes_downloaded`` and ``bytes_saved_by_hardlinks`` respectively.
    With hard links disabled every new file counts as a download.

    Args:
        config: Configuration dictionary
        api: iCloud API instance
        sync_state: Current sync state
        photos_sync_interval: Photos sync interval in seconds

    Returns:
        PhotoStats object if sync was performed, None otherwise
    """
    if not (config and "photos" in config and sync_state.enable_sync_photos):
        return None

    import time

    from src.sync_stats import PhotoStats

    start_time = time.time()
    stats = PhotoStats()

    destination_path = config_parser.prepare_photos_destination(config=config)

    files_before = _snapshot_photo_files(destination_path)

    LOGGER.info("Syncing photos...")
    sync_result = sync_photos.sync_photos(config=config, photos=api.photos)
    LOGGER.info("Photos synced")

    files_after = _snapshot_photo_files(destination_path)

    stats.duration_seconds = time.time() - start_time

    new_files = files_after - files_before
    use_hardlinks = config_parser.get_photos_use_hardlinks(config=config, log_messages=False)

    hardlinked_files = set()
    if use_hardlinks:
        try:
            hardlinked_files = _find_hardlinked_photos(new_files, files_before)
        except Exception as error:
            # Statistics are best effort and must never break the sync.
            LOGGER.debug(f"Failed to detect hardlinked photos: {error!s}")
        stats.photos_hardlinked = len(hardlinked_files)

    # Downloads and hard links are disjoint and together make up the new files.
    stats.photos_downloaded = len(new_files) - len(hardlinked_files)
    stats.photos_skipped = len(files_before & files_after)

    # A hard link's size counts as saved bytes, everything else as downloaded.
    try:
        for file_path in new_files:
            if not os.path.isfile(file_path):
                continue
            size = os.path.getsize(file_path)
            if file_path in hardlinked_files:
                stats.bytes_saved_by_hardlinks += size
            else:
                stats.bytes_downloaded += size
    except Exception as error:
        LOGGER.debug(f"Failed to compute photo byte statistics: {error!s}")

    # Track failed downloads so notifications reflect errors
    if isinstance(sync_result, tuple):
        _, failed_downloads = sync_result
        if failed_downloads > 0:
            stats.errors.append(f"{failed_downloads} photo download(s) failed")

    # Albums are approximated by the top-level directories of the destination.
    try:
        for item in os.listdir(destination_path):
            if os.path.isdir(os.path.join(destination_path, item)):
                stats.albums_synced.append(item)
    except Exception as error:
        LOGGER.debug(f"Failed to list synced albums: {error!s}")

    # Restart the countdown for the next photos sync.
    sync_state.photos_time_remaining = photos_sync_interval
    return stats
def _check_services_configured(config):
    """
    Check if any sync services are configured.

    A missing configuration (``None`` or empty) is treated as "nothing
    configured" instead of raising ``TypeError``, matching the
    ``config and ...`` guards used by the other helpers in this module.

    Args:
        config: Configuration dictionary (may be None)

    Returns:
        bool: True if at least one of ``drive``/``photos`` is present
    """
    if not config:
        return False
    return "drive" in config or "photos" in config
def _send_usage_statistics(config, summary: SyncSummary) -> None:
    """Report aggregate statistics for one sync cycle via ``alive``.

    Only durations, counts and boolean flags taken from ``summary`` are
    included in the payload.

    Args:
        config: Configuration dictionary
        summary: Sync summary with statistics
    """
    ended_at = summary.sync_end_time
    drive = summary.drive_stats
    photos = summary.photo_stats

    payload = {
        # Duration and timestamp fall back to 0 / None when no end time was recorded.
        "sync_duration": (ended_at - summary.sync_start_time).total_seconds() if ended_at else 0,
        "has_drive_activity": bool(drive and drive.has_activity()),
        "has_photos_activity": bool(photos and photos.has_activity()),
        "has_errors": summary.has_errors(),
        "timestamp": ended_at.isoformat() if ended_at else None,
    }

    # Per-service sections are added only for services that produced stats.
    if drive:
        payload["drive"] = {
            "files_count": drive.files_downloaded,
            "bytes_count": drive.bytes_downloaded,
            "has_errors": drive.has_errors(),
        }

    if photos:
        payload["photos"] = {
            "photos_count": photos.photos_downloaded,
            "bytes_count": photos.bytes_downloaded,
            "hardlinks_count": photos.photos_hardlinked,
            "has_errors": photos.has_errors(),
        }

    alive(config=config, data=payload)
def _handle_2fa_required(config, username: str, sync_state: SyncState):
    """
    React to the API reporting that 2FA is required.

    Logs the problem, then either gives up (negative retry interval) or
    sends a notification and sleeps for the retry interval.

    Args:
        config: Configuration dictionary
        username: iCloud username
        sync_state: Current sync state; ``last_send`` is updated with the
            value returned by ``notify.send``

    Returns:
        bool: True if should continue (retry), False if should exit
    """
    LOGGER.error("Error: 2FA is required. Please log in.")
    retry_delay = config_parser.get_retry_login_interval(config=config)

    # A negative interval disables retrying altogether.
    if retry_delay < 0:
        LOGGER.info("retry_login_interval is < 0, exiting ...")
        return False

    _log_retry_time(retry_delay)
    region = config_parser.get_region(config=config)
    sync_state.last_send = notify.send(
        config=config,
        username=username,
        last_send=sync_state.last_send,
        region=region,
    )
    sleep(retry_delay)
    return True
def _handle_password_error(config, username: str, sync_state: SyncState):
    """
    React to the password not being available.

    Logs the problem, then either gives up (negative retry interval) or
    sends a notification and sleeps for the retry interval.

    Args:
        config: Configuration dictionary
        username: iCloud username
        sync_state: Current sync state; ``last_send`` is updated with the
            value returned by ``notify.send``

    Returns:
        bool: True if should continue (retry), False if should exit
    """
    LOGGER.error("Password is not stored in keyring. Please save the password in keyring.")
    retry_delay = config_parser.get_retry_login_interval(config=config)

    # A negative interval disables retrying altogether.
    if retry_delay < 0:
        LOGGER.info("retry_login_interval is < 0, exiting ...")
        return False

    _log_retry_time(retry_delay)
    region = config_parser.get_region(config=config)
    sync_state.last_send = notify.send(
        config=config,
        username=username,
        last_send=sync_state.last_send,
        region=region,
    )
    sleep(retry_delay)
    return True
def _log_retry_time(sleep_for: int):
    """
    Log when the next login attempt will happen.

    Args:
        sleep_for: Sleep duration in seconds, added to the current local time
    """
    retry_at = datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
    next_sync = retry_at.strftime("%c")
    LOGGER.info(f"Retrying login at {next_sync} ...")
def _calculate_next_sync_schedule(config, sync_state: SyncState):
    """
    Decide how long to sleep and which service(s) run after the sleep.

    Updates the enable flags on ``sync_state`` and, when both services are
    configured with different remaining times, subtracts the smaller
    countdown from the larger one.

    Args:
        config: Configuration dictionary (may be None)
        sync_state: Current sync state, modified in place

    Returns:
        int: Sleep duration in seconds
    """
    drive_configured = config and "drive" in config
    photos_configured = config and "photos" in config

    # Only one service configured: wait out that service's own countdown.
    if photos_configured and not drive_configured:
        sync_state.enable_sync_drive, sync_state.enable_sync_photos = False, True
        return sync_state.photos_time_remaining

    if drive_configured and not photos_configured:
        sync_state.enable_sync_drive, sync_state.enable_sync_photos = True, False
        return sync_state.drive_time_remaining

    drive_left = sync_state.drive_time_remaining
    photos_left = sync_state.photos_time_remaining

    if drive_configured and photos_configured and drive_left <= photos_left:
        # Equal, large (> 10 seconds) timers: wait the full interval and
        # run both services, avoiding an immediate re-sync.
        if drive_left == photos_left and drive_left > 10:
            sync_state.enable_sync_drive, sync_state.enable_sync_photos = True, True
            return drive_left

        # Drive is due first; photos keep the difference as their countdown.
        gap = photos_left - drive_left
        sync_state.photos_time_remaining = gap
        sync_state.enable_sync_drive, sync_state.enable_sync_photos = True, False
        return gap

    # Photos are due first (also the fall-through when nothing is configured).
    gap = drive_left - photos_left
    sync_state.drive_time_remaining = gap
    sync_state.enable_sync_drive, sync_state.enable_sync_photos = False, True
    return gap
def _log_next_sync_time(sleep_for: int):
    """
    Log when the next sync will happen.

    Args:
        sleep_for: Sleep duration in seconds, added to the current local time
    """
    resync_at = datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
    next_sync = resync_at.strftime("%c")
    LOGGER.info(f"Resyncing at {next_sync} ...")
def _log_sync_intervals_at_startup(config):
    """
    Ask the config parser to log the interval of each configured service.

    The getters are called with ``log_messages=True`` purely for their
    logging; the returned intervals are discarded here.

    Args:
        config: Configuration dictionary (may be empty or None)
    """
    if not config:
        return
    if "drive" in config:
        config_parser.get_drive_sync_interval(config=config, log_messages=True)
    if "photos" in config:
        config_parser.get_photos_sync_interval(config=config, log_messages=True)
def _should_exit_oneshot_mode(config):
    """
    Check if should exit in oneshot mode.

    Oneshot mode exits when ALL configured sync intervals are negative. A
    service that is not configured never keeps the loop alive, so an empty
    configuration exits. A ``None`` configuration is treated the same way
    instead of raising ``TypeError`` on the membership test; ``sync()``
    calls this helper even when no configuration could be used.

    Args:
        config: Configuration dictionary (may be None)

    Returns:
        bool: True if should exit
    """
    if not config:
        return True

    # Both intervals are always evaluated for the services that are present.
    should_exit_drive = ("drive" not in config) or (
        config_parser.get_drive_sync_interval(config=config, log_messages=False) < 0
    )
    should_exit_photos = ("photos" not in config) or (
        config_parser.get_photos_sync_interval(config=config, log_messages=False) < 0
    )

    return should_exit_drive and should_exit_photos
def sync():
    """
    Main synchronization loop.

    Each iteration reloads the configuration, authenticates, runs the
    enabled syncs, reports statistics and notifications, then sleeps until
    the next scheduled sync. The loop ends when a login-retry handler
    declines to retry or when oneshot mode applies.
    """
    state = SyncState()
    intervals_logged = False

    while True:
        config = _load_configuration()
        alive(config=config)

        # The configured intervals are logged on the first iteration only.
        if not intervals_logged:
            _log_sync_intervals_at_startup(config)
            intervals_logged = True

        drive_sync_interval, photos_sync_interval = _extract_sync_intervals(config, log_messages=False)
        username = config_parser.get_username(config=config) if config else None

        if username:
            try:
                api = _authenticate_and_get_api(config, username)

                if api.requires_2sa:
                    # Either wait and retry the login, or stop altogether.
                    if not _handle_2fa_required(config, username, state):
                        break
                    continue

                summary = SyncSummary()

                drive_stats = _perform_drive_sync(config, api, state, drive_sync_interval)
                photos_stats = _perform_photos_sync(config, api, state, photos_sync_interval)

                summary.drive_stats = drive_stats
                summary.photo_stats = photos_stats
                summary.sync_end_time = datetime.datetime.now()

                # Usage reporting must never break the sync itself.
                try:
                    _send_usage_statistics(config, summary)
                except Exception as e:
                    LOGGER.debug(f"Failed to send usage statistics: {e!s}")

                # A summary notification goes out only when every configured
                # service actually synced in this cycle.
                expected_stats = []
                if "drive" in config:
                    expected_stats.append(drive_stats)
                if "photos" in config:
                    expected_stats.append(photos_stats)

                if expected_stats and all(item is not None for item in expected_stats):
                    try:
                        notify.send_sync_summary(config=config, summary=summary)
                    except Exception as e:
                        LOGGER.debug(f"Failed to send sync summary notification: {e!s}")

                if not _check_services_configured(config):
                    LOGGER.warning("Nothing to sync. Please add drive: and/or photos: section in config.yaml file.")

            except exceptions.ICloudPyNoStoredPasswordAvailableException:
                if not _handle_password_error(config, username, state):
                    break
                continue

        sleep_for = _calculate_next_sync_schedule(config, state)
        _log_next_sync_time(sleep_for)

        if _should_exit_oneshot_mode(config):
            LOGGER.info("All configured sync intervals are negative, exiting oneshot mode...")
            break

        sleep(sleep_for)