Coverage for icloudpy/base.py: 51%

344 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-04-12 14:26 +0000

1"""Library base file.""" 

2import getpass 

3import http.cookiejar as cookielib 

4import inspect 

5import json 

6import logging 

7from os import mkdir, path 

8from re import match 

9from tempfile import gettempdir 

10from uuid import uuid1 

11 

12from requests import Session 

13from six import PY2 

14 

15from icloudpy.exceptions import ( 

16 ICloudPy2SARequiredException, 

17 ICloudPyAPIResponseException, 

18 ICloudPyFailedLoginException, 

19 ICloudPyServiceNotActivatedException, 

20) 

21from icloudpy.services import ( 

22 AccountService, 

23 CalendarService, 

24 ContactsService, 

25 DriveService, 

26 FindMyiPhoneServiceManager, 

27 PhotosService, 

28 RemindersService, 

29 UbiquityService, 

30) 

31from icloudpy.utils import get_password_from_keyring 

32 

33LOGGER = logging.getLogger(__name__) 

34 

35HEADER_DATA = { 

36 "X-Apple-ID-Account-Country": "account_country", 

37 "X-Apple-ID-Session-Id": "session_id", 

38 "X-Apple-Session-Token": "session_token", 

39 "X-Apple-TwoSV-Trust-Token": "trust_token", 

40 "scnt": "scnt", 

41} 

42 

43 

44class ICloudPyPasswordFilter(logging.Filter): 

45 """Password log hider.""" 

46 

47 def __init__(self, password): 

48 super().__init__(password) 

49 

50 def filter(self, record): 

51 message = record.getMessage() 

52 if self.name in message: 

53 record.msg = message.replace(self.name, "*" * 8) 

54 record.args = [] 

55 

56 return True 

57 

58 

59class ICloudPySession(Session): 

60 """iCloud session.""" 

61 

62 def __init__(self, service): 

63 self.service = service 

64 Session.__init__(self) 

65 

66 def request(self, method, url, **kwargs): # pylint: disable=arguments-differ 

67 

68 # Charge logging to the right service endpoint 

69 callee = inspect.stack()[2] 

70 module = inspect.getmodule(callee[0]) 

71 request_logger = logging.getLogger(module.__name__).getChild("http") 

72 if self.service.password_filter not in request_logger.filters: 

73 request_logger.addFilter(self.service.password_filter) 

74 

75 request_logger.debug(f"{method} {url} {kwargs.get('data', '')}") 

76 

77 has_retried = kwargs.get("retried") 

78 kwargs.pop("retried", None) 

79 response = super().request(method, url, **kwargs) 

80 

81 content_type = response.headers.get("Content-Type", "").split(";")[0] 

82 json_mimetypes = ["application/json", "text/json"] 

83 

84 for header, value in HEADER_DATA.items(): 

85 if response.headers.get(header): 

86 session_arg = value 

87 self.service.session_data.update( 

88 {session_arg: response.headers.get(header)} 

89 ) 

90 

91 # Save session_data to file 

92 with open(self.service.session_path, "w", encoding="utf-8") as outfile: 

93 json.dump(self.service.session_data, outfile) 

94 LOGGER.debug("Saved session data to file") 

95 

96 # Save cookies to file 

97 self.cookies.save(ignore_discard=True, ignore_expires=True) 

98 LOGGER.debug("Cookies saved to %s", self.service.cookiejar_path) 

99 

100 if not response.ok and ( 

101 content_type not in json_mimetypes 

102 or response.status_code in [421, 450, 500] 

103 ): 

104 try: 

105 # pylint: disable=W0212 

106 fmip_url = self.service._get_webservice_url("findme") 

107 if ( 

108 has_retried is None 

109 and response.status_code == 450 

110 and fmip_url in url 

111 ): 

112 # Handle re-authentication for Find My iPhone 

113 LOGGER.debug("Re-authenticating Find My iPhone service") 

114 try: 

115 self.service.authenticate(True, "find") 

116 except ICloudPyAPIResponseException: 

117 LOGGER.debug("Re-authentication failed") 

118 kwargs["retried"] = True 

119 return self.request(method, url, **kwargs) 

120 except Exception: 

121 pass 

122 

123 if has_retried is None and response.status_code in [421, 450, 500]: 

124 api_error = ICloudPyAPIResponseException( 

125 response.reason, response.status_code, retry=True 

126 ) 

127 request_logger.debug(api_error) 

128 kwargs["retried"] = True 

129 return self.request(method, url, **kwargs) 

130 

131 self._raise_error(response.status_code, response.reason) 

132 

133 if content_type not in json_mimetypes: 

134 return response 

135 

136 try: 

137 data = response.json() 

138 except: # pylint: disable=bare-except 

139 request_logger.warning("Failed to parse response with JSON mimetype") 

140 return response 

141 

142 request_logger.debug(data) 

143 

144 if isinstance(data, dict): 

145 reason = data.get("errorMessage") 

146 reason = reason or data.get("reason") 

147 reason = reason or data.get("errorReason") 

148 if not reason and isinstance(data.get("error"), str): 

149 reason = data.get("error") 

150 if not reason and data.get("error"): 

151 reason = "Unknown reason" 

152 

153 code = data.get("errorCode") 

154 if not code and data.get("serverErrorCode"): 

155 code = data.get("serverErrorCode") 

156 

157 if reason: 

158 self._raise_error(code, reason) 

159 

160 return response 

161 

162 def _raise_error(self, code, reason): 

163 if ( 

164 self.service.requires_2sa 

165 and reason == "Missing X-APPLE-WEBAUTH-TOKEN cookie" 

166 ): 

167 raise ICloudPy2SARequiredException(self.service.user["apple_id"]) 

168 if code in ("ZONE_NOT_FOUND", "AUTHENTICATION_FAILED"): 

169 reason = ( 

170 reason + ". Please log into https://icloud.com/ to manually " 

171 "finish setting up your iCloud service" 

172 ) 

173 api_error = ICloudPyServiceNotActivatedException(reason, code) 

174 LOGGER.error(api_error) 

175 

176 raise api_error 

177 if code == "ACCESS_DENIED": 

178 reason = ( 

179 reason + ". Please wait a few minutes then try again." 

180 "The remote servers might be trying to throttle requests." 

181 ) 

182 if code in [421, 450, 500]: 

183 reason = "Authentication required for Account." 

184 

185 api_error = ICloudPyAPIResponseException(reason, code) 

186 LOGGER.error(api_error) 

187 raise api_error 

188 

189 # Public method to resolve linting error 

190 def raise_error(self, code, reason): 

191 return self._raise_error(code=code, reason=reason) 

192 

193 

194class ICloudPyService: 

195 """ 

196 A base authentication class for the iCloud service. Handles the 

197 authentication required to access iCloud services. 

198 

199 Usage: 

200 from src import ICloudPyService 

201 icloudpy = ICloudPyService('username@apple.com', 'password') 

202 icloudpy.iphone.location() 

203 """ 

204 

205 def __init__( 

206 self, 

207 apple_id, 

208 password=None, 

209 cookie_directory=None, 

210 verify=True, 

211 client_id=None, 

212 with_family=True, 

213 auth_endpoint="https://idmsa.apple.com/appleauth/auth", 

214 # For China, use "https://www.icloud.com.cn" 

215 home_endpoint="https://www.icloud.com", 

216 # For China, use "https://setup.icloud.com.cn/setup/ws/1" 

217 setup_endpoint="https://setup.icloud.com/setup/ws/1", 

218 ): 

219 if password is None: 

220 password = get_password_from_keyring(apple_id) 

221 

222 self.user = {"accountName": apple_id, "password": password} 

223 self.data = {} 

224 self.params = {} 

225 self.client_id = client_id or (f"auth-{str(uuid1()).lower()}") 

226 self.with_family = with_family 

227 self.auth_endpoint = auth_endpoint 

228 self.home_endpoint = home_endpoint 

229 self.setup_endpoint = setup_endpoint 

230 

231 self.password_filter = ICloudPyPasswordFilter(password) 

232 LOGGER.addFilter(self.password_filter) 

233 

234 if cookie_directory: 

235 self._cookie_directory = path.expanduser(path.normpath(cookie_directory)) 

236 if not path.exists(self._cookie_directory): 

237 mkdir(self._cookie_directory, 0o700) 

238 else: 

239 topdir = path.join(gettempdir(), "icloudpy") 

240 self._cookie_directory = path.join(topdir, getpass.getuser()) 

241 if not path.exists(topdir): 

242 mkdir(topdir, 0o777) 

243 if not path.exists(self._cookie_directory): 

244 mkdir(self._cookie_directory, 0o700) 

245 

246 LOGGER.debug("Using session file %s", self.session_path) 

247 

248 self.session_data = {} 

249 try: 

250 with open(self.session_path, encoding="utf-8") as session_f: 

251 self.session_data = json.load(session_f) 

252 except: # pylint: disable=bare-except 

253 LOGGER.info("Session file does not exist") 

254 if self.session_data.get("client_id"): 

255 self.client_id = self.session_data.get("client_id") 

256 self.params["clientId"] = self.client_id 

257 else: 

258 self.session_data.update({"client_id": self.client_id}) 

259 self.params["clientId"] = self.client_id 

260 

261 self.session = ICloudPySession(self) 

262 self.session.verify = verify 

263 self.session.headers.update( 

264 {"Origin": self.home_endpoint, "Referer": f"{self.home_endpoint}/"} 

265 ) 

266 

267 cookiejar_path = self.cookiejar_path 

268 self.session.cookies = cookielib.LWPCookieJar(filename=cookiejar_path) 

269 if path.exists(cookiejar_path): 

270 try: 

271 self.session.cookies.load(ignore_discard=True, ignore_expires=True) 

272 LOGGER.debug("Read cookies from %s", cookiejar_path) 

273 except: # pylint: disable=bare-except 

274 # Most likely a pickled cookiejar from earlier versions. 

275 # The cookiejar will get replaced with a valid one after 

276 # successful authentication. 

277 LOGGER.warning("Failed to read cookiejar %s", cookiejar_path) 

278 

279 self.authenticate() 

280 

281 self._drive = None 

282 self._files = None 

283 self._photos = None 

284 

285 def authenticate(self, force_refresh=False, service=None): 

286 """ 

287 Handles authentication, and persists cookies so that 

288 subsequent logins will not cause additional e-mails from Apple. 

289 """ 

290 

291 login_successful = False 

292 if self.session_data.get("session_token") and not force_refresh: 

293 LOGGER.debug("Checking session token validity") 

294 try: 

295 self.data = self._validate_token() 

296 login_successful = True 

297 except ICloudPyAPIResponseException: 

298 LOGGER.debug("Invalid authentication token, will log in from scratch.") 

299 

300 if not login_successful and service is not None: 

301 app = self.data["apps"][service] 

302 if ( 

303 "canLaunchWithOneFactor" in app 

304 and app["canLaunchWithOneFactor"] is True 

305 ): 

306 LOGGER.debug( 

307 "Authenticating as %s for %s", self.user["accountName"], service 

308 ) 

309 

310 try: 

311 self._authenticate_with_credentials_service(service) 

312 login_successful = True 

313 except Exception as error: 

314 LOGGER.debug( 

315 "Could not log into service. Attempting brand new login. %s", 

316 str(error), 

317 ) 

318 

319 if not login_successful: 

320 LOGGER.debug("Authenticating as %s", self.user["accountName"]) 

321 

322 data = dict(self.user) 

323 

324 data["rememberMe"] = True 

325 data["trustTokens"] = [] 

326 if self.session_data.get("trust_token"): 

327 data["trustTokens"] = [self.session_data.get("trust_token")] 

328 

329 headers = self._get_auth_headers() 

330 

331 if self.session_data.get("scnt"): 

332 headers["scnt"] = self.session_data.get("scnt") 

333 

334 if self.session_data.get("session_id"): 

335 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

336 

337 try: 

338 self.session.post( 

339 f"{self.auth_endpoint}/signin", 

340 params={"isRememberMeEnabled": "true"}, 

341 data=json.dumps(data), 

342 headers=headers, 

343 ) 

344 except ICloudPyAPIResponseException as error: 

345 msg = "Invalid email/password combination." 

346 raise ICloudPyFailedLoginException(msg, error) from error 

347 

348 self._authenticate_with_token() 

349 

350 self._webservices = self.data["webservices"] 

351 

352 LOGGER.debug("Authentication completed successfully") 

353 

354 def _authenticate_with_token(self): 

355 """Authenticate using session token.""" 

356 data = { 

357 "accountCountryCode": self.session_data.get("account_country"), 

358 "dsWebAuthToken": self.session_data.get("session_token"), 

359 "extended_login": True, 

360 "trustToken": self.session_data.get("trust_token", ""), 

361 } 

362 

363 try: 

364 req = self.session.post( 

365 f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) 

366 ) 

367 self.data = req.json() 

368 except ICloudPyAPIResponseException as error: 

369 msg = "Invalid authentication token." 

370 raise ICloudPyFailedLoginException(msg, error) from error 

371 

372 def _authenticate_with_credentials_service(self, service): 

373 """Authenticate to a specific service using credentials.""" 

374 data = { 

375 "appName": service, 

376 "apple_id": self.user["accountName"], 

377 "password": self.user["password"], 

378 } 

379 

380 try: 

381 self.session.post( 

382 f"{self.setup_endpoint}/accountLogin", data=json.dumps(data) 

383 ) 

384 

385 self.data = self._validate_token() 

386 except ICloudPyAPIResponseException as error: 

387 msg = "Invalid email/password combination." 

388 raise ICloudPyFailedLoginException(msg, error) from error 

389 

390 def _validate_token(self): 

391 """Checks if the current access token is still valid.""" 

392 LOGGER.debug("Checking session token validity") 

393 try: 

394 req = self.session.post(f"{self.setup_endpoint}/validate", data="null") 

395 LOGGER.debug("Session token is still valid") 

396 return req.json() 

397 except ICloudPyAPIResponseException as err: 

398 LOGGER.debug("Invalid authentication token") 

399 raise err 

400 

401 def _get_auth_headers(self, overrides=None): 

402 headers = { 

403 "Accept": "*/*", 

404 "Content-Type": "application/json", 

405 "X-Apple-OAuth-Client-Id": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", 

406 "X-Apple-OAuth-Client-Type": "firstPartyAuth", 

407 "X-Apple-OAuth-Redirect-URI": "https://www.icloud.com", 

408 "X-Apple-OAuth-Require-Grant-Code": "true", 

409 "X-Apple-OAuth-Response-Mode": "web_message", 

410 "X-Apple-OAuth-Response-Type": "code", 

411 "X-Apple-OAuth-State": self.client_id, 

412 "X-Apple-Widget-Key": "d39ba9916b7251055b22c7f910e2ea796ee65e98b2ddecea8f5dde8d9d1a815d", 

413 } 

414 if overrides: 

415 headers.update(overrides) 

416 return headers 

417 

418 @property 

419 def cookiejar_path(self): 

420 """Get path for cookiejar file.""" 

421 return path.join( 

422 self._cookie_directory, 

423 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]), 

424 ) 

425 

426 @property 

427 def session_path(self): 

428 """Get path for session data file.""" 

429 return path.join( 

430 self._cookie_directory, 

431 "".join([c for c in self.user.get("accountName") if match(r"\w", c)]) 

432 + ".session", 

433 ) 

434 

435 @property 

436 def requires_2sa(self): 

437 """Returns True if two-step authentication is required.""" 

438 return self.data.get("dsInfo", {}).get("hsaVersion", 0) >= 1 and ( 

439 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session 

440 ) 

441 

442 @property 

443 def requires_2fa(self): 

444 """Returns True if two-factor authentication is required.""" 

445 return self.data["dsInfo"].get("hsaVersion", 0) == 2 and ( 

446 self.data.get("hsaChallengeRequired", False) or not self.is_trusted_session 

447 ) 

448 

449 @property 

450 def is_trusted_session(self): 

451 """Returns True if the session is trusted.""" 

452 return self.data.get("hsaTrustedBrowser", False) 

453 

454 @property 

455 def trusted_devices(self): 

456 """Returns devices trusted for two-step authentication.""" 

457 request = self.session.get( 

458 f"{self.setup_endpoint}/listDevices", params=self.params 

459 ) 

460 return request.json().get("devices") 

461 

462 def send_verification_code(self, device): 

463 """Requests that a verification code is sent to the given device.""" 

464 data = json.dumps(device) 

465 request = self.session.post( 

466 f"{self.setup_endpoint}/sendVerificationCode", 

467 params=self.params, 

468 data=data, 

469 ) 

470 return request.json().get("success", False) 

471 

472 def validate_verification_code(self, device, code): 

473 """Verifies a verification code received on a trusted device.""" 

474 device.update({"verificationCode": code, "trustBrowser": True}) 

475 data = json.dumps(device) 

476 

477 try: 

478 self.session.post( 

479 f"{self.setup_endpoint}/validateVerificationCode", 

480 params=self.params, 

481 data=data, 

482 ) 

483 except ICloudPyAPIResponseException as error: 

484 if error.code == -21669: 

485 # Wrong verification code 

486 return False 

487 raise 

488 

489 self.trust_session() 

490 

491 return not self.requires_2sa 

492 

493 def validate_2fa_code(self, code): 

494 """Verifies a verification code received via Apple's 2FA system (HSA2).""" 

495 data = {"securityCode": {"code": code}} 

496 

497 headers = self._get_auth_headers({"Accept": "application/json"}) 

498 

499 if self.session_data.get("scnt"): 

500 headers["scnt"] = self.session_data.get("scnt") 

501 

502 if self.session_data.get("session_id"): 

503 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

504 

505 try: 

506 self.session.post( 

507 f"{self.auth_endpoint}/verify/trusteddevice/securitycode", 

508 data=json.dumps(data), 

509 headers=headers, 

510 ) 

511 except ICloudPyAPIResponseException as error: 

512 if error.code == -21669: 

513 # Wrong verification code 

514 LOGGER.error("Code verification failed.") 

515 return False 

516 raise 

517 

518 LOGGER.debug("Code verification successful.") 

519 

520 self.trust_session() 

521 return not self.requires_2sa 

522 

523 def trust_session(self): 

524 """Request session trust to avoid user log in going forward.""" 

525 headers = self._get_auth_headers() 

526 

527 if self.session_data.get("scnt"): 

528 headers["scnt"] = self.session_data.get("scnt") 

529 

530 if self.session_data.get("session_id"): 

531 headers["X-Apple-ID-Session-Id"] = self.session_data.get("session_id") 

532 

533 try: 

534 self.session.get( 

535 f"{self.auth_endpoint}/2sv/trust", 

536 headers=headers, 

537 ) 

538 self._authenticate_with_token() 

539 return True 

540 except ICloudPyAPIResponseException: 

541 LOGGER.error("Session trust failed.") 

542 return False 

543 

544 def _get_webservice_url(self, ws_key): 

545 """Get webservice URL, raise an exception if not exists.""" 

546 if self._webservices.get(ws_key) is None: 

547 raise ICloudPyServiceNotActivatedException( 

548 "Webservice not available", ws_key 

549 ) 

550 return self._webservices[ws_key]["url"] 

551 

552 @property 

553 def devices(self): 

554 """Returns all devices.""" 

555 service_root = self._get_webservice_url("findme") 

556 return FindMyiPhoneServiceManager( 

557 service_root, self.session, self.params, self.with_family 

558 ) 

559 

560 @property 

561 def iphone(self): 

562 """Returns the iPhone.""" 

563 return self.devices[0] 

564 

565 @property 

566 def account(self): 

567 """Gets the 'Account' service.""" 

568 service_root = self._get_webservice_url("account") 

569 return AccountService(service_root, self.session, self.params) 

570 

571 @property 

572 def files(self): 

573 """Gets the 'File' service.""" 

574 if not self._files: 

575 service_root = self._get_webservice_url("ubiquity") 

576 self._files = UbiquityService(service_root, self.session, self.params) 

577 return self._files 

578 

579 @property 

580 def photos(self): 

581 """Gets the 'Photo' service.""" 

582 if not self._photos: 

583 service_root = self._get_webservice_url("ckdatabasews") 

584 self._photos = PhotosService(service_root, self.session, self.params) 

585 return self._photos 

586 

587 @property 

588 def calendar(self): 

589 """Gets the 'Calendar' service.""" 

590 service_root = self._get_webservice_url("calendar") 

591 return CalendarService(service_root, self.session, self.params) 

592 

593 @property 

594 def contacts(self): 

595 """Gets the 'Contacts' service.""" 

596 service_root = self._get_webservice_url("contacts") 

597 return ContactsService(service_root, self.session, self.params) 

598 

599 @property 

600 def reminders(self): 

601 """Gets the 'Reminders' service.""" 

602 service_root = self._get_webservice_url("reminders") 

603 return RemindersService(service_root, self.session, self.params) 

604 

605 @property 

606 def drive(self): 

607 """Gets the 'Drive' service.""" 

608 if not self._drive: 

609 self._drive = DriveService( 

610 service_root=self._get_webservice_url("drivews"), 

611 document_root=self._get_webservice_url("docws"), 

612 session=self.session, 

613 params=self.params, 

614 ) 

615 return self._drive 

616 

617 def __unicode__(self): 

618 return f"iCloud API: {self.user.get('accountName')}" 

619 

620 def __str__(self): 

621 as_unicode = self.__unicode__() 

622 if PY2: 

623 return as_unicode.encode("utf-8", "ignore") 

624 return as_unicode 

625 

626 def __repr__(self): 

627 return f"<{str(self)}>"