Coverage for src/melvonaut/ebt_calc.py: 0%

139 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1import os 

2import sys 

3import math 

4from loguru import logger 

5 

6import shared.constants as con 

7from melvonaut.settings import settings 

8from shared.models import Event, Ping 

9 

10import matplotlib.pyplot as plt 

11from matplotlib.lines import Line2D 

12import matplotlib.patches as patches 

13 

14##### LOGGING ##### 

15logger.remove() 

16logger.add( 

17 sink=sys.stderr, level=settings.FILE_LOGGING_LEVEL, backtrace=True, diagnose=True 

18) 

19 

20# [CONSTANTS] 

21scaling_factor = 1 

22x_0 = 0 

23y_0 = 0 

24x_max = int(con.WORLD_X / scaling_factor) 

25y_max = int(con.WORLD_Y / scaling_factor) 

26max_offset = 325 

27 

28 

29# [HELPER] 

30def f(d: float) -> float: 

31 """ 

32 Computes a transformed distance value. 

33 

34 Args: 

35 d (float): Input distance value. 

36 

37 Returns: 

38 float: Transformed distance value. 

39 """ 

40 res = 225 + ((0.4 * (d + 1)) / 4) 

41 return float(res) 

42 

43 

44def distance(x1: int, x2: int, y1: int, y2: int) -> float: 

45 """ 

46 Computes the Euclidean distance between two points, adjusting for wraparound conditions. 

47 

48 Args: 

49 x1 (int): X-coordinate of the first point. 

50 x2 (int): X-coordinate of the second point. 

51 y1 (int): Y-coordinate of the first point. 

52 y2 (int): Y-coordinate of the second point. 

53 

54 Returns: 

55 float: The Euclidean distance between the two points. 

56 """ 

57 if x1 > con.WORLD_X: 

58 x1 = x1 % con.WORLD_X 

59 while x1 < 0: 

60 x1 += con.WORLD_X 

61 if y1 > con.WORLD_Y: 

62 y1 = y1 % con.WORLD_Y 

63 while y1 < 0: 

64 y1 += con.WORLD_Y 

65 if x2 > con.WORLD_X: 

66 x2 = x2 % con.WORLD_X 

67 while x2 < 0: 

68 x2 += con.WORLD_X 

69 if y2 > con.WORLD_Y: 

70 y2 = y2 % con.WORLD_Y 

71 while y2 < 0: 

72 y2 += con.WORLD_Y 

73 

74 # check for case at edge 

75 a = math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) 

76 b = math.sqrt((con.WORLD_X + x2 - x1) ** 2 + (y2 - y1) ** 2) 

77 c = math.sqrt((x2 - x1 - con.WORLD_X) ** 2 + (y2 - y1) ** 2) 

78 d = math.sqrt((x2 - x1) ** 2 + (con.WORLD_Y + y2 - y1) ** 2) 

79 e = math.sqrt((x2 - x1) ** 2 + (y2 - y1 - con.WORLD_Y) ** 2) 

80 return min(a, b, c, d, e) 

81 # return math.sqrt((x2 - x1) ** 2 + (y2 - y1) ** 2) 

82 

83 

84def parse_pings(id: int, events: list[Event]) -> list[Ping]: 

85 """ 

86 Parses event data to extract relevant ping information. 

87 

88 Args: 

89 id (int): Identifier for the event source. 

90 events (list[Event]): List of event objects to be processed. 

91 

92 Returns: 

93 list[Ping]: A list of parsed Ping objects. 

94 """ 

95 processed = [] 

96 for event in events: 

97 if f"GALILEO_MSG_EB,ID_{id},DISTANCE_" in event.event: 

98 (d, x, y) = event.easy_parse() 

99 s = Ping( 

100 x=int(x / scaling_factor), 

101 y=int(y / scaling_factor), 

102 d=d / scaling_factor, 

103 mind=int((d - f(d)) / scaling_factor), 

104 maxd=int((d + f(d)) / scaling_factor), 

105 ) 

106 processed.append(s) 

107 return processed 

108 

109 

110def find_matches(pings: list[Ping]) -> list[tuple[int, int]]: 

111 """ 

112 Identifies matching points from the given pings based on distance constraints. 

113 

114 Args: 

115 pings (list[Ping]): List of Ping objects to process. 

116 

117 Returns: 

118 list[tuple[int, int]]: List of coordinate pairs that satisfy all constraints. 

119 """ 

120 # Procssed first point 

121 res = [] 

122 p1 = min(pings, key=lambda p: p.maxd) 

123 for x in range(p1.x - p1.maxd, p1.x + p1.maxd): 

124 for y in range(p1.y - p1.maxd, p1.y + p1.maxd): 

125 if x > x_0 and x < x_max and y > y_0 and y < y_max: 

126 dist = distance(p1.x, x, p1.y, y) 

127 if dist > p1.mind and dist < p1.maxd: 

128 res.append((x, y)) 

129 logger.info(f"Found {len(res)} possible points on first circle.") 

130 

131 # Only keep the ones that are in all circles 

132 filtered_res = [] 

133 for x, y in res: 

134 is_valid = True 

135 for pn in pings: 

136 dist = distance(pn.x, x, pn.y, y) 

137 if dist < pn.mind or dist > pn.maxd: 

138 is_valid = False 

139 break 

140 if is_valid: 

141 filtered_res.append((x, y)) 

142 

143 logger.info(f"Found {len(filtered_res)} points that match all pings.") 

144 return filtered_res 

145 

146 

147def draw_res( 

148 id: int, res: list[tuple[int, int]], pings: list[Ping], show: bool = False 

149) -> tuple[int, int]: 

150 """ 

151 Draws and saves a visualization of the computed emergency beacon locations. 

152 

153 Args: 

154 id (int): Identifier for the emergency beacon tracker. 

155 res (list[tuple[int, int]]): List of matched coordinate points. 

156 pings (list[Ping]): List of Ping objects representing detected signals. 

157 show (bool, optional): Whether to display the plot. Defaults to False. 

158 

159 Returns: 

160 tuple[int, int]: The estimated centroid of the matched points, or (-1, -1) if no matches were found. 

161 """ 

162 

163 def find_centroid(points: list[tuple[int, int]]) -> tuple[float, float]: 

164 """ 

165 Computes the centroid of a set of points. 

166 

167 Args: 

168 points (list[tuple[int, int]]): List of coordinate points. 

169 

170 Returns: 

171 tuple[float, float]: The centroid coordinates. 

172 """ 

173 xs, ys = zip(*points) 

174 centroid_x = sum(xs) / len(xs) 

175 centroid_y = sum(ys) / len(ys) 

176 return (centroid_x, centroid_y) 

177 

178 x_list, y_list = [], [] 

179 for x, y in res: 

180 x_list.append(x) 

181 y_list.append(y) 

182 

183 if res: 

184 centroid = find_centroid(res) 

185 

186 plt.style.use("bmh") 

187 _, ax = plt.subplots() 

188 plt.title(f"Emergency Beacon Tracker {id} - {len(pings)} pings") 

189 plt.xlabel("Width") 

190 plt.ylabel("Height") 

191 ax.set_xlim(0, x_max) 

192 ax.set_ylim(0, y_max) 

193 

194 # plot matched area 

195 ax.plot(x_list, y_list, "ro", markersize=0.01, zorder=4) 

196 legend_area = patches.Patch( 

197 edgecolor="red", facecolor="red", linewidth=1, label="Matched area" 

198 ) 

199 

200 # plot pings 

201 for p in pings: 

202 ax.plot(p.x, p.y, "x", color="grey", markersize=5, zorder=3) 

203 circle_inner = patches.Circle( 

204 (p.x, p.y), 

205 p.mind, 

206 edgecolor="green", 

207 facecolor="none", 

208 linewidth=0.2, 

209 zorder=2, 

210 ) 

211 circle_outer = patches.Circle( 

212 (p.x, p.y), 

213 p.maxd, 

214 edgecolor="blue", 

215 facecolor="none", 

216 linewidth=0.2, 

217 zorder=2, 

218 ) 

219 ax.add_patch(circle_inner) 

220 ax.add_patch(circle_outer) 

221 legend_point = Line2D( 

222 [0], 

223 [0], 

224 linestyle="None", 

225 marker="x", 

226 markerfacecolor="grey", 

227 markeredgecolor="grey", 

228 markersize=6, 

229 label="Ping Location", 

230 ) 

231 legend_inner = patches.Patch( 

232 edgecolor="green", facecolor="none", linewidth=1, label="Minimum Distance" 

233 ) 

234 legend_outer = patches.Patch( 

235 edgecolor="blue", facecolor="none", linewidth=1, label="Maximum Distance" 

236 ) 

237 

238 if res: 

239 # plot centroid 

240 circle_guess = patches.Circle( 

241 (centroid[0], centroid[1]), 

242 75, 

243 edgecolor="violet", 

244 facecolor="violet", 

245 linewidth=1, 

246 zorder=5, 

247 ) 

248 ax.add_patch(circle_guess) 

249 legend_guess = patches.Patch( 

250 edgecolor="violet", 

251 facecolor="violet", 

252 linewidth=1, 

253 label=f"Best guess\n({int(centroid[0])}, {int(centroid[1])})", 

254 ) 

255 ax.legend( 

256 handles=[ 

257 legend_point, 

258 legend_inner, 

259 legend_outer, 

260 legend_guess, 

261 legend_area, 

262 ], 

263 loc="best", 

264 ) 

265 else: 

266 ax.legend( 

267 handles=[legend_point, legend_inner, legend_outer, legend_area], 

268 loc="best", 

269 ) 

270 

271 if show: 

272 if res: 

273 logger.info(f"Centroid is: ({int(centroid[0])},{int(centroid[1])})") 

274 else: 

275 logger.warning("Could not match any points!") 

276 plt.show() 

277 else: 

278 space = "" 

279 count = 0 

280 path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}.png" 

281 while os.path.isfile(path): 

282 count += 1 

283 space = "_" + str(count) 

284 path = con.CONSOLE_EBT_PATH + f"EBT_{id}_{len(pings)}{space}.png" 

285 plt.savefig(path, dpi=1000) 

286 if res: 

287 return (int(centroid[0]), int(centroid[1])) 

288 else: 

289 return (-1, -1) 

290 

291# for local testing purposes 

292if __name__ == "__main__": 

293 # Open idea: use midpoint circle algorithm? -> not used for now 

294 logger.info("Running from cli.") 

295 

296 id = 201 

297 path: str = con.CONSOLE_FROM_MELVONAUT_PATH + "MelvonautEvents.csv" 

298 events = Event.load_events_from_csv(path=path) 

299 

300 # Example data 

301 """ 

302 processed = [] 

303 # data = [(4097, 7652, 2041), (5758, 8357, 688), (6220, 8553, 1075), (7245, 8989, 1669)] 

304 data = [ 

305 (19972.3165561, 113.5243816, 1454.48), 

306 (20486.232864, 331.337984, 930.35), 

307 (20998.9861724, 548.6578144, 787.93), 

308 (21510.18207954, 766.74099024, 1093.99), 

309 (18882.99334624, 2295.73420544, 1947.67), 

310 (19394.53293776, 2512.96329856, 1450.01), 

311 (19908.73421827, 2730.89789112, 1442.63), 

312 (20421.30728271, 2948.14119576, 1828.68), 

313 (20926.46189231, 3163.05597336, 1651.83), 

314 ] 

315 for d in data: 

316 s = ping( 

317 x=int(d[0] / scaling_factor), 

318 y=int(d[1] / scaling_factor), 

319 d=d[2] / scaling_factor, 

320 mind=int((d[2] - max_offset) / scaling_factor), 

321 maxd=int((d[2] + max_offset) / scaling_factor), 

322 ) 

323 processed.append(s) 

324 print(f"Added: {s}") 

325 """ 

326 

327 processed = parse_pings(id=id, events=events) 

328 logger.info(f"Done parsing of {len(processed)} pings.") 

329 

330 res = find_matches(pings=processed) 

331 

332 if len(res) == 0: 

333 logger.error("No Matches Found!") 

334 exit() 

335 

336 draw_res(id=id, res=res, pings=processed, show=True)