Coverage for src/rift_console/image_processing.py: 0%
160 statements
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
« prev ^ index » next coverage.py v7.8.0, created at 2025-04-08 09:36 +0000
1import sys
2from collections import Counter
3from concurrent.futures import ProcessPoolExecutor
4from functools import partial
5from typing import Optional
6import requests
8from loguru import logger
9from PIL import Image
11from rift_console.image_helper import (
12 generate_spiral_walk,
13 parse_image_name,
14 find_image_names,
15)
16import shared.constants as con
##### LOGGING #####
# Remove every handler registered so far, then send log output to stderr at the
# level configured in shared.constants.
logger.remove()
logger.add(
    sys.stderr,
    level=con.RIFT_LOG_LEVEL,
    backtrace=True,
    diagnose=True,
)
def count_matching_pixels(
    offset: tuple[int, int],
    first_img: Image.Image,
    second_img: Image.Image,
    max_offset: int,
) -> tuple[tuple[int, int], int]:
    """Count the pixels of ``first_img`` that agree with ``second_img`` at one shift.

    Each pixel of ``first_img`` is compared with the pixel of ``second_img`` at
    the same coordinate moved by ``offset + max_offset`` on both axes. Two pixels
    agree when the summed absolute difference of their first three channels is
    below ``con.IMAGE_NOISE_FORGIVENESS``; any further channel is ignored.

    Args:
        offset: Shift (x, y) to evaluate.
        first_img: Image whose full extent is scanned.
        second_img: Image sampled at the shifted coordinates; it must cover all
            of them (the caller in this file passes a crop that is
            ``max_offset`` larger on every side).
        max_offset: Constant added to both components of ``offset`` before
            sampling ``second_img``.

    Returns:
        The unchanged ``offset`` and the number of agreeing pixels.
    """
    shift_x = offset[0] + max_offset
    shift_y = offset[1] + max_offset
    width, height = first_img.size

    agreeing = 0
    for col in range(width):
        for row in range(height):
            p1 = first_img.getpixel((col, row))
            p2 = second_img.getpixel((col + shift_x, row + shift_y))

            comparable = (
                p1 and p2 and isinstance(p1, tuple) and isinstance(p2, tuple)
            )
            if not comparable:
                logger.error(f"{p1} {p2} unkown types")
                continue

            # Only R, G and B take part; random noise makes a small
            # difference acceptable.
            channel_delta = sum(abs(p1[c] - p2[c]) for c in range(3))
            if channel_delta < con.IMAGE_NOISE_FORGIVENESS:
                agreeing += 1

    return offset, agreeing
# Takes the folder location (including logs/melvonaut/images) and a list of image names
def stitch_images(
    image_path: str, image_name_list: list[str], panorama: Optional[Image.Image] = None
) -> Image.Image:
    """Paste the given images onto a panorama, optionally nudging each one into place.

    For every image the name is parsed into ``lens_size, x, y``, the image is
    converted to RGBA (and rescaled to ``lens_size`` x ``lens_size`` when
    ``lens_size`` is not 600) and pasted at ``(x, y)`` shifted by
    ``con.STITCHING_BORDER``.

    When ``con.DO_IMAGE_NUDGING_SEARCH`` is set, the already stitched area under
    the image is inspected first:

    * completely empty: the image is pasted without an offset;
    * more than 20% filled: every offset produced by ``generate_spiral_walk`` is
      scored with ``count_matching_pixels`` in a process pool, and the best one
      is used. If fewer than half of the filled pixels match, the image is
      skipped;
    * otherwise: the image is skipped.

    Every ``con.SAVE_PANORAMA_STEP`` images an intermediate panorama is saved
    under ``con.PANORAMA_PATH``, and processing stops after
    ``con.STITCHING_COUNT_LIMIT`` images. Skipped images count towards both.

    Args:
        image_path: Folder prefix that is concatenated directly with each image
            name, so it has to end with a path separator.
        image_name_list: File names of the images to stitch, in processing order.
        panorama: Existing canvas to extend. When ``None`` a transparent RGBA
            canvas of the world size plus ``con.STITCHING_BORDER`` on every side
            is created.

    Returns:
        The panorama, still including the stitching border.
    """
    # create new panorama if it does not exist
    if panorama is None:
        # the extra con.STITCHING_BORDER on each side leaves room for nudged images
        panorama = Image.new(
            "RGBA",
            (
                con.WORLD_X + con.STITCHING_BORDER * 2,
                con.WORLD_Y + con.STITCHING_BORDER * 2,
            ),
        )

    processed_images_counter = 0
    nudging_failed_counter = 0
    to_few_pixel_counter = 0
    max_offset_database = []

    # Largest shift tried in any direction; depends only on constants, so it is
    # computed once instead of once per image.
    max_offset = int((con.SEARCH_GRID_SIDE_LENGTH - 1) / 2)

    # iterate images
    for image_name in image_name_list:
        with Image.open(image_path + image_name) as img:
            # extract the values encoded in the image name
            lens_size, x, y = parse_image_name(image_name)
            try:
                img = img.convert("RGBA")
            except OSError as e:
                logger.warning(
                    f"Could not parse file {image_name}, skipped. Error: {e}"
                )
                continue

            # possible resize
            if lens_size != 600:
                img = img.resize((lens_size, lens_size), Image.Resampling.LANCZOS)

            logger.info(f"Parsing {image_name}")
            logger.debug(f"{img.size} {img.mode}")

            # try positions in a square around the center
            # values 7x7 Grid: d = 3, n = 28 9x9 Grid: d = 4 n = 80 11x11 Grid, d = 5 n = 120
            spiral_coordinates = generate_spiral_walk(
                con.SEARCH_GRID_SIDE_LENGTH * con.SEARCH_GRID_SIDE_LENGTH
            )

            # Part of the panorama under the image, enlarged by max_offset on
            # every side so each candidate shift stays inside the crop.
            existing_stitch = panorama.crop(
                (
                    x - max_offset + con.STITCHING_BORDER,
                    y - max_offset + con.STITCHING_BORDER,
                    x + lens_size + max_offset + con.STITCHING_BORDER,
                    y + lens_size + max_offset + con.STITCHING_BORDER,
                )
            )

            # check if existing_stitch contains something
            total_pixel = existing_stitch.size[0] * existing_stitch.size[1]
            set_pixel = sum(
                pixel != (0, 0, 0, 0) for pixel in existing_stitch.getdata()
            )
            empty_pixel = total_pixel - set_pixel
            fill_ratio = set_pixel / total_pixel

            logger.debug(
                f"Existing stich ({existing_stitch.size[0]},{existing_stitch.size[1]}) w. {total_pixel}p "
                + f"set: {set_pixel} {fill_ratio:.2%} and transparent: {empty_pixel} {empty_pixel / total_pixel:.2%}"
            )

            best_match_count = 0
            best_offset = (0, 0)
            skip = False

            # TODO next goal would to move in only one direction in which the matches get better
            if con.DO_IMAGE_NUDGING_SEARCH:
                # only try to count matches if more than 20% of all pixels are filled
                if fill_ratio == 0:
                    logger.warning("Emtpy panoarma, image still placed")

                elif fill_ratio > 0.2:
                    # make a func with static params for this image
                    count_part = partial(
                        count_matching_pixels,
                        first_img=img,
                        second_img=existing_stitch,
                        max_offset=max_offset,
                    )

                    with ProcessPoolExecutor(
                        max_workers=con.NUMBER_OF_WORKER_THREADS
                    ) as executor:
                        # each worker is automatically handed a different coordinate
                        results = executor.map(count_part, spiral_coordinates)

                        for offset, matches in results:
                            if matches > best_match_count:
                                # report the offset that just won, not the previous best
                                logger.info(
                                    f"New best: matches {matches}p ({matches / total_pixel:.2%}), with offset {offset}\n"
                                )
                                best_offset = offset
                                best_match_count = matches

                    # check if it worked
                    if best_match_count / set_pixel < 0.5:
                        logger.warning(
                            f"Nudging failed, image skipped, since best_match_count: {best_match_count}p ({best_match_count / total_pixel:.2%})"
                        )

                        skip = True
                        nudging_failed_counter += 1

                    largest_shift = max(abs(best_offset[0]), abs(best_offset[1]))
                    logger.warning(
                        f"{largest_shift} {best_offset[0]} {best_offset[1]}"
                    )
                    max_offset_database.append(largest_shift)
                else:
                    logger.warning(
                        f"Too few pixel on panorama, image skipped, set_pixel%: {fill_ratio:.2%}"
                    )
                    skip = True
                    to_few_pixel_counter += 1

                # Only claim the image was placed when it is not being skipped.
                if not skip:
                    logger.debug(
                        f"Placed Image best_match_count: {best_match_count}p ({best_match_count / total_pixel:.2%}) with offset: {best_offset}\n"
                    )

            if not skip:
                panorama.paste(
                    img,
                    (
                        x + best_offset[0] + con.STITCHING_BORDER,
                        y + best_offset[1] + con.STITCHING_BORDER,
                    ),
                )

            processed_images_counter += 1
            if processed_images_counter % con.SAVE_PANORAMA_STEP == 0:
                panorama.save(
                    con.PANORAMA_PATH + "step_" + str(processed_images_counter) + ".png"
                )

            if processed_images_counter >= con.STITCHING_COUNT_LIMIT:
                logger.warning(
                    f"STITCHING_COUNT_LIMIT of {con.STITCHING_COUNT_LIMIT} reached!"
                )
                break

    logger.warning(
        f"\n\nDone stitching of {processed_images_counter} from {len(image_name_list)} given images"
    )
    if con.DO_IMAGE_NUDGING_SEARCH:
        logger.warning(
            f"nudging_failed_counter: {nudging_failed_counter} , to_few_pixel_counter: {to_few_pixel_counter}\n"
        )

        for element, count in Counter(max_offset_database).items():
            logger.warning(f"Max_offset up to {element} occured {count} times")

    return panorama
def upload(id: int, path: str, folder: bool = False) -> None:
    """Upload one objective image, or every image found in a folder.

    Args:
        id: Objective id, sent as the ``objective_id`` query parameter.
        path: Image file to upload, or a folder when ``folder`` is true.
        folder: When true, ``path`` is searched with ``find_image_names`` and
            every hit is uploaded individually under the same ``id``.
    """
    if folder:
        images = find_image_names(path)
        for img in images:
            logger.warning(img)
            upload(id, path + "/" + img, False)

        logger.warning(f"Uploaded folder of {len(images)}.")
        return

    logger.info(f"Uploading {id} with path {path}")
    params = {"objective_id": id}

    with requests.Session() as s:
        # Open the file in a context manager so the handle is released even
        # when the request raises.
        with open(path, "rb") as image_file:
            files = {"image": (path, image_file, "image/png")}
            r = s.post(con.IMAGE_ENDPOINT, params=params, files=files)

        # The body is not guaranteed to be JSON (error pages in particular);
        # a decode failure must not hide the status code logged below.
        try:
            payload = r.json()
        except ValueError:
            payload = "<response body is not valid JSON>"

        if r.status_code == 200:
            logger.warning(f"Uploaded: {r}")
            logger.warning(f"{r.text}{payload}")
        else:
            logger.error(f"Upload failed with code: {r.status_code}")
            logger.error(f"{r.text} {payload}")
    logger.info("Done with Uplaod!")
def cut(panorama_path: str, X1: int, Y1: int, X2: int, Y2: int) -> None:
    """Cut a rectangular section out of a panorama and save it next to the source.

    The result is written to the panorama path with a trailing ``.png`` removed
    and ``_cut.png`` appended.

    Args:
        panorama_path: Path of the panorama file (should include con.PANORAMA_PATH).
        X1: Left edge of the section.
        Y1: Upper edge of the section.
        X2: Right edge of the section.
        Y2: Lower edge of the section.
    """
    coordinates = (int(X1), int(Y1), int(X2), int(Y2))
    # Strip only a trailing ".png"; str.replace would also remove ".png"
    # occurring earlier in the path, e.g. inside a directory name.
    cut_path = panorama_path.removesuffix(".png") + "_cut.png"
    with Image.open(panorama_path) as panorama:
        cut_img = panorama.crop(coordinates)

        # cut_img.show()
        cut_img.save(cut_path)

    # Report the real output location instead of a hard-coded folder.
    logger.warning(f"Saved cut to {cut_path}")
def create_thumbnail(panorama_path: str) -> None:
    """Create a scaled-down copy of a panorama and save it next to the source.

    The panorama is resized to ``con.THUMBNAIL_X`` x ``con.THUMBNAIL_Y`` and
    written to the panorama path with a trailing ``.png`` removed and
    ``_thumb.png`` appended.

    Args:
        panorama_path: Path of the panorama file (should include con.PANORAMA_PATH).
    """
    # Strip only a trailing ".png"; str.replace would also remove ".png"
    # occurring earlier in the path, e.g. inside a directory name.
    thumb_path = panorama_path.removesuffix(".png") + "_thumb.png"
    with Image.open(panorama_path) as panorama:
        thumb = panorama.resize(
            (con.THUMBNAIL_X, con.THUMBNAIL_Y), Image.Resampling.LANCZOS
        )
        thumb.save(thumb_path)
        # thumb = thumb.convert("L")
        # thumb.save("src/rift_console/static/images/" + "thumb_grey.png")
    # Log the path that was actually written.
    logger.warning(f"Saved Thumbnail to {thumb_path}")
# TODO add parameter to add new stitches onto an existing map
def automated_stitching(local_path: str) -> None:
    """Stitch all images of a folder into one panorama file.

    The images found in ``local_path`` are stitched, the stitching border is
    cropped away again and the result is saved as ``stitched.png`` under
    ``con.PANORAMA_PATH``.

    Args:
        local_path (str): Folder containing the images that should be stitched.
    """
    source_dir = local_path + "/"
    target_base = con.PANORAMA_PATH + "stitched"

    names = find_image_names(source_dir)
    logger.warning(
        f"Starting stitching of {len(names)} image with path: {source_dir}"
    )

    bordered = stitch_images(image_path=source_dir, image_name_list=names)

    # Crop box covering the world area only, i.e. without the border that
    # surrounds it on the stitched canvas.
    border = con.STITCHING_BORDER
    world_box = (border, border, con.WORLD_X + border, con.WORLD_Y + border)
    world_only = bordered.crop(world_box)

    target_file = target_base + ".png"
    world_only.save(target_file)

    logger.warning(f"Saved panorama in {target_file}")
# For CLI testing
if __name__ == "__main__":
    import ast

    print("GO")

    script = "python3 src/rift_console/image_processing.py"
    # One usage line per supported command, also used for the general help.
    usage = {
        "stitch": f"Usage: {script} stitch PATH",
        "thumb": f"Usage: {script} thumb PATH",
        "cut": f"Usage: {script} cut PATH X1 Y1 X2 Y2",
        "upload": f"Usage: {script} upload ID PATH IS_FOLDER",
    }

    # A missing or unknown command prints the full help and fails, instead of
    # falling through silently with exit status 0.
    if len(sys.argv) < 2 or sys.argv[1] not in usage:
        for usage_line in usage.values():
            print(usage_line)
        if len(sys.argv) < 2:
            print("Exiting, wrong number of params!")
        else:
            print(f"Exiting, unknown command: {sys.argv[1]}")
        sys.exit(1)

    command = sys.argv[1]
    args = sys.argv[2:]

    # Stitching
    if command == "stitch" and len(args) == 1:
        automated_stitching(local_path=args[0])
    # Create Thumbnail
    elif command == "thumb" and len(args) == 1:
        create_thumbnail(panorama_path=args[0])
    # Cut
    elif command == "cut" and len(args) == 5:
        cut(args[0], int(args[1]), int(args[2]), int(args[3]), int(args[4]))
    # Upload objective
    elif command == "upload" and len(args) == 3:
        # NOTE(review): this used eval() on the raw command-line argument, which
        # executes arbitrary code. literal_eval only accepts Python literals
        # such as True, False, 0 or 1.
        try:
            is_folder = bool(ast.literal_eval(args[2]))
        except (ValueError, SyntaxError):
            print(usage[command])
            sys.exit(1)
        upload(id=int(args[0]), path=args[1], folder=is_folder)
    else:
        # Known command, wrong number of arguments.
        print(usage[command])
        sys.exit(1)

    sys.exit(0)