llama-swap metrics combiner
@h3po · Created February 4, 2026 09:08
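The script polls llama-swap's /running endpoint, scrapes the Prometheus metrics of every upstream whose state is "ready", tags each sample with a model label, and re-exposes the combined set on a single /metrics endpoint. It expects /running to return JSON shaped roughly like the sketch below (field names are inferred from the code; the model names are hypothetical, and your llama-swap version may include extra fields):

# Sketch of the /running payload this script consumes:
example_running = {
    "running": [
        {"model": "qwen2.5-7b", "state": "ready"},    # scraped
        {"model": "llama3-8b", "state": "starting"},  # skipped: not "ready"
    ]
}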
#!/usr/bin/env python3
import os
import time
from http.server import BaseHTTPRequestHandler, HTTPServer
from urllib.parse import urljoin

import requests
from prometheus_client import (
    CONTENT_TYPE_LATEST,
    REGISTRY,
    generate_latest,
)
from prometheus_client.core import GaugeMetricFamily

# Configuration, overridable via environment variables.
LLAMA_SWAP_BASE_URL = os.getenv("LLAMA_SWAP_BASE_URL", "http://127.0.0.1:8080")
REFRESH_INTERVAL = int(os.getenv("REFRESH_INTERVAL", "15"))
EXPORTER_PORT = int(os.getenv("EXPORTER_PORT", "2102"))
class LlamaSwapCollector:
    """Scrape every ready llama-swap upstream and re-expose its metrics
    with an added `model` label."""

    def __init__(self, base_url):
        self.base_url = base_url
        self.last_scrape = 0
        self.cached_metrics = []

    def collect(self):
        # Serve cached families if the last upstream scrape is recent enough.
        current_time = time.time()
        if current_time - self.last_scrape < REFRESH_INTERVAL:
            yield from self.cached_metrics
            return
        self.cached_metrics.clear()
        self.last_scrape = current_time

        # Ask llama-swap which upstreams are currently running.
        try:
            response = requests.get(urljoin(self.base_url, "/running"), timeout=5)
            response.raise_for_status()
            backends = response.json().get("running", [])
        except Exception as e:
            print(f"Error fetching /running: {e}")
            return

        for backend in backends:
            model = backend.get("model")
            state = backend.get("state")
            if state != "ready":
                continue

            metrics_url = urljoin(self.base_url, f"/upstream/{model}/metrics")
            try:
                res = requests.get(metrics_url, timeout=5)
                res.raise_for_status()
                lines = res.text.splitlines()
            except Exception as e:
                print(f"Error fetching metrics from {metrics_url}: {e}")
                continue

            # Parse the upstream's Prometheus exposition text. The parser is
            # deliberately simple: it assumes label values contain no commas
            # or escaped quotes, which holds for llama.cpp's output.
            metric_map = {}
            for line in lines:
                if line.startswith("#"):
                    continue
                try:
                    parts = line.split()
                    name_and_labels = parts[0]
                    value = float(parts[1])
                except Exception:
                    continue
                if "{" in name_and_labels:
                    name, label_str = name_and_labels.split("{", 1)
                    label_str = label_str.rstrip("}")
                    labels = dict(item.split("=", 1) for item in label_str.split(","))
                    labels = {k: v.strip('"') for k, v in labels.items()}
                else:
                    name = name_and_labels
                    labels = {}
                # Tag every sample with the model it came from.
                labels["model"] = model
                label_keys = tuple(sorted(labels.keys()))
                metric_key = (name, label_keys)
                if metric_key not in metric_map:
                    # Everything is re-exposed as a gauge, counters included.
                    metric_map[metric_key] = GaugeMetricFamily(
                        name, f"Aggregated {name}", labels=label_keys
                    )
                label_values = tuple(labels[k] for k in label_keys)
                metric_map[metric_key].add_metric(label_values, value)
            self.cached_metrics.extend(metric_map.values())

        yield from self.cached_metrics


REGISTRY.register(LlamaSwapCollector(LLAMA_SWAP_BASE_URL))
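# For illustration, one upstream exposition line is transformed like this
# (a made-up sample; actual llama.cpp metric names vary by version):
#   upstream:  llamacpp:requests_processing 3
#   exported:  llamacpp:requests_processing{model="qwen2.5-7b"} 3.0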
class MetricsHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        if self.path == "/metrics":
            try:
                output = generate_latest(REGISTRY)
                self.send_response(200)
                self.send_header("Content-Type", CONTENT_TYPE_LATEST)
                self.send_header("Content-Length", str(len(output)))
                self.end_headers()
                self.wfile.write(output)
            except Exception as e:
                self.send_error(500, f"Failed to generate metrics: {e}")
        else:
            self.send_response(200)
            self.send_header("Content-Type", "text/html")
            self.end_headers()
            self.wfile.write(
                b"<html><body><h1>Llama.cpp Exporter</h1>"
                b"<p>See <a href='/metrics'>/metrics</a></p></body></html>"
            )
def run_http_server():
    server = HTTPServer(("", EXPORTER_PORT), MetricsHandler)
    print(f"Exporter running on http://localhost:{EXPORTER_PORT}/metrics")
    server.serve_forever()


if __name__ == "__main__":
    run_http_server()
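A quick smoke test of the combined output, assuming the exporter is running locally with the default EXPORTER_PORT of 2102 (a minimal sketch, not part of the exporter itself):

# check_exporter.py - print only the series carrying the injected model label
import requests

resp = requests.get("http://localhost:2102/metrics", timeout=5)
resp.raise_for_status()
for line in resp.text.splitlines():
    if 'model="' in line:
        print(line)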