Skip to content

Instantly share code, notes, and snippets.

@anvarazizov
Last active February 27, 2026 04:17
Show Gist options
  • Select an option

  • Save anvarazizov/10d25b1eeb19ff937aeacd3db4935cd9 to your computer and use it in GitHub Desktop.

Select an option

Save anvarazizov/10d25b1eeb19ff937aeacd3db4935cd9 to your computer and use it in GitHub Desktop.
listener_public.py
"""Meshtastic LoRa Listener — Off-Grid AI with Tool Calling
full repo link: https://github.com/anvarazizov/meshtastic-listener-ollama
Architecture:
phi4-mini = smart router (decides which tool to call)
gemma3:12b = knowledge brain (for general Q&A)
HA API = home automation (sensors, device control)
Radio = LoRa mesh I/O
Flow:
Radio msg → internet check
Online: forward to Discord (cloud AI responds)
Offline: phi4-mini routes → execute tools → respond via radio
"SAY:" prefix → TTS on Home Assistant speaker
"AI:" prefix → always routes to local AI
"status" → Home Assistant sensor readout
"""
import json
import time
import os
import glob
import logging
import threading
import requests
from datetime import datetime
import meshtastic
import meshtastic.serial_interface
from pubsub import pub
# ===== CONFIGURATION (edit these) =====
SERIAL_PORT = "/dev/cu.usbmodem1101" # Your Meshtastic radio serial port
CONFIG_PATH = "config.json" # {"discord_webhook_url": "...", "known_nodes": {...}}
LOG_PATH = "messages.log"
OLLAMA_URL = "http://localhost:11434/api"
ROUTER_MODEL = "phi4-mini" # Lightweight intent classifier
BRAIN_MODEL = "gemma3:12b" # Main AI brain
MAX_RADIO_MSG = 200 # LoRa message size limit
OUTBOX_DIR = "outbox" # Drop .msg files here to send via radio
# Home Assistant config path — expects {"ha_token": "your-long-lived-token"}
HA_CONFIG_PATH = "ha_config.json"
HA_URL = "http://homeassistant.local:8123"
# TTS config — adjust entity IDs to match your setup
TTS_ENTITY = "tts.google_translate_en_com"
TTS_SPEAKER = "media_player.your_speaker_entity" # Your HA media player
TTS_LANGUAGE = "uk" # Language for TTS
# Known nodes — map Meshtastic node IDs to friendly names
KNOWN_NODES = {
# "!abcd1234": "Alice",
# "!efgh5678": "Bob",
}
# Your own node ID (messages from this node are ignored)
MY_NODE_ID = "!00000000" # Replace with your radio node ID
DISCORD_WEBHOOK_URL = None
INTERFACE = None
# ===== TOOL DEFINITIONS (for phi4-mini router) =====
TOOLS = [
{
"type": "function",
"function": {
"name": "get_home_status",
"description": "Get current home sensor data: temperature, humidity, power status, weather, who is home",
"parameters": {"type": "object", "properties": {}}
}
},
{
"type": "function",
"function": {
"name": "control_device",
"description": "Control a home device (lights, switches, plugs). Use entity names like: living_room_lights, bedroom_lights, all_lights",
"parameters": {
"type": "object",
"properties": {
"entity": {"type": "string", "description": "Device name"},
"action": {"type": "string", "enum": ["on", "off", "toggle"]}
},
"required": ["entity", "action"]
}
}
},
{
"type": "function",
"function": {
"name": "answer_question",
"description": "Answer a general knowledge question, have a conversation, or provide information. Use this for anything that is NOT about home devices or sensors.",
"parameters": {
"type": "object",
"properties": {
"question": {"type": "string", "description": "The question to answer"}
},
"required": ["question"]
}
}
},
{
"type": "function",
"function": {
"name": "run_command",
"description": "Run a system command on the server (e.g., check disk space, uptime, network status)",
"parameters": {
"type": "object",
"properties": {
"command": {"type": "string", "description": "Shell command to run"}
},
"required": ["command"]
}
}
}
]
# Safe commands whitelist — only these are allowed via radio
SAFE_COMMANDS = ["uptime", "df", "date", "whoami", "hostname", "ollama", "cat", "ps", "top"]
# ===== LOGGING =====
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s [%(levelname)s] %(message)s',
handlers=[
logging.StreamHandler(),
logging.FileHandler(LOG_PATH),
]
)
logger = logging.getLogger(__name__)
# ===== HELPERS =====
def load_config():
global DISCORD_WEBHOOK_URL
try:
with open(CONFIG_PATH) as f:
config = json.load(f)
DISCORD_WEBHOOK_URL = config.get("discord_webhook_url")
for node_id, name in config.get("known_nodes", {}).items():
KNOWN_NODES[node_id] = name
except FileNotFoundError:
logger.warning("Config not found, using defaults")
def send_discord(message: str):
"""Forward message to Discord via webhook."""
if not DISCORD_WEBHOOK_URL:
return
try:
requests.post(DISCORD_WEBHOOK_URL, json={"content": message}, timeout=10)
except:
pass
def send_radio(text: str, channel_index: int = 0):
"""Send text over LoRa, auto-chunking if needed."""
global INTERFACE
if not INTERFACE:
logger.error("No radio interface")
return
chunks = []
while text:
if len(text) <= MAX_RADIO_MSG:
chunks.append(text)
break
cut = text[:MAX_RADIO_MSG].rfind(' ')
if cut <= 0:
cut = MAX_RADIO_MSG
chunks.append(text[:cut])
text = text[cut:].lstrip()
for i, chunk in enumerate(chunks):
if len(chunks) > 1:
chunk = f"[{i+1}/{len(chunks)}] {chunk}"
try:
INTERFACE.sendText(chunk, channelIndex=channel_index)
logger.info(f"Radio TX ch{channel_index}: {chunk[:80]}...")
if i < len(chunks) - 1:
time.sleep(3) # Pause between chunks to avoid radio congestion
except Exception as e:
logger.error(f"Radio send error: {e}")
def check_internet() -> bool:
"""Quick internet connectivity check."""
try:
requests.get("https://discord.com", timeout=5)
return True
except:
return False
def get_ha_headers():
"""Load Home Assistant auth headers from config."""
try:
cc = json.load(open(HA_CONFIG_PATH))
return {"Authorization": f"Bearer {cc['ha_token']}"}
except:
return None
# ===== TOOL IMPLEMENTATIONS =====
def tool_get_home_status() -> str:
"""Read sensors from Home Assistant."""
headers = get_ha_headers()
if not headers:
return "HA config not found"
# Map your HA entity IDs to friendly labels
sensors = {
# "sensor.living_room_temperature": "Temp",
# "sensor.living_room_humidity": "Humidity",
# "person.your_name": "You",
# "weather.forecast_home": "Weather",
}
parts = []
for entity_id, label in sensors.items():
try:
r = requests.get(f"{HA_URL}/api/states/{entity_id}",
headers=headers, timeout=5)
if r.status_code == 200:
state = r.json().get("state", "?")
parts.append(f"{label}: {state}")
except:
pass
return " | ".join(parts) if parts else "HA unreachable"
def tool_control_device(entity: str, action: str) -> str:
"""Control a Home Assistant device."""
headers = get_ha_headers()
if not headers:
return "HA config not found"
# Map friendly names to HA entity IDs — customize for your setup
entity_map = {
"living_room_lights": "light.living_room",
"bedroom_lights": "light.bedroom",
"kitchen_lights": "light.kitchen",
"all_lights": "light.all",
}
ha_entity = entity_map.get(entity.lower().replace(" ", "_"), entity)
domain = ha_entity.split(".")[0] if "." in ha_entity else "light"
service = f"turn_{action}" if action in ("on", "off") else "toggle"
try:
r = requests.post(
f"{HA_URL}/api/services/{domain}/{service}",
headers=headers,
json={"entity_id": ha_entity},
timeout=10
)
if r.status_code == 200:
return f"Done: {entity}{action}"
else:
return f"HA error: {r.status_code}"
except Exception as e:
return f"HA error: {str(e)[:80]}"
def tool_answer_question(question: str) -> str:
"""Use the brain model (gemma3:12b) for general knowledge answers."""
try:
resp = requests.post(f"{OLLAMA_URL}/generate", json={
"model": BRAIN_MODEL,
"prompt": question,
"system": "You are a helpful AI responding via LoRa radio. Keep answers SHORT (under 350 chars). No markdown. Be direct.",
"stream": False,
"options": {"temperature": 0.7, "num_predict": 200}
}, timeout=60)
if resp.status_code == 200:
return resp.json().get("response", "").strip() or "No response"
return f"AI error: HTTP {resp.status_code}"
except requests.exceptions.Timeout:
return "AI timeout"
except Exception as e:
return f"AI error: {str(e)[:80]}"
def tool_run_command(command: str) -> str:
"""Run safe system commands only."""
import subprocess
cmd_base = command.split("|")[0].strip().split()[0] if command else ""
if cmd_base not in SAFE_COMMANDS:
return f"Command not allowed: {cmd_base}. Safe: {', '.join(SAFE_COMMANDS)}"
try:
result = subprocess.run(command, shell=True, capture_output=True, text=True, timeout=10)
output = result.stdout.strip()[:300]
return output if output else "No output"
except Exception as e:
return f"Error: {str(e)[:80]}"
# ===== ROUTING ENGINE =====
def route_with_phi(user_message: str) -> str:
"""Use phi4-mini to decide which tool to call, then execute it."""
try:
resp = requests.post(f"{OLLAMA_URL}/chat", json={
"model": ROUTER_MODEL,
"messages": [{"role": "user", "content": user_message}],
"tools": TOOLS,
"stream": False
}, timeout=30)
if resp.status_code != 200:
return tool_answer_question(user_message)
msg = resp.json().get("message", {})
tool_calls = msg.get("tool_calls", [])
content = msg.get("content", "")
if not tool_calls:
if content:
return content[:400]
return tool_answer_question(user_message)
results = []
for tc in tool_calls:
func = tc.get("function", {})
name = func.get("name", "")
args = func.get("arguments", {})
logger.info(f"Tool call: {name}({json.dumps(args)})")
if name == "get_home_status":
results.append(tool_get_home_status())
elif name == "control_device":
results.append(tool_control_device(
args.get("entity", "unknown"),
args.get("action", "off")
))
elif name == "answer_question":
results.append(tool_answer_question(
args.get("question", user_message)
))
elif name == "run_command":
results.append(tool_run_command(
args.get("command", "uptime")
))
else:
results.append(f"Unknown tool: {name}")
return " | ".join(results)
except Exception as e:
logger.error(f"Router error: {e}")
return tool_answer_question(user_message)
# ===== MESSAGE HANDLERS =====
def handle_message(sender_id: str, sender_name: str, text: str, channel: int):
"""Main message handler with internet-aware routing."""
text_lower = text.strip().lower()
# === SAY: prefix — TTS on Home Assistant speaker ===
if text_lower.startswith("say:") or text_lower.startswith("say "):
msg = text[4:].strip() if text_lower.startswith("say:") else text[3:].strip()
if not msg:
send_radio("Usage: SAY: <message to read aloud>", channel_index=channel)
return
logger.info(f"TTS request from {sender_name}: {msg}")
send_discord(f"🔊 **TTS [{sender_name}]:** {msg}")
try:
headers = get_ha_headers()
if headers:
requests.post(
f"{HA_URL}/api/services/tts/speak",
headers=headers,
json={
"entity_id": TTS_ENTITY,
"media_player_entity_id": TTS_SPEAKER,
"message": msg,
"language": TTS_LANGUAGE
},
timeout=10
)
send_radio("Done, message read aloud", channel_index=channel)
else:
send_radio("HA config not found", channel_index=channel)
except Exception as e:
send_radio(f"TTS error: {str(e)[:80]}", channel_index=channel)
return
# === AI: prefix — always local AI ===
if text_lower.startswith("ai:") or text_lower.startswith("ai "):
prompt = text[3:].strip() if text_lower.startswith("ai:") else text[2:].strip()
if not prompt:
send_radio("Usage: AI: <your question>", channel_index=channel)
return
logger.info(f"AI request from {sender_name}: {prompt}")
send_discord(f"🧠 **AI query [{sender_name}]:** {prompt}")
result = route_with_phi(prompt)
send_radio(result, channel_index=channel)
send_discord(f"🤖 **AI → radio:** {result[:500]}")
return
# === status — Home Assistant readout ===
if text_lower in ("status", "статус", "ha", "home"):
status = tool_get_home_status()
send_radio(f"HA: {status}", channel_index=channel)
send_discord(f"📊 **Status [{sender_name}]:** {status}")
return
# === Regular message — check internet ===
has_internet = check_internet()
if has_internet:
# Online: forward to Discord for cloud AI
send_discord(f"📡 **LoRa [{sender_name}]:** {text}")
logger.info("Online — forwarded to Discord")
else:
# Offline: route everything through local AI
logger.info(f"OFFLINE — routing to local AI: {text}")
send_radio("(offline mode)", channel_index=channel)
result = route_with_phi(text)
send_radio(result, channel_index=channel)
logger.info(f"Offline response: {result[:100]}")
# ===== RADIO EVENT HANDLERS =====
def on_receive(packet, interface):
try:
if 'decoded' not in packet or 'text' not in packet['decoded']:
return
sender_id = packet.get('fromId', 'unknown')
sender_name = KNOWN_NODES.get(sender_id, sender_id)
text = packet['decoded']['text']
channel = packet.get('channel', 0)
snr = packet.get('rxSnr')
rssi = packet.get('rxRssi')
# Ignore our own messages
if sender_id == MY_NODE_ID:
return
signal = f" [SNR:{snr}dB RSSI:{rssi}dBm]" if snr else ""
logger.info(f"[{sender_name}] ch{channel}: {text}{signal}")
# Handle in a thread to not block radio reception
thread = threading.Thread(target=handle_message,
args=(sender_id, sender_name, text, channel))
thread.daemon = True
thread.start()
except Exception as e:
logger.error(f"Error: {e}")
def check_outbox():
"""Check outbox directory for .msg files to send via radio."""
if not os.path.isdir(OUTBOX_DIR):
return
for filepath in sorted(glob.glob(os.path.join(OUTBOX_DIR, "*.msg"))):
try:
with open(filepath, 'r') as f:
content = f.read().strip()
if not content:
os.remove(filepath)
continue
# Optional first line: "channel:N" to specify radio channel
channel = 0
lines = content.split('\n', 1)
if lines[0].startswith('channel:'):
channel = int(lines[0].split(':')[1].strip())
content = lines[1] if len(lines) > 1 else ''
if content:
logger.info(f"Outbox sending: {content[:80]}...")
send_radio(content, channel_index=channel)
send_discord(f"📡 **Radio TX (outbox):** {content[:500]}")
os.remove(filepath)
except Exception as e:
logger.error(f"Outbox error {filepath}: {e}")
def on_connection(interface, topic=pub.AUTO_TOPIC):
logger.info("Connected to Meshtastic device")
def on_disconnect(interface, topic=pub.AUTO_TOPIC):
logger.warning("Disconnected — will reconnect...")
# ===== MAIN LOOP =====
WATCHDOG_INTERVAL = 300 # 5 min — check interface health
def main():
global INTERFACE
load_config()
logger.info("=" * 50)
logger.info("Meshtastic Off-Grid AI Listener")
logger.info(f"Router: {ROUTER_MODEL} | Brain: {BRAIN_MODEL}")
logger.info(f"Serial: {SERIAL_PORT}")
logger.info(f"Tools: {[t['function']['name'] for t in TOOLS]}")
logger.info("=" * 50)
pub.subscribe(on_receive, "meshtastic.receive.text")
pub.subscribe(on_connection, "meshtastic.connection.established")
pub.subscribe(on_disconnect, "meshtastic.connection.lost")
while True:
try:
INTERFACE = meshtastic.serial_interface.SerialInterface(SERIAL_PORT)
logger.info("Listening...")
os.makedirs(OUTBOX_DIR, exist_ok=True)
logger.info(f"Outbox: {OUTBOX_DIR}")
watchdog_counter = 0
while True:
check_outbox()
time.sleep(2)
watchdog_counter += 2
if watchdog_counter >= WATCHDOG_INTERVAL:
watchdog_counter = 0
try:
if INTERFACE and INTERFACE.myInfo:
logger.debug("Watchdog: interface healthy")
else:
logger.warning("Watchdog: interface stale, reconnecting...")
raise Exception("Stale interface")
except Exception as e:
logger.warning(f"Watchdog triggered reconnect: {e}")
try: INTERFACE.close()
except: pass
INTERFACE = None
break
except KeyboardInterrupt:
try: INTERFACE.close()
except: pass
break
except Exception as e:
logger.error(f"Connection error: {e}")
INTERFACE = None
time.sleep(10)
if __name__ == "__main__":
main()
@anvarazizov
Copy link
Author

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment