Coverage for src/usage.py: 100%

196 statements  

coverage.py v7.11.0, created at 2025-10-22 13:54 +0000

1"""To record usage of the app.""" 

2 

3import json 

4import os 

5import tempfile 

6import time 

7from datetime import datetime 

8from typing import Any 

9 

10import requests 

11 

12from src import get_logger 

13from src.config_parser import get_usage_tracking_enabled, prepare_root_destination 

14 

15LOGGER = get_logger() 

16 

17CACHE_FILE_NAME = "/config/.data" 

18NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT", None) 

19NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT", None) 

20APP_NAME = "icloud-docker" 

21APP_VERSION = os.environ.get("APP_VERSION", "dev") 

22NEW_INSTALLATION_DATA = {"appName": APP_NAME, "appVersion": APP_VERSION} 

23 

24# Retry configuration 

25MAX_RETRIES = int(os.environ.get("USAGE_TRACKING_MAX_RETRIES", "3")) 

26RETRY_BACKOFF_FACTOR = float(os.environ.get("USAGE_TRACKING_RETRY_BACKOFF", "2.0")) 

27 

28 
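
# A minimal configuration sketch (all values hypothetical): the retry knobs
# and endpoints above come from the environment, e.g.
#   USAGE_TRACKING_MAX_RETRIES=5 \
#   USAGE_TRACKING_RETRY_BACKOFF=1.5 \
#   NEW_INSTALLATION_ENDPOINT=https://usage.example.com/installations \
#   NEW_HEARTBEAT_ENDPOINT=https://usage.example.com/heartbeats \
#   APP_VERSION=1.2.3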

def init_cache(config: dict) -> str:
    """Initialize the cache file.

    Args:
        config: Configuration dictionary containing root destination path

    Returns:
        Absolute path to the cache file
    """
    root_destination_path = prepare_root_destination(config=config)
    cache_file_path = os.path.join(root_destination_path, CACHE_FILE_NAME)
    LOGGER.debug(f"Initialized usage cache at: {cache_file_path}")
    return cache_file_path
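
# Example (hypothetical root destination): if prepare_root_destination()
# returns "/icloud", the cache file lands at "/icloud/.data". The exact
# config keys are an assumption; see src.config_parser for the real schema.
#   init_cache(config={"app": {"root": "/icloud"}})  # -> "/icloud/.data"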

def validate_cache_data(data: dict) -> bool:
    """Validate cache data structure.

    Args:
        data: Dictionary to validate

    Returns:
        True if data is valid, False otherwise
    """
    # Basic structure validation
    if not isinstance(data, dict):
        return False

    # If we have an ID, validate it's a string
    if "id" in data and not isinstance(data["id"], str):
        return False

    # If we have app_version, validate it's a string
    if "app_version" in data and not isinstance(data["app_version"], str):
        return False

    # If we have heartbeat timestamp, validate format
    if "heartbeat_timestamp" in data:
        try:
            datetime.strptime(data["heartbeat_timestamp"], "%Y-%m-%d %H:%M:%S.%f")
        except (ValueError, TypeError):
            return False

    return True
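
# Examples (hypothetical records): a well-formed cache passes, a wrong type
# or a malformed timestamp does not.
#   validate_cache_data({"id": "abc", "app_version": "dev",
#                        "heartbeat_timestamp": "2025-10-22 13:54:00.000000"})  # True
#   validate_cache_data({"id": 42})                         # False (id not a str)
#   validate_cache_data({"heartbeat_timestamp": "yesterday"})  # False (bad format)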

def load_cache(file_path: str) -> dict:
    """Load the cache file with validation and corruption recovery.

    Args:
        file_path: Absolute path to the cache file

    Returns:
        Dictionary containing cached usage data
    """
    data = {}
    if os.path.isfile(file_path):
        try:
            with open(file_path, encoding="utf-8") as f:
                loaded_data = json.load(f)

            # Validate the loaded data
            if validate_cache_data(loaded_data):
                data = loaded_data
                LOGGER.debug(f"Loaded and validated usage cache from: {file_path}")
            else:
                LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh")
                save_cache(file_path=file_path, data={})
        except (json.JSONDecodeError, OSError) as e:
            LOGGER.error(f"Failed to load usage cache from {file_path}: {e}")
            LOGGER.info("Creating new empty cache file due to corruption")
            save_cache(file_path=file_path, data={})
    else:
        LOGGER.debug(f"Usage cache file not found, creating: {file_path}")
        save_cache(file_path=file_path, data={})
    return data
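
# Recovery sketch: a truncated or hand-edited cache never propagates an
# exception to the caller; it is replaced with an empty cache instead.
#   with open(path, "w") as f:
#       f.write("{not json")   # hypothetical corruption
#   load_cache(path)           # -> {} and the file is reset on disk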

def save_cache(file_path: str, data: dict) -> bool:
    """Save data to the cache file using atomic operations.

    Args:
        file_path: Absolute path to the cache file
        data: Dictionary containing usage data to save

    Returns:
        True if save was successful, False otherwise
    """
    try:
        # Write to a temporary file first for an atomic update
        dir_name = os.path.dirname(file_path)
        with tempfile.NamedTemporaryFile(
            mode="w",
            encoding="utf-8",
            dir=dir_name,
            delete=False,
            suffix=".tmp",
        ) as temp_file:
            json.dump(data, temp_file, indent=2)
            temp_path = temp_file.name

        # Atomically move the temp file into place; os.replace overwrites an
        # existing destination on all platforms (os.rename fails on Windows)
        os.replace(temp_path, file_path)
        LOGGER.debug(f"Atomically saved usage cache to: {file_path}")
        return True
    except OSError as e:
        LOGGER.error(f"Failed to save usage cache to {file_path}: {e}")
        # Clean up the temp file if it was created
        try:
            if "temp_path" in locals():
                os.unlink(temp_path)
        except OSError:
            pass
        return False
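
# Design note: writing to a NamedTemporaryFile in the same directory and then
# os.replace()-ing it over the target means readers always see either the old
# or the new cache, never a half-written JSON document (same-filesystem
# renames are atomic on POSIX).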

def post_with_retry(
    url: str,
    json_data: dict,
    timeout: int = 10,
    max_retries: int = MAX_RETRIES,
    backoff_factor: float = RETRY_BACKOFF_FACTOR,
) -> requests.Response | None:
    """Post request with exponential backoff retry.

    Args:
        url: Endpoint URL
        json_data: JSON payload
        timeout: Request timeout in seconds
        max_retries: Maximum number of retry attempts
        backoff_factor: Multiplier for exponential backoff

    Returns:
        Response object if successful, None otherwise
    """
    last_exception = None

    for attempt in range(max_retries):
        try:
            response = requests.post(url, json=json_data, timeout=timeout)  # type: ignore[arg-type]

            # Don't retry on client errors (4xx), except 429 rate limiting
            if 400 <= response.status_code < 500 and response.status_code != 429:
                LOGGER.debug(f"Non-retriable error (status {response.status_code})")
                return response

            # Success
            if response.ok:
                return response

            # Rate limit (429) or server error (5xx) - retry
            LOGGER.warning(f"Request failed with status {response.status_code}, attempt {attempt + 1}/{max_retries}")

        except (requests.ConnectionError, requests.Timeout) as e:
            last_exception = e
            LOGGER.warning(f"Network error: {e}, attempt {attempt + 1}/{max_retries}")
        except Exception as e:
            # Catch other exceptions but don't retry
            LOGGER.error(f"Unexpected error during request: {e}")
            return None

        # Exponential backoff before the next retry
        if attempt < max_retries - 1:
            wait_time = backoff_factor**attempt
            LOGGER.debug(f"Waiting {wait_time}s before retry...")
            time.sleep(wait_time)

    # All retries exhausted
    if last_exception:
        LOGGER.error(f"All retry attempts failed: {last_exception}")
    return None
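
# Worked example with the defaults (max_retries=3, backoff_factor=2.0): the
# waits between attempts are 2.0**0 = 1s and 2.0**1 = 2s, so a fully failing
# call costs three requests plus 3s of sleep before returning None.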

def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None:
    """Post new installation to server with retry logic.

    Args:
        data: Dictionary containing installation data
        endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT

    Returns:
        Installation ID if successful, None otherwise
    """
    try:
        LOGGER.debug(f"Posting new installation to: {endpoint}")
        response = post_with_retry(endpoint, data, timeout=10)

        if response and response.ok:
            response_data = response.json()
            installation_id = response_data["id"]
            LOGGER.debug(f"Successfully registered new installation: {installation_id}")
            return installation_id
        else:
            status = response.status_code if response else "no response"
            LOGGER.error(f"Installation registration failed: {status}")
    except Exception as e:
        LOGGER.error(f"Failed to post new installation: {e}")
    return None

def record_new_installation(previous_id: str | None = None) -> str | None:
    """Record new or upgrade existing installation.

    Args:
        previous_id: Previous installation ID for upgrades, None for new installations

    Returns:
        New installation ID if successful, None otherwise
    """
    data = dict(NEW_INSTALLATION_DATA)
    if previous_id:
        data["previousId"] = previous_id
    return post_new_installation(data)

def already_installed(cached_data: dict) -> bool:
    """Check if already installed.

    Args:
        cached_data: Dictionary containing cached usage data

    Returns:
        True if installation is up-to-date, False otherwise
    """
    return "id" in cached_data and "app_version" in cached_data and cached_data["app_version"] == APP_VERSION
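
# Examples (hypothetical cache, assuming APP_VERSION != "0.9.0"): a version
# change forces re-registration as an upgrade.
#   already_installed({"id": "abc", "app_version": APP_VERSION})  # True
#   already_installed({"id": "abc", "app_version": "0.9.0"})      # False -> upgrade
#   already_installed({})                                         # False -> new install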

def install(cached_data: dict) -> dict | None:
    """Install the app.

    Args:
        cached_data: Dictionary containing cached usage data

    Returns:
        Updated cached data dictionary if successful, None otherwise
    """
    previous_id = cached_data.get("id", None)
    if previous_id:
        LOGGER.debug(f"Upgrading existing installation: {previous_id}")
    else:
        LOGGER.debug("Installing new instance")

    new_id = record_new_installation(previous_id)
    if new_id:
        cached_data["id"] = new_id
        cached_data["app_version"] = APP_VERSION
        LOGGER.debug(f"Installation completed with ID: {new_id}")
        return cached_data

    LOGGER.error("Installation failed")
    return None

def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool:
    """Post the heartbeat to server with retry logic.

    Args:
        data: Dictionary containing heartbeat data
        endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT

    Returns:
        True if heartbeat was sent successfully, False otherwise
    """
    try:
        LOGGER.debug(f"Posting heartbeat to: {endpoint}")
        response = post_with_retry(endpoint, data, timeout=20)

        if response and response.ok:
            LOGGER.debug("Heartbeat sent successfully")
            return True
        else:
            status = response.status_code if response else "no response"
            LOGGER.error(f"Heartbeat failed: {status}")
    except Exception as e:
        LOGGER.error(f"Failed to post heartbeat: {e}")
    return False

def send_heartbeat(app_id: str | None, data: Any = None) -> bool:
    """Prepare and send heartbeat to server.

    Args:
        app_id: Installation ID for heartbeat identification
        data: Additional data to send with heartbeat

    Returns:
        True if heartbeat was sent successfully, False otherwise
    """
    payload = {"installationId": app_id, "data": data}
    return post_new_heartbeat(payload)
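
# The resulting payload is a thin envelope, e.g. (hypothetical values):
#   {"installationId": "abc-123", "data": {"synced": 10}}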

def current_time() -> datetime:
    """Get current UTC time.

    Returns:
        Current UTC datetime object (naive)
    """
    # Naive UTC is kept deliberately: cached timestamps are formatted with
    # str() and parsed back with "%Y-%m-%d %H:%M:%S.%f", which has no timezone
    # field. (datetime.utcnow() is deprecated in Python 3.12+, but switching
    # to an aware datetime would break that round trip.)
    return datetime.utcnow()

def heartbeat(cached_data: dict, data: Any) -> dict | None:
    """Send heartbeat.

    Args:
        cached_data: Dictionary containing cached usage data
        data: Additional data to send with heartbeat

    Returns:
        Updated cached data dictionary if heartbeat was sent,
        None if heartbeat was throttled or failed
    """
    previous_heartbeat = cached_data.get("heartbeat_timestamp", None)
    current = current_time()

    if previous_heartbeat:
        try:
            previous = datetime.strptime(previous_heartbeat, "%Y-%m-%d %H:%M:%S.%f")
            time_since_last = current - previous
            LOGGER.debug(f"Time since last heartbeat: {time_since_last}")

            # Send only on a different UTC day, not merely after 24 hours
            if previous.date() < current.date():
                LOGGER.debug("Sending heartbeat (different UTC day)")
                if send_heartbeat(cached_data.get("id"), data=data):
                    cached_data["heartbeat_timestamp"] = str(current)
                    return cached_data
                else:
                    LOGGER.warning("Heartbeat send failed")
                    return None
            else:
                LOGGER.debug("Heartbeat throttled (same UTC day)")
                return None
        except ValueError as e:
            LOGGER.error(f"Invalid heartbeat timestamp format: {e}")
            # Treat as first heartbeat if timestamp is invalid

    # First heartbeat or invalid timestamp
    LOGGER.debug("Sending first heartbeat")
    if send_heartbeat(cached_data.get("id"), data=data):
        cached_data["heartbeat_timestamp"] = str(current)
        LOGGER.debug("First heartbeat sent successfully")
        return cached_data
    else:
        LOGGER.warning("First heartbeat send failed")
        return None
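
# Throttling is calendar-based, not interval-based (hypothetical times, UTC):
#   last sent 2025-10-21 23:59 -> 2025-10-22 00:01 sends again (new UTC day)
#   last sent 2025-10-22 00:01 -> 2025-10-22 23:59 is throttled (same UTC day)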

def alive(config: dict, data: Any = None) -> bool:
    """Record liveliness.

    Args:
        config: Configuration dictionary
        data: Additional usage data to send with heartbeat

    Returns:
        True if usage tracking was successful, False otherwise
    """
    # Check if usage tracking is disabled
    if not get_usage_tracking_enabled(config):
        LOGGER.debug("Usage tracking is disabled, skipping")
        return True  # Return True so the main sync loop is unaffected

    LOGGER.debug("Usage tracking alive check started")

    cache_file_path = init_cache(config=config)
    cached_data = load_cache(cache_file_path)

    if not already_installed(cached_data=cached_data):
        LOGGER.debug("New installation detected, registering...")
        installed_data = install(cached_data=cached_data)
        if installed_data is not None:
            result = save_cache(file_path=cache_file_path, data=installed_data)
            LOGGER.debug("Installation registration completed")
            return result
        else:
            LOGGER.error("Installation registration failed")
            return False

    LOGGER.debug("Installation already registered, checking heartbeat")
    heartbeat_data = heartbeat(cached_data=cached_data, data=data)
    if heartbeat_data is not None:
        result = save_cache(file_path=cache_file_path, data=heartbeat_data)
        LOGGER.debug("Heartbeat completed successfully")
        return result

    LOGGER.debug("No heartbeat required or heartbeat failed")
    return False
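
# Minimal usage sketch (the config shape is an assumption; see
# src.config_parser for the real loader and schema):
#   config = {"app": {"root": "/icloud"}}            # hypothetical parsed config
#   alive(config=config, data={"synced_files": 10})  # register or heartbeat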