Coverage for icloudpy/services/drive.py: 66%
186 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
1"""Drive service."""
2import io
3import json
4import mimetypes
5import os
6import time
7from datetime import datetime, timedelta
8from re import search
10from requests import Response
11from six import PY2
class DriveService:
    """The 'Drive' iCloud service.

    Wraps the iCloud Drive web endpoints reachable through ``session``.
    Attribute and item access that the service itself does not define is
    delegated to the root :class:`DriveNode` (see ``__getattr__`` and
    ``__getitem__``).
    """

    def __init__(self, service_root, document_root, session, params):
        """Store the endpoint roots, the session and a copy of ``params``."""
        self._service_root = service_root
        self._document_root = document_root
        self.session = session
        # Copy so that later updates do not leak into the caller's mapping.
        self.params = dict(params)
        self._root = None

    def _ensure_ok(self, response):
        """Report a failed response through ``session.raise_error``."""
        if not response.ok:
            self.session.raise_error(response.status_code, response.reason)

    def _get_token_from_cookie(self):
        """Return ``{"token": ...}`` parsed from the validation cookie.

        Raises:
            Exception: if the ``X-APPLE-WEBAUTH-VALIDATE`` cookie is missing
                or its value carries no ``t=<token>`` field.
        """
        for cookie in self.session.cookies:
            if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
                match = search(r"\bt=([^:]+)", cookie.value)
                if match is None:
                    raise Exception(f"Can't extract token from {cookie.value}")
                return {"token": match.group(1)}
        raise Exception("Token cookie not found")

    def get_node_data(self, drivewsid):
        """Returns the node data."""
        request = self.session.post(
            self._service_root + "/retrieveItemDetailsInFolders",
            params=self.params,
            data=json.dumps(
                [
                    {
                        "drivewsid": drivewsid,
                        "partialData": False,
                    },
                ],
            ),
        )
        self._ensure_ok(request)
        # The endpoint is queried for a single id; only the first entry is used.
        return request.json()[0]

    def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs):
        """Returns iCloud Drive file.

        Looks up the download descriptor for ``file_id`` and then fetches the
        URL found under ``data_token`` (preferred) or ``package_token``.
        Extra keyword arguments are forwarded to the final ``session.get``.

        Raises:
            KeyError: if neither token in the descriptor provides a URL.
        """
        file_params = dict(self.params)
        file_params.update({"document_id": file_id})
        response = self.session.get(
            self._document_root + f"/ws/{zone}/download/by_id",
            params=file_params,
        )
        self._ensure_ok(response)
        descriptor = response.json()  # parse the body once
        for token_key in ("data_token", "package_token"):
            token = descriptor.get(token_key)
            if token and token.get("url"):
                return self.session.get(token["url"], params=self.params, **kwargs)
        raise KeyError("'data_token' nor 'package_token' found in response.")

    def get_app_data(self):
        """Returns the app library (previously ubiquity)."""
        request = self.session.get(
            self._service_root + "/retrieveAppLibraries",
            params=self.params,
        )
        self._ensure_ok(request)
        return request.json()["items"]

    def get_app_node(self, app_id, folder="documents"):
        """Returns the node of the app (ubiquity)"""
        return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder))

    def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"):
        """Get the contentWS endpoint URL to add a new file.

        Returns:
            A ``(document_id, url)`` tuple taken from the first entry of the
            response.
        """
        content_type = mimetypes.guess_type(file_object.name)[0]
        if content_type is None:
            content_type = ""

        # Get filesize from file object, restoring the read position afterwards.
        orig_pos = file_object.tell()
        file_object.seek(0, os.SEEK_END)
        file_size = file_object.tell()
        file_object.seek(orig_pos, os.SEEK_SET)

        # NOTE(review): this aliases (does not copy) self.params, so the token
        # stays in self.params for every later request, including the
        # update/documents call made by _update_contentws. Kept as-is because
        # that follow-up call may rely on it -- confirm before switching to a
        # copy as get_file does.
        file_params = self.params
        file_params.update(self._get_token_from_cookie())

        request = self.session.post(
            self._document_root + f"/ws/{zone}/upload/web",
            params=file_params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "filename": file_object.name,
                    "type": "FILE",
                    "content_type": content_type,
                    "size": file_size,
                },
            ),
        )
        self._ensure_ok(request)
        upload_info = request.json()[0]  # parse the body once
        return (upload_info["document_id"], upload_info["url"])

    def _update_contentws(
        self,
        folder_id,
        sf_info,
        document_id,
        file_object,
        zone="com.apple.CloudDocs",
    ):
        """Register uploaded content as a file inside ``folder_id``.

        ``sf_info`` is the ``singleFile`` mapping returned by the content
        upload; its checksum, wrapping key and size fields are passed through.
        Returns the decoded JSON response.
        """
        # One timestamp so that mtime and btime are guaranteed to agree.
        now_ms = int(time.time() * 1000)
        data = {
            "data": {
                "signature": sf_info["fileChecksum"],
                "wrapping_key": sf_info["wrappingKey"],
                "reference_signature": sf_info["referenceChecksum"],
                "size": sf_info["size"],
            },
            "command": "add_file",
            "create_short_guid": True,
            "document_id": document_id,
            "path": {
                "starting_document_id": folder_id,
                "path": os.path.basename(file_object.name),
            },
            "allow_conflict": True,
            "file_flags": {
                "is_writable": True,
                "is_executable": False,
                "is_hidden": False,
            },
            "mtime": now_ms,
            "btime": now_ms,
        }

        # Add the receipt if we have one. Will be absent for 0-sized files
        if sf_info.get("receipt"):
            data["data"].update({"receipt": sf_info["receipt"]})

        request = self.session.post(
            self._document_root + f"/ws/{zone}/update/documents",
            params=self.params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(data),
        )
        self._ensure_ok(request)
        return request.json()

    def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"):
        """Send new file to iCloud Drive.

        Three steps: request an upload URL, post the file content to it, then
        register the uploaded content under ``folder_id``.
        """
        document_id, content_url = self._get_upload_contentws_url(file_object, zone)

        request = self.session.post(content_url, files={file_object.name: file_object})
        self._ensure_ok(request)
        content_response = request.json()["singleFile"]

        self._update_contentws(
            folder_id,
            content_response,
            document_id,
            file_object,
            zone,
        )

    def create_folders(self, parent, name):
        """Creates a new iCloud Drive folder"""
        request = self.session.post(
            self._service_root + "/createFolders",
            params=self.params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "destinationDrivewsId": parent,
                    "folders": [
                        {
                            "clientId": self.params["clientId"],
                            "name": name,
                        },
                    ],
                },
            ),
        )
        # Same failure handling as the other endpoints of this service.
        self._ensure_ok(request)
        return request.json()

    def rename_items(self, node_id, etag, name):
        """Renames an iCloud Drive node"""
        request = self.session.post(
            self._service_root + "/renameItems",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "name": name,
                        },
                    ],
                },
            ),
        )
        # Same failure handling as the other endpoints of this service.
        self._ensure_ok(request)
        return request.json()

    def move_items_to_trash(self, node_id, etag):
        """Moves an iCloud Drive node to the trash bin"""
        request = self.session.post(
            self._service_root + "/moveItemsToTrash",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "clientId": self.params["clientId"],
                        },
                    ],
                },
            ),
        )
        self._ensure_ok(request)
        return request.json()

    @property
    def root(self):
        """Returns the root node."""
        if self._root is None:
            self._root = DriveNode(
                self,
                self.get_node_data("FOLDER::com.apple.CloudDocs::root"),
            )
        return self._root

    def __getattr__(self, attr):
        """Delegate unknown attributes to the root node.

        ``root`` and ``_root`` are never delegated: when ``_root`` is not set
        yet (instance built without ``__init__``), or the ``root`` property
        itself fails with ``AttributeError``, delegating would re-enter this
        method without bound.
        """
        if attr in ("root", "_root"):
            raise AttributeError(attr)
        return getattr(self.root, attr)

    def __getitem__(self, key):
        """Return the child of the root node named ``key``."""
        return self.root[key]
class DriveNode:
    """Drive node.

    Wraps the raw ``data`` mapping describing one iCloud Drive item and the
    ``conn`` object used to issue further requests for it.
    """

    def __init__(self, conn, data):
        self.data = data
        self.connection = conn
        # Lazily built list of child nodes; None means "not built yet".
        self._children = None

    @property
    def name(self):
        """Gets the node name."""
        if "extension" in self.data:
            return f'{self.data["name"]}.{self.data["extension"]}'
        return self.data["name"]

    @property
    def type(self):
        """Gets the node type."""
        node_type = self.data.get("type")
        # Falsy values (missing/empty type) are passed through unchanged.
        return node_type and node_type.lower()

    def get_children(self):
        """Gets the node children.

        The node data is fetched through the connection when it carries no
        ``items`` yet. The resulting list is cached, including when it is
        empty.

        Raises:
            KeyError: if the node data still has no ``items`` after the fetch.
        """
        if self._children is None:
            if "items" not in self.data:
                self.data.update(self.connection.get_node_data(self.data["drivewsid"]))
            if "items" not in self.data:
                raise KeyError(f'No items in folder, status: {self.data["status"]}')
            self._children = [
                DriveNode(self.connection, item_data)
                for item_data in self.data["items"]
            ]
        return self._children

    @property
    def size(self):
        """Gets the node size.

        Returns ``None`` when the size is missing or falsy.
        """
        size = self.data.get("size")  # Folder does not have size
        if not size:
            return None
        return int(size)

    @property
    def date_created(self):
        """Gets the node created date (in UTC)."""
        return _date_to_utc(self.data.get("dateCreated"))

    @property
    def date_changed(self):
        """Gets the node changed date (in UTC)."""
        return _date_to_utc(self.data.get("dateChanged"))  # Folder does not have date

    @property
    def date_modified(self):
        """Gets the node modified date (in UTC)."""
        return _date_to_utc(self.data.get("dateModified"))  # Folder does not have date

    @property
    def date_last_open(self):
        """Gets the node last open date (in UTC)."""
        return _date_to_utc(self.data.get("lastOpenTime"))  # Folder does not have date

    def open(self, **kwargs):
        """Gets the node file.

        Keyword arguments are forwarded to the connection's ``get_file``.
        """
        # iCloud returns 400 Bad Request for 0-byte files
        if self.data["size"] == 0:
            response = Response()
            response.raw = io.BytesIO()
            return response
        return self.connection.get_file(
            self.data["docwsid"],
            zone=self.data["zone"],
            **kwargs,
        )

    def upload(self, file_object, **kwargs):
        """Upload a new file."""
        return self.connection.send_file(
            self.data["docwsid"],
            file_object,
            zone=self.data["zone"],
            **kwargs,
        )

    def dir(self):
        """Gets the node list of directories.

        Returns ``None`` for a file node, otherwise the names of all children.
        """
        if self.type == "file":
            return None
        return [child.name for child in self.get_children()]

    def mkdir(self, folder):
        """Create a new directory."""
        # remove cached entries information first so that it will be re-read on next get_children()
        self._children = None
        self.data.pop("items", None)
        return self.connection.create_folders(self.data["drivewsid"], folder)

    def rename(self, name):
        """Rename an iCloud Drive item."""
        return self.connection.rename_items(
            self.data["drivewsid"],
            self.data["etag"],
            name,
        )

    def delete(self):
        """Delete an iCloud Drive item."""
        return self.connection.move_items_to_trash(
            self.data["drivewsid"],
            self.data["etag"],
        )

    def get(self, name):
        """Gets the node child.

        Returns ``None`` for a file node.

        Raises:
            IndexError: if no child is called ``name``.
        """
        if self.type == "file":
            return None
        for child in self.get_children():
            if child.name == name:
                return child
        # IndexError is the historical failure mode; __getitem__ relies on it.
        raise IndexError(f"No child named '{name}' exists")

    def __getitem__(self, key):
        try:
            return self.get(key)
        except IndexError as error:
            raise KeyError(f"No child named '{key}' exists") from error

    def __unicode__(self):
        return f"{{type: {self.type}, name: {self.name}}}"

    def __str__(self):
        # f-strings make this module Python 3 only, so no byte encoding is needed.
        return self.__unicode__()

    def __repr__(self):
        return f"<{type(self).__name__}: {str(self)}>"
375def _date_to_utc(date):
376 if not date:
377 return None
378 # jump through hoops to return time in UTC rather than California time
379 match = search(r"^(.+?)([\+\-]\d+):(\d\d)$", date)
380 if not match:
381 # Already in UTC
382 return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
383 base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
384 diff = timedelta(hours=int(match.group(2)), minutes=int(match.group(3)))
385 return base - diff