Coverage for src/rift_console/image_processing.py: 0%

160 statements  

« prev     ^ index     » next       coverage.py v7.8.0, created at 2025-04-08 09:36 +0000

1import sys 

2from collections import Counter 

3from concurrent.futures import ProcessPoolExecutor 

4from functools import partial 

5from typing import Optional 

6import requests 

7 

8from loguru import logger 

9from PIL import Image 

10 

11from rift_console.image_helper import ( 

12 generate_spiral_walk, 

13 parse_image_name, 

14 find_image_names, 

15) 

16import shared.constants as con 

17 

18##### LOGGING ##### 

19logger.remove() 

20logger.add(sink=sys.stderr, level=con.RIFT_LOG_LEVEL, backtrace=True, diagnose=True) 

21 

22 

23def count_matching_pixels( 

24 offset: tuple[int, int], 

25 first_img: Image.Image, 

26 second_img: Image.Image, 

27 max_offset: int, 

28) -> tuple[tuple[int, int], int]: 

29 """Counts how many pixels are equal between the two images for a given offset 

30 

31 Args: 

32 offset (tuple[int, int]): shift of one image 

33 img (Image): first image 

34 existing_img (Image): second (larger to allow shift) image 

35 

36 Returns: 

37 tuple[int, int]: used offset and number of matching pixels 

38 """ 

39 

40 matches = 0 

41 for x_local in range(first_img.size[0]): 

42 for y_local in range(first_img.size[1]): 

43 p1 = first_img.getpixel((x_local, y_local)) 

44 p2 = second_img.getpixel( 

45 (x_local + offset[0] + max_offset, y_local + offset[1] + max_offset) 

46 ) 

47 if p1 and p2 and isinstance(p1, tuple) and isinstance(p2, tuple): 

48 # only compare R G B and not Alpha. Since there is random noise a slight difference is allowed 

49 if ( 

50 abs(p1[0] - p2[0]) + abs(p1[1] - p2[1]) + abs(p1[2] - p2[2]) 

51 < con.IMAGE_NOISE_FORGIVENESS 

52 ): 

53 matches += 1 

54 else: 

55 logger.error(f"{p1} {p2} unkown types") 

56 

57 return offset, matches 

58 

59 

60# Takes the folder location(including logs/melvonaut/images) and a list of image names 

61def stitch_images( 

62 image_path: str, image_name_list: list[str], panorama: Optional[Image.Image] = None 

63) -> Image.Image: 

64 """Main stitching algorithm 

65 TODO add existing img 

66 

67 Args: 

68 image_path (str): _description_ 

69 images (list[str]): _description_ 

70 panorama 

71 

72 Returns: 

73 Image.Image: _description_ 

74 """ 

75 # create new panorama if it does not exist 

76 if panorama is None: 

77 # add 1000 pixels on each side be used by nudging 

78 panorama = Image.new( 

79 "RGBA", 

80 ( 

81 con.WORLD_X + con.STITCHING_BORDER * 2, 

82 con.WORLD_Y + con.STITCHING_BORDER * 2, 

83 ), 

84 ) 

85 

86 processed_images_counter = 0 

87 nudging_failed_counter = 0 

88 to_few_pixel_counter = 0 

89 max_offset_database = [] 

90 

91 # iterate images 

92 for image_name in image_name_list: 

93 with Image.open(image_path + image_name) as img: 

94 # extract image name 

95 lens_size, x, y = parse_image_name(image_name) 

96 try: 

97 img = img.convert("RGBA") 

98 except OSError as e: 

99 logger.warning( 

100 f"Could not parse file {image_name}, skipped. Error: {e}" 

101 ) 

102 continue 

103 

104 # possible resize 

105 if lens_size != 600: 

106 img = img.resize((lens_size, lens_size), Image.Resampling.LANCZOS) 

107 

108 logger.info(f"Parsing {image_name}") 

109 logger.debug(f"{img.size} {img.mode}") 

110 

111 # try position in a square arround the center 

112 # values 7x7 Grid: d = 3, n = 28 9x9 Grid: d = 4 n = 80 11x11 Grid, d = 5 n = 120 

113 spiral_coordinates = generate_spiral_walk( 

114 con.SEARCH_GRID_SIDE_LENGTH * con.SEARCH_GRID_SIDE_LENGTH 

115 ) 

116 

117 max_offset = int((con.SEARCH_GRID_SIDE_LENGTH - 1) / 2) 

118 existing_stitch = panorama.crop( 

119 ( 

120 x - max_offset + con.STITCHING_BORDER, 

121 y - max_offset + con.STITCHING_BORDER, 

122 x + lens_size + max_offset + con.STITCHING_BORDER, 

123 y + lens_size + max_offset + con.STITCHING_BORDER, 

124 ) 

125 ) 

126 

127 # check if existing_stich contains something 

128 total_pixel = existing_stitch.size[0] * existing_stitch.size[1] 

129 set_pixel = sum( 

130 pixel != (0, 0, 0, 0) for pixel in list(existing_stitch.getdata()) 

131 ) 

132 empty_pixel = total_pixel - set_pixel 

133 

134 logger.debug( 

135 f"Existing stich ({existing_stitch.size[0]},{existing_stitch.size[1]}) w. {total_pixel}p " 

136 + f"set: {set_pixel} {set_pixel/total_pixel}% and transparent: {empty_pixel} {empty_pixel/total_pixel}%" 

137 ) 

138 

139 best_match_count = 0 

140 best_offset = (0, 0) 

141 skip = False 

142 

143 # TODO next goal would to move in only one direction in which the matches get better 

144 if con.DO_IMAGE_NUDGING_SEARCH: 

145 # probiere nur zu match_count falls mehr als 20% aller pixel gefüllt 

146 if set_pixel / total_pixel == 0: 

147 logger.warning("Emtpy panoarma, image still placed") 

148 

149 elif set_pixel / total_pixel > 0.2: 

150 # make a func with static params for this image 

151 count_part = partial( 

152 count_matching_pixels, 

153 first_img=img, 

154 second_img=existing_stitch, 

155 max_offset=max_offset, 

156 ) 

157 

158 with ProcessPoolExecutor( 

159 max_workers=con.NUMBER_OF_WORKER_THREADS 

160 ) as executor: 

161 # each Thread gets automatically assign a different coordinate from the pool 

162 results = executor.map(count_part, spiral_coordinates) 

163 

164 for offset, matches in results: 

165 if matches > best_match_count: 

166 logger.info( 

167 f"New best: matches {matches}p ({matches/total_pixel}%), with offset {best_offset}\n" 

168 ) 

169 best_offset = offset 

170 best_match_count = matches 

171 

172 # check if it worked 

173 if best_match_count / (set_pixel) < 0.5: 

174 logger.warning( 

175 f"Nudging failed, image skipped, since best_match_count: {best_match_count}p ({best_match_count/total_pixel}%)" 

176 ) 

177 

178 skip = True 

179 nudging_failed_counter += 1 

180 

181 logger.warning( 

182 f"{max(abs(best_offset[0]), abs(best_offset[1]))} {best_offset[0]} {best_offset[1]}" 

183 ) 

184 max_offset_database.append( 

185 max(abs(best_offset[0]), abs(best_offset[1])) 

186 ) 

187 else: 

188 logger.warning( 

189 f"Too few pixel on panorama, image skipped, set_pixel%: {set_pixel/total_pixel}" 

190 ) 

191 skip = True 

192 to_few_pixel_counter += 1 

193 

194 # need to check math for % here! 

195 logger.debug( 

196 f"Placed Image best_match_count: {best_match_count}p ({best_match_count/total_pixel}%) with offset: {best_offset}\n" 

197 ) 

198 

199 if not skip: 

200 panorama.paste( 

201 img, 

202 ( 

203 x + best_offset[0] + con.STITCHING_BORDER, 

204 y + best_offset[1] + con.STITCHING_BORDER, 

205 ), 

206 ) 

207 

208 processed_images_counter += 1 

209 if processed_images_counter % con.SAVE_PANORAMA_STEP == 0: 

210 panorama.save( 

211 con.PANORAMA_PATH + "step_" + str(processed_images_counter) + ".png" 

212 ) 

213 

214 # if any(pixel < 255 for pixel in alpha.getdata()): 

215 # return True 

216 

217 if processed_images_counter >= con.STITCHING_COUNT_LIMIT: 

218 logger.warning( 

219 f"STITCHING_COUNT_LIMIT of {con.STITCHING_COUNT_LIMIT} reached!" 

220 ) 

221 break 

222 

223 logger.warning( 

224 f"\n\nDone stitching of {processed_images_counter} from {len(image_name_list)} given images" 

225 ) 

226 if con.DO_IMAGE_NUDGING_SEARCH: 

227 logger.warning( 

228 f"nudging_failed_counter: {nudging_failed_counter} , to_few_pixel_counter: {to_few_pixel_counter}\n" 

229 ) 

230 

231 for element, count in Counter(max_offset_database).items(): 

232 logger.warning(f"Max_offset up to {element} occured {count} times") 

233 

234 return panorama 

235 

236 

237def upload(id: int, path: str, folder: bool = False) -> None: 

238 """Uploads one objective image" """ 

239 

240 if folder: 

241 images = find_image_names(path) 

242 for img in images: 

243 logger.warning(img) 

244 upload(id, path + "/" + img, False) 

245 

246 logger.warning(f"Uploaded folder of {len(images)}.") 

247 return 

248 

249 logger.info(f"Uploading {id} with path {path}") 

250 params = {"objective_id": id} 

251 

252 files = {"image": (path, open(path, "rb"), "image/png")} 

253 

254 with requests.Session() as s: 

255 r = s.post(con.IMAGE_ENDPOINT, params=params, files=files) 

256 

257 if r.status_code == 200: 

258 logger.warning(f"Uploaded: {r}") 

259 logger.warning(f"{r.text}{r.json()}") 

260 else: 

261 logger.error(f"Upload failed with code: {r.status_code}") 

262 logger.error(f"{r.text} {r.json()}") 

263 logger.info("Done with Uplaod!") 

264 

265 

266def cut(panorama_path: str, X1: int, Y1: int, X2: int, Y2: int) -> None: 

267 """Cut a small portion from a bigger Panorama. 

268 

269 Args: 

270 panorama_path (str): Name of the file (should include con.PANORAMA_PATH) 

271 coordinates: Section that should be cut and saved 

272 """ 

273 coordinates = (int(X1), int(Y1), int(X2), int(Y2)) 

274 with Image.open(panorama_path) as panorama: 

275 cut_img = panorama.crop(coordinates) 

276 

277 # cut_img.show() 

278 cut_img.save(panorama_path.replace(".png", "") + "_cut.png") 

279 

280 logger.warning("Saved cut to media/*_cut.png") 

281 

282 

283def create_thumbnail(panorama_path: str) -> None: 

284 """Creates a scaled down panorama and a greyscale one from a given panorama and saves it to `src/rift_console/static/images/thumb.png` from where it can be used by html. 

285 

286 Args: 

287 panorama_path (str): Name of the file (should include con.PANORAMA_PATH) 

288 """ 

289 with Image.open(panorama_path) as panorama: 

290 thumb = panorama.resize( 

291 (con.THUMBNAIL_X, con.THUMBNAIL_Y), Image.Resampling.LANCZOS 

292 ) 

293 thumb.save(panorama_path.replace(".png", "") + "_thumb.png") 

294 # thumb = thumb.convert("L") 

295 # thumb.save("src/rift_console/static/images/" + "thumb_grey.png") 

296 logger.warning(f"Saved Thumbnail to {panorama_path}_thumb.png") 

297 

298 

299# TODO add parameter to add new stiches onto an exisiting map 

300def automated_stitching(local_path: str) -> None: 

301 """Stitches images from the given path into one big image, which is stored under the same name in con.PANORAMA_PATH. 

302 

303 Args: 

304 local_path (str): Path of a folder with images that should be stitched. 

305 """ 

306 

307 image_path = local_path + "/" 

308 output_path = con.PANORAMA_PATH + "stitched" 

309 

310 image_name_list = find_image_names(image_path) 

311 

312 logger.warning( 

313 f"Starting stitching of {len(image_name_list)} image with path: {image_path}" 

314 ) 

315 

316 panorama = stitch_images(image_path=image_path, image_name_list=image_name_list) 

317 

318 remove_offset = ( 

319 con.STITCHING_BORDER, 

320 con.STITCHING_BORDER, 

321 con.WORLD_X + con.STITCHING_BORDER, 

322 con.WORLD_Y + con.STITCHING_BORDER, 

323 ) 

324 panorama = panorama.crop(remove_offset) 

325 panorama.save(output_path + ".png") 

326 

327 logger.warning(f"Saved panorama in {output_path}.png") 

328 

329 

330# For CLI testing 

331if __name__ == "__main__": 

332 print("GO") 

333 

334 if len(sys.argv) < 2: 

335 print("Usage: python3 src/rift_console/image_processing.py stitch PATH") 

336 print("Usage: python3 src/rift_console/image_processing.py thumb PATH") 

337 print( 

338 "Usage: python3 src/rift_console/image_processing.py cut PATH X1 Y1 X2 Y2" 

339 ) 

340 print("Exiting, wrong number of params!") 

341 sys.exit(1) 

342 

343 # Stiching 

344 if sys.argv[1] == "stitch": 

345 if len(sys.argv) == 3: 

346 automated_stitching(local_path=sys.argv[2]) 

347 sys.exit(0) 

348 print("Usage: python3 src/rift_console/image_processing.py stitch PATH") 

349 sys.exit(1) 

350 # Create Thumbnail 

351 elif sys.argv[1] == "thumb": 

352 if len(sys.argv) == 3: 

353 create_thumbnail(panorama_path=sys.argv[2]) 

354 sys.exit(0) 

355 print("Usage: python3 src/rift_console/image_processing.py thumb PATH") 

356 sys.exit(1) 

357 # Cut 

358 elif sys.argv[1] == "cut": 

359 if len(sys.argv) == 7: 

360 cut( 

361 sys.argv[2], 

362 int(sys.argv[3]), 

363 int(sys.argv[4]), 

364 int(sys.argv[5]), 

365 int(sys.argv[6]), 

366 ) 

367 sys.exit(0) 

368 print( 

369 "Usage: python3 src/rift_console/image_processing.py cut PATH X1 Y1 X2 Y2)" 

370 ) 

371 sys.exit(1) 

372 # Upload objective 

373 elif sys.argv[1] == "upload": 

374 if len(sys.argv) == 5: 

375 upload(id=int(sys.argv[2]), path=sys.argv[3], folder=eval(sys.argv[4])) 

376 sys.exit(0) 

377 print( 

378 "Usage: python3 src/rift_console/image_processing.py upload ID PATH IS_FOLDER" 

379 ) 

380 sys.exit(1)