Coverage for src/sync_photos.py: 100%
128 statements
1"""Sync photos module."""
3___author___ = "Mandar Patil <mandarons@pm.me>"
4import base64
5import os
6import shutil
7import time
8import unicodedata
9from pathlib import Path
11from icloudpy import exceptions
13from src import config_parser, get_logger
15LOGGER = get_logger()
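
# Maps the filetype identifiers reported by iCloud for "original_alt" versions to file extensions.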
original_alt_filetype_to_extension = {
    "public.png": "png",
    "public.jpeg": "jpeg",
    "public.heic": "heic",
    "public.image": "HEIC",
    "com.sony.arw-raw-image": "arw",
    "org.webmproject.webp": "webp",
    "com.compuserve.gif": "gif",
    "com.adobe.raw-image": "dng",
    "public.tiff": "tiff",
    "public.jpeg-2000": "jp2",
    "com.truevision.tga-image": "tga",
    "com.sgi.sgi-image": "sgi",
    "com.adobe.photoshop-image": "psd",
    "public.pbm": "pbm",
    "public.heif": "heif",
    "com.microsoft.bmp": "bmp",
    "com.fuji.raw-image": "raf",
    "com.canon.cr2-raw-image": "cr2",
    "com.panasonic.rw2-raw-image": "rw2",
    "com.nikon.nrw-raw-image": "nrw",
    "com.pentax.raw-image": "pef",
    "com.nikon.raw-image": "nef",
    "com.olympus.raw-image": "orf",
    "com.adobe.pdf": "pdf",
    "com.canon.cr3-raw-image": "cr3",
    "com.olympus.or-raw-image": "orf",
    "public.mpo-image": "mpo",
    "com.dji.mimo.pano.jpeg": "jpg",
    "public.avif": "avif",
    "com.canon.crw-raw-image": "crw",
}


def get_name_and_extension(photo, file_size):
    """Extract filename and extension."""
    filename = photo.filename
    name, extension = filename.rsplit(".", 1) if "." in filename else [filename, ""]
    if file_size == "original_alt" and file_size in photo.versions:
        filetype = photo.versions[file_size]["type"]
        if filetype in original_alt_filetype_to_extension:
            extension = original_alt_filetype_to_extension[filetype]
        else:
            LOGGER.warning(f"Unknown filetype {filetype} for original_alt version of {filename}")
    return name, extension
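
# Illustration (hypothetical values, not part of the module): for a photo named
# "IMG_1234.HEIC" whose "original_alt" version reports type "public.jpeg",
#   get_name_and_extension(photo, "original_alt")  ->  ("IMG_1234", "jpeg")
#   get_name_and_extension(photo, "original")      ->  ("IMG_1234", "HEIC")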


def photo_wanted(photo, extensions):
    """Check if photo is wanted based on extension."""
    if not extensions:
        return True
    for extension in extensions:
        if photo.filename.lower().endswith(str(extension).lower()):
            return True
    return False
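
# Illustration (hypothetical values): with extensions == ["jpg", "png"], a photo named
# "IMG_0001.JPG" is wanted (case-insensitive suffix match) while "clip.mov" is not;
# an empty or missing extensions list keeps every photo.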


def generate_file_name(photo, file_size, destination_path, folder_format):
    """Generate full path to file."""
    filename = photo.filename
    name, extension = get_name_and_extension(photo, file_size)
    file_path = os.path.join(destination_path, filename)
    file_size_path = os.path.join(
        destination_path,
        f'{"__".join([name, file_size])}' if extension == "" else f'{"__".join([name, file_size])}.{extension}',
    )
    file_size_id_path = os.path.join(
        destination_path,
        f'{"__".join([name, file_size, base64.urlsafe_b64encode(photo.id.encode()).decode()])}'
        if extension == ""
        else f'{"__".join([name, file_size, base64.urlsafe_b64encode(photo.id.encode()).decode()])}.{extension}',
    )

    if folder_format is not None:
        folder = photo.created.strftime(folder_format)
        file_size_id_path = os.path.join(
            destination_path,
            folder,
            f'{"__".join([name, file_size, base64.urlsafe_b64encode(photo.id.encode()).decode()])}'
            if extension == ""
            else f'{"__".join([name, file_size, base64.urlsafe_b64encode(photo.id.encode()).decode()])}.{extension}',
        )
        os.makedirs(os.path.join(destination_path, folder), exist_ok=True)

    file_size_id_path_norm = unicodedata.normalize("NFC", file_size_id_path)

    # Migrate files saved under older naming schemes to the id-suffixed, NFC-normalized name.
    if os.path.isfile(file_path):
        os.rename(file_path, file_size_id_path)
    if os.path.isfile(file_size_path):
        os.rename(file_size_path, file_size_id_path)
    if os.path.isfile(file_size_id_path):
        os.rename(file_size_id_path, file_size_id_path_norm)
    return file_size_id_path_norm
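
# Illustration of the resulting layout (hypothetical values): for name "IMG_1234",
# file_size "original" and folder_format "%Y/%m", the returned path looks like
#   <destination_path>/2025/03/IMG_1234__original__<urlsafe-b64 of photo.id>.heic
# Files found on disk under the older "<name>.<ext>" or "<name>__<file_size>.<ext>"
# names are renamed in place to this id-suffixed form.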


def photo_exists(photo, file_size, local_path):
    """Check if photo exists locally."""
    if photo and local_path and os.path.isfile(local_path):
        local_size = os.path.getsize(local_path)
        remote_size = int(photo.versions[file_size]["size"])
        if local_size == remote_size:
            LOGGER.debug(f"No changes detected. Skipping the file {local_path} ...")
            return True
        else:
            LOGGER.debug(f"Change detected: local_file_size is {local_size} and remote_file_size is {remote_size}.")
    return False


def download_photo(photo, file_size, destination_path):
    """Download photo from server."""
    if not (photo and file_size and destination_path):
        return False
    LOGGER.info(f"Downloading {destination_path} ...")
    try:
        download = photo.download(file_size)
        with open(destination_path, "wb") as file_out:
            shutil.copyfileobj(download.raw, file_out)
        local_modified_time = time.mktime(photo.added_date.timetuple())
        os.utime(destination_path, (local_modified_time, local_modified_time))
    except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e:
        LOGGER.error(f"Failed to download {destination_path}: {e!s}")
        return False
    return True
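
# Note (descriptive only): the downloaded file's atime/mtime are set to the photo's
# added_date so local timestamps mirror the iCloud library; any failure while
# downloading or writing is logged and reported as False rather than raised.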


def process_photo(photo, file_size, destination_path, files, folder_format):
    """Process photo details."""
    photo_path = generate_file_name(
        photo=photo,
        file_size=file_size,
        destination_path=destination_path,
        folder_format=folder_format,
    )
    if file_size not in photo.versions:
        LOGGER.warning(f"File size {file_size} not found on server. Skipping the photo {photo_path} ...")
        return False
    if files is not None:
        files.add(photo_path)
    if photo_exists(photo, file_size, photo_path):
        return False
    download_photo(photo, file_size, photo_path)
    return True


def sync_album(album, destination_path, file_sizes, extensions=None, files=None, folder_format=None):
    """Sync given album."""
    if album is None or destination_path is None or file_sizes is None:
        return None
    os.makedirs(unicodedata.normalize("NFC", destination_path), exist_ok=True)
    LOGGER.info(f"Syncing {album.title}")
    for photo in album:
        if photo_wanted(photo, extensions):
            for file_size in file_sizes:
                process_photo(photo, file_size, destination_path, files, folder_format)
        else:
            LOGGER.debug(f"Skipping the unwanted photo {photo.filename}.")
    for subalbum in album.subalbums:
        sync_album(
            album.subalbums[subalbum],
            os.path.join(destination_path, subalbum),
            file_sizes,
            extensions,
            files,
            folder_format,
        )
    return True
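
# Minimal usage sketch (hypothetical album object and paths, not part of the module):
#   synced = set()
#   sync_album(album, "./photos/Favorites", ["original"], extensions=["jpg", "heic"],
#              files=synced, folder_format="%Y/%m")
# Subalbums are synced recursively into matching subdirectories.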


def remove_obsolete(destination_path, files):
    """Remove local obsolete files."""
    removed_paths = set()
    if not (destination_path and files is not None):
        return removed_paths
    for path in Path(destination_path).rglob("*"):
        local_file = str(path.absolute())
        if local_file not in files:
            if path.is_file():
                LOGGER.info(f"Removing {local_file} ...")
                path.unlink(missing_ok=True)
                removed_paths.add(local_file)
    return removed_paths


def sync_photos(config, photos):
    """Sync all photos."""
    destination_path = config_parser.prepare_photos_destination(config=config)
    filters = config_parser.get_photos_filters(config=config)
    files = set()
    download_all = config_parser.get_photos_all_albums(config=config)
    libraries = filters["libraries"] if filters["libraries"] is not None else photos.libraries
    folder_format = config_parser.get_photos_folder_format(config=config)
    for library in libraries:
        if download_all and library == "PrimarySync":
            # All albums of the primary library; albums listed in the filter are excluded.
            for album in photos.libraries[library].albums.keys():
                if filters["albums"] and album in iter(filters["albums"]):
                    continue
                sync_album(
                    album=photos.libraries[library].albums[album],
                    destination_path=os.path.join(destination_path, album),
                    file_sizes=filters["file_sizes"],
                    extensions=filters["extensions"],
                    files=files,
                    folder_format=folder_format,
                )
        elif filters["albums"] and library == "PrimarySync":
            # Only the albums explicitly listed in the filter.
            for album in iter(filters["albums"]):
                sync_album(
                    album=photos.libraries[library].albums[album],
                    destination_path=os.path.join(destination_path, album),
                    file_sizes=filters["file_sizes"],
                    extensions=filters["extensions"],
                    files=files,
                    folder_format=folder_format,
                )
        elif filters["albums"]:
            # Other libraries: sync the listed albums that exist, warn otherwise.
            for album in iter(filters["albums"]):
                if album in photos.libraries[library].albums:
                    sync_album(
                        album=photos.libraries[library].albums[album],
                        destination_path=os.path.join(destination_path, album),
                        file_sizes=filters["file_sizes"],
                        extensions=filters["extensions"],
                        files=files,
                        folder_format=folder_format,
                    )
                else:
                    LOGGER.warning(f"Album {album} not found in {library}. Skipping the album {album} ...")
        else:
            # No album filter: sync the library's full photo stream into an "all" folder.
            sync_album(
                album=photos.libraries[library].all,
                destination_path=os.path.join(destination_path, "all"),
                file_sizes=filters["file_sizes"],
                extensions=filters["extensions"],
                files=files,
                folder_format=folder_format,
            )

    if config_parser.get_photos_remove_obsolete(config=config):
        remove_obsolete(destination_path, files)
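
# Sketch of the filters mapping consumed by sync_photos above (assumed shape, derived from
# the lookups in this function; actual values come from config_parser.get_photos_filters):
#   {"libraries": ["PrimarySync"], "albums": ["Favorites"],
#    "file_sizes": ["original"], "extensions": ["jpg", "heic"]}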


# def enable_debug():
#     import contextlib
#     import http.client
#     import logging
#     import requests
#     import warnings
#
#     # from pprint import pprint
#     # from icloudpy import ICloudPyService
#     from urllib3.exceptions import InsecureRequestWarning
#
#     # Handle certificate warnings by ignoring them
#     old_merge_environment_settings = requests.Session.merge_environment_settings
#
#     @contextlib.contextmanager
#     def no_ssl_verification():
#         opened_adapters = set()
#
#         def merge_environment_settings(self, url, proxies, stream, verify, cert):
#             # Verification happens only once per connection so we need to close
#             # all the opened adapters once we're done. Otherwise, the effects of
#             # verify=False persist beyond the end of this context manager.
#             opened_adapters.add(self.get_adapter(url))
#
#             settings = old_merge_environment_settings(
#                 self, url, proxies, stream, verify, cert
#             )
#             settings["verify"] = False
#
#             return settings
#
#         requests.Session.merge_environment_settings = merge_environment_settings
#
#         try:
#             with warnings.catch_warnings():
#                 warnings.simplefilter("ignore", InsecureRequestWarning)
#                 yield
#         finally:
#             requests.Session.merge_environment_settings = old_merge_environment_settings
#
#             for adapter in opened_adapters:
#                 try:
#                     adapter.close()
#                 except Exception as e:
#                     pass
#
#     # Monkeypatch the http client for full debugging output
#     httpclient_logger = logging.getLogger("http.client")
#
#     def httpclient_logging_patch(level=logging.DEBUG):
#         """Enable HTTPConnection debug logging to the logging framework"""
#
#         def httpclient_log(*args):
#             httpclient_logger.log(level, " ".join(args))
#
#         # mask the print() built-in in the http.client module to use
#         # logging instead
#         http.client.print = httpclient_log
#         # enable debugging
#         http.client.HTTPConnection.debuglevel = 1
#
#     # Enable general debug logging
#     logging.basicConfig(filename="log1.txt", encoding="utf-8", level=logging.DEBUG)
#
#     httpclient_logging_patch()


# if __name__ == "__main__":
#     # enable_debug()
#     sync_photos()