Coverage for icloudpy/services/photos.py: 25%
234 statements
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
« prev ^ index » next coverage.py v7.6.10, created at 2024-12-30 19:31 +0000
1"""Photo service."""
3import base64
4import json
5import logging
6from datetime import datetime
8# fmt: off
9from urllib.parse import urlencode # pylint: disable=bad-option-value,relative-import
11from pytz import UTC
12from six import PY2
14# fmt: on
15from icloudpy.exceptions import ICloudPyServiceNotActivatedException
17LOGGER = logging.getLogger(__name__)
20class PhotoLibrary:
21 """Represents a library in the user's photos.
23 This provides access to all the albums as well as the photos.
24 """
26 SMART_FOLDERS = {
27 "All Photos": {
28 "obj_type": "CPLAssetByAddedDate",
29 "list_type": "CPLAssetAndMasterByAddedDate",
30 "direction": "ASCENDING",
31 "query_filter": None,
32 },
33 "Time-lapse": {
34 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse",
35 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
36 "direction": "ASCENDING",
37 "query_filter": [
38 {
39 "fieldName": "smartAlbum",
40 "comparator": "EQUALS",
41 "fieldValue": {"type": "STRING", "value": "TIMELAPSE"},
42 },
43 ],
44 },
45 "Videos": {
46 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Video",
47 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
48 "direction": "ASCENDING",
49 "query_filter": [
50 {
51 "fieldName": "smartAlbum",
52 "comparator": "EQUALS",
53 "fieldValue": {"type": "STRING", "value": "VIDEO"},
54 },
55 ],
56 },
57 "Slo-mo": {
58 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo",
59 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
60 "direction": "ASCENDING",
61 "query_filter": [
62 {
63 "fieldName": "smartAlbum",
64 "comparator": "EQUALS",
65 "fieldValue": {"type": "STRING", "value": "SLOMO"},
66 },
67 ],
68 },
69 "Bursts": {
70 "obj_type": "CPLAssetBurstStackAssetByAssetDate",
71 "list_type": "CPLBurstStackAssetAndMasterByAssetDate",
72 "direction": "ASCENDING",
73 "query_filter": None,
74 },
75 "Favorites": {
76 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite",
77 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
78 "direction": "ASCENDING",
79 "query_filter": [
80 {
81 "fieldName": "smartAlbum",
82 "comparator": "EQUALS",
83 "fieldValue": {"type": "STRING", "value": "FAVORITE"},
84 },
85 ],
86 },
87 "Panoramas": {
88 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama",
89 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
90 "direction": "ASCENDING",
91 "query_filter": [
92 {
93 "fieldName": "smartAlbum",
94 "comparator": "EQUALS",
95 "fieldValue": {"type": "STRING", "value": "PANORAMA"},
96 },
97 ],
98 },
99 "Screenshots": {
100 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot",
101 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
102 "direction": "ASCENDING",
103 "query_filter": [
104 {
105 "fieldName": "smartAlbum",
106 "comparator": "EQUALS",
107 "fieldValue": {"type": "STRING", "value": "SCREENSHOT"},
108 },
109 ],
110 },
111 "Live": {
112 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Live",
113 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
114 "direction": "ASCENDING",
115 "query_filter": [
116 {
117 "fieldName": "smartAlbum",
118 "comparator": "EQUALS",
119 "fieldValue": {"type": "STRING", "value": "LIVE"},
120 },
121 ],
122 },
123 "Recently Deleted": {
124 "obj_type": "CPLAssetDeletedByExpungedDate",
125 "list_type": "CPLAssetAndMasterDeletedByExpungedDate",
126 "direction": "ASCENDING",
127 "query_filter": None,
128 },
129 "Hidden": {
130 "obj_type": "CPLAssetHiddenByAssetDate",
131 "list_type": "CPLAssetAndMasterHiddenByAssetDate",
132 "direction": "ASCENDING",
133 "query_filter": None,
134 },
135 }
137 def __init__(self, service, zone_id):
138 self.service = service
139 self.zone_id = zone_id
141 self._albums = None
143 url = f"{self.service._service_endpoint}/records/query?{urlencode(self.service.params)}"
144 json_data = json.dumps(
145 {
146 "query": {"recordType": "CheckIndexingState"},
147 "zoneID": self.zone_id,
148 },
149 )
151 request = self.service.session.post(
152 url,
153 data=json_data,
154 headers={"Content-type": "text/plain"},
155 )
156 response = request.json()
157 indexing_state = response["records"][0]["fields"]["state"]["value"]
158 if indexing_state != "FINISHED":
159 raise ICloudPyServiceNotActivatedException(
160 ("iCloud Photo Library not finished indexing. Please try " "again in a few minutes"),
161 None,
162 )
164 @property
165 def albums(self):
166 if not self._albums:
167 self._albums = {
168 name: PhotoAlbum(self.service, name, zone_id=self.zone_id, **props)
169 for (name, props) in self.SMART_FOLDERS.items()
170 }
172 for folder in self._fetch_folders():
173 if folder["recordName"] in (
174 "----Root-Folder----",
175 "----Project-Root-Folder----",
176 ) or (folder["fields"].get("isDeleted") and folder["fields"]["isDeleted"]["value"]):
177 continue
179 folder_id = folder["recordName"]
180 folder_obj_type = f"CPLContainerRelationNotDeletedByAssetDate:{folder_id}"
181 folder_name = base64.b64decode(
182 folder["fields"]["albumNameEnc"]["value"],
183 ).decode("utf-8")
184 query_filter = [
185 {
186 "fieldName": "parentId",
187 "comparator": "EQUALS",
188 "fieldValue": {"type": "STRING", "value": folder_id},
189 },
190 ]
192 album = PhotoAlbum(
193 self.service,
194 folder_name,
195 "CPLContainerRelationLiveByAssetDate",
196 folder_obj_type,
197 "ASCENDING",
198 query_filter,
199 folder_id=folder_id,
200 zone_id=self.zone_id,
201 )
202 self._albums[folder_name] = album
204 return self._albums
206 def _fetch_folders(self):
207 url = f"{self.service._service_endpoint}/records/query?{urlencode(self.service.params)}"
208 json_data = json.dumps(
209 {
210 "query": {"recordType": "CPLAlbumByPositionLive"},
211 "zoneID": self.zone_id,
212 },
213 )
215 request = self.service.session.post(
216 url,
217 data=json_data,
218 headers={"Content-type": "text/plain"},
219 )
220 response = request.json()
222 return response["records"]
224 @property
225 def all(self):
226 return self.albums["All Photos"]
229class PhotosService(PhotoLibrary):
230 """The 'Photos' iCloud service.
232 This also acts as a way to access the user's primary library.
233 """
235 def __init__(self, service_root, session, params):
236 self.session = session
237 self.params = dict(params)
238 self._service_root = service_root
239 self._service_endpoint = f"{self._service_root}/database/1/com.apple.photos.cloud/production/private"
241 self._libraries = None
243 self.params.update({"remapEnums": True, "getCurrentSyncToken": True})
245 self._photo_assets = {}
247 super().__init__(service=self, zone_id={"zoneName": "PrimarySync"})
249 @property
250 def libraries(self):
251 if not self._libraries:
252 try:
253 url = f"{self._service_endpoint}/zones/list"
254 request = self.session.post(
255 url,
256 data="{}",
257 headers={"Content-type": "text/plain"},
258 )
259 response = request.json()
260 zones = response["zones"]
261 except Exception as e:
262 LOGGER.error(f"library exception: {str(e)}")
264 libraries = {}
265 for zone in zones:
266 if not zone.get("deleted"):
267 zone_name = zone["zoneID"]["zoneName"]
268 libraries[zone_name] = PhotoLibrary(self, zone_id=zone["zoneID"])
269 # obj_type='CPLAssetByAssetDateWithoutHiddenOrDeleted',
270 # list_type="CPLAssetAndMasterByAssetDateWithoutHiddenOrDeleted",
271 # direction="ASCENDING", query_filter=None,
272 # zone_id=zone['zoneID'])
274 self._libraries = libraries
276 return self._libraries
279class PhotoAlbum:
280 """A photo album."""
282 def __init__(
283 self,
284 service,
285 name,
286 list_type,
287 obj_type,
288 direction,
289 query_filter=None,
290 page_size=100,
291 folder_id=None,
292 zone_id=None,
293 ):
294 self.name = name
295 self.service = service
296 self.list_type = list_type
297 self.obj_type = obj_type
298 self.direction = direction
299 self.query_filter = query_filter
300 self.page_size = page_size
301 self.folder_id = folder_id
303 if zone_id:
304 self._zone_id = zone_id
305 else:
306 self._zone_id = "PrimarySync"
308 self._len = None
310 self._subalbums = {}
312 @property
313 def title(self):
314 """Gets the album name."""
315 return self.name
317 def __iter__(self):
318 return self.photos
320 def __len__(self):
321 if self._len is None:
322 url = f"{self.service._service_endpoint}/internal/records/query/batch?{urlencode(self.service.params)}"
323 request = self.service.session.post(
324 url,
325 data=json.dumps(
326 {
327 "batch": [
328 {
329 "resultsLimit": 1,
330 "query": {
331 "filterBy": {
332 "fieldName": "indexCountID",
333 "fieldValue": {
334 "type": "STRING_LIST",
335 "value": [self.obj_type],
336 },
337 "comparator": "IN",
338 },
339 "recordType": "HyperionIndexCountLookup",
340 },
341 "zoneWide": True,
342 "zoneID": {"zoneName": self._zone_id["zoneName"]},
343 },
344 ],
345 },
346 ),
347 headers={"Content-type": "text/plain"},
348 )
349 response = request.json()
351 self._len = response["batch"][0]["records"][0]["fields"]["itemCount"]["value"]
353 return self._len
355 def _fetch_subalbums(self):
356 url = (f"{self.service._service_endpoint}/records/query?") + urlencode(
357 self.service.params,
358 )
359 # pylint: disable=consider-using-f-string
360 query = """{{
361 "query": {{
362 "recordType":"CPLAlbumByPositionLive",
363 "filterBy": [
364 {{
365 "fieldName": "parentId",
366 "comparator": "EQUALS",
367 "fieldValue": {{
368 "value": "{}",
369 "type": "STRING"
370 }}
371 }}
372 ]
373 }},
374 "zoneID": {{
375 "zoneName":"{}"
376 }}
377 }}""".format(
378 self.folder_id,
379 self._zone_id["zoneName"],
380 )
381 json_data = query
382 request = self.service.session.post(
383 url,
384 data=json_data,
385 headers={"Content-type": "text/plain"},
386 )
387 response = request.json()
389 return response["records"]
391 @property
392 def subalbums(self):
393 """Returns the subalbums"""
394 if not self._subalbums and self.folder_id:
395 for folder in self._fetch_subalbums():
396 if folder["fields"].get("isDeleted") and folder["fields"]["isDeleted"]["value"]:
397 continue
399 folder_id = folder["recordName"]
400 folder_obj_type = f"CPLContainerRelationNotDeletedByAssetDate:{folder_id}"
401 folder_name = base64.b64decode(
402 folder["fields"]["albumNameEnc"]["value"],
403 ).decode("utf-8")
404 query_filter = [
405 {
406 "fieldName": "parentId",
407 "comparator": "EQUALS",
408 "fieldValue": {"type": "STRING", "value": folder_id},
409 },
410 ]
412 album = PhotoAlbum(
413 self.service,
414 name=folder_name,
415 list_type="CPLContainerRelationLiveByAssetDate",
416 obj_type=folder_obj_type,
417 direction="ASCENDING",
418 query_filter=query_filter,
419 folder_id=folder_id,
420 zone_id=self._zone_id,
421 )
422 self._subalbums[folder_name] = album
423 return self._subalbums
425 @property
426 def photos(self):
427 """Returns the album photos."""
428 if self.direction == "DESCENDING":
429 offset = len(self) - 1
430 else:
431 offset = 0
433 while True:
434 url = (f"{self.service._service_endpoint}/records/query?") + urlencode(
435 self.service.params,
436 )
437 request = self.service.session.post(
438 url,
439 data=json.dumps(
440 self._list_query_gen(
441 offset,
442 self.list_type,
443 self.direction,
444 self.query_filter,
445 ),
446 ),
447 headers={"Content-type": "text/plain"},
448 )
449 response = request.json()
451 asset_records = {}
452 master_records = []
453 for rec in response["records"]:
454 if rec["recordType"] == "CPLAsset":
455 master_id = rec["fields"]["masterRef"]["value"]["recordName"]
456 asset_records[master_id] = rec
457 elif rec["recordType"] == "CPLMaster":
458 master_records.append(rec)
460 master_records_len = len(master_records)
461 if master_records_len:
462 if self.direction == "DESCENDING":
463 offset = offset - master_records_len
464 else:
465 offset = offset + master_records_len
467 for master_record in master_records:
468 record_name = master_record["recordName"]
469 yield PhotoAsset(
470 self.service,
471 master_record,
472 asset_records[record_name],
473 )
474 else:
475 break
477 def _list_query_gen(self, offset, list_type, direction, query_filter=None):
478 query = {
479 "query": {
480 "filterBy": [
481 {
482 "fieldName": "startRank",
483 "fieldValue": {"type": "INT64", "value": offset},
484 "comparator": "EQUALS",
485 },
486 {
487 "fieldName": "direction",
488 "fieldValue": {"type": "STRING", "value": direction},
489 "comparator": "EQUALS",
490 },
491 ],
492 "recordType": list_type,
493 },
494 "resultsLimit": self.page_size * 2,
495 "desiredKeys": [
496 "resJPEGFullWidth",
497 "resJPEGFullHeight",
498 "resJPEGFullFileType",
499 "resJPEGFullFingerprint",
500 "resJPEGFullRes",
501 "resJPEGLargeWidth",
502 "resJPEGLargeHeight",
503 "resJPEGLargeFileType",
504 "resJPEGLargeFingerprint",
505 "resJPEGLargeRes",
506 "resJPEGMedWidth",
507 "resJPEGMedHeight",
508 "resJPEGMedFileType",
509 "resJPEGMedFingerprint",
510 "resJPEGMedRes",
511 "resJPEGThumbWidth",
512 "resJPEGThumbHeight",
513 "resJPEGThumbFileType",
514 "resJPEGThumbFingerprint",
515 "resJPEGThumbRes",
516 "resVidFullWidth",
517 "resVidFullHeight",
518 "resVidFullFileType",
519 "resVidFullFingerprint",
520 "resVidFullRes",
521 "resVidMedWidth",
522 "resVidMedHeight",
523 "resVidMedFileType",
524 "resVidMedFingerprint",
525 "resVidMedRes",
526 "resVidSmallWidth",
527 "resVidSmallHeight",
528 "resVidSmallFileType",
529 "resVidSmallFingerprint",
530 "resVidSmallRes",
531 "resSidecarWidth",
532 "resSidecarHeight",
533 "resSidecarFileType",
534 "resSidecarFingerprint",
535 "resSidecarRes",
536 "itemType",
537 "dataClassType",
538 "filenameEnc",
539 "originalOrientation",
540 "resOriginalWidth",
541 "resOriginalHeight",
542 "resOriginalFileType",
543 "resOriginalFingerprint",
544 "resOriginalRes",
545 "resOriginalAltWidth",
546 "resOriginalAltHeight",
547 "resOriginalAltFileType",
548 "resOriginalAltFingerprint",
549 "resOriginalAltRes",
550 "resOriginalVidComplWidth",
551 "resOriginalVidComplHeight",
552 "resOriginalVidComplFileType",
553 "resOriginalVidComplFingerprint",
554 "resOriginalVidComplRes",
555 "isDeleted",
556 "isExpunged",
557 "dateExpunged",
558 "remappedRef",
559 "recordName",
560 "recordType",
561 "recordChangeTag",
562 "masterRef",
563 "adjustmentRenderType",
564 "assetDate",
565 "addedDate",
566 "isFavorite",
567 "isHidden",
568 "orientation",
569 "duration",
570 "assetSubtype",
571 "assetSubtypeV2",
572 "assetHDRType",
573 "burstFlags",
574 "burstFlagsExt",
575 "burstId",
576 "captionEnc",
577 "extendedDescEnc",
578 "locationEnc",
579 "locationV2Enc",
580 "locationLatitude",
581 "locationLongitude",
582 "adjustmentType",
583 "timeZoneOffset",
584 "vidComplDurValue",
585 "vidComplDurScale",
586 "vidComplDispValue",
587 "vidComplDispScale",
588 "vidComplVisibilityState",
589 "customRenderedValue",
590 "containerId",
591 "itemId",
592 "position",
593 "isKeyAsset",
594 "importedByBundleIdentifierEnc",
595 "importedByDisplayNameEnc",
596 "importedBy",
597 ],
598 "zoneID": self._zone_id,
599 }
601 if query_filter:
602 query["query"]["filterBy"].extend(query_filter)
604 return query
606 def __unicode__(self):
607 return self.title
609 def __str__(self):
610 as_unicode = self.__unicode__()
611 if PY2:
612 return as_unicode.encode("utf-8", "ignore")
613 return as_unicode
615 def __repr__(self):
616 return f"<{type(self).__name__}: '{self}'>"
619class PhotoAsset:
620 """A photo."""
622 def __init__(self, service, master_record, asset_record):
623 self._service = service
624 self._master_record = master_record
625 self._asset_record = asset_record
627 self._versions = None
629 PHOTO_VERSION_LOOKUP = {
630 "full": "resJPEGFull",
631 "large": "resJPEGLarge",
632 "medium": "resJPEGMed",
633 "thumb": "resJPEGThumb",
634 "sidecar": "resSidecar",
635 "original": "resOriginal",
636 "original_alt": "resOriginalAlt",
637 }
639 VIDEO_VERSION_LOOKUP = {
640 "full": "resVidFull",
641 "medium": "resVidMed",
642 "thumb": "resVidSmall",
643 "original": "resOriginal",
644 "original_compl": "resOriginalVidCompl",
645 }
647 @property
648 def id(self):
649 """Gets the photo id."""
650 return self._master_record["recordName"]
652 @property
653 def filename(self):
654 """Gets the photo file name."""
655 return base64.b64decode(
656 self._master_record["fields"]["filenameEnc"]["value"],
657 ).decode("utf-8")
659 @property
660 def size(self):
661 """Gets the photo size."""
662 return self._master_record["fields"]["resOriginalRes"]["value"]["size"]
664 @property
665 def created(self):
666 """Gets the photo created date."""
667 return self.asset_date
669 @property
670 def asset_date(self):
671 """Gets the photo asset date."""
672 try:
673 return datetime.fromtimestamp(
674 self._asset_record["fields"]["assetDate"]["value"] / 1000.0,
675 tz=UTC,
676 )
677 except KeyError:
678 return datetime.fromtimestamp(0)
680 @property
681 def added_date(self):
682 """Gets the photo added date."""
683 return datetime.fromtimestamp(
684 self._asset_record["fields"]["addedDate"]["value"] / 1000.0,
685 tz=UTC,
686 )
688 @property
689 def dimensions(self):
690 """Gets the photo dimensions."""
691 return (
692 self._master_record["fields"]["resOriginalWidth"]["value"],
693 self._master_record["fields"]["resOriginalHeight"]["value"],
694 )
696 @property
697 def versions(self):
698 """Gets the photo versions."""
699 if not self._versions:
700 self._versions = {}
701 if "resVidSmallRes" in self._master_record["fields"]:
702 typed_version_lookup = self.VIDEO_VERSION_LOOKUP
703 else:
704 typed_version_lookup = self.PHOTO_VERSION_LOOKUP
706 for key, prefix in typed_version_lookup.items():
707 if f"{prefix}Res" in self._master_record["fields"]:
708 fields = self._master_record["fields"]
709 version = {"filename": self.filename}
711 width_entry = fields.get(f"{prefix}Width")
712 if width_entry:
713 version["width"] = width_entry["value"]
714 else:
715 version["width"] = None
717 height_entry = fields.get(f"{prefix}Height")
718 if height_entry:
719 version["height"] = height_entry["value"]
720 else:
721 version["height"] = None
723 size_entry = fields.get(f"{prefix}Res")
724 if size_entry:
725 version["size"] = size_entry["value"]["size"]
726 version["url"] = size_entry["value"]["downloadURL"]
727 else:
728 version["size"] = None
729 version["url"] = None
731 type_entry = fields.get(f"{prefix}FileType")
732 if type_entry:
733 version["type"] = type_entry["value"]
734 else:
735 version["type"] = None
737 self._versions[key] = version
739 return self._versions
741 def download(self, version="original", **kwargs):
742 """Returns the photo file."""
743 if version not in self.versions:
744 return None
746 return self._service.session.get(
747 self.versions[version]["url"],
748 stream=True,
749 **kwargs,
750 )
752 def delete(self):
753 """Deletes the photo."""
754 json_data = json.dumps(
755 {
756 "atomic": True,
757 "desiredKeys": ["isDeleted"],
758 "operations": [
759 {
760 "operationType": "update",
761 "record": {
762 "fields": {"isDeleted": {"value": 1}},
763 "recordChangeTag": self._asset_record["recordChangeTag"],
764 "recordName": self._asset_record["recordName"],
765 "recordType": self._asset_record["recordType"],
766 },
767 },
768 ],
769 "zoneID": self._service.zone_id,
770 },
771 )
773 endpoint = self._service._service_endpoint
774 params = urlencode(self._service.params)
775 url = f"{endpoint}/records/modify?{params}"
777 return self._service.session.post(
778 url,
779 data=json_data,
780 headers={"Content-type": "text/plain"},
781 )
783 def __repr__(self):
784 return f"<{type(self).__name__}: id={self.id}>"