Coverage for src/notify.py: 100%
308 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
1"""Send notifications when 2FA is required for iCloud authentication."""
3import datetime
4import smtplib
6import requests
8from src import config_parser, get_logger
9from src.email_message import EmailMessage as Message
11LOGGER = get_logger()
13# Throttling period for notifications (24 hours)
14THROTTLE_HOURS = 24
17def _is_throttled(last_send) -> bool:
18 """
19 Check if notification should be throttled based on last send time.
21 Args:
22 last_send: The datetime when notification was last sent, or None
24 Returns:
25 True if notification should be throttled, False otherwise
26 """
27 if last_send is None:
28 return False
29 if not isinstance(last_send, datetime.datetime):
30 return False
31 return last_send > datetime.datetime.now() - datetime.timedelta(hours=THROTTLE_HOURS)
34def _create_2fa_message(username: str, region: str = "global") -> tuple[str, str]:
35 """
36 Create the 2FA notification message and subject.
38 Args:
39 username: The iCloud username requiring 2FA
40 region: The iCloud region (default: "global")
42 Returns:
43 Tuple of (message, subject)
44 """
45 region_opt = "" if region == "global" else f"--region={region} "
46 message = f"""Two-step authentication for iCloud Drive, Photos (Docker) is required.
47 Please login to your server and authenticate. Please run -
48 `docker exec -it icloud /bin/sh -c "su-exec abc icloud --session-directory=/config/session_data {region_opt}--username={username}"`.""" # noqa: E501
49 subject = f"icloud-docker: Two step authentication is required for {username}"
50 return message, subject
53def _get_current_timestamp() -> datetime.datetime:
54 """
55 Get the current timestamp for notification tracking.
57 Returns:
58 Current datetime
59 """
60 return datetime.datetime.now()
63def _get_telegram_config(config) -> tuple[str | None, str | None, bool]:
64 """
65 Extract Telegram configuration from config.
67 Args:
68 config: The configuration dictionary
70 Returns:
71 Tuple of (bot_token, chat_id, is_configured)
72 """
73 bot_token = config_parser.get_telegram_bot_token(config=config)
74 chat_id = config_parser.get_telegram_chat_id(config=config)
75 is_configured = bool(bot_token and chat_id)
76 return bot_token, chat_id, is_configured
79def notify_telegram(config, message, last_send=None, dry_run=False):
80 """
81 Send Telegram notification with throttling and error handling.
83 Args:
84 config: Configuration dictionary
85 message: Message to send
86 last_send: Timestamp of last send for throttling
87 dry_run: If True, don't actually send the message
89 Returns:
90 Timestamp when message was sent, or last_send if throttled, or None if failed
91 """
92 if _is_throttled(last_send):
93 LOGGER.info("Throttling telegram to once a day")
94 return last_send
96 bot_token, chat_id, is_configured = _get_telegram_config(config)
97 if not is_configured:
98 LOGGER.warning("Not sending 2FA notification because Telegram is not configured.")
99 return None
101 sent_on = _get_current_timestamp()
102 if dry_run:
103 return sent_on
105 # bot_token and chat_id are guaranteed to be non-None due to is_configured check
106 if post_message_to_telegram(bot_token, chat_id, message): # type: ignore[arg-type]
107 return sent_on
108 return None
111def post_message_to_telegram(bot_token: str, chat_id: str, message: str) -> bool:
112 """
113 Post message to Telegram bot using API.
115 Args:
116 bot_token: Telegram bot token
117 chat_id: Telegram chat ID
118 message: Message to send
120 Returns:
121 True if message was sent successfully, False otherwise
122 """
123 url = f"https://api.telegram.org/bot{bot_token}/sendMessage"
124 params = {"chat_id": chat_id, "text": message}
125 response = requests.post(url, params=params, timeout=10)
126 if response.status_code == 200:
127 return True
128 # Log error message
129 LOGGER.error(f"Failed to send telegram notification. Response: {response.text}")
130 return False
133def _get_discord_config(config) -> tuple[str | None, str | None, bool]:
134 """
135 Extract Discord configuration from config.
137 Args:
138 config: The configuration dictionary
140 Returns:
141 Tuple of (webhook_url, username, is_configured)
142 """
143 webhook_url = config_parser.get_discord_webhook_url(config=config)
144 username = config_parser.get_discord_username(config=config)
145 is_configured = bool(webhook_url and username)
146 return webhook_url, username, is_configured
149def post_message_to_discord(webhook_url: str, username: str, message: str) -> bool:
150 """
151 Post message to Discord webhook.
153 Args:
154 webhook_url: Discord webhook URL
155 username: Username to display in Discord
156 message: Message to send
158 Returns:
159 True if message was sent successfully, False otherwise
160 """
161 data = {"username": username, "content": message}
162 response = requests.post(webhook_url, data=data, timeout=10)
163 if response.status_code == 204:
164 return True
165 # Log error message
166 LOGGER.error(f"Failed to send Discord notification. Response: {response.text}")
167 return False
170def notify_discord(config, message, last_send=None, dry_run=False):
171 """
172 Send Discord notification with throttling and error handling.
174 Args:
175 config: Configuration dictionary
176 message: Message to send
177 last_send: Timestamp of last send for throttling
178 dry_run: If True, don't actually send the message
180 Returns:
181 Timestamp when message was sent, or last_send if throttled, or None if failed
182 """
183 if _is_throttled(last_send):
184 LOGGER.info("Throttling discord to once a day")
185 return last_send
187 webhook_url, username, is_configured = _get_discord_config(config)
188 if not is_configured:
189 LOGGER.warning("Not sending 2FA notification because Discord is not configured.")
190 return None
192 sent_on = _get_current_timestamp()
193 if dry_run or post_message_to_discord(webhook_url, username, message): # type: ignore[arg-type]
194 return sent_on
195 return None
198def _get_pushover_config(config) -> tuple[str | None, str | None, int | None, bool]:
199 """
200 Extract Pushover configuration from config.
202 Args:
203 config: The configuration dictionary
205 Returns:
206 Tuple of (user_key, api_token, priority, is_configured)
207 """
208 user_key = config_parser.get_pushover_user_key(config=config)
209 api_token = config_parser.get_pushover_api_token(config=config)
210 priority = config_parser.get_pushover_notification_priority(config=config)
211 is_configured = bool(user_key and api_token)
212 return user_key, api_token, priority, is_configured
215def post_message_to_pushover(api_token: str, user_key: str, priority: int | None, message: str) -> bool:
216 """
217 Post message to Pushover API.
219 Args:
220 api_token: Pushover API token
221 user_key: Pushover user key
222 priority: Pushover notification priority (-2 to 2, optional)
223 message: Message to send
225 Returns:
226 True if message was sent successfully, False otherwise
227 """
228 url = "https://api.pushover.net/1/messages.json"
229 data = {"token": api_token, "user": user_key, "message": message}
230 if priority is not None:
231 data["priority"] = priority
232 response = requests.post(url, data=data, timeout=10)
233 if response.status_code == 200:
234 return True
235 LOGGER.error(f"Failed to send Pushover notification. Response: {response.text}")
236 return False
239def notify_pushover(config, message, last_send=None, dry_run=False):
240 """
241 Send Pushover notification with throttling and error handling.
243 Args:
244 config: Configuration dictionary
245 message: Message to send
246 last_send: Timestamp of last send for throttling
247 dry_run: If True, don't actually send the message
249 Returns:
250 Timestamp when message was sent, or last_send if throttled, or None if failed
251 """
252 if _is_throttled(last_send):
253 LOGGER.info("Throttling Pushover to once a day")
254 return last_send
256 user_key, api_token, priority, is_configured = _get_pushover_config(config)
257 if not is_configured:
258 LOGGER.warning("Not sending 2FA notification because Pushover is not configured.")
259 return None
261 sent_on = _get_current_timestamp()
262 if dry_run:
263 return sent_on
265 # user_key and api_token are guaranteed to be non-None due to is_configured check
266 if post_message_to_pushover(api_token, user_key, priority, message): # type: ignore[arg-type]
267 return sent_on
268 return None
271def notify_email(config, message: str, subject: str, last_send=None, dry_run=False):
272 """
273 Send email notification with throttling and error handling.
275 Args:
276 config: Configuration dictionary
277 message: Message to send
278 subject: Email subject
279 last_send: Timestamp of last send for throttling
280 dry_run: If True, don't actually send the message
282 Returns:
283 Timestamp when message was sent, or last_send if throttled, or None if failed
284 """
285 if _is_throttled(last_send):
286 LOGGER.info("Throttling email to once a day")
287 return last_send
289 email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
290 if not is_configured:
291 LOGGER.warning("Not sending 2FA notification because SMTP is not configured")
292 return None
294 sent_on = _get_current_timestamp()
295 if dry_run:
296 return sent_on
298 try:
299 # All necessary config values are guaranteed to be non-None due to is_configured check
300 smtp = _create_smtp_connection(host, port, no_tls) # type: ignore[arg-type]
302 if password:
303 _authenticate_smtp(smtp, email, username, password) # type: ignore[arg-type]
305 # to_email could be None, use email as fallback
306 recipient = to_email if to_email else email
307 msg = build_message(email, recipient, message, subject) # type: ignore[arg-type]
308 _send_email_message(smtp, email, recipient, msg) # type: ignore[arg-type]
309 smtp.quit()
310 return sent_on
311 except Exception as e:
312 LOGGER.error(f"Failed to send email: {e!s}.")
313 return None
316def send(config, username, last_send=None, dry_run=False, region="global"):
317 """
318 Send 2FA notification to all configured notification services.
320 Args:
321 config: Configuration dictionary
322 username: iCloud username requiring 2FA
323 last_send: Timestamp of last send for throttling
324 dry_run: If True, don't actually send notifications
325 region: iCloud region (default: "global")
327 Returns:
328 Timestamp when notifications were sent, or None if all failed
329 """
330 message, subject = _create_2fa_message(username, region)
332 # Send to all notification services
333 telegram_sent = notify_telegram(config=config, message=message, last_send=last_send, dry_run=dry_run)
334 discord_sent = notify_discord(config=config, message=message, last_send=last_send, dry_run=dry_run)
335 pushover_sent = notify_pushover(config=config, message=message, last_send=last_send, dry_run=dry_run)
336 email_sent = notify_email(config=config, message=message, subject=subject, last_send=last_send, dry_run=dry_run)
338 # Return the timestamp if any notification was sent successfully
339 sent_timestamps = [t for t in [telegram_sent, discord_sent, pushover_sent, email_sent] if t is not None]
340 return sent_timestamps[0] if sent_timestamps else None
343def _get_smtp_config(
344 config,
345) -> tuple[str | None, str | None, str | None, int | None, bool, str | None, str | None, bool]:
346 """
347 Extract SMTP configuration from config.
349 Args:
350 config: The configuration dictionary
352 Returns:
353 Tuple of (email, to_email, host, port, no_tls, username, password, is_configured)
354 """
355 email = config_parser.get_smtp_email(config=config)
356 to_email = config_parser.get_smtp_to_email(config=config)
357 host = config_parser.get_smtp_host(config=config)
358 port = config_parser.get_smtp_port(config=config)
359 no_tls = config_parser.get_smtp_no_tls(config=config)
360 username = config_parser.get_smtp_username(config=config)
361 password = config_parser.get_smtp_password(config=config)
362 is_configured = bool(email and host and port)
363 return email, to_email, host, port, no_tls, username, password, is_configured
366def _create_smtp_connection(host: str, port: int, no_tls: bool) -> smtplib.SMTP:
367 """
368 Create and configure SMTP connection.
370 Args:
371 host: SMTP host
372 port: SMTP port
373 no_tls: Whether to skip TLS
375 Returns:
376 Configured SMTP connection
377 """
378 smtp = smtplib.SMTP(host, port)
379 smtp.set_debuglevel(0)
380 smtp.connect(host, port)
381 if not no_tls:
382 smtp.starttls()
383 return smtp
386def _authenticate_smtp(smtp: smtplib.SMTP, email: str, username: str | None, password: str) -> None:
387 """
388 Authenticate SMTP connection.
390 Args:
391 smtp: SMTP connection
392 email: Email address for fallback authentication
393 username: SMTP username (optional)
394 password: SMTP password
395 """
396 if username:
397 smtp.login(username, password)
398 else:
399 smtp.login(email, password)
402def _send_email_message(smtp: smtplib.SMTP, email: str, to_email: str, message_obj: Message) -> None:
403 """
404 Send email message through SMTP connection.
406 Args:
407 smtp: SMTP connection
408 email: From email address
409 to_email: To email address
410 message_obj: Email message object
411 """
412 smtp.sendmail(from_addr=email, to_addrs=to_email, msg=message_obj.as_string())
415def _contains_non_ascii(text: str | None) -> bool:
416 """Determine if the provided text contains non-ASCII characters."""
418 if text is None:
419 return False
421 try:
422 text.encode("ascii")
423 except UnicodeEncodeError:
424 return True
425 return False
428def build_message(email: str, to_email: str, message: str, subject: str) -> Message:
429 """
430 Create email message with proper headers.
432 Args:
433 email: From email address
434 to_email: To email address
435 message: Message body
436 subject: Message subject
438 Returns:
439 Configured email message object
440 """
441 requires_utf8 = _contains_non_ascii(message) or _contains_non_ascii(subject)
442 charset = "utf-8" if requires_utf8 else "us-ascii"
444 msg = Message(to=to_email, charset=charset)
445 msg.sender = "icloud-docker <" + email + ">"
446 msg.date = datetime.datetime.now().strftime("%d/%m/%Y %H:%M")
447 msg.subject = subject
448 msg.body = message
449 return msg
452# =============================================================================
453# Sync Summary Notification Functions
454# =============================================================================
457def _format_sync_summary_message(summary) -> tuple[str, str]:
458 """
459 Format sync summary as notification message.
461 Args:
462 summary: SyncSummary object containing sync statistics
464 Returns:
465 Tuple of (message, subject)
466 """
467 from src.sync_stats import format_bytes, format_duration
469 has_errors = summary.has_errors()
470 status_emoji = "⚠️" if has_errors else "✅"
471 status_text = "Completed with Errors" if has_errors else "Complete"
473 message_lines = [f"{status_emoji} iCloud Sync {status_text}", ""]
475 # Drive statistics
476 if summary.drive_stats and summary.drive_stats.has_activity():
477 drive = summary.drive_stats
478 message_lines.append("📁 Drive:")
479 if drive.files_downloaded > 0:
480 size_str = format_bytes(drive.bytes_downloaded)
481 message_lines.append(f" • Downloaded: {drive.files_downloaded} files ({size_str})")
482 if drive.files_skipped > 0:
483 message_lines.append(f" • Skipped: {drive.files_skipped} files (up-to-date)")
484 if drive.files_removed > 0:
485 message_lines.append(f" • Removed: {drive.files_removed} obsolete files")
486 if drive.duration_seconds > 0:
487 duration_str = format_duration(drive.duration_seconds)
488 message_lines.append(f" • Duration: {duration_str}")
489 if drive.has_errors():
490 message_lines.append(f" • Errors: {len(drive.errors)} failed")
491 message_lines.append("")
493 # Photos statistics
494 if summary.photo_stats and summary.photo_stats.has_activity():
495 photos = summary.photo_stats
496 message_lines.append("📷 Photos:")
497 if photos.photos_downloaded > 0:
498 size_str = format_bytes(photos.bytes_downloaded)
499 message_lines.append(f" • Downloaded: {photos.photos_downloaded} photos ({size_str})")
500 if photos.photos_hardlinked > 0:
501 message_lines.append(f" • Hard-linked: {photos.photos_hardlinked} photos")
502 if photos.bytes_saved_by_hardlinks > 0:
503 saved_str = format_bytes(photos.bytes_saved_by_hardlinks)
504 message_lines.append(f" • Storage saved: {saved_str}")
505 if photos.albums_synced:
506 albums_str = ", ".join(photos.albums_synced[:5])
507 if len(photos.albums_synced) > 5:
508 albums_str += f" (+{len(photos.albums_synced) - 5} more)"
509 message_lines.append(f" • Albums: {albums_str}")
510 if photos.duration_seconds > 0:
511 duration_str = format_duration(photos.duration_seconds)
512 message_lines.append(f" • Duration: {duration_str}")
513 if photos.has_errors():
514 message_lines.append(f" • Errors: {len(photos.errors)} failed")
515 message_lines.append("")
517 # Error details if present
518 if has_errors:
519 message_lines.append("Failed items:")
520 all_errors = []
521 if summary.drive_stats:
522 all_errors.extend(summary.drive_stats.errors[:5]) # Limit to first 5
523 if summary.photo_stats:
524 all_errors.extend(summary.photo_stats.errors[:5]) # Limit to first 5
525 message_lines.extend([f" • {error}" for error in all_errors[:10]])
526 total_errors = 0
527 if summary.drive_stats:
528 total_errors += len(summary.drive_stats.errors)
529 if summary.photo_stats:
530 total_errors += len(summary.photo_stats.errors)
531 if total_errors > 10:
532 message_lines.append(f" ... and {total_errors - 10} more errors")
533 message_lines.append("")
535 message = "\n".join(message_lines)
536 subject = f"icloud-docker: Sync {status_text}"
537 return message, subject
540def _should_send_sync_summary(config, summary) -> bool:
541 """
542 Determine if sync summary notification should be sent.
544 Args:
545 config: Configuration dictionary
546 summary: SyncSummary object
548 Returns:
549 True if notification should be sent, False otherwise
550 """
551 # Check if sync summary is enabled
552 if not config_parser.get_sync_summary_enabled(config=config):
553 return False
555 # Check if there was any activity
556 if not summary.has_activity():
557 return False
559 # Check error/success preferences
560 has_errors = summary.has_errors()
561 on_error = config_parser.get_sync_summary_on_error(config=config)
562 on_success = config_parser.get_sync_summary_on_success(config=config)
564 if has_errors and not on_error:
565 return False
566 if not has_errors and not on_success:
567 return False
569 # Check minimum downloads threshold
570 min_downloads = config_parser.get_sync_summary_min_downloads(config=config)
571 total_downloads = 0
572 if summary.drive_stats:
573 total_downloads += summary.drive_stats.files_downloaded
574 if summary.photo_stats:
575 total_downloads += summary.photo_stats.photos_downloaded
577 if total_downloads < min_downloads:
578 return False
580 return True
583def send_sync_summary(config, summary, dry_run=False):
584 """
585 Send sync summary notification to all configured services.
587 Note: Sync summaries are NOT throttled like 2FA notifications,
588 as they provide valuable operational information for each sync.
590 Args:
591 config: Configuration dictionary
592 summary: SyncSummary object containing sync statistics
593 dry_run: If True, don't actually send notifications
595 Returns:
596 True if at least one notification was sent successfully, False otherwise
597 """
598 if not _should_send_sync_summary(config, summary):
599 LOGGER.debug("Sync summary notification skipped (not enabled or no activity)")
600 return False
602 message, subject = _format_sync_summary_message(summary)
604 # Send to all notification services (no throttling for sync summaries)
605 telegram_sent = _send_telegram_no_throttle(config, message, dry_run)
606 discord_sent = _send_discord_no_throttle(config, message, dry_run)
607 pushover_sent = _send_pushover_no_throttle(config, message, dry_run)
608 email_sent = _send_email_no_throttle(config, message, subject, dry_run)
610 # Return True if any notification was sent successfully
611 any_sent = any([telegram_sent, discord_sent, pushover_sent, email_sent])
612 if any_sent:
613 LOGGER.info("Sync summary notification sent successfully")
614 return any_sent
617def _send_telegram_no_throttle(config, message: str, dry_run: bool) -> bool:
618 """Send Telegram notification without throttling.
620 Args:
621 config: Configuration dictionary
622 message: Message to send
623 dry_run: If True, don't actually send
625 Returns:
626 True if sent successfully, False otherwise
627 """
628 bot_token, chat_id, is_configured = _get_telegram_config(config)
629 if not is_configured:
630 return False
632 if dry_run:
633 return True
635 return post_message_to_telegram(bot_token, chat_id, message) # type: ignore[arg-type]
638def _send_discord_no_throttle(config, message: str, dry_run: bool) -> bool:
639 """Send Discord notification without throttling.
641 Args:
642 config: Configuration dictionary
643 message: Message to send
644 dry_run: If True, don't actually send
646 Returns:
647 True if sent successfully, False otherwise
648 """
649 webhook_url, username, is_configured = _get_discord_config(config)
650 if not is_configured:
651 return False
653 if dry_run:
654 return True
656 return post_message_to_discord(webhook_url, username, message) # type: ignore[arg-type]
659def _send_pushover_no_throttle(config, message: str, dry_run: bool) -> bool:
660 """Send Pushover notification without throttling.
662 Args:
663 config: Configuration dictionary
664 message: Message to send
665 dry_run: If True, don't actually send
667 Returns:
668 True if sent successfully, False otherwise
669 """
670 user_key, api_token, priority, is_configured = _get_pushover_config(config)
671 if not is_configured:
672 return False
674 if dry_run:
675 return True
677 return post_message_to_pushover(api_token, user_key, priority, message) # type: ignore[arg-type]
680def _send_email_no_throttle(config, message: str, subject: str, dry_run: bool) -> bool:
681 """Send email notification without throttling.
683 Args:
684 config: Configuration dictionary
685 message: Message to send
686 subject: Email subject
687 dry_run: If True, don't actually send
689 Returns:
690 True if sent successfully, False otherwise
691 """
692 email, to_email, host, port, no_tls, username, password, is_configured = _get_smtp_config(config)
693 if not is_configured:
694 return False
696 if dry_run:
697 return True
699 try:
700 smtp = _create_smtp_connection(host, port, no_tls) # type: ignore[arg-type]
702 if password:
703 _authenticate_smtp(smtp, email, username, password) # type: ignore[arg-type]
705 recipient = to_email if to_email else email
706 msg = build_message(email, recipient, message, subject) # type: ignore[arg-type]
707 _send_email_message(smtp, email, recipient, msg) # type: ignore[arg-type]
708 smtp.quit()
709 return True
710 except Exception as e:
711 LOGGER.error(f"Failed to send sync summary email: {e!s}")
712 return False