Coverage for icloudpy/services/photos.py: 91%
233 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-12 17:36 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-11-12 17:36 +0000
1"""Photo service."""
3import base64
4import json
5import logging
6from datetime import datetime
8# fmt: off
9from urllib.parse import urlencode # pylint: disable=bad-option-value,relative-import
11from pytz import UTC
13# fmt: on
14from icloudpy.exceptions import ICloudPyServiceNotActivatedException
16LOGGER = logging.getLogger(__name__)
19class PhotoLibrary:
20 """Represents a library in the user's photos.
22 This provides access to all the albums as well as the photos.
23 """
25 SMART_FOLDERS = {
26 "All Photos": {
27 "obj_type": "CPLAssetByAddedDate",
28 "list_type": "CPLAssetAndMasterByAddedDate",
29 "direction": "ASCENDING",
30 "query_filter": None,
31 },
32 "Time-lapse": {
33 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Timelapse",
34 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
35 "direction": "ASCENDING",
36 "query_filter": [
37 {
38 "fieldName": "smartAlbum",
39 "comparator": "EQUALS",
40 "fieldValue": {"type": "STRING", "value": "TIMELAPSE"},
41 },
42 ],
43 },
44 "Videos": {
45 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Video",
46 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
47 "direction": "ASCENDING",
48 "query_filter": [
49 {
50 "fieldName": "smartAlbum",
51 "comparator": "EQUALS",
52 "fieldValue": {"type": "STRING", "value": "VIDEO"},
53 },
54 ],
55 },
56 "Slo-mo": {
57 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Slomo",
58 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
59 "direction": "ASCENDING",
60 "query_filter": [
61 {
62 "fieldName": "smartAlbum",
63 "comparator": "EQUALS",
64 "fieldValue": {"type": "STRING", "value": "SLOMO"},
65 },
66 ],
67 },
68 "Bursts": {
69 "obj_type": "CPLAssetBurstStackAssetByAssetDate",
70 "list_type": "CPLBurstStackAssetAndMasterByAssetDate",
71 "direction": "ASCENDING",
72 "query_filter": None,
73 },
74 "Favorites": {
75 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Favorite",
76 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
77 "direction": "ASCENDING",
78 "query_filter": [
79 {
80 "fieldName": "smartAlbum",
81 "comparator": "EQUALS",
82 "fieldValue": {"type": "STRING", "value": "FAVORITE"},
83 },
84 ],
85 },
86 "Panoramas": {
87 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Panorama",
88 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
89 "direction": "ASCENDING",
90 "query_filter": [
91 {
92 "fieldName": "smartAlbum",
93 "comparator": "EQUALS",
94 "fieldValue": {"type": "STRING", "value": "PANORAMA"},
95 },
96 ],
97 },
98 "Screenshots": {
99 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Screenshot",
100 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
101 "direction": "ASCENDING",
102 "query_filter": [
103 {
104 "fieldName": "smartAlbum",
105 "comparator": "EQUALS",
106 "fieldValue": {"type": "STRING", "value": "SCREENSHOT"},
107 },
108 ],
109 },
110 "Live": {
111 "obj_type": "CPLAssetInSmartAlbumByAssetDate:Live",
112 "list_type": "CPLAssetAndMasterInSmartAlbumByAssetDate",
113 "direction": "ASCENDING",
114 "query_filter": [
115 {
116 "fieldName": "smartAlbum",
117 "comparator": "EQUALS",
118 "fieldValue": {"type": "STRING", "value": "LIVE"},
119 },
120 ],
121 },
122 "Recently Deleted": {
123 "obj_type": "CPLAssetDeletedByExpungedDate",
124 "list_type": "CPLAssetAndMasterDeletedByExpungedDate",
125 "direction": "ASCENDING",
126 "query_filter": None,
127 },
128 "Hidden": {
129 "obj_type": "CPLAssetHiddenByAssetDate",
130 "list_type": "CPLAssetAndMasterHiddenByAssetDate",
131 "direction": "ASCENDING",
132 "query_filter": None,
133 },
134 }
136 def __init__(self, service, zone_id):
137 self.service = service
138 self.zone_id = zone_id
140 self._albums = None
142 url = f"{self.service._service_endpoint}/records/query?{urlencode(self.service.params)}"
143 json_data = json.dumps(
144 {
145 "query": {"recordType": "CheckIndexingState"},
146 "zoneID": self.zone_id,
147 },
148 )
150 request = self.service.session.post(
151 url,
152 data=json_data,
153 headers={"Content-type": "text/plain"},
154 )
155 response = request.json()
156 indexing_state = response["records"][0]["fields"]["state"]["value"]
157 if indexing_state != "FINISHED":
158 raise ICloudPyServiceNotActivatedException(
159 ("iCloud Photo Library not finished indexing. Please try " "again in a few minutes"),
160 None,
161 )
163 @property
164 def albums(self):
165 if not self._albums:
166 self._albums = {
167 name: PhotoAlbum(self.service, name, zone_id=self.zone_id, **props)
168 for (name, props) in self.SMART_FOLDERS.items()
169 }
171 for folder in self._fetch_folders():
172 if folder["recordName"] in (
173 "----Root-Folder----",
174 "----Project-Root-Folder----",
175 ) or (folder["fields"].get("isDeleted") and folder["fields"]["isDeleted"]["value"]):
176 continue
178 folder_id = folder["recordName"]
179 folder_obj_type = f"CPLContainerRelationNotDeletedByAssetDate:{folder_id}"
180 folder_name = base64.b64decode(
181 folder["fields"]["albumNameEnc"]["value"],
182 ).decode("utf-8")
183 query_filter = [
184 {
185 "fieldName": "parentId",
186 "comparator": "EQUALS",
187 "fieldValue": {"type": "STRING", "value": folder_id},
188 },
189 ]
191 album = PhotoAlbum(
192 self.service,
193 folder_name,
194 "CPLContainerRelationLiveByAssetDate",
195 folder_obj_type,
196 "ASCENDING",
197 query_filter,
198 folder_id=folder_id,
199 zone_id=self.zone_id,
200 )
201 self._albums[folder_name] = album
203 return self._albums
205 def _fetch_folders(self):
206 url = f"{self.service._service_endpoint}/records/query?{urlencode(self.service.params)}"
207 json_data = json.dumps(
208 {
209 "query": {"recordType": "CPLAlbumByPositionLive"},
210 "zoneID": self.zone_id,
211 },
212 )
214 request = self.service.session.post(
215 url,
216 data=json_data,
217 headers={"Content-type": "text/plain"},
218 )
219 response = request.json()
221 return response["records"]
223 @property
224 def all(self):
225 return self.albums["All Photos"]
228class PhotosService(PhotoLibrary):
229 """The 'Photos' iCloud service.
231 This also acts as a way to access the user's primary library.
232 """
234 def __init__(self, service_root, session, params):
235 self.session = session
236 self.params = dict(params)
237 self._service_root = service_root
238 self._service_endpoint = f"{self._service_root}/database/1/com.apple.photos.cloud/production/private"
240 self._libraries = None
242 self.params.update({"remapEnums": True, "getCurrentSyncToken": True})
244 self._photo_assets = {}
246 super().__init__(service=self, zone_id={"zoneName": "PrimarySync"})
248 @property
249 def libraries(self):
250 if not self._libraries:
251 try:
252 url = f"{self._service_endpoint}/zones/list"
253 request = self.session.post(
254 url,
255 data="{}",
256 headers={"Content-type": "text/plain"},
257 )
258 response = request.json()
259 zones = response["zones"]
260 except Exception as e:
261 LOGGER.error(f"library exception: {str(e)}")
263 libraries = {}
264 for zone in zones:
265 if not zone.get("deleted"):
266 zone_name = zone["zoneID"]["zoneName"]
267 libraries[zone_name] = PhotoLibrary(self, zone_id=zone["zoneID"])
268 # obj_type='CPLAssetByAssetDateWithoutHiddenOrDeleted',
269 # list_type="CPLAssetAndMasterByAssetDateWithoutHiddenOrDeleted",
270 # direction="ASCENDING", query_filter=None,
271 # zone_id=zone['zoneID'])
273 self._libraries = libraries
275 return self._libraries
278class PhotoAlbum:
279 """A photo album."""
281 def __init__(
282 self,
283 service,
284 name,
285 list_type,
286 obj_type,
287 direction,
288 query_filter=None,
289 page_size=100,
290 folder_id=None,
291 zone_id=None,
292 ):
293 self.name = name
294 self.service = service
295 self.list_type = list_type
296 self.obj_type = obj_type
297 self.direction = direction
298 self.query_filter = query_filter
299 self.page_size = page_size
300 self.folder_id = folder_id
302 if zone_id:
303 self._zone_id = zone_id
304 else:
305 self._zone_id = "PrimarySync"
307 self._len = None
309 self._subalbums = {}
311 @property
312 def title(self):
313 """Gets the album name."""
314 return self.name
316 def __iter__(self):
317 return self.photos
319 def __len__(self):
320 if self._len is None:
321 url = f"{self.service._service_endpoint}/internal/records/query/batch?{urlencode(self.service.params)}"
322 request = self.service.session.post(
323 url,
324 data=json.dumps(
325 {
326 "batch": [
327 {
328 "resultsLimit": 1,
329 "query": {
330 "filterBy": {
331 "fieldName": "indexCountID",
332 "fieldValue": {
333 "type": "STRING_LIST",
334 "value": [self.obj_type],
335 },
336 "comparator": "IN",
337 },
338 "recordType": "HyperionIndexCountLookup",
339 },
340 "zoneWide": True,
341 "zoneID": {"zoneName": self._zone_id["zoneName"]},
342 },
343 ],
344 },
345 ),
346 headers={"Content-type": "text/plain"},
347 )
348 response = request.json()
350 self._len = response["batch"][0]["records"][0]["fields"]["itemCount"]["value"]
352 return self._len
354 def _fetch_subalbums(self):
355 url = (f"{self.service._service_endpoint}/records/query?") + urlencode(
356 self.service.params,
357 )
358 # pylint: disable=consider-using-f-string
359 query = """{{
360 "query": {{
361 "recordType":"CPLAlbumByPositionLive",
362 "filterBy": [
363 {{
364 "fieldName": "parentId",
365 "comparator": "EQUALS",
366 "fieldValue": {{
367 "value": "{}",
368 "type": "STRING"
369 }}
370 }}
371 ]
372 }},
373 "zoneID": {{
374 "zoneName":"{}"
375 }}
376 }}""".format(
377 self.folder_id,
378 self._zone_id["zoneName"],
379 )
380 json_data = query
381 request = self.service.session.post(
382 url,
383 data=json_data,
384 headers={"Content-type": "text/plain"},
385 )
386 response = request.json()
388 return response["records"]
390 @property
391 def subalbums(self):
392 """Returns the subalbums"""
393 if not self._subalbums and self.folder_id:
394 for folder in self._fetch_subalbums():
395 if folder["fields"].get("isDeleted") and folder["fields"]["isDeleted"]["value"]:
396 continue
398 folder_id = folder["recordName"]
399 folder_obj_type = f"CPLContainerRelationNotDeletedByAssetDate:{folder_id}"
400 folder_name = base64.b64decode(
401 folder["fields"]["albumNameEnc"]["value"],
402 ).decode("utf-8")
403 query_filter = [
404 {
405 "fieldName": "parentId",
406 "comparator": "EQUALS",
407 "fieldValue": {"type": "STRING", "value": folder_id},
408 },
409 ]
411 album = PhotoAlbum(
412 self.service,
413 name=folder_name,
414 list_type="CPLContainerRelationLiveByAssetDate",
415 obj_type=folder_obj_type,
416 direction="ASCENDING",
417 query_filter=query_filter,
418 folder_id=folder_id,
419 zone_id=self._zone_id,
420 )
421 self._subalbums[folder_name] = album
422 return self._subalbums
424 @property
425 def photos(self):
426 """Returns the album photos."""
427 if self.direction == "DESCENDING":
428 offset = len(self) - 1
429 else:
430 offset = 0
432 while True:
433 url = (f"{self.service._service_endpoint}/records/query?") + urlencode(
434 self.service.params,
435 )
436 request = self.service.session.post(
437 url,
438 data=json.dumps(
439 self._list_query_gen(
440 offset,
441 self.list_type,
442 self.direction,
443 self.query_filter,
444 ),
445 ),
446 headers={"Content-type": "text/plain"},
447 )
448 response = request.json()
450 asset_records = {}
451 master_records = []
452 for rec in response["records"]:
453 if rec["recordType"] == "CPLAsset":
454 master_id = rec["fields"]["masterRef"]["value"]["recordName"]
455 asset_records[master_id] = rec
456 elif rec["recordType"] == "CPLMaster":
457 master_records.append(rec)
459 master_records_len = len(master_records)
460 if master_records_len:
461 if self.direction == "DESCENDING":
462 offset = offset - master_records_len
463 else:
464 offset = offset + master_records_len
466 for master_record in master_records:
467 record_name = master_record["recordName"]
468 yield PhotoAsset(
469 self.service,
470 master_record,
471 asset_records[record_name],
472 )
473 else:
474 break
476 def _list_query_gen(self, offset, list_type, direction, query_filter=None):
477 query = {
478 "query": {
479 "filterBy": [
480 {
481 "fieldName": "startRank",
482 "fieldValue": {"type": "INT64", "value": offset},
483 "comparator": "EQUALS",
484 },
485 {
486 "fieldName": "direction",
487 "fieldValue": {"type": "STRING", "value": direction},
488 "comparator": "EQUALS",
489 },
490 ],
491 "recordType": list_type,
492 },
493 "resultsLimit": self.page_size * 2,
494 "desiredKeys": [
495 "resJPEGFullWidth",
496 "resJPEGFullHeight",
497 "resJPEGFullFileType",
498 "resJPEGFullFingerprint",
499 "resJPEGFullRes",
500 "resJPEGLargeWidth",
501 "resJPEGLargeHeight",
502 "resJPEGLargeFileType",
503 "resJPEGLargeFingerprint",
504 "resJPEGLargeRes",
505 "resJPEGMedWidth",
506 "resJPEGMedHeight",
507 "resJPEGMedFileType",
508 "resJPEGMedFingerprint",
509 "resJPEGMedRes",
510 "resJPEGThumbWidth",
511 "resJPEGThumbHeight",
512 "resJPEGThumbFileType",
513 "resJPEGThumbFingerprint",
514 "resJPEGThumbRes",
515 "resVidFullWidth",
516 "resVidFullHeight",
517 "resVidFullFileType",
518 "resVidFullFingerprint",
519 "resVidFullRes",
520 "resVidMedWidth",
521 "resVidMedHeight",
522 "resVidMedFileType",
523 "resVidMedFingerprint",
524 "resVidMedRes",
525 "resVidSmallWidth",
526 "resVidSmallHeight",
527 "resVidSmallFileType",
528 "resVidSmallFingerprint",
529 "resVidSmallRes",
530 "resSidecarWidth",
531 "resSidecarHeight",
532 "resSidecarFileType",
533 "resSidecarFingerprint",
534 "resSidecarRes",
535 "itemType",
536 "dataClassType",
537 "filenameEnc",
538 "originalOrientation",
539 "resOriginalWidth",
540 "resOriginalHeight",
541 "resOriginalFileType",
542 "resOriginalFingerprint",
543 "resOriginalRes",
544 "resOriginalAltWidth",
545 "resOriginalAltHeight",
546 "resOriginalAltFileType",
547 "resOriginalAltFingerprint",
548 "resOriginalAltRes",
549 "resOriginalVidComplWidth",
550 "resOriginalVidComplHeight",
551 "resOriginalVidComplFileType",
552 "resOriginalVidComplFingerprint",
553 "resOriginalVidComplRes",
554 "isDeleted",
555 "isExpunged",
556 "dateExpunged",
557 "remappedRef",
558 "recordName",
559 "recordType",
560 "recordChangeTag",
561 "masterRef",
562 "adjustmentRenderType",
563 "assetDate",
564 "addedDate",
565 "isFavorite",
566 "isHidden",
567 "orientation",
568 "duration",
569 "assetSubtype",
570 "assetSubtypeV2",
571 "assetHDRType",
572 "burstFlags",
573 "burstFlagsExt",
574 "burstId",
575 "captionEnc",
576 "extendedDescEnc",
577 "locationEnc",
578 "locationV2Enc",
579 "locationLatitude",
580 "locationLongitude",
581 "adjustmentType",
582 "timeZoneOffset",
583 "vidComplDurValue",
584 "vidComplDurScale",
585 "vidComplDispValue",
586 "vidComplDispScale",
587 "vidComplVisibilityState",
588 "customRenderedValue",
589 "containerId",
590 "itemId",
591 "position",
592 "isKeyAsset",
593 "importedByBundleIdentifierEnc",
594 "importedByDisplayNameEnc",
595 "importedBy",
596 ],
597 "zoneID": self._zone_id,
598 }
600 if query_filter:
601 query["query"]["filterBy"].extend(query_filter)
603 return query
605 def __unicode__(self):
606 return self.title
608 def __str__(self):
609 return self.__unicode__()
611 def __repr__(self):
612 return f"<{type(self).__name__}: '{self}'>"
615class PhotoAsset:
616 """A photo."""
618 def __init__(self, service, master_record, asset_record):
619 self._service = service
620 self._master_record = master_record
621 self._asset_record = asset_record
623 self._versions = None
625 PHOTO_VERSION_LOOKUP = {
626 "full": "resJPEGFull",
627 "large": "resJPEGLarge",
628 "medium": "resJPEGMed",
629 "thumb": "resJPEGThumb",
630 "sidecar": "resSidecar",
631 "original": "resOriginal",
632 "original_alt": "resOriginalAlt",
633 }
635 VIDEO_VERSION_LOOKUP = {
636 "full": "resVidFull",
637 "medium": "resVidMed",
638 "thumb": "resVidSmall",
639 "original": "resOriginal",
640 "original_compl": "resOriginalVidCompl",
641 }
643 @property
644 def id(self):
645 """Gets the photo id."""
646 return self._master_record["recordName"]
648 @property
649 def filename(self):
650 """Gets the photo file name."""
651 try:
652 return base64.b64decode(
653 self._master_record["fields"]["filenameEnc"]["value"],
654 ).decode("utf-8")
655 except KeyError:
656 # Some photos/videos (e.g., GoPro videos) may not have a filename
657 return None
659 @property
660 def size(self):
661 """Gets the photo size."""
662 return self._master_record["fields"]["resOriginalRes"]["value"]["size"]
664 @property
665 def created(self):
666 """Gets the photo created date."""
667 return self.asset_date
669 @property
670 def asset_date(self):
671 """Gets the photo asset date."""
672 try:
673 return datetime.fromtimestamp(
674 self._asset_record["fields"]["assetDate"]["value"] / 1000.0,
675 tz=UTC,
676 )
677 except KeyError:
678 return datetime.fromtimestamp(0)
680 @property
681 def added_date(self):
682 """Gets the photo added date."""
683 return datetime.fromtimestamp(
684 self._asset_record["fields"]["addedDate"]["value"] / 1000.0,
685 tz=UTC,
686 )
688 @property
689 def dimensions(self):
690 """Gets the photo dimensions."""
691 return (
692 self._master_record["fields"]["resOriginalWidth"]["value"],
693 self._master_record["fields"]["resOriginalHeight"]["value"],
694 )
696 @property
697 def versions(self):
698 """Gets the photo versions."""
699 if not self._versions:
700 self._versions = {}
701 if "resVidSmallRes" in self._master_record["fields"]:
702 typed_version_lookup = self.VIDEO_VERSION_LOOKUP
703 else:
704 typed_version_lookup = self.PHOTO_VERSION_LOOKUP
706 for key, prefix in typed_version_lookup.items():
707 if f"{prefix}Res" in self._master_record["fields"]:
708 fields = self._master_record["fields"]
709 version = {"filename": self.filename}
711 width_entry = fields.get(f"{prefix}Width")
712 if width_entry:
713 version["width"] = width_entry["value"]
714 else:
715 version["width"] = None
717 height_entry = fields.get(f"{prefix}Height")
718 if height_entry:
719 version["height"] = height_entry["value"]
720 else:
721 version["height"] = None
723 size_entry = fields.get(f"{prefix}Res")
724 if size_entry:
725 version["size"] = size_entry["value"]["size"]
726 version["url"] = size_entry["value"]["downloadURL"]
727 else:
728 version["size"] = None
729 version["url"] = None
731 type_entry = fields.get(f"{prefix}FileType")
732 if type_entry:
733 version["type"] = type_entry["value"]
734 else:
735 version["type"] = None
737 self._versions[key] = version
739 return self._versions
741 def download(self, version="original", **kwargs):
742 """Returns the photo file."""
743 if version not in self.versions:
744 return None
746 return self._service.session.get(
747 self.versions[version]["url"],
748 stream=True,
749 **kwargs,
750 )
752 def delete(self):
753 """Deletes the photo."""
754 json_data = json.dumps(
755 {
756 "atomic": True,
757 "desiredKeys": ["isDeleted"],
758 "operations": [
759 {
760 "operationType": "update",
761 "record": {
762 "fields": {"isDeleted": {"value": 1}},
763 "recordChangeTag": self._asset_record["recordChangeTag"],
764 "recordName": self._asset_record["recordName"],
765 "recordType": self._asset_record["recordType"],
766 },
767 },
768 ],
769 "zoneID": self._service.zone_id,
770 },
771 )
773 endpoint = self._service._service_endpoint
774 params = urlencode(self._service.params)
775 url = f"{endpoint}/records/modify?{params}"
777 return self._service.session.post(
778 url,
779 data=json_data,
780 headers={"Content-type": "text/plain"},
781 )
783 def __repr__(self):
784 return f"<{type(self).__name__}: id={self.id}>"