Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Sync drive module.""" 

2 

3__author__ = "Mandar Patil (mandarons@pm.me)" 

4 

5import gzip 

6import os 

7import re 

8import time 

9import unicodedata 

10import zipfile 

11from pathlib import Path, PurePath 

12from shutil import copyfileobj, rmtree 

13 

14import magic 

15from icloudpy import exceptions 

16 

17from src import config_parser, get_logger 

18 

19LOGGER = get_logger() 

20 

21 

def wanted_file(filters, ignore, file_path):
    """Decide whether a file should be synced.

    A file is rejected when ``file_path`` is empty, when it matches an entry
    of ``ignore``, or when ``filters`` is non-empty and none of its entries
    matches. Each filter entry is interpolated into a regular expression
    anchored at the end of the path and searched case-insensitively.

    Returns True when the file is wanted, False otherwise.
    """
    if not file_path:
        return False
    if ignore and ignored_path(ignore, file_path):
        LOGGER.debug(f"Skipping the unwanted file {file_path}")
        return False
    if not filters:
        # No extension filters configured: every non-ignored file is wanted.
        return True
    matches_extension = any(
        re.search(f"{extension}$", file_path, re.IGNORECASE) for extension in filters
    )
    if matches_extension:
        return True
    LOGGER.debug(f"Skipping the unwanted file {file_path}")
    return False

37 

38 

def wanted_folder(filters, ignore, root, folder_path):
    """Decide whether a folder should be synced.

    The folder is rejected when it matches an entry of ``ignore``. With no
    folder filters (or no ``root`` / ``folder_path``) everything else is
    wanted. Otherwise each filter entry is resolved below the absolute
    ``root`` and the folder is wanted when it is that path, one of its
    ancestors, or one of its descendants.
    """
    if ignore and ignored_path(ignore, folder_path):
        return False

    if not (filters and folder_path and root):
        # Nothing to filter against.
        return True

    candidate = Path(folder_path)

    def _relates_to(entry):
        # Leading/trailing slash is dropped so the entry joins below root.
        relative = str(entry).removeprefix("/").removesuffix("/")
        target = Path(os.path.join(os.path.abspath(root), relative))
        return candidate == target or candidate in target.parents or target in candidate.parents

    return any(_relates_to(entry) for entry in filters)

55 

56 

def ignored_path(ignore_list, path):
    """Return True when ``path`` matches any glob pattern in ``ignore_list``.

    A pattern ending in ``/`` gets ``*`` appended before matching, so it
    applies to entries directly inside that directory. Matching is done
    with ``PurePath.match``.
    """

    def _as_glob(entry):
        return f"{entry}*" if entry.endswith("/") else entry

    return any(PurePath(path).match(_as_glob(entry)) for entry in ignore_list)

63 

64 

def wanted_parent_folder(filters, ignore, root, folder_path):
    """Check whether ``folder_path`` lies at or below a wanted folder.

    Unlike ``wanted_folder`` this does not accept ancestors of a filter
    entry: the folder must be the filtered folder itself or one of its
    descendants.

    Args:
        filters: Iterable of folder names/paths relative to ``root``; a
            falsy value means "no filtering".
        ignore: Unused here; accepted so the signature parallels
            ``wanted_folder``.
        root: Root directory the filter entries are resolved against.
        folder_path: Folder to test.

    Returns:
        True when nothing is filtered or the folder is wanted, else False.
    """
    if not (filters and folder_path and root):
        return True
    folder_path = Path(folder_path)
    base = os.path.abspath(root)
    for folder in filters:
        # str() keeps non-string entries (e.g. a numeric folder name coming
        # from the config) from raising AttributeError, matching
        # wanted_folder's handling of the same filter list.
        relative = str(folder).removeprefix("/").removesuffix("/")
        child_path = Path(os.path.join(base, relative))
        if folder_path == child_path or child_path in folder_path.parents:
            return True
    return False

75 

76 

def process_folder(item, destination_path, filters, ignore, root):
    """Create the local directory for a remote folder when it is wanted.

    The candidate path is ``destination_path`` joined with ``item.name``.
    Its NFC-normalized form is used for the filter check and for directory
    creation, while the returned value is the un-normalized joined path.

    Returns the joined path, or None when an argument is missing or the
    folder is filtered out.
    """
    if not item or not destination_path or not root:
        return None
    new_directory = os.path.join(destination_path, item.name)
    normalized_directory = unicodedata.normalize("NFC", new_directory)
    is_wanted = wanted_folder(
        filters=filters,
        ignore=ignore,
        folder_path=normalized_directory,
        root=root,
    )
    if is_wanted:
        os.makedirs(normalized_directory, exist_ok=True)
        return new_directory
    LOGGER.debug(f"Skipping the unwanted folder {new_directory} ...")
    return None

88 

89 

def package_exists(item, local_package_path):
    """Report whether an up-to-date local copy of a package directory exists.

    The local directory counts as current when its modification time
    (truncated to whole seconds) equals the remote one and the summed size
    of all regular files below it equals ``item.size``.

    Side effect: a local package that differs from the remote one is
    deleted with ``rmtree`` before False is returned.
    """
    if not (item and local_package_path and os.path.isdir(local_package_path)):
        LOGGER.debug(f"Package {local_package_path} does not exist locally.")
        return False

    local_mtime = int(os.path.getmtime(local_package_path))
    remote_mtime = int(item.date_modified.timestamp())
    local_size = 0
    for entry in Path(local_package_path).glob("**/*"):
        if entry.is_file():
            local_size += entry.stat().st_size
    remote_size = item.size

    if local_mtime == remote_mtime and local_size == remote_size:
        LOGGER.debug(f"No changes detected. Skipping the package {local_package_path} ...")
        return True

    LOGGER.info(
        f"Changes detected: local_modified_time is {local_mtime}, "
        f"remote_modified_time is {remote_mtime}, "
        f"local_package_size is {local_size} and remote_package_size is {remote_size}.",
    )
    # Stale copy: remove it so the caller can download a fresh one.
    rmtree(local_package_path)
    return False

110 

111 

def file_exists(item, local_file):
    """Report whether an up-to-date local copy of a remote file exists.

    The local file counts as current when its modification time (truncated
    to whole seconds) equals the remote one and the sizes agree. A remote
    size of None is treated as equal to an empty local file.

    Args:
        item: Remote item exposing ``date_modified`` and ``size``.
        local_file: Path of the local candidate.

    Returns:
        True when the local file is present and unchanged, else False.
    """
    if not (item and local_file and os.path.isfile(local_file)):
        LOGGER.debug(f"File {local_file} does not exist locally.")
        return False

    local_file_modified_time = int(os.path.getmtime(local_file))
    remote_file_modified_time = int(item.date_modified.timestamp())
    local_file_size = os.path.getsize(local_file)
    remote_file_size = item.size

    # os.path.getsize always returns an int, so the former
    # "local size is None" comparison could never be true and was dropped.
    same_size = local_file_size == remote_file_size or (local_file_size == 0 and remote_file_size is None)
    if local_file_modified_time == remote_file_modified_time and same_size:
        LOGGER.debug(f"No changes detected. Skipping the file {local_file} ...")
        return True

    LOGGER.debug(
        f"Changes detected: local_modified_time is {local_file_modified_time}, "
        f"remote_modified_time is {remote_file_modified_time}, "
        f"local_file_size is {local_file_size} and remote_file_size is {remote_file_size}.",
    )
    return False

135 

136 

def process_package(local_file):
    """Unpack a downloaded package archive in place.

    The MIME type of ``local_file`` decides the handling:

    * ``application/zip``: the file is renamed to ``<name>.zip``, extracted
      into its directory, the path ``local_file`` is renamed to its NFD
      form when that differs, and the archive is removed.
    * ``application/gzip``: the file is renamed to ``<name>.gz``,
      decompressed back to ``local_file``, the archive is removed, and the
      result is processed again (it may itself be an archive).
    * anything else: an error is logged.

    Args:
        local_file: Path of the downloaded package file.

    Returns:
        The path of the unpacked package, or False for an unhandled type.
    """
    archive_file = local_file
    # Probe once; the file is not modified before the branches inspect it.
    mime_type = magic.Magic(mime=True).from_file(filename=local_file)
    if mime_type == "application/zip":
        archive_file += ".zip"
        os.rename(local_file, archive_file)
        LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(archive_file)}")
        # Context manager guarantees the archive handle is closed before
        # the file is removed below.
        with zipfile.ZipFile(archive_file) as zip_archive:
            zip_archive.extractall(path=os.path.dirname(archive_file))
        normalized_path = unicodedata.normalize("NFD", local_file)
        # Compare by value: normalize() may return an equal but distinct
        # string object, which an identity test would treat as different.
        if normalized_path != local_file:
            os.rename(local_file, normalized_path)
            local_file = normalized_path
        os.remove(archive_file)
    elif mime_type == "application/gzip":
        archive_file += ".gz"
        os.rename(local_file, archive_file)
        LOGGER.info(f"Unpacking {archive_file} to {os.path.dirname(local_file)}")
        with gzip.GzipFile(filename=archive_file, mode="rb") as gz_file:
            with open(file=local_file, mode="wb") as package_file:
                copyfileobj(gz_file, package_file)
        os.remove(archive_file)
        # The nested pass may rename the package (NFD form); keep the path
        # it reports so the caller does not get a stale one. A failed nested
        # pass keeps the gunzipped file as the result, as before.
        nested_result = process_package(local_file=local_file)
        if nested_result:
            local_file = nested_result
    else:
        LOGGER.error(
            f"Unhandled file type - cannot unpack the package {mime_type}.",
        )
        return False
    LOGGER.info(f"Successfully unpacked the package {archive_file}.")
    return local_file

167 

168 

def is_package(item):
    """Determine if item is a package.

    Opens the item as a stream and inspects the response URL: an item is a
    package when that URL contains ``/packageDownload?``.

    Args:
        item: Remote item exposing ``open(stream=True)`` as a context
            manager whose result has a ``url`` attribute.

    Returns:
        A real bool (never None or an empty string, which the bare
        ``and`` expression used to leak when the URL was missing).
    """
    with item.open(stream=True) as response:
        return bool(response.url and "/packageDownload?" in response.url)

175 

176 

def download_file(item, local_file):
    """Download file from server.

    Streams the item to ``local_file`` in 4 MiB chunks. When the response
    URL marks the item as a package, the download is unpacked with
    ``process_package``. Finally the local modification time is set to the
    item's remote modification time.

    Args:
        item: Remote item exposing ``open(stream=True)`` and
            ``date_modified``.
        local_file: Destination path.

    Returns:
        The path of the downloaded (or unpacked) file, or False when an
        argument is missing, unpacking fails, or any error occurs.
    """
    if not (item and local_file):
        return False
    LOGGER.info(f"Downloading {local_file} ...")
    try:
        with item.open(stream=True) as response:
            with open(local_file, "wb") as file_out:
                for chunk in response.iter_content(4 * 1024 * 1024):
                    file_out.write(chunk)
            if response.url and "/packageDownload?" in response.url:
                local_file = process_package(local_file=local_file)
                if not local_file:
                    # process_package already logged the reason. Bail out
                    # here: passing False on to os.utime would be read as
                    # file descriptor 0 rather than a path.
                    return False
        # timestamp() is the same conversion file_exists/package_exists use
        # for their comparison, so the stored mtime matches on the next run
        # (and is also correct for timezone-aware datetimes).
        item_modified_time = item.date_modified.timestamp()
        os.utime(local_file, (item_modified_time, item_modified_time))
    except (exceptions.ICloudPyAPIResponseException, FileNotFoundError, Exception) as e:
        # Deliberate best-effort: one failed download must not stop the sync.
        LOGGER.error(f"Failed to download {local_file}: {e!s}")
        return False
    return local_file

195 

196 

def process_file(item, destination_path, filters, ignore, files):
    """Process given item as file.

    Computes the NFC-normalized local path, records it in ``files``, and
    downloads the item unless it is filtered out or an unchanged local copy
    already exists. For packages, every path inside the package directory
    is recorded in ``files`` too (renamed to NFD form after a fresh
    download).

    Args:
        item: Remote item (``name``, ``open``, ``date_modified``, ``size``).
        destination_path: Local directory the item belongs in.
        filters: File-extension filters passed to ``wanted_file``.
        ignore: Ignore patterns passed to ``wanted_file``.
        files: Set collecting every local path that must be kept; mutated
            in place.

    Returns:
        True when the item was downloaded, False when it was skipped or the
        download failed.
    """
    if not (item and destination_path and files is not None):
        return False
    local_file = os.path.join(destination_path, item.name)
    local_file = unicodedata.normalize("NFC", local_file)
    if not wanted_file(filters=filters, ignore=ignore, file_path=local_file):
        return False
    files.add(local_file)
    item_is_package = is_package(item=item)
    if item_is_package:
        if package_exists(item=item, local_package_path=local_file):
            for f in Path(local_file).glob("**/*"):
                files.add(str(f))
            return False
    elif file_exists(item=item, local_file=local_file):
        return False
    local_file = download_file(item=item, local_file=local_file)
    if not local_file:
        # download_file returns False on failure; do not report success and
        # do not hand False to Path() below.
        return False
    # Unpacking may have renamed the package, so record the path that is
    # actually on disk as well.
    files.add(local_file)
    if item_is_package:
        for f in Path(local_file).glob("**/*"):
            f = str(f)
            f_normalized = unicodedata.normalize("NFD", f)
            if os.path.exists(f):
                os.rename(f, f_normalized)
                files.add(f_normalized)
    return True

223 

224 

def remove_obsolete(destination_path, files):
    """Remove local obsolete file.

    Walks everything below ``destination_path`` and deletes each entry
    whose absolute path string is not contained in ``files``.

    Args:
        destination_path: Root directory to clean.
        files: Collection of absolute path strings that must be kept.

    Returns:
        Set of absolute path strings that were removed; empty when an
        argument is missing.
    """
    removed_paths = set()
    if not (destination_path and files is not None):
        return removed_paths
    for path in Path(destination_path).rglob("*"):
        local_file = str(path.absolute())
        if local_file in files:
            continue
        # Symlinks are unlinked, never followed: rmtree refuses a symlink to
        # a directory, and a broken symlink is neither a file nor a dir.
        if path.is_symlink() or path.is_file():
            LOGGER.info(f"Removing {local_file} ...")
            path.unlink(missing_ok=True)
            removed_paths.add(local_file)
        elif path.is_dir():
            LOGGER.info(f"Removing {local_file} ...")
            rmtree(local_file)
            removed_paths.add(local_file)
        # Anything else no longer exists (e.g. its parent directory was
        # removed earlier in this walk), so there is nothing to do or log.
    return removed_paths

241 

242 

def sync_directory(
    drive,
    destination_path,
    items,
    root,
    top=True,
    filters=None,
    ignore=None,
    remove=False,
):
    """Sync folder.

    Iterates ``items`` of ``drive``: folders (and app libraries) are created
    locally and synced recursively, files are downloaded when their parent
    folder is wanted. A failure on one item is logged and the sync moves on
    to the next item.

    Args:
        drive: Remote node indexable by item name.
        destination_path: Local directory mirroring ``drive``.
        items: Names of the entries of ``drive`` to process.
        root: Local root of the whole sync, used to resolve folder filters.
        top: True only for the outermost call.
        filters: Optional mapping with ``folders`` and/or
            ``file_extensions`` entries.
        ignore: Optional list of ignore patterns.
        remove: When true (and ``top``), local paths not seen during the
            sync are deleted via ``remove_obsolete``.

    Returns:
        Set of local paths that belong to the synced tree.
    """
    files = set()
    if drive and destination_path and items and root:
        # The filter lookups do not depend on the item, so do them once.
        folder_filters = filters["folders"] if filters and "folders" in filters else None
        extension_filters = filters["file_extensions"] if filters and "file_extensions" in filters else None
        for i in items:
            item = drive[i]
            if item.type in ("folder", "app_library"):
                new_folder = process_folder(
                    item=item,
                    destination_path=destination_path,
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                )
                if not new_folder:
                    continue
                try:
                    files.add(unicodedata.normalize("NFC", new_folder))
                    files.update(
                        sync_directory(
                            drive=item,
                            destination_path=new_folder,
                            items=item.dir(),
                            root=root,
                            top=False,
                            filters=filters,
                            ignore=ignore,
                        ),
                    )
                except Exception as e:
                    # Continue with the next item without crashing the app,
                    # but leave a trace: paths of a folder that failed here
                    # are missing from `files` and would be treated as
                    # obsolete by remove_obsolete.
                    LOGGER.error(f"Failed to sync folder {new_folder}: {e!s}")
            elif item.type == "file":
                if wanted_parent_folder(
                    filters=folder_filters,
                    ignore=ignore,
                    root=root,
                    folder_path=destination_path,
                ):
                    try:
                        process_file(
                            item=item,
                            destination_path=destination_path,
                            filters=extension_filters,
                            ignore=ignore,
                            files=files,
                        )
                    except Exception as e:
                        # Continue with the next item without crashing the
                        # app, but do not hide the failure.
                        LOGGER.error(f"Failed to sync file {i} in {destination_path}: {e!s}")
    if top and remove:
        remove_obsolete(destination_path=destination_path, files=files)
    return files

305 

306 

def sync_drive(config, drive):
    """Sync the whole drive into the configured destination.

    The destination directory comes from ``config_parser`` and doubles as
    the sync root. Optional ``filters`` and ``ignore`` settings are read
    from the ``drive`` section of ``config`` (None when absent), and the
    remove-obsolete flag from ``config_parser``.

    Returns the set of local paths produced by ``sync_directory``.
    """
    destination_path = config_parser.prepare_drive_destination(config=config)
    items = drive.dir()

    filters = None
    if "drive" in config and "filters" in config["drive"]:
        filters = config["drive"]["filters"]

    ignore = None
    if "drive" in config and "ignore" in config["drive"]:
        ignore = config["drive"]["ignore"]

    remove = config_parser.get_drive_remove_obsolete(config=config)
    return sync_directory(
        drive=drive,
        destination_path=destination_path,
        root=destination_path,
        items=items,
        top=True,
        filters=filters,
        ignore=ignore,
        remove=remove,
    )