Coverage for icloudpy/base.py: 77%
365 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-02 05:49 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-02 05:49 +0000
1"""Library base file."""
3import base64
4import getpass
5import hashlib
6import http.cookiejar as cookielib
7import inspect
8import json
9import logging
10from os import mkdir, path
11from re import match
12from tempfile import gettempdir
13from uuid import uuid1
15import srp
16from requests import Session
18from icloudpy.exceptions import (
19 ICloudPy2SARequiredException,
20 ICloudPyAPIResponseException,
21 ICloudPyFailedLoginException,
22 ICloudPyServiceNotActivatedException,
23)
24from icloudpy.services import (
25 AccountService,
26 CalendarService,
27 ContactsService,
28 DriveService,
29 FindMyiPhoneServiceManager,
30 PhotosService,
31 RemindersService,
32)
33from icloudpy.utils import get_password_from_keyring
35LOGGER = logging.getLogger(__name__)
37HEADER_DATA = {
38 "X-Apple-ID-Account-Country": "account_country",
39 "X-Apple-ID-Session-Id": "session_id",
40 "X-Apple-Session-Token": "session_token",
41 "X-Apple-TwoSV-Trust-Token": "trust_token",
42 "scnt": "scnt",
43}
46class ICloudPyPasswordFilter(logging.Filter):
47 """Password log hider."""
49 def __init__(self, password):
50 super().__init__(password)
52 def filter(self, record):
53 message = record.getMessage()
54 if self.name in message:
55 record.msg = message.replace(self.name, "*" * 8)
56 record.args = []
58 return True
61class ICloudPySession(Session):
62 """iCloud session."""
64 def __init__(self, service):
65 self.service = service
66 Session.__init__(self)
68 def request(self, method, url, **kwargs): # pylint: disable=arguments-differ
69 # Charge logging to the right service endpoint
70 callee = inspect.stack()[2]
71 module = inspect.getmodule(callee[0])
72 request_logger = logging.getLogger(module.__name__).getChild("http")
73 if self.service.password_filter not in request_logger.filters:
74 request_logger.addFilter(self.service.password_filter)
76 request_logger.debug(f"{method} {url} {kwargs.get('data', '')}")
78 has_retried = kwargs.get("retried")
79 kwargs.pop("retried", None)
80 response = super().request(method, url, **kwargs)
82 content_type = response.headers.get("Content-Type", "").split(";")[0]
83 json_mimetypes = ["application/json", "text/json"]
85 for header, value in HEADER_DATA.items():
86 if response.headers.get(header):
87 session_arg = value
88 self.service.session_data.update(
89 {session_arg: response.headers.get(header)},
90 )
92 # Save session_data to file
93 with open(self.service.session_path, "w", encoding="utf-8") as outfile:
94 json.dump(self.service.session_data, outfile)
95 LOGGER.debug("Saved session data to file")
97 # Save cookies to file
98 self.cookies.save(ignore_discard=True, ignore_expires=True)
99 LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)
101 if not response.ok and (content_type not in json_mimetypes or response.status_code in [421, 450, 500]):
102 try:
103 # pylint: disable=W0212
104 fmip_url = self.service._get_webservice_url("findme")
105 if has_retried is None and response.status_code == 450 and fmip_url in url:
106 # Handle re-authentication for Find My iPhone
107 LOGGER.debug("Re-authenticating Find My iPhone service")
108 try:
109 self.service.authenticate(True, "find")
110 except ICloudPyAPIResponseException:
111 LOGGER.debug("Re-authentication failed")
112 kwargs["retried"] = True
113 return self.request(method, url, **kwargs)
114 except Exception:
115 pass
117 if has_retried is None and response.status_code in [421, 450, 500]:
118 api_error = ICloudPyAPIResponseException(
119 response.reason,
120 response.status_code,
121 retry=True,
122 )
123 request_logger.debug(api_error)
124 kwargs["retried"] = True
125 return self.request(method, url, **kwargs)
127 self._raise_error(response.status_code, response.reason)
129 if content_type not in json_mimetypes:
130 return response
132 try:
133 data = response.json()
134 except: # noqa: E722
135 request_logger.warning("Failed to parse response with JSON mimetype")
136 return response
138 request_logger.debug(data)
140 if isinstance(data, dict):
141 reason = data.get("errorMessage")
142 reason = reason or data.get("reason")
143 reason = reason or data.get("errorReason")
144 if not reason and isinstance(data.get("error"), str):
145 reason = data.get("error")
146 if not reason and data.get("error"):
147 reason = "Unknown reason"
149 code = data.get("errorCode")
150 if not code and data.get("serverErrorCode"):
151 code = data.get("serverErrorCode")
153 if reason:
154 self._raise_error(code, reason)
156 return response
158 def _raise_error(self, code, reason):
159 if self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie":
160 raise ICloudPy2SARequiredException(self.service.user["apple_id"])
161 if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
162 reason = reason + ". Please log into https://icloud.com/ to manually finish setting up your iCloud service"
163 api_error = ICloudPyServiceNotActivatedException(reason, code)
164 LOGGER.error(api_error)
166 raise api_error
167 if code == "ACCESS_DENIED":
168 reason = (
169 reason + ". Please wait a few minutes then try again."
170 "The remote servers might be trying to throttle requests."
171 )
172 if code in [421, 450, 500]:
173 reason = "Authentication required for Account."
175 api_error = ICloudPyAPIResponseException(reason, code)
176 LOGGER.error(api_error)
177 raise api_error
179 # Public method to resolve linting error
180 def raise_error(self, code, reason):
181 return self._raise_error(code=code, reason=reason)
184class ICloudPyService:
185 """
186 A base authentication class for the iCloud service. Handles the
187 authentication required to access iCloud services.
189 Usage:
190 from src import ICloudPyService
191 icloudpy = ICloudPyService('username@apple.com', 'password')
192 icloudpy.iphone.location()
193 """
195 def __init__(
196 self,
197 apple_id,
198 password=None,
199 cookie_directory=None,
200 verify=True,
201 client_id=None,
202 with_family=True,
203 auth_endpoint="https://idmsa.apple.com/appleauth/auth",
204 # For China, use "https://www.icloud.com.cn"
205 home_endpoint="https://www.icloud.com",
206 # For China, use "https://setup.icloud.com.cn/setup/ws/1"
207 setup_endpoint="https://setup.icloud.com/setup/ws/1",
208 ):
209 if password is None:
210 password = get_password_from_keyring(apple_id)
212 self.user = {"accountName": apple_id, "password": password}
213 self.data = {}
214 self.params = {}
215 self.client_id = client_id or (f"auth-{str(uuid1()).lower()}")
216 self.with_family = with_family
217 self.auth_endpoint = auth_endpoint
218 self.home_endpoint = home_endpoint
219 self.setup_endpoint = setup_endpoint
221 self.password_filter = ICloudPyPasswordFilter(password)
222 LOGGER.addFilter(self.password_filter)
224 if cookie_directory:
225 self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
226 if not path.exists(self._cookie_directory):
227 mkdir(self._cookie_directory, 0o700)
228 else:
229 topdir = path.join(gettempdir(), "icloudpy")
230 self._cookie_directory = path.join(topdir, getpass.getuser())
231 if not path.exists(topdir):
232 mkdir(topdir, 0o777)
233 if not path.exists(self._cookie_directory):
234 mkdir(self._cookie_directory, 0o700)
236 LOGGER.debug("Using session file %s", self.session_path)
238 self.session_data = {}
239 try:
240 with open(self.session_path, encoding="utf-8") as session_f:
241 self.session_data = json.load(session_f)
242 except: # noqa: E722
243 LOGGER.info("Session file does not exist")
244 if self.session_data.get("client_id"):
245 self.client_id = self.session_data.get("client_id")
246 self.params["clientId"] = self.client_id
247 else:
248 self.session_data.update({"client_id": self.client_id})
249 self.params["clientId"] = self.client_id
251 self.session = ICloudPySession(self)
252 self.session.verify = verify
253 self.session.headers.update(
254 {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"},
255 )
257 cookiejar_path = self.cookiejar_path
258 self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
259 if path.exists(cookiejar_path):
260 try:
261 self.session.cookies.load(ignore_discard=True, ignore_expires=True)
262 LOGGER.debug("Read cookies from %s", cookiejar_path)
263 except: # noqa: E722
264 # Most likely a pickled cookiejar from earlier versions.
265 # The cookiejar will get replaced with a valid one after
266 # successful authentication.
267 LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
269 self.authenticate()
271 self._drive = None
272 self._photos = None
274 def authenticate(self, force_refresh=False, service=None):
275 """
276 Handles authentication, and persists cookies so that
277 subsequent logins will not cause additional e-mails from Apple.
278 """
280 login_successful = False
281 if self.session_data.get("session_token") and not force_refresh:
282 LOGGER.debug("Checking session token validity")
283 try:
284 self.data = self._validate_token()
285 login_successful = True
286 except ICloudPyAPIResponseException:
287 LOGGER.debug("Invalid authentication token, will log in from scratch.")
289 if not login_successful and service is not None:
290 app = self.data["apps"][service]
291 if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] is True:
292 LOGGER.debug(
293 "Authenticating as %s for %s",
294 self.user["accountName"],
295 service,
296 )
298 try:
299 self._authenticate_with_credentials_service(service)
300 login_successful = True
301 except Exception as error:
302 LOGGER.debug(
303 "Could not log into service. Attempting brand new login. %s",
304 str(error),
305 )
307 if not login_successful:
308 LOGGER.debug("Authenticating as %s", self.user["accountName"])
310 headers = self._get_auth_headers()
312 if self.session_data.get("scnt"):
313 headers["scnt"] = self.session_data.get("scnt")
315 if self.session_data.get("session_id"):
316 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
318 class SrpPassword:
319 def __init__(self, password: str):
320 self.password = password
322 def set_encrypt_info(
323 self,
324 salt: bytes,
325 iterations: int,
326 key_length: int,
327 ):
328 self.salt = salt
329 self.iterations = iterations
330 self.key_length = key_length
332 def encode(self):
333 password_hash = hashlib.sha256(
334 self.password.encode("utf-8"),
335 ).digest()
336 return hashlib.pbkdf2_hmac(
337 "sha256",
338 password_hash,
339 salt,
340 iterations,
341 key_length,
342 )
344 srp_password = SrpPassword(self.user["password"])
345 srp.rfc5054_enable()
346 srp.no_username_in_x()
347 usr = srp.User(
348 self.user["accountName"],
349 srp_password,
350 hash_alg=srp.SHA256,
351 ng_type=srp.NG_2048,
352 )
354 uname, a_bytes = usr.start_authentication()
356 data = {
357 "a": base64.b64encode(a_bytes).decode(),
358 "accountName": uname,
359 "protocols": ["s2k", "s2k_fo"],
360 }
362 try:
363 response = self.session.post(
364 f"{self.auth_endpoint}/signin/init",
365 data=json.dumps(data),
366 headers=headers,
367 )
368 response.raise_for_status()
369 except ICloudPyAPIResponseException as error:
370 msg = "Failed to initiate srp authentication."
371 raise ICloudPyFailedLoginException(msg, error) from error
373 body = response.json()
375 salt = base64.b64decode(body["salt"])
376 b = base64.b64decode(body["b"])
377 c = body["c"]
378 iterations = body["iteration"]
379 key_length = 32
380 srp_password.set_encrypt_info(salt, iterations, key_length)
382 m1 = usr.process_challenge(salt, b)
383 m2 = usr.H_AMK
385 data = {
386 "accountName": uname,
387 "c": c,
388 "m1": base64.b64encode(m1).decode(),
389 "m2": base64.b64encode(m2).decode(),
390 "rememberMe": True,
391 "trustTokens": [],
392 }
394 if self.session_data.get("trust_token"):
395 data["trustTokens"] = [self.session_data.get("trust_token")]
397 try:
398 self.session.post(
399 f"{self.auth_endpoint}/signin/complete",
400 params={"isRememberMeEnabled": "true"},
401 data=json.dumps(data),
402 headers=headers,
403 )
404 except ICloudPyAPIResponseException as error:
405 msg = "Invalid email/password combination."
406 raise ICloudPyFailedLoginException(msg, error) from error
408 self._authenticate_with_token()
410 self._webservices = self.data["webservices"]
412 LOGGER.debug("Authentication completed successfully")
414 def _authenticate_with_token(self):
415 """Authenticate using session token."""
416 data = {
417 "accountCountryCode": self.session_data.get("account_country"),
418 "dsWebAuthToken": self.session_data.get("session_token"),
419 "extended_login": True,
420 "trustToken": self.session_data.get("trust_token", ""),
421 }
423 try:
424 req = self.session.post(
425 f"{self.setup_endpoint}/accountLogin",
426 data=json.dumps(data),
427 )
428 self.data = req.json()
429 except ICloudPyAPIResponseException as error:
430 msg = "Invalid authentication token."
431 raise ICloudPyFailedLoginException(msg, error) from error
433 def _authenticate_with_credentials_service(self, service):
434 """Authenticate to a specific service using credentials."""
435 data = {
436 "appName": service,
437 "apple_id": self.user["accountName"],
438 "password": self.user["password"],
439 }
441 try:
442 self.session.post(
443 f"{self.setup_endpoint}/accountLogin",
444 data=json.dumps(data),
445 )
447 self.data = self._validate_token()
448 except ICloudPyAPIResponseException as error:
449 msg = "Invalid email/password combination."
450 raise ICloudPyFailedLoginException(msg, error) from error
452 def _validate_token(self):
453 """Checks if the current access token is still valid."""
454 LOGGER.debug("Checking session token validity")
455 try:
456 req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
457 LOGGER.debug("Session token is still valid")
458 return req.json()
459 except ICloudPyAPIResponseException as err:
460 LOGGER.debug("Invalid authentication token")
461 raise err
463 def _get_auth_headers(self, overrides=None):
464 headers = {
465 "Accept": "application/json, text/javascript",
466 "Content-Type": "application/json",
467 "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
468 "X-Apple-OAuth-Client-Type": "firstPartyAuth",
469 "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
470 "X-Apple-OAuth-Require-Grant-Code": "true",
471 "X-Apple-OAuth-Response-Mode": "web_message",
472 "X-Apple-OAuth-Response-Type": "code",
473 "X-Apple-OAuth-State": self.client_id,
474 "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
475 }
476 if overrides:
477 headers.update(overrides)
478 return headers
480 @property
481 def cookiejar_path(self):
482 """Get path for cookiejar file."""
483 return path.join(
484 self._cookie_directory,
485 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
486 )
488 @property
489 def session_path(self):
490 """Get path for session data file."""
491 return path.join(
492 self._cookie_directory,
493 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + ".session",
494 )
496 @property
497 def requires_2sa(self):
498 """Returns True if two-step authentication is required."""
499 return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
500 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
501 )
503 @property
504 def requires_2fa(self):
505 """Returns True if two-factor authentication is required."""
506 return self.data["dsInfo"].get("hsaVersion", 0) == 2 and (
507 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
508 )
510 @property
511 def is_trusted_session(self):
512 """Returns True if the session is trusted."""
513 return self.data.get("hsaTrustedBrowser", False)
515 @property
516 def trusted_devices(self):
517 """Returns devices trusted for two-step authentication."""
518 request = self.session.get(
519 f"{self.setup_endpoint}/listDevices",
520 params=self.params,
521 )
522 return request.json().get("devices")
524 def send_verification_code(self, device):
525 """Requests that a verification code is sent to the given device."""
526 data = json.dumps(device)
527 request = self.session.post(
528 f"{self.setup_endpoint}/sendVerificationCode",
529 params=self.params,
530 data=data,
531 )
532 return request.json().get("success", False)
534 def validate_verification_code(self, device, code):
535 """Verifies a verification code received on a trusted device."""
536 device.update({"verificationCode": code, "trustBrowser": True})
537 data = json.dumps(device)
539 try:
540 self.session.post(
541 f"{self.setup_endpoint}/validateVerificationCode",
542 params=self.params,
543 data=data,
544 )
545 except ICloudPyAPIResponseException as error:
546 if error.code == -21669:
547 # Wrong verification code
548 return False
549 raise
551 self.trust_session()
553 return not self.requires_2sa
555 def validate_2fa_code(self, code):
556 """Verifies a verification code received via Apple's 2FA system (HSA2)."""
557 data = {"securityCode": {"code": code}}
559 headers = self._get_auth_headers({"Accept": "application/json"})
561 if self.session_data.get("scnt"):
562 headers["scnt"] = self.session_data.get("scnt")
564 if self.session_data.get("session_id"):
565 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
567 try:
568 self.session.post(
569 f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
570 data=json.dumps(data),
571 headers=headers,
572 )
573 except ICloudPyAPIResponseException as error:
574 if error.code == -21669:
575 # Wrong verification code
576 LOGGER.error("Code verification failed.")
577 return False
578 raise
580 LOGGER.debug("Code verification successful.")
582 self.trust_session()
583 return not self.requires_2sa
585 def trust_session(self):
586 """Request session trust to avoid user log in going forward."""
587 headers = self._get_auth_headers()
589 if self.session_data.get("scnt"):
590 headers["scnt"] = self.session_data.get("scnt")
592 if self.session_data.get("session_id"):
593 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
595 try:
596 self.session.get(
597 f"{self.auth_endpoint}/2sv/trust",
598 headers=headers,
599 )
600 self._authenticate_with_token()
601 return True
602 except ICloudPyAPIResponseException:
603 LOGGER.error("Session trust failed.")
604 return False
606 def _get_webservice_url(self, ws_key):
607 """Get webservice URL, raise an exception if not exists."""
608 if self._webservices.get(ws_key) is None:
609 raise ICloudPyServiceNotActivatedException(
610 "Webservice not available",
611 ws_key,
612 )
613 return self._webservices[ws_key]["url"]
615 @property
616 def devices(self):
617 """Returns all devices."""
618 service_root = self._get_webservice_url("findme")
619 return FindMyiPhoneServiceManager(
620 service_root,
621 self.session,
622 self.params,
623 self.with_family,
624 )
626 @property
627 def iphone(self):
628 """Returns the iPhone."""
629 return self.devices[0]
631 @property
632 def account(self):
633 """Gets the 'Account' service."""
634 service_root = self._get_webservice_url("account")
635 return AccountService(service_root, self.session, self.params)
637 @property
638 def photos(self):
639 """Gets the 'Photo' service."""
640 if not self._photos:
641 service_root = self._get_webservice_url("ckdatabasews")
642 self._photos = PhotosService(service_root, self.session, self.params)
643 return self._photos
645 @property
646 def calendar(self):
647 """Gets the 'Calendar' service."""
648 service_root = self._get_webservice_url("calendar")
649 return CalendarService(service_root, self.session, self.params)
651 @property
652 def contacts(self):
653 """Gets the 'Contacts' service."""
654 service_root = self._get_webservice_url("contacts")
655 return ContactsService(service_root, self.session, self.params)
657 @property
658 def reminders(self):
659 """Gets the 'Reminders' service."""
660 service_root = self._get_webservice_url("reminders")
661 return RemindersService(service_root, self.session, self.params)
663 @property
664 def drive(self):
665 """Gets the 'Drive' service."""
666 if not self._drive:
667 self._drive = DriveService(
668 service_root=self._get_webservice_url("drivews"),
669 document_root=self._get_webservice_url("docws"),
670 session=self.session,
671 params=self.params,
672 )
673 return self._drive
675 def __unicode__(self):
676 return f"iCloud API: {self.user.get('accountName')}"
678 def __str__(self):
679 return self.__unicode__()
681 def __repr__(self):
682 return f"<{str(self)}>"