Coverage for src/rift_console/melvin_api.py: 0%

213 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1import os 

2import signal 

3import subprocess 

4import threading 

5from typing import Any, Optional 

6from pydantic import BaseModel 

7import requests 

8import csv 

9 

10from loguru import logger 

11from shared.models import HttpCode, live_utc 

12import shared.constants as con 

13 

14# Works with active port forwarding 

15url = "localhost" 

16port = "8080" 

17 

18def melvonaut_api(method: HttpCode, endpoint: str, json: dict[str, str] = {}) -> Any: 

19 """Wrapper with error handling for Melvonaut API.""" 

20 try: 

21 with requests.Session() as s: 

22 match method: 

23 case HttpCode.GET: 

24 r = s.get("http://" + url + ":" + port + endpoint, timeout=5) 

25 case HttpCode.POST: 

26 r = s.post( 

27 "http://" + url + ":" + port + endpoint, 

28 timeout=5, 

29 json=json, 

30 ) 

31 

32 except requests.exceptions.ConnectionError: 

33 logger.error("ConnectionError - possible no VPN?") 

34 return {} 

35 except requests.exceptions.ReadTimeout: 

36 logger.error("Timeout error!") 

37 return {} 

38 

39 match r.status_code: 

40 case 200: 

41 try: 

42 logger.debug( 

43 f"Received from API {method}/{endpoint} - {r} - {r.json()}" 

44 ) 

45 except requests.exceptions.JSONDecodeError: 

46 logger.debug(f"Received from API {method}/{endpoint} - {r}") 

47 return r 

48 case 404: 

49 logger.warning(f"Requested ressource not found - {r}.") 

50 return {} 

51 case _: 

52 # unknow error 

53 logger.warning(f"Unknown error, could not contact satellite? - {r}.") 

54 return {} 

55 

56 

57class MelvonautTelemetry(BaseModel): 

58 """Datapoint of Disk, Memory and CPU utilization.""" 

59 disk_total: int 

60 disk_free: int 

61 disk_perc: float 

62 mem_total: int 

63 mem_available: int 

64 mem_perc: float 

65 cpu_cores: int 

66 cpu_perc: float 

67 

68def live_melvonaut() -> Optional[MelvonautTelemetry]: 

69 """Get live MelvonautTelemetry.""" 

70 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

71 logger.warning("Melvonaut API unreachable!") 

72 return None 

73 d = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_disk_usage").json() 

74 m = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_memory_usage").json() 

75 c = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_cpu_usage").json() 

76 

77 gigabyte = 2**30 

78 if d and m and c: 

79 logger.info("Mevlonaut telemetry done.") 

80 return MelvonautTelemetry( 

81 disk_total=int(d["root"]["total"] / gigabyte), 

82 disk_free=int(d["root"]["free"] / gigabyte), 

83 disk_perc=100 - d["root"]["percent"], # invert 

84 mem_total=int(m["total"] / gigabyte), 

85 mem_available=int(m["available"] / gigabyte), 

86 mem_perc=m["percent"], 

87 cpu_cores=c["physical_cores"], 

88 cpu_perc=c["percent"], 

89 ) 

90 else: 

91 logger.warning("Mevlonaut telemetry failed.") 

92 return None 

93 

94def get_setting(setting: str) -> str: 

95 """Get a Melvonaut Setting from settings.py.""" 

96 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

97 logger.warning("Melvonaut API unreachable!") 

98 return "" 

99 check = melvonaut_api( 

100 method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""} 

101 ) 

102 

103 if check: 

104 value = str(check.json()[setting]) 

105 logger.info(f'Mevlonaut get settting "{setting}" is "{value}" done.') 

106 return value 

107 logger.warning('Mevlonaut get setting "{setting}" failed.') 

108 return "" 

109 

110def set_setting(setting: str, value: str) -> bool: 

111 """Set a Melvonaut Setting from settings.py.""" 

112 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

113 logger.warning("Melvonaut API unreachable!") 

114 return False 

115 r = melvonaut_api( 

116 method=HttpCode.POST, endpoint="/api/post_set_setting", json={setting: value} 

117 ) 

118 

119 if r: 

120 check = melvonaut_api( 

121 method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""} 

122 ) 

123 logger.error(f"{check.json()} {value}") 

124 if check.json()[setting] == value: 

125 logger.info(f'Mevlonaut set_Settting "{setting}" to "{value}" done.') 

126 return True 

127 logger.warning(f'Mevlonaut set_Settting "{setting}" to "{value}" failed.') 

128 return False 

129 

130def download_events() -> str: 

131 """Download event log.""" 

132 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

133 logger.warning("Melvonaut API unreachable!") 

134 return "" 

135 

136 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_download_events") 

137 if r: 

138 decoded_content = r.content.decode("utf-8") 

139 csv_file_path = ( 

140 con.CONSOLE_FROM_MELVONAUT_PATH 

141 + "MelvonautEvents-" 

142 + live_utc().strftime("%Y-%m-%dT%H:%M:%S") 

143 + ".csv" 

144 ) 

145 

146 with open(csv_file_path, "w", newline="", encoding="utf-8") as file: 

147 file.write(decoded_content) 

148 with open(csv_file_path, mode="r", newline="", encoding="utf-8") as file: 

149 csv_reader = csv.reader(file) 

150 line_count = sum(1 for _ in csv_reader) 

151 

152 res = f"Mevlonaut get_download_events to {csv_file_path} with {line_count} lines done." 

153 else: 

154 res = "Mevlonaut get_download_events failed, is okay if event-log is empty." 

155 logger.warning(res) 

156 return res 

157 

158def clear_events() -> str: 

159 """Clear event log.""" 

160 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

161 logger.warning("Melvonaut API unreachable!") 

162 return "" 

163 

164 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_events") 

165 

166 if r: 

167 res = "Mevlonaut get_clear_events done." 

168 else: 

169 res = "Mevlonaut get_clear_events failed, is okay if event-log is empty." 

170 logger.warning(res) 

171 return res 

172 

173def download_telemetry() -> str: 

174 """Download existing telemetry files on Melvonaut.""" 

175 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

176 logger.warning("Melvonaut API unreachable!") 

177 return "" 

178 

179 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_download_telemetry") 

180 if r: 

181 decoded_content = r.content.decode("utf-8") 

182 csv_file_path = ( 

183 con.CONSOLE_FROM_MELVONAUT_PATH 

184 + "MelvonautTelemetry-" 

185 + live_utc().strftime("%Y-%m-%dT%H:%M:%S") 

186 + ".csv" 

187 ) 

188 

189 with open(csv_file_path, "w", newline="", encoding="utf-8") as file: 

190 file.write(decoded_content) 

191 with open(csv_file_path, mode="r", newline="", encoding="utf-8") as file: 

192 csv_reader = csv.reader(file) 

193 line_count = sum(1 for _ in csv_reader) 

194 

195 res = f"Mevlonaut download_telemetry to {csv_file_path} with {line_count} lines done." 

196 else: 

197 res = "Mevlonaut download_telemetry failed." 

198 logger.warning(res) 

199 return res 

200 

201def clear_telemetry() -> str: 

202 """Delete exisiting telemtry files on Melvonaut.""" 

203 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

204 logger.warning("Melvonaut API unreachable!") 

205 return "" 

206 

207 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_telemetry") 

208 

209 if r: 

210 res = "Mevlonaut clear_telemetry done." 

211 else: 

212 res = "Mevlonaut clear_telemetry failed." 

213 

214 logger.warning(res) 

215 return res 

216 

217 

218def list_logs() -> list[str] | bool: 

219 """List all log fiels on Melvonaut.""" 

220 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

221 logger.warning("Melvonaut API unreachable!") 

222 return False 

223 

224 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_log_files").json() 

225 

226 if r: 

227 logs: list[str] = r["log_files"] 

228 logger.info(f"Mevlonaut list logs done, found {len(logs)} images.") 

229 return logs 

230 else: 

231 logger.warning("Mevlonaut list_images failed.") 

232 return False 

233 

234def get_download_save_log(log_name: str) -> Any: 

235 """Downloads all logs from Melvonaut.""" 

236 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

237 logger.warning("Melvonaut API unreachable!") 

238 return None 

239 

240 r = melvonaut_api( 

241 method=HttpCode.POST, endpoint="/api/post_download_log", json={"file": log_name} 

242 ) 

243 

244 if r: 

245 logger.info(f'Mevlonaut downloaded "{log_name}" done.') 

246 return r 

247 else: 

248 logger.warning("Mevlonaut get_download_save_log failed.") 

249 return None 

250 

251def clear_logs() -> bool: 

252 """Deletes logs on Melvonaut.""" 

253 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

254 logger.warning("Melvonaut API unreachable!") 

255 return False 

256 

257 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_logs") 

258 

259 if r: 

260 logger.warning("Mevlonaut cleared all logs done.") 

261 return True 

262 else: 

263 logger.warning("Mevlonaut clear_logs failed.") 

264 return False 

265 

266 

267def list_images() -> list[str] | bool: 

268 """List all exising images on Melvonaut.""" 

269 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

270 logger.warning("Melvonaut API unreachable!") 

271 return False 

272 

273 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_images").json() 

274 

275 if r: 

276 images: list[str] = r["images"] 

277 logger.info(f"Mevlonaut image list done, found {len(images)} images.") 

278 return images 

279 else: 

280 logger.warning("Mevlonaut list_images failed.") 

281 return False 

282 

283def get_download_save_image(image_name: str) -> Any: 

284 """Download a single image from Melvonaut.""" 

285 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

286 logger.warning("Melvonaut API unreachable!") 

287 return None 

288 

289 r = melvonaut_api( 

290 method=HttpCode.POST, 

291 endpoint="/api/post_download_image", 

292 json={"file": image_name}, 

293 ) 

294 

295 if r: 

296 logger.info(f'Mevlonaut downloaded "{image_name}" done.') 

297 return r 

298 else: 

299 logger.warning("Mevlonaut get_download_save_image failed.") 

300 return None 

301 

302def clear_images() -> bool: 

303 """Deletes exisiting images on Melvonaut.""" 

304 if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"): 

305 logger.warning("Melvonaut API unreachable!") 

306 return False 

307 

308 r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_images") 

309 

310 if r: 

311 logger.warning("Mevlonaut cleared all images done.") 

312 return True 

313 else: 

314 logger.warning("Mevlonaut clear_images failed.") 

315 return False 

316 

317 

318def create_tunnel() -> None: 

319 """Not completed function to automatically create an ssh tunnel with port forwarding, 

320 alternative use 'shpass -f .ssh-pw ssh -N -L 8080:localhost:8080 root@10.100.50.1'""" 

321 cmd = [ 

322 "sshpass", 

323 "-f", 

324 ".ssh-pw", 

325 "ssh", 

326 "root@10.100.50.1", 

327 "-N", 

328 "-L", 

329 "8080:localhost:8080", 

330 "-o", 

331 "ConnectTimeout=1s", 

332 ] 

333 timeout = 60 * 15 # kill connection after 15 min 

334 

335 process = subprocess.Popen(args=cmd) 

336 logger.info(f"Started tunnel: {process.pid}") 

337 

338 # Function to terminate the process 

339 def terminate_process() -> None: 

340 logger.warning("Cleanup tunnel") 

341 os.killpg(os.getpgid(process.pid), signal.SIGTERM) 

342 

343 # Start a timer to terminate the process after timeout 

344 timer = threading.Timer(timeout, terminate_process) 

345 timer.start() 

346 

347 return