Coverage for src/notify.py: 100%
305 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
1"""Send notifications when 2FA is required for iCloud authentication."""
3import datetime
4import smtplib
6import requests
8from src import config_parser, get_logger
9from src.email_message import EmailMessage as Message
11LOGGER = get_logger()
13# Throttling period for notifications (24 hours)
14THROTTLE_HOURS = 24
17def _is_throttled(last_send) -> bool:
18 """
19 Check if notification should be throttled based on last send time.
21 Args:
22 last_send: The datetime when notification was last sent, or None
24 Returns:
25 True if notification should be throttled, False otherwise
26 """
27 if last_send is None:
28 return False
29 if not isinstance(last_send, datetime.datetime):
30 return False
31 return last_send > datetime.datetime.now() - datetime.timedelta(hours=THROTTLE_HOURS)
34def _create_2fa_message(username: str, region: str = "global") -> tuple[str, str]:
35 """
36 Create the 2FA notification message and subject.
38 Args:
39 username: The iCloud username requiring 2FA
40 region: The iCloud region (default: "global")
42 Returns:
43 Tuple of (message, subject)
44 """
45 region_opt = "" if region == "global" else f"--region={region} "
46 message = f"""Two-step authentication for iCloud Drive, Photos (Docker) is required.
47 Please login to your server and authenticate. Please run -
48 `docker exec -it icloud /bin/sh -c "su-exec abc icloud --session-directory=/config/session_data {region_opt}--username={username}"`.""" # noqa: E501
49 subject = f"icloud-docker: Two step authentication is required for {username}"
50 return message, subject
53def _get_current_timestamp() -> datetime.datetime:
54 """
55 Get the current timestamp for notification tracking.
57 Returns:
58 Current datetime
59 """
60 return datetime.datetime.now()
63def _get_telegram_config(config) -> tuple[str | None, str | None, bool]:
64 """
65 Extract Telegram configuration from config.
67 Args:
68 config: The configuration dictionary
70 Returns:
71 Tuple of (bot_token, chat_id, is_configured)
72 """
73 bot_token = config_parser.get_telegram_bot_token(config=config)
74 chat_id = config_parser.get_telegram_chat_id(config=config)
75 is_configured = bool(bot_token and chat_id)
76 return bot_token, chat_id, is_configured
79def notify_telegram(config, message, last_send=None, dry_run=False):
80 """
81 Send Telegram notification with throttling and error handling.
83 Args:
84 config: Configuration dictionary
85 message: Message to send
86 last_send: Timestamp of last send for throttling
87 dry_run: If True, don't actually send the message
89 Returns:
90 Timestamp when message was sent, or last_send if throttled, or None if failed
91 """
92 if _is_throttled(last_send):
93 LOGGER.info("Throttling telegram to once a day")
94 return last_send
96 bot_token, chat_id, is_configured = _get_telegram_config(config)
97 if not is_configured:
98 LOGGER.warning("Not sending 2FA notification because Telegram is not configured.")
99 return None
101 sent_on = _get_current_timestamp()
102 if dry_run:
103 return sent_on
105 # bot_token and chat_id are guaranteed to be non-None due to is_configured check
106 if post_message_to_telegram(bot_token, chat_id, message): # type: ignore[arg-type]
107 return sent_on
108 return None
111def post_message_to_telegram(bot_token: str, chat_id: str, message: str) -> bool:
112 """
113 Post message to Telegram bot using API.
115 Args:
116 bot_token: Telegram bot token
117 chat_id: Telegram chat ID
118 message: Message to send
120 Returns:
121 True if message was sent successfully, False otherwise
122 """
123 url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
124 params = {"chat_id": chat_id, "text": message}
125 response = requests.post(url, params=params, timeout=10)
126 if response.status_code == 200:
127 return True
128 # Log error message
129 LOGGER.error(f"Failed to send telegram notification. Response: {response.text}")
130 return False
133def _get_discord_config(config) -> tuple[str | None, str | None, bool]:
134 """
135 Extract Discord configuration from config.
137 Args:
138 config: The configuration dictionary
140 Returns:
141 Tuple of (webhook_url, username, is_configured)
142 """
143 webhook_url = config_parser.get_discord_webhook_url(config=config)
144 username = config_parser.get_discord_username(config=config)
145 is_configured = bool(webhook_url and username)
146 return webhook_url, username, is_configured
149def post_message_to_discord(webhook_url: str, username: str, message: str) -> bool:
150 """
151 Post message to Discord webhook.
153 Args:
154 webhook_url: Discord webhook URL
155 username: Username to display in Discord
156 message: Message to send
158 Returns:
159 True if message was sent successfully, False otherwise
160 """
161 data = {"username": username, "content": message}
162 response = requests.post(webhook_url, data=data, timeout=10)
163 if response.status_code == 204:
164 return True
165 # Log error message
166 LOGGER.error(f"Failed to send Discord notification. Response: {response.text}")
167 return False
170def notify_discord(config, message, last_send=None, dry_run=False):
171 """
172 Send Discord notification with throttling and error handling.
174 Args:
175 config: Configuration dictionary
176 message: Message to send
177 last_send: Timestamp of last send for throttling
178 dry_run: If True, don't actually send the message
180 Returns:
181 Timestamp when message was sent, or last_send if throttled, or None if failed
182 """
183 if _is_throttled(last_send):
184 LOGGER.info("Throttling discord to once a day")
185 return last_send
187 webhook_url, username, is_configured = _get_discord_config(config)
188 if not is_configured:
189 LOGGER.warning("Not sending 2FA notification because Discord is not configured.")
190 return None
192 sent_on = _get_current_timestamp()
193 if dry_run or post_message_to_discord(webhook_url, username, message): # type: ignore[arg-type]
194 return sent_on
195 return None
198def _get_pushover_config(config) -> tuple[str | None, str | None, bool]:
199 """
200 Extract Pushover configuration from config.
202 Args:
203 config: The configuration dictionary
205 Returns:
206 Tuple of (user_key, api_token, is_configured)
207 """
208 user_key = config_parser.get_pushover_user_key(config=config)
209 api_token = config_parser.get_pushover_api_token(config=config)
210 is_configured = bool(user_key and api_token)
211 return user_key, api_token, is_configured
214def post_message_to_pushover(api_token: str, user_key: str, message: str) -> bool:
215 """
216 Post message to Pushover API.
218 Args:
219 api_token: Pushover API token
220 user_key: Pushover user key
221 message: Message to send
223 Returns:
224 True if message was sent successfully, False otherwise
225 """
226 url = "https://api.pushover.net/1/messages.json"
227 data = {"token": api_token, "user": user_key, "message": message}
228 response = requests.post(url, data=data, timeout=10)
229 if response.status_code == 200:
230 return True
231 LOGGER.error(f"Failed to send Pushover notification. Response: {response.text}")
232 return False
235def notify_pushover(config, message, last_send=None, dry_run=False):
236 """
237 Send Pushover notification with throttling and error handling.
239 Args:
240 config: Configuration dictionary
241 message: Message to send
242 last_send: Timestamp of last send for throttling
243 dry_run: If True, don't actually send the message
245 Returns:
246 Timestamp when message was sent, or last_send if throttled, or None if failed
247 """
248 if _is_throttled(last_send):
249 LOGGER.info("Throttling Pushover to once a day")
250 return last_send
252 user_key, api_token, is_configured = _get_pushover_config(config)
253 if not is_configured:
254 LOGGER.warning("Not sending 2FA notification because Pushover is not configured.")
255 return None
257 sent_on = _get_current_timestamp()
258 if dry_run:
259 return sent_on
261 # user_key and api_token are guaranteed to be non-None due to is_configured check
262 if post_message_to_pushover(api_token, user_key, message): # type: ignore[arg-type]
263 return sent_on
264 return None
267def notify_email(config, message: str, subject: str, last_send=None, dry_run=False):
268 """
269 Send email notification with throttling and error handling.
271 Args:
272 config: Configuration dictionary
273 message: Message to send
274 subject: Email subject
275 last_send: Timestamp of last send for throttling
276 dry_run: If True, don't actually send the message
278 Returns:
279 Timestamp when message was sent, or last_send if throttled, or None if failed
280 """
281 if _is_throttled(last_send):
282 LOGGER.info("Throttling email to once a day")
283 return last_send
285 email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
286 if not is_configured:
287 LOGGER.warning("Not sending 2FA notification because SMTP is not configured")
288 return None
290 sent_on = _get_current_timestamp()
291 if dry_run:
292 return sent_on
294 try:
295 # All necessary config values are guaranteed to be non-None due to is_configured check
296 smtp = _create_smtp_connection(host, port, no_tls) # type: ignore[arg-type]
298 if password:
299 _authenticate_smtp(smtp, email, username, password) # type: ignore[arg-type]
301 # to_email could be None, use email as fallback
302 recipient = to_email if to_email else email
303 msg = build_message(email, recipient, message, subject) # type: ignore[arg-type]
304 _send_email_message(smtp, email, recipient, msg) # type: ignore[arg-type]
305 smtp.quit()
306 return sent_on
307 except Exception as e:
308 LOGGER.error(f"Failed to send email: {e!s}.")
309 return None
312def send(config, username, last_send=None, dry_run=False, region="global"):
313 """
314 Send 2FA notification to all configured notification services.
316 Args:
317 config: Configuration dictionary
318 username: iCloud username requiring 2FA
319 last_send: Timestamp of last send for throttling
320 dry_run: If True, don't actually send notifications
321 region: iCloud region (default: "global")
323 Returns:
324 Timestamp when notifications were sent, or None if all failed
325 """
326 message, subject = _create_2fa_message(username, region)
328 # Send to all notification services
329 telegram_sent = notify_telegram(config=config, message=message, last_send=last_send, dry_run=dry_run)
330 discord_sent = notify_discord(config=config, message=message, last_send=last_send, dry_run=dry_run)
331 pushover_sent = notify_pushover(config=config, message=message, last_send=last_send, dry_run=dry_run)
332 email_sent = notify_email(config=config, message=message, subject=subject, last_send=last_send, dry_run=dry_run)
334 # Return the timestamp if any notification was sent successfully
335 sent_timestamps = [t for t in [telegram_sent, discord_sent, pushover_sent, email_sent] if t is not None]
336 return sent_timestamps[0] if sent_timestamps else None
339def _get_smtp_config(
340 config,
341) -> tuple[str | None, str | None, str | None, int | None, bool, str | None, str | None, bool]:
342 """
343 Extract SMTP configuration from config.
345 Args:
346 config: The configuration dictionary
348 Returns:
349 Tuple of (email, to_email, host, port, no_tls, username, password, is_configured)
350 """
351 email = config_parser.get_smtp_email(config=config)
352 to_email = config_parser.get_smtp_to_email(config=config)
353 host = config_parser.get_smtp_host(config=config)
354 port = config_parser.get_smtp_port(config=config)
355 no_tls = config_parser.get_smtp_no_tls(config=config)
356 username = config_parser.get_smtp_username(config=config)
357 password = config_parser.get_smtp_password(config=config)
358 is_configured = bool(email and host and port)
359 return email, to_email, host, port, no_tls, username, password, is_configured
362def _create_smtp_connection(host: str, port: int, no_tls: bool) -> smtplib.SMTP:
363 """
364 Create and configure SMTP connection.
366 Args:
367 host: SMTP host
368 port: SMTP port
369 no_tls: Whether to skip TLS
371 Returns:
372 Configured SMTP connection
373 """
374 smtp = smtplib.SMTP(host, port)
375 smtp.set_debuglevel(0)
376 smtp.connect(host, port)
377 if not no_tls:
378 smtp.starttls()
379 return smtp
382def _authenticate_smtp(smtp: smtplib.SMTP, email: str, username: str | None, password: str) -> None:
383 """
384 Authenticate SMTP connection.
386 Args:
387 smtp: SMTP connection
388 email: Email address for fallback authentication
389 username: SMTP username (optional)
390 password: SMTP password
391 """
392 if username:
393 smtp.login(username, password)
394 else:
395 smtp.login(email, password)
398def _send_email_message(smtp: smtplib.SMTP, email: str, to_email: str, message_obj: Message) -> None:
399 """
400 Send email message through SMTP connection.
402 Args:
403 smtp: SMTP connection
404 email: From email address
405 to_email: To email address
406 message_obj: Email message object
407 """
408 smtp.sendmail(from_addr=email, to_addrs=to_email, msg=message_obj.as_string())
411def _contains_non_ascii(text: str | None) -> bool:
412 """Determine if the provided text contains non-ASCII characters."""
414 if text is None:
415 return False
417 try:
418 text.encode("ascii")
419 except UnicodeEncodeError:
420 return True
421 return False
424def build_message(email: str, to_email: str, message: str, subject: str) -> Message:
425 """
426 Create email message with proper headers.
428 Args:
429 email: From email address
430 to_email: To email address
431 message: Message body
432 subject: Message subject
434 Returns:
435 Configured email message object
436 """
437 requires_utf8 = _contains_non_ascii(message) or _contains_non_ascii(subject)
438 charset = "utf-8" if requires_utf8 else "us-ascii"
440 msg = Message(to=to_email, charset=charset)
441 msg.sender = "icloud-docker <" + email + ">"
442 msg.date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M")
443 msg.subject = subject
444 msg.body = message
445 return msg
448# =============================================================================
449# Sync Summary Notification Functions
450# =============================================================================
453def _format_sync_summary_message(summary) -> tuple[str, str]:
454 """
455 Format sync summary as notification message.
457 Args:
458 summary: SyncSummary object containing sync statistics
460 Returns:
461 Tuple of (message, subject)
462 """
463 from src.sync_stats import format_bytes, format_duration
465 has_errors = summary.has_errors()
466 status_emoji = "⚠️" if has_errors else "✅"
467 status_text = "Completed with Errors" if has_errors else "Complete"
469 message_lines = [f"{status_emoji} iCloud Sync {status_text}", ""]
471 # Drive statistics
472 if summary.drive_stats and summary.drive_stats.has_activity():
473 drive = summary.drive_stats
474 message_lines.append("📁 Drive:")
475 if drive.files_downloaded > 0:
476 size_str = format_bytes(drive.bytes_downloaded)
477 message_lines.append(f" • Downloaded: {drive.files_downloaded} files ({size_str})")
478 if drive.files_skipped > 0:
479 message_lines.append(f" • Skipped: {drive.files_skipped} files (up-to-date)")
480 if drive.files_removed > 0:
481 message_lines.append(f" • Removed: {drive.files_removed} obsolete files")
482 if drive.duration_seconds > 0:
483 duration_str = format_duration(drive.duration_seconds)
484 message_lines.append(f" • Duration: {duration_str}")
485 if drive.has_errors():
486 message_lines.append(f" • Errors: {len(drive.errors)} failed")
487 message_lines.append("")
489 # Photos statistics
490 if summary.photo_stats and summary.photo_stats.has_activity():
491 photos = summary.photo_stats
492 message_lines.append("📷 Photos:")
493 if photos.photos_downloaded > 0:
494 size_str = format_bytes(photos.bytes_downloaded)
495 message_lines.append(f" • Downloaded: {photos.photos_downloaded} photos ({size_str})")
496 if photos.photos_hardlinked > 0:
497 message_lines.append(f" • Hard-linked: {photos.photos_hardlinked} photos")
498 if photos.bytes_saved_by_hardlinks > 0:
499 saved_str = format_bytes(photos.bytes_saved_by_hardlinks)
500 message_lines.append(f" • Storage saved: {saved_str}")
501 if photos.albums_synced:
502 albums_str = ", ".join(photos.albums_synced[:5])
503 if len(photos.albums_synced) > 5:
504 albums_str += f" (+{len(photos.albums_synced) - 5} more)"
505 message_lines.append(f" • Albums: {albums_str}")
506 if photos.duration_seconds > 0:
507 duration_str = format_duration(photos.duration_seconds)
508 message_lines.append(f" • Duration: {duration_str}")
509 if photos.has_errors():
510 message_lines.append(f" • Errors: {len(photos.errors)} failed")
511 message_lines.append("")
513 # Error details if present
514 if has_errors:
515 message_lines.append("Failed items:")
516 all_errors = []
517 if summary.drive_stats:
518 all_errors.extend(summary.drive_stats.errors[:5]) # Limit to first 5
519 if summary.photo_stats:
520 all_errors.extend(summary.photo_stats.errors[:5]) # Limit to first 5
521 message_lines.extend([f" • {error}" for error in all_errors[:10]])
522 total_errors = 0
523 if summary.drive_stats:
524 total_errors += len(summary.drive_stats.errors)
525 if summary.photo_stats:
526 total_errors += len(summary.photo_stats.errors)
527 if total_errors > 10:
528 message_lines.append(f" ... and {total_errors - 10} more errors")
529 message_lines.append("")
531 message = "\n".join(message_lines)
532 subject = f"icloud-docker: Sync {status_text}"
533 return message, subject
536def _should_send_sync_summary(config, summary) -> bool:
537 """
538 Determine if sync summary notification should be sent.
540 Args:
541 config: Configuration dictionary
542 summary: SyncSummary object
544 Returns:
545 True if notification should be sent, False otherwise
546 """
547 # Check if sync summary is enabled
548 if not config_parser.get_sync_summary_enabled(config=config):
549 return False
551 # Check if there was any activity
552 if not summary.has_activity():
553 return False
555 # Check error/success preferences
556 has_errors = summary.has_errors()
557 on_error = config_parser.get_sync_summary_on_error(config=config)
558 on_success = config_parser.get_sync_summary_on_success(config=config)
560 if has_errors and not on_error:
561 return False
562 if not has_errors and not on_success:
563 return False
565 # Check minimum downloads threshold
566 min_downloads = config_parser.get_sync_summary_min_downloads(config=config)
567 total_downloads = 0
568 if summary.drive_stats:
569 total_downloads += summary.drive_stats.files_downloaded
570 if summary.photo_stats:
571 total_downloads += summary.photo_stats.photos_downloaded
573 if total_downloads < min_downloads:
574 return False
576 return True
579def send_sync_summary(config, summary, dry_run=False):
580 """
581 Send sync summary notification to all configured services.
583 Note: Sync summaries are NOT throttled like 2FA notifications,
584 as they provide valuable operational information for each sync.
586 Args:
587 config: Configuration dictionary
588 summary: SyncSummary object containing sync statistics
589 dry_run: If True, don't actually send notifications
591 Returns:
592 True if at least one notification was sent successfully, False otherwise
593 """
594 if not _should_send_sync_summary(config, summary):
595 LOGGER.debug("Sync summary notification skipped (not enabled or no activity)")
596 return False
598 message, subject = _format_sync_summary_message(summary)
600 # Send to all notification services (no throttling for sync summaries)
601 telegram_sent = _send_telegram_no_throttle(config, message, dry_run)
602 discord_sent = _send_discord_no_throttle(config, message, dry_run)
603 pushover_sent = _send_pushover_no_throttle(config, message, dry_run)
604 email_sent = _send_email_no_throttle(config, message, subject, dry_run)
606 # Return True if any notification was sent successfully
607 any_sent = any([telegram_sent, discord_sent, pushover_sent, email_sent])
608 if any_sent:
609 LOGGER.info("Sync summary notification sent successfully")
610 return any_sent
613def _send_telegram_no_throttle(config, message: str, dry_run: bool) -> bool:
614 """Send Telegram notification without throttling.
616 Args:
617 config: Configuration dictionary
618 message: Message to send
619 dry_run: If True, don't actually send
621 Returns:
622 True if sent successfully, False otherwise
623 """
624 bot_token, chat_id, is_configured = _get_telegram_config(config)
625 if not is_configured:
626 return False
628 if dry_run:
629 return True
631 return post_message_to_telegram(bot_token, chat_id, message) # type: ignore[arg-type]
634def _send_discord_no_throttle(config, message: str, dry_run: bool) -> bool:
635 """Send Discord notification without throttling.
637 Args:
638 config: Configuration dictionary
639 message: Message to send
640 dry_run: If True, don't actually send
642 Returns:
643 True if sent successfully, False otherwise
644 """
645 webhook_url, username, is_configured = _get_discord_config(config)
646 if not is_configured:
647 return False
649 if dry_run:
650 return True
652 return post_message_to_discord(webhook_url, username, message) # type: ignore[arg-type]
655def _send_pushover_no_throttle(config, message: str, dry_run: bool) -> bool:
656 """Send Pushover notification without throttling.
658 Args:
659 config: Configuration dictionary
660 message: Message to send
661 dry_run: If True, don't actually send
663 Returns:
664 True if sent successfully, False otherwise
665 """
666 user_key, api_token, is_configured = _get_pushover_config(config)
667 if not is_configured:
668 return False
670 if dry_run:
671 return True
673 return post_message_to_pushover(api_token, user_key, message) # type: ignore[arg-type]
676def _send_email_no_throttle(config, message: str, subject: str, dry_run: bool) -> bool:
677 """Send email notification without throttling.
679 Args:
680 config: Configuration dictionary
681 message: Message to send
682 subject: Email subject
683 dry_run: If True, don't actually send
685 Returns:
686 True if sent successfully, False otherwise
687 """
688 email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
689 if not is_configured:
690 return False
692 if dry_run:
693 return True
695 try:
696 smtp = _create_smtp_connection(host, port, no_tls) # type: ignore[arg-type]
698 if password:
699 _authenticate_smtp(smtp, email, username, password) # type: ignore[arg-type]
701 recipient = to_email if to_email else email
702 msg = build_message(email, recipient, message, subject) # type: ignore[arg-type]
703 _send_email_message(smtp, email, recipient, msg) # type: ignore[arg-type]
704 smtp.quit()
705 return True
706 except Exception as e:
707 LOGGER.error(f"Failed to send sync summary email: {e!s}")
708 return False