Coverage for icloudpy/base.py: 51%
344 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-04-12 14:26 +0000
1"""Library base file."""
2import getpass
3import http.cookiejar as cookielib
4import inspect
5import json
6import logging
7from os import mkdir, path
8from re import match
9from tempfile import gettempdir
10from uuid import uuid1
12from requests import Session
13from six import PY2
15from icloudpy.exceptions import (
16 ICloudPy2SARequiredException,
17 ICloudPyAPIResponseException,
18 ICloudPyFailedLoginException,
19 ICloudPyServiceNotActivatedException,
20)
21from icloudpy.services import (
22 AccountService,
23 CalendarService,
24 ContactsService,
25 DriveService,
26 FindMyiPhoneServiceManager,
27 PhotosService,
28 RemindersService,
29 UbiquityService,
30)
31from icloudpy.utils import get_password_from_keyring
# Module-level logger used by the session and service classes below.
LOGGER = logging.getLogger(__name__)

# Response headers whose values are copied into the persisted session data,
# paired with the session-data key each value is stored under.
HEADER_DATA = dict(
    (
        ("X-Apple-ID-Account-Country", "account_country"),
        ("X-Apple-ID-Session-Id", "session_id"),
        ("X-Apple-Session-Token", "session_token"),
        ("X-Apple-TwoSV-Trust-Token", "trust_token"),
        ("scnt", "scnt"),
    )
)
class ICloudPyPasswordFilter(logging.Filter):
    """Logging filter that masks the account password in log records.

    The password is stored in the ``name`` attribute inherited from
    ``logging.Filter``; ``filter`` is overridden, so the attribute is never
    used for logger-name matching.
    """

    def __init__(self, password):
        """Remember *password* as the text to mask.

        ``None`` is normalised to an empty string because
        ``logging.Filter.__init__`` calls ``len()`` on the value.
        """
        super().__init__(password or "")

    def filter(self, record):
        """Replace every occurrence of the password in *record* with ``********``.

        Always returns ``True``: records are rewritten, never dropped.
        """
        # An empty password is a substring of every message, and
        # ``str.replace("", ...)`` would insert the mask between all
        # characters, so skip masking entirely in that case.
        if not self.name:
            return True

        message = record.getMessage()
        if self.name in message:
            # Store the fully formatted, masked text and drop the args so the
            # message is not %-formatted a second time.
            record.msg = message.replace(self.name, "*" * 8)
            record.args = []

        return True
class ICloudPySession(Session):
    """``requests`` session bound to an iCloud service object.

    Every request persists selected response headers and the cookie jar to
    disk, retries once on status 421/450/500, and converts error responses
    into the package's exceptions.
    """

    # Status codes that trigger a single retry and are reported as
    # "Authentication required for Account." when they are finally raised.
    _RETRY_STATUS_CODES = (421, 450, 500)

    def __init__(self, service):
        """Bind the session to *service*, the object holding session state."""
        self.service = service
        Session.__init__(self)

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """Send a request, persist session state and raise on API errors.

        The private keyword argument ``retried`` marks a request that has
        already been retried once; it is removed before the call is forwarded
        to ``requests``.

        Returns the ``requests`` response. Raises whatever ``_raise_error``
        raises when the HTTP status or the JSON body reports an error.
        """
        # Charge logging to the right service endpoint: name the logger after
        # the module two frames up the call stack. The lookup must stay in
        # this method, because the frame index is relative to it. Fall back
        # to this module when the stack is shallow or the frame has no module.
        frames = inspect.stack(0)
        caller_module = inspect.getmodule(frames[2][0]) if len(frames) > 2 else None
        logger_name = caller_module.__name__ if caller_module is not None else __name__
        request_logger = logging.getLogger(logger_name).getChild("http")
        if self.service.password_filter not in request_logger.filters:
            request_logger.addFilter(self.service.password_filter)

        request_logger.debug("%s %s %s", method, url, kwargs.get("data", ""))

        has_retried = kwargs.pop("retried", None)
        response = super().request(method, url, **kwargs)

        content_type = response.headers.get("Content-Type", "").split(";")[0]
        json_mimetypes = ["application/json", "text/json"]

        # Copy the tracked response headers into the service's session data.
        for header, session_arg in HEADER_DATA.items():
            header_value = response.headers.get(header)
            if header_value:
                self.service.session_data[session_arg] = header_value

        # Save session_data to file
        with open(self.service.session_path, "w", encoding="utf-8") as outfile:
            json.dump(self.service.session_data, outfile)
            LOGGER.debug("Saved session data to file")

        # Save cookies to file
        self.cookies.save(ignore_discard=True, ignore_expires=True)
        LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)

        if not response.ok and (
            content_type not in json_mimetypes
            or response.status_code in self._RETRY_STATUS_CODES
        ):
            try:
                # pylint: disable=W0212
                fmip_url = self.service._get_webservice_url("findme")
                if (
                    has_retried is None
                    and response.status_code == 450
                    and fmip_url in url
                ):
                    # Handle re-authentication for Find My iPhone
                    LOGGER.debug("Re-authenticating Find My iPhone service")
                    try:
                        self.service.authenticate(True, "find")
                    except ICloudPyAPIResponseException:
                        LOGGER.debug("Re-authentication failed")
                    kwargs["retried"] = True
                    return self.request(method, url, **kwargs)
            except Exception as error:  # pylint: disable=broad-except
                # Best effort: any failure here (webservice URL not known
                # yet, re-authentication or the retried request failing)
                # falls through to the generic retry / error handling below.
                LOGGER.debug("Find My iPhone re-authentication skipped: %s", error)

            if (
                has_retried is None
                and response.status_code in self._RETRY_STATUS_CODES
            ):
                api_error = ICloudPyAPIResponseException(
                    response.reason, response.status_code, retry=True
                )
                request_logger.debug(api_error)
                kwargs["retried"] = True
                return self.request(method, url, **kwargs)

            self._raise_error(response.status_code, response.reason)

        if content_type not in json_mimetypes:
            return response

        try:
            data = response.json()
        except Exception:  # pylint: disable=broad-except
            # Deliberately broad (the body may fail to decode in several
            # ways), but no longer swallows KeyboardInterrupt/SystemExit.
            request_logger.warning("Failed to parse response with JSON mimetype")
            return response

        request_logger.debug(data)

        if isinstance(data, dict):
            reason = (
                data.get("errorMessage")
                or data.get("reason")
                or data.get("errorReason")
            )
            if not reason and isinstance(data.get("error"), str):
                reason = data.get("error")
            if not reason and data.get("error"):
                reason = "Unknown reason"

            code = data.get("errorCode")
            if not code and data.get("serverErrorCode"):
                code = data.get("serverErrorCode")

            if reason:
                self._raise_error(code, reason)

        return response

    def _raise_error(self, code, reason):
        """Raise the exception matching *code* and *reason*; never returns.

        Raises:
            ICloudPy2SARequiredException: the service requires 2SA and the
                web-auth token cookie is reported missing.
            ICloudPyServiceNotActivatedException: *code* is
                ``ZONE_NOT_FOUND`` or ``AUTHENTICATION_FAILED``.
            ICloudPyAPIResponseException: every other case.
        """
        if (
            self.service.requires_2sa
            and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie"
        ):
            # The service stores the login under "accountName"; the previous
            # lookup of "apple_id" raised KeyError instead of this exception.
            user = self.service.user
            raise ICloudPy2SARequiredException(
                user.get("accountName", user.get("apple_id"))
            )
        if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
            reason = (
                reason + ". Please log into https://icloud.com/ to manually "
                "finish setting up your iCloud service"
            )
            api_error = ICloudPyServiceNotActivatedException(reason, code)
            LOGGER.error(api_error)

            raise api_error
        if code == "ACCESS_DENIED":
            reason = (
                reason + ". Please wait a few minutes then try again. "
                "The remote servers might be trying to throttle requests."
            )
        if code in self._RETRY_STATUS_CODES:
            reason = "Authentication required for Account."

        api_error = ICloudPyAPIResponseException(reason, code)
        LOGGER.error(api_error)
        raise api_error

    # Public method to resolve linting error
    def raise_error(self, code, reason):
        """Public alias of ``_raise_error``."""
        return self._raise_error(code=code, reason=reason)
class ICloudPyService:
    """
    A base authentication class for the iCloud service. Handles the
    authentication required to access iCloud services.

    Usage:
        from icloudpy.base import ICloudPyService
        icloudpy = ICloudPyService('username@apple.com', 'password')
        icloudpy.iphone.location()
    """

    def __init__(
        self,
        apple_id,
        password=None,
        cookie_directory=None,
        verify=True,
        client_id=None,
        with_family=True,
        auth_endpoint="https://idmsa.apple.com/appleauth/auth",
        # For China, use "https://www.icloud.com.cn"
        home_endpoint="https://www.icloud.com",
        # For China, use "https://setup.icloud.com.cn/setup/ws/1"
        setup_endpoint="https://setup.icloud.com/setup/ws/1",
    ):
        """Load persisted session state and cookies, then authenticate.

        When *password* is ``None`` it is looked up with
        ``get_password_from_keyring``. Session data and cookies are kept in
        *cookie_directory*, or in a per-user directory under the system
        temporary directory when it is not given. *verify* is assigned to the
        HTTP session's ``verify`` attribute.
        """
        if password is None:
            password = get_password_from_keyring(apple_id)

        self.user = {"accountName": apple_id, "password": password}
        self.data = {}
        self.params = {}
        self.client_id = client_id or (f"auth-{str(uuid1()).lower()}")
        self.with_family = with_family
        self.auth_endpoint = auth_endpoint
        self.home_endpoint = home_endpoint
        self.setup_endpoint = setup_endpoint

        # Set before authenticate() runs so these attributes exist even
        # while the first login is still in progress.
        self._webservices = {}
        self._drive = None
        self._files = None
        self._photos = None

        self.password_filter = ICloudPyPasswordFilter(password)
        LOGGER.addFilter(self.password_filter)

        if cookie_directory:
            self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
            self._ensure_directory(self._cookie_directory, 0o700)
        else:
            topdir = path.join(gettempdir(), "icloudpy")
            self._cookie_directory = path.join(topdir, getpass.getuser())
            self._ensure_directory(topdir, 0o777)
            self._ensure_directory(self._cookie_directory, 0o700)

        LOGGER.debug("Using session file %s", self.session_path)

        self.session_data = {}
        try:
            with open(self.session_path, encoding="utf-8") as session_f:
                loaded_session = json.load(session_f)
        except (OSError, ValueError):
            # Missing/unreadable file, or content that is not valid JSON.
            LOGGER.info("Session file does not exist")
        else:
            # Ignore anything that is not a JSON object: the code below
            # relies on dict methods.
            if isinstance(loaded_session, dict):
                self.session_data = loaded_session

        # A persisted client id takes precedence over the supplied/generated one.
        if self.session_data.get("client_id"):
            self.client_id = self.session_data.get("client_id")
        else:
            self.session_data.update({"client_id": self.client_id})
        self.params["clientId"] = self.client_id

        self.session = ICloudPySession(self)
        self.session.verify = verify
        self.session.headers.update(
            {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"}
        )

        cookiejar_path = self.cookiejar_path
        self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
        if path.exists(cookiejar_path):
            try:
                self.session.cookies.load(ignore_discard=True, ignore_expires=True)
                LOGGER.debug("Read cookies from %s", cookiejar_path)
            except (OSError, ValueError):
                # Most likely a pickled cookiejar from earlier versions.
                # The cookiejar will get replaced with a valid one after
                # successful authentication.
                LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)

        self.authenticate()

    @staticmethod
    def _ensure_directory(directory, mode):
        """Create *directory* with *mode* unless it already exists."""
        try:
            mkdir(directory, mode)
        except FileExistsError:
            # Already present, possibly created concurrently by another process.
            pass

    def _get_session_auth_headers(self, overrides=None):
        """Return auth headers plus the persisted ``scnt`` / session id, if any."""
        headers = self._get_auth_headers(overrides)

        if self.session_data.get("scnt"):
            headers["scnt"] = self.session_data.get("scnt")

        if self.session_data.get("session_id"):
            headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")

        return headers

    def authenticate(self, force_refresh=False, service=None):
        """
        Handles authentication, and persists cookies so that
        subsequent logins will not cause additional e-mails from Apple.

        Tries, in order: validating the stored session token (skipped when
        *force_refresh* is true), a service-specific login when *service* is
        given and that app reports ``canLaunchWithOneFactor``, and finally a
        full sign-in with the account credentials.

        Raises ICloudPyFailedLoginException when the full sign-in is rejected.
        """

        login_successful = False
        if self.session_data.get("session_token") and not force_refresh:
            LOGGER.debug("Checking session token validity")
            try:
                self.data = self._validate_token()
                login_successful = True
            except ICloudPyAPIResponseException:
                LOGGER.debug("Invalid authentication token, will log in from scratch.")

        if not login_successful and service is not None:
            # No account data yet, or an unknown app, simply means the
            # service login is not possible; fall through to a full login
            # instead of raising KeyError.
            app = self.data.get("apps", {}).get(service, {})
            if app.get("canLaunchWithOneFactor") is True:
                LOGGER.debug(
                    "Authenticating as %s for %s", self.user["accountName"], service
                )

                try:
                    self._authenticate_with_credentials_service(service)
                    login_successful = True
                except Exception as error:  # pylint: disable=broad-except
                    LOGGER.debug(
                        "Could not log into service. Attempting brand new login. %s",
                        str(error),
                    )

        if not login_successful:
            LOGGER.debug("Authenticating as %s", self.user["accountName"])

            data = dict(self.user)

            data["rememberMe"] = True
            data["trustTokens"] = []
            if self.session_data.get("trust_token"):
                data["trustTokens"] = [self.session_data.get("trust_token")]

            headers = self._get_session_auth_headers()

            try:
                self.session.post(
                    f"{self.auth_endpoint}/signin",
                    params={"isRememberMeEnabled": "true"},
                    data=json.dumps(data),
                    headers=headers,
                )
            except ICloudPyAPIResponseException as error:
                msg = "Invalid email/password combination."
                raise ICloudPyFailedLoginException(msg, error) from error

            self._authenticate_with_token()

        self._webservices = self.data["webservices"]

        LOGGER.debug("Authentication completed successfully")

    def _authenticate_with_token(self):
        """Authenticate using session token."""
        data = {
            "accountCountryCode": self.session_data.get("account_country"),
            "dsWebAuthToken": self.session_data.get("session_token"),
            "extended_login": True,
            "trustToken": self.session_data.get("trust_token", ""),
        }

        try:
            req = self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )
            self.data = req.json()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid authentication token."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _authenticate_with_credentials_service(self, service):
        """Authenticate to a specific service using credentials."""
        data = {
            "appName": service,
            "apple_id": self.user["accountName"],
            "password": self.user["password"],
        }

        try:
            self.session.post(
                f"{self.setup_endpoint}/accountLogin", data=json.dumps(data)
            )

            self.data = self._validate_token()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid email/password combination."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _validate_token(self):
        """Checks if the current access token is still valid.

        Returns the decoded JSON body of the ``validate`` call and re-raises
        ICloudPyAPIResponseException when the call is rejected.
        """
        LOGGER.debug("Checking session token validity")
        try:
            req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
            LOGGER.debug("Session token is still valid")
            return req.json()
        except ICloudPyAPIResponseException:
            LOGGER.debug("Invalid authentication token")
            raise

    def _get_auth_headers(self, overrides=None):
        """Return the headers for the auth endpoint, updated with *overrides*."""
        widget_key = "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d"
        headers = {
            "Accept": "*/*",
            "Content-Type": "application/json",
            "X-Apple-OAuth-Client-Id": widget_key,
            "X-Apple-OAuth-Client-Type": "firstPartyAuth",
            "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
            "X-Apple-OAuth-Require-Grant-Code": "true",
            "X-Apple-OAuth-Response-Mode": "web_message",
            "X-Apple-OAuth-Response-Type": "code",
            "X-Apple-OAuth-State": self.client_id,
            "X-Apple-Widget-Key": widget_key,
        }
        if overrides:
            headers.update(overrides)
        return headers

    @property
    def _account_file_stem(self):
        """Account name reduced to its word characters, used as a file name."""
        return "".join(c for c in self.user.get("accountName") if match(r"\w", c))

    @property
    def cookiejar_path(self):
        """Get path for cookiejar file."""
        return path.join(self._cookie_directory, self._account_file_stem)

    @property
    def session_path(self):
        """Get path for session data file."""
        return path.join(
            self._cookie_directory, self._account_file_stem + ".session"
        )

    @property
    def requires_2sa(self):
        """Returns True if two-step authentication is required."""
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def requires_2fa(self):
        """Returns True if two-factor authentication is required."""
        # Tolerate missing account data, as requires_2sa does, instead of
        # raising KeyError.
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) == 2 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def is_trusted_session(self):
        """Returns True if the session is trusted."""
        return self.data.get("hsaTrustedBrowser", False)

    @property
    def trusted_devices(self):
        """Returns devices trusted for two-step authentication."""
        request = self.session.get(
            f"{self.setup_endpoint}/listDevices", params=self.params
        )
        return request.json().get("devices")

    def send_verification_code(self, device):
        """Requests that a verification code is sent to the given device."""
        data = json.dumps(device)
        request = self.session.post(
            f"{self.setup_endpoint}/sendVerificationCode",
            params=self.params,
            data=data,
        )
        return request.json().get("success", False)

    def validate_verification_code(self, device, code):
        """Verifies a verification code received on a trusted device.

        Note: *device* is updated in place with the code and the
        ``trustBrowser`` flag. Returns False for a wrong code, otherwise
        whether two-step authentication is no longer required.
        """
        device.update({"verificationCode": code, "trustBrowser": True})
        data = json.dumps(device)

        try:
            self.session.post(
                f"{self.setup_endpoint}/validateVerificationCode",
                params=self.params,
                data=data,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                return False
            raise

        self.trust_session()

        return not self.requires_2sa

    def validate_2fa_code(self, code):
        """Verifies a verification code received via Apple's 2FA system (HSA2)."""
        data = {"securityCode": {"code": code}}

        headers = self._get_session_auth_headers({"Accept": "application/json"})

        try:
            self.session.post(
                f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
                data=json.dumps(data),
                headers=headers,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                LOGGER.error("Code verification failed.")
                return False
            raise

        LOGGER.debug("Code verification successful.")

        self.trust_session()
        return not self.requires_2sa

    def trust_session(self):
        """Request session trust to avoid user log in going forward."""
        headers = self._get_session_auth_headers()

        try:
            self.session.get(
                f"{self.auth_endpoint}/2sv/trust",
                headers=headers,
            )
            self._authenticate_with_token()
            return True
        except ICloudPyAPIResponseException:
            LOGGER.error("Session trust failed.")
            return False

    def _get_webservice_url(self, ws_key):
        """Get webservice URL, raise an exception if not exists."""
        if self._webservices.get(ws_key) is None:
            raise ICloudPyServiceNotActivatedException(
                "Webservice not available", ws_key
            )
        return self._webservices[ws_key]["url"]

    @property
    def devices(self):
        """Returns all devices."""
        service_root = self._get_webservice_url("findme")
        return FindMyiPhoneServiceManager(
            service_root, self.session, self.params, self.with_family
        )

    @property
    def iphone(self):
        """Returns the iPhone."""
        return self.devices[0]

    @property
    def account(self):
        """Gets the 'Account' service."""
        service_root = self._get_webservice_url("account")
        return AccountService(service_root, self.session, self.params)

    @property
    def files(self):
        """Gets the 'File' service."""
        if not self._files:
            service_root = self._get_webservice_url("ubiquity")
            self._files = UbiquityService(service_root, self.session, self.params)
        return self._files

    @property
    def photos(self):
        """Gets the 'Photo' service."""
        if not self._photos:
            service_root = self._get_webservice_url("ckdatabasews")
            self._photos = PhotosService(service_root, self.session, self.params)
        return self._photos

    @property
    def calendar(self):
        """Gets the 'Calendar' service."""
        service_root = self._get_webservice_url("calendar")
        return CalendarService(service_root, self.session, self.params)

    @property
    def contacts(self):
        """Gets the 'Contacts' service."""
        service_root = self._get_webservice_url("contacts")
        return ContactsService(service_root, self.session, self.params)

    @property
    def reminders(self):
        """Gets the 'Reminders' service."""
        service_root = self._get_webservice_url("reminders")
        return RemindersService(service_root, self.session, self.params)

    @property
    def drive(self):
        """Gets the 'Drive' service."""
        if not self._drive:
            self._drive = DriveService(
                service_root=self._get_webservice_url("drivews"),
                document_root=self._get_webservice_url("docws"),
                session=self.session,
                params=self.params,
            )
        return self._drive

    def __unicode__(self):
        return f"iCloud API: {self.user.get('accountName')}"

    def __str__(self):
        as_unicode = self.__unicode__()
        if PY2:
            return as_unicode.encode("utf-8", "ignore")
        return as_unicode

    def __repr__(self):
        return f"<{str(self)}>"