Coverage for icloudpy/services/photos.py: 25%
235 statements
« prev ^ index » next coverage.py v6.5.0, created at 2024-04-12 14:26 +0000
« prev ^ index » next coverage.py v6.5.0, created at 2024-04-12 14:26 +0000
1"""Photo service."""
2import base64
3import json
4import logging
5from datetime import datetime
7# fmt: off
8from urllib.parse import urlencode # pylint: disable=bad-option-value,relative-import
10from pytz import UTC
11from six import PY2
13# fmt: on
14from icloudpy.exceptions import ICloudPyServiceNotActivatedException
16LOGGER = logging.getLogger(__name__)
class PhotoLibrary:
    """One photo library (a zone) of the user's photos.

    Gives access to the library's albums and, through them, to its photos.
    """

    # Built-in albums keyed by display name. Each value holds the keyword
    # arguments passed to PhotoAlbum (in addition to service, name and zone).
    SMART_FOLDERS = {
        "All Photos": {
            "obj_type": "CPLAssetByAddedDate",
            "list_type": "CPLAssetAndMasterByAddedDate",
            "direction": "ASCENDING",
            "query_filter": None,
        },
        "Time-lapse": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "TIMELAPSE"},
                }
            ],
        },
        "Videos": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Video",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "VIDEO"},
                }
            ],
        },
        "Slo-mo": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "SLOMO"},
                }
            ],
        },
        "Bursts": {
            "obj_type": "CPLAssetBurstStackAssetByAssetDate",
            "list_type": "CPLBurstStackAssetAndMasterByAssetDate",
            "direction": "ASCENDING",
            "query_filter": None,
        },
        "Favorites": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "FAVORITE"},
                }
            ],
        },
        "Panoramas": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "PANORAMA"},
                }
            ],
        },
        "Screenshots": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "SCREENSHOT"},
                }
            ],
        },
        "Live": {
            "obj_type": "CPLAssetInSmartAlbumByAssetDate:Live",
            "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
            "direction": "ASCENDING",
            "query_filter": [
                {
                    "fieldName": "smartAlbum",
                    "comparator": "EQUALS",
                    "fieldValue": {"type": "STRING", "value": "LIVE"},
                }
            ],
        },
        "Recently Deleted": {
            "obj_type": "CPLAssetDeletedByExpungedDate",
            "list_type": "CPLAssetAndMasterDeletedByExpungedDate",
            "direction": "ASCENDING",
            "query_filter": None,
        },
        "Hidden": {
            "obj_type": "CPLAssetHiddenByAssetDate",
            "list_type": "CPLAssetAndMasterHiddenByAssetDate",
            "direction": "ASCENDING",
            "query_filter": None,
        },
    }

    def __init__(self, service, zone_id):
        """Attach the library to ``service`` and check that indexing is done.

        Sends a ``CheckIndexingState`` query for ``zone_id``.

        Raises:
            ICloudPyServiceNotActivatedException: the reported indexing state
                is anything other than ``"FINISHED"``.
        """
        self.service = service
        self.zone_id = zone_id
        self._albums = None

        state_records = self._query_records("CheckIndexingState")
        if state_records[0]["fields"]["state"]["value"] == "FINISHED":
            return

        raise ICloudPyServiceNotActivatedException(
            (
                "iCloud Photo Library not finished indexing. Please try "
                "again in a few minutes"
            ),
            None,
        )

    def _query_records(self, record_type):
        """POST a records/query for ``record_type`` in this zone.

        Returns the ``"records"`` entry of the decoded JSON response.
        """
        query_string = urlencode(self.service.params)
        url = f"{self.service._service_endpoint}/records/query?{query_string}"
        body = json.dumps(
            {
                "query": {"recordType": record_type},
                "zoneID": self.zone_id,
            }
        )
        reply = self.service.session.post(
            url, data=body, headers={"Content-type": "text/plain"}
        )
        return reply.json()["records"]

    @property
    def albums(self):
        """Mapping of album name to PhotoAlbum, built on first access.

        Contains every entry of SMART_FOLDERS plus one album per record
        returned by ``_fetch_folders`` that is neither a root folder nor
        flagged as deleted. A folder whose decoded name equals an existing
        key replaces that entry.
        """
        if self._albums:
            return self._albums

        catalogue = {}
        for smart_name, smart_props in self.SMART_FOLDERS.items():
            catalogue[smart_name] = PhotoAlbum(
                self.service, smart_name, zone_id=self.zone_id, **smart_props
            )
        # Published before the folder request so that the smart folders stay
        # cached even when fetching user folders fails.
        self._albums = catalogue

        root_names = ("----Root-Folder----", "----Project-Root-Folder----")
        for record in self._fetch_folders():
            # FIXME: Handle subfolders
            record_id = record["recordName"]
            if record_id in root_names:
                continue
            deleted_flag = record["fields"].get("isDeleted")
            if deleted_flag and deleted_flag["value"]:
                continue

            # The album name is stored base64-encoded.
            encoded_name = record["fields"]["albumNameEnc"]["value"]
            title = base64.b64decode(encoded_name).decode("utf-8")

            catalogue[title] = PhotoAlbum(
                self.service,
                title,
                list_type="CPLContainerRelationLiveByAssetDate",
                obj_type=f"CPLContainerRelationNotDeletedByAssetDate:{record_id}",
                direction="ASCENDING",
                query_filter=[
                    {
                        "fieldName": "parentId",
                        "comparator": "EQUALS",
                        "fieldValue": {"type": "STRING", "value": record_id},
                    }
                ],
                folder_id=record_id,
                zone_id=self.zone_id,
            )

        return self._albums

    def _fetch_folders(self):
        """Return the ``CPLAlbumByPositionLive`` records of this zone."""
        return self._query_records("CPLAlbumByPositionLive")

    @property
    def all(self):
        """The "All Photos" album."""
        return self.albums["All Photos"]
class PhotosService(PhotoLibrary):
    """The 'Photos' iCloud service.

    This also acts as a way to access the user's primary library: the
    instance is itself the PhotoLibrary for the ``PrimarySync`` zone.
    """

    def __init__(self, service_root, session, params):
        """Set up the endpoint and initialise the primary library.

        Args:
            service_root: base URL the photos endpoint is derived from.
            session: object whose ``post`` method performs the requests.
            params: query parameters; copied, then extended with
                ``remapEnums`` and ``getCurrentSyncToken``.

        The base-class initialiser issues a request, so constructing the
        service can raise whatever PhotoLibrary.__init__ raises.
        """
        self.session = session
        self.params = dict(params)
        self._service_root = service_root
        self._service_endpoint = (
            f"{self._service_root}/database/1/com.apple.photos.cloud/production/private"
        )

        self._libraries = None

        self.params.update({"remapEnums": True, "getCurrentSyncToken": True})

        # TODO: Does syncToken ever change? If so, the params would need
        # 'syncToken' and 'clientInstanceId' refreshed from a response.

        self._photo_assets = {}

        super().__init__(service=self, zone_id={"zoneName": "PrimarySync"})

    @property
    def libraries(self):
        """Mapping of zone name to PhotoLibrary for every non-deleted zone.

        The result is cached once it is non-empty. If the zone list cannot
        be fetched or decoded, the error is logged and an empty mapping is
        returned; because an empty mapping is not cached, the next access
        tries again.
        """
        if not self._libraries:
            try:
                url = f"{self._service_endpoint}/zones/list"
                request = self.session.post(
                    url, data="{}", headers={"Content-type": "text/plain"}
                )
                zones = request.json()["zones"]
            except Exception as err:  # best effort: log and continue
                LOGGER.error("library exception: %s", err)
                # Without this fallback the loop below would hit an unbound
                # ``zones`` and raise NameError, masking the real failure.
                zones = []

            libraries = {}
            for zone in zones:
                if zone.get("deleted"):
                    continue
                zone_name = zone["zoneID"]["zoneName"]
                libraries[zone_name] = PhotoLibrary(self, zone_id=zone["zoneID"])

            self._libraries = libraries

        return self._libraries
class PhotoAlbum:
    """A photo album: iterable over its photos and sized by ``len()``."""

    def __init__(
        self,
        service,
        name,
        list_type,
        obj_type,
        direction,
        query_filter=None,
        page_size=100,
        folder_id=None,
        zone_id=None,
    ):
        """Describe an album; no request is made here.

        Args:
            service: object providing ``_service_endpoint``, ``params`` and
                ``session`` for the requests.
            name: album display name.
            list_type: record type used when listing the album's photos.
            obj_type: identifier used when counting the album's photos.
            direction: ``"ASCENDING"`` or ``"DESCENDING"`` listing order.
            query_filter: extra ``filterBy`` clauses for the listing query.
            page_size: photos per page; the listing asks for twice as many
                records.
            folder_id: record name of the folder, needed for ``subalbums``.
            zone_id: zoneID mapping such as ``{"zoneName": "PrimarySync"}``.
                A bare zone name string is accepted and wrapped; a missing
                value selects the ``PrimarySync`` zone.
        """
        self.name = name
        self.service = service
        self.list_type = list_type
        self.obj_type = obj_type
        self.direction = direction
        self.query_filter = query_filter
        self.page_size = page_size
        self.folder_id = folder_id

        # Every query in this class needs the zone as a mapping, so normalise
        # it once here instead of mixing strings and mappings.
        if not zone_id:
            zone_id = {"zoneName": "PrimarySync"}
        elif isinstance(zone_id, str):
            zone_id = {"zoneName": zone_id}
        self._zone_id = zone_id

        self._len = None

        self._subalbums = {}

    @property
    def title(self):
        """Gets the album name."""
        return self.name

    def __iter__(self):
        return self.photos

    def _post_json(self, path, payload):
        """POST ``payload`` as JSON to ``path`` and return the decoded reply."""
        url = (
            f"{self.service._service_endpoint}{path}?"
            f"{urlencode(self.service.params)}"
        )
        reply = self.service.session.post(
            url,
            data=json.dumps(payload),
            headers={"Content-type": "text/plain"},
        )
        return reply.json()

    def __len__(self):
        """Number of items in the album, fetched once and then cached."""
        if self._len is None:
            payload = {
                "batch": [
                    {
                        "resultsLimit": 1,
                        "query": {
                            "filterBy": {
                                "fieldName": "indexCountID",
                                "fieldValue": {
                                    "type": "STRING_LIST",
                                    "value": [self.obj_type],
                                },
                                "comparator": "IN",
                            },
                            "recordType": "HyperionIndexCountLookup",
                        },
                        "zoneWide": True,
                        # _zone_id already is the zoneID mapping; it must not
                        # be nested inside another {"zoneName": ...}.
                        "zoneID": self._zone_id,
                    }
                ]
            }
            response = self._post_json("/internal/records/query/batch", payload)
            self._len = response["batch"][0]["records"][0]["fields"]["itemCount"][
                "value"
            ]

        return self._len

    def _fetch_subalbums(self):
        """Return the album records whose ``parentId`` is this folder."""
        # Built with json.dumps (inside _post_json) so that unusual
        # characters in the folder id or zone name are escaped properly.
        payload = {
            "query": {
                "recordType": "CPLAlbumByPositionLive",
                "filterBy": [
                    {
                        "fieldName": "parentId",
                        "comparator": "EQUALS",
                        "fieldValue": {"value": self.folder_id, "type": "STRING"},
                    }
                ],
            },
            "zoneID": {"zoneName": self._zone_id["zoneName"]},
        }
        return self._post_json("/records/query", payload)["records"]

    @property
    def subalbums(self):
        """Returns the subalbums as a mapping of name to PhotoAlbum.

        Only albums with a ``folder_id`` can have subalbums. The mapping is
        filled on first access; records flagged as deleted are skipped.
        """
        if not self._subalbums and self.folder_id:
            for folder in self._fetch_subalbums():
                fields = folder["fields"]
                deleted = fields.get("isDeleted")
                if deleted and deleted["value"]:
                    continue

                folder_id = folder["recordName"]
                # The album name is stored base64-encoded.
                folder_name = base64.b64decode(
                    fields["albumNameEnc"]["value"]
                ).decode("utf-8")

                self._subalbums[folder_name] = PhotoAlbum(
                    self.service,
                    name=folder_name,
                    list_type="CPLContainerRelationLiveByAssetDate",
                    obj_type=f"CPLContainerRelationNotDeletedByAssetDate:{folder_id}",
                    direction="ASCENDING",
                    query_filter=[
                        {
                            "fieldName": "parentId",
                            "comparator": "EQUALS",
                            "fieldValue": {"type": "STRING", "value": folder_id},
                        }
                    ],
                    folder_id=folder_id,
                    zone_id=self._zone_id,
                )
        return self._subalbums

    @property
    def photos(self):
        """Returns the album photos.

        A generator that requests the album page by page and yields one
        PhotoAsset per ``CPLMaster`` record, paired with the ``CPLAsset``
        record that references it. It stops at the first page that contains
        no master record.
        """
        if self.direction == "DESCENDING":
            offset = len(self) - 1
        else:
            offset = 0

        while True:
            response = self._post_json(
                "/records/query",
                self._list_query_gen(
                    offset, self.list_type, self.direction, self.query_filter
                ),
            )

            # Asset records are indexed by the record name of their master.
            asset_records = {}
            master_records = []
            for rec in response["records"]:
                if rec["recordType"] == "CPLAsset":
                    master_id = rec["fields"]["masterRef"]["value"]["recordName"]
                    asset_records[master_id] = rec
                elif rec["recordType"] == "CPLMaster":
                    master_records.append(rec)

            if not master_records:
                break

            # Move the start rank past this page before yielding.
            if self.direction == "DESCENDING":
                offset -= len(master_records)
            else:
                offset += len(master_records)

            for master_record in master_records:
                yield PhotoAsset(
                    self.service,
                    master_record,
                    asset_records[master_record["recordName"]],
                )

    def _list_query_gen(self, offset, list_type, direction, query_filter=None):
        """Build the listing query that starts at rank ``offset``.

        ``query_filter`` clauses, if any, are appended to the two standard
        ``startRank`` / ``direction`` clauses.
        """
        query = {
            "query": {
                "filterBy": [
                    {
                        "fieldName": "startRank",
                        "fieldValue": {"type": "INT64", "value": offset},
                        "comparator": "EQUALS",
                    },
                    {
                        "fieldName": "direction",
                        "fieldValue": {"type": "STRING", "value": direction},
                        "comparator": "EQUALS",
                    },
                ],
                "recordType": list_type,
            },
            # NOTE(review): presumably doubled because each photo comes back
            # as an asset record plus a master record - confirm.
            "resultsLimit": self.page_size * 2,
            "desiredKeys": [
                "resJPEGFullWidth",
                "resJPEGFullHeight",
                "resJPEGFullFileType",
                "resJPEGFullFingerprint",
                "resJPEGFullRes",
                "resJPEGLargeWidth",
                "resJPEGLargeHeight",
                "resJPEGLargeFileType",
                "resJPEGLargeFingerprint",
                "resJPEGLargeRes",
                "resJPEGMedWidth",
                "resJPEGMedHeight",
                "resJPEGMedFileType",
                "resJPEGMedFingerprint",
                "resJPEGMedRes",
                "resJPEGThumbWidth",
                "resJPEGThumbHeight",
                "resJPEGThumbFileType",
                "resJPEGThumbFingerprint",
                "resJPEGThumbRes",
                "resVidFullWidth",
                "resVidFullHeight",
                "resVidFullFileType",
                "resVidFullFingerprint",
                "resVidFullRes",
                "resVidMedWidth",
                "resVidMedHeight",
                "resVidMedFileType",
                "resVidMedFingerprint",
                "resVidMedRes",
                "resVidSmallWidth",
                "resVidSmallHeight",
                "resVidSmallFileType",
                "resVidSmallFingerprint",
                "resVidSmallRes",
                "resSidecarWidth",
                "resSidecarHeight",
                "resSidecarFileType",
                "resSidecarFingerprint",
                "resSidecarRes",
                "itemType",
                "dataClassType",
                "filenameEnc",
                "originalOrientation",
                "resOriginalWidth",
                "resOriginalHeight",
                "resOriginalFileType",
                "resOriginalFingerprint",
                "resOriginalRes",
                "resOriginalAltWidth",
                "resOriginalAltHeight",
                "resOriginalAltFileType",
                "resOriginalAltFingerprint",
                "resOriginalAltRes",
                "resOriginalVidComplWidth",
                "resOriginalVidComplHeight",
                "resOriginalVidComplFileType",
                "resOriginalVidComplFingerprint",
                "resOriginalVidComplRes",
                "isDeleted",
                "isExpunged",
                "dateExpunged",
                "remappedRef",
                "recordName",
                "recordType",
                "recordChangeTag",
                "masterRef",
                "adjustmentRenderType",
                "assetDate",
                "addedDate",
                "isFavorite",
                "isHidden",
                "orientation",
                "duration",
                "assetSubtype",
                "assetSubtypeV2",
                "assetHDRType",
                "burstFlags",
                "burstFlagsExt",
                "burstId",
                "captionEnc",
                "locationEnc",
                "locationV2Enc",
                "locationLatitude",
                "locationLongitude",
                "adjustmentType",
                "timeZoneOffset",
                "vidComplDurValue",
                "vidComplDurScale",
                "vidComplDispValue",
                "vidComplDispScale",
                "vidComplVisibilityState",
                "customRenderedValue",
                "containerId",
                "itemId",
                "position",
                "isKeyAsset",
            ],
            "zoneID": self._zone_id,
        }

        if query_filter:
            query["query"]["filterBy"].extend(query_filter)

        return query

    def __unicode__(self):
        return self.title

    def __str__(self):
        # The module uses f-strings, so it only runs on Python 3 and the
        # former Python 2 byte-encoding branch could never be taken.
        return self.__unicode__()

    def __repr__(self):
        return f"<{type(self).__name__}: '{self}'>"
class PhotoAsset:
    """A photo (or video), backed by a master record and an asset record."""

    # Version name -> field-name prefix in the master record, for photos.
    PHOTO_VERSION_LOOKUP = {
        "full": "resJPEGFull",
        "large": "resJPEGLarge",
        "medium": "resJPEGMed",
        "thumb": "resJPEGThumb",
        "sidecar": "resSidecar",
        "original": "resOriginal",
        "original_alt": "resOriginalAlt",
    }

    # Version name -> field-name prefix in the master record, for videos.
    VIDEO_VERSION_LOOKUP = {
        "full": "resVidFull",
        "medium": "resVidMed",
        "thumb": "resVidSmall",
        "original": "resOriginal",
        "original_compl": "resOriginalVidCompl",
    }

    def __init__(self, service, master_record, asset_record):
        """Wrap the two records describing one item.

        Args:
            service: object providing ``session``, ``params`` and
                ``_service_endpoint`` for download and delete requests.
            master_record: record carrying file name, sizes and versions.
            asset_record: record carrying the asset and added dates.
        """
        self._service = service
        self._master_record = master_record
        self._asset_record = asset_record

        self._versions = None

    @property
    def id(self):
        """Gets the photo id."""
        return self._master_record["recordName"]

    @property
    def filename(self):
        """Gets the photo file name (stored base64-encoded in the record)."""
        return base64.b64decode(
            self._master_record["fields"]["filenameEnc"]["value"]
        ).decode("utf-8")

    @property
    def size(self):
        """Gets the photo size."""
        return self._master_record["fields"]["resOriginalRes"]["value"]["size"]

    @property
    def created(self):
        """Gets the photo created date."""
        return self.asset_date

    @property
    def asset_date(self):
        """Gets the photo asset date as a UTC-aware datetime.

        Falls back to the epoch when the asset record has no asset date.
        """
        try:
            millis = self._asset_record["fields"]["assetDate"]["value"]
        except KeyError:
            # Keep the fallback timezone-aware as well: a naive datetime
            # cannot be compared with the aware values returned otherwise.
            return datetime.fromtimestamp(0, tz=UTC)
        return datetime.fromtimestamp(millis / 1000.0, tz=UTC)

    @property
    def added_date(self):
        """Gets the photo added date as a UTC-aware datetime."""
        return datetime.fromtimestamp(
            self._asset_record["fields"]["addedDate"]["value"] / 1000.0, tz=UTC
        )

    @property
    def dimensions(self):
        """Gets the photo dimensions as ``(width, height)``."""
        fields = self._master_record["fields"]
        return (
            fields["resOriginalWidth"]["value"],
            fields["resOriginalHeight"]["value"],
        )

    @staticmethod
    def _field_value(fields, key):
        """Return ``fields[key]["value"]``, or None if the entry is absent or empty."""
        entry = fields.get(key)
        return entry["value"] if entry else None

    @property
    def versions(self):
        """Gets the photo versions.

        Returns a mapping of version name to a dict with ``filename``,
        ``width``, ``height``, ``size``, ``url`` and ``type``. Only versions
        whose ``<prefix>Res`` field exists are listed. The video lookup
        table is used when the master record has a ``resVidSmallRes`` field.
        """
        if self._versions is None:
            fields = self._master_record["fields"]
            if "resVidSmallRes" in fields:
                lookup = self.VIDEO_VERSION_LOOKUP
            else:
                lookup = self.PHOTO_VERSION_LOOKUP

            found = {}
            for key, prefix in lookup.items():
                if f"{prefix}Res" not in fields:
                    continue

                resource = self._field_value(fields, f"{prefix}Res")
                found[key] = {
                    "filename": self.filename,
                    "width": self._field_value(fields, f"{prefix}Width"),
                    "height": self._field_value(fields, f"{prefix}Height"),
                    "size": resource["size"] if resource else None,
                    "url": resource["downloadURL"] if resource else None,
                    "type": self._field_value(fields, f"{prefix}FileType"),
                }
            self._versions = found

        return self._versions

    def download(self, version="original", **kwargs):
        """Returns the photo file.

        Issues a streaming GET for the requested version and returns the
        session's response, or None when that version does not exist.
        Extra keyword arguments are passed on to ``session.get``.
        """
        if version not in self.versions:
            return None

        return self._service.session.get(
            self.versions[version]["url"], stream=True, **kwargs
        )

    def delete(self):
        """Deletes the photo.

        Posts an update that sets ``isDeleted`` on the asset record and
        returns the session's response.
        """
        # NOTE(review): the change tag is taken from the master record while
        # name and type come from the asset record - confirm this is intended.
        # NOTE(review): the zone is fixed to PrimarySync; assets from other
        # libraries are presumably not handled - confirm.
        payload = {
            "operations": [
                {
                    "operationType": "update",
                    "record": {
                        "recordName": self._asset_record["recordName"],
                        "recordType": self._asset_record["recordType"],
                        "recordChangeTag": self._master_record["recordChangeTag"],
                        "fields": {"isDeleted": {"value": 1}},
                    },
                }
            ],
            "zoneID": {"zoneName": "PrimarySync"},
            "atomic": True,
        }
        # Compact separators keep the body identical to the former
        # hand-written string, while json.dumps escapes the values safely.
        json_data = json.dumps(payload, separators=(",", ":"))

        # The service exposes its endpoint as ``_service_endpoint``.
        endpoint = self._service._service_endpoint
        params = urlencode(self._service.params)
        url = f"{endpoint}/records/modify?{params}"

        return self._service.session.post(
            url, data=json_data, headers={"Content-type": "text/plain"}
        )

    def __repr__(self):
        return f"<{type(self).__name__}: id={self.id}>"