Coverage for src/sync_drive.py: 100%
202 statements
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 19:45 +0000
« prev ^ index » next coverage.py v7.7.1, created at 2025-03-24 19:45 +0000
1"""Sync drive module."""
3__author__ = "Mandar Patil (mandarons@pm.me)"
5import gzip
6import os
7import re
8import time
9import unicodedata
10import zipfile
11from pathlib import Path, PurePath
12from shutil import copyfileobj, rmtree
14import magic
15from icloudpy import exceptions
17from src import config_parser, get_logger
19LOGGER = get_logger()
def wanted_file(filters, ignore, file_path):
    """Check if file is wanted.

    Args:
        filters: Iterable of file extensions to keep. Each entry is matched
            case-insensitively as a literal suffix of ``file_path``. ``None``
            or an empty collection means every file is wanted.
        ignore: Iterable of patterns handed to ``ignored_path``; a match
            rejects the file before ``filters`` is consulted.
        file_path: Path of the candidate file.

    Returns:
        ``True`` when the file should be synced, ``False`` otherwise
        (including when ``file_path`` is empty).
    """
    if not file_path:
        return False
    if ignore and ignored_path(ignore, file_path):
        LOGGER.debug(f"Skipping the unwanted file {file_path}")
        return False
    if not filters:
        # Nothing to filter on, so everything is wanted.
        return True
    for file_extension in filters:
        # Escape the extension so characters such as ".", "+" or "(" are
        # matched literally instead of being interpreted as regex syntax.
        pattern = f"{re.escape(str(file_extension))}$"
        if re.search(pattern, file_path, re.IGNORECASE):
            return True
    LOGGER.debug(f"Skipping the unwanted file {file_path}")
    return False
def wanted_folder(filters, ignore, root, folder_path):
    """Decide whether a folder takes part in the sync.

    A folder is rejected when it matches ``ignore``. Otherwise it is accepted
    when there is nothing to filter on, or when it equals, contains, or lies
    below one of the ``filters`` entries resolved against ``root``.
    """
    if ignore and ignored_path(ignore, folder_path):
        return False

    nothing_to_filter = not filters or not folder_path or not root or len(filters) == 0
    if nothing_to_filter:
        return True

    candidate = Path(folder_path)
    base = os.path.abspath(root)
    for entry in filters:
        # Filter entries are relative to root; strip one leading/trailing slash.
        relative = str(entry).removeprefix("/").removesuffix("/")
        target = Path(os.path.join(base, relative))
        if candidate == target or candidate in target.parents or target in candidate.parents:
            return True
    return False
def ignored_path(ignore_list, path):
    """Tell whether ``path`` matches any pattern in ``ignore_list``.

    A pattern ending in "/" gets "*" appended before matching, so it applies
    to the entries directly inside that directory.
    """
    return any(
        PurePath(path).match(pattern + "*" if pattern.endswith("/") else pattern)
        for pattern in ignore_list
    )
def wanted_parent_folder(filters, ignore, root, folder_path):
    """Check if parent folder is wanted.

    Args:
        filters: Iterable of folder entries, relative to ``root``. ``None`` or
            empty means every parent folder is wanted.
        ignore: Not consulted by this function; accepted so the signature
            lines up with ``wanted_folder``.
        root: Base directory the filter entries are resolved against.
        folder_path: Folder that holds the file being considered.

    Returns:
        ``True`` when ``folder_path`` equals one of the resolved filter
        folders or lies below it, or when there is nothing to filter on;
        ``False`` otherwise.
    """
    if not filters or not folder_path or not root:
        return True
    folder_path = Path(folder_path)
    base = os.path.abspath(root)
    for folder in filters:
        # str() keeps non-string entries (e.g. a folder named 2023 that the
        # config loader parsed as a number) from raising AttributeError,
        # matching what wanted_folder does.
        relative = str(folder).removeprefix("/").removesuffix("/")
        child_path = Path(os.path.join(base, relative))
        if child_path in folder_path.parents or folder_path == child_path:
            return True
    return False
def process_folder(item, destination_path, filters, ignore, root):
    """Create the local directory for a remote folder when it is wanted.

    Returns the joined (un-normalized) directory path on success, or ``None``
    when an argument is missing or the folder is filtered out.
    """
    if not item or not destination_path or not root:
        return None

    target = os.path.join(destination_path, item.name)
    # Filtering and directory creation use the NFC form of the path.
    target_nfc = unicodedata.normalize("NFC", target)
    is_wanted = wanted_folder(filters=filters, ignore=ignore, folder_path=target_nfc, root=root)
    if is_wanted:
        os.makedirs(target_nfc, exist_ok=True)
        return target

    LOGGER.debug(f"Skipping the unwanted folder {target} ...")
    return None
def package_exists(item, local_package_path):
    """Report whether the local package directory is up to date.

    Compares the directory's modification time (whole seconds) and the summed
    size of the files below it with ``item.date_modified`` and ``item.size``.
    A stale local package directory is deleted before ``False`` is returned.
    """
    if not (item and local_package_path and os.path.isdir(local_package_path)):
        LOGGER.debug(f"Package {local_package_path} does not exist locally.")
        return False

    local_mtime = int(os.path.getmtime(local_package_path))
    remote_mtime = int(item.date_modified.timestamp())

    # Total size of every regular file below the package directory.
    local_size = 0
    for entry in Path(local_package_path).glob("**/*"):
        if entry.is_file():
            local_size += entry.stat().st_size
    remote_size = item.size

    if local_mtime == remote_mtime and local_size == remote_size:
        LOGGER.debug(f"No changes detected. Skipping the package {local_package_path} ...")
        return True

    LOGGER.info(
        f"Changes detected: local_modified_time is {local_mtime}, "
        f"remote_modified_time is {remote_mtime}, "
        f"local_package_size is {local_size} and remote_package_size is {remote_size}.",
    )
    # Drop the outdated copy so the caller can download a fresh one.
    rmtree(local_package_path)
    return False
def file_exists(item, local_file):
    """Report whether the local file is up to date with the remote item.

    The file counts as unchanged when its modification time (whole seconds)
    equals the remote one and the sizes agree, where a size of 0 on one side
    and ``None`` on the other is treated as agreement.
    """
    if not (item and local_file and os.path.isfile(local_file)):
        LOGGER.debug(f"File {local_file} does not exist locally.")
        return False

    local_mtime = int(os.path.getmtime(local_file))
    remote_mtime = int(item.date_modified.timestamp())
    local_size = os.path.getsize(local_file)
    remote_size = item.size

    if local_mtime != remote_mtime:
        unchanged = False
    else:
        unchanged = (
            local_size == remote_size
            or (local_size == 0 and remote_size is None)
            or (local_size is None and remote_size == 0)
        )

    if unchanged:
        LOGGER.debug(f"No changes detected. Skipping the file {local_file} ...")
        return True

    LOGGER.debug(
        f"Changes detected: local_modified_time is {local_mtime}, "
        f"remote_modified_time is {remote_mtime}, "
        f"local_file_size is {local_size} and remote_file_size is {remote_size}.",
    )
    return False
def process_package(local_file):
    """Process the package.

    Detects the MIME type of ``local_file`` and unpacks it in place:

    * ``application/zip``: the file is renamed to ``<name>.zip``, extracted
      into its own directory, and the archive is removed. If the NFD form of
      ``local_file`` differs from it, the path is renamed to the NFD form.
    * ``application/gzip``: the file is renamed to ``<name>.gz``, decompressed
      back to ``local_file``, the archive is removed, and the decompressed
      file is processed again.

    Args:
        local_file: Path of the downloaded package file.

    Returns:
        The path of the unpacked package (possibly the NFD-normalized form),
        or ``False`` when the file type is not handled.
    """
    archive_file = local_file
    # Probe once; nothing changes the file between the branch checks.
    mime_type = magic.Magic(mime=True).from_file(filename=local_file)
    if mime_type == "application/zip":
        archive_file += ".zip"
        os.rename(local_file, archive_file)
        LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(archive_file)}")
        # Context manager guarantees the archive handle is closed before removal.
        with zipfile.ZipFile(archive_file) as archive:
            archive.extractall(path=os.path.dirname(archive_file))
        normalized_path = unicodedata.normalize("NFD", local_file)
        # Compare by value; identity of two strings is an implementation detail.
        if normalized_path != local_file:
            os.rename(local_file, normalized_path)
            local_file = normalized_path
        os.remove(archive_file)
    elif mime_type == "application/gzip":
        archive_file += ".gz"
        os.rename(local_file, archive_file)
        LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(local_file)}")
        with gzip.GzipFile(filename=archive_file, mode="rb") as gz_file:
            with open(file=local_file, mode="wb") as package_file:
                copyfileobj(gz_file, package_file)
        os.remove(archive_file)
        # The inner call may have renamed the package to its NFD form; keep
        # that path so the caller does not end up with a stale one.
        unpacked = process_package(local_file=local_file)
        if unpacked:
            local_file = unpacked
    else:
        LOGGER.error(
            f"Unhandled file type - cannot unpack the package {mime_type}.",
        )
        return False
    LOGGER.info(f"Successfully unpacked the package {archive_file}.")
    return local_file
def is_package(item):
    """Tell whether opening ``item`` leads to a package download URL.

    The result is the value of ``url and "/packageDownload?" in url`` for the
    opened response, so a missing or empty URL is returned as-is (falsy).
    """
    marker = "/packageDownload?"
    result = False
    with item.open(stream=True) as download:
        result = download.url and marker in download.url
    return result
def download_file(item, local_file):
    """Download file from server.

    Streams the content of ``item`` into ``local_file``. When the response URL
    contains "/packageDownload?", the downloaded file is unpacked with
    ``process_package``. Finally the access and modification times of the
    result are set from ``item.date_modified``.

    Args:
        item: Remote item providing ``open(stream=True)`` and ``date_modified``.
        local_file: Destination path on disk.

    Returns:
        The path of the downloaded file (or unpacked package), or ``False``
        when an argument is missing, unpacking fails, or any error occurs.
    """
    if not (item and local_file):
        return False
    LOGGER.info(f"Downloading {local_file} ...")
    try:
        with item.open(stream=True) as response:
            with open(local_file, "wb") as file_out:
                for chunk in response.iter_content(4 * 1024 * 1024):
                    file_out.write(chunk)
            # The output file is closed here, so it can be renamed/unpacked.
            if response.url and "/packageDownload?" in response.url:
                local_file = process_package(local_file=local_file)
                if not local_file:
                    # Unpacking failed (already logged by process_package).
                    # Stop here instead of handing False to os.utime below.
                    return False
        item_modified_time = time.mktime(item.date_modified.timetuple())
        os.utime(local_file, (item_modified_time, item_modified_time))
    # NOTE: Exception already covers the two more specific types listed first.
    except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e:
        LOGGER.error(f"Failed to download {local_file}: {e!s}")
        return False
    return local_file
def process_file(item, destination_path, filters, ignore, files):
    """Process given item as file.

    Args:
        item: Remote item to sync.
        destination_path: Local directory the item belongs in.
        filters: File-extension filters forwarded to ``wanted_file``.
        ignore: Ignore patterns forwarded to ``wanted_file``.
        files: Set collecting every local path that belongs to the sync;
            updated in place.

    Returns:
        ``True`` when the item was downloaded, ``False`` when an argument is
        missing, the file is filtered out, the local copy is already up to
        date, or the download failed.
    """
    if not (item and destination_path and files is not None):
        return False
    local_file = os.path.join(destination_path, item.name)
    local_file = unicodedata.normalize("NFC", local_file)
    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return False
    files.add(local_file)
    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            # Up-to-date package: record its contents so they are kept.
            for f in Path(local_file).glob("**/*"):
                files.add(str(f))
            return False
    elif file_exists(item=item, local_file=local_file):
        return False
    local_file = download_file(item=item, local_file=local_file)
    if not local_file:
        # download_file returns False on failure; do not treat that value as
        # a path and do not report the item as downloaded.
        return False
    if item_is_package:
        # The unpacked package may live at a different (NFD) path than the
        # NFC one recorded above, so record the returned path as well.
        files.add(local_file)
        for f in Path(local_file).glob("**/*"):
            f = str(f)
            f_normalized = unicodedata.normalize("NFD", f)
            if os.path.exists(f):
                os.rename(f, f_normalized)
            files.add(f_normalized)
    return True
def remove_obsolete(destination_path, files):
    """Delete local entries below ``destination_path`` that are not in ``files``.

    Returns the set of absolute paths (as strings) that were removed; an empty
    set when ``destination_path`` is falsy or ``files`` is ``None``.
    """
    removed_paths = set()
    if not destination_path or files is None:
        return removed_paths

    for entry in Path(destination_path).rglob("*"):
        absolute = str(entry.absolute())
        if absolute in files:
            continue
        LOGGER.info(f"Removing {absolute} ...")
        if entry.is_file():
            entry.unlink(missing_ok=True)
        elif entry.is_dir():
            rmtree(absolute)
        else:
            # Neither a file nor a directory (e.g. already gone): nothing removed.
            continue
        removed_paths.add(absolute)
    return removed_paths
def sync_directory(
    drive,
    destination_path,
    items,
    root,
    top=True,
    filters=None,
    ignore=None,
    remove=False,
):
    """Sync folder.

    Walks ``items`` of ``drive``: folders and app libraries are created
    locally and synced recursively, files are handed to ``process_file``.
    A failure while syncing one item is logged and the loop moves on to the
    next item.

    Args:
        drive: Container indexed by item name (``drive[name]``).
        destination_path: Local directory that mirrors ``drive``.
        items: Names of the entries in ``drive`` to process.
        root: Root of the local sync tree, used to resolve folder filters.
        top: ``True`` for the outermost call; recursive calls pass ``False``.
        filters: Optional mapping with "folders" and/or "file_extensions".
        ignore: Optional ignore patterns.
        remove: When true (and ``top`` is true), local entries that were not
            recorded during this sync are deleted via ``remove_obsolete``.

    Returns:
        Set of local paths that belong to the sync. Empty when any of
        ``drive``, ``destination_path``, ``items`` or ``root`` is falsy.
    """
    files = set()
    if not (drive and destination_path and items and root):
        return files

    # The filter lookups do not depend on the item, so resolve them once.
    folder_filters = filters["folders"] if filters and "folders" in filters else None
    extension_filters = filters["file_extensions"] if filters and "file_extensions" in filters else None

    for i in items:
        item = drive[i]
        if item.type in ("folder", "app_library"):
            new_folder = process_folder(
                item=item,
                destination_path=destination_path,
                filters=folder_filters,
                ignore=ignore,
                root=root,
            )
            if not new_folder:
                continue
            try:
                files.add(unicodedata.normalize("NFC", new_folder))
                files.update(
                    sync_directory(
                        drive=item,
                        destination_path=new_folder,
                        items=item.dir(),
                        root=root,
                        top=False,
                        filters=filters,
                        ignore=ignore,
                    ),
                )
            except Exception as e:
                # Continue with the next item without crashing the app, but
                # leave a trace of what went wrong.
                LOGGER.warning(f"Failed to sync the folder {new_folder}: {e!s}")
        elif item.type == "file":
            if wanted_parent_folder(
                filters=folder_filters,
                ignore=ignore,
                root=root,
                folder_path=destination_path,
            ):
                try:
                    process_file(
                        item=item,
                        destination_path=destination_path,
                        filters=extension_filters,
                        ignore=ignore,
                        files=files,
                    )
                except Exception as e:
                    # Continue with the next item without crashing the app, but
                    # leave a trace of what went wrong.
                    LOGGER.warning(f"Failed to sync the file {i} in {destination_path}: {e!s}")
    if top and remove:
        remove_obsolete(destination_path=destination_path, files=files)
    return files
def sync_drive(config, drive):
    """Sync the whole drive into the destination prepared from ``config``.

    Reads the optional "filters" and "ignore" entries of the "drive" section
    and the remove-obsolete setting, then delegates to ``sync_directory``.
    Returns whatever ``sync_directory`` returns.
    """
    destination_path = config_parser.prepare_drive_destination(config=config)
    items = drive.dir()

    filters = None
    if "drive" in config and "filters" in config["drive"]:
        filters = config["drive"]["filters"]

    ignore = None
    if "drive" in config and "ignore" in config["drive"]:
        ignore = config["drive"]["ignore"]

    remove = config_parser.get_drive_remove_obsolete(config=config)

    return sync_directory(
        drive=drive,
        destination_path=destination_path,
        root=destination_path,
        items=items,
        top=True,
        filters=filters,
        ignore=ignore,
        remove=remove,
    )