Coverage for src/melvonaut/__main__.py: 0%
143 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1"""
2Melvonaut
3:author: Jonathan Decker
4"""
6# Load settings first to ensure the overrides are available
7from melvonaut.settings import settings
9import asyncio
10import concurrent.futures
11import io
12import os
13import re
14import signal
15import uvloop
17from typing import Optional, AsyncIterable
18from datetime import datetime, timezone
20import aiohttp
21import click
23import aiodebug.log_slow_callbacks # type: ignore
24from PIL import Image
25from aiofile import async_open
26from loguru import logger
28from melvonaut.mel_telemetry import MelTelemetry
29from melvonaut.state_planer import state_planner
30from melvonaut import api, utils
31import shared.constants as con
32from shared.models import Timer, Event, MelvinImage, CameraAngle
34if settings.TRACING:
35 import tracemalloc
37 tracemalloc.start(5)
40##### Global Variables #####
41current_telemetry = None
42aiodebug.log_slow_callbacks.enable(0.05)
44# create a unique id each time melvonauts start, to allow better image sorting
47# tracemalloc.start()
50async def get_observations() -> None:
51 """Async get observations from the Melvin API and update the state planner
53 This function establishes a session with the API and retrieves observation data.
54 If the response is successful, it updates the telemetry state.
55 If any errors occur, they are logged accordingly.
57 Returns:
58 None
60 """
61 async with aiohttp.ClientSession() as session:
62 try:
63 async with session.get(con.OBSERVATION_ENDPOINT) as response:
64 if response.status == 200:
65 json_response = await response.json()
66 # logger.debug("Received observations")
67 # pprint(json_response, indent=4, sort_dicts=True)
68 await state_planner.update_telemetry(MelTelemetry(**json_response))
69 else:
70 logger.warning(f"Failed to get observations: {response.status}")
71 except aiohttp.client_exceptions.ConnectionTimeoutError:
72 logger.warning("Observations endpoint timeouted.")
73 except asyncio.TimeoutError:
74 logger.warning("ASyncio TimeoutError occured.")
75 except aiohttp.client_exceptions.ClientOSError:
76 logger.warning("Client_exceptions.ClienOSError occured.")
79async def run_get_observations() -> None:
80 """Runs the observation fetching function in a loop.
82 This function repeatedly fetches observations based on a specified refresh rate,
83 adjusting for simulation speed.
85 Returns:
86 None
87 """
88 await get_observations()
89 while True:
90 # logger.debug("Submitted observations request")
91 observe_task = Timer(
92 timeout=settings.OBSERVATION_REFRESH_RATE
93 / state_planner.get_simulation_speed(),
94 callback=get_observations,
95 ).get_task()
96 await asyncio.gather(observe_task)
99async def get_announcements(last_id: Optional[str] = None) -> Optional[str]:
100 """Fetches announcements asynchronously with event-stream handling.
102 This function continuously listens for new announcements from the API and processes them.
103 If announcements are received, they are logged and stored.
105 Args:
106 last_id (Optional[str]): The ID of the last processed event to resume from, if applicable.
108 Returns:
109 Optional[str]: The ID of the last received announcement, or None if an error occurs.
110 """
111 content_line_regex = re.compile(r"^\[(\d+)]\s*(.*)$")
113 headers = {"Accept": "text/event-stream", "Cache-Control": "no-cache"}
114 if last_id:
115 headers["Last-Event-ID"] = last_id
117 timeout = aiohttp.ClientTimeout(
118 total=None, connect=None, sock_connect=None, sock_read=None
119 )
121 async with aiohttp.ClientSession(timeout=timeout) as session:
122 try:
123 async with session.get(
124 con.ANNOUNCEMENTS_ENDPOINT, headers=headers
125 ) as response:
126 if response.status not in [200, 301, 307]:
127 logger.error(f"Failed to get announcements: {response.status}")
128 await session.close()
129 return None
130 else:
131 # logger.error(response.content)
132 # async for line in response.content:
133 # logger.error(line)
134 async for line in response.content:
135 line_decoded = line.decode("utf-8")
136 # logger.warning(f"Received announcement {line}")
137 # logger.warning(f"Location is: {state_planner.calc_current_location()}")
138 # logger.warning(f"Received announcement with content:{line_decoded}")
139 line_filtered = line_decoded.replace("data:", "").strip()
141 match = content_line_regex.search(line_filtered)
142 if match:
143 line_id = int(match.group(1))
144 line_content = str(match.group(2))
145 timestamp = datetime.now(timezone.utc)
146 current_x, current_y = state_planner.calc_current_location()
148 current_event = Event(
149 event=line_content,
150 id=line_id,
151 timestamp=timestamp,
152 current_x=current_x,
153 current_y=current_y,
154 )
156 logger.warning(
157 f"Received announcement: {current_event.model_dump()}"
158 )
159 await current_event.to_csv()
160 state_planner.recent_events.append(current_event)
161 last_id = str(current_event.id)
162 except TimeoutError:
163 logger.error("Announcements subscription timed out")
164 finally:
165 if response and not response.closed:
166 response.close()
167 if not session.closed:
168 await session.close()
169 return last_id
172# Irgendwie restartet der sich alle 5 sekunden, und glaube überlastet die API
173async def run_get_announcements() -> None:
174 """Continuously fetches announcements from the API.
176 This function runs in an infinite loop, restarting the subscription when needed.
178 Returns:
179 None
180 """
181 logger.warning("Started announcements subscription")
182 while True:
183 await asyncio.gather(get_announcements())
184 logger.warning("Restarted announcements subscription")
187# not in use, can be removed
188async def read_images() -> AsyncIterable[MelvinImage]:
189 """Reads image files asynchronously from a designated directory.
191 This function iterates over stored images, extracts metadata from filenames, and
192 yields `MelvinImage` objects.
194 Yields:
195 MelvinImage: An image object containing extracted metadata.
196 """
197 if not os.path.exists(con.IMAGE_PATH):
198 logger.warning(f"{con.IMAGE_PATH} does not exist.")
199 return
201 pattern = r"image_melvonaut_angle_(\w+)_x_(\d+\.\d+)_y_(\d+\.\d+)_(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})\.png"
203 for filename in os.listdir(con.IMAGE_PATH):
204 if filename.endswith(".png"):
205 image_path = os.path.join(con.IMAGE_PATH, filename)
206 try:
207 async with async_open(image_path, "rb") as afp:
208 data = await afp.read()
209 image = Image.open(io.BytesIO(data))
210 except FileNotFoundError as e:
211 logger.warning(f"{image_path} does not exist.")
212 logger.debug(e)
213 continue
214 except IOError as e:
215 logger.warning(f"Failed to read {image_path}")
216 logger.debug(e)
217 continue
218 except ValueError as e:
219 logger.warning(f"Failed to parse {image_path}")
220 logger.debug(e)
221 continue
222 match = re.match(pattern, filename)
223 if match:
224 angle = CameraAngle(match.group(1))
225 cor_x = int(match.group(2))
226 cor_y = int(match.group(3))
227 time = datetime.strptime(match.group(4), "%Y-%m-%d_%H-%M-%S")
228 yield MelvinImage(
229 image=image, angle=angle, cor_x=cor_x, cor_y=cor_y, time=time
230 )
231 else:
232 logger.warning(f"Failed to parse {filename}.")
235async def run_read_images() -> None:
236 """Log all receives images.
238 Returns:
239 None
240 """
241 async for image in read_images():
242 logger.debug(f"Received image: {image}")
245def cancel_tasks() -> None:
246 """Cancels all tasks and event loop.
248 Returns:
249 None
250 """
251 for task in asyncio.all_tasks():
252 task.cancel()
253 loop = asyncio.get_running_loop()
254 loop.stop()
257def start_event_loop() -> None:
258 """Initializes and starts the asynchronous event loop.
260 This function sets up signal handlers, registers tasks for fetching observations,
261 announcements, and API interactions, and starts the event loop.
263 Returns:
264 None
265 """
266 loop = uvloop.new_event_loop()
268 for sig in (signal.SIGINT, signal.SIGTERM):
269 loop.add_signal_handler(sig, cancel_tasks)
271 loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=1))
273 loop.create_task(run_get_observations())
274 loop.create_task(run_get_announcements())
276 loop.create_task(api.run_api())
278 # loop.create_task(run_read_images())
280 loop.run_forever()
282 for sig in (signal.SIGINT, signal.SIGTERM):
283 loop.remove_signal_handler(sig)
285 logger.info("Shutting down Melvonaut...")
288@click.command()
289@click.version_option()
290def main() -> None:
291 """Melvonaut."""
292 utils.setup_logging()
293 logger.info("Starting Melvonaut...")
295 start_event_loop()
298if __name__ == "__main__":
299 main(prog_name="Melvonaut") # pragma: no cover