Skip to content

Melvonaut Reference

Ciarc.

__main__

Melvonaut :author: Jonathan Decker

current_telemetry = None module-attribute

cancel_tasks()

Cancels all tasks and event loop.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
245
246
247
248
249
250
251
252
253
254
def cancel_tasks() -> None:
    """Cancels all tasks and event loop.

    Returns:
        None
    """
    for task in asyncio.all_tasks():
        task.cancel()
    loop = asyncio.get_running_loop()
    loop.stop()

get_announcements(last_id=None) async

Fetches announcements asynchronously with event-stream handling.

This function continuously listens for new announcements from the API and processes them. If announcements are received, they are logged and stored.

Parameters:

Name Type Description Default
last_id Optional[str]

The ID of the last processed event to resume from, if applicable.

None

Returns:

Type Description
Optional[str]

Optional[str]: The ID of the last received announcement, or None if an error occurs.

Source code in src/melvonaut/__main__.py
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
async def get_announcements(last_id: Optional[str] = None) -> Optional[str]:
    """Fetches announcements asynchronously with event-stream handling.

    This function continuously listens for new announcements from the API and processes them.
    If announcements are received, they are logged and stored.

    Args:
        last_id (Optional[str]): The ID of the last processed event to resume from, if applicable.

    Returns:
        Optional[str]: The ID of the last received announcement, or None if an error occurs.
    """
    content_line_regex = re.compile(r"^\[(\d+)]\s*(.*)$")

    headers = {"Accept": "text/event-stream", "Cache-Control": "no-cache"}
    if last_id:
        headers["Last-Event-ID"] = last_id

    timeout = aiohttp.ClientTimeout(
        total=None, connect=None, sock_connect=None, sock_read=None
    )

    async with aiohttp.ClientSession(timeout=timeout) as session:
        try:
            async with session.get(
                con.ANNOUNCEMENTS_ENDPOINT, headers=headers
            ) as response:
                if response.status not in [200, 301, 307]:
                    logger.error(f"Failed to get announcements: {response.status}")
                    await session.close()
                    return None
                else:
                    # logger.error(response.content)
                    # async for line in response.content:
                    #    logger.error(line)
                    async for line in response.content:
                        line_decoded = line.decode("utf-8")
                        # logger.warning(f"Received announcement {line}")
                        # logger.warning(f"Location is: {state_planner.calc_current_location()}")
                        # logger.warning(f"Received announcement with content:{line_decoded}")
                        line_filtered = line_decoded.replace("data:", "").strip()

                        match = content_line_regex.search(line_filtered)
                        if match:
                            line_id = int(match.group(1))
                            line_content = str(match.group(2))
                            timestamp = datetime.now(timezone.utc)
                            current_x, current_y = state_planner.calc_current_location()

                            current_event = Event(
                                event=line_content,
                                id=line_id,
                                timestamp=timestamp,
                                current_x=current_x,
                                current_y=current_y,
                            )

                            logger.warning(
                                f"Received announcement: {current_event.model_dump()}"
                            )
                            await current_event.to_csv()
                            state_planner.recent_events.append(current_event)
                            last_id = str(current_event.id)
        except TimeoutError:
            logger.error("Announcements subscription timed out")
        finally:
            if response and not response.closed:
                response.close()
            if not session.closed:
                await session.close()
            return last_id

get_observations() async

Async get observations from the Melvin API and update the state planner

This function establishes a session with the API and retrieves observation data. If the response is successful, it updates the telemetry state. If any errors occur, they are logged accordingly.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
async def get_observations() -> None:
    """Async get observations from the Melvin API and update the state planner

    This function establishes a session with the API and retrieves observation data.
    If the response is successful, it updates the telemetry state.
    If any errors occur, they are logged accordingly.

    Returns:
        None

    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(con.OBSERVATION_ENDPOINT) as response:
                if response.status == 200:
                    json_response = await response.json()
                    # logger.debug("Received observations")
                    # pprint(json_response, indent=4, sort_dicts=True)
                    await state_planner.update_telemetry(MelTelemetry(**json_response))
                else:
                    logger.warning(f"Failed to get observations: {response.status}")
        except aiohttp.client_exceptions.ConnectionTimeoutError:
            logger.warning("Observations endpoint timeouted.")
        except asyncio.TimeoutError:
            logger.warning("ASyncio TimeoutError occured.")
        except aiohttp.client_exceptions.ClientOSError:
            logger.warning("Client_exceptions.ClienOSError occured.")

main()

Melvonaut.

Source code in src/melvonaut/__main__.py
288
289
290
291
292
293
294
295
@click.command()
@click.version_option()
def main() -> None:
    """Melvonaut."""
    utils.setup_logging()
    logger.info("Starting Melvonaut...")

    start_event_loop()

read_images() async

Reads image files asynchronously from a designated directory.

This function iterates over stored images, extracts metadata from filenames, and yields MelvinImage objects.

Yields:

Name Type Description
MelvinImage AsyncIterable[MelvinImage]

An image object containing extracted metadata.

Source code in src/melvonaut/__main__.py
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
async def read_images() -> AsyncIterable[MelvinImage]:
    """Reads image files asynchronously from a designated directory.

    This function iterates over stored images, extracts metadata from filenames, and
    yields `MelvinImage` objects.

    Yields:
        MelvinImage: An image object containing extracted metadata.
    """
    if not os.path.exists(con.IMAGE_PATH):
        logger.warning(f"{con.IMAGE_PATH} does not exist.")
        return

    pattern = r"image_melvonaut_angle_(\w+)_x_(\d+\.\d+)_y_(\d+\.\d+)_(\d{4}-\d{2}-\d{2}_\d{2}-\d{2}-\d{2})\.png"

    for filename in os.listdir(con.IMAGE_PATH):
        if filename.endswith(".png"):
            image_path = os.path.join(con.IMAGE_PATH, filename)
            try:
                async with async_open(image_path, "rb") as afp:
                    data = await afp.read()
                    image = Image.open(io.BytesIO(data))
            except FileNotFoundError as e:
                logger.warning(f"{image_path} does not exist.")
                logger.debug(e)
                continue
            except IOError as e:
                logger.warning(f"Failed to read {image_path}")
                logger.debug(e)
                continue
            except ValueError as e:
                logger.warning(f"Failed to parse {image_path}")
                logger.debug(e)
                continue
            match = re.match(pattern, filename)
            if match:
                angle = CameraAngle(match.group(1))
                cor_x = int(match.group(2))
                cor_y = int(match.group(3))
                time = datetime.strptime(match.group(4), "%Y-%m-%d_%H-%M-%S")
                yield MelvinImage(
                    image=image, angle=angle, cor_x=cor_x, cor_y=cor_y, time=time
                )
            else:
                logger.warning(f"Failed to parse {filename}.")

run_get_announcements() async

Continuously fetches announcements from the API.

This function runs in an infinite loop, restarting the subscription when needed.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
173
174
175
176
177
178
179
180
181
182
183
184
async def run_get_announcements() -> None:
    """Continuously fetches announcements from the API.

    This function runs in an infinite loop, restarting the subscription when needed.

    Returns:
        None
    """
    logger.warning("Started announcements subscription")
    while True:
        await asyncio.gather(get_announcements())
        logger.warning("Restarted announcements subscription")

run_get_observations() async

Runs the observation fetching function in a loop.

This function repeatedly fetches observations based on a specified refresh rate, adjusting for simulation speed.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
async def run_get_observations() -> None:
    """Runs the observation fetching function in a loop.

    This function repeatedly fetches observations based on a specified refresh rate,
    adjusting for simulation speed.

    Returns:
        None
    """
    await get_observations()
    while True:
        # logger.debug("Submitted observations request")
        observe_task = Timer(
            timeout=settings.OBSERVATION_REFRESH_RATE
            / state_planner.get_simulation_speed(),
            callback=get_observations,
        ).get_task()
        await asyncio.gather(observe_task)

run_read_images() async

Log all receives images.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
235
236
237
238
239
240
241
242
async def run_read_images() -> None:
    """Log all receives images.

    Returns:
        None
    """
    async for image in read_images():
        logger.debug(f"Received image: {image}")

start_event_loop()

Initializes and starts the asynchronous event loop.

This function sets up signal handlers, registers tasks for fetching observations, announcements, and API interactions, and starts the event loop.

Returns:

Type Description
None

None

Source code in src/melvonaut/__main__.py
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
def start_event_loop() -> None:
    """Initializes and starts the asynchronous event loop.

    This function sets up signal handlers, registers tasks for fetching observations,
    announcements, and API interactions, and starts the event loop.

    Returns:
        None
    """
    loop = uvloop.new_event_loop()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, cancel_tasks)

    loop.set_default_executor(concurrent.futures.ThreadPoolExecutor(max_workers=1))

    loop.create_task(run_get_observations())
    loop.create_task(run_get_announcements())

    loop.create_task(api.run_api())

    # loop.create_task(run_read_images())

    loop.run_forever()

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.remove_signal_handler(sig)

    logger.info("Shutting down Melvonaut...")

api

Handler = Callable[[web.Request], Awaitable[web.StreamResponse]] module-attribute

catcher_middleware(request, handler) async

Middleware to catch and log unhandled exceptions in request handling.

If an exception occurs while processing the request, this middleware logs the error and returns a 500 Internal Server Error response with the error message.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required
handler Handler

The next request handler in the middleware chain.

required

Returns:

Name Type Description
Any Any

The HTTP response from the handler, or a 500 error response

Any

if an exception occurs.

Source code in src/melvonaut/api.py
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
@web.middleware
async def catcher_middleware(request: web.Request, handler: Handler) -> Any:
    """Middleware to catch and log unhandled exceptions in request handling.

    If an exception occurs while processing the request, this middleware
    logs the error and returns a 500 Internal Server Error response
    with the error message.

    Args:
        request (web.Request): The incoming HTTP request.
        handler (Handler): The next request handler in the middleware chain.

    Returns:
        Any: The HTTP response from the handler, or a 500 error response
        if an exception occurs.
    """
    try:
        return await handler(request)
    except Exception as e:
        logger.exception(e)
        return web.Response(status=500, text=str(e))

compression_middleware(request, handler) async

Middleware to handle response compression using gzip or deflate.

This middleware checks the Accept-Encoding header of the request to determine if the client supports gzip or deflate compression. If supported, it applies the corresponding compression to the response.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required
handler Handler

The next request handler in the middleware chain.

required

Returns:

Name Type Description
Any Any

The compressed HTTP response if the client supports it,

Any

otherwise the original response.

Source code in src/melvonaut/api.py
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
@web.middleware
async def compression_middleware(request: web.Request, handler: Handler) -> Any:
    """Middleware to handle response compression using gzip or deflate.

    This middleware checks the `Accept-Encoding` header of the request
    to determine if the client supports gzip or deflate compression.
    If supported, it applies the corresponding compression to the response.

    Args:
        request (web.Request): The incoming HTTP request.
        handler (Handler): The next request handler in the middleware chain.

    Returns:
        Any: The compressed HTTP response if the client supports it,
        otherwise the original response.
    """
    accept_encoding = request.headers.get(hdrs.ACCEPT_ENCODING, "").lower()

    if ContentCoding.gzip.value in accept_encoding:
        compressor = ContentCoding.gzip.value
    elif ContentCoding.deflate.value in accept_encoding:
        compressor = ContentCoding.deflate.value
    else:
        return await handler(request)

    resp = await handler(request)
    resp.headers[hdrs.CONTENT_ENCODING] = compressor
    resp.enable_compression()
    return resp

get_all_settings(request) async

Retrieve all settings configured in the system.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing all system settings.

Source code in src/melvonaut/api.py
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
async def get_all_settings(request: web.Request) -> web.Response:
    """Retrieve all settings configured in the system.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing all system settings.
    """
    logger.debug("Getting all settings")
    attrs = [key for key in dir(settings) if not key.startswith("_") and key.isupper()]
    all_settings: dict[str, Any] = {}
    for attr in attrs:
        value = settings.__getattribute__(attr)
        if type(value) is float or type(value) is int:
            all_settings[attr] = value
        elif type(value) is datetime.datetime:
            all_settings[attr] = value.isoformat()
        else:
            all_settings[attr] = str(value)
    return web.json_response(all_settings, status=200)

get_clear_all_images(request) async

Clears all stored images.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing a list of cleared images.

Source code in src/melvonaut/api.py
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
async def get_clear_all_images(request: web.Request) -> web.Response:
    """Clears all stored images.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing a list of cleared images.
    """
    logger.debug("Clearing all images")
    folder = pathlib.Path(con.IMAGE_PATH_BASE)
    images = [str(file) for file in folder.rglob("*.png") if file.is_file()]
    for image in images:
        pathlib.Path(image).unlink()
    return web.Response(status=200, text="OK")

get_clear_all_logs(request) async

Clears all log files in the system.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing a list of cleared log files.

Source code in src/melvonaut/api.py
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
async def get_clear_all_logs(request: web.Request) -> web.Response:
    """Clears all log files in the system.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing a list of cleared log files.
    """
    try:
        logger.debug("Clearing all log files")
        log_files = []
        folder = pathlib.Path(con.MEL_LOG_PATH)
        for file in folder.iterdir():
            if file.is_dir():
                continue
            if not file.name.endswith(".log"):
                continue
            log_files.append(file.name)
            file.unlink()
        utils.setup_file_logging()
        return web.json_response({"Cleared_files": log_files}, status=200)
    except Exception as e:
        logger.exception(e)
        return web.json_response({"error": str(e)}, status=500)

get_clear_events(request) async

Deletes the event log file from the system.

If the event log file exists, it is deleted. If it does not exist, a 404 response is returned.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A 200 response if the file is successfully deleted.

Response

web.Response: A 404 response if the file is not found.

Source code in src/melvonaut/api.py
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
async def get_clear_events(request: web.Request) -> web.Response:
    """Deletes the event log file from the system.

    If the event log file exists, it is deleted. If it does not exist, a 404 response is returned.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A 200 response if the file is successfully deleted.
        web.Response: A 404 response if the file is not found.
    """
    logger.debug("Clearing events")
    events_file = pathlib.Path(con.EVENT_LOCATION_CSV)
    if events_file.exists():
        events_file.unlink()
        return web.Response(status=200, text="OK")
    else:
        return web.Response(status=404, text="File not found")

get_clear_telemetry(request) async

Clears the telemetry data file.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A success response if the file is deleted, otherwise a 404 response.

Source code in src/melvonaut/api.py
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
async def get_clear_telemetry(request: web.Request) -> web.Response:
    """Clears the telemetry data file.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A success response if the file is deleted, otherwise a 404 response.
    """
    logger.debug("Clearing telemetry")
    telemetry_file = pathlib.Path(con.TELEMETRY_LOCATION_CSV)
    if telemetry_file.exists():
        telemetry_file.unlink()
        return web.Response(status=200, text="OK")
    else:
        return web.Response(status=404, text="File not found")

get_cpu_usage(request) async

Retrieve CPU usage statistics.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing CPU usage data including user, system, idle time, percent usage, core count, and frequency.

Source code in src/melvonaut/api.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
async def get_cpu_usage(request: web.Request) -> web.Response:
    """Retrieve CPU usage statistics.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing CPU usage data including user, system, idle time, percent usage, core count, and frequency.
    """
    logger.debug("Getting CPU usage")
    cpu_usage = psutil.cpu_times()
    cpu_percent = psutil.cpu_percent()
    cpu_count = psutil.cpu_count()
    cpu_freq = psutil.cpu_freq()
    cpu = {
        "user": cpu_usage.user,
        "system": cpu_usage.system,
        "idle": cpu_usage.idle,
        "percent": cpu_percent,
        "physical_cores": cpu_count,
        "current_freq": cpu_freq.current,
        "max_freq": cpu_freq.max,
        "min_freq": cpu_freq.min,
    }
    return web.json_response(cpu, status=200)

get_disk_usage(request) async

Retrieve disk usage statistics for root and home directories.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing disk usage data.

Source code in src/melvonaut/api.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
async def get_disk_usage(request: web.Request) -> web.Response:
    """Retrieve disk usage statistics for root and home directories.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing disk usage data.
    """
    logger.debug("Getting disk usage")
    disk_root = psutil.disk_usage("/")
    disk_home = psutil.disk_usage("/home")
    return web.json_response(
        {"root": disk_root._asdict(), "home": disk_home._asdict()}, status=200
    )

get_download_events(request) async

Handles the download of event logs.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response | FileResponse

web.FileResponse: The requested event log file if it exists.

Response | FileResponse

web.Response: A 404 response if the file is not found.

Source code in src/melvonaut/api.py
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
async def get_download_events(request: web.Request) -> web.Response | web.FileResponse:
    """Handles the download of event logs.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.FileResponse: The requested event log file if it exists.
        web.Response: A 404 response if the file is not found.
    """
    logger.debug("Downloading events")
    events_file = pathlib.Path(con.EVENT_LOCATION_CSV)
    if events_file.exists():
        return web.FileResponse(events_file, status=200)
    else:
        return web.Response(status=404, text="File not found")

get_download_events_and_clear(request) async

Downloads and clears the event log file.

This function retrieves the event log file, sends its content as a response, and then deletes the file from the system.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A response containing the event log content if found.

Response

web.Response: A 404 response if the file is not found.

Source code in src/melvonaut/api.py
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
async def get_download_events_and_clear(request: web.Request) -> web.Response:
    """Downloads and clears the event log file.

    This function retrieves the event log file, sends its content as a response,
    and then deletes the file from the system.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A response containing the event log content if found.
        web.Response: A 404 response if the file is not found.
    """
    logger.debug("Downloading events and clearing")
    events_file = pathlib.Path(con.EVENT_LOCATION_CSV)
    if events_file.exists():
        events_file_content = StringIO(events_file.read_text())
        try:
            return web.Response(body=events_file_content, status=200)
        finally:
            events_file.unlink()
    else:
        return web.Response(status=404, text="File not found")

get_download_telemetry(request) async

Handles telemetry data download requests.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response | FileResponse

web.Response: The telemetry file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
async def get_download_telemetry(
    request: web.Request,
) -> web.Response | web.FileResponse:
    """Handles telemetry data download requests.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: The telemetry file if it exists, otherwise a 404 response.
    """
    logger.debug("Downloading telemetry")
    telemetry_file = pathlib.Path(con.TELEMETRY_LOCATION_CSV)
    if telemetry_file.exists():
        return web.FileResponse(telemetry_file, status=200)
    else:
        return web.Response(status=404, text="File not found")

get_download_telemetry_and_clear(request) async

Handles telemetry data download requests and deletes the file after serving it.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: The telemetry file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
async def get_download_telemetry_and_clear(request: web.Request) -> web.Response:
    """Handles telemetry data download requests and deletes the file after serving it.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: The telemetry file if it exists, otherwise a 404 response.
    """
    logger.debug("Downloading telemetry and clearing")
    telemetry_file = pathlib.Path(con.TELEMETRY_LOCATION_CSV)
    if telemetry_file.exists():
        telemetry_file_content = StringIO(telemetry_file.read_text())
        try:
            return web.Response(body=telemetry_file_content, status=200)
        finally:
            telemetry_file.unlink()
    else:
        return web.Response(status=404, text="File not found")

get_list_images(request) async

Lists all available image files.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing a list of image filenames.

Source code in src/melvonaut/api.py
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
async def get_list_images(request: web.Request) -> web.Response:
    """Lists all available image files.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing a list of image filenames.
    """
    logger.debug("Listing images")
    folder = pathlib.Path(con.IMAGE_PATH_BASE)
    if not folder.exists():
        return web.Response(status=404, text=f"Folder not found: {folder}")
    images = [str(file.name) for file in folder.rglob("*.png") if file.is_file()]
    return web.json_response({"images": images}, status=200)

get_list_log_files(request) async

Retrieve a list of log files from the log directory.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing a list of log filenames.

Source code in src/melvonaut/api.py
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
async def get_list_log_files(request: web.Request) -> web.Response:
    """Retrieve a list of log files from the log directory.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing a list of log filenames.
    """
    logger.debug("Listing log files")
    log_files = []
    folder = pathlib.Path(con.MEL_LOG_PATH)
    try:
        for file in folder.iterdir():
            if not file.is_file():
                continue
            if not file.name.endswith(".log"):
                continue
            log_files.append(file.name)
    except Exception as e:
        return web.json_response({"error": str(e)}, status=500)
    return web.json_response({"log_files": log_files}, status=200)

get_melvin_version(request) async

Retrieve the current version of the Melvin service.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing the Melvin service version.

Source code in src/melvonaut/api.py
167
168
169
170
171
172
173
174
175
176
177
178
async def get_melvin_version(request: web.Request) -> web.Response:
    """Retrieve the current version of the Melvin service.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing the Melvin service version.
    """
    return web.json_response(
        {"version": importlib.metadata.version("ciarc")}, status=200
    )

get_memory_usage(request) async

Retrieve memory usage statistics.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: JSON response containing memory usage data.

Source code in src/melvonaut/api.py
49
50
51
52
53
54
55
56
57
58
59
60
async def get_memory_usage(request: web.Request) -> web.Response:
    """Retrieve memory usage statistics.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: JSON response containing memory usage data.
    """
    logger.debug("Getting memory usage")
    memory_usage = psutil.virtual_memory()
    return web.json_response(memory_usage._asdict(), status=200)

get_reset_settings(request) async

Resets all settings to their default values.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A success response confirming the reset.

Source code in src/melvonaut/api.py
522
523
524
525
526
527
528
529
530
531
532
533
async def get_reset_settings(request: web.Request) -> web.Response:
    """Resets all settings to their default values.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A success response confirming the reset.
    """
    logger.debug("Resetting settings")
    settings.clear_settings()
    return web.Response(status=200, text="OK")

get_restart_melvin(request) async

Handles a request to restart the Melvin service.

This endpoint is not yet implemented and always returns a 501 Not Implemented response.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A response indicating that the operation is not implemented.

Source code in src/melvonaut/api.py
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
async def get_restart_melvin(request: web.Request) -> web.Response:
    """Handles a request to restart the Melvin service.

    This endpoint is not yet implemented and always returns a 501 Not Implemented response.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A response indicating that the operation is not implemented.
    """
    return web.Response(status=501, text="Not Implemented")

get_shutdown_melvin(request) async

Handles a request to shut down the Melvin service.

If settings.DO_ACTUALLY_EXIT is set to True, the event loop is stopped, all pending tasks are canceled, and the process exits. Otherwise, a warning is logged.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A response with status 200 indicating the shutdown request was received.

Source code in src/melvonaut/api.py
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
async def get_shutdown_melvin(request: web.Request) -> web.Response:
    """Handles a request to shut down the Melvin service.

    If `settings.DO_ACTUALLY_EXIT` is set to True, the event loop is stopped,
    all pending tasks are canceled, and the process exits. Otherwise, a warning is logged.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A response with status 200 indicating the shutdown request was received.
    """
    try:
        return web.Response(status=200, text="OK")
    finally:
        if settings.DO_ACTUALLY_EXIT:
            loop = asyncio.get_running_loop()
            loop.stop()
            pending_tasks = asyncio.all_tasks()
            for task in pending_tasks:
                task.cancel()
            exit()
        else:
            logger.warning("Requested shutdown, but not actually exiting")

health(request) async

Check if the API is running and healthy.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request.

required

Returns:

Type Description
Response

web.Response: A response with status 200 and text "OK".

Source code in src/melvonaut/api.py
20
21
22
23
24
25
26
27
28
29
async def health(request: web.Request) -> web.Response:
    """Check if the API is running and healthy.

    Args:
        request (web.Request): The incoming HTTP request.

    Returns:
        web.Response: A response with status 200 and text "OK".
    """
    return web.Response(status=200, text="OK")

post_clear_log(request) async

Handles log file deletion requests.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the log file name in JSON format.

required

Returns:

Type Description
Response

web.Response: A success response if the log file is cleared, otherwise an error response.

Source code in src/melvonaut/api.py
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
async def post_clear_log(request: web.Request) -> web.Response:
    """Handles log file deletion requests.

    Args:
        request (web.Request): The incoming HTTP request containing the log file name in JSON format.

    Returns:
        web.Response: A success response if the log file is cleared, otherwise an error response.
    """
    data = await request.json()
    logger.debug(f"Clearing log: {data}")
    log_file = pathlib.Path(con.MEL_LOG_PATH) / data.get("file")
    if log_file.exists():
        if log_file.is_dir():
            return web.Response(status=400, text=f"{log_file} is a directory")
        if not log_file.name.endswith(".log"):
            return web.Response(status=400, text=f"{log_file} is not a log file")
        log_file.unlink()
        utils.setup_file_logging()
        return web.Response(status=200, text=f"{log_file} cleared")
    else:
        return web.Response(status=404, text=f"{log_file} not found")

post_clear_setting(request) async

Clears a specific setting.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the setting name in JSON format.

required

Returns:

Type Description
Response

web.Response: A success response if the setting is cleared.

Source code in src/melvonaut/api.py
552
553
554
555
556
557
558
559
560
561
562
563
564
565
async def post_clear_setting(request: web.Request) -> web.Response:
    """Clears a specific setting.

    Args:
        request (web.Request): The incoming HTTP request containing the setting name in JSON format.

    Returns:
        web.Response: A success response if the setting is cleared.
    """
    logger.debug("Clearing settings")
    data = await request.json()
    logger.debug(f"Clearing settings: {data}")
    settings.delete_settings(data.keys())
    return web.Response(status=200, text="OK")

post_download_image(request) async

Handles image file download requests.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the image filename in JSON format.

required

Returns:

Type Description
Response | FileResponse

web.Response: The requested image file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
async def post_download_image(request: web.Request) -> web.Response | web.FileResponse:
    """Handles image file download requests.

    Args:
        request (web.Request): The incoming HTTP request containing the image filename in JSON format.

    Returns:
        web.Response: The requested image file if it exists, otherwise a 404 response.
    """
    data = await request.json()
    logger.debug(f"Downloading image: {data}")
    image_file = pathlib.Path(con.IMAGE_PATH_BASE) / data.get("file")
    if image_file.exists():
        return web.FileResponse(image_file, status=200)
    else:
        return web.Response(status=404, text="File not found")

post_download_image_and_clear(request) async

Handles image file download requests and deletes the file after serving it.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the image filename in JSON format.

required

Returns:

Type Description
Response

web.Response: The requested image file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
async def post_download_image_and_clear(request: web.Request) -> web.Response:
    """Handles image file download requests and deletes the file after serving it.

    Args:
        request (web.Request): The incoming HTTP request containing the image filename in JSON format.

    Returns:
        web.Response: The requested image file if it exists, otherwise a 404 response.
    """
    data = await request.json()
    logger.debug(f"Downloading image and clearing: {data}")
    image_file = pathlib.Path(con.IMAGE_PATH_BASE) / data.get("file")
    if image_file.exists():
        image_file_content = BytesIO(image_file.read_bytes())
        try:
            return web.Response(body=image_file_content, status=200)
        finally:
            image_file.unlink()
    else:
        return web.Response(status=404, text="File not found")

post_download_log(request) async

Handles log file download requests.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the log file name in JSON format.

required

Returns:

Type Description
Response | FileResponse

web.Response: The requested log file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
async def post_download_log(request: web.Request) -> web.Response | web.FileResponse:
    """Handles log file download requests.

    Args:
        request (web.Request): The incoming HTTP request containing the log file name in JSON format.

    Returns:
        web.Response: The requested log file if it exists, otherwise a 404 response.
    """
    data = await request.json()
    logger.debug(f"Downloading log: {data}")
    log_file = pathlib.Path(con.MEL_LOG_PATH) / data.get("file")
    if log_file.exists():
        return web.FileResponse(log_file, status=200)
    else:
        return web.Response(status=404, text="File not found")

post_download_log_and_clear(request) async

Handles log file download requests and deletes the file after serving it.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the log file name in JSON format.

required

Returns:

Type Description
Response

web.Response: The requested log file if it exists, otherwise a 404 response.

Source code in src/melvonaut/api.py
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
async def post_download_log_and_clear(request: web.Request) -> web.Response:
    """Handles log file download requests and deletes the file after serving it.

    Args:
        request (web.Request): The incoming HTTP request containing the log file name in JSON format.

    Returns:
        web.Response: The requested log file if it exists, otherwise a 404 response.
    """
    data = await request.json()
    logger.debug(f"Downloading log and clearing: {data}")
    log_file = pathlib.Path(con.MEL_LOG_PATH) / data.get("file")
    if log_file.exists():
        log_file_content = StringIO(log_file.read_text())
        try:
            return web.Response(body=log_file_content, status=200)
        finally:
            log_file.unlink()
            utils.setup_file_logging()
    else:
        return web.Response(status=404, text="File not found")

post_execute_command(request) async

Execute a shell command asynchronously.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing JSON data with the command to execute.

required

Returns:

Type Description
Response

web.Response: JSON response containing command output and return code.

Source code in src/melvonaut/api.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
async def post_execute_command(request: web.Request) -> web.Response:
    """Execute a shell command asynchronously.

    Args:
        request (web.Request): The incoming HTTP request containing JSON data with the command to execute.

    Returns:
        web.Response: JSON response containing command output and return code.
    """
    data = await request.json()
    cmd = data.get("cmd")
    logger.debug(f"Executing command: {cmd}")
    output = []
    try:
        process = await asyncio.create_subprocess_shell(
            cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
        )
        while True:
            if process.stdout:
                line = await process.stdout.readline()
                if not line:
                    break
                output.append(line.decode())
        await process.wait()
        return_code = process.returncode
        logger.debug(f"Command output: {output}")
        return web.json_response(
            {"output": output, "return_code": return_code}, status=200
        )
    except asyncio.TimeoutError:
        return web.json_response({"output": output, "error": "Timeout"}, status=500)
    except asyncio.CancelledError:
        return web.json_response({"output": output, "error": "Cancelled"}, status=500)
    except Exception as e:
        return web.json_response({"output": output, "error": str(e)}, status=500)

post_get_setting(request) async

Retrieves a specific setting.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the setting name in JSON format.

required

Returns:

Type Description
Response

web.Response: JSON response containing the requested setting value.

Source code in src/melvonaut/api.py
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
async def post_get_setting(request: web.Request) -> web.Response:
    """Retrieves a specific setting.

    Args:
        request (web.Request): The incoming HTTP request containing the setting name in JSON format.

    Returns:
        web.Response: JSON response containing the requested setting value.
    """
    logger.debug("Getting settings")
    data = await request.json()
    logger.debug(f"Requested settings: {data}")
    response_settings = {}
    try:
        for key in data.keys():
            response_settings[key] = settings.__getattribute__(key)
    except AttributeError:
        return web.Response(status=404, text=f"Setting not found: {key}")
    logger.debug(f"Response settings: {response_settings}")
    return web.json_response(response_settings, status=200)

post_set_melvin_task(request) async

Sets a task for Melvin (a task management system).

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing the task details in JSON format.

required

Returns:

Type Description
Response

web.Response: A success response if the task is set.

Source code in src/melvonaut/api.py
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
async def post_set_melvin_task(request: web.Request) -> web.Response:
    """Sets a task for Melvin (a task management system).

    Args:
        request (web.Request): The incoming HTTP request containing the task details in JSON format.

    Returns:
        web.Response: A success response if the task is set.
    """
    data = await request.json()
    logger.debug(f"Setting melvin task: {data}")
    task = data.get("task", None)
    if not task:
        logger.warning("Missing field task")
        return web.Response(status=400, text="Missing field task")
    try:
        melvin_task = models.MELVINTask(task)
    except ValueError:
        logger.warning("Invalid task")
        return web.Response(status=400, text="Invalid task")
    except Exception as e:
        logger.warning(f"Error setting melvin task: {e}")
        return web.Response(status=500, text=str(e))
    settings.set_settings({"CURRENT_MELVIN_TASK": melvin_task})
    return web.Response(status=200, text="OK")

post_set_setting(request) async

Sets a new configuration setting.

Parameters:

Name Type Description Default
request Request

The incoming HTTP request containing settings in JSON format.

required

Returns:

Type Description
Response

web.Response: A success response if the setting is applied.

Source code in src/melvonaut/api.py
536
537
538
539
540
541
542
543
544
545
546
547
548
549
async def post_set_setting(request: web.Request) -> web.Response:
    """Sets a new configuration setting.

    Args:
        request (web.Request): The incoming HTTP request containing settings in JSON format.

    Returns:
        web.Response: A success response if the setting is applied.
    """
    logger.debug("Setting settings")
    data = await request.json()
    logger.debug(f"Setting settings: {data}")
    settings.set_settings(data)
    return web.Response(status=200, text="OK")

run_api() async

Starts the web API server.

Returns:

Type Description
None

None

Source code in src/melvonaut/api.py
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
async def run_api() -> None:
    """Starts the web API server.

    Returns:
        None
    """
    logger.debug("Setting up API server")
    settings.init_settings()
    app = web.Application(middlewares=[compression_middleware, catcher_middleware])
    setup_routes(app)
    runner = web.AppRunner(app)
    await runner.setup()
    site = web.TCPSite(runner, "0.0.0.0", settings.API_PORT)
    try:
        logger.info(f"API server started on port {settings.API_PORT}")
        await site.start()
        logger.debug("API server started")
    finally:
        # das hat bei mir direkt nach dem start ausgelöst
        # logger.debug("Shutting down API server")
        # await runner.cleanup()
        pass

setup_routes(app)

Sets up API routes for the web application.

Parameters:

Name Type Description Default
app Application

The web application instance.

required
Source code in src/melvonaut/api.py
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
def setup_routes(app: web.Application) -> None:
    """Sets up API routes for the web application.

    Args:
        app (web.Application): The web application instance.
    """
    app.router.add_post("/api/post_download_log", post_download_log)
    app.router.add_get("/api/get_download_telemetry", get_download_telemetry)
    app.router.add_get("/api/get_download_events", get_download_events)
    app.router.add_post("/api/post_download_image", post_download_image)
    app.router.add_post("/api/post_set_melvin_task", post_set_melvin_task)
    app.router.add_get("/api/get_reset_settings", get_reset_settings)
    app.router.add_post("/api/post_set_setting", post_set_setting)
    app.router.add_post("/api/post_clear_setting", post_clear_setting)
    app.router.add_post("/api/post_clear_log", post_clear_log)
    app.router.add_post("/api/post_get_setting", post_get_setting)
    app.router.add_get("/api/get_all_settings", get_all_settings)
    app.router.add_post("/api/post_download_log_and_clear", post_download_log_and_clear)
    app.router.add_get(
        "/api/get_download_telemetry_and_clear", get_download_telemetry_and_clear
    )
    app.router.add_get(
        "/api/get_download_events_and_clear", get_download_events_and_clear
    )
    app.router.add_post(
        "/api/post_download_image_and_clear", post_download_image_and_clear
    )
    app.router.add_get("/api/get_clear_all_logs", get_clear_all_logs)
    app.router.add_get("/api/get_clear_telemetry", get_clear_telemetry)
    app.router.add_get("/api/get_clear_events", get_clear_events)
    app.router.add_get("/api/get_clear_all_images", get_clear_all_images)
    app.router.add_get("/api/health", health)
    app.router.add_get("/api/get_disk_usage", get_disk_usage)
    app.router.add_get("/api/get_memory_usage", get_memory_usage)
    app.router.add_get("/api/get_cpu_usage", get_cpu_usage)
    app.router.add_get("/api/get_restart_melvin", get_restart_melvin)
    app.router.add_get("/api/get_shutdown_melvin", get_shutdown_melvin)
    app.router.add_post("/api/post_execute_command", post_execute_command)
    app.router.add_get("/api/get_melvin_version", get_melvin_version)
    app.router.add_get("/api/get_list_log_files", get_list_log_files)
    app.router.add_get("/api/get_list_images", get_list_images)

ebt_calc

events = Event.load_events_from_csv(path=path) module-attribute

processed = []

data = [(4097, 7652, 2041), (5758, 8357, 688), (6220, 8553, 1075), (7245, 8989, 1669)]

data = [ (19972.3165561, 113.5243816, 1454.48), (20486.232864, 331.337984, 930.35), (20998.9861724, 548.6578144, 787.93), (21510.18207954, 766.74099024, 1093.99), (18882.99334624, 2295.73420544, 1947.67), (19394.53293776, 2512.96329856, 1450.01), (19908.73421827, 2730.89789112, 1442.63), (20421.30728271, 2948.14119576, 1828.68), (20926.46189231, 3163.05597336, 1651.83), ] for d in data: s = ping( x=int(d[0] / scaling_factor), y=int(d[1] / scaling_factor), d=d[2] / scaling_factor, mind=int((d[2] - max_offset) / scaling_factor), maxd=int((d[2] + max_offset) / scaling_factor), ) processed.append(s) print(f"Added: {s}")

id = 201 module-attribute

max_offset = 325 module-attribute

path = con.CONSOLE_FROM_MELVONAUT_PATH + 'MelvonautEvents.csv' module-attribute

processed = parse_pings(id=id, events=events) module-attribute

res = find_matches(pings=processed) module-attribute

scaling_factor = 1 module-attribute

x_0 = 0 module-attribute

x_max = int(con.WORLD_X / scaling_factor) module-attribute

y_0 = 0 module-attribute

y_max = int(con.WORLD_Y / scaling_factor) module-attribute

distance(x1, x2, y1, y2)

Computes the Euclidean distance between two points, adjusting for wraparound conditions.

Parameters:

Name Type Description Default
x1 int

X-coordinate of the first point.

required
x2 int

X-coordinate of the second point.

required
y1 int

Y-coordinate of the first point.

required
y2 int

Y-coordinate of the second point.

required

Returns:

Name Type Description
float float

The Euclidean distance between the two points.

Source code in src/melvonaut/ebt_calc.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
def distance(x1: int, x2: int, y1: int, y2: int) -> float:
    """
    Computes the Euclidean distance between two points, adjusting for wraparound conditions.

    Args:
        x1 (int): X-coordinate of the first point.
        x2 (int): X-coordinate of the second point.
        y1 (int): Y-coordinate of the first point.
        y2 (int): Y-coordinate of the second point.

    Returns:
        float: The Euclidean distance between the two points.
    """
    if x1 > con.WORLD_X:
        x1 = x1 % con.WORLD_X
    while x1 < 0:
        x1 += con.WORLD_X
    if y1 > con.WORLD_Y:
        y1 = y1 % con.WORLD_Y
    while y1 < 0:
        y1 += con.WORLD_Y
    if x2 > con.WORLD_X:
        x2 = x2 % con.WORLD_X
    while x2 < 0:
        x2 += con.WORLD_X
    if y2 > con.WORLD_Y:
        y2 = y2 % con.WORLD_Y
    while y2 < 0:
        y2 += con.WORLD_Y

    # check for case at edge
    a = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2)
    b = math.sqrt((con.WORLD_X + x2 - x1) ** 2 + (y2 - y1) ** 2)
    c = math.sqrt((x2 - x1 - con.WORLD_X) ** 2 + (y2 - y1) ** 2)
    d = math.sqrt((x2 - x1) ** 2 + (con.WORLD_Y + y2 - y1) ** 2)
    e = math.sqrt((x2 - x1) ** 2 + (y2 - y1 - con.WORLD_Y) ** 2)
    return min(a, b, c, d, e)

draw_res(id, res, pings, show=False)

Draws and saves a visualization of the computed emergency beacon locations.

Parameters:

Name Type Description Default
id int

Identifier for the emergency beacon tracker.

required
res list[tuple[int, int]]

List of matched coordinate points.

required
pings list[Ping]

List of Ping objects representing detected signals.

required
show bool

Whether to display the plot. Defaults to False.

False

Returns:

Type Description
tuple[int, int]

tuple[int, int]: The estimated centroid of the matched points, or (-1, -1) if no matches were found.

Source code in src/melvonaut/ebt_calc.py
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
def draw_res(
    id: int, res: list[tuple[int, int]], pings: list[Ping], show: bool = False
) -> tuple[int, int]:
    """
    Draws and saves a visualization of the computed emergency beacon locations.

    Args:
        id (int): Identifier for the emergency beacon tracker.
        res (list[tuple[int, int]]): List of matched coordinate points.
        pings (list[Ping]): List of Ping objects representing detected signals.
        show (bool, optional): Whether to display the plot. Defaults to False.

    Returns:
        tuple[int, int]: The estimated centroid of the matched points, or (-1, -1) if no matches were found.
    """

    def find_centroid(points: list[tuple[int, int]]) -> tuple[float, float]:
        """
        Computes the centroid of a set of points.

        Args:
            points (list[tuple[int, int]]): List of coordinate points.

        Returns:
            tuple[float, float]: The centroid coordinates.
        """
        xs, ys = zip(*points)
        centroid_x = sum(xs) / len(xs)
        centroid_y = sum(ys) / len(ys)
        return (centroid_x, centroid_y)

    x_list, y_list = [], []
    for x, y in res:
        x_list.append(x)
        y_list.append(y)

    if res:
        centroid = find_centroid(res)

    plt.style.use("bmh")
    _, ax = plt.subplots()
    plt.title(f"Emergency Beacon Tracker {id} - {len(pings)} pings")
    plt.xlabel("Width")
    plt.ylabel("Height")
    ax.set_xlim(0, x_max)
    ax.set_ylim(0, y_max)

    # plot matched area
    ax.plot(x_list, y_list, "ro", markersize=0.01, zorder=4)
    legend_area = patches.Patch(
        edgecolor="red", facecolor="red", linewidth=1, label="Matched area"
    )

    # plot pings
    for p in pings:
        ax.plot(p.x, p.y, "x", color="grey", markersize=5, zorder=3)
        circle_inner = patches.Circle(
            (p.x, p.y),
            p.mind,
            edgecolor="green",
            facecolor="none",
            linewidth=0.2,
            zorder=2,
        )
        circle_outer = patches.Circle(
            (p.x, p.y),
            p.maxd,
            edgecolor="blue",
            facecolor="none",
            linewidth=0.2,
            zorder=2,
        )
        ax.add_patch(circle_inner)
        ax.add_patch(circle_outer)
    legend_point = Line2D(
        [0],
        [0],
        linestyle="None",
        marker="x",
        markerfacecolor="grey",
        markeredgecolor="grey",
        markersize=6,
        label="Ping Location",
    )
    legend_inner = patches.Patch(
        edgecolor="green", facecolor="none", linewidth=1, label="Minimum Distance"
    )
    legend_outer = patches.Patch(
        edgecolor="blue", facecolor="none", linewidth=1, label="Maximum Distance"
    )

    if res:
        # plot centroid
        circle_guess = patches.Circle(
            (centroid[0], centroid[1]),
            75,
            edgecolor="violet",
            facecolor="violet",
            linewidth=1,
            zorder=5,
        )
        ax.add_patch(circle_guess)
        legend_guess = patches.Patch(
            edgecolor="violet",
            facecolor="violet",
            linewidth=1,
            label=f"Best guess\n({int(centroid[0])}, {int(centroid[1])})",
        )
        ax.legend(
            handles=[
                legend_point,
                legend_inner,
                legend_outer,
                legend_guess,
                legend_area,
            ],
            loc="best",
        )
    else:
        ax.legend(
            handles=[legend_point, legend_inner, legend_outer, legend_area],
            loc="best",
        )

    if show:
        if res:
            logger.info(f"Centroid is: ({int(centroid[0])},{int(centroid[1])})")
        else:
            logger.warning("Could not match any points!")
        plt.show()
    else:
        space = ""
        count = 0
        path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}.png"
        while os.path.isfile(path):
            count += 1
            space = "_" + str(count)
            path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}{space}.png"
        plt.savefig(path, dpi=1000)
    if res:
        return (int(centroid[0]), int(centroid[1]))
    else:
        return (-1, -1)

f(d)

Computes a transformed distance value.

Parameters:

Name Type Description Default
d float

Input distance value.

required

Returns:

Name Type Description
float float

Transformed distance value.

Source code in src/melvonaut/ebt_calc.py
30
31
32
33
34
35
36
37
38
39
40
41
def f(d: float) -> float:
    """
    Computes a transformed distance value.

    Args:
        d (float): Input distance value.

    Returns:
        float: Transformed distance value.
    """
    res = 225 + ((0.4 * (d + 1)) / 4)
    return float(res)

find_matches(pings)

Identifies matching points from the given pings based on distance constraints.

Parameters:

Name Type Description Default
pings list[Ping]

List of Ping objects to process.

required

Returns:

Type Description
list[tuple[int, int]]

list[tuple[int, int]]: List of coordinate pairs that satisfy all constraints.

Source code in src/melvonaut/ebt_calc.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
def find_matches(pings: list[Ping]) -> list[tuple[int, int]]:
    """
    Identifies matching points from the given pings based on distance constraints.

    Args:
        pings (list[Ping]): List of Ping objects to process.

    Returns:
        list[tuple[int, int]]: List of coordinate pairs that satisfy all constraints.
    """
    # Procssed first point
    res = []
    p1 = min(pings, key=lambda p: p.maxd)
    for x in range(p1.x - p1.maxd, p1.x + p1.maxd):
        for y in range(p1.y - p1.maxd, p1.y + p1.maxd):
            if x > x_0 and x < x_max and y > y_0 and y < y_max:
                dist = distance(p1.x, x, p1.y, y)
                if dist > p1.mind and dist < p1.maxd:
                    res.append((x, y))
    logger.info(f"Found {len(res)} possible points on first circle.")

    # Only keep the ones that are in all circles
    filtered_res = []
    for x, y in res:
        is_valid = True
        for pn in pings:
            dist = distance(pn.x, x, pn.y, y)
            if dist < pn.mind or dist > pn.maxd:
                is_valid = False
                break
        if is_valid:
            filtered_res.append((x, y))

    logger.info(f"Found {len(filtered_res)} points that match all pings.")
    return filtered_res

parse_pings(id, events)

Parses event data to extract relevant ping information.

Parameters:

Name Type Description Default
id int

Identifier for the event source.

required
events list[Event]

List of event objects to be processed.

required

Returns:

Type Description
list[Ping]

list[Ping]: A list of parsed Ping objects.

Source code in src/melvonaut/ebt_calc.py
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def parse_pings(id: int, events: list[Event]) -> list[Ping]:
    """
    Parses event data to extract relevant ping information.

    Args:
        id (int): Identifier for the event source.
        events (list[Event]): List of event objects to be processed.

    Returns:
        list[Ping]: A list of parsed Ping objects.
    """
    processed = []
    for event in events:
        if f"GALILEO_MSG_EB,ID_{id},DISTANCE_" in event.event:
            (d, x, y) = event.easy_parse()
            s = Ping(
                x=int(x / scaling_factor),
                y=int(y / scaling_factor),
                d=d / scaling_factor,
                mind=int((d - f(d)) / scaling_factor),
                maxd=int((d + f(d)) / scaling_factor),
            )
            processed.append(s)
    return processed

mel_telemetry

MelTelemetry

Bases: BaseTelemetry

Source code in src/melvonaut/mel_telemetry.py
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
class MelTelemetry(BaseTelemetry):
    timestamp: datetime.datetime

    async def store_observation_csv(self) -> None:
        """
        Stores the telemetry observation data in a CSV file.

        This function converts the telemetry data into a flattened dictionary format,
        ensuring nested dictionaries are stored as separate fields. It then appends
        the data to an existing CSV file or creates a new file if one does not exist.

        Args:
            None

        Returns:
            None
        """
        tel_dict = self.model_dump()
        flattened = {}
        for key, value in tel_dict.items():
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    flattened[f"{key}_{sub_key}"] = sub_value
            else:
                flattened[key] = value
        if self.timestamp:
            timestamp = self.timestamp.isoformat()
        else:
            timestamp = datetime.datetime.now().isoformat()
        flattened["timestamp"] = timestamp

        if not Path(con.TELEMETRY_LOCATION_CSV).is_file():
            async with async_open(con.TELEMETRY_LOCATION_CSV, "w") as afp:
                writer = csv.DictWriter(afp, fieldnames=flattened.keys())
                await writer.writeheader()
                await writer.writerow(flattened)
            # logger.debug(f"Writing observation to {con.TELEMETRY_LOCATION_CSV}")
        else:
            async with async_open(con.TELEMETRY_LOCATION_CSV, "a") as afp:
                writer = csv.DictWriter(afp, fieldnames=flattened.keys())
                await writer.writerow(flattened)
            # logger.debug(f"Writing observation to {con.TELEMETRY_LOCATION_CSV}")

    async def store_observation_json(self) -> None:
        """
        Stores the telemetry observation data in a JSON file.

        This function retrieves the existing telemetry data from the JSON file
        and updates it with a new entry. If the JSON file does not exist, a new
        one is created. The data is formatted in a structured manner with timestamps
        as keys.

        Args:
            None

        Returns:
            None
        """
        logger.debug("Storing observation as json.")
        try:
            async with async_open(con.TELEMETRY_LOCATION_JSON, "r") as afp:
                raw_telemetry = await afp.read()
                dict_telemetry = json.loads(raw_telemetry)
        except FileNotFoundError:
            logger.debug(f"{con.TELEMETRY_LOCATION_JSON} does not exist.")
            dict_telemetry = {}

        if self.timestamp:
            timestamp = self.timestamp.isoformat()
        else:
            timestamp = datetime.datetime.now().isoformat()
        new_telemetry_entry = self.model_dump(exclude={"timestamp"})
        dict_telemetry[timestamp] = new_telemetry_entry
        json_telemetry = json.dumps(dict_telemetry, indent=4, sort_keys=True)

        async with async_open(con.TELEMETRY_LOCATION_JSON, "w") as afp:
            # logger.debug(f"Writing to {con.TELEMETRY_LOCATION_JSON}")
            await afp.write(str(json_telemetry))
        logger.debug("Observation stored")

    def model_post_init(self, __context__: Any) -> None:
        """
        Initializes the telemetry model and triggers CSV storage.

        Args:
            __context__ (Any): Context data passed during initialization.

        Returns:
            None
        """
        loop = asyncio.get_event_loop()
        loop.create_task(self.store_observation_csv())

timestamp instance-attribute

model_post_init(__context__)

Initializes the telemetry model and triggers CSV storage.

Parameters:

Name Type Description Default
__context__ Any

Context data passed during initialization.

required

Returns:

Type Description
None

None

Source code in src/melvonaut/mel_telemetry.py
 96
 97
 98
 99
100
101
102
103
104
105
106
107
def model_post_init(self, __context__: Any) -> None:
    """
    Initializes the telemetry model and triggers CSV storage.

    Args:
        __context__ (Any): Context data passed during initialization.

    Returns:
        None
    """
    loop = asyncio.get_event_loop()
    loop.create_task(self.store_observation_csv())

store_observation_csv() async

Stores the telemetry observation data in a CSV file.

This function converts the telemetry data into a flattened dictionary format, ensuring nested dictionaries are stored as separate fields. It then appends the data to an existing CSV file or creates a new file if one does not exist.

Returns:

Type Description
None

None

Source code in src/melvonaut/mel_telemetry.py
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
async def store_observation_csv(self) -> None:
    """
    Stores the telemetry observation data in a CSV file.

    This function converts the telemetry data into a flattened dictionary format,
    ensuring nested dictionaries are stored as separate fields. It then appends
    the data to an existing CSV file or creates a new file if one does not exist.

    Args:
        None

    Returns:
        None
    """
    tel_dict = self.model_dump()
    flattened = {}
    for key, value in tel_dict.items():
        if isinstance(value, dict):
            for sub_key, sub_value in value.items():
                flattened[f"{key}_{sub_key}"] = sub_value
        else:
            flattened[key] = value
    if self.timestamp:
        timestamp = self.timestamp.isoformat()
    else:
        timestamp = datetime.datetime.now().isoformat()
    flattened["timestamp"] = timestamp

    if not Path(con.TELEMETRY_LOCATION_CSV).is_file():
        async with async_open(con.TELEMETRY_LOCATION_CSV, "w") as afp:
            writer = csv.DictWriter(afp, fieldnames=flattened.keys())
            await writer.writeheader()
            await writer.writerow(flattened)
        # logger.debug(f"Writing observation to {con.TELEMETRY_LOCATION_CSV}")
    else:
        async with async_open(con.TELEMETRY_LOCATION_CSV, "a") as afp:
            writer = csv.DictWriter(afp, fieldnames=flattened.keys())
            await writer.writerow(flattened)

store_observation_json() async

Stores the telemetry observation data in a JSON file.

This function retrieves the existing telemetry data from the JSON file and updates it with a new entry. If the JSON file does not exist, a new one is created. The data is formatted in a structured manner with timestamps as keys.

Returns:

Type Description
None

None

Source code in src/melvonaut/mel_telemetry.py
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
async def store_observation_json(self) -> None:
    """
    Stores the telemetry observation data in a JSON file.

    This function retrieves the existing telemetry data from the JSON file
    and updates it with a new entry. If the JSON file does not exist, a new
    one is created. The data is formatted in a structured manner with timestamps
    as keys.

    Args:
        None

    Returns:
        None
    """
    logger.debug("Storing observation as json.")
    try:
        async with async_open(con.TELEMETRY_LOCATION_JSON, "r") as afp:
            raw_telemetry = await afp.read()
            dict_telemetry = json.loads(raw_telemetry)
    except FileNotFoundError:
        logger.debug(f"{con.TELEMETRY_LOCATION_JSON} does not exist.")
        dict_telemetry = {}

    if self.timestamp:
        timestamp = self.timestamp.isoformat()
    else:
        timestamp = datetime.datetime.now().isoformat()
    new_telemetry_entry = self.model_dump(exclude={"timestamp"})
    dict_telemetry[timestamp] = new_telemetry_entry
    json_telemetry = json.dumps(dict_telemetry, indent=4, sort_keys=True)

    async with async_open(con.TELEMETRY_LOCATION_JSON, "w") as afp:
        # logger.debug(f"Writing to {con.TELEMETRY_LOCATION_JSON}")
        await afp.write(str(json_telemetry))
    logger.debug("Observation stored")

settings

file_log_handler_id = None module-attribute

settings = Settings() module-attribute

Settings

Bases: BaseModel

Startup settings for Melvonaut, can be changed by Melvonaut API.

Source code in src/melvonaut/settings.py
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
class Settings(BaseModel):
    """Startup settings for Melvonaut, can be changed by Melvonaut API."""

    model_config = {"arbitrary_types_allowed": True}

    # [Logging]
    TRACING: bool = bool(os.getenv("TRACING", False))

    TERMINAL_LOGGING_LEVEL: str = os.getenv("TERMINAL_LOGGING_LEVEL", "INFO").upper()
    FILE_LOGGING_LEVEL: str = os.getenv("FILE_LOGGING_LEVEL", "INFO").upper()

    API_PORT: int = int(os.getenv("API_PORT", 8080))
    DISCORD_WEBHOOK_TOKEN: Optional[str] = os.getenv("DISCORD_WEBHOOK_TOKEN", None)
    DISCORD_ALERTS_ENABLED: bool = bool(os.getenv("DISCORD_ALERTS_ENABLED", False))

    NETWORK_SIM_ENABLED: bool = bool(os.getenv("NETWORK_SIMULATION", False))

    ## [Stateplaning]
    OBSERVATION_REFRESH_RATE: int = int(
        os.getenv("OBSERVATION_REFRESH_RATE", 5)
    )  # Seconds between observation requests
    BATTERY_LOW_THRESHOLD: int = int(os.getenv("BATTERY_LOW_THRESHOLD", 20))
    BATTERY_HIGH_THRESHOLD: int = int(
        os.getenv("BATTERY_HIGH_THRESHOLD", 0)
    )  # Difference to Max Battery before switching

    TARGET_ANGLE_DEG: float = float(
        os.getenv("TARGET_ANGLE_DEG", 23.0)
    )  # The angle was calculated through simulations
    # With total speed over 50, cannot use wide angle camera
    # 49.9 = y + x
    # x = 2.35585 * y
    # 49.9 = 2.35585 * y + y
    # 49.9 = 3.35585 * y
    # y = 49.9 / 3.35585
    # y = 14.87
    # 49.9 - 14.87 = 35.03 = x
    TARGET_SPEED_NORMAL_X: float = float(
        os.getenv("TARGET_SPEED_NORMAL_X", 35.03)
    )  # 2.35585 times as much as Y
    TARGET_SPEED_NORMAL_Y: float = float(os.getenv("TARGET_SPEED_NORMAL_Y", 14.87))

    # With total speed over 10, cannot use narrow angle camera
    # 9.9 = y + x
    # y = 9.9 / 3.35585
    # y = 2.95
    # 9.9 - 2.95 = 6.95 = x
    TARGET_SPEED_NARROW_X: float = float(os.getenv("TARGET_SPEED_NARROW_X", 6.95))
    TARGET_SPEED_NARROW_Y: float = float(os.getenv("TARGET_SPEED_NARROW_Y", 2.95))

    # Total speed can be up to 71
    # 71 = y + x
    # y = 71 / 3.35585
    # y = 21.16
    # 71 - 21.16 = 49.84 = x
    TARGET_SPEED_WIDE_X: float = float(os.getenv("TARGET_SPEED_WIDE_X", 49.84))
    TARGET_SPEED_WIDE_Y: float = float(os.getenv("TARGET_SPEED_WIDE_Y", 21.16))

    DISTANCE_BETWEEN_IMAGES: int = int(
        os.getenv("DISTANCE_BETWEEN_IMAGES", 450)
    )  # How many pixel before taking another image

    # [Melvin Task Planing]
    # Standard mapping, with no objectives and the camera angle below
    CURRENT_MELVIN_TASK: MELVINTask = MELVINTask.Mapping
    TARGET_CAMERA_ANGLE_ACQUISITION: CameraAngle = CameraAngle.Narrow

    # Automatically do the next upcoming objective
    # CURRENT_MELVIN_TASK: MELVINTasks = MELVINTasks.Next_objective

    # Do a specific objective
    # CURRENT_MELVIN_TASK: MELVINTasks = MELVINTasks.Fixed_objective
    # FIXED_OBJECTIVE = "Aurora 10"

    # Go for the emergency beacon tracker
    # CURRENT_MELVIN_TASK: MELVINTask = MELVINTask.EBT

    # [Legacy]
    # To set a custom time window to be active, or to disable all timing checks
    DO_TIMING_CHECK: bool = False
    START_TIME: datetime.datetime = datetime.datetime(
        2025, 1, 2, 12, 00, tzinfo=datetime.timezone.utc
    )
    STOP_TIME: datetime.datetime = datetime.datetime(
        2025, 1, 30, 12, 00, tzinfo=datetime.timezone.utc
    )
    DO_ACTUALLY_EXIT: bool = True  # Used in testing
    OVERRIDES: dict[str, Any] = {}

    # load settings
    def load_settings(self) -> None:
        """Loads settings from a persistent JSON file.

        If the settings file does not exist or contains invalid JSON,
        the overrides dictionary is reset to an empty state.
        """
        if not pathlib.Path(con.MEL_PERSISTENT_SETTINGS).exists():
            logger.debug("Settings don't exist")
            self.OVERRIDES = {}
        with open(con.MEL_PERSISTENT_SETTINGS, "r") as f:
            try:
                loaded = json.loads(f.read())
            except JSONDecodeError:
                logger.warning("Failed to load settings")
                self.OVERRIDES = {}
                return
            # logger.debug(f"{loaded=}")
            for key, value in loaded.items():
                self.OVERRIDES[key.upper()] = value
            # logger.debug(f"{self.OVERRIDES=}")

    # save settings
    def save_settings(self) -> None:
        """Saves the current settings overrides to a persistent JSON file."""
        with open(con.MEL_PERSISTENT_SETTINGS, "w") as f:
            f.write(json.dumps(self.OVERRIDES))

    # get settings
    def get_setting(self, key: str, default: Any = None) -> Any:
        """Retrieves a setting value from overrides or returns the default.

        Args:
            key (str): The setting key to retrieve.
            default (Any, optional): The default value if the key is not found. Defaults to None.

        Returns:
            Any: The value of the setting if it exists, otherwise the default.
        """
        return self.OVERRIDES.get(key.upper(), default)

    # set setting
    def set_setting(self, key: str, value: Any) -> None:
        """Sets a single setting in overrides and saves the settings.

        Args:
            key (str): The setting key.
            value (Any): The value to assign to the setting.
        """
        # logger.debug(f"Setting {key.upper()} to {value}")
        # logger.debug(f"{self.OVERRIDES=}")
        self.OVERRIDES[key.upper()] = value
        # logger.debug(f"{self.OVERRIDES=}")
        self.save_settings()

    def set_settings(self, key_values: dict[str, Any]) -> None:
        """Sets multiple settings at once and saves them.

        Args:
            key_values (dict[str, Any]): A dictionary of key-value pairs to update.
        """
        # logger.debug(f"Setting {self.OVERRIDES}")
        if len(key_values.keys()) == 0:
            logger.debug("Clearing settings")
            self.set_setting("OVERRIDES", {})
        else:
            for key, value in key_values.items():
                self.set_setting(key, value)
        # logger.debug(f"Setting {self.OVERRIDES}")
        self.save_settings()

    def delete_settings(self, keys: list[str]) -> None:
        """Deletes specified settings from overrides and saves the settings.

        Args:
            keys (list[str]): A list of setting keys to remove.
        """
        # logger.debug(f"Deleting {keys}")
        for key in keys:
            del self.OVERRIDES[key.upper()]
        # logger.debug(f"{self.OVERRIDES=}")
        self.save_settings()

    def init_settings(self) -> bool:
        """Initializes settings by checking for an existing settings file.

        Returns:
            bool: True if settings were newly created, False if they already exist.
        """
        if pathlib.Path(con.MEL_PERSISTENT_SETTINGS).exists():
            logger.debug("Settings already exist")
            return False
        logger.debug("Settings created")
        self.save_settings()
        return True

    # clear settings
    def clear_settings(self) -> None:
        """Clears all settings by setting overrides to None and saving."""
        self.OVERRIDES = None  # type: ignore
        # logger.debug(f"{self.OVERRIDES=}")

    def get_default_setting(self, key: str) -> Any:
        """Retrieves the default value of a setting from the class attributes.

        Args:
            key (str): The setting key to retrieve.

        Returns:
            Any: The default value of the setting.
        """
        return super().__getattribute__(key)

    def __init__(self) -> None:
        """Initializes the Settings object, loading settings if they exist."""
        super().__init__()
        if not self.init_settings():
            self.load_settings()

    def __getattribute__(self, item: str) -> Any:
        """Overrides attribute access to check overrides before default settings.

        Args:
            item (str): The attribute key to retrieve.

        Returns:
            Any: The overridden value if it exists, otherwise the default.
        """
        if item.startswith("__"):
            return super().__getattribute__(item)
        # logger.debug(f"Getting {item}")
        overrides = super().__getattribute__("OVERRIDES")
        if item.upper() in overrides:
            return overrides[item.upper()]
        return super().__getattribute__(item)

    def __setattr__(self, key: str, value: Any) -> None:
        """Overrides attribute setting to ensure settings are properly stored.

        Args:
            key (str): The setting key.
            value (Any): The value to assign to the setting.
        """
        # logger.debug(f"Setting {key} to {value}")
        if key == "OVERRIDES" and value is None:
            self.OVERRIDES.clear()
            self.save_settings()
        elif type(value) is dict:
            self.set_settings(value)
        else:
            self.set_setting(key.upper(), value)

API_PORT = int(os.getenv('API_PORT', 8080)) class-attribute instance-attribute

BATTERY_HIGH_THRESHOLD = int(os.getenv('BATTERY_HIGH_THRESHOLD', 0)) class-attribute instance-attribute

BATTERY_LOW_THRESHOLD = int(os.getenv('BATTERY_LOW_THRESHOLD', 20)) class-attribute instance-attribute

CURRENT_MELVIN_TASK = MELVINTask.Mapping class-attribute instance-attribute

DISCORD_ALERTS_ENABLED = bool(os.getenv('DISCORD_ALERTS_ENABLED', False)) class-attribute instance-attribute

DISCORD_WEBHOOK_TOKEN = os.getenv('DISCORD_WEBHOOK_TOKEN', None) class-attribute instance-attribute

DISTANCE_BETWEEN_IMAGES = int(os.getenv('DISTANCE_BETWEEN_IMAGES', 450)) class-attribute instance-attribute

DO_ACTUALLY_EXIT = True class-attribute instance-attribute

DO_TIMING_CHECK = False class-attribute instance-attribute

FILE_LOGGING_LEVEL = os.getenv('FILE_LOGGING_LEVEL', 'INFO').upper() class-attribute instance-attribute

NETWORK_SIM_ENABLED = bool(os.getenv('NETWORK_SIMULATION', False)) class-attribute instance-attribute

OBSERVATION_REFRESH_RATE = int(os.getenv('OBSERVATION_REFRESH_RATE', 5)) class-attribute instance-attribute

OVERRIDES = {} class-attribute instance-attribute

START_TIME = datetime.datetime(2025, 1, 2, 12, 0, tzinfo=datetime.timezone.utc) class-attribute instance-attribute

STOP_TIME = datetime.datetime(2025, 1, 30, 12, 0, tzinfo=datetime.timezone.utc) class-attribute instance-attribute

TARGET_ANGLE_DEG = float(os.getenv('TARGET_ANGLE_DEG', 23.0)) class-attribute instance-attribute

TARGET_CAMERA_ANGLE_ACQUISITION = CameraAngle.Narrow class-attribute instance-attribute

TARGET_SPEED_NARROW_X = float(os.getenv('TARGET_SPEED_NARROW_X', 6.95)) class-attribute instance-attribute

TARGET_SPEED_NARROW_Y = float(os.getenv('TARGET_SPEED_NARROW_Y', 2.95)) class-attribute instance-attribute

TARGET_SPEED_NORMAL_X = float(os.getenv('TARGET_SPEED_NORMAL_X', 35.03)) class-attribute instance-attribute

TARGET_SPEED_NORMAL_Y = float(os.getenv('TARGET_SPEED_NORMAL_Y', 14.87)) class-attribute instance-attribute

TARGET_SPEED_WIDE_X = float(os.getenv('TARGET_SPEED_WIDE_X', 49.84)) class-attribute instance-attribute

TARGET_SPEED_WIDE_Y = float(os.getenv('TARGET_SPEED_WIDE_Y', 21.16)) class-attribute instance-attribute

TERMINAL_LOGGING_LEVEL = os.getenv('TERMINAL_LOGGING_LEVEL', 'INFO').upper() class-attribute instance-attribute

TRACING = bool(os.getenv('TRACING', False)) class-attribute instance-attribute

model_config = {'arbitrary_types_allowed': True} class-attribute instance-attribute

__getattribute__(item)

Overrides attribute access to check overrides before default settings.

Parameters:

Name Type Description Default
item str

The attribute key to retrieve.

required

Returns:

Name Type Description
Any Any

The overridden value if it exists, otherwise the default.

Source code in src/melvonaut/settings.py
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
def __getattribute__(self, item: str) -> Any:
    """Overrides attribute access to check overrides before default settings.

    Args:
        item (str): The attribute key to retrieve.

    Returns:
        Any: The overridden value if it exists, otherwise the default.
    """
    if item.startswith("__"):
        return super().__getattribute__(item)
    # logger.debug(f"Getting {item}")
    overrides = super().__getattribute__("OVERRIDES")
    if item.upper() in overrides:
        return overrides[item.upper()]
    return super().__getattribute__(item)

__init__()

Initializes the Settings object, loading settings if they exist.

Source code in src/melvonaut/settings.py
224
225
226
227
228
def __init__(self) -> None:
    """Initializes the Settings object, loading settings if they exist."""
    super().__init__()
    if not self.init_settings():
        self.load_settings()

__setattr__(key, value)

Overrides attribute setting to ensure settings are properly stored.

Parameters:

Name Type Description Default
key str

The setting key.

required
value Any

The value to assign to the setting.

required
Source code in src/melvonaut/settings.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
def __setattr__(self, key: str, value: Any) -> None:
    """Overrides attribute setting to ensure settings are properly stored.

    Args:
        key (str): The setting key.
        value (Any): The value to assign to the setting.
    """
    # logger.debug(f"Setting {key} to {value}")
    if key == "OVERRIDES" and value is None:
        self.OVERRIDES.clear()
        self.save_settings()
    elif type(value) is dict:
        self.set_settings(value)
    else:
        self.set_setting(key.upper(), value)

clear_settings()

Clears all settings by setting overrides to None and saving.

Source code in src/melvonaut/settings.py
208
209
210
def clear_settings(self) -> None:
    """Clears all settings by setting overrides to None and saving."""
    self.OVERRIDES = None  # type: ignore

delete_settings(keys)

Deletes specified settings from overrides and saves the settings.

Parameters:

Name Type Description Default
keys list[str]

A list of setting keys to remove.

required
Source code in src/melvonaut/settings.py
182
183
184
185
186
187
188
189
190
191
192
def delete_settings(self, keys: list[str]) -> None:
    """Deletes specified settings from overrides and saves the settings.

    Args:
        keys (list[str]): A list of setting keys to remove.
    """
    # logger.debug(f"Deleting {keys}")
    for key in keys:
        del self.OVERRIDES[key.upper()]
    # logger.debug(f"{self.OVERRIDES=}")
    self.save_settings()

get_default_setting(key)

Retrieves the default value of a setting from the class attributes.

Parameters:

Name Type Description Default
key str

The setting key to retrieve.

required

Returns:

Name Type Description
Any Any

The default value of the setting.

Source code in src/melvonaut/settings.py
213
214
215
216
217
218
219
220
221
222
def get_default_setting(self, key: str) -> Any:
    """Retrieves the default value of a setting from the class attributes.

    Args:
        key (str): The setting key to retrieve.

    Returns:
        Any: The default value of the setting.
    """
    return super().__getattribute__(key)

get_setting(key, default=None)

Retrieves a setting value from overrides or returns the default.

Parameters:

Name Type Description Default
key str

The setting key to retrieve.

required
default Any

The default value if the key is not found. Defaults to None.

None

Returns:

Name Type Description
Any Any

The value of the setting if it exists, otherwise the default.

Source code in src/melvonaut/settings.py
140
141
142
143
144
145
146
147
148
149
150
def get_setting(self, key: str, default: Any = None) -> Any:
    """Retrieves a setting value from overrides or returns the default.

    Args:
        key (str): The setting key to retrieve.
        default (Any, optional): The default value if the key is not found. Defaults to None.

    Returns:
        Any: The value of the setting if it exists, otherwise the default.
    """
    return self.OVERRIDES.get(key.upper(), default)

init_settings()

Initializes settings by checking for an existing settings file.

Returns:

Name Type Description
bool bool

True if settings were newly created, False if they already exist.

Source code in src/melvonaut/settings.py
194
195
196
197
198
199
200
201
202
203
204
205
def init_settings(self) -> bool:
    """Initializes settings by checking for an existing settings file.

    Returns:
        bool: True if settings were newly created, False if they already exist.
    """
    if pathlib.Path(con.MEL_PERSISTENT_SETTINGS).exists():
        logger.debug("Settings already exist")
        return False
    logger.debug("Settings created")
    self.save_settings()
    return True

load_settings()

Loads settings from a persistent JSON file.

If the settings file does not exist or contains invalid JSON, the overrides dictionary is reset to an empty state.

Source code in src/melvonaut/settings.py
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
def load_settings(self) -> None:
    """Loads settings from a persistent JSON file.

    If the settings file does not exist or contains invalid JSON,
    the overrides dictionary is reset to an empty state.
    """
    if not pathlib.Path(con.MEL_PERSISTENT_SETTINGS).exists():
        logger.debug("Settings don't exist")
        self.OVERRIDES = {}
    with open(con.MEL_PERSISTENT_SETTINGS, "r") as f:
        try:
            loaded = json.loads(f.read())
        except JSONDecodeError:
            logger.warning("Failed to load settings")
            self.OVERRIDES = {}
            return
        # logger.debug(f"{loaded=}")
        for key, value in loaded.items():
            self.OVERRIDES[key.upper()] = value

save_settings()

Saves the current settings overrides to a persistent JSON file.

Source code in src/melvonaut/settings.py
134
135
136
137
def save_settings(self) -> None:
    """Saves the current settings overrides to a persistent JSON file."""
    with open(con.MEL_PERSISTENT_SETTINGS, "w") as f:
        f.write(json.dumps(self.OVERRIDES))

set_setting(key, value)

Sets a single setting in overrides and saves the settings.

Parameters:

Name Type Description Default
key str

The setting key.

required
value Any

The value to assign to the setting.

required
Source code in src/melvonaut/settings.py
153
154
155
156
157
158
159
160
161
162
163
164
def set_setting(self, key: str, value: Any) -> None:
    """Sets a single setting in overrides and saves the settings.

    Args:
        key (str): The setting key.
        value (Any): The value to assign to the setting.
    """
    # logger.debug(f"Setting {key.upper()} to {value}")
    # logger.debug(f"{self.OVERRIDES=}")
    self.OVERRIDES[key.upper()] = value
    # logger.debug(f"{self.OVERRIDES=}")
    self.save_settings()

set_settings(key_values)

Sets multiple settings at once and saves them.

Parameters:

Name Type Description Default
key_values dict[str, Any]

A dictionary of key-value pairs to update.

required
Source code in src/melvonaut/settings.py
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
def set_settings(self, key_values: dict[str, Any]) -> None:
    """Sets multiple settings at once and saves them.

    Args:
        key_values (dict[str, Any]): A dictionary of key-value pairs to update.
    """
    # logger.debug(f"Setting {self.OVERRIDES}")
    if len(key_values.keys()) == 0:
        logger.debug("Clearing settings")
        self.set_setting("OVERRIDES", {})
    else:
        for key, value in key_values.items():
            self.set_setting(key, value)
    # logger.debug(f"Setting {self.OVERRIDES}")
    self.save_settings()

state_planer

state_planner = StatePlanner() module-attribute

StatePlanner

Bases: BaseModel

Source code in src/melvonaut/state_planer.py
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
class StatePlanner(BaseModel):
    melv_id: int = random.randint(0, 9999)
    current_telemetry: Optional[MelTelemetry] = None
    previous_telemetry: Optional[MelTelemetry] = None

    previous_state: Optional[State] = None
    state_change_time: datetime.datetime = datetime.datetime.now()

    submitted_transition_request: bool = False

    target_state: Optional[State] = None

    _accelerating: bool = False

    _run_get_image_task: Optional[asyncio.Task[None]] = None

    _aiohttp_session: Optional[aiohttp.ClientSession] = None

    _target_vel_x: Optional[float] = None
    _target_vel_y: Optional[float] = None

    _z_obj_list: list[ZonedObjective] = []

    recent_events: list[Event] = []

    _current_obj_name: str = ""

    def model_post_init(self, __context__: Any) -> None:
        """Initializes the recent_events list by loading events from a CSV file.

        Args:
            __context__ (Any): Context data passed during initialization.
        """
        self.recent_events = Event.load_events_from_csv(path=con.EVENT_LOCATION_CSV)

    def get_current_state(self) -> State:
        """Retrieves the current state from telemetry data.

        Returns:
            State: The current state if telemetry is available, otherwise State.Unknown.
        """
        if self.current_telemetry is None:
            return State.Unknown
        return self.current_telemetry.state

    def get_previous_state(self) -> State:
        """Retrieves the previous state from telemetry data.

        Returns:
            State: The previous state if telemetry is available, otherwise State.Unknown.
        """
        if self.previous_telemetry is None:
            return State.Unknown
        return self.previous_telemetry.state

    def get_simulation_speed(self) -> int:
        """Gets the current simulation speed from telemetry data.

        Returns:
            int: The simulation speed if telemetry is available, otherwise 1.
        """
        if self.current_telemetry is None:
            return 1
        return self.current_telemetry.simulation_speed

    def get_time_since_state_change(self) -> datetime.timedelta:
        """Calculates the time elapsed since the last state change.

        Returns:
            datetime.timedelta: The time difference between now and the last state change.
        """
        return datetime.datetime.now() - self.state_change_time

    def calc_transition_remaining_time(self) -> datetime.timedelta:
        """Calculates the remaining time for state transition.

        Returns:
            datetime.timedelta: The remaining transition time based on the simulation speed.
        """
        if self.get_current_state() is State.Transition:
            logger.debug("Not in transition state, returning 0")
            return datetime.timedelta(0)
        elif self.previous_state == State.Safe:
            total_time = datetime.timedelta(
                seconds=con.STATE_TRANSITION_FROM_SAFE_TIME
                / self.get_simulation_speed()
            )
            return total_time - self.get_time_since_state_change()
        else:
            total_time = datetime.timedelta(
                seconds=con.STATE_TRANSITION_TIME / self.get_simulation_speed()
            )
            return total_time - self.get_time_since_state_change()

    def calc_current_location(self) -> tuple[float, float]:
        """Estimates the current location based on telemetry data and time elapsed.

        Returns:
            tuple[float, float]: The estimated (x, y) coordinates.
        """
        if self.current_telemetry is None:
            return 0.0, 0.0
        time_since_observation = (
            live_utc() - self.current_telemetry.timestamp
        ).total_seconds()
        current_x = (
            self.current_telemetry.width_x
            + self.current_telemetry.vx * time_since_observation
        )
        current_y = (
            self.current_telemetry.height_y
            + self.current_telemetry.vy * time_since_observation
        )
        return current_x, current_y

    async def trigger_velocity_change(self, new_vel_x: float, new_vel_y: float) -> None:
        """Sets new values for accelartion, also set _accelerating

        Args:
            new_vel_x (float): The target velocity in the x direction.
            new_vel_y (float): The target velocity in the y direction.
        """

        self._target_vel_x = new_vel_x
        self._target_vel_y = new_vel_y

        if self.current_telemetry is None:
            logger.warning("No telemetry data available. Cannot set velocity.")
            return
        if (
            new_vel_x == self.current_telemetry.vx
            and new_vel_y == self.current_telemetry.vy
        ):
            self._accelerating = False
            logger.info("Target velocity already set. Not changing velocity.")
            return
        request_body = {
            "vel_x": new_vel_x,
            "vel_y": new_vel_y,
            "camera_angle": self.current_telemetry.angle,
            "state": self.get_current_state(),
        }
        async with aiohttp.ClientSession() as session:
            async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
                if response.status == 200:
                    self._accelerating = True
                    logger.info(f"Velocity set to {new_vel_x}, {new_vel_y}")
                else:
                    logger.error(f"Failed to set velocity to {new_vel_x}, {new_vel_y}")

    async def trigger_camera_angle_change(self, new_angle: CameraAngle) -> None:
        """Tries to change the camera angle to new_angle

        Args:
            new_angle (CameraAngle): The desired camera angle.
        """
        if self.current_telemetry is None:
            logger.warning("No telemetry data available. Cannot set camera angle.")
            return
        if new_angle == self.current_telemetry.angle:
            logger.info("Target camera angle already set. Not changing angle.")
            return
        request_body = {
            "vel_x": self.current_telemetry.vx,
            "vel_y": self.current_telemetry.vy,
            "camera_angle": new_angle,
            "state": self.get_current_state(),
        }
        async with aiohttp.ClientSession() as session:
            async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
                if response.status == 200:
                    self.current_telemetry.angle = new_angle
                    logger.info(f"Camera angle set to {new_angle}")
                else:
                    logger.error(f"Failed to set camera angle to {new_angle}")

    async def trigger_state_transition(self, new_state: State) -> None:
        """Initiates a state transition if valid conditions are met.

        Args:
            new_state (State): The target state to transition to.
        """
        if new_state in [State.Transition, State.Unknown, State.Deployment, State.Safe]:
            logger.warning(f"Cannot transition to {new_state}.")
            return
        if self.current_telemetry is None:
            logger.warning("No telemetry data available. Cannot initiate transition.")
            return
        if self.current_telemetry.state == State.Transition:
            logger.debug("Already in transition state, not starting transition.")
            return
        if new_state == self.get_current_state():
            logger.debug(f"State is already {new_state}, not starting transition.")
            return
        request_body = {
            "state": new_state,
            "vel_x": self.current_telemetry.vx,
            "vel_y": self.current_telemetry.vy,
            "camera_angle": self.current_telemetry.angle,
        }
        async with aiohttp.ClientSession() as session:
            async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
                if response.status == 200:
                    logger.info(
                        f"Started transition to {new_state} at battery level {self.current_telemetry.battery}"
                    )
                    self.submitted_transition_request = True
                    self.target_state = new_state
                else:
                    logger.warning(
                        f"Failed to transition to {new_state}: {response.status}"
                    )
                    logger.debug(f"Response body: {await response.text()}")

    async def switch_if_battery_low(
        self, state_low_battery: State, state_high_battery: State
    ) -> None:
        """Switches state based on battery level.

        Args:
            state_low_battery (State): The state to switch to when battery is low.
            state_high_battery (State): The state to switch to when battery is sufficient.
        """
        if self.current_telemetry is None:
            logger.warning(
                "No telemetry data available. Cannot plan battery based switching."
            )
            return
        if self.current_telemetry.battery <= settings.BATTERY_LOW_THRESHOLD:
            if self.get_current_state() == state_low_battery:
                return
            logger.debug(
                f"State is {self.get_current_state()}, Battery is low, triggering transition to {state_low_battery}"
            )
            await self.trigger_state_transition(state_low_battery)
        else:
            if self.get_current_state() == state_high_battery:
                return
            logger.debug(
                f"State is {self.get_current_state()}, Battery is high, triggering transition to {state_high_battery}"
            )
            await self.trigger_state_transition(state_high_battery)

    async def plan_state_switching(self) -> None:
        """Plans and executes state transitions based on current telemetry data.

        This function checks the current state and decides whether to transition
        to another state based on conditions like battery level and velocity.

        Logs relevant debug information and triggers state transitions when necessary.

        Returns:
            None
        """
        if self.current_telemetry is None:
            logger.warning("No telemetry data available. Cannot plan state switching.")
            return

        state = self.get_current_state()

        match state:
            case State.Transition:
                logger.debug(
                    f"Time since state change: {self.get_time_since_state_change()}"
                )
                expected_time_to_complete = self.calc_transition_remaining_time()
                limited_log(
                    f"State is Transition to {self.target_state}, waiting for transition to complete.\nExpected time to complete state transition: {expected_time_to_complete}"
                )
                # logger.debug(
                #     f"Previous state: {self.get_previous_state()}, Current state: {self.get_current_state()}"
                # )
            case State.Acquisition:
                # in EBT leave once everything is set
                if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
                    if (
                        self._target_vel_x
                        and self._target_vel_y
                        and self.current_telemetry.angle
                        == settings.TARGET_CAMERA_ANGLE_ACQUISITION
                        and self._target_vel_x == self.current_telemetry.vx
                        and self._target_vel_y == self.current_telemetry.vy
                    ):
                        await self.trigger_state_transition(State.Communication)

                await self.switch_if_battery_low(State.Charge, State.Acquisition)

            case State.Charge:
                if (
                    self.current_telemetry.battery
                    >= self.current_telemetry.max_battery
                    - settings.BATTERY_HIGH_THRESHOLD
                ):
                    if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
                        # starting ebt, but speed/angle not set yet

                        logger.info(
                            f"EBT Task, Angle: telemetry: {self.current_telemetry.angle} vs target: {settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
                        )
                        logger.info(
                            f"EBT Task, vx: {self.current_telemetry.vx} vs target: {self._target_vel_x}"
                        )
                        logger.info(
                            f"EBT Task, vy: {self.current_telemetry.vy} vs target: {self._target_vel_y}"
                        )
                        if (
                            self._target_vel_x is None
                            or self._target_vel_y is None
                            or self.current_telemetry.angle
                            != settings.TARGET_CAMERA_ANGLE_ACQUISITION
                            or self._target_vel_x != self.current_telemetry.vx
                            or self._target_vel_y != self.current_telemetry.vy
                        ):
                            await self.trigger_state_transition(State.Acquisition)
                        else:
                            # logger.info("starting comms!")
                            await self.trigger_state_transition(State.Communication)

                    else:
                        # logger.info("starting acq!")
                        await self.trigger_state_transition(State.Acquisition)

            case State.Safe:
                if self.current_telemetry.battery >= (
                    self.current_telemetry.max_battery * 0.5
                ):
                    await self.trigger_state_transition(State.Acquisition)
                else:
                    await self.trigger_state_transition(State.Charge)
                await self.switch_if_battery_low(State.Charge, State.Acquisition)
            case State.Communication:
                await self.switch_if_battery_low(State.Charge, State.Communication)
            case State.Deployment:
                logger.debug(
                    "State is Deployment, triggering transition to Acquisition"
                )
                await self.trigger_state_transition(State.Acquisition)

    async def update_telemetry(self, new_telemetry: MelTelemetry) -> None:
        """Updates the telemetry data and handles state changes.

        This function updates the previous and current telemetry readings,
        logs relevant debug information, and checks for state changes.

        If a state change occurs, it logs the transition, cancels image
        retrieval tasks if necessary, and triggers appropriate actions based on
        the new state.

        Args:
            new_telemetry (MelTelemetry): The new telemetry data to update.

        Returns:
            None
        """
        self.previous_telemetry = self.current_telemetry
        self.current_telemetry = new_telemetry

        logger.debug(
            f"New observations - State: {self.get_current_state()},"
            f" Battery level: {self.current_telemetry.battery}/{self.current_telemetry.max_battery},"
            f" Vel X,Y: {self.current_telemetry.vx}, {self.current_telemetry.vy},"
            f" Fuel: {self.current_telemetry.fuel}"
        )

        logger.debug(
            "Current memory usage: "
            + str(psutil.Process(os.getpid()).memory_info().rss / 1024**2)
            + " MB"
        )

        # if self.get_current_state() == State.Acquisition:
        #    await self.get_image()
        # logger.debug(f"Threads: {threading.active_count()}")
        # for thread in threading.enumerate():
        #    frame = sys._current_frames()[thread.ident]
        #    logger.warning(f"{inspect.getframeinfo(frame).filename}.{inspect.getframeinfo(frame).function}:{inspect.getframeinfo(frame).lineno}")

        # check if still accelerating
        if (
            self._target_vel_x == self.current_telemetry.vx
            and self._target_vel_y == self.current_telemetry.vy
        ):
            self._accelerating = False

        if self.previous_telemetry is not None:
            if self.get_previous_state() != self.get_current_state():
                logger.info(
                    f"State changed from {self.get_previous_state()} to {self.get_current_state()}"
                )
                self.previous_state = self.get_previous_state()
                self.state_change_time = datetime.datetime.now()
                # Put in here events to do on state change
                if settings.TRACING:
                    if self.previous_state == State.Transition:
                        snapshot1 = tracemalloc.take_snapshot()
                        stats = snapshot1.statistics("traceback")
                        for stat in stats:
                            logger.warning(
                                "%s memory blocks: %.1f KiB"
                                % (stat.count, stat.size / 1024)
                            )
                            for line in stat.traceback.format():
                                logger.warning(line)

                # logger.debug(
                #     f"Previous state: {self.previous_state}, Current state: {self.get_current_state()}"
                # )
                match self.get_current_state():
                    case State.Transition:
                        if self._run_get_image_task:
                            logger.debug("end image")
                            self._run_get_image_task.cancel()
                            self._run_get_image_task = None
                        if self.submitted_transition_request:
                            self.submitted_transition_request = False
                        else:
                            logger.warning("State transition was externally triggered!")
                    case State.Acquisition:
                        logger.info("Starting control in acquisition state.")
                        if self._run_get_image_task:
                            logger.debug("Image task already running")
                        else:
                            logger.debug("start image")
                            loop = asyncio.get_event_loop()
                            self._run_get_image_task = loop.create_task(
                                self.run_get_image()
                            )
                        await self.control_acquisition()
                    case State.Charge:
                        pass
                    case State.Safe:
                        logger.warning("State transitioned to SAFE!")
                    case State.Communication:
                        pass
                    case State.Deployment:
                        logger.warning("State transitioned to DEPLOYMENT!")
                    case _:
                        logger.warning(f"Unknown state {self.get_current_state()}")
                if self.get_current_state() != State.Acquisition:
                    self._accelerating = False
                if self.get_current_state() != State.Transition:
                    if self.target_state != self.get_current_state():
                        logger.warning(
                            f"Planned state transition to {self.target_state} failed, now in {self.get_current_state()}"
                        )
                    else:
                        logger.debug(
                            f"Planned state transition to {self.target_state} succeeded."
                        )
                    self.target_state = None

            await self.plan_state_switching()

    async def get_image(self) -> None:
        """Captures an image if telemetry data is available and conditions are met.

        If no telemetry data is available, the function waits for the next observation cycle.
        It checks various conditions (e.g., acceleration, camera angle, timing) before fetching an image
        from an external API and saving it with appropriate metadata.

        Returns:
            None
        """
        logger.debug("Getting image")
        if self.current_telemetry is None:
            logger.warning(
                f"No telemetry data available. Waiting {settings.OBSERVATION_REFRESH_RATE}s for next image."
            )
            image_task = Timer(
                timeout=settings.OBSERVATION_REFRESH_RATE, callback=self.get_image
            ).get_task()
            await asyncio.gather(image_task)
            return
        if not self._aiohttp_session:
            self._aiohttp_session = aiohttp.ClientSession()

        # Filter out cases where no image should be taken

        if (
            settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective
            or settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective
        ) and not self._z_obj_list:
            logger.warning(
                "Skipped image: In Objectives_only mode, but z_obj_list emtpy!"
            )
            return

        if self.current_telemetry.angle != settings.TARGET_CAMERA_ANGLE_ACQUISITION:
            logger.info(
                f"Skipped image: current_angle={self.current_telemetry.angle} vs target={settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
            )
            return
        if self._accelerating:
            logger.info(
                f"Skipped image: accelerating to: {self._target_vel_x} {self._target_vel_y}"
            )
            return

        if settings.DO_TIMING_CHECK and settings.START_TIME > datetime.datetime.now(
            datetime.timezone.utc
        ):
            logger.warning(
                f"Skipped image, to early: start={settings.START_TIME} current_time={live_utc()}"
            )
            return
        if settings.DO_TIMING_CHECK and live_utc() > settings.STOP_TIME:
            logger.warning(
                f"Skipped image, to late: end={settings.STOP_TIME} current_time={live_utc()}"
            )
            return

        # save the current telemetry values, so they dont get overwritten by a later update
        tele_timestamp = self.current_telemetry.timestamp
        tele_x = self.current_telemetry.width_x
        tele_y = self.current_telemetry.height_y
        tele_vx = self.current_telemetry.vx
        tele_vy = self.current_telemetry.vy
        tele_simSpeed = self.get_simulation_speed()
        tele_angle = self.current_telemetry.angle

        lens_size = lens_size_by_angle(self.current_telemetry.angle)
        """
        # TODO add box check if melvin and objective overlap
        # check if we are in range of an objective
        # TODO also check MELVINTasks
        # TODO check if within box, unless hidden
        melvin_box = (
            tele_x - lens_size / 2,
            tele_y - lens_size / 2,
            tele_x + lens_size / 2,
            tele_y + lens_size / 2,
        )
        if self._z_obj_list[0].zone is None:
            logger.warning("Hidden objective, taking photo")
            return
            TODO
        else:
            logger.warning("Checking if in range of objective:")
            objective_box = self._z_obj_list[0].zone

        if not boxes_overlap_in_grid(melvin_box, objective_box):
            logger.error(
                f"Image skipped, not Overlapping! {melvin_box} {objective_box}"
            )
            return
        """
        async with aiohttp.ClientSession() as session:
            try:
                async with session.get(con.IMAGE_ENDPOINT) as response:
                    if response.status == 200:
                        # Extract exact image timestamp
                        img_timestamp = response.headers.get("image-timestamp")
                        if img_timestamp is None:
                            logger.error(
                                "Image timestamp not found in headers, substituting with current time"
                            )
                            parsed_img_timestamp = datetime.datetime.now()
                        else:
                            parsed_img_timestamp = datetime.datetime.fromisoformat(
                                img_timestamp
                            )

                        # Calculate the difference between the img and the last telemetry
                        difference_in_seconds = (
                            parsed_img_timestamp - tele_timestamp
                        ).total_seconds()

                        adj_x = round(
                            tele_x + (difference_in_seconds * tele_vx * tele_simSpeed)
                        ) - (lens_size / 2)
                        adj_y = round(
                            tele_y + (difference_in_seconds * tele_vy * tele_simSpeed)
                        ) - (lens_size / 2)

                        # TODO check if images are correct!
                        # TODO might also need modulo for side cases
                        # logger.debug(f"T {parsed_img_timestamp} | C {tele_timestamp}")
                        # logger.debug(
                        #     f"D {difference_in_seconds} | R {tele_x} ADJ {adj_x}"
                        # )

                        image_path = con.IMAGE_LOCATION.format(
                            melv_id=self._current_obj_name,
                            angle=tele_angle,
                            time=parsed_img_timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"),
                            cor_x=int(adj_x),
                            cor_y=int(adj_y),
                        )

                        logger.info(
                            f"Received image at {adj_x}x {adj_y}y with {self.current_telemetry.angle} angle"
                        )

                        async with async_open(image_path, "wb") as afp:
                            while True:
                                cnt = await response.content.readany()
                                if not cnt:
                                    break
                                await afp.write(cnt)
                    else:
                        logger.warning(f"Failed to get image: {response.status}")
                        logger.info(f"Response body: {await response.text()}")
                        logger.info(
                            "This is normal at the end of acquisition mode once."
                        )
            except aiohttp.client_exceptions.ConnectionTimeoutError:
                logger.warning("Observations endpoint timeouted.")
            except asyncio.exceptions.CancelledError:
                logger.warning("Get image task was cancelled.")

    async def run_get_image(self) -> None:
        """Continuously captures images while in the Acquisition state.

        This function continuously captures images at calculated intervals,
        adjusting timing based on velocity and acceleration.

        Returns:
            None
        """
        logger.debug("Starting run_get_image")
        if self.get_current_state() == State.Acquisition:
            await self.get_image()
        while self.get_current_state() == State.Acquisition:
            if self.current_telemetry is None:
                logger.debug(
                    "No telemetry data available. Assuming Observation Refresh Rate."
                )
                delay_in_s = float(settings.OBSERVATION_REFRESH_RATE)
            else:
                current_total_vel = (
                    self.current_telemetry.vx + self.current_telemetry.vy
                )
                if self._accelerating:
                    # If accelerating calculate distance based on current speed and acceleration
                    delay_in_s = (
                        math.sqrt(
                            current_total_vel**2
                            + 2 * con.ACCELERATION * settings.DISTANCE_BETWEEN_IMAGES
                        )
                        - current_total_vel
                    ) / con.ACCELERATION
                else:
                    # When not accelerating calculate distance based on current speed
                    delay_in_s = (
                        float(settings.DISTANCE_BETWEEN_IMAGES) / current_total_vel
                    )
            delay_in_s = delay_in_s / self.get_simulation_speed()
            logger.debug(f"Next image in {delay_in_s}s.")
            image_task = Timer(timeout=delay_in_s, callback=self.get_image).get_task()
            await asyncio.gather(image_task)

    # run once after changing into acquisition mode -> setup
    async def control_acquisition(self) -> None:
        """Initializes acquisition mode by updating objectives and setting camera parameters.

        Retrieves objectives from an external API, determines the current objective, and adjusts
        acquisition parameters accordingly. Also creates necessary directories for image storage.

        Returns:
            None
        """
        async with aiohttp.ClientSession() as session:
            # update Objectives
            async with session.get(con.OBJECTIVE_ENDPOINT) as response:
                if response.status == 200:
                    json_response = await response.json()
                    self._z_obj_list: list[ZonedObjective] = ZonedObjective.parse_api(
                        json_response
                    )
                    logger.info(
                        f"Updated objectives, there are {len(self._z_obj_list)} objectives."
                    )
                else:
                    logger.error("Could not get OBJECTIVE_ENDPOINT")

        current_obj = None
        # Always check for new objective in this task
        if settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective:
            current_obj = self._z_obj_list[0]

            logger.error(f"Using next_objective, current task: {current_obj.name}")

        # In this task look for the given string
        elif settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective:
            for obj in self._z_obj_list:
                if obj.name == settings.FIXED_OBJECTIVE:
                    logger.error(f"Using fixed_objective, current task: {obj}")
                    current_obj = obj
                    break

        if current_obj:
            settings.START_TIME = current_obj.start
            settings.STOP_TIME = current_obj.end
            settings.TARGET_CAMERA_ANGLE_ACQUISITION = current_obj.optic_required

            self._current_obj_name = str(current_obj.id) + current_obj.name.replace(
                " ", ""
            )

            con.IMAGE_PATH = con.IMAGE_PATH_BASE + self._current_obj_name + "/"

            con.IMAGE_LOCATION = (
                con.IMAGE_PATH
                + "image_{melv_id}_{angle}_{time}_x_{cor_x}_y_{cor_y}.png"
            )
            try:
                subprocess.run(["mkdir", con.IMAGE_PATH], check=True)
                logger.info(f"Created folder: {con.IMAGE_PATH}")

            except subprocess.CalledProcessError as e:
                logger.info(f"z_obj could not mkdir: {e}")

        # check if change occured and cut the last image

        await self.trigger_camera_angle_change(settings.TARGET_CAMERA_ANGLE_ACQUISITION)

        # TODO stop velocity change if battery is low
        if self.current_telemetry and self.current_telemetry.battery < 10:
            logger.error("Battery low, cant accelerate any more!")

        match settings.TARGET_CAMERA_ANGLE_ACQUISITION:
            case CameraAngle.Wide:
                await self.trigger_velocity_change(
                    settings.TARGET_SPEED_WIDE_X, settings.TARGET_SPEED_WIDE_Y
                )
            case CameraAngle.Narrow:
                await self.trigger_velocity_change(
                    settings.TARGET_SPEED_NARROW_X, settings.TARGET_SPEED_NARROW_Y
                )
            case CameraAngle.Normal:
                await self.trigger_velocity_change(
                    settings.TARGET_SPEED_NORMAL_X, settings.TARGET_SPEED_NORMAL_Y
                )
            case _:
                pass

current_telemetry = None class-attribute instance-attribute

melv_id = random.randint(0, 9999) class-attribute instance-attribute

previous_state = None class-attribute instance-attribute

previous_telemetry = None class-attribute instance-attribute

recent_events = [] class-attribute instance-attribute

state_change_time = datetime.datetime.now() class-attribute instance-attribute

submitted_transition_request = False class-attribute instance-attribute

target_state = None class-attribute instance-attribute

calc_current_location()

Estimates the current location based on telemetry data and time elapsed.

Returns:

Type Description
tuple[float, float]

tuple[float, float]: The estimated (x, y) coordinates.

Source code in src/melvonaut/state_planer.py
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
def calc_current_location(self) -> tuple[float, float]:
    """Estimates the current location based on telemetry data and time elapsed.

    Returns:
        tuple[float, float]: The estimated (x, y) coordinates.
    """
    if self.current_telemetry is None:
        return 0.0, 0.0
    time_since_observation = (
        live_utc() - self.current_telemetry.timestamp
    ).total_seconds()
    current_x = (
        self.current_telemetry.width_x
        + self.current_telemetry.vx * time_since_observation
    )
    current_y = (
        self.current_telemetry.height_y
        + self.current_telemetry.vy * time_since_observation
    )
    return current_x, current_y

calc_transition_remaining_time()

Calculates the remaining time for state transition.

Returns:

Type Description
timedelta

datetime.timedelta: The remaining transition time based on the simulation speed.

Source code in src/melvonaut/state_planer.py
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
def calc_transition_remaining_time(self) -> datetime.timedelta:
    """Calculates the remaining time for state transition.

    Returns:
        datetime.timedelta: The remaining transition time based on the simulation speed.
    """
    if self.get_current_state() is State.Transition:
        logger.debug("Not in transition state, returning 0")
        return datetime.timedelta(0)
    elif self.previous_state == State.Safe:
        total_time = datetime.timedelta(
            seconds=con.STATE_TRANSITION_FROM_SAFE_TIME
            / self.get_simulation_speed()
        )
        return total_time - self.get_time_since_state_change()
    else:
        total_time = datetime.timedelta(
            seconds=con.STATE_TRANSITION_TIME / self.get_simulation_speed()
        )
        return total_time - self.get_time_since_state_change()

control_acquisition() async

Initializes acquisition mode by updating objectives and setting camera parameters.

Retrieves objectives from an external API, determines the current objective, and adjusts acquisition parameters accordingly. Also creates necessary directories for image storage.

Returns:

Type Description
None

None

Source code in src/melvonaut/state_planer.py
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
async def control_acquisition(self) -> None:
    """Initializes acquisition mode by updating objectives and setting camera parameters.

    Retrieves objectives from an external API, determines the current objective, and adjusts
    acquisition parameters accordingly. Also creates necessary directories for image storage.

    Returns:
        None
    """
    async with aiohttp.ClientSession() as session:
        # update Objectives
        async with session.get(con.OBJECTIVE_ENDPOINT) as response:
            if response.status == 200:
                json_response = await response.json()
                self._z_obj_list: list[ZonedObjective] = ZonedObjective.parse_api(
                    json_response
                )
                logger.info(
                    f"Updated objectives, there are {len(self._z_obj_list)} objectives."
                )
            else:
                logger.error("Could not get OBJECTIVE_ENDPOINT")

    current_obj = None
    # Always check for new objective in this task
    if settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective:
        current_obj = self._z_obj_list[0]

        logger.error(f"Using next_objective, current task: {current_obj.name}")

    # In this task look for the given string
    elif settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective:
        for obj in self._z_obj_list:
            if obj.name == settings.FIXED_OBJECTIVE:
                logger.error(f"Using fixed_objective, current task: {obj}")
                current_obj = obj
                break

    if current_obj:
        settings.START_TIME = current_obj.start
        settings.STOP_TIME = current_obj.end
        settings.TARGET_CAMERA_ANGLE_ACQUISITION = current_obj.optic_required

        self._current_obj_name = str(current_obj.id) + current_obj.name.replace(
            " ", ""
        )

        con.IMAGE_PATH = con.IMAGE_PATH_BASE + self._current_obj_name + "/"

        con.IMAGE_LOCATION = (
            con.IMAGE_PATH
            + "image_{melv_id}_{angle}_{time}_x_{cor_x}_y_{cor_y}.png"
        )
        try:
            subprocess.run(["mkdir", con.IMAGE_PATH], check=True)
            logger.info(f"Created folder: {con.IMAGE_PATH}")

        except subprocess.CalledProcessError as e:
            logger.info(f"z_obj could not mkdir: {e}")

    # check if change occured and cut the last image

    await self.trigger_camera_angle_change(settings.TARGET_CAMERA_ANGLE_ACQUISITION)

    # TODO stop velocity change if battery is low
    if self.current_telemetry and self.current_telemetry.battery < 10:
        logger.error("Battery low, cant accelerate any more!")

    match settings.TARGET_CAMERA_ANGLE_ACQUISITION:
        case CameraAngle.Wide:
            await self.trigger_velocity_change(
                settings.TARGET_SPEED_WIDE_X, settings.TARGET_SPEED_WIDE_Y
            )
        case CameraAngle.Narrow:
            await self.trigger_velocity_change(
                settings.TARGET_SPEED_NARROW_X, settings.TARGET_SPEED_NARROW_Y
            )
        case CameraAngle.Normal:
            await self.trigger_velocity_change(
                settings.TARGET_SPEED_NORMAL_X, settings.TARGET_SPEED_NORMAL_Y
            )
        case _:
            pass

get_current_state()

Retrieves the current state from telemetry data.

Returns:

Name Type Description
State State

The current state if telemetry is available, otherwise State.Unknown.

Source code in src/melvonaut/state_planer.py
68
69
70
71
72
73
74
75
76
def get_current_state(self) -> State:
    """Retrieves the current state from telemetry data.

    Returns:
        State: The current state if telemetry is available, otherwise State.Unknown.
    """
    if self.current_telemetry is None:
        return State.Unknown
    return self.current_telemetry.state

get_image() async

Captures an image if telemetry data is available and conditions are met.

If no telemetry data is available, the function waits for the next observation cycle. It checks various conditions (e.g., acceleration, camera angle, timing) before fetching an image from an external API and saving it with appropriate metadata.

Returns:

Type Description
None

None

Source code in src/melvonaut/state_planer.py
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
async def get_image(self) -> None:
    """Captures an image if telemetry data is available and conditions are met.

    If no telemetry data is available, the function waits for the next observation cycle.
    It checks various conditions (e.g., acceleration, camera angle, timing) before fetching an image
    from an external API and saving it with appropriate metadata.

    Returns:
        None
    """
    logger.debug("Getting image")
    if self.current_telemetry is None:
        logger.warning(
            f"No telemetry data available. Waiting {settings.OBSERVATION_REFRESH_RATE}s for next image."
        )
        image_task = Timer(
            timeout=settings.OBSERVATION_REFRESH_RATE, callback=self.get_image
        ).get_task()
        await asyncio.gather(image_task)
        return
    if not self._aiohttp_session:
        self._aiohttp_session = aiohttp.ClientSession()

    # Filter out cases where no image should be taken

    if (
        settings.CURRENT_MELVIN_TASK == MELVINTask.Fixed_objective
        or settings.CURRENT_MELVIN_TASK == MELVINTask.Next_objective
    ) and not self._z_obj_list:
        logger.warning(
            "Skipped image: In Objectives_only mode, but z_obj_list emtpy!"
        )
        return

    if self.current_telemetry.angle != settings.TARGET_CAMERA_ANGLE_ACQUISITION:
        logger.info(
            f"Skipped image: current_angle={self.current_telemetry.angle} vs target={settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
        )
        return
    if self._accelerating:
        logger.info(
            f"Skipped image: accelerating to: {self._target_vel_x} {self._target_vel_y}"
        )
        return

    if settings.DO_TIMING_CHECK and settings.START_TIME > datetime.datetime.now(
        datetime.timezone.utc
    ):
        logger.warning(
            f"Skipped image, to early: start={settings.START_TIME} current_time={live_utc()}"
        )
        return
    if settings.DO_TIMING_CHECK and live_utc() > settings.STOP_TIME:
        logger.warning(
            f"Skipped image, to late: end={settings.STOP_TIME} current_time={live_utc()}"
        )
        return

    # save the current telemetry values, so they dont get overwritten by a later update
    tele_timestamp = self.current_telemetry.timestamp
    tele_x = self.current_telemetry.width_x
    tele_y = self.current_telemetry.height_y
    tele_vx = self.current_telemetry.vx
    tele_vy = self.current_telemetry.vy
    tele_simSpeed = self.get_simulation_speed()
    tele_angle = self.current_telemetry.angle

    lens_size = lens_size_by_angle(self.current_telemetry.angle)
    """
    # TODO add box check if melvin and objective overlap
    # check if we are in range of an objective
    # TODO also check MELVINTasks
    # TODO check if within box, unless hidden
    melvin_box = (
        tele_x - lens_size / 2,
        tele_y - lens_size / 2,
        tele_x + lens_size / 2,
        tele_y + lens_size / 2,
    )
    if self._z_obj_list[0].zone is None:
        logger.warning("Hidden objective, taking photo")
        return
        TODO
    else:
        logger.warning("Checking if in range of objective:")
        objective_box = self._z_obj_list[0].zone

    if not boxes_overlap_in_grid(melvin_box, objective_box):
        logger.error(
            f"Image skipped, not Overlapping! {melvin_box} {objective_box}"
        )
        return
    """
    async with aiohttp.ClientSession() as session:
        try:
            async with session.get(con.IMAGE_ENDPOINT) as response:
                if response.status == 200:
                    # Extract exact image timestamp
                    img_timestamp = response.headers.get("image-timestamp")
                    if img_timestamp is None:
                        logger.error(
                            "Image timestamp not found in headers, substituting with current time"
                        )
                        parsed_img_timestamp = datetime.datetime.now()
                    else:
                        parsed_img_timestamp = datetime.datetime.fromisoformat(
                            img_timestamp
                        )

                    # Calculate the difference between the img and the last telemetry
                    difference_in_seconds = (
                        parsed_img_timestamp - tele_timestamp
                    ).total_seconds()

                    adj_x = round(
                        tele_x + (difference_in_seconds * tele_vx * tele_simSpeed)
                    ) - (lens_size / 2)
                    adj_y = round(
                        tele_y + (difference_in_seconds * tele_vy * tele_simSpeed)
                    ) - (lens_size / 2)

                    # TODO check if images are correct!
                    # TODO might also need modulo for side cases
                    # logger.debug(f"T {parsed_img_timestamp} | C {tele_timestamp}")
                    # logger.debug(
                    #     f"D {difference_in_seconds} | R {tele_x} ADJ {adj_x}"
                    # )

                    image_path = con.IMAGE_LOCATION.format(
                        melv_id=self._current_obj_name,
                        angle=tele_angle,
                        time=parsed_img_timestamp.strftime("%Y-%m-%dT%H:%M:%S.%f"),
                        cor_x=int(adj_x),
                        cor_y=int(adj_y),
                    )

                    logger.info(
                        f"Received image at {adj_x}x {adj_y}y with {self.current_telemetry.angle} angle"
                    )

                    async with async_open(image_path, "wb") as afp:
                        while True:
                            cnt = await response.content.readany()
                            if not cnt:
                                break
                            await afp.write(cnt)
                else:
                    logger.warning(f"Failed to get image: {response.status}")
                    logger.info(f"Response body: {await response.text()}")
                    logger.info(
                        "This is normal at the end of acquisition mode once."
                    )
        except aiohttp.client_exceptions.ConnectionTimeoutError:
            logger.warning("Observations endpoint timeouted.")
        except asyncio.exceptions.CancelledError:
            logger.warning("Get image task was cancelled.")

get_previous_state()

Retrieves the previous state from telemetry data.

Returns:

Name Type Description
State State

The previous state if telemetry is available, otherwise State.Unknown.

Source code in src/melvonaut/state_planer.py
78
79
80
81
82
83
84
85
86
def get_previous_state(self) -> State:
    """Retrieves the previous state from telemetry data.

    Returns:
        State: The previous state if telemetry is available, otherwise State.Unknown.
    """
    if self.previous_telemetry is None:
        return State.Unknown
    return self.previous_telemetry.state

get_simulation_speed()

Gets the current simulation speed from telemetry data.

Returns:

Name Type Description
int int

The simulation speed if telemetry is available, otherwise 1.

Source code in src/melvonaut/state_planer.py
88
89
90
91
92
93
94
95
96
def get_simulation_speed(self) -> int:
    """Gets the current simulation speed from telemetry data.

    Returns:
        int: The simulation speed if telemetry is available, otherwise 1.
    """
    if self.current_telemetry is None:
        return 1
    return self.current_telemetry.simulation_speed

get_time_since_state_change()

Calculates the time elapsed since the last state change.

Returns:

Type Description
timedelta

datetime.timedelta: The time difference between now and the last state change.

Source code in src/melvonaut/state_planer.py
 98
 99
100
101
102
103
104
def get_time_since_state_change(self) -> datetime.timedelta:
    """Calculates the time elapsed since the last state change.

    Returns:
        datetime.timedelta: The time difference between now and the last state change.
    """
    return datetime.datetime.now() - self.state_change_time

model_post_init(__context__)

Initializes the recent_events list by loading events from a CSV file.

Parameters:

Name Type Description Default
__context__ Any

Context data passed during initialization.

required
Source code in src/melvonaut/state_planer.py
60
61
62
63
64
65
66
def model_post_init(self, __context__: Any) -> None:
    """Initializes the recent_events list by loading events from a CSV file.

    Args:
        __context__ (Any): Context data passed during initialization.
    """
    self.recent_events = Event.load_events_from_csv(path=con.EVENT_LOCATION_CSV)

plan_state_switching() async

Plans and executes state transitions based on current telemetry data.

This function checks the current state and decides whether to transition to another state based on conditions like battery level and velocity.

Logs relevant debug information and triggers state transitions when necessary.

Returns:

Type Description
None

None

Source code in src/melvonaut/state_planer.py
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
async def plan_state_switching(self) -> None:
    """Plans and executes state transitions based on current telemetry data.

    This function checks the current state and decides whether to transition
    to another state based on conditions like battery level and velocity.

    Logs relevant debug information and triggers state transitions when necessary.

    Returns:
        None
    """
    if self.current_telemetry is None:
        logger.warning("No telemetry data available. Cannot plan state switching.")
        return

    state = self.get_current_state()

    match state:
        case State.Transition:
            logger.debug(
                f"Time since state change: {self.get_time_since_state_change()}"
            )
            expected_time_to_complete = self.calc_transition_remaining_time()
            limited_log(
                f"State is Transition to {self.target_state}, waiting for transition to complete.\nExpected time to complete state transition: {expected_time_to_complete}"
            )
            # logger.debug(
            #     f"Previous state: {self.get_previous_state()}, Current state: {self.get_current_state()}"
            # )
        case State.Acquisition:
            # in EBT leave once everything is set
            if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
                if (
                    self._target_vel_x
                    and self._target_vel_y
                    and self.current_telemetry.angle
                    == settings.TARGET_CAMERA_ANGLE_ACQUISITION
                    and self._target_vel_x == self.current_telemetry.vx
                    and self._target_vel_y == self.current_telemetry.vy
                ):
                    await self.trigger_state_transition(State.Communication)

            await self.switch_if_battery_low(State.Charge, State.Acquisition)

        case State.Charge:
            if (
                self.current_telemetry.battery
                >= self.current_telemetry.max_battery
                - settings.BATTERY_HIGH_THRESHOLD
            ):
                if settings.CURRENT_MELVIN_TASK == MELVINTask.EBT:
                    # starting ebt, but speed/angle not set yet

                    logger.info(
                        f"EBT Task, Angle: telemetry: {self.current_telemetry.angle} vs target: {settings.TARGET_CAMERA_ANGLE_ACQUISITION}"
                    )
                    logger.info(
                        f"EBT Task, vx: {self.current_telemetry.vx} vs target: {self._target_vel_x}"
                    )
                    logger.info(
                        f"EBT Task, vy: {self.current_telemetry.vy} vs target: {self._target_vel_y}"
                    )
                    if (
                        self._target_vel_x is None
                        or self._target_vel_y is None
                        or self.current_telemetry.angle
                        != settings.TARGET_CAMERA_ANGLE_ACQUISITION
                        or self._target_vel_x != self.current_telemetry.vx
                        or self._target_vel_y != self.current_telemetry.vy
                    ):
                        await self.trigger_state_transition(State.Acquisition)
                    else:
                        # logger.info("starting comms!")
                        await self.trigger_state_transition(State.Communication)

                else:
                    # logger.info("starting acq!")
                    await self.trigger_state_transition(State.Acquisition)

        case State.Safe:
            if self.current_telemetry.battery >= (
                self.current_telemetry.max_battery * 0.5
            ):
                await self.trigger_state_transition(State.Acquisition)
            else:
                await self.trigger_state_transition(State.Charge)
            await self.switch_if_battery_low(State.Charge, State.Acquisition)
        case State.Communication:
            await self.switch_if_battery_low(State.Charge, State.Communication)
        case State.Deployment:
            logger.debug(
                "State is Deployment, triggering transition to Acquisition"
            )
            await self.trigger_state_transition(State.Acquisition)

run_get_image() async

Continuously captures images while in the Acquisition state.

This function continuously captures images at calculated intervals, adjusting timing based on velocity and acceleration.

Returns:

Type Description
None

None

Source code in src/melvonaut/state_planer.py
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
async def run_get_image(self) -> None:
    """Continuously captures images while in the Acquisition state.

    This function continuously captures images at calculated intervals,
    adjusting timing based on velocity and acceleration.

    Returns:
        None
    """
    logger.debug("Starting run_get_image")
    if self.get_current_state() == State.Acquisition:
        await self.get_image()
    while self.get_current_state() == State.Acquisition:
        if self.current_telemetry is None:
            logger.debug(
                "No telemetry data available. Assuming Observation Refresh Rate."
            )
            delay_in_s = float(settings.OBSERVATION_REFRESH_RATE)
        else:
            current_total_vel = (
                self.current_telemetry.vx + self.current_telemetry.vy
            )
            if self._accelerating:
                # If accelerating calculate distance based on current speed and acceleration
                delay_in_s = (
                    math.sqrt(
                        current_total_vel**2
                        + 2 * con.ACCELERATION * settings.DISTANCE_BETWEEN_IMAGES
                    )
                    - current_total_vel
                ) / con.ACCELERATION
            else:
                # When not accelerating calculate distance based on current speed
                delay_in_s = (
                    float(settings.DISTANCE_BETWEEN_IMAGES) / current_total_vel
                )
        delay_in_s = delay_in_s / self.get_simulation_speed()
        logger.debug(f"Next image in {delay_in_s}s.")
        image_task = Timer(timeout=delay_in_s, callback=self.get_image).get_task()
        await asyncio.gather(image_task)

switch_if_battery_low(state_low_battery, state_high_battery) async

Switches state based on battery level.

Parameters:

Name Type Description Default
state_low_battery State

The state to switch to when battery is low.

required
state_high_battery State

The state to switch to when battery is sufficient.

required
Source code in src/melvonaut/state_planer.py
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
async def switch_if_battery_low(
    self, state_low_battery: State, state_high_battery: State
) -> None:
    """Switches state based on battery level.

    Args:
        state_low_battery (State): The state to switch to when battery is low.
        state_high_battery (State): The state to switch to when battery is sufficient.
    """
    if self.current_telemetry is None:
        logger.warning(
            "No telemetry data available. Cannot plan battery based switching."
        )
        return
    if self.current_telemetry.battery <= settings.BATTERY_LOW_THRESHOLD:
        if self.get_current_state() == state_low_battery:
            return
        logger.debug(
            f"State is {self.get_current_state()}, Battery is low, triggering transition to {state_low_battery}"
        )
        await self.trigger_state_transition(state_low_battery)
    else:
        if self.get_current_state() == state_high_battery:
            return
        logger.debug(
            f"State is {self.get_current_state()}, Battery is high, triggering transition to {state_high_battery}"
        )
        await self.trigger_state_transition(state_high_battery)

trigger_camera_angle_change(new_angle) async

Tries to change the camera angle to new_angle

Parameters:

Name Type Description Default
new_angle CameraAngle

The desired camera angle.

required
Source code in src/melvonaut/state_planer.py
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
async def trigger_camera_angle_change(self, new_angle: CameraAngle) -> None:
    """Tries to change the camera angle to new_angle

    Args:
        new_angle (CameraAngle): The desired camera angle.
    """
    if self.current_telemetry is None:
        logger.warning("No telemetry data available. Cannot set camera angle.")
        return
    if new_angle == self.current_telemetry.angle:
        logger.info("Target camera angle already set. Not changing angle.")
        return
    request_body = {
        "vel_x": self.current_telemetry.vx,
        "vel_y": self.current_telemetry.vy,
        "camera_angle": new_angle,
        "state": self.get_current_state(),
    }
    async with aiohttp.ClientSession() as session:
        async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
            if response.status == 200:
                self.current_telemetry.angle = new_angle
                logger.info(f"Camera angle set to {new_angle}")
            else:
                logger.error(f"Failed to set camera angle to {new_angle}")

trigger_state_transition(new_state) async

Initiates a state transition if valid conditions are met.

Parameters:

Name Type Description Default
new_state State

The target state to transition to.

required
Source code in src/melvonaut/state_planer.py
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
async def trigger_state_transition(self, new_state: State) -> None:
    """Initiates a state transition if valid conditions are met.

    Args:
        new_state (State): The target state to transition to.
    """
    if new_state in [State.Transition, State.Unknown, State.Deployment, State.Safe]:
        logger.warning(f"Cannot transition to {new_state}.")
        return
    if self.current_telemetry is None:
        logger.warning("No telemetry data available. Cannot initiate transition.")
        return
    if self.current_telemetry.state == State.Transition:
        logger.debug("Already in transition state, not starting transition.")
        return
    if new_state == self.get_current_state():
        logger.debug(f"State is already {new_state}, not starting transition.")
        return
    request_body = {
        "state": new_state,
        "vel_x": self.current_telemetry.vx,
        "vel_y": self.current_telemetry.vy,
        "camera_angle": self.current_telemetry.angle,
    }
    async with aiohttp.ClientSession() as session:
        async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
            if response.status == 200:
                logger.info(
                    f"Started transition to {new_state} at battery level {self.current_telemetry.battery}"
                )
                self.submitted_transition_request = True
                self.target_state = new_state
            else:
                logger.warning(
                    f"Failed to transition to {new_state}: {response.status}"
                )
                logger.debug(f"Response body: {await response.text()}")

trigger_velocity_change(new_vel_x, new_vel_y) async

Sets new values for accelartion, also set _accelerating

Parameters:

Name Type Description Default
new_vel_x float

The target velocity in the x direction.

required
new_vel_y float

The target velocity in the y direction.

required
Source code in src/melvonaut/state_planer.py
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
async def trigger_velocity_change(self, new_vel_x: float, new_vel_y: float) -> None:
    """Sets new values for accelartion, also set _accelerating

    Args:
        new_vel_x (float): The target velocity in the x direction.
        new_vel_y (float): The target velocity in the y direction.
    """

    self._target_vel_x = new_vel_x
    self._target_vel_y = new_vel_y

    if self.current_telemetry is None:
        logger.warning("No telemetry data available. Cannot set velocity.")
        return
    if (
        new_vel_x == self.current_telemetry.vx
        and new_vel_y == self.current_telemetry.vy
    ):
        self._accelerating = False
        logger.info("Target velocity already set. Not changing velocity.")
        return
    request_body = {
        "vel_x": new_vel_x,
        "vel_y": new_vel_y,
        "camera_angle": self.current_telemetry.angle,
        "state": self.get_current_state(),
    }
    async with aiohttp.ClientSession() as session:
        async with session.put(con.CONTROL_ENDPOINT, json=request_body) as response:
            if response.status == 200:
                self._accelerating = True
                logger.info(f"Velocity set to {new_vel_x}, {new_vel_y}")
            else:
                logger.error(f"Failed to set velocity to {new_vel_x}, {new_vel_y}")

update_telemetry(new_telemetry) async

Updates the telemetry data and handles state changes.

This function updates the previous and current telemetry readings, logs relevant debug information, and checks for state changes.

If a state change occurs, it logs the transition, cancels image retrieval tasks if necessary, and triggers appropriate actions based on the new state.

Parameters:

Name Type Description Default
new_telemetry MelTelemetry

The new telemetry data to update.

required

Returns:

Type Description
None

None

Source code in src/melvonaut/state_planer.py
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
async def update_telemetry(self, new_telemetry: MelTelemetry) -> None:
    """Updates the telemetry data and handles state changes.

    This function updates the previous and current telemetry readings,
    logs relevant debug information, and checks for state changes.

    If a state change occurs, it logs the transition, cancels image
    retrieval tasks if necessary, and triggers appropriate actions based on
    the new state.

    Args:
        new_telemetry (MelTelemetry): The new telemetry data to update.

    Returns:
        None
    """
    self.previous_telemetry = self.current_telemetry
    self.current_telemetry = new_telemetry

    logger.debug(
        f"New observations - State: {self.get_current_state()},"
        f" Battery level: {self.current_telemetry.battery}/{self.current_telemetry.max_battery},"
        f" Vel X,Y: {self.current_telemetry.vx}, {self.current_telemetry.vy},"
        f" Fuel: {self.current_telemetry.fuel}"
    )

    logger.debug(
        "Current memory usage: "
        + str(psutil.Process(os.getpid()).memory_info().rss / 1024**2)
        + " MB"
    )

    # if self.get_current_state() == State.Acquisition:
    #    await self.get_image()
    # logger.debug(f"Threads: {threading.active_count()}")
    # for thread in threading.enumerate():
    #    frame = sys._current_frames()[thread.ident]
    #    logger.warning(f"{inspect.getframeinfo(frame).filename}.{inspect.getframeinfo(frame).function}:{inspect.getframeinfo(frame).lineno}")

    # check if still accelerating
    if (
        self._target_vel_x == self.current_telemetry.vx
        and self._target_vel_y == self.current_telemetry.vy
    ):
        self._accelerating = False

    if self.previous_telemetry is not None:
        if self.get_previous_state() != self.get_current_state():
            logger.info(
                f"State changed from {self.get_previous_state()} to {self.get_current_state()}"
            )
            self.previous_state = self.get_previous_state()
            self.state_change_time = datetime.datetime.now()
            # Put in here events to do on state change
            if settings.TRACING:
                if self.previous_state == State.Transition:
                    snapshot1 = tracemalloc.take_snapshot()
                    stats = snapshot1.statistics("traceback")
                    for stat in stats:
                        logger.warning(
                            "%s memory blocks: %.1f KiB"
                            % (stat.count, stat.size / 1024)
                        )
                        for line in stat.traceback.format():
                            logger.warning(line)

            # logger.debug(
            #     f"Previous state: {self.previous_state}, Current state: {self.get_current_state()}"
            # )
            match self.get_current_state():
                case State.Transition:
                    if self._run_get_image_task:
                        logger.debug("end image")
                        self._run_get_image_task.cancel()
                        self._run_get_image_task = None
                    if self.submitted_transition_request:
                        self.submitted_transition_request = False
                    else:
                        logger.warning("State transition was externally triggered!")
                case State.Acquisition:
                    logger.info("Starting control in acquisition state.")
                    if self._run_get_image_task:
                        logger.debug("Image task already running")
                    else:
                        logger.debug("start image")
                        loop = asyncio.get_event_loop()
                        self._run_get_image_task = loop.create_task(
                            self.run_get_image()
                        )
                    await self.control_acquisition()
                case State.Charge:
                    pass
                case State.Safe:
                    logger.warning("State transitioned to SAFE!")
                case State.Communication:
                    pass
                case State.Deployment:
                    logger.warning("State transitioned to DEPLOYMENT!")
                case _:
                    logger.warning(f"Unknown state {self.get_current_state()}")
            if self.get_current_state() != State.Acquisition:
                self._accelerating = False
            if self.get_current_state() != State.Transition:
                if self.target_state != self.get_current_state():
                    logger.warning(
                        f"Planned state transition to {self.target_state} failed, now in {self.get_current_state()}"
                    )
                else:
                    logger.debug(
                        f"Planned state transition to {self.target_state} succeeded."
                    )
                self.target_state = None

        await self.plan_state_switching()

utils

file_log_handler_id = None module-attribute

melvin_notifier(body, title, notify_type, *args, **kwargs)

Melvin-specific notification handler.

Just prints to stdout.

Parameters:

Name Type Description Default
body str

The message body.

required
title str

The notification title.

required
notify_type NotifyType

The type of notification.

required
*args Any

Additional arguments.

()
**kwargs dict[str, Any]

Additional keyword arguments.

{}
Source code in src/melvonaut/utils.py
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@apprise.decorators.notify(on="melvin")  # type: ignore
def melvin_notifier(
    body: str, title: str, notify_type: NotifyType, *args: Any, **kwargs: dict[str, Any]
) -> None:
    """Melvin-specific notification handler.

    Just prints to stdout.

    Args:
        body (str): The message body.
        title (str): The notification title.
        notify_type (NotifyType): The type of notification.
        *args (Any): Additional arguments.
        **kwargs (dict[str, Any]): Additional keyword arguments.
    """
    print("MELVIN HERE!")

setup_file_logging()

Configures file-based logging with rotation at midnight.

If a file log handler already exists, it is removed before adding a new one.

Source code in src/melvonaut/utils.py
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
def setup_file_logging() -> None:
    """Configures file-based logging with rotation at midnight.

    If a file log handler already exists, it is removed before adding a new one.
    """
    global file_log_handler_id
    if file_log_handler_id is not None:
        logger.remove(file_log_handler_id)  # type: ignore
    file_log_handler_id = logger.add(
        sink=con.MEL_LOG_LOCATION,
        rotation="00:00",
        level=settings.FILE_LOGGING_LEVEL,
        backtrace=True,
        diagnose=True,
        enqueue=True,
    )

setup_logging()

Configures the logging system for the application.

This function removes existing log handlers, sets up terminal logging, and configures Apprise notifications for Discord and Melvin events.

Source code in src/melvonaut/utils.py
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
def setup_logging() -> None:
    """Configures the logging system for the application.

    This function removes existing log handlers, sets up terminal logging,
    and configures Apprise notifications for Discord and Melvin events.
    """
    logger.remove()
    logger.add(
        sink=sys.stdout,
        level=settings.TERMINAL_LOGGING_LEVEL,
        backtrace=True,
        diagnose=True,
        enqueue=True,
    )
    notifier = apprise.Apprise()
    if settings.DISCORD_WEBHOOK_TOKEN and settings.DISCORD_ALERTS_ENABLED:
        notifier.add(f"discord://{settings.DISCORD_WEBHOOK_TOKEN}")
        logger.add(notifier.notify, level="ERROR", filter={"apprise": False})  # type: ignore

    if settings.NETWORK_SIM_ENABLED:
        notifier.add("melvin://")
        logger.add(notifier.notify, level="ERROR", filter={"apprise": False})  # type: ignore

    setup_file_logging()