Last active
February 27, 2026 04:17
-
-
Save anvarazizov/10d25b1eeb19ff937aeacd3db4935cd9 to your computer and use it in GitHub Desktop.
listener_public.py
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
"""Meshtastic LoRa Listener — Off-Grid AI with Tool Calling

Full repo link: https://github.com/anvarazizov/meshtastic-listener-ollama

Architecture:
    phi4-mini   = smart router (decides which tool to call)
    gemma3:12b  = knowledge brain (for general Q&A)
    HA API      = home automation (sensors, device control)
    Radio       = LoRa mesh I/O

Flow:
    Radio msg → internet check
        Online:  forward to Discord (cloud AI responds)
        Offline: phi4-mini routes → execute tools → respond via radio
    "SAY:" prefix → TTS on Home Assistant speaker
    "AI:" prefix → always routes to local AI
    "status" → Home Assistant sensor readout
"""
| import json | |
| import time | |
| import os | |
| import glob | |
| import logging | |
| import threading | |
| import requests | |
| from datetime import datetime | |
| import meshtastic | |
| import meshtastic.serial_interface | |
| from pubsub import pub | |
# ===== CONFIGURATION (edit these) =====

# --- Radio ---
SERIAL_PORT = "/dev/cu.usbmodem1101"  # Your Meshtastic radio serial port
MY_NODE_ID = "!00000000"  # Your own node ID; messages from this node are ignored
MAX_RADIO_MSG = 200  # LoRa message size limit
OUTBOX_DIR = "outbox"  # Drop .msg files here to send via radio

# --- Files ---
CONFIG_PATH = "config.json"  # {"discord_webhook_url": "...", "known_nodes": {...}}
LOG_PATH = "messages.log"

# --- Local AI (Ollama) ---
OLLAMA_URL = "http://localhost:11434/api"
ROUTER_MODEL = "phi4-mini"  # Lightweight intent classifier
BRAIN_MODEL = "gemma3:12b"  # Main AI brain

# --- Home Assistant ---
HA_CONFIG_PATH = "ha_config.json"  # Expects {"ha_token": "your-long-lived-token"}
HA_URL = "http://homeassistant.local:8123"

# --- TTS: adjust entity IDs to match your setup ---
TTS_ENTITY = "tts.google_translate_en_com"
TTS_SPEAKER = "media_player.your_speaker_entity"  # Your HA media player
TTS_LANGUAGE = "uk"  # Language for TTS

# --- Known nodes: map Meshtastic node IDs to friendly names ---
KNOWN_NODES = {
    # "!abcd1234": "Alice",
    # "!efgh5678": "Bob",
}

# --- Runtime state, filled in by load_config() and main() ---
DISCORD_WEBHOOK_URL = None
INTERFACE = None
# ===== TOOL DEFINITIONS (for phi4-mini router) =====
def _tool_spec(name, description, properties=None, required=None):
    """Build one function-tool entry for the router model's "tools" list.

    The "required" key is only emitted when a non-empty list is given, so a
    parameterless tool carries just {"type": "object", "properties": {}}.
    """
    parameters = {"type": "object", "properties": properties or {}}
    if required:
        parameters["required"] = list(required)
    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": parameters,
        },
    }


# Passed as the "tools" field of the chat request made in route_with_phi().
TOOLS = [
    _tool_spec(
        "get_home_status",
        "Get current home sensor data: temperature, humidity, power status, weather, who is home",
    ),
    _tool_spec(
        "control_device",
        "Control a home device (lights, switches, plugs). Use entity names like: living_room_lights, bedroom_lights, all_lights",
        properties={
            "entity": {"type": "string", "description": "Device name"},
            "action": {"type": "string", "enum": ["on", "off", "toggle"]},
        },
        required=["entity", "action"],
    ),
    _tool_spec(
        "answer_question",
        "Answer a general knowledge question, have a conversation, or provide information. Use this for anything that is NOT about home devices or sensors.",
        properties={
            "question": {"type": "string", "description": "The question to answer"},
        },
        required=["question"],
    ),
    _tool_spec(
        "run_command",
        "Run a system command on the server (e.g., check disk space, uptime, network status)",
        properties={
            "command": {"type": "string", "description": "Shell command to run"},
        },
        required=["command"],
    ),
]
# Safe commands whitelist — only these are allowed via radio.
# NOTE(review): tool_run_command() compares only the first word of the request
# against this list, so arguments are unrestricted. "cat" therefore looks able
# to read any file the process can (config.json and ha_config.json hold
# secrets), and "ollama" accepts subcommands — confirm both really belong here.
SAFE_COMMANDS = [
    "uptime",
    "df",
    "date",
    "whoami",
    "hostname",
    "ollama",
    "cat",
    "ps",
    "top",
]
# ===== LOGGING =====
# Every record goes both to the console and to the file at LOG_PATH.
_LOG_FORMAT = '%(asctime)s [%(levelname)s] %(message)s'
_console_handler = logging.StreamHandler()
_file_handler = logging.FileHandler(LOG_PATH)
logging.basicConfig(
    level=logging.INFO,
    format=_LOG_FORMAT,
    handlers=[_console_handler, _file_handler],
)
logger = logging.getLogger(__name__)
| # ===== HELPERS ===== | |
def load_config():
    """Load the Discord webhook URL and known-node names from CONFIG_PATH.

    Sets the module-level DISCORD_WEBHOOK_URL and merges the file's
    "known_nodes" mapping into KNOWN_NODES. A missing, unreadable or
    malformed config file is logged and the current defaults are kept, so a
    bad config file cannot stop the listener from starting.
    """
    global DISCORD_WEBHOOK_URL
    try:
        with open(CONFIG_PATH, encoding="utf-8") as f:
            config = json.load(f)
    except FileNotFoundError:
        logger.warning("Config not found, using defaults")
        return
    except (OSError, ValueError) as e:
        # json.JSONDecodeError and UnicodeDecodeError are both ValueErrors.
        logger.error(f"Config unreadable ({e}), using defaults")
        return

    if not isinstance(config, dict):
        logger.error("Config must be a JSON object, using defaults")
        return

    DISCORD_WEBHOOK_URL = config.get("discord_webhook_url")

    known_nodes = config.get("known_nodes") or {}
    if isinstance(known_nodes, dict):
        KNOWN_NODES.update(known_nodes)
    else:
        logger.error('Config "known_nodes" must be an object, ignoring it')
def send_discord(message: str):
    """Forward message to Discord via webhook (best effort).

    Does nothing when no webhook URL is configured. Failures are logged and
    never raised, so a Discord outage cannot interrupt radio handling.
    """
    if not DISCORD_WEBHOOK_URL:
        return
    try:
        resp = requests.post(
            DISCORD_WEBHOOK_URL,
            # Discord rejects webhook content longer than 2000 characters.
            json={"content": message[:2000]},
            timeout=10,
        )
        if resp.status_code >= 400:
            logger.warning(f"Discord webhook returned HTTP {resp.status_code}")
    except Exception as e:
        # Deliberately broad: this is a fire-and-forget notification, but the
        # failure is now visible in the log instead of being silently dropped.
        logger.warning(f"Discord send failed: {e}")
def _split_for_radio(text: str, max_bytes: int) -> list:
    """Split text into pieces whose UTF-8 encoding is at most max_bytes each.

    Breaks at the last space inside the window when there is one, otherwise
    makes a hard cut on a character boundary.
    """
    pieces = []
    while text:
        encoded = text.encode("utf-8")
        if len(encoded) <= max_bytes:
            pieces.append(text)
            break
        # Longest character prefix that fits; errors="ignore" drops a
        # multi-byte character that the byte slice cut in half.
        window = encoded[:max_bytes].decode("utf-8", errors="ignore")
        cut = window.rfind(' ')
        if cut <= 0:
            # No usable space: hard cut, always advancing at least one char.
            cut = max(len(window), 1)
        pieces.append(text[:cut])
        text = text[cut:].lstrip()
    return pieces


def send_radio(text: str, channel_index: int = 0):
    """Send text over LoRa, auto-chunking if needed.

    The MAX_RADIO_MSG limit is applied to the UTF-8 byte length of every
    transmitted message, including the "[i/n] " prefix that is added when the
    text needs more than one message. Send errors are logged per chunk and do
    not stop the remaining chunks.

    Args:
        text: Message to transmit; an empty string sends nothing.
        channel_index: Radio channel index passed to the interface.
    """
    # Snapshot the global so a reconnect in the main loop cannot swap it out
    # between the check and the sends.
    iface = INTERFACE
    if not iface:
        logger.error("No radio interface")
        return

    # Grow the space reserved for the "[i/n] " prefix until it is large enough
    # for the chunk count it produces (more chunks can mean more digits).
    reserve = 0
    while True:
        chunks = _split_for_radio(text, max(MAX_RADIO_MSG - reserve, 1))
        total = len(chunks)
        needed = len(f"[{total}/{total}] ") if total > 1 else 0
        if needed <= reserve:
            break
        reserve = needed

    for i, chunk in enumerate(chunks):
        if total > 1:
            chunk = f"[{i+1}/{total}] {chunk}"
        try:
            iface.sendText(chunk, channelIndex=channel_index)
            logger.info(f"Radio TX ch{channel_index}: {chunk[:80]}...")
            if i < total - 1:
                time.sleep(3)  # Pause between chunks to avoid radio congestion
        except Exception as e:
            logger.error(f"Radio send error: {e}")
def check_internet() -> bool:
    """Quick internet connectivity check.

    Returns True when an HTTP request to discord.com completes within five
    seconds, whatever the response status, and False on any request failure
    (DNS error, refused connection, timeout, TLS error).
    """
    try:
        requests.get("https://discord.com", timeout=5)
    except requests.RequestException:
        return False
    return True
def get_ha_headers():
    """Load Home Assistant auth headers from config.

    Returns:
        {"Authorization": "Bearer <token>"} built from the "ha_token" entry of
        the JSON file at HA_CONFIG_PATH, or None when the file is missing,
        unreadable, not valid JSON, or has no "ha_token" entry.
    """
    try:
        # The context manager closes the file even when parsing fails.
        with open(HA_CONFIG_PATH, encoding="utf-8") as f:
            token = json.load(f)["ha_token"]
    except (OSError, ValueError, KeyError, TypeError):
        # ValueError: bad JSON or encoding; KeyError: no token;
        # TypeError: top-level JSON value is not an object.
        return None
    return {"Authorization": f"Bearer {token}"}
| # ===== TOOL IMPLEMENTATIONS ===== | |
def tool_get_home_status() -> str:
    """Read sensors from Home Assistant.

    Returns:
        "Label: state" pairs joined with " | " for every sensor that answered
        with HTTP 200, or a short explanation when the HA config is missing,
        no sensors are configured, or none could be read.
    """
    headers = get_ha_headers()
    if not headers:
        return "HA config not found"
    # Map your HA entity IDs to friendly labels
    sensors = {
        # "sensor.living_room_temperature": "Temp",
        # "sensor.living_room_humidity": "Humidity",
        # "person.your_name": "You",
        # "weather.forecast_home": "Weather",
    }
    if not sensors:
        # An empty map makes no requests at all; "HA unreachable" would be wrong.
        return "No HA sensors configured"

    parts = []
    for entity_id, label in sensors.items():
        try:
            r = requests.get(f"{HA_URL}/api/states/{entity_id}",
                             headers=headers, timeout=5)
            if r.status_code == 200:
                state = r.json().get("state", "?")
                parts.append(f"{label}: {state}")
            else:
                logger.warning(f"HA returned HTTP {r.status_code} for {entity_id}")
        except (requests.RequestException, ValueError, AttributeError) as e:
            # ValueError: body is not JSON; AttributeError: JSON is not an object.
            # One failing sensor must not hide the others.
            logger.warning(f"HA read failed for {entity_id}: {e}")
    return " | ".join(parts) if parts else "HA unreachable"
def tool_control_device(entity: str, action: str) -> str:
    """Control a Home Assistant device.

    Args:
        entity: Friendly name (e.g. "living_room_lights") or a full HA entity
            ID ("domain.object_id"). A bare unknown name is treated as a light.
        action: "on", "off" or "toggle" (case-insensitive).

    Returns:
        A short status string suitable for sending back over the radio.
    """
    import re

    # Both values originate from radio text / model output and end up in a URL
    # path and a service call, so validate them before touching the network.
    action = str(action).strip().lower()
    if action not in ("on", "off", "toggle"):
        # Previously any unknown action silently toggled the device.
        return f"Invalid action: {action[:20]}. Use on, off or toggle"

    # Map friendly names to HA entity IDs — customize for your setup
    entity_map = {
        "living_room_lights": "light.living_room",
        "bedroom_lights": "light.bedroom",
        "kitchen_lights": "light.kitchen",
        "all_lights": "light.all",
    }
    key = str(entity).strip().lower().replace(" ", "_")
    ha_entity = entity_map.get(key, key)
    if "." not in ha_entity:
        # A bare name can never match an entity ID; assume the light domain.
        ha_entity = f"light.{ha_entity}"
    if not re.fullmatch(r"[a-z0-9_]+\.[a-z0-9_]+", ha_entity):
        return f"Invalid entity: {str(entity)[:40]}"

    headers = get_ha_headers()
    if not headers:
        return "HA config not found"

    domain = ha_entity.split(".", 1)[0]
    service = "toggle" if action == "toggle" else f"turn_{action}"
    try:
        r = requests.post(
            f"{HA_URL}/api/services/{domain}/{service}",
            headers=headers,
            json={"entity_id": ha_entity},
            timeout=10
        )
        if r.status_code == 200:
            return f"Done: {entity} → {action}"
        return f"HA error: {r.status_code}"
    except Exception as e:
        return f"HA error: {str(e)[:80]}"
def tool_answer_question(question: str) -> str:
    """Ask the brain model (BRAIN_MODEL) a general-knowledge question.

    Posts to the Ollama "generate" endpoint with a system prompt asking for a
    short, markdown-free reply. Always returns a string: the stripped answer,
    "No response" when the answer is empty, or a short "AI ..." error text.
    """
    try:
        payload = {
            "model": BRAIN_MODEL,
            "prompt": question,
            "system": "You are a helpful AI responding via LoRa radio. Keep answers SHORT (under 350 chars). No markdown. Be direct.",
            "stream": False,
            "options": {"temperature": 0.7, "num_predict": 200},
        }
        reply = requests.post(f"{OLLAMA_URL}/generate", json=payload, timeout=60)
        if reply.status_code != 200:
            return f"AI error: HTTP {reply.status_code}"
        answer = reply.json().get("response", "").strip()
        if answer:
            return answer
        return "No response"
    except requests.exceptions.Timeout:
        return "AI timeout"
    except Exception as e:
        return f"AI error: {str(e)[:80]}"
def tool_run_command(command: str) -> str:
    """Run safe system commands only.

    The request comes from radio text / model output, so it is never handed
    to a shell: it is tokenized with shlex and executed directly, and only a
    program named in SAFE_COMMANDS may run. Shell syntax (pipes, chaining,
    redirection, substitution) is rejected outright. Note that only the
    program name is whitelisted; its arguments are not restricted.

    Returns:
        Up to 300 characters of stdout (stderr if stdout is empty),
        "No output", or a short refusal / error message.
    """
    import shlex
    import subprocess

    allowed = ', '.join(SAFE_COMMANDS)
    command = command or ""

    # With shell=True the old first-word check let "uptime; <anything>" through.
    if any(ch in command for ch in "|&;<>$`\n\r"):
        return f"Command not allowed: shell operators are not supported. Safe: {allowed}"
    try:
        argv = shlex.split(command)
    except ValueError:
        return "Command not allowed: unbalanced quotes"
    if not argv or argv[0] not in SAFE_COMMANDS:
        cmd_base = argv[0] if argv else ""
        return f"Command not allowed: {cmd_base}. Safe: {allowed}"

    try:
        result = subprocess.run(argv, shell=False, capture_output=True, text=True, timeout=10)
        output = (result.stdout.strip() or result.stderr.strip())[:300]
        return output if output else "No output"
    except Exception as e:
        # Includes a missing binary and the 10 s timeout (e.g. interactive "top").
        return f"Error: {str(e)[:80]}"
| # ===== ROUTING ENGINE ===== | |
def _coerce_tool_args(raw) -> dict:
    """Return tool-call arguments as a dict, whatever form the model used.

    Accepts a dict as-is, decodes a JSON-object string, and maps anything
    else (None, malformed JSON, non-object JSON) to an empty dict.
    """
    if isinstance(raw, dict):
        return raw
    if isinstance(raw, str):
        try:
            parsed = json.loads(raw)
        except ValueError:
            return {}
        return parsed if isinstance(parsed, dict) else {}
    return {}


def route_with_phi(user_message: str) -> str:
    """Use phi4-mini to decide which tool to call, then execute it.

    Sends the message plus TOOLS to the router model. When the model answers
    with plain content, that content (first 400 chars) is returned; when it
    requests tools, each one is executed and the results are joined with
    " | ". Any router failure falls back to tool_answer_question().
    """
    # Tool name -> callable taking the decoded argument dict.
    handlers = {
        "get_home_status": lambda a: tool_get_home_status(),
        "control_device": lambda a: tool_control_device(
            a.get("entity", "unknown"), a.get("action", "off")),
        "answer_question": lambda a: tool_answer_question(
            a.get("question", user_message)),
        "run_command": lambda a: tool_run_command(a.get("command", "uptime")),
    }
    try:
        resp = requests.post(f"{OLLAMA_URL}/chat", json={
            "model": ROUTER_MODEL,
            "messages": [{"role": "user", "content": user_message}],
            "tools": TOOLS,
            "stream": False
        }, timeout=30)
        if resp.status_code != 200:
            return tool_answer_question(user_message)

        # "or" guards against keys that are present but null.
        msg = resp.json().get("message") or {}
        tool_calls = msg.get("tool_calls") or []
        content = msg.get("content") or ""
        if not tool_calls:
            if content:
                return content[:400]
            return tool_answer_question(user_message)

        results = []
        for tc in tool_calls:
            func = tc.get("function") or {}
            name = func.get("name", "")
            args = _coerce_tool_args(func.get("arguments"))
            logger.info(f"Tool call: {name}({json.dumps(args)})")
            handler = handlers.get(name)
            if handler is None:
                results.append(f"Unknown tool: {name}")
                continue
            try:
                results.append(handler(args))
            except Exception as e:
                # One failing tool must not discard results (and side
                # effects) of the tools that already ran.
                logger.error(f"Tool {name} failed: {e}")
                results.append(f"{name} error: {str(e)[:80]}")
        return " | ".join(results)
    except Exception as e:
        logger.error(f"Router error: {e}")
        return tool_answer_question(user_message)
| # ===== MESSAGE HANDLERS ===== | |
def _strip_command_prefix(text: str, keyword: str):
    """Return what follows a "<keyword>:" or "<keyword> " prefix, else None.

    Surrounding whitespace is ignored and the keyword match is
    case-insensitive. An empty string means the prefix was present but
    nothing followed it.
    """
    stripped = text.strip()
    head = stripped[:len(keyword) + 1].lower()
    if head in (f"{keyword}:", f"{keyword} "):
        return stripped[len(keyword) + 1:].strip()
    return None


def handle_message(sender_id: str, sender_name: str, text: str, channel: int):
    """Main message handler with internet-aware routing.

    Order of checks:
      1. "SAY:" / "say " prefix - speak the rest on the Home Assistant speaker.
      2. "AI:" / "ai " prefix  - always answer with the local AI router.
      3. "status" keywords     - reply with the Home Assistant sensor readout.
      4. Anything else         - forward to Discord when online, otherwise
                                 answer with the local AI router.

    Args:
        sender_id: Node ID the packet came from (not used for routing).
        sender_name: Friendly name shown in logs and Discord messages.
        text: Raw message text as received.
        channel: Radio channel index to reply on.
    """
    # === SAY: prefix — TTS on Home Assistant speaker ===
    # The prefix is matched and removed on the same stripped string, so
    # leading whitespace can no longer shift the slice.
    say_msg = _strip_command_prefix(text, "say")
    if say_msg is not None:
        if not say_msg:
            send_radio("Usage: SAY: <message to read aloud>", channel_index=channel)
            return
        logger.info(f"TTS request from {sender_name}: {say_msg}")
        send_discord(f"🔊 **TTS [{sender_name}]:** {say_msg}")
        try:
            headers = get_ha_headers()
            if not headers:
                send_radio("HA config not found", channel_index=channel)
                return
            resp = requests.post(
                f"{HA_URL}/api/services/tts/speak",
                headers=headers,
                json={
                    "entity_id": TTS_ENTITY,
                    "media_player_entity_id": TTS_SPEAKER,
                    "message": say_msg,
                    "language": TTS_LANGUAGE
                },
                timeout=10
            )
            # Only claim success when Home Assistant accepted the call.
            if resp.status_code == 200:
                send_radio("Done, message read aloud", channel_index=channel)
            else:
                send_radio(f"TTS error: HTTP {resp.status_code}", channel_index=channel)
        except Exception as e:
            send_radio(f"TTS error: {str(e)[:80]}", channel_index=channel)
        return

    # === AI: prefix — always local AI ===
    prompt = _strip_command_prefix(text, "ai")
    if prompt is not None:
        if not prompt:
            send_radio("Usage: AI: <your question>", channel_index=channel)
            return
        logger.info(f"AI request from {sender_name}: {prompt}")
        send_discord(f"🧠 **AI query [{sender_name}]:** {prompt}")
        result = route_with_phi(prompt)
        send_radio(result, channel_index=channel)
        send_discord(f"🤖 **AI → radio:** {result[:500]}")
        return

    # === status — Home Assistant readout ===
    if text.strip().lower() in ("status", "статус", "ha", "home"):
        status = tool_get_home_status()
        send_radio(f"HA: {status}", channel_index=channel)
        send_discord(f"📊 **Status [{sender_name}]:** {status}")
        return

    # === Regular message — check internet ===
    if check_internet():
        # Online: forward to Discord for cloud AI
        send_discord(f"📡 **LoRa [{sender_name}]:** {text}")
        logger.info("Online — forwarded to Discord")
    else:
        # Offline: route everything through local AI
        logger.info(f"OFFLINE — routing to local AI: {text}")
        send_radio("(offline mode)", channel_index=channel)
        result = route_with_phi(text)
        send_radio(result, channel_index=channel)
        logger.info(f"Offline response: {result[:100]}")
| # ===== RADIO EVENT HANDLERS ===== | |
def on_receive(packet, interface):
    """Pubsub callback for received text packets.

    Ignores packets without decoded text and packets sent by MY_NODE_ID,
    logs the message with signal quality when available, and hands it to
    handle_message() on a daemon thread.

    Args:
        packet: Packet dict; reads "decoded"/"text", "fromId", "channel",
            "rxSnr" and "rxRssi".
        interface: Interface that received the packet (unused).
    """
    try:
        if 'decoded' not in packet or 'text' not in packet['decoded']:
            return
        # "or" also covers a fromId key that is present but None —
        # presumably possible for nodes the radio has not identified yet.
        sender_id = packet.get('fromId') or 'unknown'
        # Ignore our own messages
        if sender_id == MY_NODE_ID:
            return
        sender_name = KNOWN_NODES.get(sender_id, sender_id)
        text = packet['decoded']['text']
        channel = packet.get('channel', 0)
        snr = packet.get('rxSnr')
        rssi = packet.get('rxRssi')
        # Compare against None: an SNR of exactly 0 dB is a real reading.
        signal = f" [SNR:{snr}dB RSSI:{rssi}dBm]" if snr is not None else ""
        logger.info(f"[{sender_name}] ch{channel}: {text}{signal}")
        # Handle in a thread to not block radio reception
        worker = threading.Thread(
            target=handle_message,
            args=(sender_id, sender_name, text, channel),
            daemon=True,
        )
        worker.start()
    except Exception as e:
        logger.error(f"Error: {e}")
def check_outbox():
    """Check outbox directory for .msg files to send via radio.

    Files are processed in sorted name order. An optional first line
    "channel:N" selects the radio channel (default 0); the rest of the file
    is the message. Each file is deleted once handled. A malformed channel
    header falls back to channel 0 instead of leaving the file to be retried
    on every poll.
    """
    if not os.path.isdir(OUTBOX_DIR):
        return
    for filepath in sorted(glob.glob(os.path.join(OUTBOX_DIR, "*.msg"))):
        try:
            with open(filepath, 'r', encoding="utf-8") as f:
                content = f.read().strip()

            # Optional first line: "channel:N" to specify radio channel
            channel = 0
            header, _, rest = content.partition('\n')
            if header.startswith('channel:'):
                content = rest
                try:
                    channel = int(header.split(':')[1].strip())
                except ValueError:
                    logger.warning(
                        f"Outbox {filepath}: bad channel header {header!r}, using channel 0")

            if content:
                logger.info(f"Outbox sending: {content[:80]}...")
                send_radio(content, channel_index=channel)
                send_discord(f"📡 **Radio TX (outbox):** {content[:500]}")
            # Empty files and header-only files are removed without sending.
            os.remove(filepath)
        except Exception as e:
            logger.error(f"Outbox error {filepath}: {e}")
def on_connection(interface, topic=pub.AUTO_TOPIC):
    """Pubsub callback: record in the log that the radio link is up.

    Subscribed in main() to "meshtastic.connection.established"; neither
    argument is used.
    """
    logger.info("Connected to Meshtastic device")
def on_disconnect(interface, topic=pub.AUTO_TOPIC):
    """Pubsub callback: record in the log that the radio link dropped.

    Subscribed in main() to "meshtastic.connection.lost"; neither argument is
    used. This handler only logs.
    """
    logger.warning("Disconnected — will reconnect...")
| # ===== MAIN LOOP ===== | |
WATCHDOG_INTERVAL = 300  # Seconds between interface health checks (5 min)


def main():
    """Run the listener until interrupted, reconnecting to the radio as needed.

    Loads the config, subscribes the pubsub handlers, then loops forever:
    open the serial interface, poll the outbox every 2 seconds, and reconnect
    when the link is reported lost, when the periodic watchdog finds the
    interface stale, or when anything raises. Ctrl-C closes the interface
    and returns.
    """
    global INTERFACE
    load_config()
    logger.info("=" * 50)
    logger.info("Meshtastic Off-Grid AI Listener")
    logger.info(f"Router: {ROUTER_MODEL} | Brain: {BRAIN_MODEL}")
    logger.info(f"Serial: {SERIAL_PORT}")
    logger.info(f"Tools: {[t['function']['name'] for t in TOOLS]}")
    logger.info("=" * 50)

    # Set when the *current* interface reports a lost connection, so the loop
    # reconnects promptly instead of waiting for the next watchdog check.
    link_lost = threading.Event()

    def _flag_link_lost(interface, topic=pub.AUTO_TOPIC):
        # Ignore late notifications from an interface that was already closed
        # and replaced; they would otherwise cause a reconnect loop.
        if interface is INTERFACE:
            link_lost.set()

    pub.subscribe(on_receive, "meshtastic.receive.text")
    pub.subscribe(on_connection, "meshtastic.connection.established")
    pub.subscribe(on_disconnect, "meshtastic.connection.lost")
    # NOTE: pubsub presumably keeps only a weak reference to listeners; this
    # local name keeps the closure alive for as long as main() is running.
    pub.subscribe(_flag_link_lost, "meshtastic.connection.lost")

    try:
        while True:
            try:
                link_lost.clear()
                INTERFACE = meshtastic.serial_interface.SerialInterface(SERIAL_PORT)
                logger.info("Listening...")
                os.makedirs(OUTBOX_DIR, exist_ok=True)
                logger.info(f"Outbox: {OUTBOX_DIR}")

                # Wall-clock deadline: a slow check_outbox() (chunked sends
                # sleep between chunks) no longer stretches the interval.
                next_check = time.monotonic() + WATCHDOG_INTERVAL
                while not link_lost.is_set():
                    check_outbox()
                    time.sleep(2)
                    if time.monotonic() < next_check:
                        continue
                    next_check = time.monotonic() + WATCHDOG_INTERVAL
                    if INTERFACE and INTERFACE.myInfo:
                        logger.debug("Watchdog: interface healthy")
                    else:
                        logger.warning("Watchdog: interface stale, reconnecting...")
                        break
                else:
                    # Loop condition became false, i.e. the link was lost.
                    logger.warning("Radio link lost, reconnecting...")
                time.sleep(2)  # Brief pause so a flapping link cannot spin
            except Exception as e:
                logger.error(f"Connection error: {e}")
                time.sleep(10)
            finally:
                # Always release the serial port before reconnecting/exiting.
                if INTERFACE is not None:
                    try:
                        INTERFACE.close()
                    except Exception as e:
                        logger.debug(f"Ignoring error while closing interface: {e}")
                    INTERFACE = None
    except KeyboardInterrupt:
        # Covers Ctrl-C anywhere in the loop, including the retry sleep.
        logger.info("Interrupted, shutting down")


if __name__ == "__main__":
    main()
Author
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Link to example setup: https://github.com/anvarazizov/meshtastic-listener-ollama