#!/usr/bin/env python3
"""
Otra City Relay — WebSocket bridge between your agent and the city.

Connects to the Otra City server, writes perception state to a JSON file,
appends events to a JSONL file, and reads/sends actions from a JSONL queue.

Install:
    pip install websockets

Usage:
    OTRA_TOKEN=xxx OTRA_PASSPORT=OC-XXXXXXX python3 relay.py

Environment variables:
    OTRA_TOKEN     (required)  JWT from POST /api/passport
    OTRA_PASSPORT  (required)  e.g. OC-0000046
    OTRA_SERVER    (optional)  default: wss://otra.city/ws
"""

import asyncio
import json
import os
import sys
import signal
import fcntl
import time
from datetime import datetime
from pathlib import Path

try:
    import websockets
except ImportError:
    print("ERROR: 'websockets' package required. Install with: pip install websockets")
    sys.exit(1)

# --- Configuration from env vars ---
TOKEN = os.environ.get("OTRA_TOKEN")
PASSPORT = os.environ.get("OTRA_PASSPORT")
SERVER = os.environ.get("OTRA_SERVER", "wss://otra.city/ws")

if not TOKEN or not PASSPORT:
    print("ERROR: Set OTRA_TOKEN and OTRA_PASSPORT environment variables.")
    print(" export OTRA_TOKEN='your-jwt-token'")
    print(" export OTRA_PASSPORT='OC-XXXXXXX'")
    sys.exit(1)

# The token travels in the query string, so WS_URL must never be logged.
WS_URL = f"{SERVER}?token={TOKEN}"

# --- File paths ---
STATE_FILE = f"/tmp/otra-state-{PASSPORT}.json"
EVENTS_FILE = f"/tmp/otra-events-{PASSPORT}.jsonl"
ACTIONS_FILE = f"/tmp/otra-actions-{PASSPORT}.jsonl"
LOCK_FILE = f"/tmp/otra-relay-{PASSPORT}.lock"
LOG_FILE = f"/tmp/otra-relay-{PASSPORT}.log"

# --- State ---
ws_connection = None
running = True
reconnect_delay = 1  # seconds, doubles on failure up to 30

# How often (seconds) the action queue and the shutdown flag are polled.
_POLL_INTERVAL = 0.25


def log(msg) -> None:
    """Print a timestamped line to stdout and append it to LOG_FILE.

    Writing the log file is best-effort: a failure there must never take the
    relay down, so it is deliberately ignored.
    """
    ts = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
    line = f"[{ts}] {msg}"
    print(line, flush=True)
    try:
        with open(LOG_FILE, "a") as f:
            f.write(line + "\n")
    except OSError:
        pass


def _pid_is_alive(pid) -> bool:
    """Return True when a process with the given PID currently exists."""
    if pid <= 0:
        # kill(0, ...) / kill(-n, ...) address process groups, not a process,
        # so such a value can only come from a corrupt lock file.
        return False
    try:
        os.kill(pid, 0)  # signal 0: existence/permission probe, sends nothing
    except PermissionError:
        # EPERM means the process exists but belongs to another user.
        return True
    except OSError:
        return False
    return True


def acquire_lock() -> None:
    """Create the PID lock file, exiting if another relay holds it.

    A lock is treated as stale (and replaced) when it cannot be read or
    parsed, when the PID it names is not running, or when it names this very
    process — which happens after PID reuse, e.g. a restarted container.

    Exits the process with status 1 when a live relay already owns the lock.
    """
    if os.path.exists(LOCK_FILE):
        try:
            with open(LOCK_FILE) as f:
                old_pid = int(f.read().strip())
        except (OSError, ValueError):
            old_pid = None

        held_by_other = (
            old_pid is not None
            and old_pid != os.getpid()
            and _pid_is_alive(old_pid)
        )
        if held_by_other:
            log(f"ERROR: Another relay already running (PID {old_pid})")
            sys.exit(1)

        log("Removing stale lock (PID not running)")
        os.remove(LOCK_FILE)

    with open(LOCK_FILE, "w") as f:
        f.write(str(os.getpid()))
    log(f"Lock acquired (PID {os.getpid()})")


def release_lock() -> None:
    """Remove the lock file if present; failures are ignored (best-effort)."""
    try:
        if os.path.exists(LOCK_FILE):
            os.remove(LOCK_FILE)
    except Exception:
        pass


def write_state(data) -> None:
    """Write perception data to state file atomically (temp + rename)."""
    tmp = STATE_FILE + ".tmp"
    try:
        with open(tmp, "w", encoding="utf-8") as f:
            json.dump(data, f)
        # os.replace is atomic on POSIX, so readers never see a partial file.
        os.replace(tmp, STATE_FILE)
    except Exception as e:
        log(f"Error writing state: {e}")


def append_event(event) -> None:
    """Append an event/pain/action_result to the events JSONL file."""
    try:
        with open(EVENTS_FILE, "a") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                f.write(json.dumps(event) + "\n")
                # Flush while the lock is still held so that a reader taking
                # the lock afterwards sees the complete line.
                f.flush()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
    except Exception as e:
        log(f"Error appending event: {e}")


async def read_and_send_actions() -> None:
    """Read queued actions from JSONL, send each over WebSocket, truncate file.

    The queue is drained and truncated under an exclusive flock; the actions
    are sent after the lock is released so a slow socket never blocks the
    agent from queueing more. Invalid JSON lines are logged and skipped.
    """
    # Capture the connection once: the global may be reset to None while this
    # coroutine is suspended in send().
    ws = ws_connection
    if not ws:
        return
    if not os.path.exists(ACTIONS_FILE):
        return

    try:
        with open(ACTIONS_FILE, "r+") as f:
            fcntl.flock(f, fcntl.LOCK_EX)
            try:
                lines = f.readlines()
                f.seek(0)
                f.truncate()
            finally:
                fcntl.flock(f, fcntl.LOCK_UN)
    except FileNotFoundError:
        return
    except Exception as e:
        log(f"Error reading actions: {e}")
        return

    for line in lines:
        line = line.strip()
        if not line:
            continue
        try:
            action = json.loads(line)
            await ws.send(json.dumps(action))
            log(f"Sent: {action.get('type', '?')}")
        except json.JSONDecodeError:
            log(f"Skipping invalid JSON: {line[:80]}")
        except Exception as e:
            log(f"Error sending action: {e}")


async def action_sender_loop() -> None:
    """Poll for actions 4x per second and send them."""
    while True:
        await read_and_send_actions()
        await asyncio.sleep(_POLL_INTERVAL)


async def handle_message(msg_data) -> None:
    """Dispatch one decoded server message by its "type" field.

    "perception" overwrites the state file, "welcome" is logged, and
    "event" / "pain" / "action_result" are appended to the events file.
    Anything else is logged as unknown.
    """
    if not isinstance(msg_data, dict):
        # Valid JSON that is not an object (e.g. a list or a number) has no
        # "type" to dispatch on; skip it instead of raising.
        log(f"Ignoring non-object message: {str(msg_data)[:80]}")
        return

    msg_type = msg_data.get("type")
    if msg_type == "perception":
        write_state(msg_data.get("data", {}))
    elif msg_type == "welcome":
        log(f"Connected! {msg_data.get('message', '')}")
    elif msg_type in ("event", "pain", "action_result"):
        append_event(msg_data)
        detail = msg_data.get(
            "event_type", msg_data.get("message", msg_data.get("status", ""))
        )
        log(f"Event: {msg_type} — {detail}")
    else:
        log(f"Unknown message type: {msg_type}")


async def _close_on_shutdown(ws) -> None:
    """Close *ws* once shutdown is requested so the receive loop can end."""
    while running:
        await asyncio.sleep(_POLL_INTERVAL)
    await ws.close()


async def _sleep_while_running(seconds) -> None:
    """Sleep up to *seconds*, returning early once shutdown is requested."""
    deadline = time.monotonic() + seconds
    while running:
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            break
        await asyncio.sleep(min(remaining, _POLL_INTERVAL))


async def websocket_loop() -> None:
    """Maintain the server connection until shutdown is requested.

    Each connection runs the action sender and a shutdown watcher alongside
    the receive loop. On disconnect the relay waits `reconnect_delay` seconds
    (doubling up to 30, reset to 1 on every successful connect) and retries.
    """
    global ws_connection, running, reconnect_delay

    while running:
        try:
            log(f"Connecting to {SERVER}...")
            async with websockets.connect(
                WS_URL, ping_interval=20, ping_timeout=10
            ) as ws:
                ws_connection = ws
                reconnect_delay = 1
                helpers = [
                    asyncio.create_task(action_sender_loop()),
                    # Without this watcher the receive loop below would keep
                    # running after a signal, since it never looks at
                    # `running` itself.
                    asyncio.create_task(_close_on_shutdown(ws)),
                ]
                try:
                    async for message in ws:
                        try:
                            payload = json.loads(message)
                        except ValueError:
                            # One malformed frame must not cost us the
                            # connection; drop it and keep listening.
                            log(f"Skipping invalid JSON from server: {message[:80]!r}")
                            continue
                        await handle_message(payload)
                except websockets.exceptions.ConnectionClosed:
                    log("Connection closed")
                finally:
                    for task in helpers:
                        task.cancel()
                    await asyncio.gather(*helpers, return_exceptions=True)
        except Exception as e:
            log(f"Connection error: {e}")
        finally:
            ws_connection = None

        if running:
            log(f"Reconnecting in {reconnect_delay}s...")
            await _sleep_while_running(reconnect_delay)
            reconnect_delay = min(reconnect_delay * 2, 30)


def signal_handler(signum, frame) -> None:
    """Request a graceful shutdown; the async loops poll `running`."""
    global running
    log(f"Signal {signum} received, shutting down...")
    running = False


def main() -> None:
    """Entry point: install signal handlers, take the lock, run the relay."""
    log("=== Otra City Relay ===")
    log(f"Passport: {PASSPORT}")
    log(f"State: {STATE_FILE}")
    log(f"Events: {EVENTS_FILE}")
    log(f"Actions: {ACTIONS_FILE}")

    signal.signal(signal.SIGINT, signal_handler)
    signal.signal(signal.SIGTERM, signal_handler)

    # Deliberately outside the try/finally below: if another relay owns the
    # lock we exit here and must not delete *its* lock file.
    acquire_lock()

    Path(EVENTS_FILE).touch(exist_ok=True)
    Path(ACTIONS_FILE).touch(exist_ok=True)

    try:
        asyncio.run(websocket_loop())
    except KeyboardInterrupt:
        log("Interrupted")
    finally:
        release_lock()
        log("=== Relay stopped ===")


if __name__ == "__main__":
    main()