Files
heardlog/agw-forwarder/agw_forwarder.py
Joakim Svensson 42ba6feed4 first commit
2026-04-26 17:20:58 +02:00

245 lines
8.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
#!/usr/bin/env python3
"""
agw_forwarder.py - Reads APRS frames from the Direwolf AGW port and forwards
them to the aprs-collector API in the DMZ.
Usage:
python3 agw_forwarder.py
Configuration via environment variables (or edit DEFAULTS below):
AGW_HOST Direwolf host (default: localhost)
AGW_PORT Direwolf AGW port (default: 8000)
COLLECTOR_URL aprs-collector API URL (default: http://localhost:8080)
API_KEY shared secret (required)
STATION_CALL your callsign (default: SA6ANW-1)
LOG_LEVEL DEBUG / INFO (default: INFO)
Install deps:
pip3 install requests
Run as systemd service: see agw-forwarder.service
"""
import json
import logging
import os
import queue
import re
import socket
import struct
import threading
import time
from datetime import datetime, timezone
import requests
# ---------------------------------------------------------------------------
# Configuration
# ---------------------------------------------------------------------------
# All settings come from the environment; the second argument of each
# os.getenv() call is the default used when the variable is unset.
AGW_HOST = os.getenv("AGW_HOST", "localhost")
AGW_PORT = int(os.getenv("AGW_PORT", "8000"))
COLLECTOR_URL = os.getenv("COLLECTOR_URL", "http://localhost:8080")
API_KEY = os.getenv("API_KEY", "")  # mandatory, checked at the end of this section
STATION_CALL = os.getenv("STATION_CALL", "SA6ANW-1")
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")

# Full ingest endpoint; a trailing "/" on COLLECTOR_URL is stripped first so
# the joined URL never contains "//".
INGEST_URL = f"{COLLECTOR_URL.rstrip('/')}/ingest/rf"

RECONNECT_DELAY = 10  # seconds between AGW reconnect attempts
MAX_QUEUE_SIZE = 2000  # frames buffered while API is unreachable
HTTP_TIMEOUT = 5  # seconds per POST
RETRY_DELAY = 5  # seconds between failed POST retries

# The date format below ends in a literal "Z" (UTC designator), but logging
# renders local time unless told otherwise. Switch the formatter to UTC so
# the printed timestamps are actually what they claim to be.
logging.Formatter.converter = time.gmtime
logging.basicConfig(
    # Unknown LOG_LEVEL names fall back to INFO instead of raising.
    level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
    format="%(asctime)s %(levelname)-8s %(message)s",
    datefmt="%Y-%m-%dT%H:%M:%SZ",
)
logger = logging.getLogger(__name__)

# Fail fast: without a key every POST would be sent with an empty bearer token.
if not API_KEY:
    raise SystemExit("ERROR: API_KEY environment variable is required")
# ---------------------------------------------------------------------------
# AGW protocol helpers
# ---------------------------------------------------------------------------
# Size in bytes of the fixed header that precedes every AGW frame.
_HEADER_SIZE = 36


def _build_frame(kind: str, pid: int = 0, data: bytes = b"", port: int = 0,
                 call_from: str = "", call_to: str = "") -> bytes:
    """Serialise one AGW frame: a 36-byte header followed by *data*.

    Header layout as produced here:
      bytes 0-3    port number followed by three zero bytes
      bytes 4-7    kind character, zero, pid, zero
      bytes 8-17   call_from, ASCII, NUL-padded / truncated to 10 bytes
      bytes 18-27  call_to, ASCII, NUL-padded / truncated to 10 bytes
      bytes 28-31  len(data) as little-endian unsigned 32-bit integer
      bytes 32-35  four zero bytes
    """

    def _call_field(callsign: str) -> bytes:
        # Fixed-width field: pad short callsigns with NULs, cut long ones.
        return callsign.encode("ascii").ljust(10, b"\x00")[:10]

    header = b"".join((
        bytes((port, 0, 0, 0)),
        bytes((ord(kind), 0, pid, 0)),
        _call_field(call_from),
        _call_field(call_to),
        struct.pack("<I", len(data)),
        bytes(4),
    ))
    return header + data
def _parse_header(raw: bytes) -> tuple:
    """Decode an AGW frame header.

    Args:
        raw: the header bytes; offsets 0 through 31 are read.

    Returns:
        (port, kind, call_from, call_to, data_len) where port is the byte at
        offset 0, kind is the character at offset 4, the callsigns come from
        the 10-byte fields at offsets 8 and 18, and data_len is the
        little-endian unsigned 32-bit integer at offset 28.
    """

    def _callsign(field: bytes) -> str:
        # The field is a NUL-terminated string: keep only what precedes the
        # first NUL so stale bytes after the terminator cannot end up in the
        # callsign. Non-ASCII bytes are replaced rather than raising.
        text = field.split(b"\x00", 1)[0]
        return text.decode("ascii", errors="replace").strip()

    port = raw[0]
    kind = chr(raw[4])
    call_from = _callsign(raw[8:18])
    call_to = _callsign(raw[18:28])
    data_len = struct.unpack_from("<I", raw, 28)[0]
    return port, kind, call_from, call_to, data_len
# Captures whatever sits between the word "Via" and the following " <".
_VIA_PATH_RE = re.compile(r'\bVia\s+([^<\s][^<]*?)\s+<')


def _parse_monitoring_via(monitoring_line: str) -> str:
    """
    Return the digipeater path found in an AGW monitoring header line.

    The line is expected to look like
        [port:Fm CALL To CALL Via PATH <UI pid=F0 Len=N PF=0>[HH:MM:SS]]
    and PATH (the text between 'Via ' and ' <UI') is returned with
    surrounding whitespace removed. A line without a 'Via' section yields
    an empty string, meaning the frame carried no path.
    """
    found = _VIA_PATH_RE.search(monitoring_line)
    return found.group(1).strip() if found is not None else ""
def _is_heard_direct(via_path: str) -> bool:
    """Return True when the path is empty or contains no '*' marker.

    A '*' in any hop makes the result False; an empty path counts as direct.
    """
    # Splitting on commas cannot remove an asterisk, so a single substring
    # test over the whole path is equivalent to checking every hop.
    return not via_path or "*" not in via_path
def _recv_exact(sock: socket.socket, n: int) -> bytes:
    """Read exactly *n* bytes from *sock*, looping over short reads.

    Raises:
        ConnectionError: if recv() returns no data before *n* bytes arrived.
    """
    pieces = []
    missing = n
    while missing > 0:
        piece = sock.recv(missing)
        if not piece:
            # An empty read means the peer closed the connection.
            raise ConnectionError("AGW socket closed")
        pieces.append(piece)
        missing -= len(piece)
    return b"".join(pieces)
# ---------------------------------------------------------------------------
# Sender thread - dequeues frames and POSTs them to the collector API
# ---------------------------------------------------------------------------
def sender_thread(q: queue.Queue) -> None:
    """Forward queued payloads to the collector, one POST per payload.

    Runs forever. Each payload taken from *q* is POSTed to INGEST_URL until
    the collector accepts it, with two exceptions that keep the queue moving:

    * any 2xx status counts as accepted, so a collector answering 200
      instead of 204 does not cause the same frame to be re-sent endlessly;
    * 400, 413, 415 and 422 mean the collector rejected this particular
      payload. Re-sending identical bytes cannot succeed, so the frame is
      dropped instead of blocking every frame queued behind it.

    A 401 is retried after 30 seconds, and any other status or transport
    error after RETRY_DELAY seconds.

    Args:
        q: queue of JSON-encoded payload strings produced by the AGW reader.
    """
    # Statuses that indicate a problem with the payload itself.
    rejected_payload = frozenset({400, 413, 415, 422})

    session = requests.Session()
    session.headers.update({
        "Authorization": f"Bearer {API_KEY}",
        "Content-Type": "application/json",
    })

    while True:
        payload = q.get()
        while True:
            try:
                r = session.post(INGEST_URL, data=payload, timeout=HTTP_TIMEOUT)
            except requests.exceptions.RequestException as exc:
                logger.warning("POST failed (%s) retrying in %ds", exc, RETRY_DELAY)
                time.sleep(RETRY_DELAY)
                continue

            status = r.status_code
            if 200 <= status < 300:
                break
            if status in rejected_payload:
                logger.error("POST returned %d dropping frame", status)
                break
            if status == 401:
                # Credentials problem: back off longer, the operator must act.
                logger.error("API_KEY rejected check configuration")
                time.sleep(30)
            else:
                logger.warning("POST returned %d retrying", status)
                time.sleep(RETRY_DELAY)
# ---------------------------------------------------------------------------
# AGW reader main loop
# ---------------------------------------------------------------------------
# Largest data length accepted from an AGW header. A larger value is treated
# as a corrupted / out-of-sync stream; without this bound the reader would
# sit waiting for up to 4 GiB that will never arrive.
_MAX_AGW_DATA_LEN = 65536


def _frame_to_payload(call_from: str, call_to: str, data: bytes) -> tuple:
    """Turn the data of one 'U' frame into (json_payload, heard_direct, via_path).

    *data* is split at the first carriage return into the monitoring header
    line and the info field. Without a carriage return, everything is
    treated as info and the monitoring line is empty.
    """
    monitoring_raw, sep, info_raw = data.partition(b"\r")
    if not sep:
        monitoring_raw, info_raw = b"", data
    monitoring = monitoring_raw.decode("ascii", errors="replace").strip()
    # UTF-8 is a superset of ASCII, so plain-ASCII frames decode exactly as
    # before while frames carrying UTF-8 text keep their characters instead
    # of being turned into replacement marks.
    info = info_raw.decode("utf-8", errors="replace")
    # monitoring line is "[port:Fm X To Y Via PATH <UI...>]"; extract just the
    # via hops from it. A line without "[" is used as the path verbatim.
    via_path = _parse_monitoring_via(monitoring) if "[" in monitoring else monitoring
    direct = _is_heard_direct(via_path)
    payload = json.dumps({
        "ts": datetime.now(timezone.utc).isoformat(),
        "src_call": call_from,
        "dst_call": call_to,
        "via_path": via_path,
        "info": info,
        "heard_direct": direct,
    })
    return payload, direct, via_path


def _enqueue_drop_oldest(q: queue.Queue, payload: str) -> None:
    """Put *payload* on *q* without ever blocking; evict the oldest entry if full."""
    while True:
        try:
            q.put_nowait(payload)
            return
        except queue.Full:
            logger.warning("Queue full dropping oldest frame")
            try:
                q.get_nowait()
            except queue.Empty:
                # The sender drained the queue in the meantime; just retry.
                pass


def run_agw_reader(q: queue.Queue) -> None:
    """Read monitored frames from the AGW port and queue them for sending.

    Runs forever. Connects to AGW_HOST:AGW_PORT, sends a 'G' frame and an
    'm' frame (which turns monitoring on), then reads frames in a loop.
    Frames of kind 'U' are converted to a JSON payload and queued on *q*;
    all other kinds are read and discarded. On any socket error the
    connection is closed and re-established after RECONNECT_DELAY seconds.

    Args:
        q: bounded queue consumed by the sender thread; when it is full the
            oldest payload is discarded to make room for the newest.
    """
    while True:
        sock = None
        try:
            logger.info("AGW: connecting to %s:%d", AGW_HOST, AGW_PORT)
            sock = socket.create_connection((AGW_HOST, AGW_PORT), timeout=10)
            sock.settimeout(None)  # blocking reads after connect
            # Capabilities request (warms up connection)
            sock.sendall(_build_frame("G"))
            # Enable monitoring mode -> receive all UI frames
            sock.sendall(_build_frame("m"))
            logger.info("AGW: connected, monitoring enabled")

            while True:
                hdr = _recv_exact(sock, _HEADER_SIZE)
                _, kind, call_from, call_to, data_len = _parse_header(hdr)
                if data_len > _MAX_AGW_DATA_LEN:
                    # Raising ConnectionError reuses the reconnect path below,
                    # which is the only way to resynchronise the stream.
                    raise ConnectionError(
                        "frame length %d exceeds limit, stream out of sync" % data_len
                    )
                # The data must be consumed even for frames that are skipped,
                # otherwise the next header read would start mid-frame.
                data = _recv_exact(sock, data_len) if data_len > 0 else b""
                if kind != "U":
                    continue  # skip control frames

                try:
                    payload, direct, via_path = _frame_to_payload(call_from, call_to, data)
                except Exception as exc:
                    # Best effort: one bad frame must not stop the reader.
                    logger.warning("Frame parse error: %s", exc)
                    continue

                logger.info("RF %-6s %-9s [%s]",
                            "DIRECT" if direct else "VIA", call_from, via_path)
                _enqueue_drop_oldest(q, payload)
        except (ConnectionError, OSError, TimeoutError) as exc:
            logger.warning("AGW: %s reconnecting in %ds", exc, RECONNECT_DELAY)
        finally:
            if sock:
                try:
                    sock.close()
                except OSError:
                    # Closing is best effort; the socket is discarded anyway.
                    pass
        time.sleep(RECONNECT_DELAY)
# ---------------------------------------------------------------------------
# Entry point
# ---------------------------------------------------------------------------
if __name__ == "__main__":
    logger.info(
        "agw-forwarder starting station=%s agw=%s:%d api=%s",
        STATION_CALL,
        AGW_HOST,
        AGW_PORT,
        INGEST_URL,
    )
    # Bounded buffer shared by the reader (producer) and the sender (consumer).
    frame_queue: queue.Queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
    # Daemon thread: it must not keep the process alive once the reader exits.
    threading.Thread(
        daemon=True,
        target=sender_thread,
        args=(frame_queue,),
    ).start()
    # Blocks forever, reconnecting to the AGW port on error.
    run_agw_reader(frame_queue)