Coverage for src/sync.py: 100%
237 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-08 21:40 +0000
1"""Sync module."""
3__author__ = "Mandar Patil <mandarons@pm.me>"
4import datetime
5import os
6from time import sleep
8from icloudpy import ICloudPyService, exceptions, utils
10from src import (
11 DEFAULT_CONFIG_FILE_PATH,
12 DEFAULT_COOKIE_DIRECTORY,
13 ENV_CONFIG_FILE_PATH_KEY,
14 ENV_ICLOUD_PASSWORD_KEY,
15 config_parser,
16 configure_icloudpy_logging,
17 get_logger,
18 notify,
19 read_config,
20 sync_drive,
21 sync_photos,
22)
23from src.sync_stats import SyncSummary
24from src.usage import alive
26# Configure icloudpy logging immediately after import
27configure_icloudpy_logging()
29LOGGER = get_logger()
def get_api_instance(
    username: str,
    password: str,
    cookie_directory: str = DEFAULT_COOKIE_DIRECTORY,
    server_region: str = "global",
) -> ICloudPyService:
    """
    Build an ICloudPyService client for the requested server region.

    Args:
        username: iCloud username/Apple ID
        password: iCloud password
        cookie_directory: Directory handed to ICloudPyService for its cookies
        server_region: "china" selects the icloud.com.cn endpoints; any other
            value uses the ICloudPyService defaults

    Returns:
        ICloudPyService instance constructed with the arguments above
    """
    service_kwargs = {
        "apple_id": username,
        "password": password,
        "cookie_directory": cookie_directory,
    }
    # Only the China region needs explicit endpoints; every other region
    # relies on whatever ICloudPyService uses by default.
    if server_region == "china":
        service_kwargs["home_endpoint"] = "https://www.icloud.com.cn"
        service_kwargs["setup_endpoint"] = "https://setup.icloud.com.cn/setup/ws/1"
    return ICloudPyService(**service_kwargs)
class SyncState:
    """
    Mutable bookkeeping shared across iterations of the sync loop.

    Bundles the per-service countdown timers, the per-service "sync on the
    next iteration" flags and the last notification marker so they can be
    passed around as a single object.
    """

    def __init__(self):
        """Start with zeroed timers, both services enabled and no notification sent."""
        # Seconds left until each service is due again.
        self.drive_time_remaining = self.photos_time_remaining = 0
        # Whether each service should run on the next loop iteration.
        self.enable_sync_drive = self.enable_sync_photos = True
        # Value returned by the most recent notify.send() call (None until then).
        self.last_send = None
def _load_configuration():
    """
    Read the configuration file.

    The path is taken from the environment variable named by
    ENV_CONFIG_FILE_PATH_KEY, falling back to DEFAULT_CONFIG_FILE_PATH when
    that variable is not set.

    Returns:
        Whatever read_config() returns for that path
    """
    return read_config(
        config_path=os.getenv(ENV_CONFIG_FILE_PATH_KEY, DEFAULT_CONFIG_FILE_PATH),
    )
95def _extract_sync_intervals(config, log_messages: bool = False):
96 """
97 Extract drive and photos sync intervals from configuration.
99 Args:
100 config: Configuration dictionary
101 log_messages: Whether to log informational messages (default: False for loop usage)
103 Returns:
104 tuple: (drive_sync_interval, photos_sync_interval)
105 """
106 drive_sync_interval = 0
107 photos_sync_interval = 0
109 if config and "drive" in config:
110 drive_sync_interval = config_parser.get_drive_sync_interval(config=config, log_messages=log_messages)
111 if config and "photos" in config:
112 photos_sync_interval = config_parser.get_photos_sync_interval(config=config, log_messages=log_messages)
114 return drive_sync_interval, photos_sync_interval
def _retrieve_password(username: str):
    """
    Obtain the iCloud password from the environment or, failing that, the keyring.

    When the environment variable named by ENV_ICLOUD_PASSWORD_KEY is set, its
    value is also written to the keyring for ``username`` before being returned.

    Args:
        username: iCloud username

    Returns:
        The password from the environment, or the result of the keyring lookup

    Raises:
        ICloudPyNoStoredPasswordAvailableException: presumably raised by the
            keyring lookup when nothing is stored (sync() handles it) - confirm
            against icloudpy.utils
    """
    if ENV_ICLOUD_PASSWORD_KEY not in os.environ:
        return utils.get_password_from_keyring(username=username)

    env_password = os.environ.get(ENV_ICLOUD_PASSWORD_KEY)
    utils.store_password_in_keyring(username=username, password=env_password)
    return env_password
def _authenticate_and_get_api(config, username: str):
    """
    Create the iCloud API client for ``username``.

    The server region is read from the configuration first, then the password
    is retrieved, and both are passed to get_api_instance().

    Args:
        config: Configuration dictionary
        username: iCloud username

    Returns:
        ICloudPyService instance

    Raises:
        ICloudPyNoStoredPasswordAvailableException: propagated from the
            password retrieval when no password is available
    """
    region = config_parser.get_region(config=config)
    return get_api_instance(
        username=username,
        password=_retrieve_password(username),
        server_region=region,
    )
157def _perform_drive_sync(config, api, sync_state: SyncState, drive_sync_interval: int):
158 """
159 Execute drive synchronization if enabled.
161 Args:
162 config: Configuration dictionary
163 api: iCloud API instance
164 sync_state: Current sync state
165 drive_sync_interval: Drive sync interval in seconds
167 Returns:
168 DriveStats object if sync was performed, None otherwise
169 """
170 if config and "drive" in config and sync_state.enable_sync_drive:
171 import time
173 from src.sync_stats import DriveStats
175 start_time = time.time()
176 stats = DriveStats()
178 destination_path = config_parser.prepare_drive_destination(config=config)
180 # Count files before sync
181 files_before = set()
182 if os.path.exists(destination_path):
183 try:
184 for root, _dirs, file_list in os.walk(destination_path):
185 for file in file_list:
186 files_before.add(os.path.join(root, file))
187 except Exception:
188 pass
190 LOGGER.info("Syncing drive...")
191 files_after = sync_drive.sync_drive(config=config, drive=api.drive)
192 LOGGER.info("Drive synced")
194 # Calculate statistics
195 stats.duration_seconds = time.time() - start_time
197 # Handle case where sync_drive returns None (e.g., in tests)
198 if files_after is not None:
199 # Count newly downloaded files
200 new_files = files_after - files_before
201 stats.files_downloaded = len(new_files)
203 # Count skipped files
204 stats.files_skipped = len(files_before & files_after)
206 # Count removed files
207 if config_parser.get_drive_remove_obsolete(config=config):
208 stats.files_removed = len(files_before - files_after)
210 # Calculate bytes downloaded
211 try:
212 for file_path in new_files:
213 if os.path.exists(file_path) and os.path.isfile(file_path):
214 stats.bytes_downloaded += os.path.getsize(file_path)
215 except Exception:
216 pass
218 # Reset countdown timer to the configured interval
219 sync_state.drive_time_remaining = drive_sync_interval
220 return stats
221 return None
224def _perform_photos_sync(config, api, sync_state: SyncState, photos_sync_interval: int):
225 """
226 Execute photos synchronization if enabled.
228 Args:
229 config: Configuration dictionary
230 api: iCloud API instance
231 sync_state: Current sync state
232 photos_sync_interval: Photos sync interval in seconds
234 Returns:
235 PhotoStats object if sync was performed, None otherwise
236 """
237 if config and "photos" in config and sync_state.enable_sync_photos:
238 import time
240 from src.sync_stats import PhotoStats
242 start_time = time.time()
243 stats = PhotoStats()
245 destination_path = config_parser.prepare_photos_destination(config=config)
247 # Count files before sync
248 files_before = set()
249 if os.path.exists(destination_path):
250 try:
251 for root, _dirs, file_list in os.walk(destination_path):
252 for file in file_list:
253 files_before.add(os.path.join(root, file))
254 except Exception:
255 pass
257 LOGGER.info("Syncing photos...")
258 sync_photos.sync_photos(config=config, photos=api.photos)
259 LOGGER.info("Photos synced")
261 # Count files after sync
262 files_after = set()
263 if os.path.exists(destination_path):
264 try:
265 for root, _dirs, file_list in os.walk(destination_path):
266 for file in file_list:
267 files_after.add(os.path.join(root, file))
268 except Exception:
269 pass
271 # Calculate statistics
272 stats.duration_seconds = time.time() - start_time
274 # Count newly downloaded files
275 new_files = files_after - files_before
276 stats.photos_downloaded = len(new_files)
278 # Estimate hardlinked photos (approximate)
279 use_hardlinks = config_parser.get_photos_use_hardlinks(config=config)
280 if use_hardlinks:
281 stats.photos_hardlinked = max(0, len(files_after) - len(files_before) - stats.photos_downloaded)
283 # Count skipped photos
284 stats.photos_skipped = len(files_before & files_after)
286 # Calculate bytes downloaded
287 try:
288 for file_path in new_files:
289 if os.path.exists(file_path) and os.path.isfile(file_path):
290 stats.bytes_downloaded += os.path.getsize(file_path)
292 # Estimate bytes saved by hardlinks
293 if use_hardlinks and stats.photos_hardlinked > 0:
294 for file_path in files_after:
295 if file_path not in new_files and os.path.isfile(file_path):
296 stats.bytes_saved_by_hardlinks += os.path.getsize(file_path)
297 except Exception:
298 pass
300 # Get list of synced albums (simple approximation based on directories)
301 try:
302 for item in os.listdir(destination_path):
303 item_path = os.path.join(destination_path, item)
304 if os.path.isdir(item_path):
305 stats.albums_synced.append(item)
306 except Exception:
307 pass
309 # Reset countdown timer to the configured interval
310 sync_state.photos_time_remaining = photos_sync_interval
311 return stats
312 return None
315def _check_services_configured(config):
316 """
317 Check if any sync services are configured.
319 Args:
320 config: Configuration dictionary
322 Returns:
323 bool: True if at least one service is configured
324 """
326 return "drive" in config or "photos" in config
def _notify_and_wait_for_login_retry(config, username: str, sync_state: SyncState):
    """Shared login-failure path: call notify.send, then sleep until the next login attempt."""
    sleep_for = config_parser.get_retry_login_interval(config=config)

    # A negative retry interval means "do not retry": tell the caller to stop.
    if sleep_for < 0:
        LOGGER.info("retry_login_interval is < 0, exiting ...")
        return False

    _log_retry_time(sleep_for)
    server_region = config_parser.get_region(config=config)
    # notify.send receives the previous marker and returns the new one.
    sync_state.last_send = notify.send(
        config=config,
        username=username,
        last_send=sync_state.last_send,
        region=server_region,
    )
    sleep(sleep_for)
    return True


def _handle_2fa_required(config, username: str, sync_state: SyncState):
    """
    Handle 2FA authentication requirement.

    Logs the error, then either signals exit (negative retry interval) or
    sends a notification and sleeps for the configured retry interval.

    Args:
        config: Configuration dictionary
        username: iCloud username
        sync_state: Current sync state; ``last_send`` is updated

    Returns:
        bool: True if should continue (retry), False if should exit
    """
    LOGGER.error("Error: 2FA is required. Please log in.")
    return _notify_and_wait_for_login_retry(config, username, sync_state)


def _handle_password_error(config, username: str, sync_state: SyncState):
    """
    Handle password not available error.

    Logs the error, then either signals exit (negative retry interval) or
    sends a notification and sleeps for the configured retry interval.

    Args:
        config: Configuration dictionary
        username: iCloud username
        sync_state: Current sync state; ``last_send`` is updated

    Returns:
        bool: True if should continue (retry), False if should exit
    """
    LOGGER.error("Password is not stored in keyring. Please save the password in keyring.")
    return _notify_and_wait_for_login_retry(config, username, sync_state)
def _log_retry_time(sleep_for: int):
    """
    Log when the next login attempt will happen.

    Args:
        sleep_for: Seconds from now until the retry
    """
    retry_at = datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
    # "%c" renders the locale's date and time representation.
    LOGGER.info(f"Retrying login at {retry_at:%c} ...")
402def _calculate_next_sync_schedule(config, sync_state: SyncState):
403 """
404 Calculate next sync schedule and update sync state.
406 This function implements the adaptive scheduling algorithm that determines
407 which service should sync next based on countdown timers.
409 Args:
410 config: Configuration dictionary
411 sync_state: Current sync state
413 Returns:
414 int: Sleep duration in seconds
415 """
416 has_drive = config and "drive" in config
417 has_photos = config and "photos" in config
419 if not has_drive and has_photos:
420 sleep_for = sync_state.photos_time_remaining
421 sync_state.enable_sync_drive = False
422 sync_state.enable_sync_photos = True
423 elif has_drive and not has_photos:
424 sleep_for = sync_state.drive_time_remaining
425 sync_state.enable_sync_drive = True
426 sync_state.enable_sync_photos = False
427 elif has_drive and has_photos and sync_state.drive_time_remaining <= sync_state.photos_time_remaining:
428 sleep_for = sync_state.photos_time_remaining - sync_state.drive_time_remaining
429 sync_state.photos_time_remaining -= sync_state.drive_time_remaining
430 sync_state.enable_sync_drive = True
431 sync_state.enable_sync_photos = False
432 else:
433 sleep_for = sync_state.drive_time_remaining - sync_state.photos_time_remaining
434 sync_state.drive_time_remaining -= sync_state.photos_time_remaining
435 sync_state.enable_sync_drive = False
436 sync_state.enable_sync_photos = True
438 return sleep_for
def _log_next_sync_time(sleep_for: int):
    """
    Log when the next sync will happen.

    Args:
        sleep_for: Seconds from now until the next sync
    """
    resync_at = datetime.datetime.now() + datetime.timedelta(seconds=sleep_for)
    # "%c" renders the locale's date and time representation.
    LOGGER.info(f"Resyncing at {resync_at:%c} ...")
452def _log_sync_intervals_at_startup(config):
453 """
454 Log sync intervals once at startup.
456 Args:
457 config: Configuration dictionary
458 """
459 if config and "drive" in config:
460 config_parser.get_drive_sync_interval(config=config, log_messages=True)
461 if config and "photos" in config:
462 config_parser.get_photos_sync_interval(config=config, log_messages=True)
465def _should_exit_oneshot_mode(config):
466 """
467 Check if should exit in oneshot mode.
469 Oneshot mode exits when ALL configured sync intervals are negative.
471 Args:
472 config: Configuration dictionary
474 Returns:
475 bool: True if should exit
476 """
478 should_exit_drive = ("drive" not in config) or (
479 config_parser.get_drive_sync_interval(config=config, log_messages=False) < 0
480 )
481 should_exit_photos = ("photos" not in config) or (
482 config_parser.get_photos_sync_interval(config=config, log_messages=False) < 0
483 )
485 return should_exit_drive and should_exit_photos
def sync():
    """
    Run the synchronization loop until a helper signals that it should stop.

    Each iteration reloads the configuration, authenticates, runs whichever
    services are currently enabled, optionally sends a summary notification,
    and then sleeps until the next scheduled sync. The loop ends when a login
    retry handler returns False or when oneshot mode applies.
    """
    sync_state = SyncState()
    intervals_logged = False

    while True:
        config = _load_configuration()
        alive(config=config)

        # The configured intervals are logged on the first iteration only.
        if not intervals_logged:
            _log_sync_intervals_at_startup(config)
            intervals_logged = True

        drive_sync_interval, photos_sync_interval = _extract_sync_intervals(config, log_messages=False)
        username = config_parser.get_username(config=config) if config else None

        if username:
            try:
                api = _authenticate_and_get_api(config, username)

                if api.requires_2sa:
                    # Either wait and retry the login, or leave the loop.
                    if _handle_2fa_required(config, username, sync_state):
                        continue
                    break

                # The summary is created before the syncs run.
                cycle_summary = SyncSummary()

                drive_stats = _perform_drive_sync(config, api, sync_state, drive_sync_interval)
                photos_stats = _perform_photos_sync(config, api, sync_state, photos_sync_interval)

                cycle_summary.drive_stats = drive_stats
                cycle_summary.photo_stats = photos_stats
                cycle_summary.sync_end_time = datetime.datetime.now()

                # Notify only when every configured service produced statistics
                # in this cycle (and at least one service is configured).
                awaited_stats = []
                if config and "drive" in config:
                    awaited_stats.append(drive_stats)
                if config and "photos" in config:
                    awaited_stats.append(photos_stats)

                if awaited_stats and all(stats is not None for stats in awaited_stats):
                    # A failed notification must not break the sync loop.
                    try:
                        notify.send_sync_summary(config=config, summary=cycle_summary)
                    except Exception as err:
                        LOGGER.debug(f"Failed to send sync summary notification: {err!s}")

                if not _check_services_configured(config):
                    LOGGER.warning("Nothing to sync. Please add drive: and/or photos: section in config.yaml file.")

            except exceptions.ICloudPyNoStoredPasswordAvailableException:
                # Either wait and retry the login, or leave the loop.
                if _handle_password_error(config, username, sync_state):
                    continue
                break

        sleep_for = _calculate_next_sync_schedule(config, sync_state)
        _log_next_sync_time(sleep_for)

        if _should_exit_oneshot_mode(config):
            LOGGER.info("All configured sync intervals are negative, exiting oneshot mode...")
            break

        sleep(sleep_for)