Coverage for src/usage.py: 100%
196 statements
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 13:54 +0000
« prev ^ index » next coverage.py v7.11.0, created at 2025-10-22 13:54 +0000
1"""To record usage of the app."""
3import json
4import os
5import tempfile
6import time
7from datetime import datetime
8from typing import Any
10import requests
12from src import get_logger
13from src.config_parser import get_usage_tracking_enabled, prepare_root_destination
# Logger shared by every function in this module.
LOGGER = get_logger()

# Location of the usage cache file. The value is an absolute path.
CACHE_FILE_NAME = "/config/.data"

# Tracking endpoints are read from the environment and are None when unset.
NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT")
NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT")

# Application identity included in every installation request.
APP_NAME = "icloud-docker"
APP_VERSION = os.environ.get("APP_VERSION", "dev")
NEW_INSTALLATION_DATA = dict(appName=APP_NAME, appVersion=APP_VERSION)
# Retry configuration
def _numeric_env(name: str, default: str, parse) -> Any:
    """Read a numeric setting from the environment, tolerating bad values.

    Args:
        name: Environment variable to read
        default: Textual default used when the variable is unset or unparsable
        parse: Callable (``int`` or ``float``) converting the text to a number

    Returns:
        The parsed environment value, or the parsed default when the value
        is empty or malformed
    """
    raw_value = os.environ.get(name, default)
    try:
        return parse(raw_value)
    except ValueError:
        # An empty or malformed optional setting must not abort the import
        # of this module; fall back to the documented default instead.
        LOGGER.warning(f"Invalid value {raw_value!r} for {name}, using default {default}")
        return parse(default)


MAX_RETRIES: int = _numeric_env("USAGE_TRACKING_MAX_RETRIES", "3", int)
RETRY_BACKOFF_FACTOR: float = _numeric_env("USAGE_TRACKING_RETRY_BACKOFF", "2.0", float)
def init_cache(config: dict) -> str:
    """Resolve the path of the usage cache file.

    Args:
        config: Configuration dictionary passed to ``prepare_root_destination``

    Returns:
        Path produced by joining the prepared root destination with
        ``CACHE_FILE_NAME``

    Note:
        ``os.path.join`` drops every earlier component once it meets an
        absolute one, so while ``CACHE_FILE_NAME`` is absolute the returned
        path is ``CACHE_FILE_NAME`` itself.
    """
    usage_cache_path = os.path.join(prepare_root_destination(config=config), CACHE_FILE_NAME)
    LOGGER.debug(f"Initialized usage cache at: {usage_cache_path}")
    return usage_cache_path
def validate_cache_data(data: dict) -> bool:
    """Validate cache data structure.

    Every key is optional; a key that is present must have the expected type
    and, for the heartbeat timestamp, a parsable format.

    Args:
        data: Dictionary to validate

    Returns:
        True if data is valid, False otherwise
    """
    # Basic structure validation
    if not isinstance(data, dict):
        return False

    # Optional identifier fields must be strings when present
    for key in ("id", "app_version"):
        if key in data and not isinstance(data[key], str):
            return False

    if "heartbeat_timestamp" in data:
        timestamp = data["heartbeat_timestamp"]
        if not isinstance(timestamp, str):
            return False
        # str(datetime) omits the fractional part when microsecond == 0, so a
        # timestamp written that way has no ".%f" suffix. Accept both forms so
        # a cache this module wrote itself is never rejected.
        for timestamp_format in ("%Y-%m-%d %H:%M:%S.%f", "%Y-%m-%d %H:%M:%S"):
            try:
                datetime.strptime(timestamp, timestamp_format)
            except ValueError:
                continue
            break
        else:
            return False

    return True
def load_cache(file_path: str) -> dict:
    """Load the cache file with validation and corruption recovery.

    When the file is missing, unreadable, not valid JSON/UTF-8, or fails
    ``validate_cache_data``, an empty cache is written back to ``file_path``
    and an empty dictionary is returned.

    Args:
        file_path: Absolute path to the cache file

    Returns:
        Dictionary containing cached usage data
    """
    if not os.path.isfile(file_path):
        LOGGER.debug(f"Usage cache file not found, creating: {file_path}")
        save_cache(file_path=file_path, data={})
        return {}

    try:
        with open(file_path, encoding="utf-8") as f:
            loaded_data = json.load(f)
    except (ValueError, OSError) as e:
        # ValueError covers json.JSONDecodeError and also UnicodeDecodeError,
        # which is raised when the file contains bytes that are not UTF-8.
        LOGGER.error(f"Failed to load usage cache from {file_path}: {e}")
        LOGGER.info("Creating new empty cache file due to corruption")
        save_cache(file_path=file_path, data={})
        return {}

    # Validate the loaded data
    if validate_cache_data(loaded_data):
        LOGGER.debug(f"Loaded and validated usage cache from: {file_path}")
        return loaded_data

    LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh")
    save_cache(file_path=file_path, data={})
    return {}
def save_cache(file_path: str, data: dict) -> bool:
    """Save data to the cache file using atomic operations.

    The data is written to a temporary file in the same directory as
    ``file_path`` and then moved over the target, so readers never observe a
    partially written cache. The temporary file is removed whenever the save
    does not complete.

    Args:
        file_path: Absolute path to the cache file
        data: Dictionary containing usage data to save

    Returns:
        True if save was successful, False otherwise
    """
    temp_path = None
    try:
        # Same directory as the target so the final move stays on one filesystem
        dir_name = os.path.dirname(file_path)
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=dir_name,
            delete=False,
            suffix=".tmp",
        ) as temp_file:
            # Record the name before writing so a failed dump can be cleaned up
            temp_path = temp_file.name
            json.dump(data, temp_file, indent=2)

        # os.replace overwrites an existing destination on every platform
        os.replace(temp_path, file_path)
        temp_path = None  # the temp file no longer exists under its old name
        LOGGER.debug(f"Atomically saved usage cache to: {file_path}")
        return True
    except OSError as e:
        LOGGER.error(f"Failed to save usage cache to {file_path}: {e}")
        return False
    finally:
        # Best-effort removal of a leftover temp file on any failure path
        if temp_path is not None:
            try:
                os.unlink(temp_path)
            except OSError:
                pass
def post_with_retry(
    url: str,
    json_data: dict,
    timeout: int = 10,
    max_retries: int = MAX_RETRIES,
    backoff_factor: float = RETRY_BACKOFF_FACTOR,
) -> requests.Response | None:
    """Send a JSON POST request, retrying transient failures with backoff.

    Connection errors, timeouts, HTTP 429 and other non-OK responses are
    retried. A 4xx status other than 429 is returned immediately, and any
    other exception aborts without retrying. Between attempts the function
    sleeps ``backoff_factor ** attempt`` seconds, with attempts counted from 0.

    Args:
        url: Endpoint URL
        json_data: JSON payload
        timeout: Request timeout in seconds
        max_retries: Total number of attempts to make
        backoff_factor: Base of the exponential wait between attempts

    Returns:
        The response when the request succeeded or failed with a
        non-retriable client error; None when every attempt failed or an
        unexpected exception occurred
    """
    network_failure = None
    final_attempt = max_retries - 1

    for attempt in range(max_retries):
        progress = f"attempt {attempt + 1}/{max_retries}"
        try:
            response = requests.post(url, json=json_data, timeout=timeout)  # type: ignore[arg-type]
            status_code = response.status_code

            # Client errors other than rate limiting will not improve on retry
            is_client_error = 400 <= status_code < 500
            if is_client_error and status_code != 429:
                LOGGER.debug(f"Non-retriable error (status {status_code})")
                return response

            if response.ok:
                return response

            # Anything left is a rate limit or a server-side failure: retry
            LOGGER.warning(f"Request failed with status {response.status_code}, {progress}")
        except (requests.ConnectionError, requests.Timeout) as exc:
            network_failure = exc
            LOGGER.warning(f"Network error: {exc}, {progress}")
        except Exception as exc:
            # Unknown failures are not retried
            LOGGER.error(f"Unexpected error during request: {exc}")
            return None

        # No point in sleeping once the last attempt has been made
        if attempt == final_attempt:
            break

        delay = backoff_factor**attempt
        LOGGER.debug(f"Waiting {delay}s before retry...")
        time.sleep(delay)

    # Every attempt has been used up
    if network_failure:
        LOGGER.error(f"All retry attempts failed: {network_failure}")
    return None
def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None:
    """Post new installation to server with retry logic.

    Args:
        data: Dictionary containing installation data
        endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT

    Returns:
        Installation ID if successful, None otherwise
    """
    try:
        LOGGER.debug(f"Posting new installation to: {endpoint}")
        response = post_with_retry(endpoint, data, timeout=10)

        # requests.Response is falsy for 4xx/5xx replies, so test against None
        # explicitly; otherwise an error reply is reported as "no response".
        if response is None:
            LOGGER.error("Installation registration failed: no response")
        elif not response.ok:
            LOGGER.error(f"Installation registration failed: {response.status_code}")
        else:
            installation_id = response.json()["id"]
            LOGGER.debug(f"Successfully registered new installation: {installation_id}")
            return installation_id
    except Exception as e:
        # Covers malformed JSON and a reply without an "id" key
        LOGGER.error(f"Failed to post new installation: {e}")
    return None
def record_new_installation(previous_id: str | None = None) -> str | None:
    """Register this instance, linking it to an earlier installation if any.

    Args:
        previous_id: ID of the installation being upgraded; omitted from the
            request when empty or None

    Returns:
        Installation ID returned by ``post_new_installation``, or None on failure
    """
    # Work on a copy so the module-level template is never modified
    payload = {**NEW_INSTALLATION_DATA}
    if previous_id:
        payload["previousId"] = previous_id
    return post_new_installation(payload)
def already_installed(cached_data: dict) -> bool:
    """Tell whether the cache describes an installation of this app version.

    Args:
        cached_data: Dictionary containing cached usage data

    Returns:
        True when the cache holds both an ``id`` and an ``app_version`` equal
        to ``APP_VERSION``, False otherwise
    """
    if "id" not in cached_data:
        return False
    if "app_version" not in cached_data:
        return False
    return cached_data["app_version"] == APP_VERSION
def install(cached_data: dict) -> dict | None:
    """Register the app and store the resulting identity in the cache data.

    Args:
        cached_data: Dictionary containing cached usage data; updated in place
            on success

    Returns:
        The same dictionary with ``id`` and ``app_version`` set if
        registration succeeded, None otherwise
    """
    previous_id = cached_data.get("id", None)
    LOGGER.debug(
        f"Upgrading existing installation: {previous_id}" if previous_id else "Installing new instance",
    )

    new_id = record_new_installation(previous_id)
    if not new_id:
        LOGGER.error("Installation failed")
        return None

    cached_data.update(id=new_id, app_version=APP_VERSION)
    LOGGER.debug(f"Installation completed with ID: {new_id}")
    return cached_data
def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool:
    """Post the heartbeat to server with retry logic.

    Args:
        data: Dictionary containing heartbeat data
        endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT

    Returns:
        True if heartbeat was sent successfully, False otherwise
    """
    try:
        LOGGER.debug(f"Posting heartbeat to: {endpoint}")
        response = post_with_retry(endpoint, data, timeout=20)

        # requests.Response is falsy for 4xx/5xx replies, so test against None
        # explicitly; otherwise an error reply is reported as "no response".
        if response is None:
            LOGGER.error("Heartbeat failed: no response")
        elif not response.ok:
            LOGGER.error(f"Heartbeat failed: {response.status_code}")
        else:
            LOGGER.debug("Heartbeat sent successfully")
            return True
    except Exception as e:
        LOGGER.error(f"Failed to post heartbeat: {e}")
    return False
def send_heartbeat(app_id: str | None, data: Any = None) -> bool:
    """Wrap the heartbeat payload and hand it to ``post_new_heartbeat``.

    Args:
        app_id: Installation ID sent as ``installationId``
        data: Additional data sent under the ``data`` key

    Returns:
        Result of ``post_new_heartbeat`` for the assembled payload
    """
    return post_new_heartbeat({"installationId": app_id, "data": data})
def current_time() -> datetime:
    """Get current UTC time.

    Returns:
        Current UTC time as a naive datetime (``tzinfo`` is None), so it can be
        compared with timestamps parsed by ``datetime.strptime``
    """
    # Local import: the module only imports ``datetime`` from the datetime package
    from datetime import timezone

    # datetime.utcnow() is deprecated since Python 3.12. Take an aware UTC
    # timestamp and drop the tzinfo to return the same naive value.
    return datetime.now(timezone.utc).replace(tzinfo=None)
def _parse_heartbeat_timestamp(raw: str) -> datetime:
    """Parse a stored heartbeat timestamp, with or without fractional seconds."""
    try:
        return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S.%f")
    except ValueError:
        # Timestamps written via str(datetime) lack the fraction when microsecond == 0
        return datetime.strptime(raw, "%Y-%m-%d %H:%M:%S")


def heartbeat(cached_data: dict, data: Any) -> dict | None:
    """Send heartbeat.

    At most one heartbeat is sent per UTC calendar day. A missing or
    unparsable stored timestamp is treated as "no heartbeat sent yet".

    Args:
        cached_data: Dictionary containing cached usage data; its
            ``heartbeat_timestamp`` is updated in place on success
        data: Additional data to send with heartbeat

    Returns:
        Updated cached data dictionary if heartbeat was sent,
        None if heartbeat was throttled or failed
    """
    previous_heartbeat = cached_data.get("heartbeat_timestamp", None)
    current = current_time()

    is_first = True
    if previous_heartbeat:
        try:
            previous = _parse_heartbeat_timestamp(previous_heartbeat)
        except (ValueError, TypeError) as e:
            # Treat as first heartbeat if timestamp is invalid
            LOGGER.error(f"Invalid heartbeat timestamp format: {e}")
        else:
            LOGGER.debug(f"Time since last heartbeat: {current - previous}")
            # Compare calendar days, not elapsed time: one heartbeat per UTC day
            if previous.date() >= current.date():
                LOGGER.debug("Heartbeat throttled (same UTC day)")
                return None
            is_first = False

    LOGGER.debug("Sending first heartbeat" if is_first else "Sending heartbeat (different UTC day)")
    if not send_heartbeat(cached_data.get("id"), data=data):
        LOGGER.warning("First heartbeat send failed" if is_first else "Heartbeat send failed")
        return None

    # Use an explicit format: str(datetime) omits the fraction when
    # microsecond == 0, and the stored value would then fail "%f" parsing.
    cached_data["heartbeat_timestamp"] = current.strftime("%Y-%m-%d %H:%M:%S.%f")
    if is_first:
        LOGGER.debug("First heartbeat sent successfully")
    return cached_data
def alive(config: dict, data: Any = None) -> bool:
    """Run one usage-tracking cycle: register the installation or send a heartbeat.

    Args:
        config: Configuration dictionary
        data: Additional usage data to send with heartbeat

    Returns:
        True when tracking is disabled, or when the installation/heartbeat was
        recorded and the cache saved; False otherwise
    """
    if not get_usage_tracking_enabled(config):
        LOGGER.debug("Usage tracking is disabled, skipping")
        # Reported as success so a disabled tracker never affects the caller
        return True

    LOGGER.debug("Usage tracking alive check started")

    cache_file_path = init_cache(config=config)
    cached_data = load_cache(cache_file_path)

    if already_installed(cached_data=cached_data):
        LOGGER.debug("Installation already registered, checking heartbeat")
        heartbeat_data = heartbeat(cached_data=cached_data, data=data)
        if heartbeat_data is None:
            LOGGER.debug("No heartbeat required or heartbeat failed")
            return False
        saved = save_cache(file_path=cache_file_path, data=heartbeat_data)
        LOGGER.debug("Heartbeat completed successfully")
        return saved

    LOGGER.debug("New installation detected, registering...")
    installed_data = install(cached_data=cached_data)
    if installed_data is None:
        LOGGER.error("Installation registration failed")
        return False
    saved = save_cache(file_path=cache_file_path, data=installed_data)
    LOGGER.debug("Installation registration completed")
    return saved