Coverage for src/rift_console/melvin_api.py: 0%
213 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1import os
2import signal
3import subprocess
4import threading
5from typing import Any, Optional
6from pydantic import BaseModel
7import requests
8import csv
10from loguru import logger
11from shared.models import HttpCode, live_utc
12import shared.constants as con
# Address of the Melvonaut API as seen from this machine. The API is only
# reachable while port forwarding to the satellite is active.
url = "localhost"
port = "8080"
18def melvonaut_api(method: HttpCode, endpoint: str, json: dict[str, str] = {}) -> Any:
19 """Wrapper with error handling for Melvonaut API."""
20 try:
21 with requests.Session() as s:
22 match method:
23 case HttpCode.GET:
24 r = s.get("http://" + url + ":" + port + endpoint, timeout=5)
25 case HttpCode.POST:
26 r = s.post(
27 "http://" + url + ":" + port + endpoint,
28 timeout=5,
29 json=json,
30 )
32 except requests.exceptions.ConnectionError:
33 logger.error("ConnectionError - possible no VPN?")
34 return {}
35 except requests.exceptions.ReadTimeout:
36 logger.error("Timeout error!")
37 return {}
39 match r.status_code:
40 case 200:
41 try:
42 logger.debug(
43 f"Received from API {method}/{endpoint} - {r} - {r.json()}"
44 )
45 except requests.exceptions.JSONDecodeError:
46 logger.debug(f"Received from API {method}/{endpoint} - {r}")
47 return r
48 case 404:
49 logger.warning(f"Requested ressource not found - {r}.")
50 return {}
51 case _:
52 # unknow error
53 logger.warning(f"Unknown error, could not contact satellite? - {r}.")
54 return {}
class MelvonautTelemetry(BaseModel):
    """Single sample of Melvonaut disk, memory and CPU utilisation."""

    # Disk: sizes are whole gigabytes (2**30 bytes) when filled by
    # live_melvonaut(); disk_perc is stored there as 100 - reported percent.
    disk_total: int
    disk_free: int
    disk_perc: float

    # Memory: sizes are whole gigabytes (2**30 bytes) when filled by
    # live_melvonaut(); mem_perc is the percentage reported by the API.
    mem_total: int
    mem_available: int
    mem_perc: float

    # CPU: physical core count and the percentage reported by the API.
    cpu_cores: int
    cpu_perc: float
def live_melvonaut() -> Optional[MelvonautTelemetry]:
    """Fetch the current disk, memory and CPU usage from Melvonaut.

    Returns:
        A populated ``MelvonautTelemetry`` when the health check and all
        three usage endpoints succeed, otherwise ``None``.
    """
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return None

    responses = [
        melvonaut_api(method=HttpCode.GET, endpoint=endpoint)
        for endpoint in (
            "/api/get_disk_usage",
            "/api/get_memory_usage",
            "/api/get_cpu_usage",
        )
    ]
    # melvonaut_api returns a plain empty dict on failure, which has no
    # .json(); check the responses before decoding them.
    if not all(responses):
        logger.warning("Mevlonaut telemetry failed.")
        return None

    d, m, c = (response.json() for response in responses)

    gigabyte = 2**30
    if d and m and c:
        logger.info("Mevlonaut telemetry done.")
        return MelvonautTelemetry(
            disk_total=int(d["root"]["total"] / gigabyte),
            disk_free=int(d["root"]["free"] / gigabyte),
            disk_perc=100 - d["root"]["percent"],  # invert
            mem_total=int(m["total"] / gigabyte),
            mem_available=int(m["available"] / gigabyte),
            mem_perc=m["percent"],
            cpu_cores=c["physical_cores"],
            cpu_perc=c["percent"],
        )

    logger.warning("Mevlonaut telemetry failed.")
    return None
def get_setting(setting: str) -> str:
    """Read one Melvonaut setting through the API.

    Args:
        setting: Name of the setting to query.

    Returns:
        The setting's value converted to ``str``, or ``""`` when the API is
        unreachable, the request fails, or the reply does not contain the
        requested setting.
    """
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""

    check = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""}
    )

    if check:
        reply = check.json()
        # A reply without the requested key is treated as a failure instead
        # of raising KeyError.
        if isinstance(reply, dict) and setting in reply:
            value = str(reply[setting])
            logger.info(f'Mevlonaut get settting "{setting}" is "{value}" done.')
            return value

    # f-string so the setting name is actually interpolated into the message.
    logger.warning(f'Mevlonaut get setting "{setting}" failed.')
    return ""
def set_setting(setting: str, value: str) -> bool:
    """Write one Melvonaut setting through the API and verify it.

    Args:
        setting: Name of the setting to change.
        value: New value to store.

    Returns:
        ``True`` when the write succeeded and reading the setting back
        yields ``value``; ``False`` otherwise.
    """
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    r = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_set_setting", json={setting: value}
    )

    if r:
        # Read the setting back to confirm the write took effect.
        check = melvonaut_api(
            method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""}
        )
        # The read-back may fail too; melvonaut_api then returns a plain
        # dict that has no .json().
        if check:
            reply = check.json()
            # Diagnostic output only, so it does not belong at error level.
            logger.debug(f"{reply} {value}")
            if isinstance(reply, dict) and reply.get(setting) == value:
                logger.info(f'Mevlonaut set_Settting "{setting}" to "{value}" done.')
                return True

    logger.warning(f'Mevlonaut set_Settting "{setting}" to "{value}" failed.')
    return False
def download_events() -> str:
    """Fetch the Melvonaut event log and store it as a timestamped CSV file.

    Returns:
        A status message describing the outcome, or ``""`` when the health
        check fails.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return ""

    response = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_download_events")
    if not response:
        res = "Mevlonaut get_download_events failed, is okay if event-log is empty."
    else:
        text = response.content.decode("utf-8")
        stamp = live_utc().strftime("%Y-%m-%dT%H:%M:%S")
        csv_file_path = (
            con.CONSOLE_FROM_MELVONAUT_PATH + "MelvonautEvents-" + stamp + ".csv"
        )

        # Store the payload unchanged ...
        with open(csv_file_path, "w", newline="", encoding="utf-8") as out_file:
            out_file.write(text)

        # ... then parse it back to count the CSV records for the report.
        line_count = 0
        with open(csv_file_path, mode="r", newline="", encoding="utf-8") as in_file:
            for _ in csv.reader(in_file):
                line_count += 1

        res = f"Mevlonaut get_download_events to {csv_file_path} with {line_count} lines done."

    logger.warning(res)
    return res
def clear_events() -> str:
    """Ask Melvonaut to clear its event log.

    Returns:
        A status message describing the outcome, or ``""`` when the health
        check fails.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return ""

    cleared = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_events")
    res = (
        "Mevlonaut get_clear_events done."
        if cleared
        else "Mevlonaut get_clear_events failed, is okay if event-log is empty."
    )
    logger.warning(res)
    return res
def download_telemetry() -> str:
    """Fetch the telemetry stored on Melvonaut and save it as a timestamped CSV.

    Returns:
        A status message describing the outcome, or ``""`` when the health
        check fails.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return ""

    response = melvonaut_api(
        method=HttpCode.GET, endpoint="/api/get_download_telemetry"
    )
    if not response:
        res = "Mevlonaut download_telemetry failed."
    else:
        text = response.content.decode("utf-8")
        stamp = live_utc().strftime("%Y-%m-%dT%H:%M:%S")
        csv_file_path = (
            con.CONSOLE_FROM_MELVONAUT_PATH + "MelvonautTelemetry-" + stamp + ".csv"
        )

        # Store the payload unchanged ...
        with open(csv_file_path, "w", newline="", encoding="utf-8") as out_file:
            out_file.write(text)

        # ... then parse it back to count the CSV records for the report.
        line_count = 0
        with open(csv_file_path, mode="r", newline="", encoding="utf-8") as in_file:
            for _ in csv.reader(in_file):
                line_count += 1

        res = f"Mevlonaut download_telemetry to {csv_file_path} with {line_count} lines done."

    logger.warning(res)
    return res
def clear_telemetry() -> str:
    """Ask Melvonaut to delete its existing telemetry files.

    Returns:
        A status message describing the outcome, or ``""`` when the health
        check fails.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return ""

    cleared = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_telemetry")
    res = (
        "Mevlonaut clear_telemetry done."
        if cleared
        else "Mevlonaut clear_telemetry failed."
    )

    logger.warning(res)
    return res
def list_logs() -> list[str] | bool:
    """List the log files present on Melvonaut.

    Returns:
        The list found under the ``"log_files"`` key of the API reply, or
        ``False`` when the API is unreachable, the request fails, or the
        reply is empty.
    """
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    response = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_log_files")
    # melvonaut_api returns a plain empty dict on failure, which has no
    # .json(); only decode a real response.
    r = response.json() if response else {}

    if r:
        logs: list[str] = r["log_files"]
        logger.info(f"Mevlonaut list logs done, found {len(logs)} logs.")
        return logs

    logger.warning("Mevlonaut list_logs failed.")
    return False
def get_download_save_log(log_name: str) -> Any:
    """Request a single log file from Melvonaut.

    Args:
        log_name: File name sent to the API as the ``"file"`` field.

    Returns:
        The API response on success, otherwise ``None``.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return None

    request_body = {"file": log_name}
    response = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_download_log", json=request_body
    )

    if not response:
        logger.warning("Mevlonaut get_download_save_log failed.")
        return None

    logger.info(f'Mevlonaut downloaded "{log_name}" done.')
    return response
def clear_logs() -> bool:
    """Ask Melvonaut to delete all of its logs.

    Returns:
        ``True`` when the clear request succeeded, ``False`` otherwise.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return False

    cleared = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_logs")
    if not cleared:
        logger.warning("Mevlonaut clear_logs failed.")
        return False

    logger.warning("Mevlonaut cleared all logs done.")
    return True
def list_images() -> list[str] | bool:
    """List the images currently stored on Melvonaut.

    Returns:
        The list found under the ``"images"`` key of the API reply, or
        ``False`` when the API is unreachable, the request fails, or the
        reply is empty.
    """
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    response = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_images")
    # melvonaut_api returns a plain empty dict on failure, which has no
    # .json(); only decode a real response.
    r = response.json() if response else {}

    if r:
        images: list[str] = r["images"]
        logger.info(f"Mevlonaut image list done, found {len(images)} images.")
        return images

    logger.warning("Mevlonaut list_images failed.")
    return False
def get_download_save_image(image_name: str) -> Any:
    """Request a single image from Melvonaut.

    Args:
        image_name: File name sent to the API as the ``"file"`` field.

    Returns:
        The API response on success, otherwise ``None``.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return None

    request_body = {"file": image_name}
    response = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_download_image", json=request_body
    )

    if not response:
        logger.warning("Mevlonaut get_download_save_image failed.")
        return None

    logger.info(f'Mevlonaut downloaded "{image_name}" done.')
    return response
def clear_images() -> bool:
    """Ask Melvonaut to delete all of its existing images.

    Returns:
        ``True`` when the clear request succeeded, ``False`` otherwise.
    """
    reachable = melvonaut_api(method=HttpCode.GET, endpoint="/api/health")
    if not reachable:
        logger.warning("Melvonaut API unreachable!")
        return False

    cleared = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_images")
    if not cleared:
        logger.warning("Mevlonaut clear_images failed.")
        return False

    logger.warning("Mevlonaut cleared all images done.")
    return True
def create_tunnel() -> None:
    """Start an SSH tunnel forwarding local port 8080 (incomplete helper).

    The tunnel process is started in the background and a timer sends it
    SIGTERM after 15 minutes.

    Manual alternative:
    ``sshpass -f .ssh-pw ssh -N -L 8080:localhost:8080 root@10.100.50.1``
    """
    cmd = [
        "sshpass",
        "-f",
        ".ssh-pw",
        "ssh",
        "root@10.100.50.1",
        "-N",
        "-L",
        "8080:localhost:8080",
        "-o",
        "ConnectTimeout=1s",
    ]
    timeout = 60 * 15  # kill connection after 15 min

    # Run the tunnel in its own session/process group. Otherwise the child
    # shares this process's group and the killpg() below would also send
    # SIGTERM to the caller.
    process = subprocess.Popen(args=cmd, start_new_session=True)
    logger.info(f"Started tunnel: {process.pid}")

    def terminate_process() -> None:
        """Send SIGTERM to the tunnel's process group if it is still alive."""
        logger.warning("Cleanup tunnel")
        if process.poll() is not None:
            # Already exited; the pid may have been reused, so do not signal.
            return
        try:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
        except ProcessLookupError:
            # The process ended between the poll() above and the kill.
            logger.debug("Tunnel process already gone.")

    # Start a timer to terminate the process after timeout
    timer = threading.Timer(timeout, terminate_process)
    timer.start()

    return