Skip to content

Instantly share code, notes, and snippets.

@maxried
Created May 11, 2026 08:12
Show Gist options
  • Select an option

  • Save maxried/730224ac9423ce747a2227423eb326f3 to your computer and use it in GitHub Desktop.

Select an option

Save maxried/730224ac9423ce747a2227423eb326f3 to your computer and use it in GitHub Desktop.
iDRAC RFB Converter
#!/usr/bin/env python3
"""Vibe-coded tool to convert iDRAC RFB crashvideos and bootvideos to a playable format."""
import argparse
import hashlib
import os
import shutil
import struct
import subprocess
import sys
from dataclasses import dataclass
from pathlib import Path
from tempfile import TemporaryDirectory
from typing import Mapping, NotRequired, TypedDict
try:
from PIL import Image
from PIL.PngImagePlugin import PngInfo
except Exception: # pragma: no cover - optional dependency
Image = None
PngInfo = None
# Sanity limits applied while parsing untrusted capture files.
# Largest rectangle count a frame header may announce and still be accepted.
MAX_RECTANGLES_PER_FRAME = 10_000
# Largest framebuffer width or height (in pixels) accepted from a handshake.
MAX_DIMENSION = 16_384
# Largest framebuffer area (width * height, in pixels) accepted from a handshake.
MAX_PIXELS = 128 * 1024 * 1024
class PixelFormat(TypedDict):
    """Decoded fields of the 16-byte pixel-format block in the handshake."""

    # Single-byte fields taken from bytes 0-3 of the block.
    bits_per_pixel: int
    depth: int
    big_endian_flag: int
    true_color_flag: int
    # Big-endian 16-bit maxima taken from bytes 4-9.
    red_max: int
    green_max: int
    blue_max: int
    # Single-byte channel shifts taken from bytes 10-12 (bytes 13-15 are ignored).
    red_shift: int
    green_shift: int
    blue_shift: int
class Handshake(TypedDict):
    """Values read from one server handshake at the start of a session."""

    # 12-byte version banner, ASCII-decoded and stripped of whitespace.
    protocol_version: str
    # 32-bit security type that follows the banner.
    security_type: int
    # Framebuffer dimensions in pixels (16-bit fields).
    framebuffer_width: int
    framebuffer_height: int
    pixel_format: PixelFormat
    # Length-prefixed name, decoded as UTF-8 with replacement characters.
    desktop_name: str
class FrameInfo(TypedDict):
    """Per-frame bookkeeping recorded while decoding a session."""

    # 1-based index of the frame within its session.
    frame: int
    # PNG file name for this frame (name only, no directory).
    image: str
    # 1-based number of the session the frame belongs to.
    session: int
    # Values copied from the frame header.
    sequence_number: int
    timestamp_ms: int
    delay_ms: int
    is_full_frame: bool
class DecodeInfo(TypedDict):
    """Summary of one decoded session, used for info.txt and media metadata."""

    # Input file path as a string and the SHA-256 hex digest of its bytes.
    source: str
    source_sha256: str
    capture_format: str
    # Number of container records and the size of their concatenation in bytes.
    record_count: int
    stream_size: int
    # Session number and total number of sessions found in the stream.
    session: int
    session_count: int
    update_count: int
    frame_count: int
    # None when the session produced no frames.
    duration_ms: int | None
    # Values copied from the session's handshake.
    protocol_version: str
    security_type: int
    desktop_name: str
    framebuffer_width: int
    framebuffer_height: int
    # Output video path as a string; None or absent when no video is written.
    video: NotRequired[str | None]
class SessionResult(TypedDict):
    """Outcome of decoding one session of the stream."""

    session: int
    handshake: Handshake
    frames: list[FrameInfo]
    # Directory holding the session's PNG frames; present only when frames were written.
    frames_dir: NotRequired[Path]
@dataclass(frozen=True)
class OfflineCapture:
    """Packet payloads extracted from an offline capture container."""

    # Packet payloads in file order; concatenated they form the RFB server stream.
    records: list[bytes]
    # Big-endian 32-bit value from header bytes 6-9; presumably the recording
    # length in milliseconds -- TODO confirm (not used elsewhere in this file).
    total_ms: int
class Reader:
    """Forward-only cursor that decodes big-endian integers from a byte string."""

    def __init__(self, data: bytes, pos: int = 0) -> None:
        self.data = data
        self.pos = pos

    def take(self, n: int) -> bytes:
        """Return the next *n* bytes and advance past them.

        Raises:
            EOFError: fewer than *n* bytes remain; the position is unchanged.
        """
        start = self.pos
        stop = start + n
        if stop > len(self.data):
            raise EOFError(f"need {n} bytes at {self.pos}, have {len(self.data) - self.pos}")
        self.pos = stop
        return self.data[start:stop]

    def u8(self) -> int:
        """Read an unsigned 8-bit integer."""
        return int.from_bytes(self.take(1), "big")

    def u16(self) -> int:
        """Read an unsigned big-endian 16-bit integer."""
        return int.from_bytes(self.take(2), "big")

    def u32(self) -> int:
        """Read an unsigned big-endian 32-bit integer."""
        return int.from_bytes(self.take(4), "big")

    def s32(self) -> int:
        """Read a signed big-endian 32-bit integer."""
        return int.from_bytes(self.take(4), "big", signed=True)
def sha256(data: bytes) -> str:
    """Return the SHA-256 digest of *data* as a lowercase hex string."""
    digest = hashlib.sha256()
    digest.update(data)
    return digest.hexdigest()
def validate_framebuffer_size(width: int, height: int) -> None:
    """Reject framebuffer dimensions that are non-positive or exceed the parser limits.

    Raises:
        ValueError: either dimension is <= 0, either exceeds MAX_DIMENSION,
            or the area exceeds MAX_PIXELS.
    """
    if min(width, height) <= 0:
        raise ValueError(f"invalid framebuffer size: {width}x{height}")
    side_too_long = max(width, height) > MAX_DIMENSION
    if side_too_long or width * height > MAX_PIXELS:
        raise ValueError(f"framebuffer size exceeds parser limits: {width}x{height}")
def clean_generated_output(out: Path) -> None:
    """Remove artifacts of a previous run from the output directory *out*.

    Only the well-known generated names are touched; anything else in *out*
    is left alone. A missing *out* directory is not an error.
    """
    for name in ("images", "frames", "messages", "chunks", "embedded_images"):
        path = out / name
        # Test is_symlink() first: exists() follows links, so a dangling
        # symlink would otherwise be reported as absent and left behind.
        if path.is_symlink() or path.is_file():
            path.unlink()
        elif path.exists():
            shutil.rmtree(path)
    for name in ("metadata.json", "records.jsonl", "recording_preamble.bin", "server_stream.bin", "info.txt"):
        # missing_ok avoids the exists()/unlink() race and also removes
        # dangling symlinks, which exists() would have skipped.
        (out / name).unlink(missing_ok=True)
def parse_offline_capture(data: bytes) -> OfflineCapture:
    """Split an offline capture container into its packet payloads.

    The container starts with a 10-byte header (16-bit file count, 32-bit
    total map size, 32-bit millisecond value) followed by one map per file.
    Each map carries its own size at offset 6, a 14-byte header, and a run of
    packets laid out as: 32-bit length, payload, 4 trailing bytes.

    Raises:
        ValueError: the data is too small, structurally inconsistent, or the
            payloads do not begin with an ``RFB `` banner.
    """
    total_size = len(data)
    if total_size < 24:
        raise ValueError("file is too small for the offline RFB container")
    number_of_files, total_map_size, total_ms = struct.unpack_from(">HII", data, 0)
    if number_of_files < 1 or total_map_size > total_size:
        raise ValueError("not an offline OpenVNC/RFB capture")

    records: list[bytes] = []
    map_start = 10
    for _ in range(number_of_files):
        if map_start + 14 > total_size:
            raise ValueError("truncated offline file map")
        (map_size,) = struct.unpack_from(">I", data, map_start + 6)
        map_end = map_start + map_size
        if map_size < 14 or map_end > total_size:
            raise ValueError("invalid offline file map size")

        cursor = map_start + 14
        # Every packet needs at least its length field plus the 4 trailing bytes.
        while cursor + 8 <= map_end:
            (length,) = struct.unpack_from(">I", data, cursor)
            body_start = cursor + 4
            body_end = body_start + length
            if body_end + 4 > map_end:
                raise ValueError("truncated offline packet")
            records.append(data[body_start:body_end])
            cursor = body_end + 4
        map_start = map_end

    # The banner may be split across the first few records, so join them first.
    starts_with_banner = bool(records) and b"".join(records[:4]).startswith(b"RFB ")
    if not starts_with_banner:
        raise ValueError("offline container does not start with an RFB server stream")
    return OfflineCapture(records=records, total_ms=total_ms)
def parse_pixel_format(raw: bytes) -> PixelFormat:
    """Decode a 16-byte pixel-format block and require the one supported layout.

    Only 16 bpp little-endian true colour with 5-bit channels at shifts
    11/6/0 is accepted, matching the shifts used by ``rgb555_to_rgb``.

    Raises:
        ValueError: *raw* is not 16 bytes or describes any other format.
    """
    if len(raw) != 16:
        raise ValueError("pixel format must be 16 bytes")
    # Four bytes, three big-endian shorts, three bytes, then 3 ignored bytes.
    (
        bits_per_pixel,
        depth,
        big_endian_flag,
        true_color_flag,
        red_max,
        green_max,
        blue_max,
        red_shift,
        green_shift,
        blue_shift,
    ) = struct.unpack(">BBBBHHH3B3x", raw)
    fmt: PixelFormat = {
        "bits_per_pixel": bits_per_pixel,
        "depth": depth,
        "big_endian_flag": big_endian_flag,
        "true_color_flag": true_color_flag,
        "red_max": red_max,
        "green_max": green_max,
        "blue_max": blue_max,
        "red_shift": red_shift,
        "green_shift": green_shift,
        "blue_shift": blue_shift,
    }
    supported: PixelFormat = {
        "bits_per_pixel": 16,
        "depth": 16,
        "big_endian_flag": 0,
        "true_color_flag": 1,
        "red_max": 31,
        "green_max": 31,
        "blue_max": 31,
        "red_shift": 11,
        "green_shift": 6,
        "blue_shift": 0,
    }
    if fmt == supported:
        return fmt
    raise ValueError(f"unsupported pixel format for this minimized parser: {fmt}")
def parse_handshake(stream: bytes, pos: int) -> tuple[Handshake, int]:
    """Parse the server handshake that starts at *pos* in *stream*.

    Returns:
        The decoded handshake and the stream position just past it.

    Raises:
        ValueError: the version banner, framebuffer size or pixel format is unsupported.
        EOFError: the stream ends inside the handshake.
    """
    cursor = Reader(stream, pos)
    banner = cursor.take(12)
    version = banner.decode("ascii", errors="replace").strip()
    if version != "RFB 003.003":
        raise ValueError(f"unsupported RFB version: {version}")

    security_type = cursor.u32()
    width, height = cursor.u16(), cursor.u16()
    validate_framebuffer_size(width, height)
    pixel_format = parse_pixel_format(cursor.take(16))
    # The desktop name is prefixed by its 32-bit byte length.
    name_bytes = cursor.take(cursor.u32())

    handshake: Handshake = {
        "protocol_version": version,
        "security_type": security_type,
        "framebuffer_width": width,
        "framebuffer_height": height,
        "pixel_format": pixel_format,
        "desktop_name": name_bytes.decode("utf-8", errors="replace"),
    }
    return handshake, cursor.pos
def rgb555_to_rgb(value: int) -> tuple[int, int, int]:
    """Expand a 16-bit pixel with 5-bit channels at shifts 11/6/0 to 8-bit RGB."""
    red, green, blue = (((value >> shift) & 0x1F) * 255 // 31 for shift in (11, 6, 0))
    return red, green, blue
def pixels_to_rgb555(raw: bytes, width: int, height: int) -> bytes:
    """Convert little-endian 16-bit pixels to packed 8-bit RGB.

    The result always holds ``width * height`` RGB triples; pixels missing
    from a short *raw* stay zero (black) and surplus input is ignored.
    """
    pixel_count = width * height
    out = bytearray(pixel_count * 3)
    usable = min(len(raw), pixel_count * 2)
    for index, src in enumerate(range(0, usable, 2)):
        pixel = int.from_bytes(raw[src : src + 2], "little")
        dst = index * 3
        out[dst : dst + 3] = bytes(rgb555_to_rgb(pixel))
    return bytes(out)
def set_rect(
    fb: bytearray,
    fb_width: int,
    x: int,
    y: int,
    width: int,
    height: int,
    rgb: bytes,
) -> None:
    """Copy a packed RGB rectangle into the framebuffer *fb* at (x, y).

    *rgb* holds *height* rows of *width* RGB triples; *fb* is packed RGB with
    *fb_width* pixels per row.
    """
    span = width * 3
    stride = fb_width * 3
    dst = (y * fb_width + x) * 3
    for row in range(height):
        src = row * span
        fb[dst : dst + span] = rgb[src : src + span]
        dst += stride
def get_rect(fb: bytearray, fb_width: int, x: int, y: int, width: int, height: int) -> bytes:
    """Return the packed RGB contents of a rectangle of the framebuffer *fb*."""
    span = width * 3
    stride = fb_width * 3
    out = bytearray(span * height)
    src = (y * fb_width + x) * 3
    for row in range(height):
        dst = row * span
        out[dst : dst + span] = fb[src : src + span]
        src += stride
    return bytes(out)
def fill_rect(
    rgb: bytearray,
    rect_width: int,
    x: int,
    y: int,
    width: int,
    height: int,
    color: tuple[int, int, int],
) -> None:
    """Paint a solid *color* rectangle into the packed RGB buffer *rgb*.

    *rect_width* is the width in pixels of the buffer being painted into.
    """
    span = width * 3
    stride = rect_width * 3
    row_data = bytes(color) * width
    dst = (y * rect_width + x) * 3
    for _ in range(height):
        rgb[dst : dst + span] = row_data
        dst += stride
def read_pixel(data: bytes, pos: int) -> tuple[tuple[int, int, int], int]:
    """Read one little-endian 16-bit pixel at *pos*.

    Returns:
        The pixel as an 8-bit RGB triple and the position just past it.

    Raises:
        EOFError: fewer than two bytes remain at *pos*.
    """
    end = pos + 2
    if end > len(data):
        raise EOFError("truncated hextile pixel")
    pixel = int.from_bytes(data[pos:end], "little")
    return rgb555_to_rgb(pixel), end
def decode_hextile(data: bytes, width: int, height: int, base_rgb: bytes) -> tuple[bytes, int]:
    """Decode one hextile-encoded rectangle on top of its previous contents.

    The rectangle is processed in 16x16 tiles, row by row. Each tile starts
    with a subencoding byte whose bits select: 0x01 raw pixels, 0x02 a new
    background colour, 0x04 a new foreground colour, 0x08 a list of
    subrectangles, 0x10 a per-subrectangle colour.

    Args:
        data: Encoded bytes, starting at the first tile's subencoding byte.
        width: Rectangle width in pixels.
        height: Rectangle height in pixels.
        base_rgb: Current packed RGB contents of the rectangle; tiles that do
            not repaint themselves keep these pixels.

    Returns:
        The updated packed RGB rectangle and the number of bytes consumed.

    Raises:
        EOFError: *data* ends inside a tile header, pixel or subrectangle.
        ValueError: a subencoding byte has bits outside 0x1F set.
    """
    rgb = bytearray(base_rgb)
    pos = 0
    bg = (0, 0, 0)
    fg = (0, 0, 0)
    last_subencoding = 0
    for tile_y in range(0, height, 16):
        tile_h = min(16, height - tile_y)
        for tile_x in range(0, width, 16):
            if pos >= len(data):
                raise EOFError("truncated hextile subencoding")
            sub = data[pos]
            pos += 1
            if sub & ~0x1F:
                raise ValueError(f"invalid hextile subencoding 0x{sub:02x}")
            tile_w = min(16, width - tile_x)
            if sub & 0x01:
                # Raw tile. Short raw data is tolerated: pixels_to_rgb555
                # leaves the missing pixels black.
                raw_len = tile_w * tile_h * 2
                tile = pixels_to_rgb555(data[pos : pos + raw_len], tile_w, tile_h)
                pos += raw_len
                set_rect(rgb, width, tile_x, tile_y, tile_w, tile_h, tile)
                last_subencoding = sub
                continue
            if sub == 0 and last_subencoding & 0x01:
                # A zero subencoding directly after a raw tile leaves this
                # tile untouched instead of repainting it.
                last_subencoding = sub
                continue
            if sub & 0x02:
                bg, pos = read_pixel(data, pos)
                fill_rect(rgb, width, tile_x, tile_y, tile_w, tile_h, bg)
            if sub & 0x04:
                fg, pos = read_pixel(data, pos)
            if sub & 0x08:
                # Report truncation as EOFError like every other short read
                # here, rather than leaking a bare IndexError.
                if pos >= len(data):
                    raise EOFError("truncated hextile subrectangle count")
                count = data[pos]
                pos += 1
                for _ in range(count):
                    color = fg
                    if sub & 0x10:
                        color, pos = read_pixel(data, pos)
                    if pos + 2 > len(data):
                        raise EOFError("truncated hextile subrectangle")
                    xy = data[pos]
                    wh = data[pos + 1]
                    pos += 2
                    sub_x = xy >> 4
                    sub_y = xy & 0x0F
                    # Clip to the tile: an oversized subrectangle would
                    # otherwise wrap into the next pixel row or write past
                    # the end of the buffer and grow it.
                    sub_w = min((wh >> 4) + 1, tile_w - sub_x)
                    sub_h = min((wh & 0x0F) + 1, tile_h - sub_y)
                    if sub_w <= 0 or sub_h <= 0:
                        continue
                    fill_rect(rgb, width, tile_x + sub_x, tile_y + sub_y, sub_w, sub_h, color)
            last_subencoding = sub
    return bytes(rgb), pos
def looks_like_frame_header(stream: bytes, pos: int) -> bool:
    """Heuristically decide whether a 30-byte frame header starts at *pos*.

    The header holds five 32-bit fields (sequence, timestamp, size, previous
    size, delay), 16-bit width and height, two one-byte flags, two skipped
    bytes and a 16-bit rectangle count, all big-endian.
    """
    if pos + 30 > len(stream):
        return False
    seq, _timestamp, size, _previous, _delay, width, height, full, changed, rect_count = struct.unpack_from(
        ">IIIIIHHBB2xH", stream, pos
    )
    plausible = (
        seq > 0,
        size >= 30,
        width > 0,
        height > 0,
        full <= 1,
        changed <= 1,
        rect_count <= MAX_RECTANGLES_PER_FRAME,
    )
    return all(plausible)
def media_description(info: Mapping[str, object]) -> str:
    """Build the one-line description used for the video's metadata tags."""
    parts = [
        "Offline OpenVNC RFB recording",
        f"{info['framebuffer_width']}x{info['framebuffer_height']}",
        f"{info['frame_count']} frames",
        f"session {info['session']}",
        f"duration {info['duration_ms']} ms",
        f"source SHA-256 {info['source_sha256']}",
    ]
    return ", ".join(parts)
def png_metadata(info: Mapping[str, object], frame: FrameInfo) -> dict[str, str]:
    """Assemble the text chunks embedded in one frame's PNG file."""
    source = str(info["source"])
    title = f"{Path(source).stem} frame {frame['frame']:06d}"
    description = (
        f"Offline OpenVNC RFB frame, {info['framebuffer_width']}x{info['framebuffer_height']}, "
        f"source SHA-256 {info['source_sha256']}"
    )
    comment = (
        f"RFB sequence {frame['sequence_number']}, session {frame['session']}, "
        f"timestamp {frame['timestamp_ms']} ms, delay {frame['delay_ms']} ms"
    )
    return {
        "Title": title,
        "Description": description,
        "Software": "parse_rfb_capture.py",
        "Source": source,
        "Comment": comment,
    }
def save_rgb(path: Path, width: int, height: int, rgb: bytes, metadata: Mapping[str, str]) -> None:
    """Write packed RGB pixels to *path* with *metadata* as PNG text chunks.

    Raises:
        RuntimeError: Pillow could not be imported.
    """
    if Image is None:
        raise RuntimeError("Pillow is required to write PNG frames")
    text_chunks = None
    if PngInfo is not None:
        text_chunks = PngInfo()
        for key, value in metadata.items():
            text_chunks.add_text(key, value)
    picture = Image.frombytes("RGB", (width, height), rgb)
    picture.save(path, pnginfo=text_chunks)
def decode_rectangles(reader: Reader, fb: bytearray, fb_width: int, fb_height: int, rect_count: int) -> None:
    """Decode *rect_count* hextile rectangles from *reader* into the framebuffer.

    Each rectangle starts with a 12-byte header (x, y, width, height as
    16-bit values and a signed 32-bit encoding) followed by hextile data.
    *reader* is advanced past everything consumed and *fb* is updated in place.

    Raises:
        ValueError: a rectangle uses an encoding other than hextile (5) or
            lies outside the framebuffer.
        EOFError: the stream ends inside a rectangle.
    """
    # One zero-copy view of the stream: slicing the bytes object itself would
    # copy the whole remaining stream for every rectangle (quadratic overall).
    stream_view = memoryview(reader.data)
    for _ in range(rect_count):
        x = reader.u16()
        y = reader.u16()
        width = reader.u16()
        height = reader.u16()
        encoding = reader.s32()
        if encoding != 5:
            raise ValueError(f"unsupported encoding in minimized parser: {encoding}")
        if x + width > fb_width or y + height > fb_height:
            raise ValueError(f"rectangle outside framebuffer: {x},{y} {width}x{height}")
        # The decoder starts from the current pixels and overwrites only the
        # tiles that repaint themselves.
        base = get_rect(fb, fb_width, x, y, width, height)
        rgb, used = decode_hextile(stream_view[reader.pos :], width, height, base)
        reader.pos += used
        set_rect(fb, fb_width, x, y, width, height, rgb)
def timeline_duration_ms(frames: list[FrameInfo]) -> int | None:
    """Return the timeline length in milliseconds, or None without frames.

    The length runs from the first frame's timestamp to the last frame's
    timestamp plus the last frame's delay (negative delays count as zero).
    """
    if frames:
        first, last = frames[0], frames[-1]
        hold_ms = max(0, last["delay_ms"])
        return last["timestamp_ms"] - first["timestamp_ms"] + hold_ms
    return None
def decode_stream(stream: bytes, frame_root: Path | None, base_info: Mapping[str, object]) -> list[SessionResult]:
    """Decode every session in the concatenated RFB server stream.

    A session starts at an ``RFB `` banner, followed by its handshake and a
    run of frames. Each frame is applied to the session framebuffer and, when
    *frame_root* is given, the whole framebuffer is saved as a PNG under
    ``frame_root/session_NN/``.

    Args:
        stream: Concatenated packet payloads of the capture.
        frame_root: Directory for per-session PNG frames, or None to decode
            without writing images.
        base_info: Source-level metadata copied into each PNG's text chunks.

    Returns:
        One result per session, in stream order.

    Raises:
        ValueError: no handshake is found, a frame announces a resolution
            change, or a handshake/rectangle is unsupported.
        EOFError: the stream ends inside a handshake or frame.
    """
    pos = 0
    session_no = 0
    sessions: list[SessionResult] = []
    while pos < len(stream):
        # Skip ahead to the next session banner; stop when none is left.
        rfb_pos = stream.find(b"RFB ", pos)
        if rfb_pos < 0:
            break
        handshake, pos = parse_handshake(stream, rfb_pos)
        session_no += 1
        width = handshake["framebuffer_width"]
        height = handshake["framebuffer_height"]
        # Each session starts from an all-black packed RGB framebuffer.
        fb = bytearray(width * height * 3)
        frames: list[FrameInfo] = []
        session_dir = frame_root / f"session_{session_no:02d}" if frame_root is not None else None
        if session_dir is not None:
            session_dir.mkdir(parents=True, exist_ok=True)
        while looks_like_frame_header(stream, pos):
            reader = Reader(stream, pos)
            sequence = reader.u32()
            timestamp = reader.u32()
            _size = reader.u32()
            _previous_size = reader.u32()
            delay = reader.u32()
            # The per-frame dimensions are read but not used; the handshake
            # size stays in effect for the whole session.
            frame_width = reader.u16()
            frame_height = reader.u16()
            is_full_frame = bool(reader.u8())
            is_resolution_changed = bool(reader.u8())
            # Two header bytes are skipped before the rectangle count.
            reader.u16()
            rect_count = reader.u16()
            if is_resolution_changed:
                raise ValueError(f"unsupported framebuffer change at sequence {sequence}")
            if is_full_frame:
                # A full frame is decoded onto a fresh black framebuffer.
                fb = bytearray(width * height * 3)
            decode_rectangles(reader, fb, width, height, rect_count)
            pos = reader.pos
            frame: FrameInfo = {
                "frame": len(frames) + 1,
                "image": f"frame_{len(frames) + 1:06d}.png",
                "session": session_no,
                "sequence_number": sequence,
                "timestamp_ms": timestamp,
                "delay_ms": delay,
                "is_full_frame": is_full_frame,
            }
            frames.append(frame)
            if session_dir is not None:
                # Per-frame copy of the metadata reflecting progress so far.
                temp_info = dict(base_info)
                temp_info["frame_count"] = len(frames)
                temp_info["session"] = session_no
                temp_info["duration_ms"] = timeline_duration_ms(frames)
                temp_info["framebuffer_width"] = width
                temp_info["framebuffer_height"] = height
                save_rgb(session_dir / frame["image"], width, height, bytes(fb), png_metadata(temp_info, frame))
            # A banner right here starts the next session; its bytes could
            # otherwise pass the frame-header heuristic.
            if stream.startswith(b"RFB ", pos):
                break
            # Nothing decodable follows: no frame header and no later banner.
            if not looks_like_frame_header(stream, pos) and stream.find(b"RFB ", pos) < 0:
                break
        # frames_dir is included only when frames were written to disk.
        sessions.append({"session": session_no, "handshake": handshake, "frames": frames, "frames_dir": session_dir} if session_dir is not None else {"session": session_no, "handshake": handshake, "frames": frames})
    if not sessions:
        raise ValueError("no RFB handshake found")
    return sessions
def prepare_timestamped_sequence(images_dir: Path, frames: list[FrameInfo], temp_dir: Path) -> None:
    """Stage frames as a numbered image sequence whose mtimes encode the timeline.

    Every frame is linked (or copied) into *temp_dir* as
    ``video_frame_NNNNNN.png`` with its modification time set to a fixed base
    plus the frame's offset from the first frame. One extra copy of the last
    frame is appended at the end of the timeline so that the last frame keeps
    its display duration.

    Args:
        images_dir: Directory containing the decoded frame images.
        frames: Frame records of one session, in display order.
        temp_dir: Directory to fill; created if missing.

    Raises:
        ValueError: *frames* is empty.
    """
    if not frames:
        raise ValueError("cannot build a video sequence without frames")
    temp_dir.mkdir(parents=True, exist_ok=True)
    base_ns = 1_700_000_000_000_000_000
    start_ms = frames[0]["timestamp_ms"]

    def link_or_copy(source: Path, target: Path) -> None:
        """Hard-link *source* to *target*, copying when linking fails."""
        try:
            target.hardlink_to(source.resolve())
        except OSError:
            shutil.copy2(source, target)

    for i, frame in enumerate(frames, start=1):
        target = temp_dir / f"video_frame_{i:06d}.png"
        link_or_copy(images_dir / frame["image"], target)
        mtime_ns = base_ns + (frame["timestamp_ms"] - start_ms) * 1_000_000
        # NOTE: when the target is a hard link this also changes the mtime of
        # the image in images_dir, because both names share one inode.
        os.utime(target, ns=(mtime_ns, mtime_ns))

    duration_ms = timeline_duration_ms(frames)
    if duration_ms is not None:
        target = temp_dir / f"video_frame_{len(frames) + 1:06d}.png"
        # Always copy the trailing frame. A hard link would share its inode
        # with the last real frame staged above, so setting the end-of-timeline
        # mtime here would overwrite that frame's timestamp and collapse its
        # display time to zero.
        shutil.copy2(images_dir / frames[-1]["image"], target)
        final_ns = base_ns + duration_ms * 1_000_000
        os.utime(target, ns=(final_ns, final_ns))
def mp4_metadata_args(info: Mapping[str, object]) -> list[str]:
    """Return ffmpeg ``-metadata key=value`` arguments describing the recording."""
    title = Path(str(info["source"])).stem
    description = media_description(info)
    tags = (
        ("title", title),
        ("artist", "OpenVNC"),
        ("album", "RFB screen recording"),
        ("genre", "Screen recording"),
        ("description", description),
        ("comment", description),
        ("encoded_by", "parse_rfb_capture.py"),
    )
    args: list[str] = []
    for key, value in tags:
        args.extend(("-metadata", f"{key}={value}"))
    return args
def make_video(images_dir: Path, out_file: Path, frames: list[FrameInfo], info: Mapping[str, object]) -> None:
    """Encode a session's frames into a lossless variable-frame-rate MP4 via ffmpeg.

    The frames are staged in a temporary directory next to *out_file* with
    modification times that carry the timeline; ffmpeg reads those as
    presentation timestamps (``-ts_from_file 2``).

    Raises:
        RuntimeError: ffmpeg is not on PATH or exits with a non-zero status
            (the message is the tail of its stderr).
    """
    ffmpeg = shutil.which("ffmpeg")
    if ffmpeg is None:
        raise RuntimeError("ffmpeg not found in PATH")
    out_file.parent.mkdir(parents=True, exist_ok=True)
    with TemporaryDirectory(prefix=f"{out_file.stem}.timed.", dir=str(out_file.parent)) as temp:
        timed_dir = Path(temp)
        prepare_timestamped_sequence(images_dir, frames, timed_dir)
        input_args = [
            "-framerate", "1000",
            "-ts_from_file", "2",
            "-i", str(timed_dir / "video_frame_%06d.png"),
        ]
        timing_args = ["-copyts", "-start_at_zero", "-fps_mode", "vfr"]
        codec_args = [
            "-c:v", "libx264rgb",
            "-crf", "0",
            "-preset", "veryslow",
            "-bf", "0",
            "-enc_time_base", "1/1000",
            "-pix_fmt", "bgr24",
            "-video_track_timescale", "1000",
        ]
        cmd = [ffmpeg, "-y", *input_args, *timing_args, *codec_args, *mp4_metadata_args(info), str(out_file)]
        proc = subprocess.run(cmd, capture_output=True, text=True)
    if proc.returncode != 0:
        raise RuntimeError(proc.stderr[-2000:])
def info_text(info: Mapping[str, object]) -> str:
    """Render one session's summary as newline-terminated ``Label: value`` lines.

    A ``Video:`` line is appended only when *info* has a truthy ``video`` entry.
    """
    rows = [
        ("Source", f"{info['source']}"),
        ("Source SHA-256", f"{info['source_sha256']}"),
        ("Capture format", f"{info['capture_format']}"),
        ("Records", f"{info['record_count']}"),
        ("RFB stream size", f"{info['stream_size']} bytes"),
        ("Sessions", f"{info['session_count']}"),
        ("Updates/frames", f"{info['update_count']}/{info['frame_count']}"),
        ("Duration", f"{info['duration_ms']} ms"),
        ("Protocol", f"{info['protocol_version']}"),
        ("Security type", f"{info['security_type']}"),
        ("Desktop name", f"{info['desktop_name']}"),
        ("Framebuffer", f"{info['framebuffer_width']}x{info['framebuffer_height']}"),
    ]
    if info.get("video"):
        rows.append(("Video", f"{info['video']}"))
    return "".join(f"{label}: {value}\n" for label, value in rows)
def session_video_path(out: Path, source: Path, requested: Path | None, session: int, session_count: int) -> Path:
if requested is None:
return out / f"{source.stem}_session_{session:02d}.mp4"
base = requested if requested.is_absolute() else out / requested
if session_count == 1:
return base
suffix = base.suffix or ".mp4"
return base.with_name(f"{base.stem}_session_{session:02d}{suffix}")
def make_session_info(
    base_info: DecodeInfo,
    result: SessionResult,
    session_count: int,
    video: Path | None,
) -> DecodeInfo:
    """Combine source-level info with one session's results.

    Returns a new mapping; *base_info* is not modified. ``update_count`` and
    ``frame_count`` are both the number of decoded frames.
    """
    handshake = result["handshake"]
    frames = result["frames"]
    frame_total = len(frames)
    info = dict(base_info)
    info.update(
        {
            "session": result["session"],
            "session_count": session_count,
            "update_count": frame_total,
            "frame_count": frame_total,
            "duration_ms": timeline_duration_ms(frames),
            "protocol_version": handshake["protocol_version"],
            "security_type": handshake["security_type"],
            "desktop_name": handshake["desktop_name"],
            "framebuffer_width": handshake["framebuffer_width"],
            "framebuffer_height": handshake["framebuffer_height"],
            "video": None if video is None else str(video),
        }
    )
    return info
def main(argv: list[str]) -> int:
    """Convert one offline RFB capture according to the command-line *argv*.

    Parses the capture, decodes every session and, depending on the options,
    writes PNG frames, one MP4 per session and ``info.txt`` into the output
    directory. The per-session summary is always printed to stdout.

    Args:
        argv: Command-line arguments without the program name.

    Returns:
        Process exit status (0 on success). Parse and encoding failures
        propagate as exceptions.
    """
    parser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument("input", type=Path)
    parser.add_argument("-o", "--output", type=Path, default=None, help="output directory")
    parser.add_argument("--video", type=Path, default=None, help="MP4 output path or file name")
    parser.add_argument("--no-video", action="store_true", help="parse and optionally write frames/info without MP4 conversion")
    parser.add_argument("--frames", action="store_true", help="keep reconstructed PNG frames")
    parser.add_argument("--info", action="store_true", help="write info.txt")
    args = parser.parse_args(argv)
    source = args.input
    # Append to the whole file name. with_suffix() rejects a suffix without a
    # leading dot, so it raised ValueError for inputs that have no extension.
    out = args.output or source.with_name(source.name + "_extracted")
    clean_generated_output(out)
    out.mkdir(parents=True, exist_ok=True)
    # Remove default-named videos left over from a previous run.
    for old_video in (out / f"{source.stem}.mp4", *out.glob(f"{source.stem}_session_*.mp4")):
        if old_video.exists():
            old_video.unlink()
    data = source.read_bytes()
    capture = parse_offline_capture(data)
    stream = b"".join(capture.records)
    base_info: DecodeInfo = {
        "source": str(source),
        "source_sha256": sha256(data),
        "capture_format": "offline OpenVNC RFB",
        "record_count": len(capture.records),
        "stream_size": len(stream),
        "session": 0,
        "session_count": 0,
        "update_count": 0,
        "frame_count": 0,
        "duration_ms": None,
        "protocol_version": "",
        "security_type": 0,
        "desktop_name": "",
        "framebuffer_width": 0,
        "framebuffer_height": 0,
        "video": None,
    }
    with TemporaryDirectory(prefix=f"{source.stem}.frames.", dir=str(out)) as temp_frames:
        # Frames go to out/frames when kept, to the temporary directory when
        # only needed for the video, and nowhere when neither is requested.
        frame_root: Path | None
        if args.frames:
            frame_root = out / "frames"
        elif args.no_video:
            frame_root = None
        else:
            frame_root = Path(temp_frames)
        if frame_root is not None:
            frame_root.mkdir(parents=True, exist_ok=True)
        sessions = decode_stream(stream, frame_root, base_info)
        session_infos: list[DecodeInfo] = []
        for result in sessions:
            video_path = None
            if not args.no_video:
                if result["frames"]:
                    video_path = session_video_path(out, source, args.video, result["session"], len(sessions))
                else:
                    # A session without frames cannot be encoded; skip it
                    # rather than abort the remaining sessions.
                    print(f"session {result['session']}: no frames decoded, skipping video", file=sys.stderr)
            info = make_session_info(base_info, result, len(sessions), video_path)
            session_infos.append(info)
            if video_path is not None:
                frames_dir = result.get("frames_dir")
                if frames_dir is None:
                    raise RuntimeError("internal error: video creation requires decoded frame images")
                make_video(frames_dir, video_path, result["frames"], info)
    if args.info:
        (out / "info.txt").write_text("\n".join(info_text(info).rstrip() for info in session_infos) + "\n", encoding="utf-8")
    print("\n".join(info_text(info).rstrip() for info in session_infos), end="\n")
    return 0
# Script entry point: main()'s return value becomes the process exit status.
if __name__ == "__main__":
    raise SystemExit(main(sys.argv[1:]))
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment