Coverage for src/melvonaut/ebt_calc.py: 0%
139 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1import os
2import sys
3import math
4from loguru import logger
6import shared.constants as con
7from melvonaut.settings import settings
8from shared.models import Event, Ping
10import matplotlib.pyplot as plt
11from matplotlib.lines import Line2D
12import matplotlib.patches as patches
##### LOGGING #####
# Remove the handlers registered so far, then log to stderr at the level
# taken from the project settings.
logger.remove()
logger.add(
    sink=sys.stderr,
    level=settings.FILE_LOGGING_LEVEL,
    backtrace=True,
    diagnose=True,
)
# [CONSTANTS]
# Divisor applied to coordinates and distances; 1 keeps the native resolution.
scaling_factor = 1

# Lower and upper bounds of the grid that is searched for candidate points.
x_0, y_0 = 0, 0
x_max, y_max = int(con.WORLD_X / scaling_factor), int(con.WORLD_Y / scaling_factor)

# Fixed distance tolerance; in the visible code it is only referenced by the
# inert example snippet in the CLI section.
max_offset = 325
29# [HELPER]
def f(d: float) -> float:
    """
    Maps a distance to its transformed value.

    The result is ``225 + (0.4 * (d + 1)) / 4``. In this module it is used as the
    half-width of the band ``[d - f(d), d + f(d)]`` built around a measured distance.

    Args:
        d (float): Input distance value.

    Returns:
        float: Transformed distance value.
    """
    return float(225 + (0.4 * (d + 1)) / 4)
def distance(x1: int, x2: int, y1: int, y2: int) -> float:
    """
    Computes the shortest Euclidean distance between two points on the wrapping world map.

    The map wraps around on both axes (width ``con.WORLD_X``, height ``con.WORLD_Y``).
    Each axis is wrapped independently: the separation along an axis is the smaller of
    the direct gap and the gap across the map edge. This also covers shortest paths
    that cross both edges at once (a corner), which a check of the direct path plus
    single-axis wraps alone would overestimate.

    Note the argument order: both x-coordinates come first, then both y-coordinates.

    Args:
        x1 (int): X-coordinate of the first point.
        x2 (int): X-coordinate of the second point.
        y1 (int): Y-coordinate of the first point.
        y2 (int): Y-coordinate of the second point.

    Returns:
        float: The shortest Euclidean distance between the two points.
    """
    # Python's % always yields a non-negative result for a positive modulus, so
    # coordinates outside the map (including negative ones) need no separate handling.
    dx = abs(x2 - x1) % con.WORLD_X
    dx = min(dx, con.WORLD_X - dx)

    dy = abs(y2 - y1) % con.WORLD_Y
    dy = min(dy, con.WORLD_Y - dy)

    return math.sqrt(dx**2 + dy**2)
def parse_pings(id: int, events: list[Event]) -> list[Ping]:
    """
    Builds Ping objects from the events that belong to one emergency beacon.

    Only events whose text contains ``GALILEO_MSG_EB,ID_<id>,DISTANCE_`` are used.
    For each of them the measured distance and position are read via
    ``event.easy_parse()``, and the distance is widened by ``f(d)`` in both
    directions to obtain the minimum and maximum distance.

    Args:
        id (int): Identifier for the event source.
        events (list[Event]): List of event objects to be processed.

    Returns:
        list[Ping]: A list of parsed Ping objects, in the order of the matching events.
    """
    marker = f"GALILEO_MSG_EB,ID_{id},DISTANCE_"
    pings = []
    for event in events:
        if marker not in event.event:
            continue

        d, x, y = event.easy_parse()
        tolerance = f(d)
        pings.append(
            Ping(
                x=int(x / scaling_factor),
                y=int(y / scaling_factor),
                d=d / scaling_factor,
                mind=int((d - tolerance) / scaling_factor),
                maxd=int((d + tolerance) / scaling_factor),
            )
        )
    return pings
def find_matches(pings: list[Ping]) -> list[tuple[int, int]]:
    """
    Identifies matching points from the given pings based on distance constraints.

    The ping with the smallest maximum distance is used to generate candidates: every
    grid point strictly inside the world bounds whose distance to that ping lies
    strictly between its minimum and maximum distance. The candidates are then reduced
    to the points whose distance to every ping lies within that ping's
    ``[mind, maxd]`` band (bounds included).

    Args:
        pings (list[Ping]): List of Ping objects to process.

    Returns:
        list[tuple[int, int]]: List of coordinate pairs that satisfy all constraints.
            Empty if no pings were given or no point matches.
    """
    # Without pings there is no circle to start from; min() would raise on an empty list.
    if not pings:
        logger.warning("No pings given, nothing to match.")
        return []

    # Process first point: start with the tightest circle to keep the candidate set small.
    p1 = min(pings, key=lambda p: p.maxd)

    # Clip the bounding box of that circle to the world once, instead of testing every
    # cell; the world borders themselves stay excluded.
    x_range = range(max(p1.x - p1.maxd, x_0 + 1), min(p1.x + p1.maxd, x_max))
    y_range = range(max(p1.y - p1.maxd, y_0 + 1), min(p1.y + p1.maxd, y_max))

    res = []
    for x in x_range:
        for y in y_range:
            if p1.mind < distance(p1.x, x, p1.y, y) < p1.maxd:
                res.append((x, y))
    logger.info(f"Found {len(res)} possible points on first circle.")

    # Only keep the ones that are in all circles
    filtered_res = [
        (x, y)
        for x, y in res
        if all(pn.mind <= distance(pn.x, x, pn.y, y) <= pn.maxd for pn in pings)
    ]

    logger.info(f"Found {len(filtered_res)} points that match all pings.")
    return filtered_res
def draw_res(
    id: int, res: list[tuple[int, int]], pings: list[Ping], show: bool = False
) -> tuple[int, int]:
    """
    Draws and saves a visualization of the computed emergency beacon locations.

    The plot contains the matched points, every ping with its minimum and maximum
    distance circle and, if there are matched points, their centroid as best guess.
    With ``show`` set, the plot is displayed and the centroid (or a warning if nothing
    matched) is logged. Otherwise the plot is written as a PNG below
    ``con.CONSOLE_EBT_PATH``; a numeric suffix is appended to the file name until it
    does not collide with an existing file, and the figure is closed afterwards.

    Args:
        id (int): Identifier for the emergency beacon tracker.
        res (list[tuple[int, int]]): List of matched coordinate points.
        pings (list[Ping]): List of Ping objects representing detected signals.
        show (bool, optional): Whether to display the plot. Defaults to False.

    Returns:
        tuple[int, int]: The estimated centroid of the matched points, or (-1, -1) if no matches were found.
    """

    def find_centroid(points: list[tuple[int, int]]) -> tuple[float, float]:
        """
        Computes the centroid of a set of points.

        Args:
            points (list[tuple[int, int]]): Non-empty list of coordinate points.

        Returns:
            tuple[float, float]: The centroid coordinates.
        """
        xs, ys = zip(*points)
        return (sum(xs) / len(xs), sum(ys) / len(ys))

    # None marks "nothing matched"; every use below is guarded by that check.
    centroid = find_centroid(res) if res else None

    x_list = [x for x, _ in res]
    y_list = [y for _, y in res]

    plt.style.use("bmh")
    fig, ax = plt.subplots()
    plt.title(f"Emergency Beacon Tracker {id} - {len(pings)} pings")
    plt.xlabel("Width")
    plt.ylabel("Height")
    ax.set_xlim(0, x_max)
    ax.set_ylim(0, y_max)

    # plot matched area
    ax.plot(x_list, y_list, "ro", markersize=0.01, zorder=4)
    legend_area = patches.Patch(
        edgecolor="red", facecolor="red", linewidth=1, label="Matched area"
    )

    # plot pings
    for p in pings:
        ax.plot(p.x, p.y, "x", color="grey", markersize=5, zorder=3)
        circle_inner = patches.Circle(
            (p.x, p.y),
            p.mind,
            edgecolor="green",
            facecolor="none",
            linewidth=0.2,
            zorder=2,
        )
        circle_outer = patches.Circle(
            (p.x, p.y),
            p.maxd,
            edgecolor="blue",
            facecolor="none",
            linewidth=0.2,
            zorder=2,
        )
        ax.add_patch(circle_inner)
        ax.add_patch(circle_outer)

    legend_point = Line2D(
        [0],
        [0],
        linestyle="None",
        marker="x",
        markerfacecolor="grey",
        markeredgecolor="grey",
        markersize=6,
        label="Ping Location",
    )
    legend_inner = patches.Patch(
        edgecolor="green", facecolor="none", linewidth=1, label="Minimum Distance"
    )
    legend_outer = patches.Patch(
        edgecolor="blue", facecolor="none", linewidth=1, label="Maximum Distance"
    )

    if centroid is not None:
        # plot centroid
        circle_guess = patches.Circle(
            (centroid[0], centroid[1]),
            75,
            edgecolor="violet",
            facecolor="violet",
            linewidth=1,
            zorder=5,
        )
        ax.add_patch(circle_guess)
        legend_guess = patches.Patch(
            edgecolor="violet",
            facecolor="violet",
            linewidth=1,
            label=f"Best guess\n({int(centroid[0])}, {int(centroid[1])})",
        )
        ax.legend(
            handles=[
                legend_point,
                legend_inner,
                legend_outer,
                legend_guess,
                legend_area,
            ],
            loc="best",
        )
    else:
        ax.legend(
            handles=[legend_point, legend_inner, legend_outer, legend_area],
            loc="best",
        )

    if show:
        if centroid is not None:
            logger.info(f"Centroid is: ({int(centroid[0])},{int(centroid[1])})")
        else:
            logger.warning("Could not match any points!")
        plt.show()
    else:
        # Pick the first file name that is not taken yet so earlier plots are kept.
        space = ""
        count = 0
        path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}.png"
        while os.path.isfile(path):
            count += 1
            space = "_" + str(count)
            path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}{space}.png"
        try:
            plt.savefig(path, dpi=1000)
        finally:
            # pyplot keeps every figure alive until it is closed explicitly; without
            # this, each call would leave one more figure in memory.
            plt.close(fig)

    if centroid is not None:
        return (int(centroid[0]), int(centroid[1]))
    return (-1, -1)
# for local testing purposes
if __name__ == "__main__":
    # Open idea: use midpoint circle algorithm? -> not used for now
    logger.info("Running from cli.")

    # Beacon whose pings are evaluated; named so that the builtin id() is not shadowed.
    beacon_id = 201
    path: str = con.CONSOLE_FROM_MELVONAUT_PATH + "MelvonautEvents.csv"
    events = Event.load_events_from_csv(path=path)

    # Example data (inert string literal, kept as a reference for manual experiments;
    # it is never executed)
    """
    processed = []
    # data = [(4097, 7652, 2041), (5758, 8357, 688), (6220, 8553, 1075), (7245, 8989, 1669)]
    data = [
    (19972.3165561, 113.5243816, 1454.48),
    (20486.232864, 331.337984, 930.35),
    (20998.9861724, 548.6578144, 787.93),
    (21510.18207954, 766.74099024, 1093.99),
    (18882.99334624, 2295.73420544, 1947.67),
    (19394.53293776, 2512.96329856, 1450.01),
    (19908.73421827, 2730.89789112, 1442.63),
    (20421.30728271, 2948.14119576, 1828.68),
    (20926.46189231, 3163.05597336, 1651.83),
    ]
    for d in data:
    s = ping(
    x=int(d[0] / scaling_factor),
    y=int(d[1] / scaling_factor),
    d=d[2] / scaling_factor,
    mind=int((d[2] - max_offset) / scaling_factor),
    maxd=int((d[2] + max_offset) / scaling_factor),
    )
    processed.append(s)
    print(f"Added: {s}")
    """

    processed = parse_pings(id=beacon_id, events=events)
    logger.info(f"Done parsing of {len(processed)} pings.")

    res = find_matches(pings=processed)

    if not res:
        logger.error("No Matches Found!")
        # sys.exit instead of the site-provided exit(), which is intended for the
        # interactive shell; a non-zero status reports the failure to the caller.
        sys.exit(1)

    draw_res(id=beacon_id, res=res, pings=processed, show=True)