Created
May 11, 2026 08:12
-
-
Save maxried/730224ac9423ce747a2227423eb326f3 to your computer and use it in GitHub Desktop.
iDRAC RFB Converter
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| #!/usr/bin/env python3 | |
| """Vibe-coded tool to convert iDRAC RFB crashvideos and bootvideos to a playable format.""" | |
| import argparse | |
| import hashlib | |
| import os | |
| import shutil | |
| import struct | |
| import subprocess | |
| import sys | |
| from dataclasses import dataclass | |
| from pathlib import Path | |
| from tempfile import TemporaryDirectory | |
| from typing import Mapping, NotRequired, TypedDict | |
| try: | |
| from PIL import Image | |
| from PIL.PngImagePlugin import PngInfo | |
| except Exception: # pragma: no cover - optional dependency | |
| Image = None | |
| PngInfo = None | |
| MAX_RECTANGLES_PER_FRAME = 10_000 | |
| MAX_DIMENSION = 16_384 | |
| MAX_PIXELS = 128 * 1024 * 1024 | |
| class PixelFormat(TypedDict): | |
| bits_per_pixel: int | |
| depth: int | |
| big_endian_flag: int | |
| true_color_flag: int | |
| red_max: int | |
| green_max: int | |
| blue_max: int | |
| red_shift: int | |
| green_shift: int | |
| blue_shift: int | |
| class Handshake(TypedDict): | |
| protocol_version: str | |
| security_type: int | |
| framebuffer_width: int | |
| framebuffer_height: int | |
| pixel_format: PixelFormat | |
| desktop_name: str | |
| class FrameInfo(TypedDict): | |
| frame: int | |
| image: str | |
| session: int | |
| sequence_number: int | |
| timestamp_ms: int | |
| delay_ms: int | |
| is_full_frame: bool | |
| class DecodeInfo(TypedDict): | |
| source: str | |
| source_sha256: str | |
| capture_format: str | |
| record_count: int | |
| stream_size: int | |
| session: int | |
| session_count: int | |
| update_count: int | |
| frame_count: int | |
| duration_ms: int | None | |
| protocol_version: str | |
| security_type: int | |
| desktop_name: str | |
| framebuffer_width: int | |
| framebuffer_height: int | |
| video: NotRequired[str | None] | |
| class SessionResult(TypedDict): | |
| session: int | |
| handshake: Handshake | |
| frames: list[FrameInfo] | |
| frames_dir: NotRequired[Path] | |
| @dataclass(frozen=True) | |
| class OfflineCapture: | |
| records: list[bytes] | |
| total_ms: int | |
| class Reader: | |
| def __init__(self, data: bytes, pos: int = 0) -> None: | |
| self.data = data | |
| self.pos = pos | |
| def take(self, n: int) -> bytes: | |
| if self.pos + n > len(self.data): | |
| raise EOFError(f"need {n} bytes at {self.pos}, have {len(self.data) - self.pos}") | |
| out = self.data[self.pos : self.pos + n] | |
| self.pos += n | |
| return out | |
| def u8(self) -> int: | |
| return self.take(1)[0] | |
| def u16(self) -> int: | |
| return struct.unpack(">H", self.take(2))[0] | |
| def u32(self) -> int: | |
| return struct.unpack(">I", self.take(4))[0] | |
| def s32(self) -> int: | |
| return struct.unpack(">i", self.take(4))[0] | |
| def sha256(data: bytes) -> str: | |
| return hashlib.sha256(data).hexdigest() | |
| def validate_framebuffer_size(width: int, height: int) -> None: | |
| if width <= 0 or height <= 0: | |
| raise ValueError(f"invalid framebuffer size: {width}x{height}") | |
| if width > MAX_DIMENSION or height > MAX_DIMENSION or width * height > MAX_PIXELS: | |
| raise ValueError(f"framebuffer size exceeds parser limits: {width}x{height}") | |
| def clean_generated_output(out: Path) -> None: | |
| for name in ("images", "frames", "messages", "chunks", "embedded_images"): | |
| path = out / name | |
| if path.exists(): | |
| if path.is_file() or path.is_symlink(): | |
| path.unlink() | |
| else: | |
| shutil.rmtree(path) | |
| for name in ("metadata.json", "records.jsonl", "recording_preamble.bin", "server_stream.bin", "info.txt"): | |
| path = out / name | |
| if path.exists(): | |
| path.unlink() | |
| def parse_offline_capture(data: bytes) -> OfflineCapture: | |
| if len(data) < 24: | |
| raise ValueError("file is too small for the offline RFB container") | |
| number_of_files = int.from_bytes(data[0:2], "big") | |
| total_map_size = int.from_bytes(data[2:6], "big") | |
| total_ms = int.from_bytes(data[6:10], "big") | |
| if number_of_files < 1 or total_map_size > len(data): | |
| raise ValueError("not an offline OpenVNC/RFB capture") | |
| records: list[bytes] = [] | |
| map_offset = 10 | |
| for _ in range(number_of_files): | |
| if map_offset + 14 > len(data): | |
| raise ValueError("truncated offline file map") | |
| map_size = int.from_bytes(data[map_offset + 6 : map_offset + 10], "big") | |
| file_end = map_offset + map_size | |
| if map_size < 14 or file_end > len(data): | |
| raise ValueError("invalid offline file map size") | |
| packet = map_offset + 14 | |
| while packet + 8 <= file_end: | |
| length = int.from_bytes(data[packet : packet + 4], "big") | |
| payload_offset = packet + 4 | |
| payload_end = payload_offset + length | |
| if payload_end + 4 > file_end: | |
| raise ValueError("truncated offline packet") | |
| records.append(data[payload_offset:payload_end]) | |
| packet = payload_end + 4 | |
| map_offset = file_end | |
| if not records or not b"".join(records[:4]).startswith(b"RFB "): | |
| raise ValueError("offline container does not start with an RFB server stream") | |
| return OfflineCapture(records=records, total_ms=total_ms) | |
| def parse_pixel_format(raw: bytes) -> PixelFormat: | |
| if len(raw) != 16: | |
| raise ValueError("pixel format must be 16 bytes") | |
| fmt: PixelFormat = { | |
| "bits_per_pixel": raw[0], | |
| "depth": raw[1], | |
| "big_endian_flag": raw[2], | |
| "true_color_flag": raw[3], | |
| "red_max": struct.unpack(">H", raw[4:6])[0], | |
| "green_max": struct.unpack(">H", raw[6:8])[0], | |
| "blue_max": struct.unpack(">H", raw[8:10])[0], | |
| "red_shift": raw[10], | |
| "green_shift": raw[11], | |
| "blue_shift": raw[12], | |
| } | |
| expected: PixelFormat = { | |
| "bits_per_pixel": 16, | |
| "depth": 16, | |
| "big_endian_flag": 0, | |
| "true_color_flag": 1, | |
| "red_max": 31, | |
| "green_max": 31, | |
| "blue_max": 31, | |
| "red_shift": 11, | |
| "green_shift": 6, | |
| "blue_shift": 0, | |
| } | |
| if fmt != expected: | |
| raise ValueError(f"unsupported pixel format for this minimized parser: {fmt}") | |
| return fmt | |
| def parse_handshake(stream: bytes, pos: int) -> tuple[Handshake, int]: | |
| reader = Reader(stream, pos) | |
| version = reader.take(12).decode("ascii", errors="replace").strip() | |
| if version != "RFB 003.003": | |
| raise ValueError(f"unsupported RFB version: {version}") | |
| security_type = reader.u32() | |
| width = reader.u16() | |
| height = reader.u16() | |
| validate_framebuffer_size(width, height) | |
| pixel_format = parse_pixel_format(reader.take(16)) | |
| name_len = reader.u32() | |
| desktop_name = reader.take(name_len).decode("utf-8", errors="replace") | |
| return ( | |
| { | |
| "protocol_version": version, | |
| "security_type": security_type, | |
| "framebuffer_width": width, | |
| "framebuffer_height": height, | |
| "pixel_format": pixel_format, | |
| "desktop_name": desktop_name, | |
| }, | |
| reader.pos, | |
| ) | |
| def rgb555_to_rgb(value: int) -> tuple[int, int, int]: | |
| red = (value >> 11) & 31 | |
| green = (value >> 6) & 31 | |
| blue = value & 31 | |
| return red * 255 // 31, green * 255 // 31, blue * 255 // 31 | |
| def pixels_to_rgb555(raw: bytes, width: int, height: int) -> bytes: | |
| out = bytearray(width * height * 3) | |
| dst = 0 | |
| for src in range(0, min(len(raw), width * height * 2), 2): | |
| out[dst : dst + 3] = bytes(rgb555_to_rgb(int.from_bytes(raw[src : src + 2], "little"))) | |
| dst += 3 | |
| return bytes(out) | |
| def set_rect( | |
| fb: bytearray, | |
| fb_width: int, | |
| x: int, | |
| y: int, | |
| width: int, | |
| height: int, | |
| rgb: bytes, | |
| ) -> None: | |
| row_len = width * 3 | |
| for row in range(height): | |
| dst = ((y + row) * fb_width + x) * 3 | |
| src = row * row_len | |
| fb[dst : dst + row_len] = rgb[src : src + row_len] | |
| def get_rect(fb: bytearray, fb_width: int, x: int, y: int, width: int, height: int) -> bytes: | |
| out = bytearray(width * height * 3) | |
| row_len = width * 3 | |
| for row in range(height): | |
| src = ((y + row) * fb_width + x) * 3 | |
| dst = row * row_len | |
| out[dst : dst + row_len] = fb[src : src + row_len] | |
| return bytes(out) | |
| def fill_rect( | |
| rgb: bytearray, | |
| rect_width: int, | |
| x: int, | |
| y: int, | |
| width: int, | |
| height: int, | |
| color: tuple[int, int, int], | |
| ) -> None: | |
| row_data = bytes(color) * width | |
| for row in range(height): | |
| dst = ((y + row) * rect_width + x) * 3 | |
| rgb[dst : dst + width * 3] = row_data | |
| def read_pixel(data: bytes, pos: int) -> tuple[tuple[int, int, int], int]: | |
| if pos + 2 > len(data): | |
| raise EOFError("truncated hextile pixel") | |
| return rgb555_to_rgb(int.from_bytes(data[pos : pos + 2], "little")), pos + 2 | |
| def decode_hextile(data: bytes, width: int, height: int, base_rgb: bytes) -> tuple[bytes, int]: | |
| rgb = bytearray(base_rgb) | |
| pos = 0 | |
| bg = (0, 0, 0) | |
| fg = (0, 0, 0) | |
| last_subencoding = 0 | |
| for tile_y in range(0, height, 16): | |
| tile_h = min(16, height - tile_y) | |
| for tile_x in range(0, width, 16): | |
| if pos >= len(data): | |
| raise EOFError("truncated hextile subencoding") | |
| sub = data[pos] | |
| pos += 1 | |
| if sub & ~0x1F: | |
| raise ValueError(f"invalid hextile subencoding 0x{sub:02x}") | |
| tile_w = min(16, width - tile_x) | |
| if sub & 0x01: | |
| raw_len = tile_w * tile_h * 2 | |
| tile = pixels_to_rgb555(data[pos : pos + raw_len], tile_w, tile_h) | |
| pos += raw_len | |
| set_rect(rgb, width, tile_x, tile_y, tile_w, tile_h, tile) | |
| last_subencoding = sub | |
| continue | |
| if sub == 0 and last_subencoding & 0x01: | |
| last_subencoding = sub | |
| continue | |
| if sub & 0x02: | |
| bg, pos = read_pixel(data, pos) | |
| fill_rect(rgb, width, tile_x, tile_y, tile_w, tile_h, bg) | |
| if sub & 0x04: | |
| fg, pos = read_pixel(data, pos) | |
| if sub & 0x08: | |
| count = data[pos] | |
| pos += 1 | |
| for _ in range(count): | |
| color = fg | |
| if sub & 0x10: | |
| color, pos = read_pixel(data, pos) | |
| xy = data[pos] | |
| wh = data[pos + 1] | |
| pos += 2 | |
| sub_x = xy >> 4 | |
| sub_y = xy & 0x0F | |
| sub_w = (wh >> 4) + 1 | |
| sub_h = (wh & 0x0F) + 1 | |
| fill_rect(rgb, width, tile_x + sub_x, tile_y + sub_y, sub_w, sub_h, color) | |
| last_subencoding = sub | |
| return bytes(rgb), pos | |
| def looks_like_frame_header(stream: bytes, pos: int) -> bool: | |
| if pos + 30 > len(stream): | |
| return False | |
| seq, _timestamp, size, _previous, _delay, width, height, full, changed = struct.unpack( | |
| ">IIIIIHHBB", stream[pos : pos + 26] | |
| ) | |
| rect_count = int.from_bytes(stream[pos + 28 : pos + 30], "big") | |
| return ( | |
| seq > 0 | |
| and size >= 30 | |
| and width > 0 | |
| and height > 0 | |
| and full <= 1 | |
| and changed <= 1 | |
| and rect_count <= MAX_RECTANGLES_PER_FRAME | |
| ) | |
| def media_description(info: Mapping[str, object]) -> str: | |
| return ( | |
| f"Offline OpenVNC RFB recording, {info['framebuffer_width']}x{info['framebuffer_height']}, " | |
| f"{info['frame_count']} frames, session {info['session']}, " | |
| f"duration {info['duration_ms']} ms, source SHA-256 {info['source_sha256']}" | |
| ) | |
| def png_metadata(info: Mapping[str, object], frame: FrameInfo) -> dict[str, str]: | |
| return { | |
| "Title": f"{Path(str(info['source'])).stem} frame {frame['frame']:06d}", | |
| "Description": ( | |
| f"Offline OpenVNC RFB frame, {info['framebuffer_width']}x{info['framebuffer_height']}, " | |
| f"source SHA-256 {info['source_sha256']}" | |
| ), | |
| "Software": "parse_rfb_capture.py", | |
| "Source": str(info["source"]), | |
| "Comment": ( | |
| f"RFB sequence {frame['sequence_number']}, session {frame['session']}, " | |
| f"timestamp {frame['timestamp_ms']} ms, delay {frame['delay_ms']} ms" | |
| ), | |
| } | |
| def save_rgb(path: Path, width: int, height: int, rgb: bytes, metadata: Mapping[str, str]) -> None: | |
| if Image is None: | |
| raise RuntimeError("Pillow is required to write PNG frames") | |
| pnginfo = PngInfo() if PngInfo is not None else None | |
| if pnginfo is not None: | |
| for key, value in metadata.items(): | |
| pnginfo.add_text(key, value) | |
| Image.frombytes("RGB", (width, height), rgb).save(path, pnginfo=pnginfo) | |
| def decode_rectangles(reader: Reader, fb: bytearray, fb_width: int, fb_height: int, rect_count: int) -> None: | |
| for _ in range(rect_count): | |
| x = reader.u16() | |
| y = reader.u16() | |
| width = reader.u16() | |
| height = reader.u16() | |
| encoding = reader.s32() | |
| if encoding != 5: | |
| raise ValueError(f"unsupported encoding in minimized parser: {encoding}") | |
| if x + width > fb_width or y + height > fb_height: | |
| raise ValueError(f"rectangle outside framebuffer: {x},{y} {width}x{height}") | |
| base = get_rect(fb, fb_width, x, y, width, height) | |
| rgb, used = decode_hextile(reader.data[reader.pos :], width, height, base) | |
| reader.pos += used | |
| set_rect(fb, fb_width, x, y, width, height, rgb) | |
| def timeline_duration_ms(frames: list[FrameInfo]) -> int | None: | |
| if not frames: | |
| return None | |
| return frames[-1]["timestamp_ms"] - frames[0]["timestamp_ms"] + max(0, frames[-1]["delay_ms"]) | |
| def decode_stream(stream: bytes, frame_root: Path | None, base_info: Mapping[str, object]) -> list[SessionResult]: | |
| pos = 0 | |
| session_no = 0 | |
| sessions: list[SessionResult] = [] | |
| while pos < len(stream): | |
| rfb_pos = stream.find(b"RFB ", pos) | |
| if rfb_pos < 0: | |
| break | |
| handshake, pos = parse_handshake(stream, rfb_pos) | |
| session_no += 1 | |
| width = handshake["framebuffer_width"] | |
| height = handshake["framebuffer_height"] | |
| fb = bytearray(width * height * 3) | |
| frames: list[FrameInfo] = [] | |
| session_dir = frame_root / f"session_{session_no:02d}" if frame_root is not None else None | |
| if session_dir is not None: | |
| session_dir.mkdir(parents=True, exist_ok=True) | |
| while looks_like_frame_header(stream, pos): | |
| reader = Reader(stream, pos) | |
| sequence = reader.u32() | |
| timestamp = reader.u32() | |
| _size = reader.u32() | |
| _previous_size = reader.u32() | |
| delay = reader.u32() | |
| frame_width = reader.u16() | |
| frame_height = reader.u16() | |
| is_full_frame = bool(reader.u8()) | |
| is_resolution_changed = bool(reader.u8()) | |
| reader.u16() | |
| rect_count = reader.u16() | |
| if is_resolution_changed: | |
| raise ValueError(f"unsupported framebuffer change at sequence {sequence}") | |
| if is_full_frame: | |
| fb = bytearray(width * height * 3) | |
| decode_rectangles(reader, fb, width, height, rect_count) | |
| pos = reader.pos | |
| frame: FrameInfo = { | |
| "frame": len(frames) + 1, | |
| "image": f"frame_{len(frames) + 1:06d}.png", | |
| "session": session_no, | |
| "sequence_number": sequence, | |
| "timestamp_ms": timestamp, | |
| "delay_ms": delay, | |
| "is_full_frame": is_full_frame, | |
| } | |
| frames.append(frame) | |
| if session_dir is not None: | |
| temp_info = dict(base_info) | |
| temp_info["frame_count"] = len(frames) | |
| temp_info["session"] = session_no | |
| temp_info["duration_ms"] = timeline_duration_ms(frames) | |
| temp_info["framebuffer_width"] = width | |
| temp_info["framebuffer_height"] = height | |
| save_rgb(session_dir / frame["image"], width, height, bytes(fb), png_metadata(temp_info, frame)) | |
| if stream.startswith(b"RFB ", pos): | |
| break | |
| if not looks_like_frame_header(stream, pos) and stream.find(b"RFB ", pos) < 0: | |
| break | |
| sessions.append({"session": session_no, "handshake": handshake, "frames": frames, "frames_dir": session_dir} if session_dir is not None else {"session": session_no, "handshake": handshake, "frames": frames}) | |
| if not sessions: | |
| raise ValueError("no RFB handshake found") | |
| return sessions | |
| def prepare_timestamped_sequence(images_dir: Path, frames: list[FrameInfo], temp_dir: Path) -> None: | |
| temp_dir.mkdir(parents=True, exist_ok=True) | |
| base_ns = 1_700_000_000_000_000_000 | |
| start_ms = frames[0]["timestamp_ms"] | |
| def link_or_copy(source: Path, target: Path) -> None: | |
| try: | |
| target.hardlink_to(source.resolve()) | |
| except OSError: | |
| shutil.copy2(source, target) | |
| for i, frame in enumerate(frames, start=1): | |
| target = temp_dir / f"video_frame_{i:06d}.png" | |
| link_or_copy(images_dir / frame["image"], target) | |
| mtime_ns = base_ns + (frame["timestamp_ms"] - start_ms) * 1_000_000 | |
| os.utime(target, ns=(mtime_ns, mtime_ns)) | |
| duration_ms = timeline_duration_ms(frames) | |
| if duration_ms is not None: | |
| target = temp_dir / f"video_frame_{len(frames) + 1:06d}.png" | |
| link_or_copy(images_dir / frames[-1]["image"], target) | |
| final_ns = base_ns + duration_ms * 1_000_000 | |
| os.utime(target, ns=(final_ns, final_ns)) | |
| def mp4_metadata_args(info: Mapping[str, object]) -> list[str]: | |
| tags = { | |
| "title": Path(str(info["source"])).stem, | |
| "artist": "OpenVNC", | |
| "album": "RFB screen recording", | |
| "genre": "Screen recording", | |
| "description": media_description(info), | |
| "comment": media_description(info), | |
| "encoded_by": "parse_rfb_capture.py", | |
| } | |
| return [arg for item in tags.items() for arg in ("-metadata", f"{item[0]}={item[1]}")] | |
| def make_video(images_dir: Path, out_file: Path, frames: list[FrameInfo], info: Mapping[str, object]) -> None: | |
| ffmpeg = shutil.which("ffmpeg") | |
| if ffmpeg is None: | |
| raise RuntimeError("ffmpeg not found in PATH") | |
| out_file.parent.mkdir(parents=True, exist_ok=True) | |
| with TemporaryDirectory(prefix=f"{out_file.stem}.timed.", dir=str(out_file.parent)) as temp: | |
| timed_dir = Path(temp) | |
| prepare_timestamped_sequence(images_dir, frames, timed_dir) | |
| cmd = [ | |
| ffmpeg, | |
| "-y", | |
| "-framerate", | |
| "1000", | |
| "-ts_from_file", | |
| "2", | |
| "-i", | |
| str(timed_dir / "video_frame_%06d.png"), | |
| "-copyts", | |
| "-start_at_zero", | |
| "-fps_mode", | |
| "vfr", | |
| "-c:v", | |
| "libx264rgb", | |
| "-crf", | |
| "0", | |
| "-preset", | |
| "veryslow", | |
| "-bf", | |
| "0", | |
| "-enc_time_base", | |
| "1/1000", | |
| "-pix_fmt", | |
| "bgr24", | |
| "-video_track_timescale", | |
| "1000", | |
| *mp4_metadata_args(info), | |
| str(out_file), | |
| ] | |
| proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True) | |
| if proc.returncode != 0: | |
| raise RuntimeError(proc.stderr[-2000:]) | |
| def info_text(info: Mapping[str, object]) -> str: | |
| lines = [ | |
| f"Source: {info['source']}", | |
| f"Source SHA-256: {info['source_sha256']}", | |
| f"Capture format: {info['capture_format']}", | |
| f"Records: {info['record_count']}", | |
| f"RFB stream size: {info['stream_size']} bytes", | |
| f"Sessions: {info['session_count']}", | |
| f"Updates/frames: {info['update_count']}/{info['frame_count']}", | |
| f"Duration: {info['duration_ms']} ms", | |
| f"Protocol: {info['protocol_version']}", | |
| f"Security type: {info['security_type']}", | |
| f"Desktop name: {info['desktop_name']}", | |
| f"Framebuffer: {info['framebuffer_width']}x{info['framebuffer_height']}", | |
| ] | |
| if info.get("video"): | |
| lines.append(f"Video: {info['video']}") | |
| return "\n".join(lines) + "\n" | |
| def session_video_path(out: Path, source: Path, requested: Path | None, session: int, session_count: int) -> Path: | |
| if requested is None: | |
| return out / f"{source.stem}_session_{session:02d}.mp4" | |
| base = requested if requested.is_absolute() else out / requested | |
| if session_count == 1: | |
| return base | |
| suffix = base.suffix or ".mp4" | |
| return base.with_name(f"{base.stem}_session_{session:02d}{suffix}") | |
| def make_session_info( | |
| base_info: DecodeInfo, | |
| result: SessionResult, | |
| session_count: int, | |
| video: Path | None, | |
| ) -> DecodeInfo: | |
| handshake = result["handshake"] | |
| frames = result["frames"] | |
| return { | |
| **base_info, | |
| "session": result["session"], | |
| "session_count": session_count, | |
| "update_count": len(frames), | |
| "frame_count": len(frames), | |
| "duration_ms": timeline_duration_ms(frames), | |
| "protocol_version": handshake["protocol_version"], | |
| "security_type": handshake["security_type"], | |
| "desktop_name": handshake["desktop_name"], | |
| "framebuffer_width": handshake["framebuffer_width"], | |
| "framebuffer_height": handshake["framebuffer_height"], | |
| "video": str(video) if video is not None else None, | |
| } | |
| def main(argv: list[str]) -> int: | |
| parser = argparse.ArgumentParser(description=__doc__) | |
| parser.add_argument("input", type=Path) | |
| parser.add_argument("-o", "--output", type=Path, default=None, help="output directory") | |
| parser.add_argument("--video", type=Path, default=None, help="MP4 output path or file name") | |
| parser.add_argument("--no-video", action="store_true", help="parse and optionally write frames/info without MP4 conversion") | |
| parser.add_argument("--frames", action="store_true", help="keep reconstructed PNG frames") | |
| parser.add_argument("--info", action="store_true", help="write info.txt") | |
| args = parser.parse_args(argv) | |
| source = args.input | |
| out = args.output or source.with_suffix(source.suffix + "_extracted") | |
| clean_generated_output(out) | |
| out.mkdir(parents=True, exist_ok=True) | |
| for old_video in (out / f"{source.stem}.mp4", *out.glob(f"{source.stem}_session_*.mp4")): | |
| if old_video.exists(): | |
| old_video.unlink() | |
| data = source.read_bytes() | |
| capture = parse_offline_capture(data) | |
| stream = b"".join(capture.records) | |
| base_info: DecodeInfo = { | |
| "source": str(source), | |
| "source_sha256": sha256(data), | |
| "capture_format": "offline OpenVNC RFB", | |
| "record_count": len(capture.records), | |
| "stream_size": len(stream), | |
| "session": 0, | |
| "session_count": 0, | |
| "update_count": 0, | |
| "frame_count": 0, | |
| "duration_ms": None, | |
| "protocol_version": "", | |
| "security_type": 0, | |
| "desktop_name": "", | |
| "framebuffer_width": 0, | |
| "framebuffer_height": 0, | |
| "video": None, | |
| } | |
| with TemporaryDirectory(prefix=f"{source.stem}.frames.", dir=str(out)) as temp_frames: | |
| frame_root: Path | None | |
| if args.frames: | |
| frame_root = out / "frames" | |
| elif args.no_video: | |
| frame_root = None | |
| else: | |
| frame_root = Path(temp_frames) | |
| if frame_root is not None: | |
| frame_root.mkdir(parents=True, exist_ok=True) | |
| sessions = decode_stream(stream, frame_root, base_info) | |
| session_infos: list[DecodeInfo] = [] | |
| for result in sessions: | |
| video_path = None | |
| if not args.no_video: | |
| video_path = session_video_path(out, source, args.video, result["session"], len(sessions)) | |
| info = make_session_info(base_info, result, len(sessions), video_path) | |
| session_infos.append(info) | |
| if video_path is not None: | |
| frames_dir = result.get("frames_dir") | |
| if frames_dir is None: | |
| raise RuntimeError("internal error: video creation requires decoded frame images") | |
| make_video(frames_dir, video_path, result["frames"], info) | |
| if args.info: | |
| (out / "info.txt").write_text("\n".join(info_text(info).rstrip() for info in session_infos) + "\n", encoding="utf-8") | |
| print("\n".join(info_text(info).rstrip() for info in session_infos), end="\n") | |
| return 0 | |
| if __name__ == "__main__": | |
| raise SystemExit(main(sys.argv[1:])) |
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment