Coverage for src/melvonaut/state_planer.py: 10%
341 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1##### State machine #####
2import asyncio
3import subprocess
4import datetime
5import math
6import tracemalloc
7from typing import Optional, Any
8from aiofile import async_open
9import aiohttp
10from pydantic import BaseModel
12import shared.constants as con
13from melvonaut.settings import settings
14from melvonaut.mel_telemetry import MelTelemetry
15from shared.models import (
16 CameraAngle,
17 MELVINTask,
18 State,
19 Timer,
20 ZonedObjective,
21 lens_size_by_angle,
22 limited_log,
23 Event,
24 live_utc,
25)
26from loguru import logger
27import random
29import os
30import psutil
33class StatePlanner(BaseModel):
34 melv_id: int = random.randint(0, 9999)
35 current_telemetry: Optional[MelTelemetry] = None
36 previous_telemetry: Optional[MelTelemetry] = None
38 previous_state: Optional[State] = None
39 state_change_time: datetime.datetime = datetime.datetime.now()
41 submitted_transition_request: bool = False
43 target_state: Optional[State] = None
45 _accelerating: bool = False
47 _run_get_image_task: Optional[asyncio.Task[None]] = None
49 _aiohttp_session: Optional[aiohttp.ClientSession] = None
51 _target_vel_x: Optional[float] = None
52 _target_vel_y: Optional[float] = None
54 _z_obj_list: list[ZonedObjective] = []
56 recent_events: list[Event] = []
58 _current_obj_name: str = ""
60 def model_post_init(self, __context__: Any) -> None:
61 """Initializes the recent_events list by loading events from a CSV file.
63 Args:
64 __context__ (Any): Context data passed during initialization.
65 """
66 self.recent_events = Event.load_events_from_csv(path=con.EVENT_LOCATION_CSV)
68 def get_current_state(self) -> State:
69 """Retrieves the current state from telemetry data.
71 Returns:
72 State: The current state if telemetry is available, otherwise State.Unknown.
73 """
74 if self.current_telemetry is None:
75 return State.Unknown
76 return self.current_telemetry.state
78 def get_previous_state(self) -> State:
79 """Retrieves the previous state from telemetry data.
81 Returns:
82 State: The previous state if telemetry is available, otherwise State.Unknown.
83 """
84 if self.previous_telemetry is None:
85 return State.Unknown
86 return self.previous_telemetry.state
88 def get_simulation_speed(self) -> int:
89 """Gets the current simulation speed from telemetry data.
91 Returns:
92 int: The simulation speed if telemetry is available, otherwise 1.
93 """
94 if self.current_telemetry is None:
95 return 1
96 return self.current_telemetry.simulation_speed
98 def get_time_since_state_change(self) -> datetime.timedelta:
99 """Calculates the time elapsed since the last state change.
101 Returns:
102 datetime.timedelta: The time difference between now and the last state change.
103 """
104 return datetime.datetime.now() - self.state_change_time
106 def calc_transition_remaining_time(self) -> datetime.timedelta:
107 """Calculates the remaining time for state transition.
109 Returns:
110 datetime.timedelta: The remaining transition time based on the simulation speed.
111 """
112 if self.get_current_state() is State.Transition:
113 logger.debug("Not in transition state, returning 0")
114 return datetime.timedelta(0)
115 elif self.previous_state == State.Safe:
116 total_time = datetime.timedelta(
117 seconds=con.STATE_TRANSITION_FROM_SAFE_TIME
118 / self.get_simulation_speed()
119 )
120 return total_time - self.get_time_since_state_change()
121 else:
122 total_time = datetime.timedelta(
123 seconds=con.STATE_TRANSITION_TIME / self.get_simulation_speed()
124 )
125 return total_time - self.get_time_since_state_change()
127 def calc_current_location(self) -> tuple[float, float]:
128 """Estimates the current location based on telemetry data and time elapsed.
130 Returns:
131 tuple[float, float]: The estimated (x, y) coordinates.
132 """
133 if self.current_telemetry is None:
134 return 0.0, 0.0
135 time_since_observation = (
136 live_utc() - self.current_telemetry.timestamp
137 ).total_seconds()
138 current_x = (
139 self.current_telemetry.width_x
140 + self.current_telemetry.vx * time_since_observation
141 )
142 current_y = (
143 self.current_telemetry.height_y
144 + self.current_telemetry.vy * time_since_observation
145 )
146 return current_x, current_y
148 async def trigger_velocity_change(self, new_vel_x: float, new_vel_y: float) -> None:
149 """Sets new values for accelartion, also set _accelerating
151 Args:
152 new_vel_x (float): The target velocity in the x direction.
153 new_vel_y (float): The target velocity in the y direction.
154 """
156 self._target_vel_x = new_vel_x
157 self._target_vel_y = new_vel_y
159 if self.current_telemetry is None:
160 logger.warning("No telemetry data available. Cannot set velocity.")
161 return
162 if (
163 new_vel_x == self.current_telemetry.vx
164 and new_vel_y == self.current_telemetry.vy
165 ):
166 self._accelerating = False
167 logger.info("Target velocity already set. Not changing velocity.")
168 return
169 request_body = {
170 "vel_x": new_vel_x,
171 "vel_y": new_vel_y,
172 "camera_angle": self.current_telemetry.angle,
173 "state": self.get_current_state(),
174 }
175 async with aiohttp.ClientSession() as session:
176 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
177 if response.status == 200:
178 self._accelerating = True
179 logger.info(f"Velocity set to {new_vel_x}, {new_vel_y}")
180 else:
181 logger.error(f"Failed to set velocity to {new_vel_x}, {new_vel_y}")
183 async def trigger_camera_angle_change(self, new_angle: CameraAngle) -> None:
184 """Tries to change the camera angle to new_angle
186 Args:
187 new_angle (CameraAngle): The desired camera angle.
188 """
189 if self.current_telemetry is None:
190 logger.warning("No telemetry data available. Cannot set camera angle.")
191 return
192 if new_angle == self.current_telemetry.angle:
193 logger.info("Target camera angle already set. Not changing angle.")
194 return
195 request_body = {
196 "vel_x": self.current_telemetry.vx,
197 "vel_y": self.current_telemetry.vy,
198 "camera_angle": new_angle,
199 "state": self.get_current_state(),
200 }
201 async with aiohttp.ClientSession() as session:
202 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
203 if response.status == 200:
204 self.current_telemetry.angle = new_angle
205 logger.info(f"Camera angle set to {new_angle}")
206 else:
207 logger.error(f"Failed to set camera angle to {new_angle}")
209 async def trigger_state_transition(self, new_state: State) -> None:
210 """Initiates a state transition if valid conditions are met.
212 Args:
213 new_state (State): The target state to transition to.
214 """
215 if new_state in [State.Transition, State.Unknown, State.Deployment, State.Safe]:
216 logger.warning(f"Cannot transition to {new_state}.")
217 return
218 if self.current_telemetry is None:
219 logger.warning("No telemetry data available. Cannot initiate transition.")
220 return
221 if self.current_telemetry.state == State.Transition:
222 logger.debug("Already in transition state, not starting transition.")
223 return
224 if new_state == self.get_current_state():
225 logger.debug(f"State is already {new_state}, not starting transition.")
226 return
227 request_body = {
228 "state": new_state,
229 "vel_x": self.current_telemetry.vx,
230 "vel_y": self.current_telemetry.vy,
231 "camera_angle": self.current_telemetry.angle,
232 }
233 async with aiohttp.ClientSession() as session:
234 async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
235 if response.status == 200:
236 logger.info(
237 f"Started transition to {new_state} at battery level {self.current_telemetry.battery}"
238 )
239 self.submitted_transition_request = True
240 self.target_state = new_state
241 else:
242 logger.warning(
243 f"Failed to transition to {new_state}: {response.status}"
244 )
245 logger.debug(f"Response body: {await response.text()}")
247 async def switch_if_battery_low(
248 self, state_low_battery: State, state_high_battery: State
249 ) -> None:
250 """Switches state based on battery level.
252 Args:
253 state_low_battery (State): The state to switch to when battery is low.
254 state_high_battery (State): The state to switch to when battery is sufficient.
255 """
256 if self.current_telemetry is None:
257 logger.warning(
258 "No telemetry data available. Cannot plan battery based switching."
259 )
260 return
261 if self.current_telemetry.battery <= settings.BATTERY_LOW_THRESHOLD:
262 if self.get_current_state() == state_low_battery:
263 return
264 logger.debug(
265 f"State is {self.get_current_state()}, Battery is low, triggering transition to {state_low_battery}"
266 )
267 await self.trigger_state_transition(state_low_battery)
268 else:
269 if self.get_current_state() == state_high_battery:
270 return
271 logger.debug(
272 f"State is {self.get_current_state()}, Battery is high, triggering transition to {state_high_battery}"
273 )
274 await self.trigger_state_transition(state_high_battery)
276 async def plan_state_switching(self) -> None:
277 """Plans and executes state transitions based on current telemetry data.
279 This function checks the current state and decides whether to transition
280 to another state based on conditions like battery level and velocity.
282 Logs relevant debug information and triggers state transitions when necessary.
284 Returns:
285 None
286 """
287 if self.current_telemetry is None:
288 logger.warning("No telemetry data available. Cannot plan state switching.")
289 return
291 state = self.get_current_state()
293 match state:
294 case State.Transition:
295 logger.debug(
296 f"Time since state change: {self.get_time_since_state_change()}"
297 )
298 expected_time_to_complete = self.calc_transition_remaining_time()
299 limited_log(
300 f"State is Transition to {self.target_state}, waiting for transition to complete.\nExpected time to complete state transition: {expected_time_to_complete}"
301 )
302 # logger.debug(
303 # f"Previous state: {self.get_previous_state()}, Current state: {self.get_current_state()}"
304 # )
305 case State.Acquisition:
306 # in EBT leave once everything is set
307 if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
308 if (
309 self._target_vel_x
310 and self._target_vel_y
311 and self.current_telemetry.angle
312 == settings.TARGET_CAMERA_ANGLE_ACQUISITION
313 and self._target_vel_x == self.current_telemetry.vx
314 and self._target_vel_y == self.current_telemetry.vy
315 ):
316 await self.trigger_state_transition(State.Communication)
318 await self.switch_if_battery_low(State.Charge, State.Acquisition)
320 case State.Charge:
321 if (
322 self.current_telemetry.battery
323 >= self.current_telemetry.max_battery
324 - settings.BATTERY_HIGH_THRESHOLD
325 ):
326 if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
327 # starting ebt, but speed/angle not set yet
329 logger.info(
330 f"EBT Task, Angle: telemetry: {self.current_telemetry.angle} vs target: {settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
331 )
332 logger.info(
333 f"EBT Task, vx: {self.current_telemetry.vx} vs target: {self._target_vel_x}"
334 )
335 logger.info(
336 f"EBT Task, vy: {self.current_telemetry.vy} vs target: {self._target_vel_y}"
337 )
338 if (
339 self._target_vel_x is None
340 or self._target_vel_y is None
341 or self.current_telemetry.angle
342 != settings.TARGET_CAMERA_ANGLE_ACQUISITION
343 or self._target_vel_x != self.current_telemetry.vx
344 or self._target_vel_y != self.current_telemetry.vy
345 ):
346 await self.trigger_state_transition(State.Acquisition)
347 else:
348 # logger.info("starting comms!")
349 await self.trigger_state_transition(State.Communication)
351 else:
352 # logger.info("starting acq!")
353 await self.trigger_state_transition(State.Acquisition)
355 case State.Safe:
356 if self.current_telemetry.battery >= (
357 self.current_telemetry.max_battery * 0.5
358 ):
359 await self.trigger_state_transition(State.Acquisition)
360 else:
361 await self.trigger_state_transition(State.Charge)
362 await self.switch_if_battery_low(State.Charge, State.Acquisition)
363 case State.Communication:
364 await self.switch_if_battery_low(State.Charge, State.Communication)
365 case State.Deployment:
366 logger.debug(
367 "State is Deployment, triggering transition to Acquisition"
368 )
369 await self.trigger_state_transition(State.Acquisition)
371 async def update_telemetry(self, new_telemetry: MelTelemetry) -> None:
372 """Updates the telemetry data and handles state changes.
374 This function updates the previous and current telemetry readings,
375 logs relevant debug information, and checks for state changes.
377 If a state change occurs, it logs the transition, cancels image
378 retrieval tasks if necessary, and triggers appropriate actions based on
379 the new state.
381 Args:
382 new_telemetry (MelTelemetry): The new telemetry data to update.
384 Returns:
385 None
386 """
387 self.previous_telemetry = self.current_telemetry
388 self.current_telemetry = new_telemetry
390 logger.debug(
391 f"New observations - State: {self.get_current_state()},"
392 f" Battery level: {self.current_telemetry.battery}/{self.current_telemetry.max_battery},"
393 f" Vel X,Y: {self.current_telemetry.vx}, {self.current_telemetry.vy},"
394 f" Fuel: {self.current_telemetry.fuel}"
395 )
397 logger.debug(
398 "Current memory usage: "
399 + str(psutil.Process(os.getpid()).memory_info().rss / 1024**2)
400 + " MB"
401 )
403 # if self.get_current_state() == State.Acquisition:
404 # await self.get_image()
405 # logger.debug(f"Threads: {threading.active_count()}")
406 # for thread in threading.enumerate():
407 # frame = sys._current_frames()[thread.ident]
408 # logger.warning(f"{inspect.getframeinfo(frame).filename}.{inspect.getframeinfo(frame).function}:{inspect.getframeinfo(frame).lineno}")
410 # check if still accelerating
411 if (
412 self._target_vel_x == self.current_telemetry.vx
413 and self._target_vel_y == self.current_telemetry.vy
414 ):
415 self._accelerating = False
417 if self.previous_telemetry is not None:
418 if self.get_previous_state() != self.get_current_state():
419 logger.info(
420 f"State changed from {self.get_previous_state()} to {self.get_current_state()}"
421 )
422 self.previous_state = self.get_previous_state()
423 self.state_change_time = datetime.datetime.now()
424 # Put in here events to do on state change
425 if settings.TRACING:
426 if self.previous_state == State.Transition:
427 snapshot1 = tracemalloc.take_snapshot()
428 stats = snapshot1.statistics("traceback")
429 for stat in stats:
430 logger.warning(
431 "%s memory blocks: %.1f KiB"
432 % (stat.count, stat.size / 1024)
433 )
434 for line in stat.traceback.format():
435 logger.warning(line)
437 # logger.debug(
438 # f"Previous state: {self.previous_state}, Current state: {self.get_current_state()}"
439 # )
440 match self.get_current_state():
441 case State.Transition:
442 if self._run_get_image_task:
443 logger.debug("end image")
444 self._run_get_image_task.cancel()
445 self._run_get_image_task = None
446 if self.submitted_transition_request:
447 self.submitted_transition_request = False
448 else:
449 logger.warning("State transition was externally triggered!")
450 case State.Acquisition:
451 logger.info("Starting control in acquisition state.")
452 if self._run_get_image_task:
453 logger.debug("Image task already running")
454 else:
455 logger.debug("start image")
456 loop = asyncio.get_event_loop()
457 self._run_get_image_task = loop.create_task(
458 self.run_get_image()
459 )
460 await self.control_acquisition()
461 case State.Charge:
462 pass
463 case State.Safe:
464 logger.warning("State transitioned to SAFE!")
465 case State.Communication:
466 pass
467 case State.Deployment:
468 logger.warning("State transitioned to DEPLOYMENT!")
469 case _:
470 logger.warning(f"Unknown state {self.get_current_state()}")
471 if self.get_current_state() != State.Acquisition:
472 self._accelerating = False
473 if self.get_current_state() != State.Transition:
474 if self.target_state != self.get_current_state():
475 logger.warning(
476 f"Planned state transition to {self.target_state} failed, now in {self.get_current_state()}"
477 )
478 else:
479 logger.debug(
480 f"Planned state transition to {self.target_state} succeeded."
481 )
482 self.target_state = None
484 await self.plan_state_switching()
486 async def get_image(self) -> None:
487 """Captures an image if telemetry data is available and conditions are met.
489 If no telemetry data is available, the function waits for the next observation cycle.
490 It checks various conditions (e.g., acceleration, camera angle, timing) before fetching an image
491 from an external API and saving it with appropriate metadata.
493 Returns:
494 None
495 """
496 logger.debug("Getting image")
497 if self.current_telemetry is None:
498 logger.warning(
499 f"No telemetry data available. Waiting {settings.OBSERVATION_REFRESH_RATE}s for next image."
500 )
501 image_task = Timer(
502 timeout=settings.OBSERVATION_REFRESH_RATE, callback=self.get_image
503 ).get_task()
504 await asyncio.gather(image_task)
505 return
506 if not self._aiohttp_session:
507 self._aiohttp_session = aiohttp.ClientSession()
509 # Filter out cases where no image should be taken
511 if (
512 settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective
513 or settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective
514 ) and not self._z_obj_list:
515 logger.warning(
516 "Skipped image: In Objectives_only mode, but z_obj_list emtpy!"
517 )
518 return
520 if self.current_telemetry.angle != settings.TARGET_CAMERA_ANGLE_ACQUISITION:
521 logger.info(
522 f"Skipped image: current_angle={self.current_telemetry.angle} vs target={settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
523 )
524 return
525 if self._accelerating:
526 logger.info(
527 f"Skipped image: accelerating to: {self._target_vel_x} {self._target_vel_y}"
528 )
529 return
531 if settings.DO_TIMING_CHECK and settings.START_TIME > datetime.datetime.now(
532 datetime.timezone.utc
533 ):
534 logger.warning(
535 f"Skipped image, to early: start={settings.START_TIME} current_time={live_utc()}"
536 )
537 return
538 if settings.DO_TIMING_CHECK and live_utc() > settings.STOP_TIME:
539 logger.warning(
540 f"Skipped image, to late: end={settings.STOP_TIME} current_time={live_utc()}"
541 )
542 return
544 # save the current telemetry values, so they dont get overwritten by a later update
545 tele_timestamp = self.current_telemetry.timestamp
546 tele_x = self.current_telemetry.width_x
547 tele_y = self.current_telemetry.height_y
548 tele_vx = self.current_telemetry.vx
549 tele_vy = self.current_telemetry.vy
550 tele_simSpeed = self.get_simulation_speed()
551 tele_angle = self.current_telemetry.angle
553 lens_size = lens_size_by_angle(self.current_telemetry.angle)
554 """
555 # TODO add box check if melvin and objective overlap
556 # check if we are in range of an objective
557 # TODO also check MELVINTasks
558 # TODO check if within box, unless hidden
559 melvin_box = (
560 tele_x - lens_size / 2,
561 tele_y - lens_size / 2,
562 tele_x + lens_size / 2,
563 tele_y + lens_size / 2,
564 )
565 if self._z_obj_list[0].zone is None:
566 logger.warning("Hidden objective, taking photo")
567 return
568 TODO
569 else:
570 logger.warning("Checking if in range of objective:")
571 objective_box = self._z_obj_list[0].zone
573 if not boxes_overlap_in_grid(melvin_box, objective_box):
574 logger.error(
575 f"Image skipped, not Overlapping! {melvin_box} {objective_box}"
576 )
577 return
578 """
579 async with aiohttp.ClientSession() as session:
580 try:
581 async with session.get(con.IMAGE_ENDPOINT) as response:
582 if response.status == 200:
583 # Extract exact image timestamp
584 img_timestamp = response.headers.get("image-timestamp")
585 if img_timestamp is None:
586 logger.error(
587 "Image timestamp not found in headers, substituting with current time"
588 )
589 parsed_img_timestamp = datetime.datetime.now()
590 else:
591 parsed_img_timestamp = datetime.datetime.fromisoformat(
592 img_timestamp
593 )
595 # Calculate the difference between the img and the last telemetry
596 difference_in_seconds = (
597 parsed_img_timestamp - tele_timestamp
598 ).total_seconds()
600 adj_x = round(
601 tele_x + (difference_in_seconds * tele_vx * tele_simSpeed)
602 ) - (lens_size / 2)
603 adj_y = round(
604 tele_y + (difference_in_seconds * tele_vy * tele_simSpeed)
605 ) - (lens_size / 2)
607 # TODO check if images are correct!
608 # TODO might also need modulo for side cases
609 # logger.debug(f"T {parsed_img_timestamp} | C {tele_timestamp}")
610 # logger.debug(
611 # f"D {difference_in_seconds} | R {tele_x} ADJ {adj_x}"
612 # )
614 image_path = con.IMAGE_LOCATION.format(
615 melv_id=self._current_obj_name,
616 angle=tele_angle,
617 time=parsed_img_timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"),
618 cor_x=int(adj_x),
619 cor_y=int(adj_y),
620 )
622 logger.info(
623 f"Received image at {adj_x}x {adj_y}y with {self.current_telemetry.angle} angle"
624 )
626 async with async_open(image_path, "wb") as afp:
627 while True:
628 cnt = await response.content.readany()
629 if not cnt:
630 break
631 await afp.write(cnt)
632 else:
633 logger.warning(f"Failed to get image: {response.status}")
634 logger.info(f"Response body: {await response.text()}")
635 logger.info(
636 "This is normal at the end of acquisition mode once."
637 )
638 except aiohttp.client_exceptions.ConnectionTimeoutError:
639 logger.warning("Observations endpoint timeouted.")
640 except asyncio.exceptions.CancelledError:
641 logger.warning("Get image task was cancelled.")
643 async def run_get_image(self) -> None:
644 """Continuously captures images while in the Acquisition state.
646 This function continuously captures images at calculated intervals,
647 adjusting timing based on velocity and acceleration.
649 Returns:
650 None
651 """
652 logger.debug("Starting run_get_image")
653 if self.get_current_state() == State.Acquisition:
654 await self.get_image()
655 while self.get_current_state() == State.Acquisition:
656 if self.current_telemetry is None:
657 logger.debug(
658 "No telemetry data available. Assuming Observation Refresh Rate."
659 )
660 delay_in_s = float(settings.OBSERVATION_REFRESH_RATE)
661 else:
662 current_total_vel = (
663 self.current_telemetry.vx + self.current_telemetry.vy
664 )
665 if self._accelerating:
666 # If accelerating calculate distance based on current speed and acceleration
667 delay_in_s = (
668 math.sqrt(
669 current_total_vel**2
670 + 2 * con.ACCELERATION * settings.DISTANCE_BETWEEN_IMAGES
671 )
672 - current_total_vel
673 ) / con.ACCELERATION
674 else:
675 # When not accelerating calculate distance based on current speed
676 delay_in_s = (
677 float(settings.DISTANCE_BETWEEN_IMAGES) / current_total_vel
678 )
679 delay_in_s = delay_in_s / self.get_simulation_speed()
680 logger.debug(f"Next image in {delay_in_s}s.")
681 image_task = Timer(timeout=delay_in_s, callback=self.get_image).get_task()
682 await asyncio.gather(image_task)
684 # run once after changing into acquisition mode -> setup
685 async def control_acquisition(self) -> None:
686 """Initializes acquisition mode by updating objectives and setting camera parameters.
688 Retrieves objectives from an external API, determines the current objective, and adjusts
689 acquisition parameters accordingly. Also creates necessary directories for image storage.
691 Returns:
692 None
693 """
694 async with aiohttp.ClientSession() as session:
695 # update Objectives
696 async with session.get(con.OBJECTIVE_ENDPOINT) as response:
697 if response.status == 200:
698 json_response = await response.json()
699 self._z_obj_list: list[ZonedObjective] = ZonedObjective.parse_api(
700 json_response
701 )
702 logger.info(
703 f"Updated objectives, there are {len(self._z_obj_list)} objectives."
704 )
705 else:
706 logger.error("Could not get OBJECTIVE_ENDPOINT")
708 current_obj = None
709 # Always check for new objective in this task
710 if settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective:
711 current_obj = self._z_obj_list[0]
713 logger.error(f"Using next_objective, current task: {current_obj.name}")
715 # In this task look for the given string
716 elif settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective:
717 for obj in self._z_obj_list:
718 if obj.name == settings.FIXED_OBJECTIVE:
719 logger.error(f"Using fixed_objective, current task: {obj}")
720 current_obj = obj
721 break
723 if current_obj:
724 settings.START_TIME = current_obj.start
725 settings.STOP_TIME = current_obj.end
726 settings.TARGET_CAMERA_ANGLE_ACQUISITION = current_obj.optic_required
728 self._current_obj_name = str(current_obj.id) + current_obj.name.replace(
729 " ", ""
730 )
732 con.IMAGE_PATH = con.IMAGE_PATH_BASE + self._current_obj_name + "/"
734 con.IMAGE_LOCATION = (
735 con.IMAGE_PATH
736 + "image_{melv_id}_{angle}_{time}_x_{cor_x}_y_{cor_y}.png"
737 )
738 try:
739 subprocess.run(["mkdir", con.IMAGE_PATH], check=True)
740 logger.info(f"Created folder: {con.IMAGE_PATH}")
742 except subprocess.CalledProcessError as e:
743 logger.info(f"z_obj could not mkdir: {e}")
745 # check if change occured and cut the last image
747 await self.trigger_camera_angle_change(settings.TARGET_CAMERA_ANGLE_ACQUISITION)
749 # TODO stop velocity change if battery is low
750 if self.current_telemetry and self.current_telemetry.battery < 10:
751 logger.error("Battery low, cant accelerate any more!")
753 match settings.TARGET_CAMERA_ANGLE_ACQUISITION:
754 case CameraAngle.Wide:
755 await self.trigger_velocity_change(
756 settings.TARGET_SPEED_WIDE_X, settings.TARGET_SPEED_WIDE_Y
757 )
758 case CameraAngle.Narrow:
759 await self.trigger_velocity_change(
760 settings.TARGET_SPEED_NARROW_X, settings.TARGET_SPEED_NARROW_Y
761 )
762 case CameraAngle.Normal:
763 await self.trigger_velocity_change(
764 settings.TARGET_SPEED_NORMAL_X, settings.TARGET_SPEED_NORMAL_Y
765 )
766 case _:
767 pass
769"""Spawn StatePlanner object"""
770state_planner = StatePlanner()