Coverage for icloudpy/base.py: 77%

365 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-02 05:49 +0000

1"""Library base file.""" 

2 

3import base64 

4import getpass 

5import hashlib 

6import http.cookiejar as cookielib 

7import inspect 

8import json 

9import logging 

10from os import mkdir, path 

11from re import match 

12from tempfile import gettempdir 

13from uuid import uuid1 

14 

15import srp 

16from requests import Session 

17 

18from icloudpy.exceptions import ( 

19 ICloudPy2SARequiredException, 

20 ICloudPyAPIResponseException, 

21 ICloudPyFailedLoginException, 

22 ICloudPyServiceNotActivatedException, 

23) 

24from icloudpy.services import ( 

25 AccountService, 

26 CalendarService, 

27 ContactsService, 

28 DriveService, 

29 FindMyiPhoneServiceManager, 

30 PhotosService, 

31 RemindersService, 

32) 

33from icloudpy.utils import get_password_from_keyring 

34 

# Module-level logger; ICloudPyService attaches its password filter to it.
LOGGER = logging.getLogger(__name__)

# Maps HTTP response header names to the keys under which
# ICloudPySession.request stores their values in the service's session_data.
HEADER_DATA = {
    "X-Apple-ID-Account-Country": "account_country",
    "X-Apple-ID-Session-Id": "session_id",
    "X-Apple-Session-Token": "session_token",
    "X-Apple-TwoSV-Trust-Token": "trust_token",
    "scnt": "scnt",
}

44 

45 

class ICloudPyPasswordFilter(logging.Filter):
    """Logging filter that masks the account password in log records.

    The password is stored in the inherited ``name`` attribute (the base
    class's name-based filtering is not used because ``filter`` is
    overridden).
    """

    def __init__(self, password):
        """Create the filter.

        Args:
            password: The secret to mask. ``None`` is treated as an empty
                password, in which case nothing is masked.
        """
        # logging.Filter.__init__ calls len() on the name, so None must be
        # normalised to an empty string.
        super().__init__(password or "")

    def filter(self, record):
        """Replace every occurrence of the password in ``record`` with ``********``.

        Always returns ``True`` so the record is never dropped.
        """
        # An empty password must be skipped: "" is a substring of every
        # message and str.replace("", ...) would insert the mask between
        # every character.
        if self.name:
            message = record.getMessage()
            if self.name in message:
                # Store the fully formatted, masked message and clear the
                # args so it is not %-formatted a second time.
                record.msg = message.replace(self.name, "*" * 8)
                record.args = ()

        return True

59 

60 

class ICloudPySession(Session):
    """iCloud session.

    A ``requests`` session bound to an ``ICloudPyService``. After every
    request it copies selected response headers into the service's
    ``session_data``, writes that data and the cookie jar to disk, retries
    once on selected status codes and converts error responses into
    ICloudPy exceptions.
    """

    def __init__(self, service):
        """Bind the session to ``service`` (the owning ICloudPyService)."""
        self.service = service
        Session.__init__(self)

    def request(self, method, url, **kwargs):  # pylint: disable=arguments-differ
        """Send a request, persist session state and raise on API errors.

        Args:
            method: HTTP method name.
            url: Target URL.
            **kwargs: Passed through to ``Session.request``. The private
                ``retried`` key marks a request that has already been
                retried and is removed before the request is sent.

        Returns:
            The response object.

        Raises:
            ICloudPy2SARequiredException, ICloudPyServiceNotActivatedException,
            ICloudPyAPIResponseException: via ``_raise_error`` when the
                response signals an error.
        """
        # Charge logging to the right service endpoint: frame 2 is the caller
        # of the Session.get/post helper that invoked this method. Fall back
        # to this module when the frame or its module cannot be determined.
        try:
            # context=0 avoids reading source lines for every frame.
            module = inspect.getmodule(inspect.stack(0)[2][0])
        except IndexError:
            module = None
        logger_name = module.__name__ if module is not None else __name__
        request_logger = logging.getLogger(logger_name).getChild("http")
        if self.service.password_filter not in request_logger.filters:
            request_logger.addFilter(self.service.password_filter)

        request_logger.debug("%s %s %s", method, url, kwargs.get("data", ""))

        # "retried" is an internal marker and must not reach requests.
        has_retried = kwargs.pop("retried", None)
        response = super().request(method, url, **kwargs)

        content_type = response.headers.get("Content-Type", "").split(";")[0]
        json_mimetypes = ["application/json", "text/json"]

        # Copy the tracked response headers into the session data.
        for header, session_arg in HEADER_DATA.items():
            header_value = response.headers.get(header)
            if header_value:
                self.service.session_data[session_arg] = header_value

        # Save session_data to file
        with open(self.service.session_path, "w", encoding="utf-8") as outfile:
            json.dump(self.service.session_data, outfile)
            LOGGER.debug("Saved session data to file")

        # Save cookies to file
        self.cookies.save(ignore_discard=True, ignore_expires=True)
        LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path)

        if not response.ok and (content_type not in json_mimetypes or response.status_code in [421, 450, 500]):
            try:
                # pylint: disable=W0212
                fmip_url = self.service._get_webservice_url("findme")
                if has_retried is None and response.status_code == 450 and fmip_url in url:
                    # Handle re-authentication for Find My iPhone
                    LOGGER.debug("Re-authenticating Find My iPhone service")
                    try:
                        self.service.authenticate(True, "find")
                    except ICloudPyAPIResponseException:
                        LOGGER.debug("Re-authentication failed")
                    kwargs["retried"] = True
                    return self.request(method, url, **kwargs)
            except Exception as error:  # pylint: disable=broad-except
                # Best effort: any failure here (including from the retried
                # request) falls through to the generic handling below.
                LOGGER.debug("Find My iPhone re-authentication path failed: %s", error)

            if has_retried is None and response.status_code in [421, 450, 500]:
                api_error = ICloudPyAPIResponseException(
                    response.reason,
                    response.status_code,
                    retry=True,
                )
                request_logger.debug(api_error)
                kwargs["retried"] = True
                return self.request(method, url, **kwargs)

            self._raise_error(response.status_code, response.reason)

        if content_type not in json_mimetypes:
            return response

        try:
            data = response.json()
        except Exception:  # pylint: disable=broad-except
            # Deliberately tolerant, but no longer traps KeyboardInterrupt
            # or SystemExit the way a bare except did.
            request_logger.warning("Failed to parse response with JSON mimetype")
            return response

        request_logger.debug(data)

        if isinstance(data, dict):
            # The error text may be reported under several different keys.
            reason = data.get("errorMessage")
            reason = reason or data.get("reason")
            reason = reason or data.get("errorReason")
            if not reason and isinstance(data.get("error"), str):
                reason = data.get("error")
            if not reason and data.get("error"):
                reason = "Unknown reason"

            code = data.get("errorCode")
            if not code and data.get("serverErrorCode"):
                code = data.get("serverErrorCode")

            if reason:
                self._raise_error(code, reason)

        return response

    def _raise_error(self, code, reason):
        """Raise the ICloudPy exception matching ``code`` and ``reason``.

        Raises:
            ICloudPy2SARequiredException: when the service requires 2SA and
                the web-auth token cookie is reported missing.
            ICloudPyServiceNotActivatedException: for ``ZONE_NOT_FOUND`` and
                ``AUTHENTICATION_FAILED``.
            ICloudPyAPIResponseException: in every other case.
        """
        if self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie":
            raise ICloudPy2SARequiredException(self.service.user["apple_id"])
        if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"):
            reason = reason + ". Please log into https://icloud.com/ to manually finish setting up your iCloud service"
            api_error = ICloudPyServiceNotActivatedException(reason, code)
            LOGGER.error(api_error)

            raise api_error
        if code == "ACCESS_DENIED":
            reason = (
                reason + ". Please wait a few minutes then try again."
                "The remote servers might be trying to throttle requests."
            )
        if code in [421, 450, 500]:
            reason = "Authentication required for Account."

        api_error = ICloudPyAPIResponseException(reason, code)
        LOGGER.error(api_error)
        raise api_error

    # Public method to resolve linting error
    def raise_error(self, code, reason):
        """Public wrapper around ``_raise_error``; always raises."""
        return self._raise_error(code=code, reason=reason)

182 

183 

class ICloudPyService:
    """
    A base authentication class for the iCloud service. Handles the
    authentication required to access iCloud services.

    Usage:
        from icloudpy.base import ICloudPyService
        icloudpy = ICloudPyService('username@apple.com', 'password')
        icloudpy.iphone.location()
    """

    def __init__(
        self,
        apple_id,
        password=None,
        cookie_directory=None,
        verify=True,
        client_id=None,
        with_family=True,
        auth_endpoint="https://idmsa.apple.com/appleauth/auth",
        # For China, use "https://www.icloud.com.cn"
        home_endpoint="https://www.icloud.com",
        # For China, use "https://setup.icloud.com.cn/setup/ws/1"
        setup_endpoint="https://setup.icloud.com/setup/ws/1",
    ):
        """Set up session storage and authenticate.

        Args:
            apple_id: Account name used to log in.
            password: Account password; looked up in the keyring when None.
            cookie_directory: Directory for the cookie jar and session file.
                Defaults to ``<tempdir>/icloudpy/<current user>``.
            verify: Assigned to the HTTP session's ``verify`` attribute.
            client_id: Client identifier; generated when not given. A
                ``client_id`` stored in the session file takes precedence.
            with_family: Passed to the Find My iPhone service manager.
            auth_endpoint: Base URL of the authentication endpoints.
            home_endpoint: Used for the ``Origin``/``Referer`` headers.
            setup_endpoint: Base URL of the setup endpoints.
        """
        if password is None:
            password = get_password_from_keyring(apple_id)

        self.user = {"accountName": apple_id, "password": password}
        self.data = {}
        self.params = {}
        self.client_id = client_id or f"auth-{str(uuid1()).lower()}"
        self.with_family = with_family
        self.auth_endpoint = auth_endpoint
        self.home_endpoint = home_endpoint
        self.setup_endpoint = setup_endpoint

        # Lazily created service instances (see the drive/photos properties).
        self._drive = None
        self._photos = None

        self.password_filter = ICloudPyPasswordFilter(password)
        LOGGER.addFilter(self.password_filter)

        if cookie_directory:
            self._cookie_directory = path.expanduser(path.normpath(cookie_directory))
            if not path.exists(self._cookie_directory):
                mkdir(self._cookie_directory, 0o700)
        else:
            topdir = path.join(gettempdir(), "icloudpy")
            self._cookie_directory = path.join(topdir, getpass.getuser())
            if not path.exists(topdir):
                mkdir(topdir, 0o777)
            if not path.exists(self._cookie_directory):
                mkdir(self._cookie_directory, 0o700)

        LOGGER.debug("Using session file %s", self.session_path)

        self.session_data = {}
        try:
            with open(self.session_path, encoding="utf-8") as session_f:
                self.session_data = json.load(session_f)
        except (OSError, ValueError):
            # Missing/unreadable file (OSError) or invalid JSON/encoding
            # (ValueError): start with empty session data.
            LOGGER.info("Session file does not exist")
        if not isinstance(self.session_data, dict):
            # A session file holding anything but a JSON object is unusable.
            self.session_data = {}

        if self.session_data.get("client_id"):
            self.client_id = self.session_data.get("client_id")
        else:
            self.session_data.update({"client_id": self.client_id})
        self.params["clientId"] = self.client_id

        self.session = ICloudPySession(self)
        self.session.verify = verify
        self.session.headers.update(
            {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"},
        )

        cookiejar_path = self.cookiejar_path
        self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path)
        if path.exists(cookiejar_path):
            try:
                self.session.cookies.load(ignore_discard=True, ignore_expires=True)
                LOGGER.debug("Read cookies from %s", cookiejar_path)
            except Exception:  # pylint: disable=broad-except
                # Most likely a pickled cookiejar from earlier versions.
                # The cookiejar will get replaced with a valid one after
                # successful authentication.
                LOGGER.warning("Failed to read cookiejar %s", cookiejar_path)

        self.authenticate()

    def authenticate(self, force_refresh=False, service=None):
        """
        Handles authentication, and persists cookies so that
        subsequent logins will not cause additional e-mails from Apple.

        Args:
            force_refresh: Skip validation of the stored session token.
            service: Optional app name; when given and the token is not
                valid, a service-specific credential login is tried before
                falling back to a full login.

        Raises:
            ICloudPyFailedLoginException: when the login is rejected.
        """

        login_successful = False
        if self.session_data.get("session_token") and not force_refresh:
            LOGGER.debug("Checking session token validity")
            try:
                self.data = self._validate_token()
                login_successful = True
            except ICloudPyAPIResponseException:
                LOGGER.debug("Invalid authentication token, will log in from scratch.")

        if not login_successful and service is not None:
            # Account data may be empty or lack this app; treat that the same
            # as "cannot launch with one factor" instead of raising KeyError.
            app = (self.data.get("apps") or {}).get(service) or {}
            if app.get("canLaunchWithOneFactor") is True:
                LOGGER.debug(
                    "Authenticating as %s for %s",
                    self.user["accountName"],
                    service,
                )

                try:
                    self._authenticate_with_credentials_service(service)
                    login_successful = True
                except Exception as error:  # pylint: disable=broad-except
                    LOGGER.debug(
                        "Could not log into service. Attempting brand new login. %s",
                        str(error),
                    )

        if not login_successful:
            self._authenticate_with_srp()
            self._authenticate_with_token()

        self._webservices = self.data["webservices"]

        LOGGER.debug("Authentication completed successfully")

    def _authenticate_with_srp(self):
        """Perform the SRP sign-in exchange (signin/init, signin/complete)."""
        LOGGER.debug("Authenticating as %s", self.user["accountName"])

        headers = self._get_session_auth_headers()

        class SrpPassword:
            """Password wrapper whose ``encode`` yields the derived SRP key."""

            def __init__(self, password: str):
                self.password = password

            def set_encrypt_info(
                self,
                salt: bytes,
                iterations: int,
                key_length: int,
            ):
                self.salt = salt
                self.iterations = iterations
                self.key_length = key_length

            def encode(self):
                password_hash = hashlib.sha256(
                    self.password.encode("utf-8"),
                ).digest()
                # Use the parameters stored by set_encrypt_info rather than
                # names captured from the enclosing function.
                return hashlib.pbkdf2_hmac(
                    "sha256",
                    password_hash,
                    self.salt,
                    self.iterations,
                    self.key_length,
                )

        srp_password = SrpPassword(self.user["password"])
        srp.rfc5054_enable()
        srp.no_username_in_x()
        usr = srp.User(
            self.user["accountName"],
            srp_password,
            hash_alg=srp.SHA256,
            ng_type=srp.NG_2048,
        )

        uname, a_bytes = usr.start_authentication()

        data = {
            "a": base64.b64encode(a_bytes).decode(),
            "accountName": uname,
            "protocols": ["s2k", "s2k_fo"],
        }

        try:
            response = self.session.post(
                f"{self.auth_endpoint}/signin/init",
                data=json.dumps(data),
                headers=headers,
            )
            response.raise_for_status()
        except ICloudPyAPIResponseException as error:
            msg = "Failed to initiate srp authentication."
            raise ICloudPyFailedLoginException(msg, error) from error

        body = response.json()

        salt = base64.b64decode(body["salt"])
        b = base64.b64decode(body["b"])
        c = body["c"]
        iterations = body["iteration"]
        key_length = 32
        srp_password.set_encrypt_info(salt, iterations, key_length)

        m1 = usr.process_challenge(salt, b)
        m2 = usr.H_AMK

        data = {
            "accountName": uname,
            "c": c,
            "m1": base64.b64encode(m1).decode(),
            "m2": base64.b64encode(m2).decode(),
            "rememberMe": True,
            "trustTokens": [],
        }

        if self.session_data.get("trust_token"):
            data["trustTokens"] = [self.session_data.get("trust_token")]

        try:
            self.session.post(
                f"{self.auth_endpoint}/signin/complete",
                params={"isRememberMeEnabled": "true"},
                data=json.dumps(data),
                headers=headers,
            )
        except ICloudPyAPIResponseException as error:
            msg = "Invalid email/password combination."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _authenticate_with_token(self):
        """Authenticate using session token."""
        data = {
            "accountCountryCode": self.session_data.get("account_country"),
            "dsWebAuthToken": self.session_data.get("session_token"),
            "extended_login": True,
            "trustToken": self.session_data.get("trust_token", ""),
        }

        try:
            req = self.session.post(
                f"{self.setup_endpoint}/accountLogin",
                data=json.dumps(data),
            )
            self.data = req.json()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid authentication token."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _authenticate_with_credentials_service(self, service):
        """Authenticate to a specific service using credentials."""
        data = {
            "appName": service,
            "apple_id": self.user["accountName"],
            "password": self.user["password"],
        }

        try:
            self.session.post(
                f"{self.setup_endpoint}/accountLogin",
                data=json.dumps(data),
            )

            self.data = self._validate_token()
        except ICloudPyAPIResponseException as error:
            msg = "Invalid email/password combination."
            raise ICloudPyFailedLoginException(msg, error) from error

    def _validate_token(self):
        """Checks if the current access token is still valid.

        Returns the decoded JSON body of the validate call; re-raises
        ICloudPyAPIResponseException when the token is rejected.
        """
        LOGGER.debug("Checking session token validity")
        try:
            req = self.session.post(f"{self.setup_endpoint}/validate", data="null")
            LOGGER.debug("Session token is still valid")
            return req.json()
        except ICloudPyAPIResponseException:
            LOGGER.debug("Invalid authentication token")
            raise

    def _get_auth_headers(self, overrides=None):
        """Return the authentication request headers, updated with ``overrides``."""
        headers = {
            "Accept": "application/json, text/javascript",
            "Content-Type": "application/json",
            "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
            "X-Apple-OAuth-Client-Type": "firstPartyAuth",
            "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com",
            "X-Apple-OAuth-Require-Grant-Code": "true",
            "X-Apple-OAuth-Response-Mode": "web_message",
            "X-Apple-OAuth-Response-Type": "code",
            "X-Apple-OAuth-State": self.client_id,
            "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d",
        }
        if overrides:
            headers.update(overrides)
        return headers

    def _get_session_auth_headers(self, overrides=None):
        """Return the auth headers plus the stored ``scnt`` and session id, if any."""
        headers = self._get_auth_headers(overrides)

        if self.session_data.get("scnt"):
            headers["scnt"] = self.session_data.get("scnt")

        if self.session_data.get("session_id"):
            headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id")

        return headers

    def _account_file_stem(self):
        """Return the account name reduced to its ``\\w`` characters."""
        return "".join(c for c in self.user.get("accountName") if match(r"\w", c))

    @property
    def cookiejar_path(self):
        """Get path for cookiejar file."""
        return path.join(self._cookie_directory, self._account_file_stem())

    @property
    def session_path(self):
        """Get path for session data file."""
        return path.join(
            self._cookie_directory,
            self._account_file_stem() + ".session",
        )

    @property
    def requires_2sa(self):
        """Returns True if two-step authentication is required."""
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def requires_2fa(self):
        """Returns True if two-factor authentication is required."""
        # Use .get for "dsInfo" (as requires_2sa does) so missing account
        # data yields False instead of a KeyError.
        return self.data.get("dsInfo", {}).get("hsaVersion", 0) == 2 and (
            self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session
        )

    @property
    def is_trusted_session(self):
        """Returns True if the session is trusted."""
        return self.data.get("hsaTrustedBrowser", False)

    @property
    def trusted_devices(self):
        """Returns devices trusted for two-step authentication."""
        request = self.session.get(
            f"{self.setup_endpoint}/listDevices",
            params=self.params,
        )
        return request.json().get("devices")

    def send_verification_code(self, device):
        """Requests that a verification code is sent to the given device."""
        data = json.dumps(device)
        request = self.session.post(
            f"{self.setup_endpoint}/sendVerificationCode",
            params=self.params,
            data=data,
        )
        return request.json().get("success", False)

    def validate_verification_code(self, device, code):
        """Verifies a verification code received on a trusted device.

        Returns False for a wrong code (error code -21669); other API
        errors propagate. The caller's ``device`` dict is not modified.
        """
        # Build the payload from a copy so the caller's dict is left intact.
        payload = dict(device)
        payload.update({"verificationCode": code, "trustBrowser": True})
        data = json.dumps(payload)

        try:
            self.session.post(
                f"{self.setup_endpoint}/validateVerificationCode",
                params=self.params,
                data=data,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                return False
            raise

        self.trust_session()

        return not self.requires_2sa

    def validate_2fa_code(self, code):
        """Verifies a verification code received via Apple's 2FA system (HSA2)."""
        data = {"securityCode": {"code": code}}

        headers = self._get_session_auth_headers({"Accept": "application/json"})

        try:
            self.session.post(
                f"{self.auth_endpoint}/verify/trusteddevice/securitycode",
                data=json.dumps(data),
                headers=headers,
            )
        except ICloudPyAPIResponseException as error:
            if error.code == -21669:
                # Wrong verification code
                LOGGER.error("Code verification failed.")
                return False
            raise

        LOGGER.debug("Code verification successful.")

        self.trust_session()
        return not self.requires_2sa

    def trust_session(self):
        """Request session trust to avoid user log in going forward."""
        headers = self._get_session_auth_headers()

        try:
            self.session.get(
                f"{self.auth_endpoint}/2sv/trust",
                headers=headers,
            )
            self._authenticate_with_token()
            return True
        except ICloudPyAPIResponseException:
            LOGGER.error("Session trust failed.")
            return False

    def _get_webservice_url(self, ws_key):
        """Get webservice URL, raise an exception if not exists."""
        if self._webservices.get(ws_key) is None:
            raise ICloudPyServiceNotActivatedException(
                "Webservice not available",
                ws_key,
            )
        return self._webservices[ws_key]["url"]

    @property
    def devices(self):
        """Returns all devices."""
        service_root = self._get_webservice_url("findme")
        return FindMyiPhoneServiceManager(
            service_root,
            self.session,
            self.params,
            self.with_family,
        )

    @property
    def iphone(self):
        """Returns the iPhone."""
        return self.devices[0]

    @property
    def account(self):
        """Gets the 'Account' service."""
        service_root = self._get_webservice_url("account")
        return AccountService(service_root, self.session, self.params)

    @property
    def photos(self):
        """Gets the 'Photo' service."""
        if not self._photos:
            service_root = self._get_webservice_url("ckdatabasews")
            self._photos = PhotosService(service_root, self.session, self.params)
        return self._photos

    @property
    def calendar(self):
        """Gets the 'Calendar' service."""
        service_root = self._get_webservice_url("calendar")
        return CalendarService(service_root, self.session, self.params)

    @property
    def contacts(self):
        """Gets the 'Contacts' service."""
        service_root = self._get_webservice_url("contacts")
        return ContactsService(service_root, self.session, self.params)

    @property
    def reminders(self):
        """Gets the 'Reminders' service."""
        service_root = self._get_webservice_url("reminders")
        return RemindersService(service_root, self.session, self.params)

    @property
    def drive(self):
        """Gets the 'Drive' service."""
        if not self._drive:
            self._drive = DriveService(
                service_root=self._get_webservice_url("drivews"),
                document_root=self._get_webservice_url("docws"),
                session=self.session,
                params=self.params,
            )
        return self._drive

    def __unicode__(self):
        return f"iCloud API: {self.user.get('accountName')}"

    def __str__(self):
        return self.__unicode__()

    def __repr__(self):
        return f"<{str(self)}>"

680 

681 def __repr__(self): 

682 return f"<{str(self)}>"