Coverage for src/sync_drive.py: 100%

47 statements  

« prev     ^ index     » next       coverage.py v7.11.3, created at 2025-11-12 17:18 +0000

1"""Sync drive module. 

2 

3This module provides the main entry point for iCloud Drive synchronization, 

4orchestrating the sync process using specialized utility modules per SRP. 

5""" 

6 

7__author__ = "Mandar Patil (mandarons@pm.me)" 

8 

9import os 

10import unicodedata 

11from pathlib import Path 

12from typing import Any 

13from urllib.parse import unquote 

14 

15from src import config_parser, configure_icloudpy_logging, get_logger 

16from src.drive_cleanup import remove_obsolete # noqa: F401 

17from src.drive_file_download import download_file # noqa: F401 

18from src.drive_file_existence import file_exists, is_package, package_exists # noqa: F401 

19from src.drive_filtering import ignored_path, wanted_file, wanted_folder, wanted_parent_folder # noqa: F401 

20from src.drive_folder_processing import process_folder # noqa: F401 

21from src.drive_package_processing import process_package # noqa: F401 

22from src.drive_parallel_download import collect_file_for_download, download_file_task, files_lock # noqa: F401 

23from src.drive_sync_directory import sync_directory 

24from src.drive_thread_config import get_max_threads # noqa: F401 

25 

26# Configure icloudpy logging immediately after import 

27configure_icloudpy_logging() 

28 

29LOGGER = get_logger() 

30 

31 

32def sync_drive(config: Any, drive: Any) -> set[str]: 

33 """Synchronize iCloud Drive to local filesystem. 

34 

35 This function serves as the main entry point for drive synchronization, 

36 preparing the destination and delegating to the sync_directory orchestrator. 

37 

38 Args: 

39 config: Configuration dictionary containing drive settings 

40 drive: iCloud drive service instance 

41 

42 Returns: 

43 Set of all synchronized file paths 

44 """ 

45 destination_path = config_parser.prepare_drive_destination(config=config) 

46 return sync_directory( 

47 drive=drive, 

48 destination_path=destination_path, 

49 root=destination_path, 

50 items=drive.dir(), 

51 top=True, 

52 filters=config["drive"]["filters"] if "drive" in config and "filters" in config["drive"] else None, 

53 ignore=config["drive"]["ignore"] if "drive" in config and "ignore" in config["drive"] else None, 

54 remove=config_parser.get_drive_remove_obsolete(config=config), 

55 config=config, 

56 ) 

57 

58 

59def process_file(item: Any, destination_path: str, filters: list[str], ignore: list[str], files: set[str]) -> bool: 

60 """Process given item as file (legacy compatibility function). 

61 

62 This function maintains backward compatibility with existing tests. 

63 New code should use the specialized modules directly. 

64 

65 Args: 

66 item: iCloud file item to process 

67 destination_path: Local destination directory 

68 filters: File extension filters 

69 ignore: Ignore patterns 

70 files: Set to track processed files 

71 

72 Returns: 

73 True if file was processed successfully, False otherwise 

74 """ 

75 if not (item and destination_path and files is not None): 

76 return False 

77 # Decode URL-encoded filename from iCloud API 

78 # This handles special characters like %CC%88 (combining diacritical marks) 

79 decoded_name = unquote(item.name) 

80 local_file = os.path.join(destination_path, decoded_name) 

81 local_file = unicodedata.normalize("NFC", local_file) 

82 if not wanted_file(filters=filters, ignore=ignore, file_path=local_file): 

83 return False 

84 files.add(local_file) 

85 item_is_package = is_package(item=item) 

86 if item_is_package: 

87 if package_exists(item=item, local_package_path=local_file): 

88 for f in Path(local_file).glob("**/*"): 

89 files.add(str(f)) 

90 return False 

91 elif file_exists(item=item, local_file=local_file): 

92 return False 

93 local_file = download_file(item=item, local_file=local_file) 

94 if local_file and item_is_package: 

95 for f in Path(local_file).glob("**/*"): 

96 f = str(f) 

97 f_normalized = unicodedata.normalize("NFD", f) 

98 if os.path.exists(f): 

99 os.rename(f, f_normalized) 

100 files.add(f_normalized) 

101 return bool(local_file)