Coverage for src/melvonaut/__main__.py: 0%

143 statements  

"""
Melvonaut
:author: Jonathan Decker
"""

# Load settings first to ensure the overrides are available
from melvonaut.settings import settings

import asyncio
import concurrent.futures
import io
import os
import re
import signal
import uvloop

from typing import Optional, AsyncIterable
from datetime import datetime, timezone

import aiohttp
import click

import aiodebug.log_slow_callbacks  # type: ignore
from PIL import Image
from aiofile import async_open
from loguru import logger

from melvonaut.mel_telemetry import MelTelemetry
from melvonaut.state_planer import state_planner
from melvonaut import api, utils
import shared.constants as con
from shared.models import Timer, Event, MelvinImage, CameraAngle

if settings.TRACING:
    import tracemalloc

    tracemalloc.start(5)
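    # (start(5) records up to five stack frames per traced allocation, which makes
    # tracemalloc snapshots easier to attribute while TRACING is enabled.)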

##### Global Variables #####
current_telemetry = None
aiodebug.log_slow_callbacks.enable(0.05)
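# (aiodebug logs a warning for any event-loop callback that blocks for longer than
# the 0.05 s threshold above, which helps spot code that stalls the loop.)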

# create a unique id each time melvonaut starts, to allow better image sorting


# tracemalloc.start()

async def get_observations() -> None:
    """Asynchronously get observations from the Melvin API and update the state planner.

    This function establishes a session with the API and retrieves observation data.
    If the response is successful, it updates the telemetry state.
    If any errors occur, they are logged accordingly.

    Returns:
        None

    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(con.OBSERVATION_ENDPOINT) as response:
                if response.status == 200:
                    json_response = await response.json()
                    # logger.debug("Received observations")
                    # pprint(json_response, indent=4, sort_dicts=True)
                    await state_planner.update_telemetry(MelTelemetry(**json_response))
                else:
                    logger.warning(f"Failed to get observations: {response.status}")
        except aiohttp.client_exceptions.ConnectionTimeoutError:
            logger.warning("Observations endpoint timed out.")
        except asyncio.TimeoutError:
            logger.warning("asyncio.TimeoutError occurred.")
        except aiohttp.client_exceptions.ClientOSError:
            logger.warning("aiohttp ClientOSError occurred.")

async def run_get_observations() -> None:
    """Runs the observation fetching function in a loop.

    This function repeatedly fetches observations based on a specified refresh rate,
    adjusting for simulation speed.

    Returns:
        None
    """
    await get_observations()
    while True:
        # logger.debug("Submitted observations request")
        observe_task = Timer(
            timeout=settings.OBSERVATION_REFRESH_RATE
            / state_planner.get_simulation_speed(),
            callback=get_observations,
        ).get_task()
        await asyncio.gather(observe_task)
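        # Each cycle waits OBSERVATION_REFRESH_RATE divided by the current simulation
        # speed, so the polling cadence follows simulation time rather than wall-clock time.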

async def get_announcements(last_id: Optional[str] = None) -> Optional[str]:
    """Fetches announcements asynchronously with event-stream handling.

    This function continuously listens for new announcements from the API and processes them.
    If announcements are received, they are logged and stored.

    Args:
        last_id (Optional[str]): The ID of the last processed event to resume from, if applicable.

    Returns:
        Optional[str]: The ID of the last received announcement, or None if an error occurs.
    """
    content_line_regex = re.compile(r"^\[(\d+)]\s*(.*)$")
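    # The endpoint serves server-sent events; once the "data:" prefix is stripped below,
    # each payload is expected to look roughly like "[<id>] <announcement text>".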

    headers = {"Accept": "text/event-stream", "Cache-Control": "no-cache"}
    if last_id:
        headers["Last-Event-ID"] = last_id
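        # Last-Event-ID is the standard SSE resume header; assuming the endpoint honors it,
        # the stream picks up after the given event id instead of replaying from the start.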

    timeout = aiohttp.ClientTimeout(
        total=None, connect=None, sock_connect=None, sock_read=None
    )

    async with aiohttp.ClientSession(timeout=timeout) as session:
        # keep "response" bound even if session.get() itself fails, so the finally block
        # below does not raise UnboundLocalError and mask the original error
        response = None
        try:
            async with session.get(
                con.ANNOUNCEMENTS_ENDPOINT, headers=headers
            ) as response:
                if response.status not in [200, 301, 307]:
                    logger.error(f"Failed to get announcements: {response.status}")
                    await session.close()
                    return None
                else:
                    # logger.error(response.content)
                    # async for line in response.content:
                    # logger.error(line)
                    async for line in response.content:
                        line_decoded = line.decode("utf-8")
                        # logger.warning(f"Received announcement {line}")
                        # logger.warning(f"Location is: {state_planner.calc_current_location()}")
                        # logger.warning(f"Received announcement with content:{line_decoded}")
                        line_filtered = line_decoded.replace("data:", "").strip()

                        match = content_line_regex.search(line_filtered)
                        if match:
                            line_id = int(match.group(1))
                            line_content = str(match.group(2))
                            timestamp = datetime.now(timezone.utc)
                            current_x, current_y = state_planner.calc_current_location()

                            current_event = Event(
                                event=line_content,
                                id=line_id,
                                timestamp=timestamp,
                                current_x=current_x,
                                current_y=current_y,
                            )

                            logger.warning(
                                f"Received announcement: {current_event.model_dump()}"
                            )
                            await current_event.to_csv()
                            state_planner.recent_events.append(current_event)
                            last_id = str(current_event.id)
        except TimeoutError:
            logger.error("Announcements subscription timed out")
        finally:
            if response and not response.closed:
                response.close()
            if not session.closed:
                await session.close()
        return last_id

# Somehow this restarts itself every 5 seconds, and I think it overloads the API
async def run_get_announcements() -> None:
    """Continuously fetches announcements from the API.

    This function runs in an infinite loop, restarting the subscription when needed.

    Returns:
        None
    """
    logger.warning("Started announcements subscription")
    while True:
        await asyncio.gather(get_announcements())
        logger.warning("Restarted announcements subscription")
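        # Note: the last_id returned by get_announcements() is currently discarded, so each
        # reconnect starts a fresh stream rather than resuming via Last-Event-ID.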

# not in use, can be removed
async def read_images() -> AsyncIterable[MelvinImage]:
    """Reads image files asynchronously from a designated directory.

    This function iterates over stored images, extracts metadata from filenames, and
    yields `MelvinImage` objects.

    Yields:
        MelvinImage: An image object containing extracted metadata.
    """
    if not os.path.exists(con.IMAGE_PATH):
        logger.warning(f"{con.IMAGE_PATH} does not exist.")
        return

    pattern = r"image_melvonaut_angle_(\w+)_x_(\d+\.\d+)_y_(\d+\.\d+)_(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})\.png"
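    # Illustrative match (the angle value is only an example):
    # image_melvonaut_angle_normal_x_1234.56_y_789.01_2025-01-01_12-30-00.png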

    for filename in os.listdir(con.IMAGE_PATH):
        if filename.endswith(".png"):
            image_path = os.path.join(con.IMAGE_PATH, filename)
            try:
                async with async_open(image_path, "rb") as afp:
                    data = await afp.read()
                    image = Image.open(io.BytesIO(data))
            except FileNotFoundError as e:
                logger.warning(f"{image_path} does not exist.")
                logger.debug(e)
                continue
            except IOError as e:
                logger.warning(f"Failed to read {image_path}")
                logger.debug(e)
                continue
            except ValueError as e:
                logger.warning(f"Failed to parse {image_path}")
                logger.debug(e)
                continue
            match = re.match(pattern, filename)
            if match:
                angle = CameraAngle(match.group(1))
                # the pattern captures decimal coordinates, so parse as float before int
                cor_x = int(float(match.group(2)))
                cor_y = int(float(match.group(3)))
                time = datetime.strptime(match.group(4), "%Y-%m-%d_%H-%M-%S")
                yield MelvinImage(
                    image=image, angle=angle, cor_x=cor_x, cor_y=cor_y, time=time
                )
            else:
                logger.warning(f"Failed to parse {filename}.")

async def run_read_images() -> None:
    """Log all received images.

    Returns:
        None
    """
    async for image in read_images():
        logger.debug(f"Received image: {image}")

def cancel_tasks() -> None:
    """Cancels all running tasks and stops the event loop.

    Returns:
        None
    """
    for task in asyncio.all_tasks():
        task.cancel()
    loop = asyncio.get_running_loop()
    loop.stop()

def start_event_loop() -> None:
    """Initializes and starts the asynchronous event loop.

    This function sets up signal handlers, registers tasks for fetching observations,
    announcements, and API interactions, and starts the event loop.

    Returns:
        None
    """
    loop = uvloop.new_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, cancel_tasks)
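        # SIGINT/SIGTERM invoke cancel_tasks, which cancels all tasks and stops run_forever below.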

    loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=1))
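    # With max_workers=1, anything dispatched via run_in_executor (or library calls that
    # use the default executor) runs one job at a time off the event-loop thread.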

    loop.create_task(run_get_observations())
    loop.create_task(run_get_announcements())

    loop.create_task(api.run_api())

    # loop.create_task(run_read_images())

    loop.run_forever()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)

    logger.info("Shutting down Melvonaut...")


@click.command()
@click.version_option()
def main() -> None:
    """Melvonaut."""
    utils.setup_logging()
    logger.info("Starting Melvonaut...")

    start_event_loop()


if __name__ == "__main__":
    main(prog_name="Melvonaut")  # pragma: no cover