#!/usr/bin/env python3
"""
Otra City Autopilot — mechanical survival and speech detection.

Polls the state file every 2 seconds and handles:
  - Emergency survival (eat/drink/sleep/toilet at critical thresholds)
  - Routine survival (eat/drink when no residents nearby)
  - Speech detection (stops movement, optionally wakes main brain)

No external dependencies. Runs alongside the relay as a separate process.

Usage:
  OTRA_PASSPORT=OC-XXXXXXX python3 autopilot.py

Environment variables:
  OTRA_PASSPORT     (required) e.g. OC-0000046
  OTRA_WAKEUP_CMD   (optional) shell command to wake main brain on speech.
                    Placeholders: {name}, {text}
                    Example: openclaw system event --text "CONVERSATION: {name} said: \"{text}\"" --mode now
                    The placeholders are rewritten to ${OTRA_WAKE_NAME} and
                    ${OTRA_WAKE_TEXT} and the values are handed to the shell
                    through the environment, so use them unquoted or inside
                    double quotes (single quotes would suppress expansion).
  OTRA_AGENT_NAME   (optional) name for detecting mentions in undirected speech.
                    Default: derived from state file's self.preferred_name
"""

import json
import os
import sys
import time
import fcntl
import signal
import subprocess
from datetime import datetime
from pathlib import Path

# --- Configuration ---
PASSPORT = os.environ.get("OTRA_PASSPORT")
WAKEUP_CMD = os.environ.get("OTRA_WAKEUP_CMD", "")
AGENT_NAME = os.environ.get("OTRA_AGENT_NAME", "")

STATE_FILE = f"/tmp/otra-state-{PASSPORT}.json"
EVENTS_FILE = f"/tmp/otra-events-{PASSPORT}.jsonl"
ACTIONS_FILE = f"/tmp/otra-actions-{PASSPORT}.jsonl"
LOCK_FILE = f"/tmp/otra-autopilot-{PASSPORT}.lock"
LOG_FILE = f"/tmp/otra-autopilot-{PASSPORT}.log"

POLL_INTERVAL_SECONDS = 2
CONVERSATION_PAUSE_SECONDS = 30
# Minimum gap between "health critical, nothing to consume" wake-ups, so the
# wake command is not re-spawned on every poll while the condition persists.
HEALTH_ALERT_INTERVAL_SECONDS = 30

# Consumable item types, in order of preference within each group.
FOOD = ("bread", "berries", "wild_berries")
WATER = ("water", "spring_water")

# --- State tracking ---
last_speech_count = 0
conversation_pause_until = 0
running = True
agent_name_resolved = AGENT_NAME

# Wake-up child processes that may still be running; polled so that finished
# ones are reaped instead of lingering as zombies.
_wake_children = []
_last_health_alert = float("-inf")  # time.monotonic() of the last health alert


def log(msg):
    """Print a timestamped line to stdout and append it to LOG_FILE."""
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    try:
        with open(LOG_FILE, "a") as f:
            f.write(line + "\n")
    except (OSError, ValueError):
        # File logging is best-effort; stdout already has the line.
        pass


def _read_lock_pid():
    """Return the PID stored in LOCK_FILE, or None if missing/unreadable."""
    try:
        with open(LOCK_FILE) as f:
            return int(f.read().strip())
    except (OSError, ValueError):
        return None


def _pid_is_alive(pid):
    """Return True if a process with the given PID currently exists."""
    if pid <= 0:
        # 0 and negative values address process groups in kill(2), not a PID.
        return False
    try:
        os.kill(pid, 0)
    except ProcessLookupError:
        return False
    except PermissionError:
        # The process exists but belongs to another user.
        return True
    except (OSError, OverflowError):
        return False
    return True


def acquire_lock():
    """Create LOCK_FILE containing our PID, or exit(1) if another live
    autopilot already holds it. A lock whose PID is dead or unreadable is
    treated as stale and replaced."""
    for _ in range(3):
        try:
            # O_EXCL makes creation atomic: only one process can win.
            fd = os.open(LOCK_FILE, os.O_WRONLY | os.O_CREAT | os.O_EXCL, 0o644)
        except FileExistsError:
            old_pid = _read_lock_pid()
            if old_pid is not None and _pid_is_alive(old_pid):
                log(f"ERROR: Another autopilot already running (PID {old_pid})")
                sys.exit(1)
            log("Removing stale lock")
            try:
                os.remove(LOCK_FILE)
            except FileNotFoundError:
                pass
            continue
        with os.fdopen(fd, "w") as f:
            f.write(str(os.getpid()))
        log(f"Lock acquired (PID {os.getpid()})")
        return
    log("ERROR: Could not acquire lock")
    sys.exit(1)


def release_lock():
    """Remove LOCK_FILE if it still records this process's PID."""
    try:
        if _read_lock_pid() == os.getpid():
            os.remove(LOCK_FILE)
    except OSError:
        pass


def send_action(action):
    """Append one action to the JSONL queue (relay reads and sends)."""
    try:
        with open(ACTIONS_FILE, "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                f.write(json.dumps(action) + "\n")
                # Flush while the lock is still held; otherwise the buffered
                # data would only reach the file on close, after unlocking.
                f.flush()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
        log(f"Action: {action['type']}")
    except Exception as e:
        log(f"Error sending action: {e}")


def read_state():
    """Load STATE_FILE. Returns the parsed dict, or None if the file is
    missing, unreadable, not valid JSON, or not a JSON object."""
    try:
        with open(STATE_FILE, encoding="utf-8") as f:
            state = json.load(f)
    except (OSError, ValueError):
        return None
    return state if isinstance(state, dict) else None


def read_and_clear_events():
    """Read events file and truncate it.

    Returns the list of JSON objects found; blank, malformed, or non-object
    lines are skipped. Returns [] if the file is missing or cannot be read.
    """
    try:
        with open(EVENTS_FILE, "r+", encoding="utf-8") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                lines = f.readlines()
                f.seek(0)
                f.truncate()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
    except (OSError, ValueError):
        return []

    events = []
    for line in lines:
        if not line.strip():
            continue
        try:
            evt = json.loads(line)
        except ValueError:
            # The file is already truncated, so one bad line must not
            # discard the valid events around it.
            continue
        if isinstance(evt, dict):
            events.append(evt)
    return events


def _env_value(value):
    """Coerce a value to a string usable as an environment variable."""
    if value is None:
        return ""
    return str(value).replace("\x00", "")


def wake_brain(name, text):
    """Run OTRA_WAKEUP_CMD if configured.

    `name` and `text` can originate from other residents' speech, so they are
    never spliced into the command string. The {name}/{text} placeholders are
    replaced with shell variable references and the values are supplied via
    the child's environment.
    """
    if not WAKEUP_CMD:
        return
    # poll() reaps children that have exited since the last wake-up.
    _wake_children[:] = [p for p in _wake_children if p.poll() is None]
    try:
        cmd = (
            WAKEUP_CMD
            .replace("{name}", "${OTRA_WAKE_NAME}")
            .replace("{text}", "${OTRA_WAKE_TEXT}")
        )
        env = dict(os.environ)
        env["OTRA_WAKE_NAME"] = _env_value(name)
        env["OTRA_WAKE_TEXT"] = _env_value(text)
        log(f"Waking brain: {cmd[:80]}")
        child = subprocess.Popen(
            cmd,
            shell=True,
            env=env,
            start_new_session=True,
            stdout=subprocess.DEVNULL,
            stderr=subprocess.DEVNULL,
        )
        _wake_children.append(child)
    except Exception as e:
        log(f"Wakeup error: {e}")


def find_consumable(inventory, item_types):
    """Find an inventory item of one of the given types.

    `item_types` is searched in order, so earlier types are preferred; among
    items of the same type the first one in `inventory` wins. Returns the item
    dict, or None if nothing matches.
    """
    items = [item for item in (inventory or []) if isinstance(item, dict)]
    for wanted in item_types:
        for item in items:
            if item.get("type") == wanted:
                return item
    return None


def _interrupt_for_speech(name, text):
    """Queue a stop action, start the conversation pause, and wake the brain."""
    global conversation_pause_until
    send_action({"type": "stop"})
    conversation_pause_until = time.time() + CONVERSATION_PAUSE_SECONDS
    wake_brain(name, text)


def check_speech(state):
    """Detect directed speech, stop movement, wake brain.

    Checks, in order: new entries in self.recent_speech, undirected entries in
    state["audible"] that contain the agent's name, and speech_heard records
    in the events file. Returns True if speech found.
    """
    global last_speech_count, conversation_pause_until, agent_name_resolved

    me = state.get("self") or {}
    recent_speech = me.get("recent_speech") or []
    awaiting = me.get("awaiting_reply_from") or []

    # Resolve agent name from state if not set via env
    if not agent_name_resolved:
        agent_name_resolved = me.get("preferred_name", "")

    # Extend pause while awaiting replies
    if awaiting:
        conversation_pause_until = time.time() + CONVERSATION_PAUSE_SECONDS
        log(f"Awaiting reply from {len(awaiting)} — extending pause")

    # If the list shrank, resync the counter so later growth is noticed.
    if len(recent_speech) < last_speech_count:
        last_speech_count = len(recent_speech)

    # Detect new directed speech
    if len(recent_speech) > last_speech_count:
        new_messages = recent_speech[last_speech_count:]
        last_speech_count = len(recent_speech)
        last_msg = new_messages[-1]
        name = last_msg.get("from_name", "someone")
        text = last_msg.get("text", "")
        _interrupt_for_speech(name, text)
        return True

    # Check undirected audible speech for name mentions
    if agent_name_resolved:
        needle = agent_name_resolved.lower()
        for entry in state.get("audible") or []:
            if entry.get("to"):
                continue  # directed to someone else
            spoken_text = entry.get("text") or ""
            if needle in spoken_text.lower():
                _interrupt_for_speech(entry.get("from_name", "someone"), spoken_text)
                return True

    # Check events file for speech_heard
    for evt in read_and_clear_events():
        if evt.get("event_type") == "speech_heard" or evt.get("type") == "speech_heard":
            name = evt.get("from_name", evt.get("speaker_name", "someone"))
            text = evt.get("text", "")
            if text:
                _interrupt_for_speech(name, text)
                return True

    return False


def _consume(item):
    """Queue a consume action for the given inventory item."""
    send_action({"type": "consume", "params": {"item_id": item["id"]}})


def handle_emergency(state):
    """Handle critical needs (can interrupt conversations).

    Returns True if an action was queued. A critical need that cannot be
    addressed (nothing to consume) does not stop the remaining checks, so
    sleeping and using the toilet still happen with an empty inventory.
    """
    global _last_health_alert

    me = state.get("self") or {}
    # Need scales: 100 = full/good, 0 = empty/dead. Bladder inverted: 100 = full/accident.
    hunger = me.get("hunger", 100)
    thirst = me.get("thirst", 100)
    energy = me.get("energy", 100)
    bladder = me.get("bladder", 0)
    health = me.get("health", 100)
    inventory = me.get("inventory") or []

    # Health critical — consume anything
    if health < 20:
        log(f"EMERGENCY health={health:.0f}")
        item = find_consumable(inventory, FOOD + WATER)
        if item:
            _consume(item)
            return True
        now = time.monotonic()
        if now - _last_health_alert >= HEALTH_ALERT_INTERVAL_SECONDS:
            _last_health_alert = now
            wake_brain("system", f"Health critical ({health:.0f}), no food/water!")

    # Starving
    if hunger < 10:
        log(f"EMERGENCY hunger={hunger:.0f}")
        item = find_consumable(inventory, FOOD + WATER)
        if item:
            _consume(item)
            return True

    # Dehydrated
    if thirst < 10:
        log(f"EMERGENCY thirst={thirst:.0f}")
        item = find_consumable(inventory, WATER + FOOD)
        if item:
            _consume(item)
            return True

    # Exhausted
    if energy < 10:
        log(f"EMERGENCY energy={energy:.0f}")
        send_action({"type": "sleep"})
        return True

    # Bladder critical
    if bladder > 90:
        log(f"EMERGENCY bladder={bladder:.0f}")
        send_action({"type": "use_toilet"})
        return True

    return False


def handle_routine(state):
    """Handle non-critical needs (respects conversation pause, skips if residents nearby).

    Returns True if a consume action was queued.
    """
    if time.time() < conversation_pause_until:
        return False

    me = state.get("self") or {}
    hunger = me.get("hunger", 100)
    thirst = me.get("thirst", 100)
    inventory = me.get("inventory") or []
    visible = state.get("visible") or []

    # Don't act if residents are nearby (potential social encounter)
    if any(e.get("type") == "resident" and not e.get("is_dead") for e in visible):
        return False

    if hunger < 25:
        item = find_consumable(inventory, FOOD + WATER)
        if item:
            log(f"Routine hunger={hunger:.0f}")
            _consume(item)
            return True

    if thirst < 25:
        item = find_consumable(inventory, WATER + FOOD)
        if item:
            log(f"Routine thirst={thirst:.0f}")
            _consume(item)
            return True

    return False


def signal_handler(signum, frame):
    """Ask the main loop to stop after the current poll."""
    global running
    log(f"Signal {signum}, shutting down...")
    running = False


def _poll_once(poll):
    """Run one polling cycle: read state, then speech, emergency, routine."""
    state = read_state()
    if not state:
        if poll % 15 == 1:
            log("Waiting for state file...")
        return

    # Log status every ~60s
    if poll % 30 == 0:
        me = state.get("self") or {}
        log(f"Alive: H={me.get('hunger',0):.0f} T={me.get('thirst',0):.0f} "
            f"E={me.get('energy',0):.0f} Hp={me.get('health',0):.0f} "
            f"S={me.get('social',0):.0f} B={me.get('bladder',0):.0f}")

    if check_speech(state):
        return
    if handle_emergency(state):
        return
    handle_routine(state)


def main():
    """Entry point: validate config, take the lock, and poll until signalled."""
    if not PASSPORT:
        print("ERROR: Set OTRA_PASSPORT environment variable.")
        sys.exit(1)

    log("=== Otra City Autopilot ===")
    log(f"Passport: {PASSPORT}")
    log(f"Wakeup: {WAKEUP_CMD or '(none)'}")

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    acquire_lock()
    poll = 0
    try:
        while running:
            poll += 1
            try:
                _poll_once(poll)
            except Exception as e:
                # One malformed state/event record must not take the
                # autopilot down; report it and try again next poll.
                log(f"Error in poll loop: {e!r}")
            time.sleep(POLL_INTERVAL_SECONDS)
    except KeyboardInterrupt:
        log("Interrupted")
    finally:
        release_lock()
        log("=== Autopilot stopped ===")


if __name__ == "__main__":
    main()