#!/usr/bin/env python3 """ agw_forwarder.py – Reads APRS frames from Direwolf AGW port and forwards them to the aprs-collector API in DMZ. Usage: python3 agw_forwarder.py Configuration via environment variables (or edit DEFAULTS below): AGW_HOST Direwolf host (default: localhost) AGW_PORT Direwolf AGW port (default: 8000) COLLECTOR_URL aprs-collector API URL (default: http://localhost:8080) API_KEY shared secret (required) STATION_CALL your callsign (default: SA6ANW-1) LOG_LEVEL DEBUG / INFO (default: INFO) Install deps: pip3 install requests Run as systemd service: see agw-forwarder.service """ import json import logging import os import queue import re import socket import struct import threading import time from datetime import datetime, timezone import requests # --------------------------------------------------------------------------- # Configuration # --------------------------------------------------------------------------- AGW_HOST = os.getenv("AGW_HOST", "localhost") AGW_PORT = int(os.getenv("AGW_PORT", "8000")) COLLECTOR_URL = os.getenv("COLLECTOR_URL", "http://localhost:8080") API_KEY = os.getenv("API_KEY", "") STATION_CALL = os.getenv("STATION_CALL", "SA6ANW-1") LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") INGEST_URL = f"{COLLECTOR_URL.rstrip('/')}/ingest/rf" RECONNECT_DELAY = 10 # seconds between AGW reconnect attempts MAX_QUEUE_SIZE = 2000 # frames buffered while API is unreachable HTTP_TIMEOUT = 5 # seconds per POST RETRY_DELAY = 5 # seconds between failed POST retries logging.basicConfig( level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), format="%(asctime)s %(levelname)-8s %(message)s", datefmt="%Y-%m-%dT%H:%M:%SZ", ) logger = logging.getLogger(__name__) if not API_KEY: raise SystemExit("ERROR: API_KEY environment variable is required") # --------------------------------------------------------------------------- # AGW protocol helpers # --------------------------------------------------------------------------- _HEADER_SIZE = 36 def _build_frame(kind: str, pid: int = 0, data: bytes = b"", port: int = 0, call_from: str = "", call_to: str = "") -> bytes: cf = call_from.encode("ascii").ljust(10, b"\x00")[:10] ct = call_to.encode("ascii").ljust(10, b"\x00")[:10] return ( bytes([port, 0, 0, 0]) + bytes([ord(kind), 0, pid, 0]) + cf + ct + struct.pack(" tuple: """Return (port, kind, call_from, call_to, data_len).""" port = raw[0] kind = chr(raw[4]) call_from = raw[8:18].rstrip(b"\x00").decode("ascii", errors="replace").strip() call_to = raw[18:28].rstrip(b"\x00").decode("ascii", errors="replace").strip() data_len = struct.unpack_from(" str: """ Extract the via path from a Direwolf AGW monitoring header. Direwolf sends 'U' frame data as: [port:Fm CALL To CALL Via PATH [HH:MM:SS]]\r Extract PATH from between 'Via ' and ' bool: """No '*' anywhere in path → frame arrived directly from the transmitter.""" if not via_path: return True return not any("*" in hop for hop in via_path.split(",")) def _recv_exact(sock: socket.socket, n: int) -> bytes: buf = b"" while len(buf) < n: chunk = sock.recv(n - len(buf)) if not chunk: raise ConnectionError("AGW socket closed") buf += chunk return buf # --------------------------------------------------------------------------- # Sender thread – dequeues frames and POSTs to collector API # --------------------------------------------------------------------------- def sender_thread(q: queue.Queue) -> None: session = requests.Session() session.headers.update({"Authorization": f"Bearer {API_KEY}", "Content-Type": "application/json"}) while True: payload = q.get() while True: try: r = session.post(INGEST_URL, data=payload, timeout=HTTP_TIMEOUT) if r.status_code == 204: break elif r.status_code == 401: logger.error("API_KEY rejected – check configuration") time.sleep(30) else: logger.warning("POST returned %d – retrying", r.status_code) time.sleep(RETRY_DELAY) except requests.exceptions.RequestException as exc: logger.warning("POST failed (%s) – retrying in %ds", exc, RETRY_DELAY) time.sleep(RETRY_DELAY) # --------------------------------------------------------------------------- # AGW reader – main loop # --------------------------------------------------------------------------- def run_agw_reader(q: queue.Queue) -> None: while True: sock = None try: logger.info("AGW: connecting to %s:%d", AGW_HOST, AGW_PORT) sock = socket.create_connection((AGW_HOST, AGW_PORT), timeout=10) sock.settimeout(None) # blocking reads after connect # Capabilities request (warms up connection) sock.sendall(_build_frame("G")) # Enable monitoring mode → receive all UI frames sock.sendall(_build_frame("m")) logger.info("AGW: connected, monitoring enabled") while True: hdr = _recv_exact(sock, _HEADER_SIZE) _, kind, call_from, call_to, data_len = _parse_header(hdr) data = b"" if data_len > 0: data = _recv_exact(sock, data_len) if kind != "U": continue # skip control frames try: if b"\r" in data: monitoring_raw, info_raw = data.split(b"\r", 1) else: monitoring_raw, info_raw = b"", data monitoring = monitoring_raw.decode("ascii", errors="replace").strip() info = info_raw.decode("ascii", errors="replace") # monitoring line is "[port:Fm X To Y Via PATH ]" # extract just the via hops from it via_path = _parse_monitoring_via(monitoring) if "[" in monitoring else monitoring direct = _is_heard_direct(via_path) ts = datetime.now(timezone.utc).isoformat() payload = json.dumps({ "ts": ts, "src_call": call_from, "dst_call": call_to, "via_path": via_path, "info": info, "heard_direct": direct, }) logger.info("RF %-6s %-9s [%s]", "DIRECT" if direct else "VIA", call_from, via_path) if q.full(): logger.warning("Queue full – dropping oldest frame") try: q.get_nowait() except queue.Empty: pass q.put(payload) except Exception as exc: logger.warning("Frame parse error: %s", exc) except (ConnectionError, OSError, TimeoutError) as exc: logger.warning("AGW: %s – reconnecting in %ds", exc, RECONNECT_DELAY) finally: if sock: try: sock.close() except Exception: pass time.sleep(RECONNECT_DELAY) # --------------------------------------------------------------------------- # Entry point # --------------------------------------------------------------------------- if __name__ == "__main__": logger.info("agw-forwarder starting station=%s agw=%s:%d api=%s", STATION_CALL, AGW_HOST, AGW_PORT, INGEST_URL) frame_queue: queue.Queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) t = threading.Thread(target=sender_thread, args=(frame_queue,), daemon=True) t.start() run_agw_reader(frame_queue) # blocks forever, reconnects on error