Coverage for src/drive_parallel_download.py: 100%
80 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
1"""Parallel download utilities.
3This module provides parallel download coordination for iCloud Drive sync,
4separating parallel execution logic from sync operations per SRP.
5"""
7__author__ = "Mandar Patil (mandarons@pm.me)"
9import os
10import unicodedata
11from concurrent.futures import ThreadPoolExecutor, as_completed
12from pathlib import Path
13from threading import Lock
14from typing import Any
15from urllib.parse import unquote
17from src import config_parser, configure_icloudpy_logging, get_logger
18from src.drive_file_download import download_file
19from src.drive_file_existence import file_exists, is_package, package_exists
20from src.drive_filtering import wanted_file
22# Configure icloudpy logging immediately after import
23configure_icloudpy_logging()
25LOGGER = get_logger()
27# Thread-safe lock for file set operations
28files_lock = Lock()
31def collect_file_for_download(
32 item: Any,
33 destination_path: str,
34 filters: list[str] | None,
35 ignore: list[str] | None,
36 files: set[str],
37 config: dict | None = None,
38) -> dict[str, Any] | None:
39 """Collect file information for parallel download without immediately downloading.
41 Args:
42 item: iCloud file item
43 destination_path: Local destination directory
44 filters: File extension filters
45 ignore: Ignore patterns
46 files: Set to track processed files (thread-safe updates)
47 config: Configuration dictionary (used to resolve request timeout)
49 Returns:
50 Download task info dict, or None if file should be skipped
51 """
52 if not (item and destination_path and files is not None):
53 return None
55 # Decode URL-encoded filename from iCloud API
56 # This handles special characters like %CC%88 (combining diacritical marks)
57 decoded_name = unquote(item.name)
58 local_file = os.path.join(destination_path, decoded_name)
59 local_file = unicodedata.normalize("NFC", local_file)
61 if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
62 return None
64 # Thread-safe file set update
65 with files_lock:
66 files.add(local_file)
68 # Check local existence FIRST to avoid unnecessary network requests.
69 # is_package() makes an HTTP call for every file, which is very slow
70 # when syncing thousands of already-up-to-date files.
71 if os.path.isfile(local_file):
72 # It's a regular file locally — check if it's up-to-date (no network call)
73 if file_exists(item=item, local_file=local_file):
74 return None
75 # File is outdated; still need to determine its type for re-download
76 elif os.path.isdir(local_file):
77 # A directory at this path means the item was previously downloaded as a
78 # package. iCloud Drive items do not change type between file and package,
79 # so package_exists() is the correct check here (no is_package() needed).
80 # Note: package_exists() deletes the directory if it is outdated.
81 if package_exists(item=item, local_package_path=local_file):
82 with files_lock:
83 for f in Path(local_file).glob("**/*"):
84 files.add(str(f))
85 return None
86 # Directory was deleted by package_exists(); schedule re-download as a package
87 return {
88 "item": item,
89 "local_file": local_file,
90 "is_package": True,
91 "files": files,
92 }
94 # File/directory doesn't exist locally (or was an outdated regular file that needs
95 # re-download) — make the network call to determine the item type
96 timeout = config_parser.get_drive_request_timeout(config)
97 item_is_package = is_package(item=item, timeout=timeout)
99 # Return download task info
100 return {
101 "item": item,
102 "local_file": local_file,
103 "is_package": item_is_package,
104 "files": files,
105 }
108def download_file_task(download_info: dict[str, Any]) -> bool:
109 """Download a single file as part of parallel execution.
111 Args:
112 download_info: Dictionary containing download task information
114 Returns:
115 True if download succeeded, False otherwise
116 """
117 item = download_info["item"]
118 local_file = download_info["local_file"]
119 is_package = download_info["is_package"]
120 files = download_info["files"]
122 LOGGER.debug(f"[Thread] Starting download of {local_file}")
124 try:
125 downloaded_file = download_file(item=item, local_file=local_file)
126 if not downloaded_file:
127 return False
129 if is_package:
130 with files_lock:
131 for f in Path(downloaded_file).glob("**/*"):
132 f = str(f)
133 f_normalized = unicodedata.normalize("NFD", f)
134 if os.path.exists(f):
135 os.rename(f, f_normalized)
136 files.add(f_normalized)
138 LOGGER.debug(f"[Thread] Completed download of {local_file}")
139 return True
140 except Exception as e:
141 LOGGER.error(f"[Thread] Failed to download {local_file}: {e!s}")
142 return False
145def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
146 """Execute multiple file downloads in parallel.
148 Args:
149 download_tasks: List of download task dictionaries
150 max_threads: Maximum number of concurrent threads
152 Returns:
153 Tuple of (successful_downloads, failed_downloads) counts
154 """
155 if not download_tasks:
156 return 0, 0
158 LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")
160 successful_downloads = 0
161 failed_downloads = 0
163 with ThreadPoolExecutor(max_workers=max_threads) as executor:
164 # Submit all download tasks
165 future_to_task = {executor.submit(download_file_task, task): task for task in download_tasks}
167 # Process completed downloads
168 for future in as_completed(future_to_task):
169 try:
170 result = future.result()
171 if result:
172 successful_downloads += 1
173 else:
174 failed_downloads += 1
175 except Exception as e: # noqa: PERF203
176 LOGGER.error(f"Download task failed with exception: {e!s}")
177 failed_downloads += 1
179 LOGGER.info(f"Parallel downloads completed: {successful_downloads} successful, {failed_downloads} failed")
180 return successful_downloads, failed_downloads