Coverage for icloudpy/services/drive.py: 66%

186 statements  

« prev     ^ index     » next       coverage.py v6.5.0, created at 2024-04-12 14:26 +0000

1"""Drive service.""" 

2import io 

3import json 

4import mimetypes 

5import os 

6import time 

7from datetime import datetime, timedelta 

8from re import search 

9 

10from requests import Response 

11from six import PY2 

12 

13 

14class DriveService: 

15 """The 'Drive' iCloud service.""" 

16 

17 def __init__(self, service_root, document_root, session, params): 

18 self._service_root = service_root 

19 self._document_root = document_root 

20 self.session = session 

21 self.params = dict(params) 

22 self._root = None 

23 

24 def _get_token_from_cookie(self): 

25 for cookie in self.session.cookies: 

26 if cookie.name == "X-APPLE-WEBAUTH-VALIDATE": 

27 match = search(r"\bt=([^:]+)", cookie.value) 

28 if match is None: 

29 raise Exception(f"Can't extract token from {cookie.value}") 

30 return {"token": match.group(1)} 

31 raise Exception("Token cookie not found") 

32 

33 def get_node_data(self, drivewsid): 

34 """Returns the node data.""" 

35 request = self.session.post( 

36 self._service_root + "/retrieveItemDetailsInFolders", 

37 params=self.params, 

38 data=json.dumps( 

39 [ 

40 { 

41 "drivewsid": drivewsid, 

42 "partialData": False, 

43 } 

44 ] 

45 ), 

46 ) 

47 if not request.ok: 

48 self.session.raise_error(request.status_code, request.reason) 

49 return request.json()[0] 

50 

51 def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs): 

52 """Returns iCloud Drive file.""" 

53 file_params = dict(self.params) 

54 file_params.update({"document_id": file_id}) 

55 response = self.session.get( 

56 self._document_root + f"/ws/{zone}/download/by_id", 

57 params=file_params, 

58 ) 

59 if not response.ok: 

60 self.session.raise_error(response.status_code, response.reason) 

61 package_token = response.json().get("package_token") 

62 data_token = response.json().get("data_token") 

63 if data_token and data_token.get("url"): 

64 return self.session.get(data_token["url"], params=self.params, **kwargs) 

65 elif package_token and package_token.get("url"): 

66 return self.session.get(package_token["url"], params=self.params, **kwargs) 

67 else: 

68 raise KeyError("'data_token' nor 'package_token' found in response.") 

69 

70 def get_app_data(self): 

71 """Returns the app library (previously ubiquity).""" 

72 request = self.session.get( 

73 self._service_root + "/retrieveAppLibraries", params=self.params 

74 ) 

75 if not request.ok: 

76 self.session.raise_error(request.status_code, request.reason) 

77 return request.json()["items"] 

78 

79 def get_app_node(self, app_id, folder="documents"): 

80 """Returns the node of the app (ubiquity)""" 

81 return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder)) 

82 

83 def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"): 

84 """Get the contentWS endpoint URL to add a new file.""" 

85 content_type = mimetypes.guess_type(file_object.name)[0] 

86 if content_type is None: 

87 content_type = "" 

88 

89 # Get filesize from file object 

90 orig_pos = file_object.tell() 

91 file_object.seek(0, os.SEEK_END) 

92 file_size = file_object.tell() 

93 file_object.seek(orig_pos, os.SEEK_SET) 

94 

95 file_params = self.params 

96 file_params.update(self._get_token_from_cookie()) 

97 

98 request = self.session.post( 

99 self._document_root + f"/ws/{zone}/upload/web", 

100 params=file_params, 

101 headers={"Content-Type": "text/plain"}, 

102 data=json.dumps( 

103 { 

104 "filename": file_object.name, 

105 "type": "FILE", 

106 "content_type": content_type, 

107 "size": file_size, 

108 } 

109 ), 

110 ) 

111 if not request.ok: 

112 self.session.raise_error(request.status_code, request.reason) 

113 return (request.json()[0]["document_id"], request.json()[0]["url"]) 

114 

115 def _update_contentws( 

116 self, folder_id, sf_info, document_id, file_object, zone="com.apple.CloudDocs" 

117 ): 

118 data = { 

119 "data": { 

120 "signature": sf_info["fileChecksum"], 

121 "wrapping_key": sf_info["wrappingKey"], 

122 "reference_signature": sf_info["referenceChecksum"], 

123 "size": sf_info["size"], 

124 }, 

125 "command": "add_file", 

126 "create_short_guid": True, 

127 "document_id": document_id, 

128 "path": { 

129 "starting_document_id": folder_id, 

130 "path": os.path.basename(file_object.name), 

131 }, 

132 "allow_conflict": True, 

133 "file_flags": { 

134 "is_writable": True, 

135 "is_executable": False, 

136 "is_hidden": False, 

137 }, 

138 "mtime": int(time.time() * 1000), 

139 "btime": int(time.time() * 1000), 

140 } 

141 

142 # Add the receipt if we have one. Will be absent for 0-sized files 

143 if sf_info.get("receipt"): 

144 data["data"].update({"receipt": sf_info["receipt"]}) 

145 

146 request = self.session.post( 

147 self._document_root + f"/ws/{zone}/update/documents", 

148 params=self.params, 

149 headers={"Content-Type": "text/plain"}, 

150 data=json.dumps(data), 

151 ) 

152 if not request.ok: 

153 self.session.raise_error(request.status_code, request.reason) 

154 return request.json() 

155 

156 def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"): 

157 """Send new file to iCloud Drive.""" 

158 document_id, content_url = self._get_upload_contentws_url(file_object, zone) 

159 

160 request = self.session.post(content_url, files={file_object.name: file_object}) 

161 if not request.ok: 

162 self.session.raise_error(request.status_code, request.reason) 

163 content_response = request.json()["singleFile"] 

164 

165 self._update_contentws( 

166 folder_id, content_response, document_id, file_object, zone 

167 ) 

168 

169 def create_folders(self, parent, name): 

170 """Creates a new iCloud Drive folder""" 

171 request = self.session.post( 

172 self._service_root + "/createFolders", 

173 params=self.params, 

174 headers={"Content-Type": "text/plain"}, 

175 data=json.dumps( 

176 { 

177 "destinationDrivewsId": parent, 

178 "folders": [ 

179 { 

180 "clientId": self.params["clientId"], 

181 "name": name, 

182 } 

183 ], 

184 } 

185 ), 

186 ) 

187 return request.json() 

188 

189 def rename_items(self, node_id, etag, name): 

190 """Renames an iCloud Drive node""" 

191 request = self.session.post( 

192 self._service_root + "/renameItems", 

193 params=self.params, 

194 data=json.dumps( 

195 { 

196 "items": [ 

197 { 

198 "drivewsid": node_id, 

199 "etag": etag, 

200 "name": name, 

201 } 

202 ], 

203 } 

204 ), 

205 ) 

206 return request.json() 

207 

208 def move_items_to_trash(self, node_id, etag): 

209 """Moves an iCloud Drive node to the trash bin""" 

210 request = self.session.post( 

211 self._service_root + "/moveItemsToTrash", 

212 params=self.params, 

213 data=json.dumps( 

214 { 

215 "items": [ 

216 { 

217 "drivewsid": node_id, 

218 "etag": etag, 

219 "clientId": self.params["clientId"], 

220 } 

221 ], 

222 } 

223 ), 

224 ) 

225 if not request.ok: 

226 self.session.raise_error(request.status_code, request.reason) 

227 return request.json() 

228 

229 @property 

230 def root(self): 

231 """Returns the root node.""" 

232 if not self._root: 

233 self._root = DriveNode( 

234 self, self.get_node_data("FOLDER::com.apple.CloudDocs::root") 

235 ) 

236 return self._root 

237 

238 def __getattr__(self, attr): 

239 return getattr(self.root, attr) 

240 

241 def __getitem__(self, key): 

242 return self.root[key] 

243 

244 

245class DriveNode: 

246 """Drive node.""" 

247 

248 def __init__(self, conn, data): 

249 self.data = data 

250 self.connection = conn 

251 self._children = None 

252 

253 @property 

254 def name(self): 

255 """Gets the node name.""" 

256 if "extension" in self.data: 

257 return f'{self.data["name"]}.{self.data["extension"]}' 

258 return self.data["name"] 

259 

260 @property 

261 def type(self): 

262 """Gets the node type.""" 

263 node_type = self.data.get("type") 

264 return node_type and node_type.lower() 

265 

266 def get_children(self): 

267 """Gets the node children.""" 

268 if not self._children: 

269 if "items" not in self.data: 

270 self.data.update(self.connection.get_node_data(self.data["drivewsid"])) 

271 if "items" not in self.data: 

272 raise KeyError(f'No items in folder, status: {self.data["status"]}') 

273 self._children = [ 

274 DriveNode(self.connection, item_data) 

275 for item_data in self.data["items"] 

276 ] 

277 return self._children 

278 

279 @property 

280 def size(self): 

281 """Gets the node size.""" 

282 size = self.data.get("size") # Folder does not have size 

283 if not size: 

284 return None 

285 return int(size) 

286 

287 @property 

288 def date_created(self): 

289 """Gets the node created date (in UTC).""" 

290 return _date_to_utc(self.data.get("dateCreated")) 

291 

292 @property 

293 def date_changed(self): 

294 """Gets the node changed date (in UTC).""" 

295 return _date_to_utc(self.data.get("dateChanged")) # Folder does not have date 

296 

297 @property 

298 def date_modified(self): 

299 """Gets the node modified date (in UTC).""" 

300 return _date_to_utc(self.data.get("dateModified")) # Folder does not have date 

301 

302 @property 

303 def date_last_open(self): 

304 """Gets the node last open date (in UTC).""" 

305 return _date_to_utc(self.data.get("lastOpenTime")) # Folder does not have date 

306 

307 def open(self, **kwargs): 

308 """Gets the node file.""" 

309 # iCloud returns 400 Bad Request for 0-byte files 

310 if self.data["size"] == 0: 

311 response = Response() 

312 response.raw = io.BytesIO() 

313 return response 

314 return self.connection.get_file( 

315 self.data["docwsid"], zone=self.data["zone"], **kwargs 

316 ) 

317 

318 def upload(self, file_object, **kwargs): 

319 """ "Upload a new file.""" 

320 return self.connection.send_file( 

321 self.data["docwsid"], file_object, zone=self.data["zone"], **kwargs 

322 ) 

323 

324 def dir(self): 

325 """Gets the node list of directories.""" 

326 if self.type == "file": 

327 return None 

328 return [child.name for child in self.get_children()] 

329 

330 def mkdir(self, folder): 

331 """Create a new directory directory.""" 

332 # remove cached entries information first so that it will be re-read on next get_children() 

333 self._children = None 

334 if "items" in self.data: 

335 self.data.pop("items") 

336 return self.connection.create_folders(self.data["drivewsid"], folder) 

337 

338 def rename(self, name): 

339 """Rename an iCloud Drive item.""" 

340 return self.connection.rename_items( 

341 self.data["drivewsid"], self.data["etag"], name 

342 ) 

343 

344 def delete(self): 

345 """Delete an iCloud Drive item.""" 

346 return self.connection.move_items_to_trash( 

347 self.data["drivewsid"], self.data["etag"] 

348 ) 

349 

350 def get(self, name): 

351 """Gets the node child.""" 

352 if self.type == "file": 

353 return None 

354 return [child for child in self.get_children() if child.name == name][0] 

355 

356 def __getitem__(self, key): 

357 try: 

358 return self.get(key) 

359 except IndexError as error: 

360 raise KeyError(f"No child named '{key}' exists") from error 

361 

362 def __unicode__(self): 

363 return f"{{type: {self.type}, name: {self.name}}}" 

364 

365 def __str__(self): 

366 as_unicode = self.__unicode__() 

367 if PY2: 

368 return as_unicode.encode("utf-8", "ignore") 

369 return as_unicode 

370 

371 def __repr__(self): 

372 return f"<{type(self).__name__}: {str(self)}>" 

373 

374 

375def _date_to_utc(date): 

376 if not date: 

377 return None 

378 # jump through hoops to return time in UTC rather than California time 

379 match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date) 

380 if not match: 

381 # Already in UTC 

382 return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ") 

383 base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S") 

384 diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3))) 

385 return base - diff