Coverage for src/shared/models.py: 69%
228 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1import asyncio
2import csv
3import datetime
4import re
5import time
6from loguru import logger
7from pathlib import Path
8from enum import Enum, StrEnum
9from typing import Callable, Awaitable, Any
11from PIL import Image
12from aiofile import async_open
13from pydantic import BaseModel, ConfigDict
14from typing import Optional, Union
16import shared.constants as con
# Pillow ships with a low default maximum image size; override it here.
Image.MAX_IMAGE_PIXELS = 500_000_000
22# [From CIARC API User Manual]
23class CameraAngle(StrEnum):
24 """
25 Different camera angles possible on MELVIN.
26 """
28 Wide = "wide"
29 Narrow = "narrow"
30 Normal = "normal"
31 Unknown = "unknown"
34class State(StrEnum):
35 """From CIARC user manual"""
37 Deployment = "deployment"
38 Acquisition = "acquisition"
39 Charge = "charge"
40 Safe = "safe"
41 Communication = "communication"
42 Transition = "transition"
43 Unknown = "none"
class Slot(BaseModel):
    """
    A single communication slot during which MELVIN can be contacted.

    Methods:
        parse_api(data: dict) -> tuple[int, list["Slot"]]:
            Builds the slot list from an API response and reports how many
            communication slots are already used.
    """

    id: int
    start: datetime.datetime
    end: datetime.datetime
    enabled: bool

    @staticmethod
    def parse_api(data: dict) -> tuple[int, list["Slot"]]:  # type: ignore
        """
        Turn a CIARC API response into the list of available slots.

        Args:
            data (dict): The API response from /slots. It is read through the
                keys "communication_slots_used" and "slots".

        Returns:
            tuple[int, list["Slot"]]: Number of communication slots used and
                the slots ordered by ascending start time.
        """
        used_count = data["communication_slots_used"]
        # Every entry of data["slots"] is expanded into the Slot constructor.
        ordered_slots = sorted(
            (Slot(**entry) for entry in data["slots"]),
            key=lambda slot: slot.start,
        )
        return (used_count, ordered_slots)
class ZonedObjective(BaseModel):
    """
    One hidden or visible objective, completed by taking pictures of its position.
    """

    id: int  # could be null according to the DTO
    name: str
    start: datetime.datetime
    end: datetime.datetime
    decrease_rate: float
    zone: Optional[tuple[int, int, int, int]]  # could be a str according to the DTO
    optic_required: CameraAngle  # cast from str
    coverage_required: float
    description: str  # cast from str
    secret: bool
    # sprite is ignored as said in email

    @staticmethod
    def parse_api(data: dict) -> list["ZonedObjective"]:  # type: ignore
        """
        Extract and parse the objectives from their matching API endpoint.

        Args:
            data (dict): API response; its "zoned_objectives" entry is iterated.

        Returns:
            list["ZonedObjective"]: Parsed objectives sorted by start time.
            An objective whose "zone" is a string or missing (None) gets
            ``zone=None``; otherwise the first four zone entries are
            converted to ints.
        """
        objectives = []
        for obj in data["zoned_objectives"]:
            raw_zone = obj["zone"]
            # A string zone carries no usable coordinates; a null zone would
            # otherwise fail with a TypeError when subscripted below.
            if raw_zone is None or isinstance(raw_zone, str):
                zone = None
            else:
                zone = (
                    int(raw_zone[0]),
                    int(raw_zone[1]),
                    int(raw_zone[2]),
                    int(raw_zone[3]),
                )

            objectives.append(
                ZonedObjective(
                    id=obj["id"],
                    name=obj["name"],
                    start=datetime.datetime.fromisoformat(obj["start"]),
                    end=datetime.datetime.fromisoformat(obj["end"]),
                    decrease_rate=obj["decrease_rate"],
                    zone=zone,
                    optic_required=CameraAngle(obj["optic_required"]),
                    coverage_required=obj["coverage_required"],
                    description=obj["description"],
                    secret=obj["secret"],
                )
            )

        return sorted(objectives, key=lambda objective: objective.start)
class BeaconObjective(BaseModel):
    """
    An emergency beacon objective as delivered by the CIARC API.
    """

    id: int
    name: str
    start: datetime.datetime
    end: datetime.datetime
    decrease_rate: float
    attempts_made: int
    description: str

    @staticmethod
    def parse_api(data: dict) -> list["BeaconObjective"]:  # type: ignore
        """
        Convert a CIARC API response into a list of BeaconObjective.

        Every entry of data["beacon_objectives"] is expanded into the
        constructor; the result is ordered by ascending start time.
        """
        parsed = [BeaconObjective(**entry) for entry in data["beacon_objectives"]]
        parsed.sort(key=lambda objective: objective.start)
        return parsed
class Achievement(BaseModel):
    """
    An achievement entry as delivered by the CIARC API.
    """

    name: str
    done: bool
    points: int
    description: str
    goal_parameter_threshold: Union[bool, int, float, str]
    goal_parameter: Union[bool, int, float, str]

    @staticmethod
    def parse_api(data: dict) -> list["Achievement"]:  # type: ignore
        """
        Convert a CIARC API response into a list of Achievement.

        Every entry of data["achievements"] is expanded into the constructor;
        the API order is kept.
        """
        return [Achievement(**entry) for entry in data["achievements"]]
class HttpCode(Enum):
    """HTTP request methods used for API calls (values are the lowercase method names)."""

    GET = "get"
    PUT = "put"
    DELETE = "delete"
    POST = "post"
194# based on /observation API endpoint
class BaseTelemetry(BaseModel):
    """Telemetry snapshot modelled after the /observation endpoint."""

    # use_enum_values: pydantic populates enum-typed fields with the enum's value.
    model_config = ConfigDict(use_enum_values=True)

    class AreaCovered(BaseModel):
        # One coverage figure per camera angle.
        narrow: float
        normal: float
        wide: float

    class DataVolume(BaseModel):
        data_volume_received: int
        data_volume_sent: int

    active_time: float
    angle: CameraAngle
    area_covered: AreaCovered
    battery: float
    data_volume: DataVolume
    distance_covered: float
    fuel: float
    width_x: int
    height_y: int
    images_taken: int
    max_battery: float
    objectives_done: int
    objectives_points: int
    simulation_speed: int
    state: State
    timestamp: datetime.datetime
    vx: float
    vy: float

    def __str__(self) -> str:
        """Render all telemetry fields as a single-line, human-readable summary."""
        segments = [
            f"Telemetry@{self.timestamp.isoformat()} state={self.state} angle={self.angle} ",
            f"(x,y)=({self.width_x},{self.height_y}) (vx,vy)=({self.vx},{self.vy}) ",
            f"battery={self.battery}/{self.max_battery} fuel={self.fuel} sim_speed={self.simulation_speed} ",
            f"dist_cov={self.distance_covered} area_cov={self.area_covered.narrow}/{self.area_covered.normal}/{self.area_covered.wide} ",
            f"active_t={self.active_time} #images={self.images_taken} obj-done/points={self.objectives_done}/{self.objectives_points} ",
            f"data-s/r={self.data_volume.data_volume_sent}/{self.data_volume.data_volume_received}",
        ]
        return "".join(segments)
239# [MELVONAUT]
240class MELVINTask(StrEnum):
241 """
242 Our custom programs/missions/states in which we can place Melvin.
243 In evaluation phase only mapping and ebt was used.
244 The other two were used in Phase 2, or could be used in a future update.
245 """
247 Mapping = "mapping"
248 Next_objective = "next_objective"
249 Fixed_objective = "fixed_objective"
250 EBT = "ebt"
251 # Emergencies = "emergencies"
252 # Events = "events"
253 # Idle = "idle"
256"""
257def boxes_overlap_in_grid(box1, box2):
259 # Not completed helper function.
260 # Idea was to check if melvins camera range overlaps with an objective.
262 grid_width = con.WORLD_X
263 grid_height = con.WORLD_Y
264 # Extract the position and dimensions of box1
265 x1, y1, width1, height1 = box1
266 # Extract the position and dimensions of box2
267 x2, y2, width2, height2 = box2
269 # Define a helper function to check overlap in one dimension with overflow
270 def overlap_1d(start1, length1, start2, length2, max_length):
271 # Compute the end positions with wrapping
272 end1 = (start1 + length1 - 1) % max_length
273 end2 = (start2 + length2 - 1) % max_length
275 # Check overlap considering wrapping
276 return (
277 (start1 <= end2 and end1 >= start2) # direct overlap
278 or (
279 end1 < start1 and (start1 <= end2 or end1 >= start2)
280 ) # wrapped around for first box
281 or (
282 end2 < start2 and (start2 <= end1 or end2 >= start1)
283 ) # wrapped around for second box
284 )
286 # Check overlap in both dimensions
287 overlap_x = overlap_1d(x1, width1, x2, width2, grid_width)
288 overlap_y = overlap_1d(y1, height1, y2, height2, grid_height)
290 # The boxes overlap if they overlap in both dimensions
291 return overlap_x and overlap_y
292"""
295def lens_size_by_angle(angle: CameraAngle) -> int:
296 """
297 Returns covered area by a single picture.
298 """
299 match angle:
300 case CameraAngle.Narrow:
301 lens_size = 600
302 case CameraAngle.Normal:
303 lens_size = 800
304 case CameraAngle.Wide:
305 lens_size = 1000
306 return lens_size
def log_rate_limiter(interval_seconds: int):  # type: ignore
    """
    Build a decorator that limits how often the wrapped function actually runs.

    Calls arriving less than ``interval_seconds`` after the last executed call
    are dropped silently. This keeps the same message from cluttering the log.
    Probably not a "good" final solution.

    Args:
        interval_seconds: Minimum number of seconds between two executed calls.

    Returns:
        A decorator. The decorated function returns the wrapped function's
        result when the call is executed and None when it is suppressed.
    """
    # Local import so this block does not depend on a new module-level import.
    from functools import wraps

    # Asked luhki about a loguru log rate limiter; apparently there is no
    # better built-in solution.
    def decorator(func):  # type: ignore
        # -inf guarantees that the very first call is always executed,
        # regardless of where the monotonic clock starts counting.
        last_log_time = float("-inf")

        @wraps(func)  # keep the wrapped function's name and docstring
        def wrapper(*args, **kwargs):  # type: ignore
            nonlocal last_log_time
            # The monotonic clock is unaffected by system clock adjustments,
            # which could otherwise suppress calls for an arbitrary time.
            current_time = time.monotonic()
            if current_time - last_log_time < interval_seconds:
                return None
            result = func(*args, **kwargs)
            # Only updated after a successful call, as before.
            last_log_time = current_time
            return result

        return wrapper

    return decorator
# Rate limited with a 3-second interval.
@log_rate_limiter(3)  # type: ignore
def limited_log(message: str) -> None:
    """Log ``message`` at info level, subject to the rate limiter above."""
    logger.info(message)
# Rate limited with a 1-second interval.
@log_rate_limiter(1)  # type: ignore
def limited_log_debug(message: str) -> None:
    """Log ``message`` at debug level, subject to the rate limiter above."""
    logger.debug(message)
class Timer(BaseModel):
    """
    Runs a callback once after a given delay.

    E.g. take the next picture X seconds after the current one. The delayed
    job is scheduled as an asyncio task as soon as the timer is constructed.
    """

    model_config = ConfigDict(arbitrary_types_allowed=True)

    _timeout: float
    _callback: Callable[[], Awaitable[Any]]
    _task: asyncio.Task[None]

    def __init__(self, timeout: float, callback: Callable[[], Awaitable[Any]]):
        """
        Store the delay and callback, then schedule the delayed job.

        Args:
            timeout: Delay in seconds before the callback is awaited.
            callback: Zero-argument callable returning an awaitable.
        """
        # The private attributes are assigned after the base initializer ran.
        super().__init__()
        self._timeout, self._callback = timeout, callback
        self._task = asyncio.create_task(self._job())

    async def _job(self) -> None:
        """Sleep for the configured delay, then await the callback."""
        await asyncio.sleep(self._timeout)
        await self._callback()

    def cancel(self) -> None:
        """Cancel the scheduled task."""
        self._task.cancel()

    def get_task(self) -> asyncio.Task[None]:
        """Return the asyncio task that runs the delayed job."""
        return self._task
class MelvinImage(BaseModel):
    """Container format for one image taken by MELVIN, together with its metadata."""

    # Needed because PIL's Image.Image is not a pydantic-native type.
    model_config = ConfigDict(arbitrary_types_allowed=True)

    image: Image.Image
    angle: CameraAngle
    # presumably the x/y coordinates belonging to the picture - confirm with callers
    cor_x: int
    cor_y: int
    time: datetime.datetime
class Ping:
    """A single distance measurement (ping) that belongs to an EBT objective."""

    def __init__(self, x: int, y: int, d: float, mind: int, maxd: int):
        """Store the position (x, y), the distance d and its bounds mind/maxd."""
        self.x, self.y = x, y
        self.d = d
        self.mind, self.maxd = mind, maxd

    def __str__(self) -> str:
        """Return all five attributes in a readable one-line form."""
        return f"Ping: x={self.x}, y={self.y}, d={self.d}, mind={self.mind}, maxd={self.maxd}"
class Event(BaseModel):
    """Message by /announcements, includes time and position for ebt processing."""

    event: str
    id: int
    timestamp: Optional[datetime.datetime] = None
    current_x: Optional[float] = None
    current_y: Optional[float] = None

    def __str__(self) -> str:
        """Return the message, position and timestamp (current UTC time if unset)."""
        return f"Event: {self.event} (x,y)=({self.current_x},{self.current_y}) t={time_seconds(self.timestamp or live_utc())}"

    def easy_parse(self) -> tuple[float, float, float]:
        """
        Custom parsing wrapper for ebt calculation.

        Returns:
            (distance, current_x, current_y), where distance is the first
            "DISTANCE_<float>" value found in the event message.
            If the message has no such value or a coordinate is missing,
            a warning is logged and (0.0, 0.0, 0.0) is returned.
        """
        pattern = r"DISTANCE_(\d+\.\d+)"
        found = re.search(pattern, self.event)
        # Compare against None explicitly: a coordinate of 0.0 is valid and
        # must not be mistaken for a missing one. A message without a distance
        # takes the same warning path instead of raising IndexError.
        if found is None or self.current_x is None or self.current_y is None:
            logger.warning(f"Tried to parse incomplete event: {self}")
            return (0.0, 0.0, 0.0)
        return (float(found.group(1)), self.current_x, self.current_y)

    async def to_csv(self) -> None:
        """
        Melvonaut saves events.

        Appends this event as one row to con.EVENT_LOCATION_CSV. If the file
        does not exist yet it is created and a header row is written first.
        """
        event_dict = self.model_dump()
        if self.timestamp:
            event_dict["timestamp"] = self.timestamp.isoformat()

        is_new_file = not Path(con.EVENT_LOCATION_CSV).is_file()
        async with async_open(con.EVENT_LOCATION_CSV, "w" if is_new_file else "a") as afp:
            writer = csv.DictWriter(afp, fieldnames=event_dict.keys())
            # The csv writer hands back whatever afp.write() returns, which
            # for the async file object is an awaitable - hence the awaits.
            if is_new_file:
                await writer.writeheader()
            await writer.writerow(event_dict)
            # logger.debug(f"Writing event to {con.EVENT_LOCATION_CSV}")

    @staticmethod
    def load_events_from_csv(path: str) -> list["Event"]:
        """
        Melvonaut saves events as csv, Rift-console loads them.

        Args:
            path: Location of the csv file written by to_csv.

        Returns:
            The events in file order; an empty list (plus a logged warning)
            if the file does not exist. Empty timestamp/current_x/current_y
            columns are loaded as None.
        """
        events: list["Event"] = []
        if not Path(path).is_file():
            logger.warning(f"No event file found under {path}")
            return events

        # newline="" is what the csv module recommends for file objects.
        with open(path, "r", newline="") as f:
            for row in csv.DictReader(f):
                # Optional fields are written as empty strings when unset, so
                # only convert them when there is something to convert.
                raw_timestamp = row["timestamp"]
                raw_x = row["current_x"]
                raw_y = row["current_y"]
                events.append(
                    Event(
                        event=row["event"],
                        id=int(row["id"]),
                        timestamp=(
                            datetime.datetime.fromisoformat(raw_timestamp)
                            if raw_timestamp
                            else None
                        ),
                        current_x=float(raw_x) if raw_x else None,
                        current_y=float(raw_y) if raw_y else None,
                    )
                )
        logger.info(f"Loaded {len(events)} events from {path}")
        return events
# [TIMEFORMATS]
# ISO 8601 format
# MELVIN returns timestamps like this: 2024-12-24T13:10:13.660337Z
# or 2024-12-26T13:00:00Z
# The trailing Z is equivalent to +00:00 and indicates the UTC timezone.
# To get the current time in UTC, use datetime.datetime.now(datetime.timezone.utc),
# or parse it from a string with datetime.datetime.fromisoformat(X).
# To convert a datetime back into ISO format, use X.isoformat().
# TARGET: 2025-03-01T00:54:02.809428+00:00
def live_utc() -> datetime.datetime:
    """Return the current time as a timezone-aware datetime in UTC."""
    utc = datetime.timezone.utc
    return datetime.datetime.now(tz=utc)
def time_seconds(date: datetime.datetime) -> str:
    """Format ``date`` as YYYY-MM-DDTHH:MM:SS, without fractional seconds or timezone."""
    seconds_format = "%Y-%m-%dT%H:%M:%S"
    return date.strftime(seconds_format)