Coverage for src/usage.py: 100%
167 statements
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
« prev ^ index » next coverage.py v7.10.7, created at 2025-10-16 04:41 +0000
1"""To record usage of the app."""
3import json
4import os
5import tempfile
6from datetime import datetime, timedelta
7from typing import Any
9import requests
11from src import get_logger
12from src.config_parser import get_usage_tracking_enabled, prepare_root_destination
14LOGGER = get_logger()
16CACHE_FILE_NAME = "/config/.data"
17NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT", None)
18NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT", None)
19APP_NAME = "icloud-docker"
20APP_VERSION = os.environ.get("APP_VERSION", "dev")
21NEW_INSTALLATION_DATA = {"appName": APP_NAME, "appVersion": APP_VERSION}
24def init_cache(config: dict) -> str:
25 """Initialize the cache file.
27 Args:
28 config: Configuration dictionary containing root destination path
30 Returns:
31 Absolute path to the cache file
32 """
33 root_destination_path = prepare_root_destination(config=config)
34 cache_file_path = os.path.join(root_destination_path, CACHE_FILE_NAME)
35 LOGGER.debug(f"Initialized usage cache at: {cache_file_path}")
36 return cache_file_path
39def validate_cache_data(data: dict) -> bool:
40 """Validate cache data structure.
42 Args:
43 data: Dictionary to validate
45 Returns:
46 True if data is valid, False otherwise
47 """
48 # Basic structure validation
49 if not isinstance(data, dict):
50 return False
52 # If we have an ID, validate it's a string
53 if "id" in data and not isinstance(data["id"], str):
54 return False
56 # If we have app_version, validate it's a string
57 if "app_version" in data and not isinstance(data["app_version"], str):
58 return False
60 # If we have heartbeat timestamp, validate format
61 if "heartbeat_timestamp" in data:
62 try:
63 datetime.strptime(data["heartbeat_timestamp"], "%Y-%m-%d %H:%M:%S.%f")
64 except (ValueError, TypeError):
65 return False
67 return True
70def load_cache(file_path: str) -> dict:
71 """Load the cache file with validation and corruption recovery.
73 Args:
74 file_path: Absolute path to the cache file
76 Returns:
77 Dictionary containing cached usage data
78 """
79 data = {}
80 if os.path.isfile(file_path):
81 try:
82 with open(file_path, encoding="utf-8") as f:
83 loaded_data = json.load(f)
85 # Validate the loaded data
86 if validate_cache_data(loaded_data):
87 data = loaded_data
88 LOGGER.debug(f"Loaded and validated usage cache from: {file_path}")
89 else:
90 LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh")
91 save_cache(file_path=file_path, data={})
92 except (json.JSONDecodeError, OSError) as e:
93 LOGGER.error(f"Failed to load usage cache from {file_path}: {e}")
94 LOGGER.info("Creating new empty cache file due to corruption")
95 save_cache(file_path=file_path, data={})
96 else:
97 LOGGER.debug(f"Usage cache file not found, creating: {file_path}")
98 save_cache(file_path=file_path, data={})
99 return data
102def save_cache(file_path: str, data: dict) -> bool:
103 """Save data to the cache file using atomic operations.
105 Args:
106 file_path: Absolute path to the cache file
107 data: Dictionary containing usage data to save
109 Returns:
110 True if save was successful, False otherwise
111 """
112 try:
113 # Write to temporary file first for atomic operation
114 dir_name = os.path.dirname(file_path)
115 with tempfile.NamedTemporaryFile(
116 mode="w",
117 encoding="utf-8",
118 dir=dir_name,
119 delete=False,
120 suffix=".tmp",
121 ) as temp_file:
122 json.dump(data, temp_file, indent=2)
123 temp_path = temp_file.name
125 # Atomically move temp file to final location
126 os.rename(temp_path, file_path)
127 LOGGER.debug(f"Atomically saved usage cache to: {file_path}")
128 return True
129 except OSError as e:
130 LOGGER.error(f"Failed to save usage cache to {file_path}: {e}")
131 # Clean up temp file if it exists
132 try:
133 if "temp_path" in locals():
134 os.unlink(temp_path)
135 except OSError:
136 pass
137 return False
140def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None:
141 """Post new installation to server.
143 Args:
144 data: Dictionary containing installation data
145 endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT
147 Returns:
148 Installation ID if successful, None otherwise
149 """
150 try:
151 LOGGER.debug(f"Posting new installation to: {endpoint}")
152 response = requests.post(endpoint, json=data, timeout=10) # type: ignore[arg-type]
153 if response.ok:
154 response_data = response.json()
155 installation_id = response_data["id"]
156 LOGGER.debug(f"Successfully registered new installation: {installation_id}")
157 return installation_id
158 else:
159 LOGGER.debug(f"Installation registration failed with status {response.status_code}")
160 except Exception as e:
161 LOGGER.error(f"Failed to post new installation: {e}")
162 return None
165def record_new_installation(previous_id: str | None = None) -> str | None:
166 """Record new or upgrade existing installation.
168 Args:
169 previous_id: Previous installation ID for upgrades, None for new installations
171 Returns:
172 New installation ID if successful, None otherwise
173 """
174 data = dict(NEW_INSTALLATION_DATA)
175 if previous_id:
176 data["previousId"] = previous_id
177 return post_new_installation(data)
180def already_installed(cached_data: dict) -> bool:
181 """Check if already installed.
183 Args:
184 cached_data: Dictionary containing cached usage data
186 Returns:
187 True if installation is up-to-date, False otherwise
188 """
189 return "id" in cached_data and "app_version" in cached_data and cached_data["app_version"] == APP_VERSION
192def install(cached_data: dict) -> dict | None:
193 """Install the app.
195 Args:
196 cached_data: Dictionary containing cached usage data
198 Returns:
199 Updated cached data dictionary if successful, None otherwise
200 """
201 previous_id = cached_data.get("id", None)
202 if previous_id:
203 LOGGER.debug(f"Upgrading existing installation: {previous_id}")
204 else:
205 LOGGER.debug("Installing new instance")
207 new_id = record_new_installation(previous_id)
208 if new_id:
209 cached_data["id"] = new_id
210 cached_data["app_version"] = APP_VERSION
211 LOGGER.debug(f"Installation completed with ID: {new_id}")
212 return cached_data
214 LOGGER.error("Installation failed")
215 return None
218def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool:
219 """Post the heartbeat to server.
221 Args:
222 data: Dictionary containing heartbeat data
223 endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT
225 Returns:
226 True if heartbeat was sent successfully, False otherwise
227 """
228 try:
229 LOGGER.debug(f"Posting heartbeat to: {endpoint}")
230 response = requests.post(endpoint, json=data, timeout=20) # type: ignore[arg-type]
231 if response.ok:
232 LOGGER.debug("Heartbeat sent successfully")
233 return True
234 else:
235 LOGGER.debug(f"Heartbeat failed with status {response.status_code}")
236 except Exception as e:
237 LOGGER.error(f"Failed to post heartbeat: {e}")
238 return False
241def send_heartbeat(app_id: str | None, data: Any = None) -> bool:
242 """Prepare and send heartbeat to server.
244 Args:
245 app_id: Installation ID for heartbeat identification
246 data: Additional data to send with heartbeat
248 Returns:
249 True if heartbeat was sent successfully, False otherwise
250 """
251 data = {"installationId": app_id, "data": data}
252 return post_new_heartbeat(data)
255def current_time() -> datetime:
256 """Get current time.
258 Returns:
259 Current datetime object
260 """
261 return datetime.now()
264def heartbeat(cached_data: dict, data: Any) -> dict | None:
265 """Send heartbeat.
267 Args:
268 cached_data: Dictionary containing cached usage data
269 data: Additional data to send with heartbeat
271 Returns:
272 Updated cached data dictionary if heartbeat was sent,
273 None if heartbeat was throttled or failed
274 """
275 previous_heartbeat = cached_data.get("heartbeat_timestamp", None)
276 current = current_time()
278 if previous_heartbeat:
279 try:
280 previous = datetime.strptime(previous_heartbeat, "%Y-%m-%d %H:%M:%S.%f")
281 time_since_last = current - previous
282 LOGGER.debug(f"Time since last heartbeat: {time_since_last}")
284 if previous < (current - timedelta(hours=24)):
285 LOGGER.debug("Sending heartbeat (24+ hours since last)")
286 if send_heartbeat(cached_data.get("id"), data=data):
287 cached_data["heartbeat_timestamp"] = str(current)
288 return cached_data
289 else:
290 LOGGER.warning("Heartbeat send failed")
291 return None
292 else:
293 LOGGER.debug("Heartbeat throttled (less than 24 hours)")
294 return None
295 except ValueError as e:
296 LOGGER.error(f"Invalid heartbeat timestamp format: {e}")
297 # Treat as first heartbeat if timestamp is invalid
299 # First heartbeat or invalid timestamp
300 LOGGER.debug("Sending first heartbeat")
301 if send_heartbeat(cached_data.get("id"), data=data):
302 cached_data["heartbeat_timestamp"] = str(current)
303 LOGGER.debug("First heartbeat sent successfully")
304 return cached_data
305 else:
306 LOGGER.warning("First heartbeat send failed")
307 return None
310def alive(config: dict, data: Any = None) -> bool:
311 """Record liveliness.
313 Args:
314 config: Configuration dictionary
315 data: Additional usage data to send with heartbeat
317 Returns:
318 True if usage tracking was successful, False otherwise
319 """
320 # Check if usage tracking is disabled
321 if not get_usage_tracking_enabled(config):
322 LOGGER.debug("Usage tracking is disabled, skipping")
323 return True # Return True to not affect main sync loop
325 LOGGER.debug("Usage tracking alive check started")
327 cache_file_path = init_cache(config=config)
328 cached_data = load_cache(cache_file_path)
330 if not already_installed(cached_data=cached_data):
331 LOGGER.debug("New installation detected, registering...")
332 installed_data = install(cached_data=cached_data)
333 if installed_data is not None:
334 result = save_cache(file_path=cache_file_path, data=installed_data)
335 LOGGER.debug("Installation registration completed")
336 return result
337 else:
338 LOGGER.error("Installation registration failed")
339 return False
341 LOGGER.debug("Installation already registered, checking heartbeat")
342 heartbeat_data = heartbeat(cached_data=cached_data, data=data)
343 if heartbeat_data is not None:
344 result = save_cache(file_path=cache_file_path, data=heartbeat_data)
345 LOGGER.debug("Heartbeat completed successfully")
346 return result
348 LOGGER.debug("No heartbeat required or heartbeat failed")
349 return False