Coverage for src/notify.py: 100%

305 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""Send notifications when 2FA is required for iCloud authentication.""" 

2 

3import datetime 

4import smtplib 

5 

6import requests 

7 

8from src import config_parser, get_logger 

9from src.email_message import EmailMessage as Message 

10 

11LOGGER = get_logger() 

12 

13# Throttling period for notifications (24 hours) 

14THROTTLE_HOURS = 24 

15 

16 

def _is_throttled(last_send) -> bool:
    """
    Decide whether a notification must be suppressed because one went out recently.

    Args:
        last_send: Datetime of the previous notification, or None if none was sent

    Returns:
        True when last_send is a datetime newer than THROTTLE_HOURS hours ago;
        False for None, for non-datetime values and for older timestamps
    """
    # isinstance() is False for None too, so one guard covers both rejections.
    if not isinstance(last_send, datetime.datetime):
        return False
    throttle_window = datetime.timedelta(hours=THROTTLE_HOURS)
    window_start = datetime.datetime.now() - throttle_window
    return last_send > window_start

32 

33 

def _create_2fa_message(username: str, region: str = "global") -> tuple[str, str]:
    """
    Build the body and subject of the "2FA required" notification.

    Args:
        username: iCloud account that needs to re-authenticate
        region: iCloud region; any value other than "global" adds a
            ``--region=<region>`` option to the suggested command

    Returns:
        Tuple of (message, subject)
    """
    subject = f"icloud-docker: Two step authentication is required for {username}"
    # Only a non-default region needs the explicit flag (note the trailing space).
    region_opt = f"--region={region} " if region != "global" else ""
    # The continuation lines of this literal are part of the message text,
    # so their leading whitespace must not be re-indented.
    message = f"""Two-step authentication for iCloud Drive, Photos (Docker) is required.
 Please login to your server and authenticate. Please run -
 `docker exec -it icloud /bin/sh -c "su-exec abc icloud --session-directory=/config/session_data {region_opt}--username={username}"`."""  # noqa: E501
    return message, subject

51 

52 

def _get_current_timestamp() -> datetime.datetime:
    """
    Return the moment used to stamp an outgoing notification.

    Returns:
        The value of datetime.datetime.now() at call time
    """
    now = datetime.datetime.now()
    return now

61 

62 

def _get_telegram_config(config) -> tuple[str | None, str | None, bool]:
    """
    Read the Telegram settings and report whether they are usable.

    Args:
        config: The configuration dictionary

    Returns:
        Tuple of (bot_token, chat_id, is_configured); is_configured is True
        only when both the token and the chat id are truthy
    """
    token = config_parser.get_telegram_bot_token(config=config)
    chat = config_parser.get_telegram_chat_id(config=config)
    # Both values are required before a message can be posted.
    if token and chat:
        return token, chat, True
    return token, chat, False

77 

78 

def notify_telegram(config, message, last_send=None, dry_run=False):
    """
    Deliver a throttled notification through Telegram.

    Args:
        config: Configuration dictionary
        message: Message to send
        last_send: Timestamp of last send for throttling
        dry_run: If True, skip the network call and report success

    Returns:
        last_send when throttled, the new timestamp when the message was sent
        (or dry_run is set), None when Telegram is not configured or the post failed
    """
    if _is_throttled(last_send):
        LOGGER.info("Throttling telegram to once a day")
        return last_send

    bot_token, chat_id, is_configured = _get_telegram_config(config)
    if not is_configured:
        LOGGER.warning("Not sending 2FA notification because Telegram is not configured.")
        return None

    sent_on = _get_current_timestamp()
    # is_configured guarantees bot_token and chat_id are set; a dry run
    # short-circuits before the post is attempted.
    delivered = dry_run or post_message_to_telegram(bot_token, chat_id, message)  # type: ignore[arg-type]
    return sent_on if delivered else None

109 

110 

def post_message_to_telegram(bot_token: str, chat_id: str, message: str) -> bool:
    """
    Post message to Telegram bot using API.

    Network-level failures (timeout, DNS, refused connection, ...) are reported
    as a failed send instead of propagating to the caller.

    Args:
        bot_token: Telegram bot token
        chat_id: Telegram chat ID
        message: Message to send

    Returns:
        True if the API answered with HTTP 200, False otherwise
    """
    url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
    params = {"chat_id": chat_id, "text": message}
    try:
        response = requests.post(url, params=params, timeout=10)
    except requests.RequestException as e:
        # The request URL embeds the bot token and requests puts the URL into
        # its exception text, so only the exception type is logged.
        LOGGER.error(f"Failed to send telegram notification. Error: {type(e).__name__}")
        return False
    if response.status_code == 200:
        return True
    # Log error message
    LOGGER.error(f"Failed to send telegram notification. Response: {response.text}")
    return False

131 

132 

def _get_discord_config(config) -> tuple[str | None, str | None, bool]:
    """
    Read the Discord settings and report whether they are usable.

    Args:
        config: The configuration dictionary

    Returns:
        Tuple of (webhook_url, username, is_configured); is_configured is True
        only when both the webhook URL and the username are truthy
    """
    hook = config_parser.get_discord_webhook_url(config=config)
    display_name = config_parser.get_discord_username(config=config)
    # Both values are required before a message can be posted.
    if hook and display_name:
        return hook, display_name, True
    return hook, display_name, False

147 

148 

def post_message_to_discord(webhook_url: str, username: str, message: str) -> bool:
    """
    Post message to Discord webhook.

    Network-level failures (timeout, DNS, refused connection, ...) are reported
    as a failed send instead of propagating to the caller.

    Args:
        webhook_url: Discord webhook URL
        username: Username to display in Discord
        message: Message to send

    Returns:
        True if the webhook accepted the message (HTTP 204, or HTTP 200 when
        the webhook URL requests ``wait=true``), False otherwise
    """
    data = {"username": username, "content": message}
    try:
        response = requests.post(webhook_url, data=data, timeout=10)
    except requests.RequestException as e:
        # The webhook URL is a secret and requests puts the URL into its
        # exception text, so only the exception type is logged.
        LOGGER.error(f"Failed to send Discord notification. Error: {type(e).__name__}")
        return False
    # 204 is the default success answer; 200 is returned instead when the
    # configured webhook URL carries ?wait=true.
    if response.status_code in (200, 204):
        return True
    # Log error message
    LOGGER.error(f"Failed to send Discord notification. Response: {response.text}")
    return False

168 

169 

def notify_discord(config, message, last_send=None, dry_run=False):
    """
    Deliver a throttled notification through a Discord webhook.

    Args:
        config: Configuration dictionary
        message: Message to send
        last_send: Timestamp of last send for throttling
        dry_run: If True, skip the network call and report success

    Returns:
        last_send when throttled, the new timestamp when the message was sent
        (or dry_run is set), None when Discord is not configured or the post failed
    """
    if _is_throttled(last_send):
        LOGGER.info("Throttling discord to once a day")
        return last_send

    webhook_url, username, is_configured = _get_discord_config(config)
    if not is_configured:
        LOGGER.warning("Not sending 2FA notification because Discord is not configured.")
        return None

    sent_on = _get_current_timestamp()
    if dry_run:
        return sent_on

    # webhook_url and username are guaranteed to be set by the is_configured check
    if post_message_to_discord(webhook_url, username, message):  # type: ignore[arg-type]
        return sent_on
    return None

196 

197 

def _get_pushover_config(config) -> tuple[str | None, str | None, bool]:
    """
    Read the Pushover settings and report whether they are usable.

    Args:
        config: The configuration dictionary

    Returns:
        Tuple of (user_key, api_token, is_configured); is_configured is True
        only when both the user key and the API token are truthy
    """
    user = config_parser.get_pushover_user_key(config=config)
    token = config_parser.get_pushover_api_token(config=config)
    # Both values are required before a message can be posted.
    if user and token:
        return user, token, True
    return user, token, False

212 

213 

def post_message_to_pushover(api_token: str, user_key: str, message: str) -> bool:
    """
    Post message to Pushover API.

    Network-level failures (timeout, DNS, refused connection, ...) are reported
    as a failed send instead of propagating to the caller.

    Args:
        api_token: Pushover API token
        user_key: Pushover user key
        message: Message to send

    Returns:
        True if the API answered with HTTP 200, False otherwise
    """
    url = "https://api.pushover.net/1/messages.json"
    data = {"token": api_token, "user": user_key, "message": message}
    try:
        response = requests.post(url, data=data, timeout=10)
    except requests.RequestException as e:
        # Credentials travel in the request body, not the URL, so the
        # exception text can be logged.
        LOGGER.error(f"Failed to send Pushover notification. Error: {e!s}")
        return False
    if response.status_code == 200:
        return True
    LOGGER.error(f"Failed to send Pushover notification. Response: {response.text}")
    return False

233 

234 

def notify_pushover(config, message, last_send=None, dry_run=False):
    """
    Deliver a throttled notification through Pushover.

    Args:
        config: Configuration dictionary
        message: Message to send
        last_send: Timestamp of last send for throttling
        dry_run: If True, skip the network call and report success

    Returns:
        last_send when throttled, the new timestamp when the message was sent
        (or dry_run is set), None when Pushover is not configured or the post failed
    """
    if _is_throttled(last_send):
        LOGGER.info("Throttling Pushover to once a day")
        return last_send

    user_key, api_token, is_configured = _get_pushover_config(config)
    if not is_configured:
        LOGGER.warning("Not sending 2FA notification because Pushover is not configured.")
        return None

    stamp = _get_current_timestamp()
    # The post is attempted only outside dry-run mode; is_configured guarantees
    # user_key and api_token are set.
    if not dry_run and not post_message_to_pushover(api_token, user_key, message):  # type: ignore[arg-type]
        return None
    return stamp

265 

266 

def notify_email(config, message: str, subject: str, last_send=None, dry_run=False):
    """
    Send email notification with throttling and error handling.

    Args:
        config: Configuration dictionary
        message: Message to send
        subject: Email subject
        last_send: Timestamp of last send for throttling
        dry_run: If True, don't actually send the message

    Returns:
        Timestamp when message was sent, or last_send if throttled, or None if failed
    """
    if _is_throttled(last_send):
        LOGGER.info("Throttling email to once a day")
        return last_send

    email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
    if not is_configured:
        LOGGER.warning("Not sending 2FA notification because SMTP is not configured")
        return None

    sent_on = _get_current_timestamp()
    if dry_run:
        return sent_on

    smtp = None
    try:
        # All necessary config values are guaranteed to be non-None due to is_configured check
        smtp = _create_smtp_connection(host, port, no_tls)  # type: ignore[arg-type]

        # Authentication is attempted only when a password is configured.
        if password:
            _authenticate_smtp(smtp, email, username, password)  # type: ignore[arg-type]

        # to_email could be None, use email as fallback
        recipient = to_email if to_email else email
        msg = build_message(email, recipient, message, subject)  # type: ignore[arg-type]
        _send_email_message(smtp, email, recipient, msg)  # type: ignore[arg-type]
        smtp.quit()
        return sent_on
    except Exception as e:
        LOGGER.error(f"Failed to send email: {e!s}.")
        # quit() is only reached on success, so release the socket here when
        # login/build/send (or quit itself) failed after the connection opened.
        if smtp is not None:
            try:
                smtp.close()
            except Exception as close_error:
                # Best-effort cleanup: never let it mask the failure already logged.
                LOGGER.debug(f"Failed to close SMTP connection: {close_error!s}")
        return None

310 

311 

def send(config, username, last_send=None, dry_run=False, region="global"):
    """
    Fan the 2FA notification out to Telegram, Discord, Pushover and email.

    Args:
        config: Configuration dictionary
        username: iCloud username requiring 2FA
        last_send: Timestamp of last send for throttling
        dry_run: If True, don't actually send notifications
        region: iCloud region (default: "global")

    Returns:
        The first non-None timestamp returned by the channels (checked in the
        order Telegram, Discord, Pushover, email), or None if every channel returned None
    """
    message, subject = _create_2fa_message(username, region)

    # Arguments shared by every channel; email additionally needs the subject.
    shared = {"config": config, "message": message, "last_send": last_send, "dry_run": dry_run}
    results = (
        notify_telegram(**shared),
        notify_discord(**shared),
        notify_pushover(**shared),
        notify_email(subject=subject, **shared),
    )

    # Every channel has already been attempted; pick the first reported timestamp.
    return next((stamp for stamp in results if stamp is not None), None)

337 

338 

def _get_smtp_config(
    config,
) -> tuple[str | None, str | None, str | None, int | None, bool, str | None, str | None, bool]:
    """
    Read the SMTP settings and report whether they are usable.

    Args:
        config: The configuration dictionary

    Returns:
        Tuple of (email, to_email, host, port, no_tls, username, password, is_configured);
        is_configured is True only when email, host and port are all truthy
    """
    sender = config_parser.get_smtp_email(config=config)
    recipient = config_parser.get_smtp_to_email(config=config)
    server = config_parser.get_smtp_host(config=config)
    server_port = config_parser.get_smtp_port(config=config)
    skip_tls = config_parser.get_smtp_no_tls(config=config)
    login_name = config_parser.get_smtp_username(config=config)
    secret = config_parser.get_smtp_password(config=config)
    # Sender address, host and port are the minimum required settings.
    ready = all((sender, server, server_port))
    return sender, recipient, server, server_port, skip_tls, login_name, secret, ready

360 

361 

def _create_smtp_connection(host: str, port: int, no_tls: bool) -> smtplib.SMTP:
    """
    Create and configure SMTP connection.

    Args:
        host: SMTP host
        port: SMTP port
        no_tls: Whether to skip TLS

    Returns:
        Connected SMTP object, upgraded with STARTTLS unless no_tls is set

    Raises:
        Whatever smtplib raises while connecting or negotiating TLS; the
        connection is closed before the exception is re-raised
    """
    # Passing host/port to the constructor already performs the connect, so no
    # explicit connect() call follows (a second one would open another socket).
    smtp = smtplib.SMTP(host, port)
    try:
        smtp.set_debuglevel(0)
        if not no_tls:
            smtp.starttls()
    except Exception:
        # Do not leave the socket open when setup fails after connecting.
        smtp.close()
        raise
    return smtp

380 

381 

382def _authenticate_smtp(smtp: smtplib.SMTP, email: str, username: str | None, password: str) -> None: 

383 """ 

384 Authenticate SMTP connection. 

385 

386 Args: 

387 smtp: SMTP connection 

388 email: Email address for fallback authentication 

389 username: SMTP username (optional) 

390 password: SMTP password 

391 """ 

392 if username: 

393 smtp.login(username, password) 

394 else: 

395 smtp.login(email, password) 

396 

397 

def _send_email_message(smtp: smtplib.SMTP, email: str, to_email: str, message_obj: Message) -> None:
    """
    Hand a prepared message to the SMTP connection for delivery.

    Args:
        smtp: SMTP connection
        email: From email address
        to_email: To email address
        message_obj: Email message object; its as_string() output is what gets sent
    """
    payload = message_obj.as_string()
    smtp.sendmail(from_addr=email, to_addrs=to_email, msg=payload)

409 

410 

411def _contains_non_ascii(text: str | None) -> bool: 

412 """Determine if the provided text contains non-ASCII characters.""" 

413 

414 if text is None: 

415 return False 

416 

417 try: 

418 text.encode("ascii") 

419 except UnicodeEncodeError: 

420 return True 

421 return False 

422 

423 

def build_message(email: str, to_email: str, message: str, subject: str) -> Message:
    """
    Assemble the email message object for a notification.

    Args:
        email: From email address
        to_email: To email address
        message: Message body
        subject: Message subject

    Returns:
        Message object with sender, date, subject and body filled in
    """
    # UTF-8 is selected only when the body or the subject leaves the ASCII range.
    if _contains_non_ascii(message) or _contains_non_ascii(subject):
        charset = "utf-8"
    else:
        charset = "us-ascii"

    envelope = Message(to=to_email, charset=charset)
    envelope.sender = "".join(("icloud-docker <", email, ">"))
    envelope.date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M")
    envelope.subject = subject
    envelope.body = message
    return envelope

446 

447 

448# ============================================================================= 

449# Sync Summary Notification Functions 

450# ============================================================================= 

451 

452 

def _format_drive_section(drive) -> list[str]:
    """Render the Drive statistics as message lines (header, bullets, trailing blank line)."""
    # Kept function-local like the original import (presumably to avoid an
    # import cycle with src.sync_stats - not verifiable from this file).
    from src.sync_stats import format_bytes, format_duration

    lines = ["📁 Drive:"]
    if drive.files_downloaded > 0:
        size_str = format_bytes(drive.bytes_downloaded)
        lines.append(f" • Downloaded: {drive.files_downloaded} files ({size_str})")
    if drive.files_skipped > 0:
        lines.append(f" • Skipped: {drive.files_skipped} files (up-to-date)")
    if drive.files_removed > 0:
        lines.append(f" • Removed: {drive.files_removed} obsolete files")
    if drive.duration_seconds > 0:
        lines.append(f" • Duration: {format_duration(drive.duration_seconds)}")
    if drive.has_errors():
        lines.append(f" • Errors: {len(drive.errors)} failed")
    lines.append("")
    return lines


def _format_photos_section(photos) -> list[str]:
    """Render the Photos statistics as message lines (header, bullets, trailing blank line)."""
    # Kept function-local like the original import (presumably to avoid an
    # import cycle with src.sync_stats - not verifiable from this file).
    from src.sync_stats import format_bytes, format_duration

    max_albums_listed = 5
    lines = ["📷 Photos:"]
    if photos.photos_downloaded > 0:
        size_str = format_bytes(photos.bytes_downloaded)
        lines.append(f" • Downloaded: {photos.photos_downloaded} photos ({size_str})")
    if photos.photos_hardlinked > 0:
        lines.append(f" • Hard-linked: {photos.photos_hardlinked} photos")
    if photos.bytes_saved_by_hardlinks > 0:
        lines.append(f" • Storage saved: {format_bytes(photos.bytes_saved_by_hardlinks)}")
    if photos.albums_synced:
        # List only the first few albums and summarize the rest as a count.
        albums_str = ", ".join(photos.albums_synced[:max_albums_listed])
        if len(photos.albums_synced) > max_albums_listed:
            albums_str += f" (+{len(photos.albums_synced) - max_albums_listed} more)"
        lines.append(f" • Albums: {albums_str}")
    if photos.duration_seconds > 0:
        lines.append(f" • Duration: {format_duration(photos.duration_seconds)}")
    if photos.has_errors():
        lines.append(f" • Errors: {len(photos.errors)} failed")
    lines.append("")
    return lines


def _format_error_section(drive, photos) -> list[str]:
    """Render the "Failed items" lines: up to 5 errors per source plus a count of the hidden ones."""
    max_errors_per_source = 5
    shown = []
    total_errors = 0
    for stats in (drive, photos):
        if stats:
            shown.extend(stats.errors[:max_errors_per_source])
            total_errors += len(stats.errors)

    lines = ["Failed items:"]
    lines.extend(f" • {error}" for error in shown)
    # Count what was actually left out; assuming 10 listed items under-reports
    # (or hides) the remainder when one source has fewer than 5 errors.
    hidden = total_errors - len(shown)
    if hidden > 0:
        lines.append(f" ... and {hidden} more errors")
    lines.append("")
    return lines


def _format_sync_summary_message(summary) -> tuple[str, str]:
    """
    Format sync summary as notification message.

    The message consists of a status headline, an optional Drive section, an
    optional Photos section (each only when the stats object exists and reports
    activity) and, when the summary has errors, a list of failed items.

    Args:
        summary: SyncSummary object containing sync statistics

    Returns:
        Tuple of (message, subject)
    """
    has_errors = summary.has_errors()
    status_emoji = "⚠️" if has_errors else "✅"
    status_text = "Completed with Errors" if has_errors else "Complete"

    message_lines = [f"{status_emoji} iCloud Sync {status_text}", ""]

    # Drive statistics
    drive = summary.drive_stats
    if drive and drive.has_activity():
        message_lines.extend(_format_drive_section(drive))

    # Photos statistics
    photos = summary.photo_stats
    if photos and photos.has_activity():
        message_lines.extend(_format_photos_section(photos))

    # Error details if present
    if has_errors:
        message_lines.extend(_format_error_section(drive, photos))

    message = "\n".join(message_lines)
    subject = f"icloud-docker: Sync {status_text}"
    return message, subject

534 

535 

def _should_send_sync_summary(config, summary) -> bool:
    """
    Decide whether a sync summary notification is worth sending.

    A summary is sent only when summaries are enabled, the sync had activity,
    the outcome (error or success) is one the user asked to hear about, and the
    number of downloaded items reaches the configured minimum.

    Args:
        config: Configuration dictionary
        summary: SyncSummary object

    Returns:
        True if notification should be sent, False otherwise
    """
    # Feature switch first, then skip syncs that did nothing.
    if not config_parser.get_sync_summary_enabled(config=config):
        return False
    if not summary.has_activity():
        return False

    # Pick the preference that matches the outcome of this sync.
    failed = summary.has_errors()
    notify_on_error = config_parser.get_sync_summary_on_error(config=config)
    notify_on_success = config_parser.get_sync_summary_on_success(config=config)
    wanted = notify_on_error if failed else notify_on_success
    if not wanted:
        return False

    # Drive files and photos both count towards the minimum-downloads threshold.
    threshold = config_parser.get_sync_summary_min_downloads(config=config)
    downloads = 0
    drive, photos = summary.drive_stats, summary.photo_stats
    if drive:
        downloads += drive.files_downloaded
    if photos:
        downloads += photos.photos_downloaded

    return not downloads < threshold

577 

578 

def send_sync_summary(config, summary, dry_run=False):
    """
    Fan the sync summary out to Telegram, Discord, Pushover and email.

    Note: unlike the 2FA notifications, sync summaries are not throttled;
    every qualifying sync produces a notification.

    Args:
        config: Configuration dictionary
        summary: SyncSummary object containing sync statistics
        dry_run: If True, don't actually send notifications

    Returns:
        True if at least one channel reported success, False otherwise
        (including when the summary does not qualify for sending)
    """
    if not _should_send_sync_summary(config, summary):
        LOGGER.debug("Sync summary notification skipped (not enabled or no activity)")
        return False

    message, subject = _format_sync_summary_message(summary)

    # Every channel is attempted (no throttling); the list is built eagerly so
    # one channel's success never skips the remaining ones.
    outcomes = [
        _send_telegram_no_throttle(config, message, dry_run),
        _send_discord_no_throttle(config, message, dry_run),
        _send_pushover_no_throttle(config, message, dry_run),
        _send_email_no_throttle(config, message, subject, dry_run),
    ]

    any_sent = any(outcomes)
    if any_sent:
        LOGGER.info("Sync summary notification sent successfully")
    return any_sent

611 

612 

def _send_telegram_no_throttle(config, message: str, dry_run: bool) -> bool:
    """Post a message to Telegram immediately, skipping the throttle check.

    Args:
        config: Configuration dictionary
        message: Message to send
        dry_run: If True, don't actually send

    Returns:
        False when Telegram is not configured, True on a dry run,
        otherwise the result of the post
    """
    bot_token, chat_id, is_configured = _get_telegram_config(config)
    if not is_configured:
        return False

    # A dry run counts as success without contacting Telegram.
    return True if dry_run else post_message_to_telegram(bot_token, chat_id, message)  # type: ignore[arg-type]

632 

633 

def _send_discord_no_throttle(config, message: str, dry_run: bool) -> bool:
    """Post a message to Discord immediately, skipping the throttle check.

    Args:
        config: Configuration dictionary
        message: Message to send
        dry_run: If True, don't actually send

    Returns:
        False when Discord is not configured, True on a dry run,
        otherwise the result of the post
    """
    webhook_url, username, is_configured = _get_discord_config(config)
    if not is_configured:
        return False

    # A dry run counts as success without contacting Discord.
    return True if dry_run else post_message_to_discord(webhook_url, username, message)  # type: ignore[arg-type]

653 

654 

def _send_pushover_no_throttle(config, message: str, dry_run: bool) -> bool:
    """Post a message to Pushover immediately, skipping the throttle check.

    Args:
        config: Configuration dictionary
        message: Message to send
        dry_run: If True, don't actually send

    Returns:
        False when Pushover is not configured, True on a dry run,
        otherwise the result of the post
    """
    user_key, api_token, is_configured = _get_pushover_config(config)
    if not is_configured:
        return False

    # A dry run counts as success without contacting Pushover.
    return True if dry_run else post_message_to_pushover(api_token, user_key, message)  # type: ignore[arg-type]

674 

675 

def _send_email_no_throttle(config, message: str, subject: str, dry_run: bool) -> bool:
    """Send email notification without throttling.

    Args:
        config: Configuration dictionary
        message: Message to send
        subject: Email subject
        dry_run: If True, don't actually send

    Returns:
        True if sent successfully (or dry_run is set), False when SMTP is not
        configured or any step of the delivery raised
    """
    email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
    if not is_configured:
        return False

    if dry_run:
        return True

    smtp = None
    try:
        smtp = _create_smtp_connection(host, port, no_tls)  # type: ignore[arg-type]

        # Authentication is attempted only when a password is configured.
        if password:
            _authenticate_smtp(smtp, email, username, password)  # type: ignore[arg-type]

        # to_email could be None, use email as fallback
        recipient = to_email if to_email else email
        msg = build_message(email, recipient, message, subject)  # type: ignore[arg-type]
        _send_email_message(smtp, email, recipient, msg)  # type: ignore[arg-type]
        smtp.quit()
        return True
    except Exception as e:
        LOGGER.error(f"Failed to send sync summary email: {e!s}")
        # quit() is only reached on success, so release the socket here when
        # login/build/send (or quit itself) failed after the connection opened.
        if smtp is not None:
            try:
                smtp.close()
            except Exception as close_error:
                # Best-effort cleanup: never let it mask the failure already logged.
                LOGGER.debug(f"Failed to close SMTP connection: {close_error!s}")
        return False