Coverage for icloudpy/services/drive.py: 100%

182 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-11-02 05:49 +0000

1"""Drive service.""" 

2 

3import io 

4import json 

5import mimetypes 

6import os 

7import time 

8from datetime import datetime, timedelta 

9from re import search 

10 

11from requests import Response 

12 

13 

14class DriveService: 

15 """The 'Drive' iCloud service.""" 

16 

17 def __init__(self, service_root, document_root, session, params): 

18 self._service_root = service_root 

19 self._document_root = document_root 

20 self.session = session 

21 self.params = dict(params) 

22 self._root = None 

23 

24 def _get_token_from_cookie(self): 

25 for cookie in self.session.cookies: 

26 if cookie.name == "X-APPLE-WEBAUTH-VALIDATE": 

27 match = search(r"\bt=([^:]+)", cookie.value) 

28 if match is None: 

29 raise Exception(f"Can't extract token from {cookie.value}") 

30 return {"token": match.group(1)} 

31 raise Exception("Token cookie not found") 

32 

33 def get_node_data(self, drivewsid): 

34 """Returns the node data.""" 

35 request = self.session.post( 

36 self._service_root + "/retrieveItemDetailsInFolders", 

37 params=self.params, 

38 data=json.dumps( 

39 [ 

40 { 

41 "drivewsid": drivewsid, 

42 "partialData": False, 

43 }, 

44 ], 

45 ), 

46 ) 

47 if not request.ok: 

48 self.session.raise_error(request.status_code, request.reason) 

49 return request.json()[0] 

50 

51 def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs): 

52 """Returns iCloud Drive file.""" 

53 file_params = dict(self.params) 

54 file_params.update({"document_id": file_id}) 

55 response = self.session.get( 

56 self._document_root + f"/ws/{zone}/download/by_id", 

57 params=file_params, 

58 ) 

59 if not response.ok: 

60 self.session.raise_error(response.status_code, response.reason) 

61 package_token = response.json().get("package_token") 

62 data_token = response.json().get("data_token") 

63 if data_token and data_token.get("url"): 

64 return self.session.get(data_token["url"], params=self.params, **kwargs) 

65 elif package_token and package_token.get("url"): 

66 return self.session.get(package_token["url"], params=self.params, **kwargs) 

67 else: 

68 raise KeyError("'data_token' nor 'package_token' found in response.") 

69 

70 def get_app_data(self): 

71 """Returns the app library.""" 

72 request = self.session.get( 

73 self._service_root + "/retrieveAppLibraries", 

74 params=self.params, 

75 ) 

76 if not request.ok: 

77 self.session.raise_error(request.status_code, request.reason) 

78 return request.json()["items"] 

79 

80 def get_app_node(self, app_id, folder="documents"): 

81 """Returns the node of the app.""" 

82 return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder)) 

83 

84 def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"): 

85 """Get the contentWS endpoint URL to add a new file.""" 

86 content_type = mimetypes.guess_type(file_object.name)[0] 

87 if content_type is None: 

88 content_type = "" 

89 

90 # Get filesize from file object 

91 orig_pos = file_object.tell() 

92 file_object.seek(0, os.SEEK_END) 

93 file_size = file_object.tell() 

94 file_object.seek(orig_pos, os.SEEK_SET) 

95 

96 file_params = self.params 

97 file_params.update(self._get_token_from_cookie()) 

98 

99 request = self.session.post( 

100 self._document_root + f"/ws/{zone}/upload/web", 

101 params=file_params, 

102 headers={"Content-Type": "text/plain"}, 

103 data=json.dumps( 

104 { 

105 "filename": file_object.name, 

106 "type": "FILE", 

107 "content_type": content_type, 

108 "size": file_size, 

109 }, 

110 ), 

111 ) 

112 if not request.ok: 

113 self.session.raise_error(request.status_code, request.reason) 

114 return (request.json()[0]["document_id"], request.json()[0]["url"]) 

115 

116 def _update_contentws( 

117 self, 

118 folder_id, 

119 sf_info, 

120 document_id, 

121 file_object, 

122 zone="com.apple.CloudDocs", 

123 ): 

124 data = { 

125 "data": { 

126 "signature": sf_info["fileChecksum"], 

127 "wrapping_key": sf_info["wrappingKey"], 

128 "reference_signature": sf_info["referenceChecksum"], 

129 "size": sf_info["size"], 

130 }, 

131 "command": "add_file", 

132 "create_short_guid": True, 

133 "document_id": document_id, 

134 "path": { 

135 "starting_document_id": folder_id, 

136 "path": os.path.basename(file_object.name), 

137 }, 

138 "allow_conflict": True, 

139 "file_flags": { 

140 "is_writable": True, 

141 "is_executable": False, 

142 "is_hidden": False, 

143 }, 

144 "mtime": int(time.time() * 1000), 

145 "btime": int(time.time() * 1000), 

146 } 

147 

148 # Add the receipt if we have one. Will be absent for 0-sized files 

149 if sf_info.get("receipt"): 

150 data["data"].update({"receipt": sf_info["receipt"]}) 

151 

152 request = self.session.post( 

153 self._document_root + f"/ws/{zone}/update/documents", 

154 params=self.params, 

155 headers={"Content-Type": "text/plain"}, 

156 data=json.dumps(data), 

157 ) 

158 if not request.ok: 

159 self.session.raise_error(request.status_code, request.reason) 

160 return request.json() 

161 

162 def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"): 

163 """Send new file to iCloud Drive.""" 

164 document_id, content_url = self._get_upload_contentws_url(file_object, zone) 

165 

166 request = self.session.post(content_url, files={file_object.name: file_object}) 

167 if not request.ok: 

168 self.session.raise_error(request.status_code, request.reason) 

169 content_response = request.json()["singleFile"] 

170 

171 self._update_contentws( 

172 folder_id, 

173 content_response, 

174 document_id, 

175 file_object, 

176 zone, 

177 ) 

178 

179 def create_folders(self, parent, name): 

180 """Creates a new iCloud Drive folder""" 

181 request = self.session.post( 

182 self._service_root + "/createFolders", 

183 params=self.params, 

184 headers={"Content-Type": "text/plain"}, 

185 data=json.dumps( 

186 { 

187 "destinationDrivewsId": parent, 

188 "folders": [ 

189 { 

190 "clientId": self.params["clientId"], 

191 "name": name, 

192 }, 

193 ], 

194 }, 

195 ), 

196 ) 

197 return request.json() 

198 

199 def rename_items(self, node_id, etag, name): 

200 """Renames an iCloud Drive node""" 

201 request = self.session.post( 

202 self._service_root + "/renameItems", 

203 params=self.params, 

204 data=json.dumps( 

205 { 

206 "items": [ 

207 { 

208 "drivewsid": node_id, 

209 "etag": etag, 

210 "name": name, 

211 }, 

212 ], 

213 }, 

214 ), 

215 ) 

216 return request.json() 

217 

218 def move_items_to_trash(self, node_id, etag): 

219 """Moves an iCloud Drive node to the trash bin""" 

220 request = self.session.post( 

221 self._service_root + "/moveItemsToTrash", 

222 params=self.params, 

223 data=json.dumps( 

224 { 

225 "items": [ 

226 { 

227 "drivewsid": node_id, 

228 "etag": etag, 

229 "clientId": self.params["clientId"], 

230 }, 

231 ], 

232 }, 

233 ), 

234 ) 

235 if not request.ok: 

236 self.session.raise_error(request.status_code, request.reason) 

237 return request.json() 

238 

239 @property 

240 def root(self): 

241 """Returns the root node.""" 

242 if not self._root: 

243 self._root = DriveNode( 

244 self, 

245 self.get_node_data("FOLDER::com.apple.CloudDocs::root"), 

246 ) 

247 return self._root 

248 

249 def __getattr__(self, attr): 

250 return getattr(self.root, attr) 

251 

252 def __getitem__(self, key): 

253 return self.root[key] 

254 

255 

256class DriveNode: 

257 """Drive node.""" 

258 

259 def __init__(self, conn, data): 

260 self.data = data 

261 self.connection = conn 

262 self._children = None 

263 

264 @property 

265 def name(self): 

266 """Gets the node name.""" 

267 if "extension" in self.data: 

268 return f"{self.data['name']}.{self.data['extension']}" 

269 return self.data["name"] 

270 

271 @property 

272 def type(self): 

273 """Gets the node type.""" 

274 node_type = self.data.get("type") 

275 return node_type and node_type.lower() 

276 

277 def get_children(self): 

278 """Gets the node children.""" 

279 if not self._children: 

280 if "items" not in self.data: 

281 self.data.update(self.connection.get_node_data(self.data["drivewsid"])) 

282 if "items" not in self.data: 

283 raise KeyError(f"No items in folder, status: {self.data['status']}") 

284 self._children = [DriveNode(self.connection, item_data) for item_data in self.data["items"]] 

285 return self._children 

286 

287 @property 

288 def size(self): 

289 """Gets the node size.""" 

290 size = self.data.get("size") # Folder does not have size 

291 if not size: 

292 return None 

293 return int(size) 

294 

295 @property 

296 def date_created(self): 

297 """Gets the node created date (in UTC).""" 

298 return _date_to_utc(self.data.get("dateCreated")) 

299 

300 @property 

301 def date_changed(self): 

302 """Gets the node changed date (in UTC).""" 

303 return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date 

304 

305 @property 

306 def date_modified(self): 

307 """Gets the node modified date (in UTC).""" 

308 return _date_to_utc(self.data.get("dateModified")) # Folder does not have date 

309 

310 @property 

311 def date_last_open(self): 

312 """Gets the node last open date (in UTC).""" 

313 return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date 

314 

315 def open(self, **kwargs): 

316 """Gets the node file.""" 

317 # iCloud returns 400 Bad Request for 0-byte files 

318 if self.data["size"] == 0: 

319 response = Response() 

320 response.raw = io.BytesIO() 

321 return response 

322 return self.connection.get_file( 

323 self.data["docwsid"], 

324 zone=self.data["zone"], 

325 **kwargs, 

326 ) 

327 

328 def upload(self, file_object, **kwargs): 

329 """ "Upload a new file.""" 

330 return self.connection.send_file( 

331 self.data["docwsid"], 

332 file_object, 

333 zone=self.data["zone"], 

334 **kwargs, 

335 ) 

336 

337 def dir(self): 

338 """Gets the node list of directories.""" 

339 if self.type == "file": 

340 return None 

341 return [child.name for child in self.get_children()] 

342 

343 def mkdir(self, folder): 

344 """Create a new directory directory.""" 

345 # remove cached entries information first so that it will be re-read on next get_children() 

346 self._children = None 

347 if "items" in self.data: 

348 self.data.pop("items") 

349 return self.connection.create_folders(self.data["drivewsid"], folder) 

350 

351 def rename(self, name): 

352 """Rename an iCloud Drive item.""" 

353 return self.connection.rename_items( 

354 self.data["drivewsid"], 

355 self.data["etag"], 

356 name, 

357 ) 

358 

359 def delete(self): 

360 """Delete an iCloud Drive item.""" 

361 return self.connection.move_items_to_trash( 

362 self.data["drivewsid"], 

363 self.data["etag"], 

364 ) 

365 

366 def get(self, name): 

367 """Gets the node child.""" 

368 if self.type == "file": 

369 return None 

370 return [child for child in self.get_children() if child.name == name][0] 

371 

372 def __getitem__(self, key): 

373 try: 

374 return self.get(key) 

375 except IndexError as error: 

376 raise KeyError(f"No child named '{key}' exists") from error 

377 

378 def __unicode__(self): 

379 return f"{{type: {self.type}, name: {self.name}}}" 

380 

381 def __str__(self): 

382 return self.__unicode__() 

383 

384 def __repr__(self): 

385 return f"<{type(self).__name__}: {str(self)}>" 

386 

387 

388def _date_to_utc(date): 

389 if not date: 

390 return None 

391 # jump through hoops to return time in UTC rather than California time 

392 match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date) 

393 if not match: 

394 # Already in UTC 

395 return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") 

396 base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S") 

397 diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3))) 

398 return base - diff