Coverage for icloudpy/services/drive.py: 100%
182 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-02 05:49 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-02 05:49 +0000
1"""Drive service."""
3import io
4import json
5import mimetypes
6import os
7import time
8from datetime import datetime, timedelta
9from re import search
11from requests import Response
class DriveService:
    """The 'Drive' iCloud service.

    Thin wrapper around the iCloud Drive web endpoints. All HTTP traffic goes
    through the supplied ``session``; failed responses are handed to
    ``session.raise_error(status_code, reason)``.

    Attribute and item access that the service itself does not provide is
    delegated to the root folder node (see ``root``).
    """

    # Attributes owned by the service itself. When ``__getattr__`` is invoked
    # for one of them it is genuinely missing (e.g. ``__init__`` has not run),
    # and delegating to ``self.root`` would recurse forever.
    _OWN_ATTRIBUTES = ("_root", "_service_root", "_document_root", "session", "params")

    def __init__(self, service_root, document_root, session, params):
        """Store the endpoint roots, the HTTP session and a private copy of ``params``."""
        self._service_root = service_root
        self._document_root = document_root
        self.session = session
        # Copy so later changes made by the caller do not affect this service.
        self.params = dict(params)
        self._root = None

    def _raise_if_error(self, response):
        """Report a non-OK ``response`` through the session, then return it."""
        if not response.ok:
            self.session.raise_error(response.status_code, response.reason)
        return response

    def _get_token_from_cookie(self):
        """Return ``{"token": ...}`` extracted from the validation cookie.

        Raises:
            Exception: if the ``X-APPLE-WEBAUTH-VALIDATE`` cookie is missing or
                does not contain a ``t=`` field.
        """
        for cookie in self.session.cookies:
            if cookie.name == "X-APPLE-WEBAUTH-VALIDATE":
                match = search(r"\bt=([^:]+)", cookie.value)
                if match is None:
                    raise Exception(f"Can't extract token from {cookie.value}")
                return {"token": match.group(1)}
        raise Exception("Token cookie not found")

    def get_node_data(self, drivewsid):
        """Returns the node data."""
        request = self.session.post(
            self._service_root + "/retrieveItemDetailsInFolders",
            params=self.params,
            data=json.dumps(
                [
                    {
                        "drivewsid": drivewsid,
                        "partialData": False,
                    },
                ],
            ),
        )
        self._raise_if_error(request)
        return request.json()[0]

    def get_file(self, file_id, zone="com.apple.CloudDocs", **kwargs):
        """Returns iCloud Drive file.

        Looks up the download descriptor for ``file_id`` and returns the
        response of fetching its URL; ``kwargs`` are forwarded to that second
        ``session.get`` call.

        Raises:
            KeyError: if the descriptor has neither a ``data_token`` nor a
                ``package_token`` carrying a URL.
        """
        file_params = dict(self.params)
        file_params["document_id"] = file_id
        response = self.session.get(
            self._document_root + f"/ws/{zone}/download/by_id",
            params=file_params,
        )
        self._raise_if_error(response)

        # Parse the body once; 'data_token' takes precedence over 'package_token'.
        payload = response.json()
        for key in ("data_token", "package_token"):
            token = payload.get(key)
            if token and token.get("url"):
                return self.session.get(token["url"], params=self.params, **kwargs)
        raise KeyError("'data_token' nor 'package_token' found in response.")

    def get_app_data(self):
        """Returns the app library."""
        request = self.session.get(
            self._service_root + "/retrieveAppLibraries",
            params=self.params,
        )
        self._raise_if_error(request)
        return request.json()["items"]

    def get_app_node(self, app_id, folder="documents"):
        """Returns the node of the app."""
        return DriveNode(self, self.get_node_data("FOLDER::" + app_id + "::" + folder))

    def _get_upload_contentws_url(self, file_object, zone="com.apple.CloudDocs"):
        """Get the contentWS endpoint URL to add a new file.

        Returns:
            A ``(document_id, url)`` tuple taken from the first entry of the
            response.
        """
        content_type = mimetypes.guess_type(file_object.name)[0] or ""

        # Get filesize from file object, restoring the caller's position afterwards.
        orig_pos = file_object.tell()
        file_object.seek(0, os.SEEK_END)
        file_size = file_object.tell()
        file_object.seek(orig_pos, os.SEEK_SET)

        # Work on a copy: the token must not leak into ``self.params`` and
        # thereby into every later, unrelated request.
        file_params = dict(self.params)
        file_params.update(self._get_token_from_cookie())

        request = self.session.post(
            self._document_root + f"/ws/{zone}/upload/web",
            params=file_params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "filename": file_object.name,
                    "type": "FILE",
                    "content_type": content_type,
                    "size": file_size,
                },
            ),
        )
        self._raise_if_error(request)
        upload_info = request.json()[0]
        return (upload_info["document_id"], upload_info["url"])

    def _update_contentws(
        self,
        folder_id,
        sf_info,
        document_id,
        file_object,
        zone="com.apple.CloudDocs",
    ):
        """Post the ``add_file`` command for uploaded content and return the JSON reply.

        ``sf_info`` is the ``singleFile`` record returned by the content
        upload; ``folder_id`` is the document id of the destination folder.
        """
        # One timestamp so that mtime and btime are guaranteed to agree.
        now_ms = int(time.time() * 1000)
        data = {
            "data": {
                "signature": sf_info["fileChecksum"],
                "wrapping_key": sf_info["wrappingKey"],
                "reference_signature": sf_info["referenceChecksum"],
                "size": sf_info["size"],
            },
            "command": "add_file",
            "create_short_guid": True,
            "document_id": document_id,
            "path": {
                "starting_document_id": folder_id,
                "path": os.path.basename(file_object.name),
            },
            "allow_conflict": True,
            "file_flags": {
                "is_writable": True,
                "is_executable": False,
                "is_hidden": False,
            },
            "mtime": now_ms,
            "btime": now_ms,
        }

        # Add the receipt if we have one. Will be absent for 0-sized files
        if sf_info.get("receipt"):
            data["data"]["receipt"] = sf_info["receipt"]

        # This request has always carried the token during an upload (it used
        # to arrive via the mutated shared params); send it explicitly now.
        update_params = dict(self.params)
        update_params.update(self._get_token_from_cookie())

        request = self.session.post(
            self._document_root + f"/ws/{zone}/update/documents",
            params=update_params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(data),
        )
        self._raise_if_error(request)
        return request.json()

    def send_file(self, folder_id, file_object, zone="com.apple.CloudDocs"):
        """Send new file to iCloud Drive.

        Requests an upload URL, posts the file content to it, then posts the
        ``add_file`` command placing the document under ``folder_id``.
        """
        document_id, content_url = self._get_upload_contentws_url(file_object, zone)

        request = self.session.post(content_url, files={file_object.name: file_object})
        self._raise_if_error(request)
        content_response = request.json()["singleFile"]

        self._update_contentws(
            folder_id,
            content_response,
            document_id,
            file_object,
            zone,
        )

    def create_folders(self, parent, name):
        """Creates a new iCloud Drive folder"""
        request = self.session.post(
            self._service_root + "/createFolders",
            params=self.params,
            headers={"Content-Type": "text/plain"},
            data=json.dumps(
                {
                    "destinationDrivewsId": parent,
                    "folders": [
                        {
                            "clientId": self.params["clientId"],
                            "name": name,
                        },
                    ],
                },
            ),
        )
        # Same failure handling as every other endpoint of this service.
        self._raise_if_error(request)
        return request.json()

    def rename_items(self, node_id, etag, name):
        """Renames an iCloud Drive node"""
        request = self.session.post(
            self._service_root + "/renameItems",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "name": name,
                        },
                    ],
                },
            ),
        )
        # Same failure handling as every other endpoint of this service.
        self._raise_if_error(request)
        return request.json()

    def move_items_to_trash(self, node_id, etag):
        """Moves an iCloud Drive node to the trash bin"""
        request = self.session.post(
            self._service_root + "/moveItemsToTrash",
            params=self.params,
            data=json.dumps(
                {
                    "items": [
                        {
                            "drivewsid": node_id,
                            "etag": etag,
                            "clientId": self.params["clientId"],
                        },
                    ],
                },
            ),
        )
        self._raise_if_error(request)
        return request.json()

    @property
    def root(self):
        """Returns the root node, fetching it on first access and caching it."""
        if self._root is None:
            self._root = DriveNode(
                self,
                self.get_node_data("FOLDER::com.apple.CloudDocs::root"),
            )
        return self._root

    def __getattr__(self, attr):
        """Delegate unknown attributes to the root node.

        Only called when normal lookup fails. The service's own attributes are
        never delegated: if one of them is missing, resolving ``self.root``
        would need it again and recurse without bound.
        """
        if attr in DriveService._OWN_ATTRIBUTES:
            raise AttributeError(f"{type(self).__name__!r} object has no attribute {attr!r}")
        return getattr(self.root, attr)

    def __getitem__(self, key):
        """Return the child of the root node named ``key``."""
        return self.root[key]
class DriveNode:
    """Drive node.

    Wraps the raw record (``data``) of one iCloud Drive entry together with
    the service (``connection``) used to perform operations on it.
    """

    def __init__(self, conn, data):
        """Keep the raw node record and the service it came from."""
        self.data = data
        self.connection = conn
        # ``None`` means "children not loaded yet"; an empty list is a valid cache.
        self._children = None

    @property
    def name(self):
        """Gets the node name, with the extension appended when the record has one."""
        if "extension" in self.data:
            return f"{self.data['name']}.{self.data['extension']}"
        return self.data["name"]

    @property
    def type(self):
        """Gets the node type, lower-cased (or the falsy raw value when absent)."""
        node_type = self.data.get("type")
        return node_type and node_type.lower()

    def get_children(self):
        """Gets the node children.

        The list is built once and cached. If the record has no ``items`` yet,
        the full node data is fetched through the connection first.

        Raises:
            KeyError: if the record still has no ``items`` after the fetch.
        """
        if self._children is None:
            if "items" not in self.data:
                self.data.update(self.connection.get_node_data(self.data["drivewsid"]))
            if "items" not in self.data:
                # ``.get`` so a missing 'status' cannot mask the real problem.
                raise KeyError(f"No items in folder, status: {self.data.get('status')}")
            self._children = [DriveNode(self.connection, item_data) for item_data in self.data["items"]]
        return self._children

    @property
    def size(self):
        """Gets the node size as an int, or ``None`` when absent or zero."""
        size = self.data.get("size")  # Folder does not have size
        if not size:
            return None
        return int(size)

    @property
    def date_created(self):
        """Gets the node created date (in UTC)."""
        return _date_to_utc(self.data.get("dateCreated"))

    @property
    def date_changed(self):
        """Gets the node changed date (in UTC)."""
        return _date_to_utc(self.data.get("dateChanged"))  # Folder does not have date

    @property
    def date_modified(self):
        """Gets the node modified date (in UTC)."""
        return _date_to_utc(self.data.get("dateModified"))  # Folder does not have date

    @property
    def date_last_open(self):
        """Gets the node last open date (in UTC)."""
        return _date_to_utc(self.data.get("lastOpenTime"))  # Folder does not have date

    def open(self, **kwargs):
        """Gets the node file.

        ``kwargs`` are forwarded to the connection's ``get_file``. A node whose
        recorded size is 0 is answered locally with an empty response.
        """
        # iCloud returns 400 Bad Request for 0-byte files
        if self.data["size"] == 0:
            response = Response()
            response.raw = io.BytesIO()
            return response
        return self.connection.get_file(
            self.data["docwsid"],
            zone=self.data["zone"],
            **kwargs,
        )

    def upload(self, file_object, **kwargs):
        """Upload a new file into this node."""
        return self.connection.send_file(
            self.data["docwsid"],
            file_object,
            zone=self.data["zone"],
            **kwargs,
        )

    def dir(self):
        """Gets the names of the node's children, or ``None`` for a file."""
        if self.type == "file":
            return None
        return [child.name for child in self.get_children()]

    def mkdir(self, folder):
        """Create a new directory inside this node."""
        # remove cached entries information first so that it will be re-read on next get_children()
        self._children = None
        self.data.pop("items", None)
        return self.connection.create_folders(self.data["drivewsid"], folder)

    def rename(self, name):
        """Rename an iCloud Drive item."""
        return self.connection.rename_items(
            self.data["drivewsid"],
            self.data["etag"],
            name,
        )

    def delete(self):
        """Delete an iCloud Drive item."""
        return self.connection.move_items_to_trash(
            self.data["drivewsid"],
            self.data["etag"],
        )

    def get(self, name):
        """Gets the node child.

        Returns ``None`` when this node is a file, otherwise the first child
        whose name equals ``name``.

        Raises:
            IndexError: if no child has that name.
        """
        if self.type == "file":
            return None
        for child in self.get_children():
            if child.name == name:
                return child
        # IndexError is kept for compatibility: the lookup used to be ``[...][0]``.
        raise IndexError(f"No child named '{name}' exists")

    def __getitem__(self, key):
        """Return the child named ``key``; raise ``KeyError`` if there is none."""
        try:
            return self.get(key)
        except IndexError as error:
            raise KeyError(f"No child named '{key}' exists") from error

    def __unicode__(self):
        return f"{{type: {self.type}, name: {self.name}}}"

    def __str__(self):
        return self.__unicode__()

    def __repr__(self):
        return f"<{type(self).__name__}: {str(self)}>"
def _date_to_utc(date):
    """Convert an ISO-8601-style timestamp string to a naive UTC ``datetime``.

    Accepts either ``YYYY-MM-DDTHH:MM:SSZ`` (already UTC) or the same form
    followed by a ``+HH:MM`` / ``-HH:MM`` offset. Returns ``None`` for a
    falsy ``date``.
    """
    if not date:
        return None
    # jump through hoops to return time in UTC rather than California time
    match = search(r"^(.+?)([\+\-])(\d+):(\d\d)$", date)
    if not match:
        # Already in UTC
        return datetime.strptime(date, "%Y-%m-%dT%H:%M:%SZ")
    base = datetime.strptime(match.group(1), "%Y-%m-%dT%H:%M:%S")
    offset = timedelta(hours=int(match.group(3)), minutes=int(match.group(4)))
    # The sign applies to the whole offset, minutes included: "-03:30" is
    # minus three and a half hours, not minus three hours plus thirty minutes.
    if match.group(2) == "-":
        offset = -offset
    return base - offset