Coverage for src/melvonaut/mel_telemetry.py: 68%

53 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1##### TELEMETRY ##### 

2import asyncio 

3import datetime 

4import json 

5from typing import Any 

6 

7from aiofile import async_open 

8from pathlib import Path 

9import csv 

10 

11import shared.constants as con 

12from shared.models import BaseTelemetry 

13from loguru import logger 

14 

15 

# Strong references to in-flight storage tasks. The event loop itself only
# keeps weak references to tasks, so a fire-and-forget task that nobody else
# references may be garbage collected before it has finished running.
_PENDING_STORE_TASKS: "set[asyncio.Task[None]]" = set()


# Telemetry sample that can persist itself to the CSV and JSON files
# configured in ``shared.constants``. Creating an instance schedules a CSV
# write on the event loop (see ``model_post_init``).
class MelTelemetry(BaseTelemetry):
    # Time the sample refers to; used as the CSV "timestamp" column and as the
    # key of the entry in the JSON file.
    timestamp: datetime.datetime

    def _timestamp_iso(self) -> str:
        """
        Returns the ISO-8601 string used to label this observation.

        Returns:
            str: ``self.timestamp`` in ISO format, or the current local time
                in ISO format if ``self.timestamp`` is falsy.
        """
        if self.timestamp:
            return self.timestamp.isoformat()
        return datetime.datetime.now().isoformat()

    async def store_observation_csv(self) -> None:
        """
        Appends this observation as one row to the telemetry CSV file.

        The model is dumped to a dictionary and flattened by one level: every
        entry of a nested dictionary becomes its own column named
        ``<key>_<sub_key>``. The ``timestamp`` column holds the ISO-formatted
        timestamp. If the CSV file does not exist yet it is created and a
        header row is written before the data row.

        Args:
            None

        Returns:
            None
        """
        row: dict[str, Any] = {}
        for key, value in self.model_dump().items():
            if isinstance(value, dict):
                row.update(
                    {
                        f"{key}_{sub_key}": sub_value
                        for sub_key, sub_value in value.items()
                    }
                )
            else:
                row[key] = value
        # Overwrites the datetime object dumped above while keeping the
        # column in its original position.
        row["timestamp"] = self._timestamp_iso()

        is_new_file = not Path(con.TELEMETRY_LOCATION_CSV).is_file()
        mode = "w" if is_new_file else "a"
        async with async_open(con.TELEMETRY_LOCATION_CSV, mode) as afp:
            writer = csv.DictWriter(afp, fieldnames=row.keys())
            # The csv writer methods return whatever the underlying
            # ``write()`` call returned; for the async file object that is an
            # awaitable, which has to be awaited for the write to happen.
            if is_new_file:
                await writer.writeheader()
            await writer.writerow(row)

    async def store_observation_json(self) -> None:
        """
        Adds this observation to the telemetry JSON file.

        The whole JSON file is read, the observation (without its
        ``timestamp`` field) is stored under its ISO-formatted timestamp as
        key, and the complete document is written back with sorted keys and
        an indent of 4. If the file does not exist, a new one is created
        containing only this observation. An existing entry with the same
        timestamp is replaced.

        Args:
            None

        Returns:
            None
        """
        logger.debug("Storing observation as json.")
        try:
            async with async_open(con.TELEMETRY_LOCATION_JSON, "r") as afp:
                raw_telemetry = await afp.read()
        except FileNotFoundError:
            logger.debug(f"{con.TELEMETRY_LOCATION_JSON} does not exist.")
            dict_telemetry = {}
        else:
            dict_telemetry = json.loads(raw_telemetry)

        timestamp = self._timestamp_iso()
        dict_telemetry[timestamp] = self.model_dump(exclude={"timestamp"})
        json_telemetry = json.dumps(dict_telemetry, indent=4, sort_keys=True)

        async with async_open(con.TELEMETRY_LOCATION_JSON, "w") as afp:
            await afp.write(json_telemetry)
        logger.debug("Observation stored")

    def model_post_init(self, __context__: Any) -> None:
        """
        Schedules the CSV storage of this observation after initialization.

        The write runs as a background task on the event loop returned by
        ``asyncio.get_event_loop()``; this method does not wait for it.

        Args:
            __context__ (Any): Context data passed during initialization.
                Not used.

        Returns:
            None
        """
        loop = asyncio.get_event_loop()
        task = loop.create_task(self.store_observation_csv())
        # Keep the task alive until it is done, then drop the reference so
        # the set does not grow without bound.
        _PENDING_STORE_TASKS.add(task)
        task.add_done_callback(_PENDING_STORE_TASKS.discard)