Coverage for src/usage.py: 100%

167 statements  

« prev     ^ index     » next       coverage.py v7.10.7, created at 2025-10-16 04:41 +0000

1"""To record usage of the app.""" 

2 

3import json 

4import os 

5import tempfile 

6from datetime import datetime, timedelta 

7from typing import Any 

8 

9import requests 

10 

11from src import get_logger 

12from src.config_parser import get_usage_tracking_enabled, prepare_root_destination 

13 

14LOGGER = get_logger() 

15 

16CACHE_FILE_NAME = "/config/.data" 

17NEW_INSTALLATION_ENDPOINT = os.environ.get("NEW_INSTALLATION_ENDPOINT", None) 

18NEW_HEARTBEAT_ENDPOINT = os.environ.get("NEW_HEARTBEAT_ENDPOINT", None) 

19APP_NAME = "icloud-docker" 

20APP_VERSION = os.environ.get("APP_VERSION", "dev") 

21NEW_INSTALLATION_DATA = {"appName": APP_NAME, "appVersion": APP_VERSION} 

22 

23 

24def init_cache(config: dict) -> str: 

25 """Initialize the cache file. 

26 

27 Args: 

28 config: Configuration dictionary containing root destination path 

29 

30 Returns: 

31 Absolute path to the cache file 

32 """ 

33 root_destination_path = prepare_root_destination(config=config) 

34 cache_file_path = os.path.join(root_destination_path, CACHE_FILE_NAME) 

35 LOGGER.debug(f"Initialized usage cache at: {cache_file_path}") 

36 return cache_file_path 

37 

38 

39def validate_cache_data(data: dict) -> bool: 

40 """Validate cache data structure. 

41 

42 Args: 

43 data: Dictionary to validate 

44 

45 Returns: 

46 True if data is valid, False otherwise 

47 """ 

48 # Basic structure validation 

49 if not isinstance(data, dict): 

50 return False 

51 

52 # If we have an ID, validate it's a string 

53 if "id" in data and not isinstance(data["id"], str): 

54 return False 

55 

56 # If we have app_version, validate it's a string 

57 if "app_version" in data and not isinstance(data["app_version"], str): 

58 return False 

59 

60 # If we have heartbeat timestamp, validate format 

61 if "heartbeat_timestamp" in data: 

62 try: 

63 datetime.strptime(data["heartbeat_timestamp"], "%Y-%m-%d %H:%M:%S.%f") 

64 except (ValueError, TypeError): 

65 return False 

66 

67 return True 

68 

69 

70def load_cache(file_path: str) -> dict: 

71 """Load the cache file with validation and corruption recovery. 

72 

73 Args: 

74 file_path: Absolute path to the cache file 

75 

76 Returns: 

77 Dictionary containing cached usage data 

78 """ 

79 data = {} 

80 if os.path.isfile(file_path): 

81 try: 

82 with open(file_path, encoding="utf-8") as f: 

83 loaded_data = json.load(f) 

84 

85 # Validate the loaded data 

86 if validate_cache_data(loaded_data): 

87 data = loaded_data 

88 LOGGER.debug(f"Loaded and validated usage cache from: {file_path}") 

89 else: 

90 LOGGER.warning(f"Cache data validation failed for {file_path}, starting fresh") 

91 save_cache(file_path=file_path, data={}) 

92 except (json.JSONDecodeError, OSError) as e: 

93 LOGGER.error(f"Failed to load usage cache from {file_path}: {e}") 

94 LOGGER.info("Creating new empty cache file due to corruption") 

95 save_cache(file_path=file_path, data={}) 

96 else: 

97 LOGGER.debug(f"Usage cache file not found, creating: {file_path}") 

98 save_cache(file_path=file_path, data={}) 

99 return data 

100 

101 

102def save_cache(file_path: str, data: dict) -> bool: 

103 """Save data to the cache file using atomic operations. 

104 

105 Args: 

106 file_path: Absolute path to the cache file 

107 data: Dictionary containing usage data to save 

108 

109 Returns: 

110 True if save was successful, False otherwise 

111 """ 

112 try: 

113 # Write to temporary file first for atomic operation 

114 dir_name = os.path.dirname(file_path) 

115 with tempfile.NamedTemporaryFile( 

116 mode="w", 

117 encoding="utf-8", 

118 dir=dir_name, 

119 delete=False, 

120 suffix=".tmp", 

121 ) as temp_file: 

122 json.dump(data, temp_file, indent=2) 

123 temp_path = temp_file.name 

124 

125 # Atomically move temp file to final location 

126 os.rename(temp_path, file_path) 

127 LOGGER.debug(f"Atomically saved usage cache to: {file_path}") 

128 return True 

129 except OSError as e: 

130 LOGGER.error(f"Failed to save usage cache to {file_path}: {e}") 

131 # Clean up temp file if it exists 

132 try: 

133 if "temp_path" in locals(): 

134 os.unlink(temp_path) 

135 except OSError: 

136 pass 

137 return False 

138 

139 

140def post_new_installation(data: dict, endpoint=NEW_INSTALLATION_ENDPOINT) -> str | None: 

141 """Post new installation to server. 

142 

143 Args: 

144 data: Dictionary containing installation data 

145 endpoint: API endpoint URL, defaults to NEW_INSTALLATION_ENDPOINT 

146 

147 Returns: 

148 Installation ID if successful, None otherwise 

149 """ 

150 try: 

151 LOGGER.debug(f"Posting new installation to: {endpoint}") 

152 response = requests.post(endpoint, json=data, timeout=10) # type: ignore[arg-type] 

153 if response.ok: 

154 response_data = response.json() 

155 installation_id = response_data["id"] 

156 LOGGER.debug(f"Successfully registered new installation: {installation_id}") 

157 return installation_id 

158 else: 

159 LOGGER.debug(f"Installation registration failed with status {response.status_code}") 

160 except Exception as e: 

161 LOGGER.error(f"Failed to post new installation: {e}") 

162 return None 

163 

164 

165def record_new_installation(previous_id: str | None = None) -> str | None: 

166 """Record new or upgrade existing installation. 

167 

168 Args: 

169 previous_id: Previous installation ID for upgrades, None for new installations 

170 

171 Returns: 

172 New installation ID if successful, None otherwise 

173 """ 

174 data = dict(NEW_INSTALLATION_DATA) 

175 if previous_id: 

176 data["previousId"] = previous_id 

177 return post_new_installation(data) 

178 

179 

180def already_installed(cached_data: dict) -> bool: 

181 """Check if already installed. 

182 

183 Args: 

184 cached_data: Dictionary containing cached usage data 

185 

186 Returns: 

187 True if installation is up-to-date, False otherwise 

188 """ 

189 return "id" in cached_data and "app_version" in cached_data and cached_data["app_version"] == APP_VERSION 

190 

191 

192def install(cached_data: dict) -> dict | None: 

193 """Install the app. 

194 

195 Args: 

196 cached_data: Dictionary containing cached usage data 

197 

198 Returns: 

199 Updated cached data dictionary if successful, None otherwise 

200 """ 

201 previous_id = cached_data.get("id", None) 

202 if previous_id: 

203 LOGGER.debug(f"Upgrading existing installation: {previous_id}") 

204 else: 

205 LOGGER.debug("Installing new instance") 

206 

207 new_id = record_new_installation(previous_id) 

208 if new_id: 

209 cached_data["id"] = new_id 

210 cached_data["app_version"] = APP_VERSION 

211 LOGGER.debug(f"Installation completed with ID: {new_id}") 

212 return cached_data 

213 

214 LOGGER.error("Installation failed") 

215 return None 

216 

217 

218def post_new_heartbeat(data: dict, endpoint=NEW_HEARTBEAT_ENDPOINT) -> bool: 

219 """Post the heartbeat to server. 

220 

221 Args: 

222 data: Dictionary containing heartbeat data 

223 endpoint: API endpoint URL, defaults to NEW_HEARTBEAT_ENDPOINT 

224 

225 Returns: 

226 True if heartbeat was sent successfully, False otherwise 

227 """ 

228 try: 

229 LOGGER.debug(f"Posting heartbeat to: {endpoint}") 

230 response = requests.post(endpoint, json=data, timeout=20) # type: ignore[arg-type] 

231 if response.ok: 

232 LOGGER.debug("Heartbeat sent successfully") 

233 return True 

234 else: 

235 LOGGER.debug(f"Heartbeat failed with status {response.status_code}") 

236 except Exception as e: 

237 LOGGER.error(f"Failed to post heartbeat: {e}") 

238 return False 

239 

240 

241def send_heartbeat(app_id: str | None, data: Any = None) -> bool: 

242 """Prepare and send heartbeat to server. 

243 

244 Args: 

245 app_id: Installation ID for heartbeat identification 

246 data: Additional data to send with heartbeat 

247 

248 Returns: 

249 True if heartbeat was sent successfully, False otherwise 

250 """ 

251 data = {"installationId": app_id, "data": data} 

252 return post_new_heartbeat(data) 

253 

254 

255def current_time() -> datetime: 

256 """Get current time. 

257 

258 Returns: 

259 Current datetime object 

260 """ 

261 return datetime.now() 

262 

263 

264def heartbeat(cached_data: dict, data: Any) -> dict | None: 

265 """Send heartbeat. 

266 

267 Args: 

268 cached_data: Dictionary containing cached usage data 

269 data: Additional data to send with heartbeat 

270 

271 Returns: 

272 Updated cached data dictionary if heartbeat was sent, 

273 None if heartbeat was throttled or failed 

274 """ 

275 previous_heartbeat = cached_data.get("heartbeat_timestamp", None) 

276 current = current_time() 

277 

278 if previous_heartbeat: 

279 try: 

280 previous = datetime.strptime(previous_heartbeat, "%Y-%m-%d %H:%M:%S.%f") 

281 time_since_last = current - previous 

282 LOGGER.debug(f"Time since last heartbeat: {time_since_last}") 

283 

284 if previous < (current - timedelta(hours=24)): 

285 LOGGER.debug("Sending heartbeat (24+ hours since last)") 

286 if send_heartbeat(cached_data.get("id"), data=data): 

287 cached_data["heartbeat_timestamp"] = str(current) 

288 return cached_data 

289 else: 

290 LOGGER.warning("Heartbeat send failed") 

291 return None 

292 else: 

293 LOGGER.debug("Heartbeat throttled (less than 24 hours)") 

294 return None 

295 except ValueError as e: 

296 LOGGER.error(f"Invalid heartbeat timestamp format: {e}") 

297 # Treat as first heartbeat if timestamp is invalid 

298 

299 # First heartbeat or invalid timestamp 

300 LOGGER.debug("Sending first heartbeat") 

301 if send_heartbeat(cached_data.get("id"), data=data): 

302 cached_data["heartbeat_timestamp"] = str(current) 

303 LOGGER.debug("First heartbeat sent successfully") 

304 return cached_data 

305 else: 

306 LOGGER.warning("First heartbeat send failed") 

307 return None 

308 

309 

310def alive(config: dict, data: Any = None) -> bool: 

311 """Record liveliness. 

312 

313 Args: 

314 config: Configuration dictionary 

315 data: Additional usage data to send with heartbeat 

316 

317 Returns: 

318 True if usage tracking was successful, False otherwise 

319 """ 

320 # Check if usage tracking is disabled 

321 if not get_usage_tracking_enabled(config): 

322 LOGGER.debug("Usage tracking is disabled, skipping") 

323 return True # Return True to not affect main sync loop 

324 

325 LOGGER.debug("Usage tracking alive check started") 

326 

327 cache_file_path = init_cache(config=config) 

328 cached_data = load_cache(cache_file_path) 

329 

330 if not already_installed(cached_data=cached_data): 

331 LOGGER.debug("New installation detected, registering...") 

332 installed_data = install(cached_data=cached_data) 

333 if installed_data is not None: 

334 result = save_cache(file_path=cache_file_path, data=installed_data) 

335 LOGGER.debug("Installation registration completed") 

336 return result 

337 else: 

338 LOGGER.error("Installation registration failed") 

339 return False 

340 

341 LOGGER.debug("Installation already registered, checking heartbeat") 

342 heartbeat_data = heartbeat(cached_data=cached_data, data=data) 

343 if heartbeat_data is not None: 

344 result = save_cache(file_path=cache_file_path, data=heartbeat_data) 

345 LOGGER.debug("Heartbeat completed successfully") 

346 return result 

347 

348 LOGGER.debug("No heartbeat required or heartbeat failed") 

349 return False