Coverage for src/shared/models.py: 69%

228 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1import asyncio 

2import csv 

3import datetime 

4import re 

5import time 

6from loguru import logger 

7from pathlib import Path 

8from enum import Enum, StrEnum 

9from typing import Callable, Awaitable, Any 

10 

11from PIL import Image 

12from aiofile import async_open 

13from pydantic import BaseModel, ConfigDict 

14from typing import Optional, Union 

15 

16import shared.constants as con 

17 

18# Pillow has a low default maximum image size, overwriten here 

19Image.MAX_IMAGE_PIXELS = 500000000 

20 

21 

22# [From CIARC API User Manual] 

23class CameraAngle(StrEnum): 

24 """ 

25 Different camera angles possible on MELVIN. 

26 """ 

27 

28 Wide = "wide" 

29 Narrow = "narrow" 

30 Normal = "normal" 

31 Unknown = "unknown" 

32 

33 

34class State(StrEnum): 

35 """From CIARC user manual""" 

36 

37 Deployment = "deployment" 

38 Acquisition = "acquisition" 

39 Charge = "charge" 

40 Safe = "safe" 

41 Communication = "communication" 

42 Transition = "transition" 

43 Unknown = "none" 

44 

45 

46class Slot(BaseModel): 

47 """ 

48 One communication slot in which MELVIN can be contacted. 

49 

50 Methods: 

51 parse_api(data: dict) -> tuple[int, list["Slot"]]: 

52 Parses the given API data to extract slots and the number of slots used. 

53 """ 

54 

55 id: int 

56 start: datetime.datetime 

57 end: datetime.datetime 

58 enabled: bool 

59 

60 @staticmethod 

61 def parse_api(data: dict) -> tuple[int, list["Slot"]]: # type: ignore 

62 """ 

63 Parses CIARC API response into the list of available slots. 

64 

65 Args: 

66 data (dict): The API response from /slots. 

67 

68 Returns: 

69 tuple[int, list["Slot"]]: Number of communication slots used and 

70 list of slots sorted by the earliest start time. 

71 

72 """ 

73 slots_used = data["communication_slots_used"] 

74 slots = [] 

75 for s in data["slots"]: 

76 slots.append(Slot(**s)) 

77 

78 slots.sort(key=lambda slot: slot.start) 

79 # logger.debug(f"Deparsed Slot API used: {slots_used} - {slots}") 

80 return (slots_used, slots) 

81 

82 

83class ZonedObjective(BaseModel): 

84 """ 

85 One hidden or visible objective, completed by taking pictures of its position. 

86 """ 

87 

88 id: int # could be null acording to Dto 

89 name: str 

90 start: datetime.datetime 

91 end: datetime.datetime 

92 decrease_rate: float 

93 zone: Optional[tuple[int, int, int, int]] # could be a str acording to dto 

94 optic_required: CameraAngle # cast from str 

95 coverage_required: float 

96 description: str # cast from str 

97 secret: bool 

98 # sprite is ignored as said in email 

99 

100 @staticmethod 

101 def parse_api(data: dict) -> list["ZonedObjective"]: # type: ignore 

102 """ 

103 Extracts and parses objectives from its matching api endpoint 

104 """ 

105 z_obj_list = [] 

106 # parse objective list 

107 for obj in data["zoned_objectives"]: 

108 if type(obj["zone"]) is str: 

109 zone = None 

110 else: 

111 zone = ( 

112 int(obj["zone"][0]), 

113 int(obj["zone"][1]), 

114 int(obj["zone"][2]), 

115 int(obj["zone"][3]), 

116 ) 

117 

118 z_obj_list.append( 

119 ZonedObjective( 

120 id=obj["id"], 

121 name=obj["name"], 

122 start=datetime.datetime.fromisoformat(obj["start"]), 

123 end=datetime.datetime.fromisoformat(obj["end"]), 

124 decrease_rate=obj["decrease_rate"], 

125 zone=zone, 

126 optic_required=CameraAngle(obj["optic_required"]), 

127 coverage_required=obj["coverage_required"], 

128 description=obj["description"], 

129 secret=obj["secret"], 

130 ) 

131 ) 

132 

133 return sorted(z_obj_list, key=lambda event: event.start) 

134 

135 

136class BeaconObjective(BaseModel): 

137 """ 

138 Emergency beacon objective from CIARC API. 

139 """ 

140 

141 id: int 

142 name: str 

143 start: datetime.datetime 

144 end: datetime.datetime 

145 decrease_rate: float 

146 attempts_made: int 

147 description: str 

148 

149 @staticmethod 

150 def parse_api(data: dict) -> list["BeaconObjective"]: # type: ignore 

151 """ 

152 Parse CIARC API to list of this class 

153 """ 

154 beacon_obj = [] 

155 for b in data["beacon_objectives"]: 

156 beacon_obj.append(BeaconObjective(**b)) 

157 

158 return sorted(beacon_obj, key=lambda event: event.start) 

159 

160 

161class Achievement(BaseModel): 

162 """ 

163 From CIARC API. 

164 """ 

165 

166 name: str 

167 done: bool 

168 points: int 

169 description: str 

170 goal_parameter_threshold: Union[bool, int, float, str] 

171 goal_parameter: Union[bool, int, float, str] 

172 

173 @staticmethod 

174 def parse_api(data: dict) -> list["Achievement"]: # type: ignore 

175 """ 

176 Parse CIARC API into list of Achievment. 

177 """ 

178 achv = [] 

179 for a in data["achievements"]: 

180 achv.append(Achievement(**a)) 

181 

182 return achv 

183 

184 

185class HttpCode(Enum): 

186 """Used HTTP codes for API.""" 

187 

188 GET = "get" 

189 PUT = "put" 

190 DELETE = "delete" 

191 POST = "post" 

192 

193 

194# based on /observation API endpoint 

195class BaseTelemetry(BaseModel): 

196 """Based on /observation endpoint.""" 

197 

198 model_config = ConfigDict(use_enum_values=True) 

199 

200 class AreaCovered(BaseModel): 

201 narrow: float 

202 normal: float 

203 wide: float 

204 

205 class DataVolume(BaseModel): 

206 data_volume_received: int 

207 data_volume_sent: int 

208 

209 active_time: float 

210 angle: CameraAngle 

211 area_covered: AreaCovered 

212 battery: float 

213 data_volume: DataVolume 

214 distance_covered: float 

215 fuel: float 

216 width_x: int 

217 height_y: int 

218 images_taken: int 

219 max_battery: float 

220 objectives_done: int 

221 objectives_points: int 

222 simulation_speed: int 

223 state: State 

224 timestamp: datetime.datetime 

225 vx: float 

226 vy: float 

227 

228 def __str__(self) -> str: 

229 return ( 

230 f"Telemetry@{self.timestamp.isoformat()} state={self.state} angle={self.angle} " 

231 f"(x,y)=({self.width_x},{self.height_y}) (vx,vy)=({self.vx},{self.vy}) " 

232 f"battery={self.battery}/{self.max_battery} fuel={self.fuel} sim_speed={self.simulation_speed} " 

233 f"dist_cov={self.distance_covered} area_cov={self.area_covered.narrow}/{self.area_covered.normal}/{self.area_covered.wide} " 

234 f"active_t={self.active_time} #images={self.images_taken} obj-done/points={self.objectives_done}/{self.objectives_points} " 

235 f"data-s/r={self.data_volume.data_volume_sent}/{self.data_volume.data_volume_received}" 

236 ) 

237 

238 

239# [MELVONAUT] 

240class MELVINTask(StrEnum): 

241 """ 

242 Our custom programs/missions/states in which we can place Melvin. 

243 In evaluation phase only mapping and ebt was used. 

244 The other two were used in Phase 2, or could be used in a future update. 

245 """ 

246 

247 Mapping = "mapping" 

248 Next_objective = "next_objective" 

249 Fixed_objective = "fixed_objective" 

250 EBT = "ebt" 

251 # Emergencies = "emergencies" 

252 # Events = "events" 

253 # Idle = "idle" 

254 

255 

256""" 

257def boxes_overlap_in_grid(box1, box2): 

258  

259 # Not completed helper function. 

260 # Idea was to check if melvins camera range overlaps with an objective. 

261  

262 grid_width = con.WORLD_X 

263 grid_height = con.WORLD_Y 

264 # Extract the position and dimensions of box1 

265 x1, y1, width1, height1 = box1 

266 # Extract the position and dimensions of box2 

267 x2, y2, width2, height2 = box2 

268 

269 # Define a helper function to check overlap in one dimension with overflow 

270 def overlap_1d(start1, length1, start2, length2, max_length): 

271 # Compute the end positions with wrapping 

272 end1 = (start1 + length1 - 1) % max_length 

273 end2 = (start2 + length2 - 1) % max_length 

274 

275 # Check overlap considering wrapping 

276 return ( 

277 (start1 <= end2 and end1 >= start2) # direct overlap 

278 or ( 

279 end1 < start1 and (start1 <= end2 or end1 >= start2) 

280 ) # wrapped around for first box 

281 or ( 

282 end2 < start2 and (start2 <= end1 or end2 >= start1) 

283 ) # wrapped around for second box 

284 ) 

285 

286 # Check overlap in both dimensions 

287 overlap_x = overlap_1d(x1, width1, x2, width2, grid_width) 

288 overlap_y = overlap_1d(y1, height1, y2, height2, grid_height) 

289 

290 # The boxes overlap if they overlap in both dimensions 

291 return overlap_x and overlap_y 

292""" 

293 

294 

295def lens_size_by_angle(angle: CameraAngle) -> int: 

296 """ 

297 Returns covered area by a single picture. 

298 """ 

299 match angle: 

300 case CameraAngle.Narrow: 

301 lens_size = 600 

302 case CameraAngle.Normal: 

303 lens_size = 800 

304 case CameraAngle.Wide: 

305 lens_size = 1000 

306 return lens_size 

307 

308 

309def log_rate_limiter(interval_seconds: int): # type: ignore 

310 """ 

311 Limits how often a single event can trigger a lot entry. Prevents cluttering of the same message. 

312 Probaly not a "good" final solution. 

313 """ 

314 

315 # habe luhki nach loguru log rate limiter gefragt, gibt anscheinend keine besser inbuild lösung 

316 def decorator(func): # type: ignore 

317 last_log_time = [0] # Use a list to allow modification of non-local state 

318 

319 def wrapper(*args, **kwargs): # type: ignore 

320 nonlocal last_log_time 

321 current_time = time.time() 

322 if current_time - last_log_time[0] >= interval_seconds: 

323 func(*args, **kwargs) 

324 last_log_time[0] = current_time # type: ignore 

325 

326 return wrapper 

327 

328 return decorator 

329 

330 

331# Apply a 3-second rate limiter 

332@log_rate_limiter(3) # type: ignore 

333def limited_log(message: str) -> None: 

334 """Log limit for info""" 

335 logger.info(message) 

336 

337 

338# Apply a 1-second rate limiter 

339@log_rate_limiter(1) # type: ignore 

340def limited_log_debug(message: str) -> None: 

341 """Log limit for debug""" 

342 logger.debug(message) 

343 

344 

345class Timer(BaseModel): 

346 """Starts tasks after a given intervall. E.g. take the next picture X-seconds after the current one.""" 

347 

348 model_config = ConfigDict(arbitrary_types_allowed=True) 

349 

350 _timeout: float 

351 _callback: Callable[[], Awaitable[Any]] 

352 _task: asyncio.Task[None] 

353 

354 def __init__(self, timeout: float, callback: Callable[[], Awaitable[Any]]): 

355 super().__init__() 

356 self._timeout = timeout 

357 self._callback = callback 

358 self._task = asyncio.create_task(self._job()) 

359 

360 async def _job(self) -> None: 

361 await asyncio.sleep(self._timeout) 

362 await self._callback() 

363 

364 def cancel(self) -> None: 

365 self._task.cancel() 

366 

367 def get_task(self) -> asyncio.Task[None]: 

368 return self._task 

369 

370 

371class MelvinImage(BaseModel): 

372 """Our format for a single image taken by MELVIN.""" 

373 

374 model_config = ConfigDict(arbitrary_types_allowed=True) 

375 

376 image: Image.Image 

377 angle: CameraAngle 

378 cor_x: int 

379 cor_y: int 

380 time: datetime.datetime 

381 

382 

383class Ping: 

384 """Part of EBT objective, one single distance/ping.""" 

385 

386 def __init__(self, x: int, y: int, d: float, mind: int, maxd: int): 

387 self.x = x 

388 self.y = y 

389 self.d = d 

390 self.mind = mind 

391 self.maxd = maxd 

392 

393 def __str__(self) -> str: 

394 return f"Ping: x={self.x}, y={self.y}, d={self.d}, mind={self.mind}, maxd={self.maxd}" 

395 

396 

397class Event(BaseModel): 

398 """Message by /announcements, includes time and position for ebt processing.""" 

399 

400 event: str 

401 id: int 

402 timestamp: Optional[datetime.datetime] = None 

403 current_x: Optional[float] = None 

404 current_y: Optional[float] = None 

405 

406 def __str__(self) -> str: 

407 return f"Event: {self.event} (x,y)=({self.current_x},{self.current_y}) t={time_seconds(self.timestamp or live_utc())}" 

408 

409 def easy_parse(self) -> tuple[float, float, float]: 

410 """Custom parsing wrapper for ebt calculation.""" 

411 pattern = r"DISTANCE_(\d+\.\d+)" 

412 dist = re.findall(pattern, self.event)[0] 

413 if dist and self.current_x and self.current_y: 

414 return (float(dist), self.current_x, self.current_y) 

415 else: 

416 logger.warning(f"Tried to parse incomplete event: {self}") 

417 return (0.0, 0.0, 0.0) 

418 

419 async def to_csv(self) -> None: 

420 """Melvonaut saves events.""" 

421 event_dict = self.model_dump() 

422 if self.timestamp: 

423 event_dict["timestamp"] = self.timestamp.isoformat() 

424 if not Path(con.EVENT_LOCATION_CSV).is_file(): 

425 async with async_open(con.EVENT_LOCATION_CSV, "w") as afp: 

426 writer = csv.DictWriter(afp, fieldnames=event_dict.keys()) 

427 await writer.writeheader() 

428 await writer.writerow(event_dict) 

429 # logger.debug(f"Writing event to {con.EVENT_LOCATION_CSV}") 

430 else: 

431 async with async_open(con.EVENT_LOCATION_CSV, "a") as afp: 

432 writer = csv.DictWriter(afp, fieldnames=event_dict.keys()) 

433 await writer.writerow(event_dict) 

434 # logger.debug(f"Writing event to {con.EVENT_LOCATION_CSV}") 

435 

436 @staticmethod 

437 def load_events_from_csv(path: str) -> list["Event"]: 

438 """Melvonaut saves events as csv, Rift-console loads them.""" 

439 events = [] 

440 if not Path(path).is_file(): 

441 logger.warning(f"No event file found under {path}") 

442 else: 

443 with open(path, "r") as f: 

444 for row in csv.DictReader(f): 

445 read_event = Event( 

446 event=row["event"], 

447 id=int(row["id"]), 

448 timestamp=datetime.datetime.fromisoformat(row["timestamp"]), 

449 current_x=float(row["current_x"]), 

450 current_y=float(row["current_y"]), 

451 ) 

452 events.append(read_event) 

453 logger.info(f"Loaded {len(events)} events from {path}") 

454 return events 

455 

456 

457# [TIMEFORMATS] 

458# ISO 8601 format 

459# Melin returns like this: 2024-12-24T13:10:13.660337Z 

460# or 2024-12-26T13:00:00Z 

461# Z equivalent to +00:00 to indicate UTC timezone 

462 

463# To get current time in UTC use datetime.datetime.now(datetime.timezone.utc) 

464# or get from string with datetime.datetime.fromisoformat(X) 

465# to also change into isoformat use X.isoformat() 

466 

467 

468# TARGET: 2025-03-01T00:54:02.809428+00:00 

469def live_utc() -> datetime.datetime: 

470 """Returns live datetime object, including timezone utc""" 

471 return datetime.datetime.now(datetime.timezone.utc) 

472 

473 

474def time_seconds(date: datetime.datetime) -> str: 

475 return date.strftime("%Y-%m-%dT%H:%M:%S")