Coverage for src/melvonaut/mel_telemetry.py: 68%
53 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1##### TELEMETRY #####
2import asyncio
3import datetime
4import json
5from typing import Any
7from aiofile import async_open
8from pathlib import Path
9import csv
11import shared.constants as con
12from shared.models import BaseTelemetry
13from loguru import logger
# Strong references to in-flight CSV storage tasks. The event loop itself only
# keeps weak references to tasks, so a fire-and-forget task that nobody else
# references may be garbage-collected before it has finished running.
_pending_store_tasks: "set[asyncio.Task[None]]" = set()


class MelTelemetry(BaseTelemetry):
    """Telemetry observation that persists itself to disk.

    Every instance schedules ``store_observation_csv`` from
    ``model_post_init``; ``store_observation_json`` is only run when called
    explicitly.
    """

    # Time of the observation; used as the CSV "timestamp" column and as the
    # key of the entry in the JSON file.
    timestamp: datetime.datetime

    def _iso_timestamp(self) -> str:
        """Return the observation timestamp as an ISO 8601 string.

        ``timestamp`` is a required field, so the fallback to the current
        local time is purely defensive.
        """
        if self.timestamp:
            return self.timestamp.isoformat()
        return datetime.datetime.now().isoformat()

    async def store_observation_csv(self) -> None:
        """
        Append this observation as one row to the telemetry CSV file.

        The model is dumped to a dictionary and flattened one level: each key
        of a nested dictionary becomes its own ``<field>_<sub_key>`` column.
        When the CSV file does not exist yet it is created and a header row is
        written first; otherwise the row is appended to the existing file.

        Note:
            The existence check and the write are not atomic, so concurrently
            running storage tasks may both decide to create the file.

        Returns:
            None
        """
        flattened = {}
        for key, value in self.model_dump().items():
            if isinstance(value, dict):
                for sub_key, sub_value in value.items():
                    flattened[f"{key}_{sub_key}"] = sub_value
            else:
                flattened[key] = value
        flattened["timestamp"] = self._iso_timestamp()

        is_new_file = not Path(con.TELEMETRY_LOCATION_CSV).is_file()
        mode = "w" if is_new_file else "a"
        async with async_open(con.TELEMETRY_LOCATION_CSV, mode) as afp:
            writer = csv.DictWriter(afp, fieldnames=list(flattened))
            # csv writers return whatever the underlying ``write`` call
            # returns. With an aiofile handle that is an awaitable, which is
            # why the otherwise synchronous csv calls are awaited here.
            if is_new_file:
                await writer.writeheader()
            await writer.writerow(flattened)
            # logger.debug(f"Writing observation to {con.TELEMETRY_LOCATION_CSV}")

    async def store_observation_json(self) -> None:
        """
        Add this observation to the telemetry JSON file.

        The whole file is read, the observation is inserted under its ISO
        timestamp (replacing any entry with the same timestamp) and the
        complete mapping is written back, sorted by key. A missing file is
        treated as an empty mapping.

        Returns:
            None

        Raises:
            json.JSONDecodeError: If the existing file is not valid JSON.
        """
        logger.debug("Storing observation as json.")
        try:
            async with async_open(con.TELEMETRY_LOCATION_JSON, "r") as afp:
                raw_telemetry = await afp.read()
                dict_telemetry = json.loads(raw_telemetry)
        except FileNotFoundError:
            logger.debug(f"{con.TELEMETRY_LOCATION_JSON} does not exist.")
            dict_telemetry = {}

        # The timestamp is the key of the entry, so it is left out of the value.
        dict_telemetry[self._iso_timestamp()] = self.model_dump(exclude={"timestamp"})
        json_telemetry = json.dumps(dict_telemetry, indent=4, sort_keys=True)

        async with async_open(con.TELEMETRY_LOCATION_JSON, "w") as afp:
            # logger.debug(f"Writing to {con.TELEMETRY_LOCATION_JSON}")
            await afp.write(json_telemetry)
        logger.debug("Observation stored")

    def model_post_init(self, __context__: Any) -> None:
        """
        Schedule CSV storage of the freshly initialized observation.

        The storage coroutine runs as a background task and is not awaited
        here. A reference to the task is held until it completes so that it
        cannot be garbage-collected while still pending.

        Args:
            __context__ (Any): Context data passed during initialization.

        Returns:
            None
        """
        loop = asyncio.get_event_loop()
        task = loop.create_task(self.store_observation_csv())
        _pending_store_tasks.add(task)
        task.add_done_callback(_pending_store_tasks.discard)