Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Sync photos module.""" 

2 

3___author___ = "Mandar Patil <mandarons@pm.me>" 

4import base64 

5import os 

6import shutil 

7import time 

8import unicodedata 

9from pathlib import Path 

10 

11from icloudpy import exceptions 

12 

13from src import config_parser, get_logger 

14 

# Module-wide logger, obtained from the project's logging helper.
LOGGER = get_logger()

# Maps the "type" reported for a photo's "original_alt" version to the file
# extension used when naming the local copy (looked up in
# get_name_and_extension). A type missing from this table keeps the extension
# taken from the photo's own filename, and a warning is logged.
original_alt_filetype_to_extension = {
    "public.png": "png",
    "public.jpeg": "jpeg",
    "public.heic": "heic",
    # NOTE(review): upper-case value, unlike every other entry — confirm intentional.
    "public.image": "HEIC",
    "com.sony.arw-raw-image": "arw",
    "org.webmproject.webp": "webp",
    "com.compuserve.gif": "gif",
    "com.adobe.raw-image": "dng",
    "public.tiff": "tiff",
    "public.jpeg-2000": "jp2",
    "com.truevision.tga-image": "tga",
    "com.sgi.sgi-image": "sgi",
    "com.adobe.photoshop-image": "psd",
    "public.pbm": "pbm",
    "public.heif": "heif",
    "com.microsoft.bmp": "bmp",
    "com.fuji.raw-image": "raf",
    "com.canon.cr2-raw-image": "cr2",
    "com.panasonic.rw2-raw-image": "rw2",
    "com.nikon.nrw-raw-image": "nrw",
    "com.pentax.raw-image": "pef",
    "com.nikon.raw-image": "nef",
    # Two distinct type identifiers map to the same "orf" extension
    # (this one and "com.olympus.or-raw-image" below).
    "com.olympus.raw-image": "orf",
    "com.adobe.pdf": "pdf",
    "com.canon.cr3-raw-image": "cr3",
    "com.olympus.or-raw-image": "orf",
    "public.mpo-image": "mpo",
    "com.dji.mimo.pano.jpeg": "jpg",
    "public.avif": "avif",
    "com.canon.crw-raw-image": "crw",
}

48 

49 

def get_name_and_extension(photo, file_size):
    """Split a photo's filename into its name and extension.

    The split happens at the last dot; a filename without any dot yields an
    empty extension. For the ``original_alt`` version (when the photo has
    one) the extension is replaced by the entry for that version's type in
    the type-to-extension table. A type missing from the table is logged and
    the filename's own extension is kept.
    """
    filename = photo.filename
    stem, dot, suffix = filename.rpartition(".")
    if not dot:
        # No dot anywhere: the whole filename is the name.
        stem, suffix = filename, ""

    if file_size == "original_alt" and file_size in photo.versions:
        filetype = photo.versions[file_size]["type"]
        try:
            suffix = original_alt_filetype_to_extension[filetype]
        except KeyError:
            LOGGER.warning(f"Unknown filetype {filetype} for original_alt version of {filename}")
    return stem, suffix

61 

62 

def photo_wanted(photo, extensions):
    """Tell whether a photo passes the extension filter.

    Args:
        photo: Object exposing a ``filename`` attribute.
        extensions: Collection of wanted filename endings. ``None`` or an
            empty collection disables filtering. Entries are converted with
            ``str()`` before comparison.

    Returns:
        True when no filter is configured, or when the filename ends with at
        least one of the endings (compared case-insensitively); False
        otherwise.
    """
    if not extensions:
        return True
    # Lower-case the filename once rather than once per candidate ending.
    filename = photo.filename.lower()
    return any(filename.endswith(str(extension).lower()) for extension in extensions)

71 

72 

def generate_file_name(photo, file_size, destination_path, folder_format):
    """Build the local path for one version of a photo, migrating older names.

    The file name has the form ``<name>__<file_size>__<id>[.<extension>]``,
    where ``<id>`` is the URL-safe base64 encoding of ``photo.id``. A file
    already present under one of the two older names (the plain filename, or
    ``<name>__<file_size>[.<extension>]``) directly inside
    ``destination_path`` is renamed to the current name.

    Args:
        photo: Object exposing ``filename``, ``id``, ``versions`` and, when
            ``folder_format`` is given, ``created``.
        file_size: Name of the photo version the path is for.
        destination_path: Directory the album is synced into.
        folder_format: ``strftime`` pattern applied to ``photo.created`` to
            pick a sub-folder of ``destination_path``, or ``None`` to place
            the file directly in ``destination_path``.

    Returns:
        The NFC-normalized path of the file.
    """
    filename = photo.filename
    name, extension = get_name_and_extension(photo, file_size)
    # Build the shared pieces once; they were previously recomputed per path.
    suffix = "" if extension == "" else f".{extension}"
    encoded_id = base64.urlsafe_b64encode(photo.id.encode()).decode()
    id_file_name = f'{"__".join([name, file_size, encoded_id])}{suffix}'

    file_path = os.path.join(destination_path, filename)
    file_size_path = os.path.join(destination_path, f'{"__".join([name, file_size])}{suffix}')

    if folder_format is None:
        file_size_id_path = os.path.join(destination_path, id_file_name)
    else:
        folder = photo.created.strftime(folder_format)
        target_dir = os.path.join(destination_path, folder)
        file_size_id_path = os.path.join(target_dir, id_file_name)
        os.makedirs(target_dir, exist_ok=True)
        # The returned path is NFC-normalized, so its parent directory must
        # exist in normalized form too. On a normalization-sensitive
        # filesystem the un-normalized directory above is a different entry.
        os.makedirs(unicodedata.normalize("NFC", target_dir), exist_ok=True)

    file_size_id_path_norm = unicodedata.normalize("NFC", file_size_id_path)

    # Migrate files stored under the older naming schemes.
    if os.path.isfile(file_path):
        os.rename(file_path, file_size_id_path)
    if os.path.isfile(file_size_path):
        os.rename(file_size_path, file_size_id_path)
    # Renaming a path onto itself is pointless; only move when normalization
    # actually changed the path.
    if file_size_id_path != file_size_id_path_norm and os.path.isfile(file_size_id_path):
        os.rename(file_size_id_path, file_size_id_path_norm)
    return file_size_id_path_norm

109 

110 

def photo_exists(photo, file_size, local_path):
    """Report whether the local file already matches the remote version.

    Returns False when ``photo`` or ``local_path`` is falsy or the path is
    not an existing file. Otherwise the local file size is compared with the
    ``size`` recorded for ``photo.versions[file_size]``: equal sizes return
    True, differing sizes return False.
    """
    if not (photo and local_path and os.path.isfile(local_path)):
        return False

    size_on_disk = os.path.getsize(local_path)
    size_on_server = int(photo.versions[file_size]["size"])
    if size_on_disk != size_on_server:
        LOGGER.debug(f"Change detected: local_file_size is {size_on_disk} and remote_file_size is {size_on_server}.")
        return False

    LOGGER.debug(f"No changes detected. Skipping the file {local_path} ...")
    return True

122 

123 

def download_photo(photo, file_size, destination_path):
    """Fetch one version of a photo and write it to ``destination_path``.

    After the file is written, its access and modification times are set
    from ``photo.added_date`` (converted with ``time.mktime``).

    Returns:
        False when any argument is falsy or when downloading/writing raises
        (the failure is logged); True otherwise.
    """
    if not photo or not file_size or not destination_path:
        return False

    LOGGER.info(f"Downloading {destination_path} ...")
    try:
        response = photo.download(file_size)
        with open(destination_path, "wb") as target:
            shutil.copyfileobj(response.raw, target)
        added_timestamp = time.mktime(photo.added_date.timetuple())
        os.utime(destination_path, (added_timestamp, added_timestamp))
    # ``Exception`` already covers the two more specific types listed first.
    except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as error:
        LOGGER.error(f"Failed to download {destination_path}: {error!s}")
        return False
    else:
        return True

139 

140 

def process_photo(photo, file_size, destination_path, files, folder_format):
    """Sync a single version of a single photo.

    The local path is generated first. If the photo has no ``file_size``
    version, a warning is logged and nothing else happens. Otherwise the path
    is recorded in ``files`` (when a set was supplied) and the photo is
    downloaded unless a local file of matching size already exists.

    Returns:
        True when a download was attempted (its outcome is not reflected in
        the result); False when the version is missing or the local file is
        already up to date.
    """
    local_path = generate_file_name(
        photo=photo,
        file_size=file_size,
        destination_path=destination_path,
        folder_format=folder_format,
    )

    if file_size not in photo.versions:
        LOGGER.warning(f"File size {file_size} not found on server. Skipping the photo {local_path} ...")
        return False

    if files is not None:
        files.add(local_path)

    if not photo_exists(photo, file_size, local_path):
        download_photo(photo, file_size, local_path)
        return True
    return False

158 

159 

def sync_album(album, destination_path, file_sizes, extensions=None, files=None, folder_format=None):
    """Sync an album and, recursively, its sub-albums into a local directory.

    Creates the NFC-normalized ``destination_path``, then processes every
    requested version of every photo that passes the extension filter. Each
    sub-album is synced into a directory named after it below
    ``destination_path``.

    Returns:
        None when ``album``, ``destination_path`` or ``file_sizes`` is None;
        True otherwise.
    """
    if any(required is None for required in (album, destination_path, file_sizes)):
        return None

    os.makedirs(unicodedata.normalize("NFC", destination_path), exist_ok=True)
    LOGGER.info(f"Syncing {album.title}")

    for photo in album:
        if not photo_wanted(photo, extensions):
            LOGGER.debug(f"Skipping the unwanted photo {photo.filename}.")
            continue
        for file_size in file_sizes:
            process_photo(photo, file_size, destination_path, files, folder_format)

    for subalbum_name in album.subalbums:
        child_destination = os.path.join(destination_path, subalbum_name)
        sync_album(
            album.subalbums[subalbum_name],
            child_destination,
            file_sizes,
            extensions,
            files,
            folder_format,
        )
    return True

182 

183 

def remove_obsolete(destination_path, files):
    """Delete local files that are not part of the synced set.

    Walks ``destination_path`` recursively and unlinks every regular file
    whose absolute path is not contained in ``files``. Directories are never
    removed.

    Returns:
        The set of absolute paths (as strings) that were removed. The set is
        empty when ``destination_path`` is falsy or ``files`` is None.
    """
    removed_paths = set()
    if not destination_path or files is None:
        return removed_paths

    for entry in Path(destination_path).rglob("*"):
        absolute_path = str(entry.absolute())
        if absolute_path in files or not entry.is_file():
            continue
        LOGGER.info(f"Removing {absolute_path} ...")
        entry.unlink(missing_ok=True)
        removed_paths.add(absolute_path)
    return removed_paths

197 

198 

def sync_photos(config, photos):
    """Sync the configured photo libraries to the local destination.

    For each library (the configured ``libraries`` filter, or every library
    of ``photos`` when that filter is None) one of three modes applies:

    * "all albums" mode, for the ``PrimarySync`` library only: every album is
      synced except those named in the ``albums`` filter;
    * album-filter mode: only the albums named in the ``albums`` filter are
      synced; a name the library does not contain is logged and skipped;
    * otherwise the library's ``all`` collection is synced into an ``all``
      directory.

    When the configuration asks for it, local files that were not part of
    this sync are removed afterwards.

    Args:
        config: Configuration object handed to the ``config_parser`` helpers.
        photos: Photos service exposing a ``libraries`` mapping.
    """
    destination_path = config_parser.prepare_photos_destination(config=config)
    filters = config_parser.get_photos_filters(config=config)
    files = set()
    download_all = config_parser.get_photos_all_albums(config=config)
    libraries = filters["libraries"] if filters["libraries"] is not None else photos.libraries
    folder_format = config_parser.get_photos_folder_format(config=config)

    def _sync_into(album, folder_name):
        """Sync one album into ``destination_path/folder_name`` with the shared filters."""
        sync_album(
            album=album,
            destination_path=os.path.join(destination_path, folder_name),
            file_sizes=filters["file_sizes"],
            extensions=filters["extensions"],
            files=files,
            folder_format=folder_format,
        )

    for library in libraries:
        if download_all and library == "PrimarySync":
            library_albums = photos.libraries[library].albums
            for album in library_albums:
                # In "all albums" mode the album filter acts as an exclusion list.
                if filters["albums"] and album in filters["albums"]:
                    continue
                _sync_into(library_albums[album], album)
        elif filters["albums"]:
            # Same handling for every library, PrimarySync included: a
            # configured album that does not exist is reported and skipped
            # instead of raising KeyError and aborting the whole sync.
            library_albums = photos.libraries[library].albums
            for album in filters["albums"]:
                if album in library_albums:
                    _sync_into(library_albums[album], album)
                else:
                    LOGGER.warning(f"Album {album} not found in {library}. Skipping the album {album} ...")
        else:
            _sync_into(photos.libraries[library].all, "all")

    if config_parser.get_photos_remove_obsolete(config=config):
        remove_obsolete(destination_path, files)

255 

256 

257# def enable_debug(): 

258# import contextlib 

259# import http.client 

260# import logging 

261# import requests 

262# import warnings 

263 

264# # from pprint import pprint 

265# # from icloudpy import ICloudPyService 

266# from urllib3.exceptions import InsecureRequestWarning 

267 

268# # Handle certificate warnings by ignoring them 

269# old_merge_environment_settings = requests.Session.merge_environment_settings 

270 

271# @contextlib.contextmanager 

272# def no_ssl_verification(): 

273# opened_adapters = set() 

274 

275# def merge_environment_settings(self, url, proxies, stream, verify, cert): 

276# # Verification happens only once per connection so we need to close 

277# # all the opened adapters once we're done. Otherwise, the effects of 

278# # verify=False persist beyond the end of this context manager. 

279# opened_adapters.add(self.get_adapter(url)) 

280 

281# settings = old_merge_environment_settings( 

282# self, url, proxies, stream, verify, cert 

283# ) 

284# settings["verify"] = False 

285 

286# return settings 

287 

288# requests.Session.merge_environment_settings = merge_environment_settings 

289 

290# try: 

291# with warnings.catch_warnings(): 

292# warnings.simplefilter("ignore", InsecureRequestWarning) 

293# yield 

294# finally: 

295# requests.Session.merge_environment_settings = old_merge_environment_settings 

296 

297# for adapter in opened_adapters: 

298# try: 

299# adapter.close() 

300# except Exception as e: 

301# pass 

302 

303# # Monkeypatch the http client for full debugging output 

304# httpclient_logger = logging.getLogger("http.client") 

305 

306# def httpclient_logging_patch(level=logging.DEBUG): 

307# """Enable HTTPConnection debug logging to the logging framework""" 

308 

309# def httpclient_log(*args): 

310# httpclient_logger.log(level, " ".join(args)) 

311 

312# # mask the print() built-in in the http.client module to use 

313# # logging instead 

314# http.client.print = httpclient_log 

315# # enable debugging 

316# http.client.HTTPConnection.debuglevel = 1 

317 

318# # Enable general debug logging 

319# logging.basicConfig(filename="log1.txt", encoding="utf-8", level=logging.DEBUG) 

320 

321# httpclient_logging_patch() 

322 

323 

324# if __name__ == "__main__": 

325# # enable_debug() 

326# sync_photos()