Coverage for icloudpy/base.py: 55%

376 statements  

« prev     ^ index     » next       coverage.py v7.6.10, created at 2024-12-30 19:31 +0000

1"""Library base file.""" 

2 

3import base64 

4import getpass 

5import hashlib 

6import http.cookiejar as cookielib 

7import inspect 

8import json 

9import logging 

10from os import mkdir, path 

11from re import match 

12from tempfile import gettempdir 

13from uuid import uuid1 

14 

15import srp 

16from requests import Session 

17from six import PY2 

18 

19from icloudpy.exceptions import ( 

20 ICloudPy2SARequiredException, 

21 ICloudPyAPIResponseException, 

22 ICloudPyFailedLoginException, 

23 ICloudPyServiceNotActivatedException, 

24) 

25from icloudpy.services import ( 

26 AccountService, 

27 CalendarService, 

28 ContactsService, 

29 DriveService, 

30 FindMyiPhoneServiceManager, 

31 PhotosService, 

32 RemindersService, 

33 UbiquityService, 

34) 

35from icloudpy.utils import get_password_from_keyring 

36 

37LOGGER = logging.getLogger(__name__) 

38 

39HEADER_DATA = { 

40 "X-Apple-ID-Account-Country": "account_country", 

41 "X-Apple-ID-Session-Id": "session_id", 

42 "X-Apple-Session-Token": "session_token", 

43 "X-Apple-TwoSV-Trust-Token": "trust_token", 

44 "scnt": "scnt", 

45} 

46 

47 

48class ICloudPyPasswordFilter(logging.Filter): 

49 """Password log hider.""" 

50 

51 def __init__(self, password): 

52 super().__init__(password) 

53 

54 def filter(self, record): 

55 message = record.getMessage() 

56 if self.name in message: 

57 record.msg = message.replace(self.name, "*" * 8) 

58 record.args = [] 

59 

60 return True 

61 

62 

63class ICloudPySession(Session): 

64 """iCloud session.""" 

65 

66 def __init__(self, service): 

67 self.service = service 

68 Session.__init__(self) 

69 

70 def request(self, method, url, **kwargs): # pylint: disable=arguments-differ 

71 # Charge logging to the right service endpoint 

72 callee = inspect.stack()[2] 

73 module = inspect.getmodule(callee[0]) 

74 request_logger = logging.getLogger(module.__name__).getChild("http") 

75 if self.service.password_filter not in request_logger.filters: 

76 request_logger.addFilter(self.service.password_filter) 

77 

78 request_logger.debug(f"{method} {url} {kwargs.get('data', '')}") 

79 

80 has_retried = kwargs.get("retried") 

81 kwargs.pop("retried", None) 

82 response = super().request(method, url, **kwargs) 

83 

84 content_type = response.headers.get("Content-Type", "").split(";")[0] 

85 json_mimetypes = ["application/json", "text/json"] 

86 

87 for header, value in HEADER_DATA.items(): 

88 if response.headers.get(header): 

89 session_arg = value 

90 self.service.session_data.update( 

91 {session_arg: response.headers.get(header)}, 

92 ) 

93 

94 # Save session_data to file 

95 with open(self.service.session_path, "w", encoding="utf-8") as outfile: 

96 json.dump(self.service.session_data, outfile) 

97 LOGGER.debug("Saved session data to file") 

98 

99 # Save cookies to file 

100 self.cookies.save(ignore_discard=True, ignore_expires=True) 

101 LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path) 

102 

103 if not response.ok and (content_type not in json_mimetypes or response.status_code in [421, 450, 500]): 

104 try: 

105 # pylint: disable=W0212 

106 fmip_url = self.service._get_webservice_url("findme") 

107 if has_retried is None and response.status_code == 450 and fmip_url in url: 

108 # Handle re-authentication for Find My iPhone 

109 LOGGER.debug("Re-authenticating Find My iPhone service") 

110 try: 

111 self.service.authenticate(True, "find") 

112 except ICloudPyAPIResponseException: 

113 LOGGER.debug("Re-authentication failed") 

114 kwargs["retried"] = True 

115 return self.request(method, url, **kwargs) 

116 except Exception: 

117 pass 

118 

119 if has_retried is None and response.status_code in [421, 450, 500]: 

120 api_error = ICloudPyAPIResponseException( 

121 response.reason, 

122 response.status_code, 

123 retry=True, 

124 ) 

125 request_logger.debug(api_error) 

126 kwargs["retried"] = True 

127 return self.request(method, url, **kwargs) 

128 

129 self._raise_error(response.status_code, response.reason) 

130 

131 if content_type not in json_mimetypes: 

132 return response 

133 

134 try: 

135 data = response.json() 

136 except: # noqa: E722 

137 request_logger.warning("Failed to parse response with JSON mimetype") 

138 return response 

139 

140 request_logger.debug(data) 

141 

142 if isinstance(data, dict): 

143 reason = data.get("errorMessage") 

144 reason = reason or data.get("reason") 

145 reason = reason or data.get("errorReason") 

146 if not reason and isinstance(data.get("error"), str): 

147 reason = data.get("error") 

148 if not reason and data.get("error"): 

149 reason = "Unknown reason" 

150 

151 code = data.get("errorCode") 

152 if not code and data.get("serverErrorCode"): 

153 code = data.get("serverErrorCode") 

154 

155 if reason: 

156 self._raise_error(code, reason) 

157 

158 return response 

159 

160 def _raise_error(self, code, reason): 

161 if self.service.requires_2sa and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie": 

162 raise ICloudPy2SARequiredException(self.service.user["apple_id"]) 

163 if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"): 

164 reason = ( 

165 reason + ". Please log into https://icloud.com/ to manually " "finish setting up your iCloud service" 

166 ) 

167 api_error = ICloudPyServiceNotActivatedException(reason, code) 

168 LOGGER.error(api_error) 

169 

170 raise api_error 

171 if code == "ACCESS_DENIED": 

172 reason = ( 

173 reason + ". Please wait a few minutes then try again." 

174 "The remote servers might be trying to throttle requests." 

175 ) 

176 if code in [421, 450, 500]: 

177 reason = "Authentication required for Account." 

178 

179 api_error = ICloudPyAPIResponseException(reason, code) 

180 LOGGER.error(api_error) 

181 raise api_error 

182 

183 # Public method to resolve linting error 

184 def raise_error(self, code, reason): 

185 return self._raise_error(code=code, reason=reason) 

186 

187 

188class ICloudPyService: 

189 """ 

190 A base authentication class for the iCloud service. Handles the 

191 authentication required to access iCloud services. 

192 

193 Usage: 

194 from src import ICloudPyService 

195 icloudpy = ICloudPyService('username@apple.com', 'password') 

196 icloudpy.iphone.location() 

197 """ 

198 

199 def __init__( 

200 self, 

201 apple_id, 

202 password=None, 

203 cookie_directory=None, 

204 verify=True, 

205 client_id=None, 

206 with_family=True, 

207 auth_endpoint="https://idmsa.apple.com/appleauth/auth", 

208 # For China, use "https://www.icloud.com.cn" 

209 home_endpoint="https://www.icloud.com", 

210 # For China, use "https://setup.icloud.com.cn/setup/ws/1" 

211 setup_endpoint="https://setup.icloud.com/setup/ws/1", 

212 ): 

213 if password is None: 

214 password = get_password_from_keyring(apple_id) 

215 

216 self.user = {"accountName": apple_id, "password": password} 

217 self.data = {} 

218 self.params = {} 

219 self.client_id = client_id or (f"auth-{str(uuid1()).lower()}") 

220 self.with_family = with_family 

221 self.auth_endpoint = auth_endpoint 

222 self.home_endpoint = home_endpoint 

223 self.setup_endpoint = setup_endpoint 

224 

225 self.password_filter = ICloudPyPasswordFilter(password) 

226 LOGGER.addFilter(self.password_filter) 

227 

228 if cookie_directory: 

229 self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) 

230 if not path.exists(self._cookie_directory): 

231 mkdir(self._cookie_directory, 0o700) 

232 else: 

233 topdir = path.join(gettempdir(), "icloudpy") 

234 self._cookie_directory = path.join(topdir, getpass.getuser()) 

235 if not path.exists(topdir): 

236 mkdir(topdir, 0o777) 

237 if not path.exists(self._cookie_directory): 

238 mkdir(self._cookie_directory, 0o700) 

239 

240 LOGGER.debug("Using session file %s", self.session_path) 

241 

242 self.session_data = {} 

243 try: 

244 with open(self.session_path, encoding="utf-8") as session_f: 

245 self.session_data = json.load(session_f) 

246 except: # noqa: E722 

247 LOGGER.info("Session file does not exist") 

248 if self.session_data.get("client_id"): 

249 self.client_id = self.session_data.get("client_id") 

250 self.params["clientId"] = self.client_id 

251 else: 

252 self.session_data.update({"client_id": self.client_id}) 

253 self.params["clientId"] = self.client_id 

254 

255 self.session = ICloudPySession(self) 

256 self.session.verify = verify 

257 self.session.headers.update( 

258 {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"}, 

259 ) 

260 

261 cookiejar_path = self.cookiejar_path 

262 self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) 

263 if path.exists(cookiejar_path): 

264 try: 

265 self.session.cookies.load(ignore_discard=True, ignore_expires=True) 

266 LOGGER.debug("Read cookies from %s", cookiejar_path) 

267 except: # noqa: E722 

268 # Most likely a pickled cookiejar from earlier versions. 

269 # The cookiejar will get replaced with a valid one after 

270 # successful authentication. 

271 LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) 

272 

273 self.authenticate() 

274 

275 self._drive = None 

276 self._files = None 

277 self._photos = None 

278 

279 def authenticate(self, force_refresh=False, service=None): 

280 """ 

281 Handles authentication, and persists cookies so that 

282 subsequent logins will not cause additional e-mails from Apple. 

283 """ 

284 

285 login_successful = False 

286 if self.session_data.get("session_token") and not force_refresh: 

287 LOGGER.debug("Checking session token validity") 

288 try: 

289 self.data = self._validate_token() 

290 login_successful = True 

291 except ICloudPyAPIResponseException: 

292 LOGGER.debug("Invalid authentication token, will log in from scratch.") 

293 

294 if not login_successful and service is not None: 

295 app = self.data["apps"][service] 

296 if "canLaunchWithOneFactor" in app and app["canLaunchWithOneFactor"] is True: 

297 LOGGER.debug( 

298 "Authenticating as %s for %s", 

299 self.user["accountName"], 

300 service, 

301 ) 

302 

303 try: 

304 self._authenticate_with_credentials_service(service) 

305 login_successful = True 

306 except Exception as error: 

307 LOGGER.debug( 

308 "Could not log into service. Attempting brand new login. %s", 

309 str(error), 

310 ) 

311 

312 if not login_successful: 

313 LOGGER.debug("Authenticating as %s", self.user["accountName"]) 

314 

315 headers = self._get_auth_headers() 

316 

317 if self.session_data.get("scnt"): 

318 headers["scnt"] = self.session_data.get("scnt") 

319 

320 if self.session_data.get("session_id"): 

321 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

322 

323 class SrpPassword: 

324 def __init__(self, password: str): 

325 self.password = password 

326 

327 def set_encrypt_info( 

328 self, 

329 salt: bytes, 

330 iterations: int, 

331 key_length: int, 

332 ): 

333 self.salt = salt 

334 self.iterations = iterations 

335 self.key_length = key_length 

336 

337 def encode(self): 

338 password_hash = hashlib.sha256( 

339 self.password.encode("utf-8"), 

340 ).digest() 

341 return hashlib.pbkdf2_hmac( 

342 "sha256", 

343 password_hash, 

344 salt, 

345 iterations, 

346 key_length, 

347 ) 

348 

349 srp_password = SrpPassword(self.user["password"]) 

350 srp.rfc5054_enable() 

351 srp.no_username_in_x() 

352 usr = srp.User( 

353 self.user["accountName"], 

354 srp_password, 

355 hash_alg=srp.SHA256, 

356 ng_type=srp.NG_2048, 

357 ) 

358 

359 uname, a_bytes = usr.start_authentication() 

360 

361 data = { 

362 "a": base64.b64encode(a_bytes).decode(), 

363 "accountName": uname, 

364 "protocols": ["s2k", "s2k_fo"], 

365 } 

366 

367 try: 

368 response = self.session.post( 

369 f"{self.auth_endpoint}/signin/init", 

370 data=json.dumps(data), 

371 headers=headers, 

372 ) 

373 response.raise_for_status() 

374 except ICloudPyAPIResponseException as error: 

375 msg = "Failed to initiate srp authentication." 

376 raise ICloudPyFailedLoginException(msg, error) from error 

377 

378 body = response.json() 

379 

380 salt = base64.b64decode(body["salt"]) 

381 b = base64.b64decode(body["b"]) 

382 c = body["c"] 

383 iterations = body["iteration"] 

384 key_length = 32 

385 srp_password.set_encrypt_info(salt, iterations, key_length) 

386 

387 m1 = usr.process_challenge(salt, b) 

388 m2 = usr.H_AMK 

389 

390 data = { 

391 "accountName": uname, 

392 "c": c, 

393 "m1": base64.b64encode(m1).decode(), 

394 "m2": base64.b64encode(m2).decode(), 

395 "rememberMe": True, 

396 "trustTokens": [], 

397 } 

398 

399 if self.session_data.get("trust_token"): 

400 data["trustTokens"] = [self.session_data.get("trust_token")] 

401 

402 try: 

403 self.session.post( 

404 f"{self.auth_endpoint}/signin/complete", 

405 params={"isRememberMeEnabled": "true"}, 

406 data=json.dumps(data), 

407 headers=headers, 

408 ) 

409 except ICloudPyAPIResponseException as error: 

410 msg = "Invalid email/password combination." 

411 raise ICloudPyFailedLoginException(msg, error) from error 

412 

413 self._authenticate_with_token() 

414 

415 self._webservices = self.data["webservices"] 

416 

417 LOGGER.debug("Authentication completed successfully") 

418 

419 def _authenticate_with_token(self): 

420 """Authenticate using session token.""" 

421 data = { 

422 "accountCountryCode": self.session_data.get("account_country"), 

423 "dsWebAuthToken": self.session_data.get("session_token"), 

424 "extended_login": True, 

425 "trustToken": self.session_data.get("trust_token", ""), 

426 } 

427 

428 try: 

429 req = self.session.post( 

430 f"{self.setup_endpoint}/accountLogin", 

431 data=json.dumps(data), 

432 ) 

433 self.data = req.json() 

434 except ICloudPyAPIResponseException as error: 

435 msg = "Invalid authentication token." 

436 raise ICloudPyFailedLoginException(msg, error) from error 

437 

438 def _authenticate_with_credentials_service(self, service): 

439 """Authenticate to a specific service using credentials.""" 

440 data = { 

441 "appName": service, 

442 "apple_id": self.user["accountName"], 

443 "password": self.user["password"], 

444 } 

445 

446 try: 

447 self.session.post( 

448 f"{self.setup_endpoint}/accountLogin", 

449 data=json.dumps(data), 

450 ) 

451 

452 self.data = self._validate_token() 

453 except ICloudPyAPIResponseException as error: 

454 msg = "Invalid email/password combination." 

455 raise ICloudPyFailedLoginException(msg, error) from error 

456 

457 def _validate_token(self): 

458 """Checks if the current access token is still valid.""" 

459 LOGGER.debug("Checking session token validity") 

460 try: 

461 req = self.session.post(f"{self.setup_endpoint}/validate", data="null") 

462 LOGGER.debug("Session token is still valid") 

463 return req.json() 

464 except ICloudPyAPIResponseException as err: 

465 LOGGER.debug("Invalid authentication token") 

466 raise err 

467 

468 def _get_auth_headers(self, overrides=None): 

469 headers = { 

470 "Accept": "application/json, text/javascript", 

471 "Content-Type": "application/json", 

472 "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", 

473 "X-Apple-OAuth-Client-Type": "firstPartyAuth", 

474 "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", 

475 "X-Apple-OAuth-Require-Grant-Code": "true", 

476 "X-Apple-OAuth-Response-Mode": "web_message", 

477 "X-Apple-OAuth-Response-Type": "code", 

478 "X-Apple-OAuth-State": self.client_id, 

479 "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", 

480 } 

481 if overrides: 

482 headers.update(overrides) 

483 return headers 

484 

485 @property 

486 def cookiejar_path(self): 

487 """Get path for cookiejar file.""" 

488 return path.join( 

489 self._cookie_directory, 

490 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), 

491 ) 

492 

493 @property 

494 def session_path(self): 

495 """Get path for session data file.""" 

496 return path.join( 

497 self._cookie_directory, 

498 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) + ".session", 

499 ) 

500 

501 @property 

502 def requires_2sa(self): 

503 """Returns True if two-step authentication is required.""" 

504 return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( 

505 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session 

506 ) 

507 

508 @property 

509 def requires_2fa(self): 

510 """Returns True if two-factor authentication is required.""" 

511 return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( 

512 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session 

513 ) 

514 

515 @property 

516 def is_trusted_session(self): 

517 """Returns True if the session is trusted.""" 

518 return self.data.get("hsaTrustedBrowser", False) 

519 

520 @property 

521 def trusted_devices(self): 

522 """Returns devices trusted for two-step authentication.""" 

523 request = self.session.get( 

524 f"{self.setup_endpoint}/listDevices", 

525 params=self.params, 

526 ) 

527 return request.json().get("devices") 

528 

529 def send_verification_code(self, device): 

530 """Requests that a verification code is sent to the given device.""" 

531 data = json.dumps(device) 

532 request = self.session.post( 

533 f"{self.setup_endpoint}/sendVerificationCode", 

534 params=self.params, 

535 data=data, 

536 ) 

537 return request.json().get("success", False) 

538 

539 def validate_verification_code(self, device, code): 

540 """Verifies a verification code received on a trusted device.""" 

541 device.update({"verificationCode": code, "trustBrowser": True}) 

542 data = json.dumps(device) 

543 

544 try: 

545 self.session.post( 

546 f"{self.setup_endpoint}/validateVerificationCode", 

547 params=self.params, 

548 data=data, 

549 ) 

550 except ICloudPyAPIResponseException as error: 

551 if error.code == -21669: 

552 # Wrong verification code 

553 return False 

554 raise 

555 

556 self.trust_session() 

557 

558 return not self.requires_2sa 

559 

560 def validate_2fa_code(self, code): 

561 """Verifies a verification code received via Apple's 2FA system (HSA2).""" 

562 data = {"securityCode": {"code": code}} 

563 

564 headers = self._get_auth_headers({"Accept": "application/json"}) 

565 

566 if self.session_data.get("scnt"): 

567 headers["scnt"] = self.session_data.get("scnt") 

568 

569 if self.session_data.get("session_id"): 

570 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

571 

572 try: 

573 self.session.post( 

574 f"{self.auth_endpoint}/verify/trusteddevice/securitycode", 

575 data=json.dumps(data), 

576 headers=headers, 

577 ) 

578 except ICloudPyAPIResponseException as error: 

579 if error.code == -21669: 

580 # Wrong verification code 

581 LOGGER.error("Code verification failed.") 

582 return False 

583 raise 

584 

585 LOGGER.debug("Code verification successful.") 

586 

587 self.trust_session() 

588 return not self.requires_2sa 

589 

590 def trust_session(self): 

591 """Request session trust to avoid user log in going forward.""" 

592 headers = self._get_auth_headers() 

593 

594 if self.session_data.get("scnt"): 

595 headers["scnt"] = self.session_data.get("scnt") 

596 

597 if self.session_data.get("session_id"): 

598 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

599 

600 try: 

601 self.session.get( 

602 f"{self.auth_endpoint}/2sv/trust", 

603 headers=headers, 

604 ) 

605 self._authenticate_with_token() 

606 return True 

607 except ICloudPyAPIResponseException: 

608 LOGGER.error("Session trust failed.") 

609 return False 

610 

611 def _get_webservice_url(self, ws_key): 

612 """Get webservice URL, raise an exception if not exists.""" 

613 if self._webservices.get(ws_key) is None: 

614 raise ICloudPyServiceNotActivatedException( 

615 "Webservice not available", 

616 ws_key, 

617 ) 

618 return self._webservices[ws_key]["url"] 

619 

620 @property 

621 def devices(self): 

622 """Returns all devices.""" 

623 service_root = self._get_webservice_url("findme") 

624 return FindMyiPhoneServiceManager( 

625 service_root, 

626 self.session, 

627 self.params, 

628 self.with_family, 

629 ) 

630 

631 @property 

632 def iphone(self): 

633 """Returns the iPhone.""" 

634 return self.devices[0] 

635 

636 @property 

637 def account(self): 

638 """Gets the 'Account' service.""" 

639 service_root = self._get_webservice_url("account") 

640 return AccountService(service_root, self.session, self.params) 

641 

642 @property 

643 def files(self): 

644 """Gets the 'File' service.""" 

645 if not self._files: 

646 service_root = self._get_webservice_url("ubiquity") 

647 self._files = UbiquityService(service_root, self.session, self.params) 

648 return self._files 

649 

650 @property 

651 def photos(self): 

652 """Gets the 'Photo' service.""" 

653 if not self._photos: 

654 service_root = self._get_webservice_url("ckdatabasews") 

655 self._photos = PhotosService(service_root, self.session, self.params) 

656 return self._photos 

657 

658 @property 

659 def calendar(self): 

660 """Gets the 'Calendar' service.""" 

661 service_root = self._get_webservice_url("calendar") 

662 return CalendarService(service_root, self.session, self.params) 

663 

664 @property 

665 def contacts(self): 

666 """Gets the 'Contacts' service.""" 

667 service_root = self._get_webservice_url("contacts") 

668 return ContactsService(service_root, self.session, self.params) 

669 

670 @property 

671 def reminders(self): 

672 """Gets the 'Reminders' service.""" 

673 service_root = self._get_webservice_url("reminders") 

674 return RemindersService(service_root, self.session, self.params) 

675 

676 @property 

677 def drive(self): 

678 """Gets the 'Drive' service.""" 

679 if not self._drive: 

680 self._drive = DriveService( 

681 service_root=self._get_webservice_url("drivews"), 

682 document_root=self._get_webservice_url("docws"), 

683 session=self.session, 

684 params=self.params, 

685 ) 

686 return self._drive 

687 

688 def __unicode__(self): 

689 return f"iCloud API: {self.user.get('accountName')}" 

690 

691 def __str__(self): 

692 as_unicode = self.__unicode__() 

693 if PY2: 

694 return as_unicode.encode("utf-8", "ignore") 

695 return as_unicode 

696 

697 def __repr__(self): 

698 return f"<{str(self)}>"