Coverage for src/drive_parallel_download.py: 100%
77 statements
coverage.py v7.11.3, created at 2025-11-12 17:18 +0000

1"""Parallel download utilities.
3This module provides parallel download coordination for iCloud Drive sync,
4separating parallel execution logic from sync operations per SRP.
5"""
7__author__ = "Mandar Patil (mandarons@pm.me)"
9import os
10import unicodedata
11from concurrent.futures import ThreadPoolExecutor, as_completed
12from pathlib import Path
13from threading import Lock
14from typing import Any
15from urllib.parse import unquote
17from src import configure_icloudpy_logging, get_logger
18from src.drive_file_download import download_file
19from src.drive_file_existence import file_exists, is_package, package_exists
20from src.drive_filtering import wanted_file
22# Configure icloudpy logging immediately after import
23configure_icloudpy_logging()
25LOGGER = get_logger()
27# Thread-safe lock for file set operations
28files_lock = Lock()


def collect_file_for_download(
    item: Any,
    destination_path: str,
    filters: list[str] | None,
    ignore: list[str] | None,
    files: set[str],
) -> dict[str, Any] | None:
    """Collect file information for parallel download without immediately downloading.

    Args:
        item: iCloud file item
        destination_path: Local destination directory
        filters: File extension filters
        ignore: Ignore patterns
        files: Set to track processed files (thread-safe updates)

    Returns:
        Download task info dict, or None if the file should be skipped
    """
    if not (item and destination_path and files is not None):
        return None

    # Decode URL-encoded filename from iCloud API.
    # This handles special characters like %CC%88 (combining diacritical marks).
    decoded_name = unquote(item.name)
    local_file = os.path.join(destination_path, decoded_name)
    local_file = unicodedata.normalize("NFC", local_file)
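    # Illustrative example (hypothetical filename): "Mu%CC%88nchen.txt" decodes
    # to "München.txt" with a combining diaeresis (U+0308) after the "u"; NFC
    # normalization then composes it into the single code point "ü" (U+00FC).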

    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return None

    # Thread-safe file set update
    with files_lock:
        files.add(local_file)

    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            with files_lock:
                for f in Path(local_file).glob("**/*"):
                    files.add(str(f))
            return None
    elif file_exists(item=item, local_file=local_file):
        return None

    # Return download task info
    return {
        "item": item,
        "local_file": local_file,
        "is_package": item_is_package,
        "files": files,
    }
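

# A minimal usage sketch, not part of the original module: collect download
# tasks for an iterable of iCloud drive items and return them for
# execute_parallel_downloads() below. The `children` iterable, `dest` path,
# and this helper itself are hypothetical illustrations; only
# collect_file_for_download() is real.
def _example_build_tasks(children: list[Any], dest: str) -> list[dict[str, Any]]:
    files: set[str] = set()  # shared across calls so processed paths are tracked once
    tasks: list[dict[str, Any]] = []
    for child in children:
        task = collect_file_for_download(
            item=child,
            destination_path=dest,
            filters=None,  # no extension filtering in this sketch
            ignore=None,
            files=files,
        )
        if task is not None:  # None means filtered out or already present locally
            tasks.append(task)
    return tasks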


def download_file_task(download_info: dict[str, Any]) -> bool:
    """Download a single file as part of parallel execution.

    Args:
        download_info: Dictionary containing download task information

    Returns:
        True if download succeeded, False otherwise
    """
    item = download_info["item"]
    local_file = download_info["local_file"]
    is_package = download_info["is_package"]
    files = download_info["files"]

    LOGGER.debug(f"[Thread] Starting download of {local_file}")

    try:
        downloaded_file = download_file(item=item, local_file=local_file)
        if not downloaded_file:
            return False
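
        # Packages arrive as directories of extracted files. Each extracted
        # path is renamed to its NFD form and recorded in the shared `files`
        # set, apparently so tracked names match the decomposed form expected
        # elsewhere in the sync state (an inference from this code, not
        # documented behavior).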
        if is_package:
            with files_lock:
                for f in Path(downloaded_file).glob("**/*"):
                    f = str(f)
                    f_normalized = unicodedata.normalize("NFD", f)
                    if os.path.exists(f):
                        os.rename(f, f_normalized)
                    files.add(f_normalized)

        LOGGER.debug(f"[Thread] Completed download of {local_file}")
        return True
    except Exception as e:
        LOGGER.error(f"[Thread] Failed to download {local_file}: {e!s}")
        return False
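

# Hypothetical single-file usage (all names are placeholders): when only one
# item needs fetching, a task can be built and run without a thread pool.
#
#     task = collect_file_for_download(
#         item=item, destination_path="/icloud/drive",
#         filters=None, ignore=None, files=set(),
#     )
#     if task is not None:
#         download_file_task(task)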


def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
    """Execute multiple file downloads in parallel.

    Args:
        download_tasks: List of download task dictionaries
        max_threads: Maximum number of concurrent threads

    Returns:
        Tuple of (successful_downloads, failed_downloads) counts
    """
    if not download_tasks:
        return 0, 0

    LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")

    successful_downloads = 0
    failed_downloads = 0

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all download tasks
        future_to_task = {executor.submit(download_file_task, task): task for task in download_tasks}

        # Process completed downloads
        for future in as_completed(future_to_task):
            try:
                result = future.result()
                if result:
                    successful_downloads += 1
                else:
                    failed_downloads += 1
            except Exception as e:  # noqa: PERF203
                LOGGER.error(f"Download task failed with exception: {e!s}")
                failed_downloads += 1

    LOGGER.info(f"Parallel downloads completed: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads
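

# End-to-end sketch, reusing the hypothetical _example_build_tasks() helper
# defined above: collect tasks for a folder's children, then download them on
# four worker threads. `children` and `dest` remain placeholder arguments.
def _example_sync_folder(children: list[Any], dest: str) -> None:
    tasks = _example_build_tasks(children, dest)
    ok, failed = execute_parallel_downloads(download_tasks=tasks, max_threads=4)
    LOGGER.info(f"Example sync finished: {ok} downloaded, {failed} failed")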