Coverage for src/melvonaut/state_planer.py: 10%

341 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1##### State machine ##### 

2import asyncio 

3import subprocess 

4import datetime 

5import math 

6import tracemalloc 

7from typing import Optional, Any 

8from aiofile import async_open 

9import aiohttp 

10from pydantic import BaseModel 

11 

12import shared.constants as con 

13from melvonaut.settings import settings 

14from melvonaut.mel_telemetry import MelTelemetry 

15from shared.models import ( 

16 CameraAngle, 

17 MELVINTask, 

18 State, 

19 Timer, 

20 ZonedObjective, 

21 lens_size_by_angle, 

22 limited_log, 

23 Event, 

24 live_utc, 

25) 

26from loguru import logger 

27import random 

28 

29import os 

30import psutil 

31 

32 

33class StatePlanner(BaseModel): 

34 melv_id: int = random.randint(0, 9999) 

35 current_telemetry: Optional[MelTelemetry] = None 

36 previous_telemetry: Optional[MelTelemetry] = None 

37 

38 previous_state: Optional[State] = None 

39 state_change_time: datetime.datetime = datetime.datetime.now() 

40 

41 submitted_transition_request: bool = False 

42 

43 target_state: Optional[State] = None 

44 

45 _accelerating: bool = False 

46 

47 _run_get_image_task: Optional[asyncio.Task[None]] = None 

48 

49 _aiohttp_session: Optional[aiohttp.ClientSession] = None 

50 

51 _target_vel_x: Optional[float] = None 

52 _target_vel_y: Optional[float] = None 

53 

54 _z_obj_list: list[ZonedObjective] = [] 

55 

56 recent_events: list[Event] = [] 

57 

58 _current_obj_name: str = "" 

59 

60 def model_post_init(self, __context__: Any) -> None: 

61 """Initializes the recent_events list by loading events from a CSV file. 

62 

63 Args: 

64 __context__ (Any): Context data passed during initialization. 

65 """ 

66 self.recent_events = Event.load_events_from_csv(path=con.EVENT_LOCATION_CSV) 

67 

68 def get_current_state(self) -> State: 

69 """Retrieves the current state from telemetry data. 

70 

71 Returns: 

72 State: The current state if telemetry is available, otherwise State.Unknown. 

73 """ 

74 if self.current_telemetry is None: 

75 return State.Unknown 

76 return self.current_telemetry.state 

77 

78 def get_previous_state(self) -> State: 

79 """Retrieves the previous state from telemetry data. 

80 

81 Returns: 

82 State: The previous state if telemetry is available, otherwise State.Unknown. 

83 """ 

84 if self.previous_telemetry is None: 

85 return State.Unknown 

86 return self.previous_telemetry.state 

87 

88 def get_simulation_speed(self) -> int: 

89 """Gets the current simulation speed from telemetry data. 

90 

91 Returns: 

92 int: The simulation speed if telemetry is available, otherwise 1. 

93 """ 

94 if self.current_telemetry is None: 

95 return 1 

96 return self.current_telemetry.simulation_speed 

97 

98 def get_time_since_state_change(self) -> datetime.timedelta: 

99 """Calculates the time elapsed since the last state change. 

100 

101 Returns: 

102 datetime.timedelta: The time difference between now and the last state change. 

103 """ 

104 return datetime.datetime.now() - self.state_change_time 

105 

106 def calc_transition_remaining_time(self) -> datetime.timedelta: 

107 """Calculates the remaining time for state transition. 

108 

109 Returns: 

110 datetime.timedelta: The remaining transition time based on the simulation speed. 

111 """ 

112 if self.get_current_state() is State.Transition: 

113 logger.debug("Not in transition state, returning 0") 

114 return datetime.timedelta(0) 

115 elif self.previous_state == State.Safe: 

116 total_time = datetime.timedelta( 

117 seconds=con.STATE_TRANSITION_FROM_SAFE_TIME 

118 / self.get_simulation_speed() 

119 ) 

120 return total_time - self.get_time_since_state_change() 

121 else: 

122 total_time = datetime.timedelta( 

123 seconds=con.STATE_TRANSITION_TIME / self.get_simulation_speed() 

124 ) 

125 return total_time - self.get_time_since_state_change() 

126 

127 def calc_current_location(self) -> tuple[float, float]: 

128 """Estimates the current location based on telemetry data and time elapsed. 

129 

130 Returns: 

131 tuple[float, float]: The estimated (x, y) coordinates. 

132 """ 

133 if self.current_telemetry is None: 

134 return 0.0, 0.0 

135 time_since_observation = ( 

136 live_utc() - self.current_telemetry.timestamp 

137 ).total_seconds() 

138 current_x = ( 

139 self.current_telemetry.width_x 

140 + self.current_telemetry.vx * time_since_observation 

141 ) 

142 current_y = ( 

143 self.current_telemetry.height_y 

144 + self.current_telemetry.vy * time_since_observation 

145 ) 

146 return current_x, current_y 

147 

148 async def trigger_velocity_change(self, new_vel_x: float, new_vel_y: float) -> None: 

149 """Sets new values for accelartion, also set _accelerating 

150 

151 Args: 

152 new_vel_x (float): The target velocity in the x direction. 

153 new_vel_y (float): The target velocity in the y direction. 

154 """ 

155 

156 self._target_vel_x = new_vel_x 

157 self._target_vel_y = new_vel_y 

158 

159 if self.current_telemetry is None: 

160 logger.warning("No telemetry data available. Cannot set velocity.") 

161 return 

162 if ( 

163 new_vel_x == self.current_telemetry.vx 

164 and new_vel_y == self.current_telemetry.vy 

165 ): 

166 self._accelerating = False 

167 logger.info("Target velocity already set. Not changing velocity.") 

168 return 

169 request_body = { 

170 "vel_x": new_vel_x, 

171 "vel_y": new_vel_y, 

172 "camera_angle": self.current_telemetry.angle, 

173 "state": self.get_current_state(), 

174 } 

175 async with aiohttp.ClientSession() as session: 

176 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response: 

177 if response.status == 200: 

178 self._accelerating = True 

179 logger.info(f"Velocity set to {new_vel_x}, {new_vel_y}") 

180 else: 

181 logger.error(f"Failed to set velocity to {new_vel_x}, {new_vel_y}") 

182 

183 async def trigger_camera_angle_change(self, new_angle: CameraAngle) -> None: 

184 """Tries to change the camera angle to new_angle 

185 

186 Args: 

187 new_angle (CameraAngle): The desired camera angle. 

188 """ 

189 if self.current_telemetry is None: 

190 logger.warning("No telemetry data available. Cannot set camera angle.") 

191 return 

192 if new_angle == self.current_telemetry.angle: 

193 logger.info("Target camera angle already set. Not changing angle.") 

194 return 

195 request_body = { 

196 "vel_x": self.current_telemetry.vx, 

197 "vel_y": self.current_telemetry.vy, 

198 "camera_angle": new_angle, 

199 "state": self.get_current_state(), 

200 } 

201 async with aiohttp.ClientSession() as session: 

202 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response: 

203 if response.status == 200: 

204 self.current_telemetry.angle = new_angle 

205 logger.info(f"Camera angle set to {new_angle}") 

206 else: 

207 logger.error(f"Failed to set camera angle to {new_angle}") 

208 

209 async def trigger_state_transition(self, new_state: State) -> None: 

210 """Initiates a state transition if valid conditions are met. 

211 

212 Args: 

213 new_state (State): The target state to transition to. 

214 """ 

215 if new_state in [State.Transition, State.Unknown, State.Deployment, State.Safe]: 

216 logger.warning(f"Cannot transition to {new_state}.") 

217 return 

218 if self.current_telemetry is None: 

219 logger.warning("No telemetry data available. Cannot initiate transition.") 

220 return 

221 if self.current_telemetry.state == State.Transition: 

222 logger.debug("Already in transition state, not starting transition.") 

223 return 

224 if new_state == self.get_current_state(): 

225 logger.debug(f"State is already {new_state}, not starting transition.") 

226 return 

227 request_body = { 

228 "state": new_state, 

229 "vel_x": self.current_telemetry.vx, 

230 "vel_y": self.current_telemetry.vy, 

231 "camera_angle": self.current_telemetry.angle, 

232 } 

233 async with aiohttp.ClientSession() as session: 

234 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response: 

235 if response.status == 200: 

236 logger.info( 

237 f"Started transition to {new_state} at battery level {self.current_telemetry.battery}" 

238 ) 

239 self.submitted_transition_request = True 

240 self.target_state = new_state 

241 else: 

242 logger.warning( 

243 f"Failed to transition to {new_state}: {response.status}" 

244 ) 

245 logger.debug(f"Response body: {await response.text()}") 

246 

247 async def switch_if_battery_low( 

248 self, state_low_battery: State, state_high_battery: State 

249 ) -> None: 

250 """Switches state based on battery level. 

251 

252 Args: 

253 state_low_battery (State): The state to switch to when battery is low. 

254 state_high_battery (State): The state to switch to when battery is sufficient. 

255 """ 

256 if self.current_telemetry is None: 

257 logger.warning( 

258 "No telemetry data available. Cannot plan battery based switching." 

259 ) 

260 return 

261 if self.current_telemetry.battery <= settings.BATTERY_LOW_THRESHOLD: 

262 if self.get_current_state() == state_low_battery: 

263 return 

264 logger.debug( 

265 f"State is {self.get_current_state()}, Battery is low, triggering transition to {state_low_battery}" 

266 ) 

267 await self.trigger_state_transition(state_low_battery) 

268 else: 

269 if self.get_current_state() == state_high_battery: 

270 return 

271 logger.debug( 

272 f"State is {self.get_current_state()}, Battery is high, triggering transition to {state_high_battery}" 

273 ) 

274 await self.trigger_state_transition(state_high_battery) 

275 

276 async def plan_state_switching(self) -> None: 

277 """Plans and executes state transitions based on current telemetry data. 

278 

279 This function checks the current state and decides whether to transition 

280 to another state based on conditions like battery level and velocity. 

281 

282 Logs relevant debug information and triggers state transitions when necessary. 

283 

284 Returns: 

285 None 

286 """ 

287 if self.current_telemetry is None: 

288 logger.warning("No telemetry data available. Cannot plan state switching.") 

289 return 

290 

291 state = self.get_current_state() 

292 

293 match state: 

294 case State.Transition: 

295 logger.debug( 

296 f"Time since state change: {self.get_time_since_state_change()}" 

297 ) 

298 expected_time_to_complete = self.calc_transition_remaining_time() 

299 limited_log( 

300 f"State is Transition to {self.target_state}, waiting for transition to complete.\nExpected time to complete state transition: {expected_time_to_complete}" 

301 ) 

302 # logger.debug( 

303 # f"Previous state: {self.get_previous_state()}, Current state: {self.get_current_state()}" 

304 # ) 

305 case State.Acquisition: 

306 # in EBT leave once everything is set 

307 if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT: 

308 if ( 

309 self._target_vel_x 

310 and self._target_vel_y 

311 and self.current_telemetry.angle 

312 == settings.TARGET_CAMERA_ANGLE_ACQUISITION 

313 and self._target_vel_x == self.current_telemetry.vx 

314 and self._target_vel_y == self.current_telemetry.vy 

315 ): 

316 await self.trigger_state_transition(State.Communication) 

317 

318 await self.switch_if_battery_low(State.Charge, State.Acquisition) 

319 

320 case State.Charge: 

321 if ( 

322 self.current_telemetry.battery 

323 >= self.current_telemetry.max_battery 

324 - settings.BATTERY_HIGH_THRESHOLD 

325 ): 

326 if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT: 

327 # starting ebt, but speed/angle not set yet 

328 

329 logger.info( 

330 f"EBT Task, Angle: telemetry: {self.current_telemetry.angle} vs target: {settings.TARGET_CAMERA_ANGLE_ACQUISITION}" 

331 ) 

332 logger.info( 

333 f"EBT Task, vx: {self.current_telemetry.vx} vs target: {self._target_vel_x}" 

334 ) 

335 logger.info( 

336 f"EBT Task, vy: {self.current_telemetry.vy} vs target: {self._target_vel_y}" 

337 ) 

338 if ( 

339 self._target_vel_x is None 

340 or self._target_vel_y is None 

341 or self.current_telemetry.angle 

342 != settings.TARGET_CAMERA_ANGLE_ACQUISITION 

343 or self._target_vel_x != self.current_telemetry.vx 

344 or self._target_vel_y != self.current_telemetry.vy 

345 ): 

346 await self.trigger_state_transition(State.Acquisition) 

347 else: 

348 # logger.info("starting comms!") 

349 await self.trigger_state_transition(State.Communication) 

350 

351 else: 

352 # logger.info("starting acq!") 

353 await self.trigger_state_transition(State.Acquisition) 

354 

355 case State.Safe: 

356 if self.current_telemetry.battery >= ( 

357 self.current_telemetry.max_battery * 0.5 

358 ): 

359 await self.trigger_state_transition(State.Acquisition) 

360 else: 

361 await self.trigger_state_transition(State.Charge) 

362 await self.switch_if_battery_low(State.Charge, State.Acquisition) 

363 case State.Communication: 

364 await self.switch_if_battery_low(State.Charge, State.Communication) 

365 case State.Deployment: 

366 logger.debug( 

367 "State is Deployment, triggering transition to Acquisition" 

368 ) 

369 await self.trigger_state_transition(State.Acquisition) 

370 

371 async def update_telemetry(self, new_telemetry: MelTelemetry) -> None: 

372 """Updates the telemetry data and handles state changes. 

373 

374 This function updates the previous and current telemetry readings, 

375 logs relevant debug information, and checks for state changes. 

376 

377 If a state change occurs, it logs the transition, cancels image 

378 retrieval tasks if necessary, and triggers appropriate actions based on 

379 the new state. 

380 

381 Args: 

382 new_telemetry (MelTelemetry): The new telemetry data to update. 

383 

384 Returns: 

385 None 

386 """ 

387 self.previous_telemetry = self.current_telemetry 

388 self.current_telemetry = new_telemetry 

389 

390 logger.debug( 

391 f"New observations - State: {self.get_current_state()}," 

392 f" Battery level: {self.current_telemetry.battery}/{self.current_telemetry.max_battery}," 

393 f" Vel X,Y: {self.current_telemetry.vx}, {self.current_telemetry.vy}," 

394 f" Fuel: {self.current_telemetry.fuel}" 

395 ) 

396 

397 logger.debug( 

398 "Current memory usage: " 

399 + str(psutil.Process(os.getpid()).memory_info().rss / 1024**2) 

400 + " MB" 

401 ) 

402 

403 # if self.get_current_state() == State.Acquisition: 

404 # await self.get_image() 

405 # logger.debug(f"Threads: {threading.active_count()}") 

406 # for thread in threading.enumerate(): 

407 # frame = sys._current_frames()[thread.ident] 

408 # logger.warning(f"{inspect.getframeinfo(frame).filename}.{inspect.getframeinfo(frame).function}:{inspect.getframeinfo(frame).lineno}") 

409 

410 # check if still accelerating 

411 if ( 

412 self._target_vel_x == self.current_telemetry.vx 

413 and self._target_vel_y == self.current_telemetry.vy 

414 ): 

415 self._accelerating = False 

416 

417 if self.previous_telemetry is not None: 

418 if self.get_previous_state() != self.get_current_state(): 

419 logger.info( 

420 f"State changed from {self.get_previous_state()} to {self.get_current_state()}" 

421 ) 

422 self.previous_state = self.get_previous_state() 

423 self.state_change_time = datetime.datetime.now() 

424 # Put in here events to do on state change 

425 if settings.TRACING: 

426 if self.previous_state == State.Transition: 

427 snapshot1 = tracemalloc.take_snapshot() 

428 stats = snapshot1.statistics("traceback") 

429 for stat in stats: 

430 logger.warning( 

431 "%s memory blocks: %.1f KiB" 

432 % (stat.count, stat.size / 1024) 

433 ) 

434 for line in stat.traceback.format(): 

435 logger.warning(line) 

436 

437 # logger.debug( 

438 # f"Previous state: {self.previous_state}, Current state: {self.get_current_state()}" 

439 # ) 

440 match self.get_current_state(): 

441 case State.Transition: 

442 if self._run_get_image_task: 

443 logger.debug("end image") 

444 self._run_get_image_task.cancel() 

445 self._run_get_image_task = None 

446 if self.submitted_transition_request: 

447 self.submitted_transition_request = False 

448 else: 

449 logger.warning("State transition was externally triggered!") 

450 case State.Acquisition: 

451 logger.info("Starting control in acquisition state.") 

452 if self._run_get_image_task: 

453 logger.debug("Image task already running") 

454 else: 

455 logger.debug("start image") 

456 loop = asyncio.get_event_loop() 

457 self._run_get_image_task = loop.create_task( 

458 self.run_get_image() 

459 ) 

460 await self.control_acquisition() 

461 case State.Charge: 

462 pass 

463 case State.Safe: 

464 logger.warning("State transitioned to SAFE!") 

465 case State.Communication: 

466 pass 

467 case State.Deployment: 

468 logger.warning("State transitioned to DEPLOYMENT!") 

469 case _: 

470 logger.warning(f"Unknown state {self.get_current_state()}") 

471 if self.get_current_state() != State.Acquisition: 

472 self._accelerating = False 

473 if self.get_current_state() != State.Transition: 

474 if self.target_state != self.get_current_state(): 

475 logger.warning( 

476 f"Planned state transition to {self.target_state} failed, now in {self.get_current_state()}" 

477 ) 

478 else: 

479 logger.debug( 

480 f"Planned state transition to {self.target_state} succeeded." 

481 ) 

482 self.target_state = None 

483 

484 await self.plan_state_switching() 

485 

486 async def get_image(self) -> None: 

487 """Captures an image if telemetry data is available and conditions are met. 

488 

489 If no telemetry data is available, the function waits for the next observation cycle. 

490 It checks various conditions (e.g., acceleration, camera angle, timing) before fetching an image 

491 from an external API and saving it with appropriate metadata. 

492 

493 Returns: 

494 None 

495 """ 

496 logger.debug("Getting image") 

497 if self.current_telemetry is None: 

498 logger.warning( 

499 f"No telemetry data available. Waiting {settings.OBSERVATION_REFRESH_RATE}s for next image." 

500 ) 

501 image_task = Timer( 

502 timeout=settings.OBSERVATION_REFRESH_RATE, callback=self.get_image 

503 ).get_task() 

504 await asyncio.gather(image_task) 

505 return 

506 if not self._aiohttp_session: 

507 self._aiohttp_session = aiohttp.ClientSession() 

508 

509 # Filter out cases where no image should be taken 

510 

511 if ( 

512 settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective 

513 or settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective 

514 ) and not self._z_obj_list: 

515 logger.warning( 

516 "Skipped image: In Objectives_only mode, but z_obj_list emtpy!" 

517 ) 

518 return 

519 

520 if self.current_telemetry.angle != settings.TARGET_CAMERA_ANGLE_ACQUISITION: 

521 logger.info( 

522 f"Skipped image: current_angle={self.current_telemetry.angle} vs target={settings.TARGET_CAMERA_ANGLE_ACQUISITION}" 

523 ) 

524 return 

525 if self._accelerating: 

526 logger.info( 

527 f"Skipped image: accelerating to: {self._target_vel_x} {self._target_vel_y}" 

528 ) 

529 return 

530 

531 if settings.DO_TIMING_CHECK and settings.START_TIME > datetime.datetime.now( 

532 datetime.timezone.utc 

533 ): 

534 logger.warning( 

535 f"Skipped image, to early: start={settings.START_TIME} current_time={live_utc()}" 

536 ) 

537 return 

538 if settings.DO_TIMING_CHECK and live_utc() > settings.STOP_TIME: 

539 logger.warning( 

540 f"Skipped image, to late: end={settings.STOP_TIME} current_time={live_utc()}" 

541 ) 

542 return 

543 

544 # save the current telemetry values, so they dont get overwritten by a later update 

545 tele_timestamp = self.current_telemetry.timestamp 

546 tele_x = self.current_telemetry.width_x 

547 tele_y = self.current_telemetry.height_y 

548 tele_vx = self.current_telemetry.vx 

549 tele_vy = self.current_telemetry.vy 

550 tele_simSpeed = self.get_simulation_speed() 

551 tele_angle = self.current_telemetry.angle 

552 

553 lens_size = lens_size_by_angle(self.current_telemetry.angle) 

554 """ 

555 # TODO add box check if melvin and objective overlap 

556 # check if we are in range of an objective 

557 # TODO also check MELVINTasks 

558 # TODO check if within box, unless hidden 

559 melvin_box = ( 

560 tele_x - lens_size / 2, 

561 tele_y - lens_size / 2, 

562 tele_x + lens_size / 2, 

563 tele_y + lens_size / 2, 

564 ) 

565 if self._z_obj_list[0].zone is None: 

566 logger.warning("Hidden objective, taking photo") 

567 return 

568 TODO 

569 else: 

570 logger.warning("Checking if in range of objective:") 

571 objective_box = self._z_obj_list[0].zone 

572 

573 if not boxes_overlap_in_grid(melvin_box, objective_box): 

574 logger.error( 

575 f"Image skipped, not Overlapping! {melvin_box} {objective_box}" 

576 ) 

577 return 

578 """ 

579 async with aiohttp.ClientSession() as session: 

580 try: 

581 async with session.get(con.IMAGE_ENDPOINT) as response: 

582 if response.status == 200: 

583 # Extract exact image timestamp 

584 img_timestamp = response.headers.get("image-timestamp") 

585 if img_timestamp is None: 

586 logger.error( 

587 "Image timestamp not found in headers, substituting with current time" 

588 ) 

589 parsed_img_timestamp = datetime.datetime.now() 

590 else: 

591 parsed_img_timestamp = datetime.datetime.fromisoformat( 

592 img_timestamp 

593 ) 

594 

595 # Calculate the difference between the img and the last telemetry 

596 difference_in_seconds = ( 

597 parsed_img_timestamp - tele_timestamp 

598 ).total_seconds() 

599 

600 adj_x = round( 

601 tele_x + (difference_in_seconds * tele_vx * tele_simSpeed) 

602 ) - (lens_size / 2) 

603 adj_y = round( 

604 tele_y + (difference_in_seconds * tele_vy * tele_simSpeed) 

605 ) - (lens_size / 2) 

606 

607 # TODO check if images are correct! 

608 # TODO might also need modulo for side cases 

609 # logger.debug(f"T {parsed_img_timestamp} | C {tele_timestamp}") 

610 # logger.debug( 

611 # f"D {difference_in_seconds} | R {tele_x} ADJ {adj_x}" 

612 # ) 

613 

614 image_path = con.IMAGE_LOCATION.format( 

615 melv_id=self._current_obj_name, 

616 angle=tele_angle, 

617 time=parsed_img_timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"), 

618 cor_x=int(adj_x), 

619 cor_y=int(adj_y), 

620 ) 

621 

622 logger.info( 

623 f"Received image at {adj_x}x {adj_y}y with {self.current_telemetry.angle} angle" 

624 ) 

625 

626 async with async_open(image_path, "wb") as afp: 

627 while True: 

628 cnt = await response.content.readany() 

629 if not cnt: 

630 break 

631 await afp.write(cnt) 

632 else: 

633 logger.warning(f"Failed to get image: {response.status}") 

634 logger.info(f"Response body: {await response.text()}") 

635 logger.info( 

636 "This is normal at the end of acquisition mode once." 

637 ) 

638 except aiohttp.client_exceptions.ConnectionTimeoutError: 

639 logger.warning("Observations endpoint timeouted.") 

640 except asyncio.exceptions.CancelledError: 

641 logger.warning("Get image task was cancelled.") 

642 

643 async def run_get_image(self) -> None: 

644 """Continuously captures images while in the Acquisition state. 

645 

646 This function continuously captures images at calculated intervals, 

647 adjusting timing based on velocity and acceleration. 

648 

649 Returns: 

650 None 

651 """ 

652 logger.debug("Starting run_get_image") 

653 if self.get_current_state() == State.Acquisition: 

654 await self.get_image() 

655 while self.get_current_state() == State.Acquisition: 

656 if self.current_telemetry is None: 

657 logger.debug( 

658 "No telemetry data available. Assuming Observation Refresh Rate." 

659 ) 

660 delay_in_s = float(settings.OBSERVATION_REFRESH_RATE) 

661 else: 

662 current_total_vel = ( 

663 self.current_telemetry.vx + self.current_telemetry.vy 

664 ) 

665 if self._accelerating: 

666 # If accelerating calculate distance based on current speed and acceleration 

667 delay_in_s = ( 

668 math.sqrt( 

669 current_total_vel**2 

670 + 2 * con.ACCELERATION * settings.DISTANCE_BETWEEN_IMAGES 

671 ) 

672 - current_total_vel 

673 ) / con.ACCELERATION 

674 else: 

675 # When not accelerating calculate distance based on current speed 

676 delay_in_s = ( 

677 float(settings.DISTANCE_BETWEEN_IMAGES) / current_total_vel 

678 ) 

679 delay_in_s = delay_in_s / self.get_simulation_speed() 

680 logger.debug(f"Next image in {delay_in_s}s.") 

681 image_task = Timer(timeout=delay_in_s, callback=self.get_image).get_task() 

682 await asyncio.gather(image_task) 

683 

684 # run once after changing into acquisition mode -> setup 

685 async def control_acquisition(self) -> None: 

686 """Initializes acquisition mode by updating objectives and setting camera parameters. 

687 

688 Retrieves objectives from an external API, determines the current objective, and adjusts 

689 acquisition parameters accordingly. Also creates necessary directories for image storage. 

690 

691 Returns: 

692 None 

693 """ 

694 async with aiohttp.ClientSession() as session: 

695 # update Objectives 

696 async with session.get(con.OBJECTIVE_ENDPOINT) as response: 

697 if response.status == 200: 

698 json_response = await response.json() 

699 self._z_obj_list: list[ZonedObjective] = ZonedObjective.parse_api( 

700 json_response 

701 ) 

702 logger.info( 

703 f"Updated objectives, there are {len(self._z_obj_list)} objectives." 

704 ) 

705 else: 

706 logger.error("Could not get OBJECTIVE_ENDPOINT") 

707 

708 current_obj = None 

709 # Always check for new objective in this task 

710 if settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective: 

711 current_obj = self._z_obj_list[0] 

712 

713 logger.error(f"Using next_objective, current task: {current_obj.name}") 

714 

715 # In this task look for the given string 

716 elif settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective: 

717 for obj in self._z_obj_list: 

718 if obj.name == settings.FIXED_OBJECTIVE: 

719 logger.error(f"Using fixed_objective, current task: {obj}") 

720 current_obj = obj 

721 break 

722 

723 if current_obj: 

724 settings.START_TIME = current_obj.start 

725 settings.STOP_TIME = current_obj.end 

726 settings.TARGET_CAMERA_ANGLE_ACQUISITION = current_obj.optic_required 

727 

728 self._current_obj_name = str(current_obj.id) + current_obj.name.replace( 

729 " ", "" 

730 ) 

731 

732 con.IMAGE_PATH = con.IMAGE_PATH_BASE + self._current_obj_name + "/" 

733 

734 con.IMAGE_LOCATION = ( 

735 con.IMAGE_PATH 

736 + "image_{melv_id}_{angle}_{time}_x_{cor_x}_y_{cor_y}.png" 

737 ) 

738 try: 

739 subprocess.run(["mkdir", con.IMAGE_PATH], check=True) 

740 logger.info(f"Created folder: {con.IMAGE_PATH}") 

741 

742 except subprocess.CalledProcessError as e: 

743 logger.info(f"z_obj could not mkdir: {e}") 

744 

745 # check if change occured and cut the last image 

746 

747 await self.trigger_camera_angle_change(settings.TARGET_CAMERA_ANGLE_ACQUISITION) 

748 

749 # TODO stop velocity change if battery is low 

750 if self.current_telemetry and self.current_telemetry.battery < 10: 

751 logger.error("Battery low, cant accelerate any more!") 

752 

753 match settings.TARGET_CAMERA_ANGLE_ACQUISITION: 

754 case CameraAngle.Wide: 

755 await self.trigger_velocity_change( 

756 settings.TARGET_SPEED_WIDE_X, settings.TARGET_SPEED_WIDE_Y 

757 ) 

758 case CameraAngle.Narrow: 

759 await self.trigger_velocity_change( 

760 settings.TARGET_SPEED_NARROW_X, settings.TARGET_SPEED_NARROW_Y 

761 ) 

762 case CameraAngle.Normal: 

763 await self.trigger_velocity_change( 

764 settings.TARGET_SPEED_NORMAL_X, settings.TARGET_SPEED_NORMAL_Y 

765 ) 

766 case _: 

767 pass 

768 

769"""Spawn StatePlanner object""" 

770state_planner = StatePlanner()