Coverage for icloudpy/base.py: 55%
376 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
1"""Library base file."""
3import base64
4import getpass
5import hashlib
6import http.cookiejar as cookielib
7import inspect
8import json
9import logging
10from os import mkdir, path
11from re import match
12from tempfile import gettempdir
13from uuid import uuid1
15import srp
16from requests import Session
17from six import PY2
19from icloudpy.exceptions import (
20 ICloudPy2SARequiredException,
21 ICloudPyAPIResponseException,
22 ICloudPyFailedLoginException,
23 ICloudPyServiceNotActivatedException,
24)
25from icloudpy.services import (
26 AccountService,
27 CalendarService,
28 ContactsService,
29 DriveService,
30 FindMyiPhoneServiceManager,
31 PhotosService,
32 RemindersService,
33 UbiquityService,
34)
35from icloudpy.utils import get_password_from_keyring
37LOGGER = logging.getLogger(__name__)
39HEADER_DATA = {
40 "X-Apple-ID-Account-Country": "account_country",
41 "X-Apple-ID-Session-Id": "session_id",
42 "X-Apple-Session-Token": "session_token",
43 "X-Apple-TwoSV-Trust-Token": "trust_token",
44 "scnt": "scnt",
45}
48class ICloudPyPasswordFilter(logging.Filter):
49 """Password log hider."""
51 def __init__(self, password):
52 super().__init__(password)
54 def filter(self, record):
55 message = record.getMessage()
56 if self.name in message:
57 record.msg = message.replace(self.name, "*" * 8)
58 record.args = []
60 return True
63class ICloudPySession(Session):
64 """iCloud session."""
66 def __init__(self, service):
67 self.service = service
68 Session.__init__(self)
70 def request(self, method, url, **kwargs): # pylint: disable=arguments-differ
71 # Charge logging to the right service endpoint
72 callee = inspect.stack()[2]
73 module = inspect.getmodule(callee[0])
74 request_logger = logging.getLogger(module.__name__).getChild("http")
75 if self.service.password_filter not in request_logger.filters:
76 request_logger.addFilter(self.service.password_filter)
78 request_logger.debug(f"{method} {url} {kwargs.get('data', '')}")
80 has_retried = kwargs.get("retried")
81 kwargs.pop("retried", None)
82 response = super().request(method, url, **kwargs)
84 content_type = response.headers.get("Content-Type", "").split(";")[0]
85 json_mimetypes = ["application/json", "text/json"]
87 for header, value in HEADER_DATA.items():
88 if response.headers.get(header):
89 session_arg = value
90 self.service.session_data.update(
91 {session_arg: response.headers.get(header)},
92 )
94 # Save session_data to file
95 with open(self.service.session_path, "w", encoding="utf-8") as outfile:
96 json.dump(self.service.session_data, outfile)
97 LOGGER.debug("Saved session data to file")
99 # Save cookies to file
100 self.cookies.save(ignore_discard=True, ignore_expires=True)
101 LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)
103 if not response.ok and (content_type not in json_mimetypes or response.status_code in [421, 450, 500]):
104 try:
105 # pylint: disable=W0212
106 fmip_url = self.service._get_webservice_url("findme")
107 if has_retried is None and response.status_code == 450 and fmip_url in url:
108 # Handle re-authentication for Find My iPhone
109 LOGGER.debug("Re-authenticating Find My iPhone service")
110 try:
111 self.service.authenticate(True, "find")
112 except ICloudPyAPIResponseException:
113 LOGGER.debug("Re-authentication failed")
114 kwargs["retried"] = True
115 return self.request(method, url, **kwargs)
116 except Exception:
117 pass
119 if has_retried is None and response.status_code in [421, 450, 500]:
120 api_error = ICloudPyAPIResponseException(
121 response.reason,
122 response.status_code,
123 retry=True,
124 )
125 request_logger.debug(api_error)
126 kwargs["retried"] = True
127 return self.request(method, url, **kwargs)
129 self._raise_error(response.status_code, response.reason)
131 if content_type not in json_mimetypes:
132 return response
134 try:
135 data = response.json()
136 except: # noqa: E722
137 request_logger.warning("Failed to parse response with JSON mimetype")
138 return response
140 request_logger.debug(data)
142 if isinstance(data, dict):
143 reason = data.get("errorMessage")
144 reason = reason or data.get("reason")
145 reason = reason or data.get("errorReason")
146 if not reason and isinstance(data.get("error"), str):
147 reason = data.get("error")
148 if not reason and data.get("error"):
149 reason = "Unknown reason"
151 code = data.get("errorCode")
152 if not code and data.get("serverErrorCode"):
153 code = data.get("serverErrorCode")
155 if reason:
156 self._raise_error(code, reason)
158 return response
160 def _raise_error(self, code, reason):
161 if self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie":
162 raise ICloudPy2SARequiredException(self.service.user["apple_id"])
163 if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
164 reason = (
165 reason + ". Please log into https://icloud.com/ to manually " "finish setting up your iCloud service"
166 )
167 api_error = ICloudPyServiceNotActivatedException(reason, code)
168 LOGGER.error(api_error)
170 raise api_error
171 if code == "ACCESS_DENIED":
172 reason = (
173 reason + ". Please wait a few minutes then try again."
174 "The remote servers might be trying to throttle requests."
175 )
176 if code in [421, 450, 500]:
177 reason = "Authentication required for Account."
179 api_error = ICloudPyAPIResponseException(reason, code)
180 LOGGER.error(api_error)
181 raise api_error
183 # Public method to resolve linting error
184 def raise_error(self, code, reason):
185 return self._raise_error(code=code, reason=reason)
188class ICloudPyService:
189 """
190 A base authentication class for the iCloud service. Handles the
191 authentication required to access iCloud services.
193 Usage:
194 from src import ICloudPyService
195 icloudpy = ICloudPyService('username@apple.com', 'password')
196 icloudpy.iphone.location()
197 """
199 def __init__(
200 self,
201 apple_id,
202 password=None,
203 cookie_directory=None,
204 verify=True,
205 client_id=None,
206 with_family=True,
207 auth_endpoint="https://idmsa.apple.com/appleauth/auth",
208 # For China, use "https://www.icloud.com.cn"
209 home_endpoint="https://www.icloud.com",
210 # For China, use "https://setup.icloud.com.cn/setup/ws/1"
211 setup_endpoint="https://setup.icloud.com/setup/ws/1",
212 ):
213 if password is None:
214 password = get_password_from_keyring(apple_id)
216 self.user = {"accountName": apple_id, "password": password}
217 self.data = {}
218 self.params = {}
219 self.client_id = client_id or (f"auth-{str(uuid1()).lower()}")
220 self.with_family = with_family
221 self.auth_endpoint = auth_endpoint
222 self.home_endpoint = home_endpoint
223 self.setup_endpoint = setup_endpoint
225 self.password_filter = ICloudPyPasswordFilter(password)
226 LOGGER.addFilter(self.password_filter)
228 if cookie_directory:
229 self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
230 if not path.exists(self._cookie_directory):
231 mkdir(self._cookie_directory, 0o700)
232 else:
233 topdir = path.join(gettempdir(), "icloudpy")
234 self._cookie_directory = path.join(topdir, getpass.getuser())
235 if not path.exists(topdir):
236 mkdir(topdir, 0o777)
237 if not path.exists(self._cookie_directory):
238 mkdir(self._cookie_directory, 0o700)
240 LOGGER.debug("Using session file %s", self.session_path)
242 self.session_data = {}
243 try:
244 with open(self.session_path, encoding="utf-8") as session_f:
245 self.session_data = json.load(session_f)
246 except: # noqa: E722
247 LOGGER.info("Session file does not exist")
248 if self.session_data.get("client_id"):
249 self.client_id = self.session_data.get("client_id")
250 self.params["clientId"] = self.client_id
251 else:
252 self.session_data.update({"client_id": self.client_id})
253 self.params["clientId"] = self.client_id
255 self.session = ICloudPySession(self)
256 self.session.verify = verify
257 self.session.headers.update(
258 {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"},
259 )
261 cookiejar_path = self.cookiejar_path
262 self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
263 if path.exists(cookiejar_path):
264 try:
265 self.session.cookies.load(ignore_discard=True, ignore_expires=True)
266 LOGGER.debug("Read cookies from %s", cookiejar_path)
267 except: # noqa: E722
268 # Most likely a pickled cookiejar from earlier versions.
269 # The cookiejar will get replaced with a valid one after
270 # successful authentication.
271 LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)
273 self.authenticate()
275 self._drive = None
276 self._files = None
277 self._photos = None
279 def authenticate(self, force_refresh=False, service=None):
280 """
281 Handles authentication, and persists cookies so that
282 subsequent logins will not cause additional e-mails from Apple.
283 """
285 login_successful = False
286 if self.session_data.get("session_token") and not force_refresh:
287 LOGGER.debug("Checking session token validity")
288 try:
289 self.data = self._validate_token()
290 login_successful = True
291 except ICloudPyAPIResponseException:
292 LOGGER.debug("Invalid authentication token, will log in from scratch.")
294 if not login_successful and service is not None:
295 app = self.data["apps"][service]
296 if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] is True:
297 LOGGER.debug(
298 "Authenticating as %s for %s",
299 self.user["accountName"],
300 service,
301 )
303 try:
304 self._authenticate_with_credentials_service(service)
305 login_successful = True
306 except Exception as error:
307 LOGGER.debug(
308 "Could not log into service. Attempting brand new login. %s",
309 str(error),
310 )
312 if not login_successful:
313 LOGGER.debug("Authenticating as %s", self.user["accountName"])
315 headers = self._get_auth_headers()
317 if self.session_data.get("scnt"):
318 headers["scnt"] = self.session_data.get("scnt")
320 if self.session_data.get("session_id"):
321 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
323 class SrpPassword:
324 def __init__(self, password: str):
325 self.password = password
327 def set_encrypt_info(
328 self,
329 salt: bytes,
330 iterations: int,
331 key_length: int,
332 ):
333 self.salt = salt
334 self.iterations = iterations
335 self.key_length = key_length
337 def encode(self):
338 password_hash = hashlib.sha256(
339 self.password.encode("utf-8"),
340 ).digest()
341 return hashlib.pbkdf2_hmac(
342 "sha256",
343 password_hash,
344 salt,
345 iterations,
346 key_length,
347 )
349 srp_password = SrpPassword(self.user["password"])
350 srp.rfc5054_enable()
351 srp.no_username_in_x()
352 usr = srp.User(
353 self.user["accountName"],
354 srp_password,
355 hash_alg=srp.SHA256,
356 ng_type=srp.NG_2048,
357 )
359 uname, a_bytes = usr.start_authentication()
361 data = {
362 "a": base64.b64encode(a_bytes).decode(),
363 "accountName": uname,
364 "protocols": ["s2k", "s2k_fo"],
365 }
367 try:
368 response = self.session.post(
369 f"{self.auth_endpoint}/signin/init",
370 data=json.dumps(data),
371 headers=headers,
372 )
373 response.raise_for_status()
374 except ICloudPyAPIResponseException as error:
375 msg = "Failed to initiate srp authentication."
376 raise ICloudPyFailedLoginException(msg, error) from error
378 body = response.json()
380 salt = base64.b64decode(body["salt"])
381 b = base64.b64decode(body["b"])
382 c = body["c"]
383 iterations = body["iteration"]
384 key_length = 32
385 srp_password.set_encrypt_info(salt, iterations, key_length)
387 m1 = usr.process_challenge(salt, b)
388 m2 = usr.H_AMK
390 data = {
391 "accountName": uname,
392 "c": c,
393 "m1": base64.b64encode(m1).decode(),
394 "m2": base64.b64encode(m2).decode(),
395 "rememberMe": True,
396 "trustTokens": [],
397 }
399 if self.session_data.get("trust_token"):
400 data["trustTokens"] = [self.session_data.get("trust_token")]
402 try:
403 self.session.post(
404 f"{self.auth_endpoint}/signin/complete",
405 params={"isRememberMeEnabled": "true"},
406 data=json.dumps(data),
407 headers=headers,
408 )
409 except ICloudPyAPIResponseException as error:
410 msg = "Invalid email/password combination."
411 raise ICloudPyFailedLoginException(msg, error) from error
413 self._authenticate_with_token()
415 self._webservices = self.data["webservices"]
417 LOGGER.debug("Authentication completed successfully")
419 def _authenticate_with_token(self):
420 """Authenticate using session token."""
421 data = {
422 "accountCountryCode": self.session_data.get("account_country"),
423 "dsWebAuthToken": self.session_data.get("session_token"),
424 "extended_login": True,
425 "trustToken": self.session_data.get("trust_token", ""),
426 }
428 try:
429 req = self.session.post(
430 f"{self.setup_endpoint}/accountLogin",
431 data=json.dumps(data),
432 )
433 self.data = req.json()
434 except ICloudPyAPIResponseException as error:
435 msg = "Invalid authentication token."
436 raise ICloudPyFailedLoginException(msg, error) from error
438 def _authenticate_with_credentials_service(self, service):
439 """Authenticate to a specific service using credentials."""
440 data = {
441 "appName": service,
442 "apple_id": self.user["accountName"],
443 "password": self.user["password"],
444 }
446 try:
447 self.session.post(
448 f"{self.setup_endpoint}/accountLogin",
449 data=json.dumps(data),
450 )
452 self.data = self._validate_token()
453 except ICloudPyAPIResponseException as error:
454 msg = "Invalid email/password combination."
455 raise ICloudPyFailedLoginException(msg, error) from error
457 def _validate_token(self):
458 """Checks if the current access token is still valid."""
459 LOGGER.debug("Checking session token validity")
460 try:
461 req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
462 LOGGER.debug("Session token is still valid")
463 return req.json()
464 except ICloudPyAPIResponseException as err:
465 LOGGER.debug("Invalid authentication token")
466 raise err
468 def _get_auth_headers(self, overrides=None):
469 headers = {
470 "Accept": "application/json, text/javascript",
471 "Content-Type": "application/json",
472 "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
473 "X-Apple-OAuth-Client-Type": "firstPartyAuth",
474 "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
475 "X-Apple-OAuth-Require-Grant-Code": "true",
476 "X-Apple-OAuth-Response-Mode": "web_message",
477 "X-Apple-OAuth-Response-Type": "code",
478 "X-Apple-OAuth-State": self.client_id,
479 "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
480 }
481 if overrides:
482 headers.update(overrides)
483 return headers
485 @property
486 def cookiejar_path(self):
487 """Get path for cookiejar file."""
488 return path.join(
489 self._cookie_directory,
490 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]),
491 )
493 @property
494 def session_path(self):
495 """Get path for session data file."""
496 return path.join(
497 self._cookie_directory,
498 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + ".session",
499 )
501 @property
502 def requires_2sa(self):
503 """Returns True if two-step authentication is required."""
504 return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
505 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
506 )
508 @property
509 def requires_2fa(self):
510 """Returns True if two-factor authentication is required."""
511 return self.data["dsInfo"].get("hsaVersion", 0) == 2 and (
512 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
513 )
515 @property
516 def is_trusted_session(self):
517 """Returns True if the session is trusted."""
518 return self.data.get("hsaTrustedBrowser", False)
520 @property
521 def trusted_devices(self):
522 """Returns devices trusted for two-step authentication."""
523 request = self.session.get(
524 f"{self.setup_endpoint}/listDevices",
525 params=self.params,
526 )
527 return request.json().get("devices")
529 def send_verification_code(self, device):
530 """Requests that a verification code is sent to the given device."""
531 data = json.dumps(device)
532 request = self.session.post(
533 f"{self.setup_endpoint}/sendVerificationCode",
534 params=self.params,
535 data=data,
536 )
537 return request.json().get("success", False)
539 def validate_verification_code(self, device, code):
540 """Verifies a verification code received on a trusted device."""
541 device.update({"verificationCode": code, "trustBrowser": True})
542 data = json.dumps(device)
544 try:
545 self.session.post(
546 f"{self.setup_endpoint}/validateVerificationCode",
547 params=self.params,
548 data=data,
549 )
550 except ICloudPyAPIResponseException as error:
551 if error.code == -21669:
552 # Wrong verification code
553 return False
554 raise
556 self.trust_session()
558 return not self.requires_2sa
560 def validate_2fa_code(self, code):
561 """Verifies a verification code received via Apple's 2FA system (HSA2)."""
562 data = {"securityCode": {"code": code}}
564 headers = self._get_auth_headers({"Accept": "application/json"})
566 if self.session_data.get("scnt"):
567 headers["scnt"] = self.session_data.get("scnt")
569 if self.session_data.get("session_id"):
570 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
572 try:
573 self.session.post(
574 f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
575 data=json.dumps(data),
576 headers=headers,
577 )
578 except ICloudPyAPIResponseException as error:
579 if error.code == -21669:
580 # Wrong verification code
581 LOGGER.error("Code verification failed.")
582 return False
583 raise
585 LOGGER.debug("Code verification successful.")
587 self.trust_session()
588 return not self.requires_2sa
590 def trust_session(self):
591 """Request session trust to avoid user log in going forward."""
592 headers = self._get_auth_headers()
594 if self.session_data.get("scnt"):
595 headers["scnt"] = self.session_data.get("scnt")
597 if self.session_data.get("session_id"):
598 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")
600 try:
601 self.session.get(
602 f"{self.auth_endpoint}/2sv/trust",
603 headers=headers,
604 )
605 self._authenticate_with_token()
606 return True
607 except ICloudPyAPIResponseException:
608 LOGGER.error("Session trust failed.")
609 return False
611 def _get_webservice_url(self, ws_key):
612 """Get webservice URL, raise an exception if not exists."""
613 if self._webservices.get(ws_key) is None:
614 raise ICloudPyServiceNotActivatedException(
615 "Webservice not available",
616 ws_key,
617 )
618 return self._webservices[ws_key]["url"]
620 @property
621 def devices(self):
622 """Returns all devices."""
623 service_root = self._get_webservice_url("findme")
624 return FindMyiPhoneServiceManager(
625 service_root,
626 self.session,
627 self.params,
628 self.with_family,
629 )
631 @property
632 def iphone(self):
633 """Returns the iPhone."""
634 return self.devices[0]
636 @property
637 def account(self):
638 """Gets the 'Account' service."""
639 service_root = self._get_webservice_url("account")
640 return AccountService(service_root, self.session, self.params)
642 @property
643 def files(self):
644 """Gets the 'File' service."""
645 if not self._files:
646 service_root = self._get_webservice_url("ubiquity")
647 self._files = UbiquityService(service_root, self.session, self.params)
648 return self._files
650 @property
651 def photos(self):
652 """Gets the 'Photo' service."""
653 if not self._photos:
654 service_root = self._get_webservice_url("ckdatabasews")
655 self._photos = PhotosService(service_root, self.session, self.params)
656 return self._photos
658 @property
659 def calendar(self):
660 """Gets the 'Calendar' service."""
661 service_root = self._get_webservice_url("calendar")
662 return CalendarService(service_root, self.session, self.params)
664 @property
665 def contacts(self):
666 """Gets the 'Contacts' service."""
667 service_root = self._get_webservice_url("contacts")
668 return ContactsService(service_root, self.session, self.params)
670 @property
671 def reminders(self):
672 """Gets the 'Reminders' service."""
673 service_root = self._get_webservice_url("reminders")
674 return RemindersService(service_root, self.session, self.params)
676 @property
677 def drive(self):
678 """Gets the 'Drive' service."""
679 if not self._drive:
680 self._drive = DriveService(
681 service_root=self._get_webservice_url("drivews"),
682 document_root=self._get_webservice_url("docws"),
683 session=self.session,
684 params=self.params,
685 )
686 return self._drive
688 def __unicode__(self):
689 return f"iCloud API: {self.user.get('accountName')}"
691 def __str__(self):
692 as_unicode = self.__unicode__()
693 if PY2:
694 return as_unicode.encode("utf-8", "ignore")
695 return as_unicode
697 def __repr__(self):
698 return f"<{str(self)}>"