Skip to content

Rift-Console Reference

Ciarc.

image_helper

Provides helping functions that are used in image_processing.

filter_by_date(images, start, end)

Source code in src/rift_console/image_helper.py
42
43
44
45
46
47
48
49
50
51
52
53
54
def filter_by_date(
    images: list[str], start: datetime.datetime, end: datetime.datetime
) -> list[str]:
    res = []
    date_format = "%Y-%m-%dT%H:%M:%S"
    for image in images:
        date = datetime.datetime.strptime(get_date(image), date_format).replace(
            tzinfo=datetime.timezone.utc
        )
        # logger.warning(f"{date} {start} {end}")
        if date >= start and date <= end:
            res.append(image)
    return res

find_image_names(directory)

Traverses the given directory and find + sorts all images in our filename format

Parameters:

Name Type Description Default
directory str

path to the folder, needs to include con.IMAGE_PATH

required

Returns:

Type Description
list[str]

list[str]: the name of all images in that folder, sorted by its timestamp from old to now

Source code in src/rift_console/image_helper.py
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
def find_image_names(directory: str) -> list[str]:
    """Traverses the given directory and find + sorts all images in our filename format

    Args:
        directory (str): path to the folder, needs to include con.IMAGE_PATH

    Returns:
        list[str]: the name of all images in that folder, sorted by its timestamp from old to now
    """

    # find all names
    image_names = []
    for filename in os.listdir(directory):
        if filename.startswith("image"):
            image_names.append(filename)

    # helper function used in sorting
    def extract_timestamp(s: str) -> datetime.datetime:
        timestamp_pattern = r"_(\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}\.\d{6})"

        match = re.search(timestamp_pattern, s)
        if match:
            return datetime.datetime.fromisoformat(match.group(1))
        else:
            raise Exception("find_image_names: did not found timestamp in image names")

    def extract_pos(s: str) -> int:
        pos_pattern = r"_x_(-?\d+)_y_(-?\d+)"

        match = re.search(pos_pattern, s)
        if match:
            x = int(match.group(1))
            y = int(match.group(2))
            return x + y
        else:
            raise Exception("find_image_names: did not found position in image names")

    # sort
    if con.SORT_IMAGE_BY_POSITION:
        image_names = sorted(image_names, key=extract_pos)
    else:
        image_names = sorted(image_names, key=extract_timestamp)
    return image_names

generate_spiral_walk(n)

Create an spiraling offset pattern arround a central point, e.g. (0,0), (0,1), (1,0), (1,1), ... sorted by Manhattan geometry

Parameters:

Name Type Description Default
n int

number of offsets to be generated

required

Returns:

Type Description
list[tuple[int, int]]

list[tuple[int, int]]: list of offsets

Source code in src/rift_console/image_helper.py
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
def generate_spiral_walk(n: int) -> list[tuple[int, int]]:
    """Create an spiraling offset pattern arround a central point, e.g. (0,0), (0,1), (1,0), (1,1), ...
        sorted by Manhattan geometry

    Args:
        n (int): number of offsets to be generated

    Returns:
        list[tuple[int, int]]: list of offsets
    """

    # move right, up, left, down
    directions = [(1, 0), (0, 1), (-1, 0), (0, -1)]
    # start with going right
    direction_index = 0

    x, y = 0, 0
    offsets = [(x, y)]

    # Number of steps we take before changing direction
    steps = 1

    while len(offsets) < n:
        for _ in range(2):
            for _ in range(steps):
                if len(offsets) < n:
                    # Move in the current direction
                    dx, dy = directions[direction_index]
                    x += dx
                    y += dy
                    # Add the new position to the spiral
                    offsets.append((x, y))
                else:
                    break
            # Change direction clockwise
            direction_index = (direction_index + 1) % 4
        # After moving two directions, we increase the number of steps
        steps += 1

    sorted_offset = sorted(offsets, key=lambda x: abs(x[0]) + abs(x[1]))
    return sorted_offset

get_angle(image)

Source code in src/rift_console/image_helper.py
16
17
18
19
20
21
22
23
24
def get_angle(image: str) -> CameraAngle:
    if "narrow" in image:
        return CameraAngle.Narrow
    elif "normal" in image:
        return CameraAngle.Normal
    elif "wide" in image:
        return CameraAngle.Wide
    logger.warning(f"Unknown camera angle in {image}")
    return CameraAngle.Unknown

get_date(image)

Source code in src/rift_console/image_helper.py
27
28
29
30
31
32
33
34
35
36
37
38
39
def get_date(image: str) -> str:
    # pattern of year-month-dayThour-minute
    pattern = r"\d{4}-\d{2}-\d{2}T\d{2}:\d{2}:\d{2}"

    found_matches = re.findall(pattern, image)

    if len(found_matches) == 1:
        # logger.debug(found_matches[0])
        match: str = found_matches[0]
        return match
    else:
        logger.warning("None or two dates in {image}")
        return datetime.datetime.min.strftime("%Y-%m-%dT%H:%M:%S")

parse_image_name(name)

Parses an image name in the format generated by Melvonaut and extract the relevant properties for stitching

Parameters:

Name Type Description Default
name str

file name of the image

required

Returns:

Type Description
int

tuple[int, int, int]: Used lenssize (and therefore if the image should be scaled to this later)

int

and approximated x/y coordinates on the stiched image

Source code in src/rift_console/image_helper.py
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
def parse_image_name(name: str) -> tuple[int, int, int]:
    """Parses an image name in the format generated by Melvonaut and extract the relevant properties for stitching

    Args:
        name (str): file name of the image

    Returns:
        tuple[int, int, int]: Used lenssize (and therefore if the image should be scaled to this later)
        and approximated x/y coordinates on the stiched image
    """
    from shared.models import CameraAngle

    # expected format: 'image_5344_wide_2024-12-11T17:31:27.507376_x_19936_y_4879'
    # with 8 underscores
    if len(name.split("_")) != con.IMAGE_NAME_UNDERSCORE_COUNT:
        raise Exception("parse_image_name: filename has wrong format!")

    # used CameraAngle is after second underscore
    match name.split("_")[con.IMAGE_ANGLE_POSITION]:
        case CameraAngle.Narrow:
            lens_size = 600
        case CameraAngle.Normal:
            lens_size = 800
        case CameraAngle.Wide:
            lens_size = 1000

    # find x and y in name
    match = re.search(r"_x_(-?\d+)_y_(-?\d+)", name)

    if match:
        x = int(match.group(1))
        y = int(match.group(2))

        # old images position is not adjusted in melvonaut yet
        if con.USE_LEGACY_IMAGE_NAMES:
            x -= (int)(lens_size / 2)
            y -= (int)(lens_size / 2)
    else:
        raise Exception("parse_image_name: could not match x/y coordinates!")

    return lens_size, x, y

image_processing

automated_stitching(local_path)

Stitches images from the given path into one big image, which is stored under the same name in con.PANORAMA_PATH.

Parameters:

Name Type Description Default
local_path str

Path of a folder with images that should be stitched.

required
Source code in src/rift_console/image_processing.py
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
def automated_stitching(local_path: str) -> None:
    """Stitches images from the given path into one big image, which is stored under the same name in con.PANORAMA_PATH.

    Args:
        local_path (str): Path of a folder with images that should be stitched.
    """

    image_path = local_path + "/"
    output_path = con.PANORAMA_PATH + "stitched"

    image_name_list = find_image_names(image_path)

    logger.warning(
        f"Starting stitching of {len(image_name_list)} image with path: {image_path}"
    )

    panorama = stitch_images(image_path=image_path, image_name_list=image_name_list)

    remove_offset = (
        con.STITCHING_BORDER,
        con.STITCHING_BORDER,
        con.WORLD_X + con.STITCHING_BORDER,
        con.WORLD_Y + con.STITCHING_BORDER,
    )
    panorama = panorama.crop(remove_offset)
    panorama.save(output_path + ".png")

    logger.warning(f"Saved panorama in {output_path}.png")

count_matching_pixels(offset, first_img, second_img, max_offset)

Counts how many pixels are equal between the two images for a given offset

Parameters:

Name Type Description Default
offset tuple[int, int]

shift of one image

required
img Image

first image

required
existing_img Image

second (larger to allow shift) image

required

Returns:

Type Description
tuple[tuple[int, int], int]

tuple[int, int]: used offset and number of matching pixels

Source code in src/rift_console/image_processing.py
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
def count_matching_pixels(
    offset: tuple[int, int],
    first_img: Image.Image,
    second_img: Image.Image,
    max_offset: int,
) -> tuple[tuple[int, int], int]:
    """Counts how many pixels are equal between the two images for a given offset

    Args:
        offset (tuple[int, int]): shift of one image
        img (Image): first image
        existing_img (Image): second (larger to allow shift) image

    Returns:
        tuple[int, int]: used offset and number of matching pixels
    """

    matches = 0
    for x_local in range(first_img.size[0]):
        for y_local in range(first_img.size[1]):
            p1 = first_img.getpixel((x_local, y_local))
            p2 = second_img.getpixel(
                (x_local + offset[0] + max_offset, y_local + offset[1] + max_offset)
            )
            if p1 and p2 and isinstance(p1, tuple) and isinstance(p2, tuple):
                # only compare R G B and not Alpha. Since there is random noise a slight difference is allowed
                if (
                    abs(p1[0] - p2[0]) + abs(p1[1] - p2[1]) + abs(p1[2] - p2[2])
                    < con.IMAGE_NOISE_FORGIVENESS
                ):
                    matches += 1
            else:
                logger.error(f"{p1} {p2} unkown types")

    return offset, matches

create_thumbnail(panorama_path)

Creates a scaled down panorama and a greyscale one from a given panorama and saves it to src/rift_console/static/images/thumb.png from where it can be used by html.

Parameters:

Name Type Description Default
panorama_path str

Name of the file (should include con.PANORAMA_PATH)

required
Source code in src/rift_console/image_processing.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
def create_thumbnail(panorama_path: str) -> None:
    """Creates a scaled down panorama and a greyscale one from a given panorama and saves it to `src/rift_console/static/images/thumb.png` from where it can be used by html.

    Args:
        panorama_path (str): Name of the file (should include con.PANORAMA_PATH)
    """
    with Image.open(panorama_path) as panorama:
        thumb = panorama.resize(
            (con.THUMBNAIL_X, con.THUMBNAIL_Y), Image.Resampling.LANCZOS
        )
        thumb.save(panorama_path.replace(".png", "") + "_thumb.png")
        # thumb = thumb.convert("L")
        # thumb.save("src/rift_console/static/images/" + "thumb_grey.png")
    logger.warning(f"Saved Thumbnail to {panorama_path}_thumb.png")

cut(panorama_path, X1, Y1, X2, Y2)

Cut a small portion from a bigger Panorama.

Parameters:

Name Type Description Default
panorama_path str

Name of the file (should include con.PANORAMA_PATH)

required
coordinates

Section that should be cut and saved

required
Source code in src/rift_console/image_processing.py
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
def cut(panorama_path: str, X1: int, Y1: int, X2: int, Y2: int) -> None:
    """Cut a small portion from a bigger Panorama.

    Args:
        panorama_path (str): Name of the file (should include con.PANORAMA_PATH)
        coordinates: Section that should be cut and saved
    """
    coordinates = (int(X1), int(Y1), int(X2), int(Y2))
    with Image.open(panorama_path) as panorama:
        cut_img = panorama.crop(coordinates)

    # cut_img.show()
    cut_img.save(panorama_path.replace(".png", "") + "_cut.png")

    logger.warning("Saved cut to media/*_cut.png")

stitch_images(image_path, image_name_list, panorama=None)

Main stitching algorithm TODO add existing img

Parameters:

Name Type Description Default
image_path str

description

required
images list[str]

description

required

Returns:

Type Description
Image

Image.Image: description

Source code in src/rift_console/image_processing.py
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
def stitch_images(
    image_path: str, image_name_list: list[str], panorama: Optional[Image.Image] = None
) -> Image.Image:
    """Main stitching algorithm
    TODO add existing img

    Args:
        image_path (str): _description_
        images (list[str]): _description_
        panorama

    Returns:
        Image.Image: _description_
    """
    # create new panorama if it does not exist
    if panorama is None:
        # add 1000 pixels on each side be used by nudging
        panorama = Image.new(
            "RGBA",
            (
                con.WORLD_X + con.STITCHING_BORDER * 2,
                con.WORLD_Y + con.STITCHING_BORDER * 2,
            ),
        )

    processed_images_counter = 0
    nudging_failed_counter = 0
    to_few_pixel_counter = 0
    max_offset_database = []

    # iterate images
    for image_name in image_name_list:
        with Image.open(image_path + image_name) as img:
            # extract image name
            lens_size, x, y = parse_image_name(image_name)
            try:
                img = img.convert("RGBA")
            except OSError as e:
                logger.warning(
                    f"Could not parse file {image_name}, skipped. Error: {e}"
                )
                continue

            # possible resize
            if lens_size != 600:
                img = img.resize((lens_size, lens_size), Image.Resampling.LANCZOS)

            logger.info(f"Parsing {image_name}")
            logger.debug(f"{img.size} {img.mode}")

            # try position in a square arround the center
            # values 7x7 Grid: d = 3, n = 28   9x9 Grid: d = 4 n = 80   11x11 Grid, d = 5 n = 120
            spiral_coordinates = generate_spiral_walk(
                con.SEARCH_GRID_SIDE_LENGTH * con.SEARCH_GRID_SIDE_LENGTH
            )

            max_offset = int((con.SEARCH_GRID_SIDE_LENGTH - 1) / 2)
            existing_stitch = panorama.crop(
                (
                    x - max_offset + con.STITCHING_BORDER,
                    y - max_offset + con.STITCHING_BORDER,
                    x + lens_size + max_offset + con.STITCHING_BORDER,
                    y + lens_size + max_offset + con.STITCHING_BORDER,
                )
            )

            # check if existing_stich contains something
            total_pixel = existing_stitch.size[0] * existing_stitch.size[1]
            set_pixel = sum(
                pixel != (0, 0, 0, 0) for pixel in list(existing_stitch.getdata())
            )
            empty_pixel = total_pixel - set_pixel

            logger.debug(
                f"Existing stich ({existing_stitch.size[0]},{existing_stitch.size[1]}) w. {total_pixel}p "
                + f"set: {set_pixel} {set_pixel/total_pixel}% and transparent: {empty_pixel} {empty_pixel/total_pixel}%"
            )

            best_match_count = 0
            best_offset = (0, 0)
            skip = False

            # TODO next goal would to move in only one direction in which the matches get better
            if con.DO_IMAGE_NUDGING_SEARCH:
                # probiere nur zu match_count falls mehr als 20% aller pixel gefüllt
                if set_pixel / total_pixel == 0:
                    logger.warning("Emtpy panoarma, image still placed")

                elif set_pixel / total_pixel > 0.2:
                    # make a func with static params for this image
                    count_part = partial(
                        count_matching_pixels,
                        first_img=img,
                        second_img=existing_stitch,
                        max_offset=max_offset,
                    )

                    with ProcessPoolExecutor(
                        max_workers=con.NUMBER_OF_WORKER_THREADS
                    ) as executor:
                        # each Thread gets automatically assign a different coordinate from the pool
                        results = executor.map(count_part, spiral_coordinates)

                    for offset, matches in results:
                        if matches > best_match_count:
                            logger.info(
                                f"New best: matches {matches}p ({matches/total_pixel}%), with offset {best_offset}\n"
                            )
                            best_offset = offset
                            best_match_count = matches

                    # check if it worked
                    if best_match_count / (set_pixel) < 0.5:
                        logger.warning(
                            f"Nudging failed, image skipped, since best_match_count: {best_match_count}p ({best_match_count/total_pixel}%)"
                        )

                        skip = True
                        nudging_failed_counter += 1

                        logger.warning(
                            f"{max(abs(best_offset[0]), abs(best_offset[1]))} {best_offset[0]} {best_offset[1]}"
                        )
                        max_offset_database.append(
                            max(abs(best_offset[0]), abs(best_offset[1]))
                        )
                else:
                    logger.warning(
                        f"Too few pixel on panorama, image skipped, set_pixel%: {set_pixel/total_pixel}"
                    )
                    skip = True
                    to_few_pixel_counter += 1

                # need to check math for % here!
                logger.debug(
                    f"Placed Image best_match_count: {best_match_count}p ({best_match_count/total_pixel}%) with offset: {best_offset}\n"
                )

            if not skip:
                panorama.paste(
                    img,
                    (
                        x + best_offset[0] + con.STITCHING_BORDER,
                        y + best_offset[1] + con.STITCHING_BORDER,
                    ),
                )

            processed_images_counter += 1
            if processed_images_counter % con.SAVE_PANORAMA_STEP == 0:
                panorama.save(
                    con.PANORAMA_PATH + "step_" + str(processed_images_counter) + ".png"
                )

        # if any(pixel < 255 for pixel in alpha.getdata()):
        #    return True

        if processed_images_counter >= con.STITCHING_COUNT_LIMIT:
            logger.warning(
                f"STITCHING_COUNT_LIMIT of {con.STITCHING_COUNT_LIMIT} reached!"
            )
            break

    logger.warning(
        f"\n\nDone stitching of {processed_images_counter} from {len(image_name_list)} given images"
    )
    if con.DO_IMAGE_NUDGING_SEARCH:
        logger.warning(
            f"nudging_failed_counter: {nudging_failed_counter} , to_few_pixel_counter: {to_few_pixel_counter}\n"
        )

    for element, count in Counter(max_offset_database).items():
        logger.warning(f"Max_offset up to {element} occured {count} times")

    return panorama

upload(id, path, folder=False)

Uploads one objective image"

Source code in src/rift_console/image_processing.py
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
def upload(id: int, path: str, folder: bool = False) -> None:
    """Uploads one objective image" """

    if folder:
        images = find_image_names(path)
        for img in images:
            logger.warning(img)
            upload(id, path + "/" + img, False)

            logger.warning(f"Uploaded folder of {len(images)}.")
        return

    logger.info(f"Uploading {id} with path {path}")
    params = {"objective_id": id}

    files = {"image": (path, open(path, "rb"), "image/png")}

    with requests.Session() as s:
        r = s.post(con.IMAGE_ENDPOINT, params=params, files=files)

        if r.status_code == 200:
            logger.warning(f"Uploaded: {r}")
            logger.warning(f"{r.text}{r.json()}")
        else:
            logger.error(f"Upload failed with code: {r.status_code}")
            logger.error(f"{r.text} {r.json()}")
    logger.info("Done with Uplaod!")

melvin_api

port = '8080' module-attribute

url = 'localhost' module-attribute

MelvonautTelemetry

Bases: BaseModel

Datapoint of Disk, Memory and CPU utilization.

Source code in src/rift_console/melvin_api.py
57
58
59
60
61
62
63
64
65
66
class MelvonautTelemetry(BaseModel):
    """Datapoint of Disk, Memory and CPU utilization."""
    disk_total: int
    disk_free: int
    disk_perc: float
    mem_total: int
    mem_available: int
    mem_perc: float
    cpu_cores: int
    cpu_perc: float

cpu_cores instance-attribute

cpu_perc instance-attribute

disk_free instance-attribute

disk_perc instance-attribute

disk_total instance-attribute

mem_available instance-attribute

mem_perc instance-attribute

mem_total instance-attribute

clear_events()

Clear event log.

Source code in src/rift_console/melvin_api.py
158
159
160
161
162
163
164
165
166
167
168
169
170
171
def clear_events() -> str:
    """Clear event log."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_events")

    if r:
        res = "Mevlonaut get_clear_events done."
    else:
        res = "Mevlonaut get_clear_events failed, is okay if event-log is empty."
    logger.warning(res)
    return res

clear_images()

Deletes exisiting images on Melvonaut.

Source code in src/rift_console/melvin_api.py
302
303
304
305
306
307
308
309
310
311
312
313
314
315
def clear_images() -> bool:
    """Deletes exisiting images on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_images")

    if r:
        logger.warning("Mevlonaut cleared all images done.")
        return True
    else:
        logger.warning("Mevlonaut clear_images failed.")
        return False

clear_logs()

Deletes logs on Melvonaut.

Source code in src/rift_console/melvin_api.py
251
252
253
254
255
256
257
258
259
260
261
262
263
264
def clear_logs() -> bool:
    """Deletes logs on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_all_logs")

    if r:
        logger.warning("Mevlonaut cleared all logs done.")
        return True
    else:
        logger.warning("Mevlonaut clear_logs failed.")
        return False

clear_telemetry()

Delete exisiting telemtry files on Melvonaut.

Source code in src/rift_console/melvin_api.py
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
def clear_telemetry() -> str:
    """Delete exisiting telemtry files on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_clear_telemetry")

    if r:
        res = "Mevlonaut clear_telemetry done."
    else:
        res = "Mevlonaut clear_telemetry failed."

    logger.warning(res)
    return res

create_tunnel()

Not completed function to automatically create an ssh tunnel with port forwarding, alternative use 'shpass -f .ssh-pw ssh -N -L 8080:localhost:8080 root@10.100.50.1'

Source code in src/rift_console/melvin_api.py
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
def create_tunnel() -> None:
    """Not completed function to automatically create an ssh tunnel with port forwarding,
    alternative use 'shpass -f .ssh-pw ssh -N -L 8080:localhost:8080 root@10.100.50.1'"""
    cmd = [
        "sshpass",
        "-f",
        ".ssh-pw",
        "ssh",
        "root@10.100.50.1",
        "-N",
        "-L",
        "8080:localhost:8080",
        "-o",
        "ConnectTimeout=1s",
    ]
    timeout = 60 * 15  # kill connection after 15 min

    process = subprocess.Popen(args=cmd)
    logger.info(f"Started tunnel: {process.pid}")

    # Function to terminate the process
    def terminate_process() -> None:
        logger.warning("Cleanup tunnel")
        os.killpg(os.getpgid(process.pid), signal.SIGTERM)

    # Start a timer to terminate the process after timeout
    timer = threading.Timer(timeout, terminate_process)
    timer.start()

    return

download_events()

Download event log.

Source code in src/rift_console/melvin_api.py
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
def download_events() -> str:
    """Download event log."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_download_events")
    if r:
        decoded_content = r.content.decode("utf-8")
        csv_file_path = (
            con.CONSOLE_FROM_MELVONAUT_PATH
            + "MelvonautEvents-"
            + live_utc().strftime("%Y-%m-%dT%H:%M:%S")
            + ".csv"
        )

        with open(csv_file_path, "w", newline="", encoding="utf-8") as file:
            file.write(decoded_content)
        with open(csv_file_path, mode="r", newline="", encoding="utf-8") as file:
            csv_reader = csv.reader(file)
            line_count = sum(1 for _ in csv_reader)

        res = f"Mevlonaut get_download_events to {csv_file_path} with {line_count} lines done."
    else:
        res = "Mevlonaut get_download_events failed, is okay if event-log is empty."
    logger.warning(res)
    return res

download_telemetry()

Download existing telemetry files on Melvonaut.

Source code in src/rift_console/melvin_api.py
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
def download_telemetry() -> str:
    """Download existing telemetry files on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_download_telemetry")
    if r:
        decoded_content = r.content.decode("utf-8")
        csv_file_path = (
            con.CONSOLE_FROM_MELVONAUT_PATH
            + "MelvonautTelemetry-"
            + live_utc().strftime("%Y-%m-%dT%H:%M:%S")
            + ".csv"
        )

        with open(csv_file_path, "w", newline="", encoding="utf-8") as file:
            file.write(decoded_content)
        with open(csv_file_path, mode="r", newline="", encoding="utf-8") as file:
            csv_reader = csv.reader(file)
            line_count = sum(1 for _ in csv_reader)

        res = f"Mevlonaut download_telemetry to {csv_file_path} with {line_count} lines done."
    else:
        res = "Mevlonaut download_telemetry failed."
    logger.warning(res)
    return res

get_download_save_image(image_name)

Download a single image from Melvonaut.

Source code in src/rift_console/melvin_api.py
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
def get_download_save_image(image_name: str) -> Any:
    """Download a single image from Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return None

    r = melvonaut_api(
        method=HttpCode.POST,
        endpoint="/api/post_download_image",
        json={"file": image_name},
    )

    if r:
        logger.info(f'Mevlonaut downloaded "{image_name}" done.')
        return r
    else:
        logger.warning("Mevlonaut get_download_save_image failed.")
        return None

get_download_save_log(log_name)

Downloads all logs from Melvonaut.

Source code in src/rift_console/melvin_api.py
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
def get_download_save_log(log_name: str) -> Any:
    """Downloads all logs from Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return None

    r = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_download_log", json={"file": log_name}
    )

    if r:
        logger.info(f'Mevlonaut downloaded "{log_name}" done.')
        return r
    else:
        logger.warning("Mevlonaut get_download_save_log failed.")
        return None

get_setting(setting)

Get a Melvonaut Setting from settings.py.

Source code in src/rift_console/melvin_api.py
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
def get_setting(setting: str) -> str:
    """Get a Melvonaut Setting from settings.py."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return ""
    check = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""}
    )

    if check:
        value = str(check.json()[setting])
        logger.info(f'Mevlonaut get settting "{setting}" is "{value}" done.')
        return value
    logger.warning('Mevlonaut get setting "{setting}" failed.')
    return ""

list_images()

List all exising images on Melvonaut.

Source code in src/rift_console/melvin_api.py
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
def list_images() -> list[str] | bool:
    """List all exising images on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_images").json()

    if r:
        images: list[str] = r["images"]
        logger.info(f"Mevlonaut image list done, found {len(images)} images.")
        return images
    else:
        logger.warning("Mevlonaut list_images failed.")
        return False

list_logs()

List all log fiels on Melvonaut.

Source code in src/rift_console/melvin_api.py
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
def list_logs() -> list[str] | bool:
    """List all log fiels on Melvonaut."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False

    r = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_list_log_files").json()

    if r:
        logs: list[str] = r["log_files"]
        logger.info(f"Mevlonaut list logs done, found {len(logs)} images.")
        return logs
    else:
        logger.warning("Mevlonaut list_images failed.")
        return False

live_melvonaut()

Get live MelvonautTelemetry.

Source code in src/rift_console/melvin_api.py
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
def live_melvonaut() -> Optional[MelvonautTelemetry]:
    """Get live MelvonautTelemetry."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return None
    d = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_disk_usage").json()
    m = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_memory_usage").json()
    c = melvonaut_api(method=HttpCode.GET, endpoint="/api/get_cpu_usage").json()

    gigabyte = 2**30
    if d and m and c:
        logger.info("Mevlonaut telemetry done.")
        return MelvonautTelemetry(
            disk_total=int(d["root"]["total"] / gigabyte),
            disk_free=int(d["root"]["free"] / gigabyte),
            disk_perc=100 - d["root"]["percent"],  # invert
            mem_total=int(m["total"] / gigabyte),
            mem_available=int(m["available"] / gigabyte),
            mem_perc=m["percent"],
            cpu_cores=c["physical_cores"],
            cpu_perc=c["percent"],
        )
    else:
        logger.warning("Mevlonaut telemetry failed.")
        return None

melvonaut_api(method, endpoint, json={})

Wrapper with error handling for Melvonaut API.

Source code in src/rift_console/melvin_api.py
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
def melvonaut_api(method: HttpCode, endpoint: str, json: dict[str, str] = {}) -> Any:
    """Wrapper with error handling for Melvonaut API."""
    try:
        with requests.Session() as s:
            match method:
                case HttpCode.GET:
                    r = s.get("http://" + url + ":" + port + endpoint, timeout=5)
                case HttpCode.POST:
                    r = s.post(
                        "http://" + url + ":" + port + endpoint,
                        timeout=5,
                        json=json,
                    )

    except requests.exceptions.ConnectionError:
        logger.error("ConnectionError - possible no VPN?")
        return {}
    except requests.exceptions.ReadTimeout:
        logger.error("Timeout error!")
        return {}

    match r.status_code:
        case 200:
            try:
                logger.debug(
                    f"Received from API {method}/{endpoint} - {r} - {r.json()}"
                )
            except requests.exceptions.JSONDecodeError:
                logger.debug(f"Received from API {method}/{endpoint} - {r}")
            return r
        case 404:
            logger.warning(f"Requested ressource not found - {r}.")
            return {}
        case _:
            # unknow error
            logger.warning(f"Unknown error, could not contact satellite? - {r}.")
            return {}

set_setting(setting, value)

Set a Melvonaut Setting from settings.py.

Source code in src/rift_console/melvin_api.py
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
def set_setting(setting: str, value: str) -> bool:
    """Set a Melvonaut Setting from settings.py."""
    if not melvonaut_api(method=HttpCode.GET, endpoint="/api/health"):
        logger.warning("Melvonaut API unreachable!")
        return False
    r = melvonaut_api(
        method=HttpCode.POST, endpoint="/api/post_set_setting", json={setting: value}
    )

    if r:
        check = melvonaut_api(
            method=HttpCode.POST, endpoint="/api/post_get_setting", json={setting: ""}
        )
        logger.error(f"{check.json()} {value}")
        if check.json()[setting] == value:
            logger.info(f'Mevlonaut set_Settting "{setting}" to "{value}" done.')
            return True
    logger.warning(f'Mevlonaut set_Settting "{setting}" to "{value}" failed.')
    return False

rift_console

RiftConsole

State of a currently running Console, including live data from CIARC/Melvonaut API.

Source code in src/rift_console/rift_console.py
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
class RiftConsole:
    """State of a currently running Console, including live data from CIARC/Melvonaut API."""

    last_backup_date: Optional[datetime.datetime] = None
    is_network_simulation: Optional[bool] = None
    user_speed_multiplier: Optional[int] = None

    live_telemetry: Optional[BaseTelemetry] = None
    prev_state: State = State.Unknown
    next_state: State = State.Unknown
    slots_used: Optional[int] = None
    slots: list[Slot] = []
    zoned_objectives: list[ZonedObjective] = []
    beacon_objectives: list[BeaconObjective] = []
    completed_ids: list[int] = []
    achievements: list[Achievement] = []
    past_traj: list[tuple[int, int]] = []
    future_traj: list[tuple[int, int]] = []
    live_melvonaut_api: Optional[MelvonautTelemetry] = None
    melvonaut_image_count: int = -1  # -1 indicates no data
    console_image_count: int = -1  # -1 indicates no data
    console_image_dates: list[tuple[str, int]] = []
    ebt_ping_list: list[tuple[int, int]] = []
    console_found_events: list[Event] = []
    melvin_task: str = ""
    melvin_lens: str = ""

    def get_draw_zoned_obj(self) -> list[dict[str, object]]:
        """Picks objectives to be drawn later from its telemetry."""
        get_draw_zoned_obj = []
        for obj in self.zoned_objectives:
            if obj.zone is not None:
                draw = {
                    "name": obj.id,
                    "zone": [
                        int(obj.zone[0]),
                        int(obj.zone[1]),
                        int(obj.zone[2]),
                        int(obj.zone[3]),
                    ],
                }
                get_draw_zoned_obj.append(draw)
                if len(get_draw_zoned_obj) >= 5:  # only collect 5 for visual clarity
                    break
        return get_draw_zoned_obj

    def predict_trajektorie(
        self,
    ) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
        """Calculate the points that melvin goes through next"""
        past = []
        future = []

        if self.live_telemetry:
            for i in range(0, con.TRAJ_TIME, con.TRAJ_STEP):
                (x, y) = RiftConsole.fix_overflow(
                    int(self.live_telemetry.width_x + self.live_telemetry.vx * i),
                    int(self.live_telemetry.height_y + self.live_telemetry.vy * i),
                )
                future.append((x, y))
                (x, y) = RiftConsole.fix_overflow(
                    int(self.live_telemetry.width_x - self.live_telemetry.vx * i),
                    int(self.live_telemetry.height_y - self.live_telemetry.vy * i),
                )
                past.append((x, y))

        return (past, future)

    @staticmethod
    def fix_overflow(x: int, y: int) -> tuple[int, int]:
        """Helper for trajektorie predition. Does "teleportation" when MELVIN reaches one side of the map."""
        if x > con.WORLD_X:
            x = x % con.WORLD_X

        while x < 0:
            x += con.WORLD_X

        if y > con.WORLD_Y:
            y = y % con.WORLD_Y
        while y < 0:
            y += con.WORLD_Y

        return (x, y)

achievements = [] class-attribute instance-attribute

beacon_objectives = [] class-attribute instance-attribute

completed_ids = [] class-attribute instance-attribute

console_found_events = [] class-attribute instance-attribute

console_image_count = -1 class-attribute instance-attribute

console_image_dates = [] class-attribute instance-attribute

ebt_ping_list = [] class-attribute instance-attribute

future_traj = [] class-attribute instance-attribute

is_network_simulation = None class-attribute instance-attribute

last_backup_date = None class-attribute instance-attribute

live_melvonaut_api = None class-attribute instance-attribute

live_telemetry = None class-attribute instance-attribute

melvin_lens = '' class-attribute instance-attribute

melvin_task = '' class-attribute instance-attribute

melvonaut_image_count = -1 class-attribute instance-attribute

next_state = State.Unknown class-attribute instance-attribute

past_traj = [] class-attribute instance-attribute

prev_state = State.Unknown class-attribute instance-attribute

slots = [] class-attribute instance-attribute

slots_used = None class-attribute instance-attribute

user_speed_multiplier = None class-attribute instance-attribute

zoned_objectives = [] class-attribute instance-attribute

fix_overflow(x, y) staticmethod

Helper for trajektorie predition. Does "teleportation" when MELVIN reaches one side of the map.

Source code in src/rift_console/rift_console.py
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
@staticmethod
def fix_overflow(x: int, y: int) -> tuple[int, int]:
    """Helper for trajektorie predition. Does "teleportation" when MELVIN reaches one side of the map."""
    if x > con.WORLD_X:
        x = x % con.WORLD_X

    while x < 0:
        x += con.WORLD_X

    if y > con.WORLD_Y:
        y = y % con.WORLD_Y
    while y < 0:
        y += con.WORLD_Y

    return (x, y)

get_draw_zoned_obj()

Picks objectives to be drawn later from its telemetry.

Source code in src/rift_console/rift_console.py
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
def get_draw_zoned_obj(self) -> list[dict[str, object]]:
    """Picks objectives to be drawn later from its telemetry."""
    get_draw_zoned_obj = []
    for obj in self.zoned_objectives:
        if obj.zone is not None:
            draw = {
                "name": obj.id,
                "zone": [
                    int(obj.zone[0]),
                    int(obj.zone[1]),
                    int(obj.zone[2]),
                    int(obj.zone[3]),
                ],
            }
            get_draw_zoned_obj.append(draw)
            if len(get_draw_zoned_obj) >= 5:  # only collect 5 for visual clarity
                break
    return get_draw_zoned_obj

predict_trajektorie()

Calculate the points that melvin goes through next

Source code in src/rift_console/rift_console.py
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
def predict_trajektorie(
    self,
) -> tuple[list[tuple[int, int]], list[tuple[int, int]]]:
    """Calculate the points that melvin goes through next"""
    past = []
    future = []

    if self.live_telemetry:
        for i in range(0, con.TRAJ_TIME, con.TRAJ_STEP):
            (x, y) = RiftConsole.fix_overflow(
                int(self.live_telemetry.width_x + self.live_telemetry.vx * i),
                int(self.live_telemetry.height_y + self.live_telemetry.vy * i),
            )
            future.append((x, y))
            (x, y) = RiftConsole.fix_overflow(
                int(self.live_telemetry.width_x - self.live_telemetry.vx * i),
                int(self.live_telemetry.height_y - self.live_telemetry.vy * i),
            )
            past.append((x, y))

    return (past, future)