Coverage for src/usage.py: 100%

196 statements  

« prev     ^ index     » next       coverage.py v7.14.1, created at 2026-06-06 02:49 +0000

1"""To record usage of the app.""" 

2 

3import json 

4import os 

5import tempfile 

6import time 

7from datetime import datetime 

8from typing import Any 

9 

10import requests 

11 

12from src import get_logger 

13from src.config_parser import get_usage_tracking_enabled, prepare_root_destination 

14 

15LOGGER = get_logger() 

16 

17# Same ICLOUD_DOCKER_CONFIG_DIR override as ``DEFAULT_COOKIE_DIRECTORY`` 

18# in ``src/__init__.py`` — lets the test suite on non-container hosts 

19# (macOS, sandboxes) redirect the usage cache to a writable tempdir 

20# instead of the read-only ``/config`` mount point. Defaults to 

21# ``/config/.data`` so production container deployments are unchanged. 

22CACHE_FILE_NAME = os.path.join(os.environ.get("ICLOUD_DOCKER_CONFIG_DIR", "/config"), ".data") 

23NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT", None) 

24NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT", None) 

25APP_NAME = "icloud-docker" 

26APP_VERSION = os.environ.get("APP_VERSION", "dev") 

27NEW_INSTALLATION_DATA = {"appName": APP_NAME, "appVersion": APP_VERSION} 

28 

29# Retry configuration 

30MAX_RETRIES = int(os.environ.get("USAGE_TRACKING_MAX_RETRIES", "3")) 

31RETRY_BACKOFF_FACTOR = float(os.environ.get("USAGE_TRACKING_RETRY_BACKOFF", "2.0")) 

32 

33 

34def init_cache(config: dict) -> str: 

35 """Initialize the cache file. 

36 

37 Args: 

38 config: Configuration dictionary containing root destination path 

39 

40 Returns: 

41 Absolute path to the cache file 

42 """ 

43 root_destination_path = prepare_root_destination(config=config) 

44 cache_file_path = os.path.join(root_destination_path, CACHE_FILE_NAME) 

45 LOGGER.debug(f"Initialized usage cache at: {cache_file_path}") 

46 return cache_file_path 

47 

48 

49def validate_cache_data(data: dict) -> bool: 

50 """Validate cache data structure. 

51 

52 Args: 

53 data: Dictionary to validate 

54 

55 Returns: 

56 True if data is valid, False otherwise 

57 """ 

58 # Basic structure validation 

59 if not isinstance(data, dict): 

60 return False 

61 

62 # If we have an ID, validate it's a string 

63 if "id" in data and not isinstance(data["id"], str): 

64 return False 

65 

66 # If we have app_version, validate it's a string 

67 if "app_version" in data and not isinstance(data["app_version"], str): 

68 return False 

69 

70 # If we have heartbeat timestamp, validate format 

71 if "heartbeat_timestamp" in data: 

72 try: 

73 datetime.strptime(data["heartbeat_timestamp"], "%Y-%m-%d %H:%M:%S.%f") 

74 except (ValueError, TypeError): 

75 return False 

76 

77 return True 

78 

79 

80def load_cache(file_path: str) -> dict: 

81 """Load the cache file with validation and corruption recovery. 

82 

83 Args: 

84 file_path: Absolute path to the cache file 

85 

86 Returns: 

87 Dictionary containing cached usage data 

88 """ 

89 data = {} 

90 if os.path.isfile(file_path): 

91 try: 

92 with open(file_path, encoding="utf-8") as f: 

93 loaded_data = json.load(f) 

94 

95 # Validate the loaded data 

96 if validate_cache_data(loaded_data): 

97 data = loaded_data 

98 LOGGER.debug(f"Loaded and validated usage cache from: {file_path}") 

99 else: 

100 LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh") 

101 save_cache(file_path=file_path, data={}) 

102 except (json.JSONDecodeError, OSError) as e: 

103 LOGGER.error(f"Failed to load usage cache from {file_path}: {e}") 

104 LOGGER.info("Creating new empty cache file due to corruption") 

105 save_cache(file_path=file_path, data={}) 

106 else: 

107 LOGGER.debug(f"Usage cache file not found, creating: {file_path}") 

108 save_cache(file_path=file_path, data={}) 

109 return data 

110 

111 

112def save_cache(file_path: str, data: dict) -> bool: 

113 """Save data to the cache file using atomic operations. 

114 

115 Args: 

116 file_path: Absolute path to the cache file 

117 data: Dictionary containing usage data to save 

118 

119 Returns: 

120 True if save was successful, False otherwise 

121 """ 

122 try: 

123 # Write to temporary file first for atomic operation 

124 dir_name = os.path.dirname(file_path) 

125 with tempfile.NamedTemporaryFile( 

126 mode="w", 

127 encoding="utf-8", 

128 dir=dir_name, 

129 delete=False, 

130 suffix=".tmp", 

131 ) as temp_file: 

132 json.dump(data, temp_file, indent=2) 

133 temp_path = temp_file.name 

134 

135 # Atomically move temp file to final location 

136 os.rename(temp_path, file_path) 

137 LOGGER.debug(f"Atomically saved usage cache to: {file_path}") 

138 return True 

139 except OSError as e: 

140 LOGGER.error(f"Failed to save usage cache to {file_path}: {e}") 

141 # Clean up temp file if it exists 

142 try: 

143 if "temp_path" in locals(): 

144 os.unlink(temp_path) 

145 except OSError: 

146 pass 

147 return False 

148 

149 

150def post_with_retry( 

151 url: str, 

152 json_data: dict, 

153 timeout: int = 10, 

154 max_retries: int = MAX_RETRIES, 

155 backoff_factor: float = RETRY_BACKOFF_FACTOR, 

156) -> requests.Response | None: 

157 """Post request with exponential backoff retry. 

158 

159 Args: 

160 url: Endpoint URL 

161 json_data: JSON payload 

162 timeout: Request timeout in seconds 

163 max_retries: Maximum number of retry attempts 

164 backoff_factor: Multiplier for exponential backoff 

165 

166 Returns: 

167 Response object if successful, None otherwise 

168 """ 

169 last_exception = None 

170 

171 for attempt in range(max_retries): 

172 try: 

173 response = requests.post(url, json=json_data, timeout=timeout) # type: ignore[arg-type] 

174 

175 # Don't retry on validation errors (4xx except rate limit) 

176 if 400 <= response.status_code < 500 and response.status_code != 429: 

177 LOGGER.debug(f"Non-retriable error (status {response.status_code})") 

178 return response 

179 

180 # Success or retriable error 

181 if response.ok: 

182 return response 

183 

184 # Rate limit (429) or server error (5xx) - retry 

185 LOGGER.warning( 

186 f"Request failed with status {response.status_code}, " f"attempt {attempt + 1}/{max_retries}", 

187 ) 

188 

189 except (requests.ConnectionError, requests.Timeout) as e: 

190 last_exception = e 

191 LOGGER.warning(f"Network error: {e}, attempt {attempt + 1}/{max_retries}") 

192 except Exception as e: 

193 # Catch other exceptions but don't retry 

194 LOGGER.error(f"Unexpected error during request: {e}") 

195 return None 

196 

197 # Exponential backoff before next retry 

198 if attempt < max_retries - 1: 

199 wait_time = backoff_factor**attempt 

200 LOGGER.debug(f"Waiting {wait_time}s before retry...") 

201 time.sleep(wait_time) 

202 

203 # All retries exhausted 

204 if last_exception: 

205 LOGGER.error(f"All retry attempts failed: {last_exception}") 

206 return None 

207 

208 

209def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None: 

210 """Post new installation to server with retry logic. 

211 

212 Args: 

213 data: Dictionary containing installation data 

214 endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT 

215 

216 Returns: 

217 Installation ID if successful, None otherwise 

218 """ 

219 try: 

220 LOGGER.debug(f"Posting new installation to: {endpoint}") 

221 response = post_with_retry(endpoint, data, timeout=10) 

222 

223 if response and response.ok: 

224 response_data = response.json() 

225 installation_id = response_data["id"] 

226 LOGGER.debug(f"Successfully registered new installation: {installation_id}") 

227 return installation_id 

228 else: 

229 status = response.status_code if response else "no response" 

230 LOGGER.error(f"Installation registration failed: {status}") 

231 except Exception as e: 

232 LOGGER.error(f"Failed to post new installation: {e}") 

233 return None 

234 

235 

236def record_new_installation(previous_id: str | None = None) -> str | None: 

237 """Record new or upgrade existing installation. 

238 

239 Args: 

240 previous_id: Previous installation ID for upgrades, None for new installations 

241 

242 Returns: 

243 New installation ID if successful, None otherwise 

244 """ 

245 data = dict(NEW_INSTALLATION_DATA) 

246 if previous_id: 

247 data["previousId"] = previous_id 

248 return post_new_installation(data) 

249 

250 

251def already_installed(cached_data: dict) -> bool: 

252 """Check if already installed. 

253 

254 Args: 

255 cached_data: Dictionary containing cached usage data 

256 

257 Returns: 

258 True if installation is up-to-date, False otherwise 

259 """ 

260 return "id" in cached_data and "app_version" in cached_data and cached_data["app_version"] == APP_VERSION 

261 

262 

263def install(cached_data: dict) -> dict | None: 

264 """Install the app. 

265 

266 Args: 

267 cached_data: Dictionary containing cached usage data 

268 

269 Returns: 

270 Updated cached data dictionary if successful, None otherwise 

271 """ 

272 previous_id = cached_data.get("id", None) 

273 if previous_id: 

274 LOGGER.debug(f"Upgrading existing installation: {previous_id}") 

275 else: 

276 LOGGER.debug("Installing new instance") 

277 

278 new_id = record_new_installation(previous_id) 

279 if new_id: 

280 cached_data["id"] = new_id 

281 cached_data["app_version"] = APP_VERSION 

282 LOGGER.debug(f"Installation completed with ID: {new_id}") 

283 return cached_data 

284 

285 LOGGER.error("Installation failed") 

286 return None 

287 

288 

289def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool: 

290 """Post the heartbeat to server with retry logic. 

291 

292 Args: 

293 data: Dictionary containing heartbeat data 

294 endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT 

295 

296 Returns: 

297 True if heartbeat was sent successfully, False otherwise 

298 """ 

299 try: 

300 LOGGER.debug(f"Posting heartbeat to: {endpoint}") 

301 response = post_with_retry(endpoint, data, timeout=20) 

302 

303 if response and response.ok: 

304 LOGGER.debug("Heartbeat sent successfully") 

305 return True 

306 else: 

307 status = response.status_code if response else "no response" 

308 LOGGER.error(f"Heartbeat failed: {status}") 

309 except Exception as e: 

310 LOGGER.error(f"Failed to post heartbeat: {e}") 

311 return False 

312 

313 

314def send_heartbeat(app_id: str | None, data: Any = None) -> bool: 

315 """Prepare and send heartbeat to server. 

316 

317 Args: 

318 app_id: Installation ID for heartbeat identification 

319 data: Additional data to send with heartbeat 

320 

321 Returns: 

322 True if heartbeat was sent successfully, False otherwise 

323 """ 

324 data = {"installationId": app_id, "data": data} 

325 return post_new_heartbeat(data) 

326 

327 

328def current_time() -> datetime: 

329 """Get current UTC time. 

330 

331 Returns: 

332 Current UTC datetime object 

333 """ 

334 return datetime.utcnow() 

335 

336 

337def heartbeat(cached_data: dict, data: Any) -> dict | None: 

338 """Send heartbeat. 

339 

340 Args: 

341 cached_data: Dictionary containing cached usage data 

342 data: Additional data to send with heartbeat 

343 

344 Returns: 

345 Updated cached data dictionary if heartbeat was sent, 

346 None if heartbeat was throttled or failed 

347 """ 

348 previous_heartbeat = cached_data.get("heartbeat_timestamp", None) 

349 current = current_time() 

350 

351 if previous_heartbeat: 

352 try: 

353 previous = datetime.strptime(previous_heartbeat, "%Y-%m-%d %H:%M:%S.%f") 

354 time_since_last = current - previous 

355 LOGGER.debug(f"Time since last heartbeat: {time_since_last}") 

356 

357 # Check if different UTC day, not just 24 hours 

358 if previous.date() < current.date(): 

359 LOGGER.debug("Sending heartbeat (different UTC day)") 

360 if send_heartbeat(cached_data.get("id"), data=data): 

361 cached_data["heartbeat_timestamp"] = str(current) 

362 return cached_data 

363 else: 

364 LOGGER.warning("Heartbeat send failed") 

365 return None 

366 else: 

367 LOGGER.debug("Heartbeat throttled (same UTC day)") 

368 return None 

369 except ValueError as e: 

370 LOGGER.error(f"Invalid heartbeat timestamp format: {e}") 

371 # Treat as first heartbeat if timestamp is invalid 

372 

373 # First heartbeat or invalid timestamp 

374 LOGGER.debug("Sending first heartbeat") 

375 if send_heartbeat(cached_data.get("id"), data=data): 

376 cached_data["heartbeat_timestamp"] = str(current) 

377 LOGGER.debug("First heartbeat sent successfully") 

378 return cached_data 

379 else: 

380 LOGGER.warning("First heartbeat send failed") 

381 return None 

382 

383 

384def alive(config: dict, data: Any = None) -> bool: 

385 """Record liveliness. 

386 

387 Args: 

388 config: Configuration dictionary 

389 data: Additional usage data to send with heartbeat 

390 

391 Returns: 

392 True if usage tracking was successful, False otherwise 

393 """ 

394 # Check if usage tracking is disabled 

395 if not get_usage_tracking_enabled(config): 

396 LOGGER.debug("Usage tracking is disabled, skipping") 

397 return True # Return True to not affect main sync loop 

398 

399 LOGGER.debug("Usage tracking alive check started") 

400 

401 cache_file_path = init_cache(config=config) 

402 cached_data = load_cache(cache_file_path) 

403 

404 if not already_installed(cached_data=cached_data): 

405 LOGGER.debug("New installation detected, registering...") 

406 installed_data = install(cached_data=cached_data) 

407 if installed_data is not None: 

408 result = save_cache(file_path=cache_file_path, data=installed_data) 

409 LOGGER.debug("Installation registration completed") 

410 return result 

411 else: 

412 LOGGER.error("Installation registration failed") 

413 return False 

414 

415 LOGGER.debug("Installation already registered, checking heartbeat") 

416 heartbeat_data = heartbeat(cached_data=cached_data, data=data) 

417 if heartbeat_data is not None: 

418 result = save_cache(file_path=cache_file_path, data=heartbeat_data) 

419 LOGGER.debug("Heartbeat completed successfully") 

420 return result 

421 

422 LOGGER.debug("No heartbeat required or heartbeat failed") 

423 return False