Hide keyboard shortcuts

Hot-keys on this page

r m x p   toggle line displays

j k   next/prev highlighted chunk

0   (zero) top of page

1   (one) first highlighted chunk

1"""Config file parser.""" 

2 

3__author__ = "Mandar Patil (mandarons@pm.me)" 

4 

5import os 

6 

7from icloudpy.services.photos import PhotoAsset 

8 

9from src import ( 

10 DEFAULT_DRIVE_DESTINATION, 

11 DEFAULT_PHOTOS_DESTINATION, 

12 DEFAULT_RETRY_LOGIN_INTERVAL_SEC, 

13 DEFAULT_ROOT_DESTINATION, 

14 DEFAULT_SYNC_INTERVAL_SEC, 

15 get_logger, 

16) 

17 

# Module-wide logger shared by every config accessor below.
LOGGER = get_logger()

19 

20 

def config_path_to_string(config_path):
    """Render a config key path as a single human-readable string.

    The path segments are joined with ``" > "``, so ``["app", "root"]``
    becomes ``"app > root"``.
    """
    separator = " > "
    return separator.join(config_path)

24 

25 

def traverse_config_path(config, config_path: list[str]) -> bool:
    """Check whether every key of ``config_path`` exists in the nested config.

    Args:
        config: The (possibly nested) configuration mapping, or ``None``.
        config_path: Keys to follow, outermost first.

    Returns:
        ``True`` when the whole path can be followed (an empty path is always
        ``True``); ``False`` as soon as a key is missing or an intermediate
        value is not a mapping.
    """
    from collections.abc import Mapping

    node = config
    for key in config_path:
        # A scalar, string, list or None in the middle of the path means the
        # path does not exist; report that instead of raising TypeError.
        if not isinstance(node, Mapping) or key not in node:
            return False
        node = node[key]
    return True

33 

34 

def get_config_value(config, config_path):
    """Fetch the value stored at ``config_path`` inside the nested config.

    Each key of ``config_path`` is used to index one level deeper. No
    existence check is done here, so a missing key raises whatever the
    container raises (``KeyError`` for a dict), and an empty path raises
    ``IndexError``.
    """
    # Index with the first key unconditionally so an empty path fails loudly.
    value = config[config_path[0]]
    for key in config_path[1:]:
        value = value[key]
    return value

40 

41 

def get_username(config):
    """Get username from config.

    Reads ``app > credentials > username`` and strips surrounding whitespace.

    Returns:
        The stripped username, or ``None`` (after logging an error) when the
        key is missing or its value is empty.
    """
    config_path = ["app", "credentials", "username"]
    if not traverse_config_path(config=config, config_path=config_path):
        LOGGER.error(f"username is missing in {config_path_to_string(config_path)}. Please set the username.")
        return None

    raw_username = get_config_value(config=config, config_path=config_path)
    # The key may be present with a None or non-string value; normalise it
    # before stripping so that case is reported as "empty" instead of
    # raising AttributeError.
    username = "" if raw_username is None else str(raw_username).strip()
    if not username:
        LOGGER.error(f"username is empty in {config_path_to_string(config_path)}.")
        return None
    return username

55 

56 

def get_retry_login_interval(config):
    """Look up the login retry interval (in seconds).

    Reads ``app > credentials > retry_login_interval``. When the key is
    absent, a warning is logged and ``DEFAULT_RETRY_LOGIN_INTERVAL_SEC`` is
    returned instead.
    """
    key_path = ["app", "credentials", "retry_login_interval"]
    if traverse_config_path(config=config, config_path=key_path):
        interval = get_config_value(config=config, config_path=key_path)
        LOGGER.info(f"Retrying login every {interval} seconds.")
        return interval

    interval = DEFAULT_RETRY_LOGIN_INTERVAL_SEC
    LOGGER.warning(
        f"retry_login_interval not found in {config_path_to_string(config_path=key_path)}."
        f" Using default {interval} seconds ...",
    )
    return interval

70 

71 

def get_drive_sync_interval(config):
    """Look up the drive sync interval (in seconds).

    Reads ``drive > sync_interval``. When the key is absent, a warning is
    logged and ``DEFAULT_SYNC_INTERVAL_SEC`` is returned instead.
    """
    key_path = ["drive", "sync_interval"]
    if traverse_config_path(config=config, config_path=key_path):
        interval = get_config_value(config=config, config_path=key_path)
        LOGGER.info(f"Syncing drive every {interval} seconds.")
        return interval

    interval = DEFAULT_SYNC_INTERVAL_SEC
    LOGGER.warning(
        f"sync_interval is not found in {config_path_to_string(config_path=key_path)}."
        f" Using default sync_interval: {interval} seconds ...",
    )
    return interval

85 

86 

def get_photos_sync_interval(config):
    """Look up the photos sync interval (in seconds).

    Reads ``photos > sync_interval``. When the key is absent, a warning is
    logged and ``DEFAULT_SYNC_INTERVAL_SEC`` is returned instead.
    """
    key_path = ["photos", "sync_interval"]
    if traverse_config_path(config=config, config_path=key_path):
        interval = get_config_value(config=config, config_path=key_path)
        LOGGER.info(f"Syncing photos every {interval} seconds.")
        return interval

    interval = DEFAULT_SYNC_INTERVAL_SEC
    LOGGER.warning(
        f"sync_interval is not found in {config_path_to_string(config_path=key_path)}."
        f" Using default sync_interval: {interval} seconds ...",
    )
    return interval

100 

101 

def get_photos_all_albums(config):
    """Return flag to download all albums from config.

    Reads ``photos > all_albums``.

    Returns:
        The configured value, or ``False`` when the key is absent.
    """
    config_path = ["photos", "all_albums"]
    if not traverse_config_path(config=config, config_path=config_path):
        return False

    download_all = get_config_value(config=config, config_path=config_path)
    # Only announce it when the flag is actually enabled; logging it for an
    # explicit ``all_albums: false`` would be misleading.
    if download_all:
        LOGGER.info("Syncing all albums.")
    return download_all

110 

111 

def prepare_root_destination(config):
    """Resolve the root destination directory and make sure it exists.

    Reads ``app > root``; falls back to ``DEFAULT_ROOT_DESTINATION`` (with a
    warning) when the key is absent. The directory is created if needed.

    Returns:
        The absolute path of the root destination.
    """
    LOGGER.debug("Checking root destination ...")
    key_path = ["app", "root"]
    if traverse_config_path(config=config, config_path=key_path):
        destination = get_config_value(config=config, config_path=key_path)
    else:
        destination = DEFAULT_ROOT_DESTINATION
        LOGGER.warning(
            f"Warning: root destination is missing in {config_path_to_string(key_path)}."
            f" Using default root destination: {destination}",
        )

    absolute_destination = os.path.abspath(destination)
    os.makedirs(absolute_destination, exist_ok=True)
    return absolute_destination

127 

128 

def get_smtp_email(config):
    """Return the SMTP sender email (``app > smtp > email``), or ``None`` if unset."""
    key_path = ["app", "smtp", "email"]
    if not traverse_config_path(config=config, config_path=key_path):
        return None
    return get_config_value(config=config, config_path=key_path)

136 

137 

def get_smtp_username(config):
    """Return the SMTP username (``app > smtp > username``), or ``None`` if unset."""
    key_path = ["app", "smtp", "username"]
    if not traverse_config_path(config=config, config_path=key_path):
        return None
    return get_config_value(config=config, config_path=key_path)

145 

146 

def get_smtp_to_email(config):
    """Return the SMTP recipient email.

    Reads ``app > smtp > to``; when that key is absent, the result of
    ``get_smtp_email`` is returned instead.
    """
    key_path = ["app", "smtp", "to"]
    if not traverse_config_path(config=config, config_path=key_path):
        # No explicit recipient configured: fall back to the sender lookup.
        return get_smtp_email(config=config)
    return get_config_value(config=config, config_path=key_path)

156 

157 

def get_smtp_password(config):
    """Return the SMTP password (``app > smtp > password``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "smtp", "password"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: password is not found in {config_path_to_string(key_path)}")
    return None

167 

168 

def get_smtp_host(config):
    """Return the SMTP host (``app > smtp > host``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "smtp", "host"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: host is not found in {config_path_to_string(key_path)}")
    return None

178 

179 

def get_smtp_port(config):
    """Return the SMTP port (``app > smtp > port``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "smtp", "port"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: port is not found in {config_path_to_string(key_path)}")
    return None

189 

190 

def get_smtp_no_tls(config):
    """Return the SMTP ``no_tls`` flag (``app > smtp > no_tls``).

    Logs a warning and returns ``False`` when the key is absent.
    """
    key_path = ["app", "smtp", "no_tls"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: no_tls is not found in {config_path_to_string(key_path)}")
    return False

200 

201 

def prepare_drive_destination(config):
    """Resolve the drive destination directory and make sure it exists.

    Reads ``drive > destination``; falls back to ``DEFAULT_DRIVE_DESTINATION``
    (with a warning) when the key is absent. The destination is joined onto
    the path returned by ``prepare_root_destination`` and created if needed.

    Returns:
        The absolute path of the drive destination.
    """
    LOGGER.debug("Checking drive destination ...")
    key_path = ["drive", "destination"]
    if traverse_config_path(config=config, config_path=key_path):
        destination = get_config_value(config=config, config_path=key_path)
    else:
        destination = DEFAULT_DRIVE_DESTINATION
        LOGGER.warning(
            f"Warning: destination is missing in {config_path_to_string(key_path)}."
            f" Using default drive destination: {destination}.",
        )

    root_path = prepare_root_destination(config=config)
    absolute_destination = os.path.abspath(os.path.join(root_path, destination))
    os.makedirs(absolute_destination, exist_ok=True)
    return absolute_destination

217 

218 

def get_drive_remove_obsolete(config):
    """Return the drive ``remove_obsolete`` flag (``drive > remove_obsolete``).

    Logs a warning and returns ``False`` when the key is absent; otherwise
    the configured value is returned and the chosen behaviour is logged at
    debug level.
    """
    key_path = ["drive", "remove_obsolete"]
    if not traverse_config_path(config=config, config_path=key_path):
        LOGGER.warning(
            f"Warning: remove_obsolete is not found in {config_path_to_string(key_path)}."
            " Not removing the obsolete files and folders.",
        )
        return False

    remove_obsolete = get_config_value(config=config, config_path=key_path)
    prefix = "R" if remove_obsolete else "Not R"
    LOGGER.debug(f"{prefix}emoving obsolete files and folders ...")
    return remove_obsolete

232 

233 

def prepare_photos_destination(config):
    """Prepare photos destination path.

    Reads ``photos > destination``; falls back to
    ``DEFAULT_PHOTOS_DESTINATION`` (with a warning) when the key is absent.
    The destination is joined onto the path returned by
    ``prepare_root_destination`` and the directory is created if needed.

    Returns:
        The absolute path of the photos destination.
    """
    LOGGER.debug("Checking photos destination ...")
    config_path = ["photos", "destination"]
    photos_destination = DEFAULT_PHOTOS_DESTINATION
    if not traverse_config_path(config=config, config_path=config_path):
        # Report the missing config path first and the fallback value second,
        # matching the drive destination warning.
        LOGGER.warning(
            f"Warning: destination is missing in {config_path_to_string(config_path)}."
            + f" Using default photos destination: {photos_destination}",
        )
    else:
        photos_destination = get_config_value(config=config, config_path=config_path)
    photos_destination_path = os.path.abspath(os.path.join(prepare_root_destination(config=config), photos_destination))
    os.makedirs(photos_destination_path, exist_ok=True)
    return photos_destination_path

249 

250 

def get_photos_remove_obsolete(config):
    """Return the photos ``remove_obsolete`` flag (``photos > remove_obsolete``).

    Logs a warning and returns ``False`` when the key is absent; otherwise
    the configured value is returned and the chosen behaviour is logged at
    debug level.
    """
    key_path = ["photos", "remove_obsolete"]
    if not traverse_config_path(config=config, config_path=key_path):
        LOGGER.warning(
            f"Warning: remove_obsolete is not found in {config_path_to_string(key_path)}."
            " Not removing the obsolete photos.",
        )
        return False

    remove_obsolete = get_config_value(config=config, config_path=key_path)
    prefix = "R" if remove_obsolete else "Not R"
    LOGGER.debug(f"{prefix}emoving obsolete photos ...")
    return remove_obsolete

264 

265 

def get_photos_filters(config):
    """Return photos filters from config.

    Reads the ``photos > filters`` section and its ``libraries``, ``albums``,
    ``file_sizes`` and ``extensions`` entries.

    Returns:
        A dict with the keys ``libraries``, ``albums``, ``file_sizes`` and
        ``extensions``. ``libraries``, ``albums`` and ``extensions`` are
        ``None`` when not configured (or configured empty); ``file_sizes``
        defaults to ``["original"]`` and only ever contains sizes known to
        ``PhotoAsset.PHOTO_VERSION_LOOKUP``.
    """
    photos_filters = {
        "libraries": None,
        "albums": None,
        "file_sizes": ["original"],
        "extensions": None,
    }
    valid_file_sizes = list(PhotoAsset.PHOTO_VERSION_LOOKUP.keys())
    filters_path = ["photos", "filters"]

    # Check for filters
    if not traverse_config_path(config=config, config_path=filters_path):
        LOGGER.warning(
            f"{config_path_to_string(config_path=filters_path)} not found. "
            "Downloading all libraries and albums with original size ...",
        )
        return photos_filters

    def _optional_filter(name, fallback):
        """Return the non-empty value of filter ``name``, else warn and return None."""
        path = [*filters_path, name]
        value = None
        if traverse_config_path(config=config, config_path=path):
            value = get_config_value(config=config, config_path=path)
        if not value:
            LOGGER.warning(f"{config_path_to_string(config_path=path)} not found. {fallback}")
            return None
        return value

    # Parse libraries
    photos_filters["libraries"] = _optional_filter("libraries", "Downloading all libraries ...")

    # Parse albums
    photos_filters["albums"] = _optional_filter("albums", "Downloading all albums ...")

    # Parse file sizes
    file_sizes_path = [*filters_path, "file_sizes"]
    if not traverse_config_path(config=config, config_path=file_sizes_path):
        LOGGER.warning(
            f"{config_path_to_string(config_path=file_sizes_path)} not found. Downloading original size photos ...",
        )
    else:
        # Build a new list rather than removing from the configured one while
        # iterating it: in-place removal skips the entry after each invalid
        # size and mutates the caller's config. A null entry counts as empty.
        configured_sizes = get_config_value(config=config, config_path=file_sizes_path) or []
        file_sizes = []
        for file_size in configured_sizes:
            if file_size in valid_file_sizes:
                file_sizes.append(file_size)
            else:
                LOGGER.warning(
                    f"Skipping the invalid file size {file_size}, "
                    + f"valid file sizes are {','.join(valid_file_sizes)}.",
                )
        # Nothing valid left: fall back to the default size.
        photos_filters["file_sizes"] = file_sizes or ["original"]

    # Parse extensions
    photos_filters["extensions"] = _optional_filter("extensions", "Downloading all extensions ...")

    return photos_filters

338 

339 

def get_region(config):
    """Return region from config.

    Reads ``app > region``. Only ``"global"`` and ``"china"`` are accepted.

    Returns:
        The configured region, or ``"global"`` when the key is absent (a
        warning is logged) or holds any other value (an error is logged).
    """
    region = "global"
    config_path = ["app", "region"]
    if not traverse_config_path(config=config, config_path=config_path):
        LOGGER.warning(f"{config_path_to_string(config_path=config_path)} not found. Using default value - global ...")
    else:
        region = get_config_value(config=config, config_path=config_path)
        if region not in ["global", "china"]:
            # Adjacent string literals keep the message on one logical line;
            # a backslash continuation inside the literal would embed the
            # source indentation in the logged text.
            LOGGER.error(
                f"{config_path_to_string(config_path=config_path)} is invalid. "
                "Valid values are - global or china. Using default value - global ...",
            )
            region = "global"

    return region

356 

357 

def get_photos_folder_format(config):
    """Return the photos folder format (``photos > folder_format``), or ``None`` if unset.

    The configured format is logged at info level when present.
    """
    key_path = ["photos", "folder_format"]
    if not traverse_config_path(config=config, config_path=key_path):
        return None
    folder_format = get_config_value(config=config, config_path=key_path)
    LOGGER.info(f"Using format {folder_format}.")
    return folder_format

366 

367 

def get_telegram_bot_token(config):
    """Return the Telegram bot token (``app > telegram > bot_token``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "telegram", "bot_token"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: bot_token is not found in {config_path_to_string(key_path)}.")
    return None

377 

378 

def get_telegram_chat_id(config):
    """Return the Telegram chat id (``app > telegram > chat_id``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "telegram", "chat_id"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: chat_id is not found in {config_path_to_string(key_path)}.")
    return None

388 

389 

390# Get discord webhook_url 

def get_discord_webhook_url(config):
    """Return the Discord webhook URL (``app > discord > webhook_url``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "discord", "webhook_url"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: webhook_url is not found in {config_path_to_string(key_path)}.")
    return None

400 

401 

402# Get discord username 

def get_discord_username(config):
    """Return the Discord username (``app > discord > username``).

    Logs a warning and returns ``None`` when the key is absent.
    """
    key_path = ["app", "discord", "username"]
    if traverse_config_path(config=config, config_path=key_path):
        return get_config_value(config=config, config_path=key_path)
    LOGGER.warning(f"Warning: username is not found in {config_path_to_string(key_path)}.")
    return None