Coverage for src/usage.py: 100%
196 statements
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
« prev ^ index » next coverage.py v7.14.1, created at 2026-06-06 02:49 +0000
1"""To record usage of the app."""
3import json
4import os
5import tempfile
6import time
7from datetime import datetime
8from typing import Any
10import requests
12from src import get_logger
13from src.config_parser import get_usage_tracking_enabled, prepare_root_destination
15LOGGER = get_logger()
17# Same ICLOUD_DOCKER_CONFIG_DIR override as ``DEFAULT_COOKIE_DIRECTORY``
18# in ``src/__init__.py`` — lets the test suite on non-container hosts
19# (macOS, sandboxes) redirect the usage cache to a writable tempdir
20# instead of the read-only ``/config`` mount point. Defaults to
21# ``/config/.data`` so production container deployments are unchanged.
22CACHE_FILE_NAME = os.path.join(os.environ.get("ICLOUD_DOCKER_CONFIG_DIR", "/config"), ".data")
23NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT", None)
24NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT", None)
25APP_NAME = "icloud-docker"
26APP_VERSION = os.environ.get("APP_VERSION", "dev")
27NEW_INSTALLATION_DATA = {"appName": APP_NAME, "appVersion": APP_VERSION}
29# Retry configuration
30MAX_RETRIES = int(os.environ.get("USAGE_TRACKING_MAX_RETRIES", "3"))
31RETRY_BACKOFF_FACTOR = float(os.environ.get("USAGE_TRACKING_RETRY_BACKOFF", "2.0"))
34def init_cache(config: dict) -> str:
35 """Initialize the cache file.
37 Args:
38 config: Configuration dictionary containing root destination path
40 Returns:
41 Absolute path to the cache file
42 """
43 root_destination_path = prepare_root_destination(config=config)
44 cache_file_path = os.path.join(root_destination_path, CACHE_FILE_NAME)
45 LOGGER.debug(f"Initialized usage cache at: {cache_file_path}")
46 return cache_file_path
49def validate_cache_data(data: dict) -> bool:
50 """Validate cache data structure.
52 Args:
53 data: Dictionary to validate
55 Returns:
56 True if data is valid, False otherwise
57 """
58 # Basic structure validation
59 if not isinstance(data, dict):
60 return False
62 # If we have an ID, validate it's a string
63 if "id" in data and not isinstance(data["id"], str):
64 return False
66 # If we have app_version, validate it's a string
67 if "app_version" in data and not isinstance(data["app_version"], str):
68 return False
70 # If we have heartbeat timestamp, validate format
71 if "heartbeat_timestamp" in data:
72 try:
73 datetime.strptime(data["heartbeat_timestamp"], "%Y-%m-%d %H:%M:%S.%f")
74 except (ValueError, TypeError):
75 return False
77 return True
80def load_cache(file_path: str) -> dict:
81 """Load the cache file with validation and corruption recovery.
83 Args:
84 file_path: Absolute path to the cache file
86 Returns:
87 Dictionary containing cached usage data
88 """
89 data = {}
90 if os.path.isfile(file_path):
91 try:
92 with open(file_path, encoding="utf-8") as f:
93 loaded_data = json.load(f)
95 # Validate the loaded data
96 if validate_cache_data(loaded_data):
97 data = loaded_data
98 LOGGER.debug(f"Loaded and validated usage cache from: {file_path}")
99 else:
100 LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh")
101 save_cache(file_path=file_path, data={})
102 except (json.JSONDecodeError, OSError) as e:
103 LOGGER.error(f"Failed to load usage cache from {file_path}: {e}")
104 LOGGER.info("Creating new empty cache file due to corruption")
105 save_cache(file_path=file_path, data={})
106 else:
107 LOGGER.debug(f"Usage cache file not found, creating: {file_path}")
108 save_cache(file_path=file_path, data={})
109 return data
112def save_cache(file_path: str, data: dict) -> bool:
113 """Save data to the cache file using atomic operations.
115 Args:
116 file_path: Absolute path to the cache file
117 data: Dictionary containing usage data to save
119 Returns:
120 True if save was successful, False otherwise
121 """
122 try:
123 # Write to temporary file first for atomic operation
124 dir_name = os.path.dirname(file_path)
125 with tempfile.NamedTemporaryFile(
126 mode="w",
127 encoding="utf-8",
128 dir=dir_name,
129 delete=False,
130 suffix=".tmp",
131 ) as temp_file:
132 json.dump(data, temp_file, indent=2)
133 temp_path = temp_file.name
135 # Atomically move temp file to final location
136 os.rename(temp_path, file_path)
137 LOGGER.debug(f"Atomically saved usage cache to: {file_path}")
138 return True
139 except OSError as e:
140 LOGGER.error(f"Failed to save usage cache to {file_path}: {e}")
141 # Clean up temp file if it exists
142 try:
143 if "temp_path" in locals():
144 os.unlink(temp_path)
145 except OSError:
146 pass
147 return False
150def post_with_retry(
151 url: str,
152 json_data: dict,
153 timeout: int = 10,
154 max_retries: int = MAX_RETRIES,
155 backoff_factor: float = RETRY_BACKOFF_FACTOR,
156) -> requests.Response | None:
157 """Post request with exponential backoff retry.
159 Args:
160 url: Endpoint URL
161 json_data: JSON payload
162 timeout: Request timeout in seconds
163 max_retries: Maximum number of retry attempts
164 backoff_factor: Multiplier for exponential backoff
166 Returns:
167 Response object if successful, None otherwise
168 """
169 last_exception = None
171 for attempt in range(max_retries):
172 try:
173 response = requests.post(url, json=json_data, timeout=timeout) # type: ignore[arg-type]
175 # Don't retry on validation errors (4xx except rate limit)
176 if 400 <= response.status_code < 500 and response.status_code != 429:
177 LOGGER.debug(f"Non-retriable error (status {response.status_code})")
178 return response
180 # Success or retriable error
181 if response.ok:
182 return response
184 # Rate limit (429) or server error (5xx) - retry
185 LOGGER.warning(
186 f"Request failed with status {response.status_code}, " f"attempt {attempt + 1}/{max_retries}",
187 )
189 except (requests.ConnectionError, requests.Timeout) as e:
190 last_exception = e
191 LOGGER.warning(f"Network error: {e}, attempt {attempt + 1}/{max_retries}")
192 except Exception as e:
193 # Catch other exceptions but don't retry
194 LOGGER.error(f"Unexpected error during request: {e}")
195 return None
197 # Exponential backoff before next retry
198 if attempt < max_retries - 1:
199 wait_time = backoff_factor**attempt
200 LOGGER.debug(f"Waiting {wait_time}s before retry...")
201 time.sleep(wait_time)
203 # All retries exhausted
204 if last_exception:
205 LOGGER.error(f"All retry attempts failed: {last_exception}")
206 return None
209def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None:
210 """Post new installation to server with retry logic.
212 Args:
213 data: Dictionary containing installation data
214 endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT
216 Returns:
217 Installation ID if successful, None otherwise
218 """
219 try:
220 LOGGER.debug(f"Posting new installation to: {endpoint}")
221 response = post_with_retry(endpoint, data, timeout=10)
223 if response and response.ok:
224 response_data = response.json()
225 installation_id = response_data["id"]
226 LOGGER.debug(f"Successfully registered new installation: {installation_id}")
227 return installation_id
228 else:
229 status = response.status_code if response else "no response"
230 LOGGER.error(f"Installation registration failed: {status}")
231 except Exception as e:
232 LOGGER.error(f"Failed to post new installation: {e}")
233 return None
236def record_new_installation(previous_id: str | None = None) -> str | None:
237 """Record new or upgrade existing installation.
239 Args:
240 previous_id: Previous installation ID for upgrades, None for new installations
242 Returns:
243 New installation ID if successful, None otherwise
244 """
245 data = dict(NEW_INSTALLATION_DATA)
246 if previous_id:
247 data["previousId"] = previous_id
248 return post_new_installation(data)
251def already_installed(cached_data: dict) -> bool:
252 """Check if already installed.
254 Args:
255 cached_data: Dictionary containing cached usage data
257 Returns:
258 True if installation is up-to-date, False otherwise
259 """
260 return "id" in cached_data and "app_version" in cached_data and cached_data["app_version"] == APP_VERSION
263def install(cached_data: dict) -> dict | None:
264 """Install the app.
266 Args:
267 cached_data: Dictionary containing cached usage data
269 Returns:
270 Updated cached data dictionary if successful, None otherwise
271 """
272 previous_id = cached_data.get("id", None)
273 if previous_id:
274 LOGGER.debug(f"Upgrading existing installation: {previous_id}")
275 else:
276 LOGGER.debug("Installing new instance")
278 new_id = record_new_installation(previous_id)
279 if new_id:
280 cached_data["id"] = new_id
281 cached_data["app_version"] = APP_VERSION
282 LOGGER.debug(f"Installation completed with ID: {new_id}")
283 return cached_data
285 LOGGER.error("Installation failed")
286 return None
289def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool:
290 """Post the heartbeat to server with retry logic.
292 Args:
293 data: Dictionary containing heartbeat data
294 endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT
296 Returns:
297 True if heartbeat was sent successfully, False otherwise
298 """
299 try:
300 LOGGER.debug(f"Posting heartbeat to: {endpoint}")
301 response = post_with_retry(endpoint, data, timeout=20)
303 if response and response.ok:
304 LOGGER.debug("Heartbeat sent successfully")
305 return True
306 else:
307 status = response.status_code if response else "no response"
308 LOGGER.error(f"Heartbeat failed: {status}")
309 except Exception as e:
310 LOGGER.error(f"Failed to post heartbeat: {e}")
311 return False
314def send_heartbeat(app_id: str | None, data: Any = None) -> bool:
315 """Prepare and send heartbeat to server.
317 Args:
318 app_id: Installation ID for heartbeat identification
319 data: Additional data to send with heartbeat
321 Returns:
322 True if heartbeat was sent successfully, False otherwise
323 """
324 data = {"installationId": app_id, "data": data}
325 return post_new_heartbeat(data)
328def current_time() -> datetime:
329 """Get current UTC time.
331 Returns:
332 Current UTC datetime object
333 """
334 return datetime.utcnow()
337def heartbeat(cached_data: dict, data: Any) -> dict | None:
338 """Send heartbeat.
340 Args:
341 cached_data: Dictionary containing cached usage data
342 data: Additional data to send with heartbeat
344 Returns:
345 Updated cached data dictionary if heartbeat was sent,
346 None if heartbeat was throttled or failed
347 """
348 previous_heartbeat = cached_data.get("heartbeat_timestamp", None)
349 current = current_time()
351 if previous_heartbeat:
352 try:
353 previous = datetime.strptime(previous_heartbeat, "%Y-%m-%d %H:%M:%S.%f")
354 time_since_last = current - previous
355 LOGGER.debug(f"Time since last heartbeat: {time_since_last}")
357 # Check if different UTC day, not just 24 hours
358 if previous.date() < current.date():
359 LOGGER.debug("Sending heartbeat (different UTC day)")
360 if send_heartbeat(cached_data.get("id"), data=data):
361 cached_data["heartbeat_timestamp"] = str(current)
362 return cached_data
363 else:
364 LOGGER.warning("Heartbeat send failed")
365 return None
366 else:
367 LOGGER.debug("Heartbeat throttled (same UTC day)")
368 return None
369 except ValueError as e:
370 LOGGER.error(f"Invalid heartbeat timestamp format: {e}")
371 # Treat as first heartbeat if timestamp is invalid
373 # First heartbeat or invalid timestamp
374 LOGGER.debug("Sending first heartbeat")
375 if send_heartbeat(cached_data.get("id"), data=data):
376 cached_data["heartbeat_timestamp"] = str(current)
377 LOGGER.debug("First heartbeat sent successfully")
378 return cached_data
379 else:
380 LOGGER.warning("First heartbeat send failed")
381 return None
384def alive(config: dict, data: Any = None) -> bool:
385 """Record liveliness.
387 Args:
388 config: Configuration dictionary
389 data: Additional usage data to send with heartbeat
391 Returns:
392 True if usage tracking was successful, False otherwise
393 """
394 # Check if usage tracking is disabled
395 if not get_usage_tracking_enabled(config):
396 LOGGER.debug("Usage tracking is disabled, skipping")
397 return True # Return True to not affect main sync loop
399 LOGGER.debug("Usage tracking alive check started")
401 cache_file_path = init_cache(config=config)
402 cached_data = load_cache(cache_file_path)
404 if not already_installed(cached_data=cached_data):
405 LOGGER.debug("New installation detected, registering...")
406 installed_data = install(cached_data=cached_data)
407 if installed_data is not None:
408 result = save_cache(file_path=cache_file_path, data=installed_data)
409 LOGGER.debug("Installation registration completed")
410 return result
411 else:
412 LOGGER.error("Installation registration failed")
413 return False
415 LOGGER.debug("Installation already registered, checking heartbeat")
416 heartbeat_data = heartbeat(cached_data=cached_data, data=data)
417 if heartbeat_data is not None:
418 result = save_cache(file_path=cache_file_path, data=heartbeat_data)
419 LOGGER.debug("Heartbeat completed successfully")
420 return result
422 LOGGER.debug("No heartbeat required or heartbeat failed")
423 return False