Coverage for src/drive_parallel_download.py: 100%
75 statements
« prev ^ index » next — coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
1"""Parallel download utilities.
3This module provides parallel download coordination for iCloud Drive sync,
4separating parallel execution logic from sync operations per SRP.
5"""
7__author__ = "Mandar Patil (mandarons@pm.me)"
9import os
10import unicodedata
11from concurrent.futures import ThreadPoolExecutor, as_completed
12from pathlib import Path
13from threading import Lock
14from typing import Any
16from src import configure_icloudpy_logging, get_logger
17from src.drive_file_download import download_file
18from src.drive_file_existence import file_exists, is_package, package_exists
19from src.drive_filtering import wanted_file
# Configure icloudpy logging immediately after import so icloudpy's own
# log output is set up before any API activity in this module.
configure_icloudpy_logging()
# Shared module logger used by all download workers.
LOGGER = get_logger()
# Thread-safe lock for file set operations: guards concurrent mutation of
# the shared `files` set across worker threads.
files_lock = Lock()
def collect_file_for_download(
    item: Any,
    destination_path: str,
    filters: list[str] | None,
    ignore: list[str] | None,
    files: set[str],
) -> dict[str, Any] | None:
    """Build a download task descriptor for ``item`` without downloading it.

    The candidate local path is recorded in ``files`` (under ``files_lock``)
    so callers can track every processed path. Items that are filtered out,
    or whose content already exists locally, produce no task.

    Args:
        item: iCloud file item
        destination_path: Local destination directory
        filters: File extension filters
        ignore: Ignore patterns
        files: Set to track processed files (thread-safe updates)

    Returns:
        Download task info dict, or None if file should be skipped
    """
    # Guard clauses: missing item, empty destination, or no tracking set.
    if not item or not destination_path or files is None:
        return None

    # NFC-normalize the joined path so unicode comparisons against existing
    # local files are consistent.
    local_file = unicodedata.normalize("NFC", os.path.join(destination_path, item.name))

    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return None

    # Record the path under the lock; other worker threads share this set.
    with files_lock:
        files.add(local_file)

    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            # Package already on disk: record all of its members and skip.
            with files_lock:
                files.update(str(member) for member in Path(local_file).glob("**/*"))
            return None
    elif file_exists(item=item, local_file=local_file):
        return None

    # Descriptor consumed by download_file_task.
    return {
        "item": item,
        "local_file": local_file,
        "is_package": item_is_package,
        "files": files,
    }
def download_file_task(download_info: dict[str, Any]) -> bool:
    """Download a single file as part of parallel execution.

    Args:
        download_info: Dictionary containing download task information, as
            produced by ``collect_file_for_download`` (keys: ``item``,
            ``local_file``, ``is_package``, ``files``).

    Returns:
        True if download succeeded, False otherwise
    """
    item = download_info["item"]
    local_file = download_info["local_file"]
    is_package = download_info["is_package"]
    files = download_info["files"]

    LOGGER.debug(f"[Thread] Starting download of {local_file}")

    try:
        downloaded_file = download_file(item=item, local_file=local_file)
        if not downloaded_file:
            return False

        if is_package:
            with files_lock:
                # Snapshot the tree BEFORE renaming: Path.glob is lazy, and
                # renaming entries (including directories) while the iterator
                # is still walking the same tree can skip or invalidate
                # pending paths.
                for f in [str(p) for p in Path(downloaded_file).glob("**/*")]:
                    # Rename members to NFD form (presumably to match the
                    # on-disk naming iCloud/macOS expects — TODO confirm).
                    f_normalized = unicodedata.normalize("NFD", f)
                    if os.path.exists(f):
                        os.rename(f, f_normalized)
                    files.add(f_normalized)

        LOGGER.debug(f"[Thread] Completed download of {local_file}")
        return True
    except Exception as e:
        # Boundary handler: a failing task must not kill the worker thread;
        # log and report failure to the coordinator instead.
        LOGGER.error(f"[Thread] Failed to download {local_file}: {e!s}")
        return False
def execute_parallel_downloads(download_tasks: list[dict[str, Any]], max_threads: int) -> tuple[int, int]:
    """Execute multiple file downloads in parallel.

    Args:
        download_tasks: List of download task dictionaries
        max_threads: Maximum number of concurrent threads

    Returns:
        Tuple of (successful_downloads, failed_downloads) counts
    """
    if not download_tasks:
        return 0, 0

    LOGGER.info(f"Starting parallel downloads with {max_threads} threads for {len(download_tasks)} files...")

    successful_downloads = 0
    failed_downloads = 0

    with ThreadPoolExecutor(max_workers=max_threads) as executor:
        # Submit all tasks. The task dicts are never consulted after
        # submission, so a plain list of futures suffices — no need to keep
        # a future -> task mapping alive for the duration of the pool.
        futures = [executor.submit(download_file_task, task) for task in download_tasks]

        # Tally results as downloads finish, in completion order.
        for future in as_completed(futures):
            try:
                if future.result():
                    successful_downloads += 1
                else:
                    failed_downloads += 1
            except Exception as e:  # noqa: PERF203
                # download_file_task catches its own errors; this guards
                # against unexpected exceptions surfaced via the future.
                LOGGER.error(f"Download task failed with exception: {e!s}")
                failed_downloads += 1

    LOGGER.info(f"Parallel downloads completed: {successful_downloads} successful, {failed_downloads} failed")
    return successful_downloads, failed_downloads