245 lines
8.7 KiB
Python
245 lines
8.7 KiB
Python
#!/usr/bin/env python3
|
||
"""
|
||
agw_forwarder.py – Reads APRS frames from Direwolf AGW port and forwards
|
||
them to the aprs-collector API in DMZ.
|
||
|
||
Usage:
|
||
python3 agw_forwarder.py
|
||
|
||
Configuration via environment variables (or edit DEFAULTS below):
|
||
AGW_HOST Direwolf host (default: localhost)
|
||
AGW_PORT Direwolf AGW port (default: 8000)
|
||
COLLECTOR_URL aprs-collector API URL (default: http://localhost:8080)
|
||
API_KEY shared secret (required)
|
||
STATION_CALL your callsign (default: SA6ANW-1)
|
||
LOG_LEVEL DEBUG / INFO (default: INFO)
|
||
|
||
Install deps:
|
||
pip3 install requests
|
||
|
||
Run as systemd service: see agw-forwarder.service
|
||
"""
|
||
|
||
import json
|
||
import logging
|
||
import os
|
||
import queue
|
||
import re
|
||
import socket
|
||
import struct
|
||
import threading
|
||
import time
|
||
from datetime import datetime, timezone
|
||
|
||
import requests
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Configuration
|
||
# ---------------------------------------------------------------------------
|
||
AGW_HOST = os.getenv("AGW_HOST", "localhost")
|
||
AGW_PORT = int(os.getenv("AGW_PORT", "8000"))
|
||
COLLECTOR_URL = os.getenv("COLLECTOR_URL", "http://localhost:8080")
|
||
API_KEY = os.getenv("API_KEY", "")
|
||
STATION_CALL = os.getenv("STATION_CALL", "SA6ANW-1")
|
||
LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO")
|
||
|
||
INGEST_URL = f"{COLLECTOR_URL.rstrip('/')}/ingest/rf"
|
||
RECONNECT_DELAY = 10 # seconds between AGW reconnect attempts
|
||
MAX_QUEUE_SIZE = 2000 # frames buffered while API is unreachable
|
||
HTTP_TIMEOUT = 5 # seconds per POST
|
||
RETRY_DELAY = 5 # seconds between failed POST retries
|
||
|
||
logging.basicConfig(
|
||
level=getattr(logging, LOG_LEVEL.upper(), logging.INFO),
|
||
format="%(asctime)s %(levelname)-8s %(message)s",
|
||
datefmt="%Y-%m-%dT%H:%M:%SZ",
|
||
)
|
||
logger = logging.getLogger(__name__)
|
||
|
||
if not API_KEY:
|
||
raise SystemExit("ERROR: API_KEY environment variable is required")
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AGW protocol helpers
|
||
# ---------------------------------------------------------------------------
|
||
_HEADER_SIZE = 36
|
||
|
||
|
||
def _build_frame(kind: str, pid: int = 0, data: bytes = b"", port: int = 0,
|
||
call_from: str = "", call_to: str = "") -> bytes:
|
||
cf = call_from.encode("ascii").ljust(10, b"\x00")[:10]
|
||
ct = call_to.encode("ascii").ljust(10, b"\x00")[:10]
|
||
return (
|
||
bytes([port, 0, 0, 0])
|
||
+ bytes([ord(kind), 0, pid, 0])
|
||
+ cf + ct
|
||
+ struct.pack("<I", len(data))
|
||
+ b"\x00\x00\x00\x00"
|
||
+ data
|
||
)
|
||
|
||
|
||
def _parse_header(raw: bytes) -> tuple:
|
||
"""Return (port, kind, call_from, call_to, data_len)."""
|
||
port = raw[0]
|
||
kind = chr(raw[4])
|
||
call_from = raw[8:18].rstrip(b"\x00").decode("ascii", errors="replace").strip()
|
||
call_to = raw[18:28].rstrip(b"\x00").decode("ascii", errors="replace").strip()
|
||
data_len = struct.unpack_from("<I", raw, 28)[0]
|
||
return port, kind, call_from, call_to, data_len
|
||
|
||
|
||
def _parse_monitoring_via(monitoring_line: str) -> str:
|
||
"""
|
||
Extract the via path from a Direwolf AGW monitoring header.
|
||
|
||
Direwolf sends 'U' frame data as:
|
||
[port:Fm CALL To CALL Via PATH <UI pid=F0 Len=N PF=0>[HH:MM:SS]]\r<info>
|
||
|
||
Extract PATH from between 'Via ' and ' <UI'.
|
||
If there is no 'Via' the frame had no path (direct / no-path beacon).
|
||
"""
|
||
m = re.search(r'\bVia\s+([^<\s][^<]*?)\s+<', monitoring_line)
|
||
if m:
|
||
return m.group(1).strip()
|
||
return ""
|
||
|
||
|
||
def _is_heard_direct(via_path: str) -> bool:
|
||
"""No '*' anywhere in path → frame arrived directly from the transmitter."""
|
||
if not via_path:
|
||
return True
|
||
return not any("*" in hop for hop in via_path.split(","))
|
||
|
||
|
||
def _recv_exact(sock: socket.socket, n: int) -> bytes:
|
||
buf = b""
|
||
while len(buf) < n:
|
||
chunk = sock.recv(n - len(buf))
|
||
if not chunk:
|
||
raise ConnectionError("AGW socket closed")
|
||
buf += chunk
|
||
return buf
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Sender thread – dequeues frames and POSTs to collector API
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def sender_thread(q: queue.Queue) -> None:
|
||
session = requests.Session()
|
||
session.headers.update({"Authorization": f"Bearer {API_KEY}",
|
||
"Content-Type": "application/json"})
|
||
while True:
|
||
payload = q.get()
|
||
while True:
|
||
try:
|
||
r = session.post(INGEST_URL, data=payload, timeout=HTTP_TIMEOUT)
|
||
if r.status_code == 204:
|
||
break
|
||
elif r.status_code == 401:
|
||
logger.error("API_KEY rejected – check configuration")
|
||
time.sleep(30)
|
||
else:
|
||
logger.warning("POST returned %d – retrying", r.status_code)
|
||
time.sleep(RETRY_DELAY)
|
||
except requests.exceptions.RequestException as exc:
|
||
logger.warning("POST failed (%s) – retrying in %ds", exc, RETRY_DELAY)
|
||
time.sleep(RETRY_DELAY)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# AGW reader – main loop
|
||
# ---------------------------------------------------------------------------
|
||
|
||
def run_agw_reader(q: queue.Queue) -> None:
|
||
while True:
|
||
sock = None
|
||
try:
|
||
logger.info("AGW: connecting to %s:%d", AGW_HOST, AGW_PORT)
|
||
sock = socket.create_connection((AGW_HOST, AGW_PORT), timeout=10)
|
||
sock.settimeout(None) # blocking reads after connect
|
||
|
||
# Capabilities request (warms up connection)
|
||
sock.sendall(_build_frame("G"))
|
||
# Enable monitoring mode → receive all UI frames
|
||
sock.sendall(_build_frame("m"))
|
||
|
||
logger.info("AGW: connected, monitoring enabled")
|
||
|
||
while True:
|
||
hdr = _recv_exact(sock, _HEADER_SIZE)
|
||
_, kind, call_from, call_to, data_len = _parse_header(hdr)
|
||
|
||
data = b""
|
||
if data_len > 0:
|
||
data = _recv_exact(sock, data_len)
|
||
|
||
if kind != "U":
|
||
continue # skip control frames
|
||
|
||
try:
|
||
if b"\r" in data:
|
||
monitoring_raw, info_raw = data.split(b"\r", 1)
|
||
else:
|
||
monitoring_raw, info_raw = b"", data
|
||
|
||
monitoring = monitoring_raw.decode("ascii", errors="replace").strip()
|
||
info = info_raw.decode("ascii", errors="replace")
|
||
|
||
# monitoring line is "[port:Fm X To Y Via PATH <UI...>]"
|
||
# extract just the via hops from it
|
||
via_path = _parse_monitoring_via(monitoring) if "[" in monitoring else monitoring
|
||
direct = _is_heard_direct(via_path)
|
||
ts = datetime.now(timezone.utc).isoformat()
|
||
|
||
payload = json.dumps({
|
||
"ts": ts,
|
||
"src_call": call_from,
|
||
"dst_call": call_to,
|
||
"via_path": via_path,
|
||
"info": info,
|
||
"heard_direct": direct,
|
||
})
|
||
|
||
logger.info("RF %-6s %-9s [%s]",
|
||
"DIRECT" if direct else "VIA", call_from, via_path)
|
||
|
||
if q.full():
|
||
logger.warning("Queue full – dropping oldest frame")
|
||
try:
|
||
q.get_nowait()
|
||
except queue.Empty:
|
||
pass
|
||
q.put(payload)
|
||
|
||
except Exception as exc:
|
||
logger.warning("Frame parse error: %s", exc)
|
||
|
||
except (ConnectionError, OSError, TimeoutError) as exc:
|
||
logger.warning("AGW: %s – reconnecting in %ds", exc, RECONNECT_DELAY)
|
||
finally:
|
||
if sock:
|
||
try:
|
||
sock.close()
|
||
except Exception:
|
||
pass
|
||
|
||
time.sleep(RECONNECT_DELAY)
|
||
|
||
|
||
# ---------------------------------------------------------------------------
|
||
# Entry point
|
||
# ---------------------------------------------------------------------------
|
||
|
||
if __name__ == "__main__":
|
||
logger.info("agw-forwarder starting station=%s agw=%s:%d api=%s",
|
||
STATION_CALL, AGW_HOST, AGW_PORT, INGEST_URL)
|
||
|
||
frame_queue: queue.Queue = queue.Queue(maxsize=MAX_QUEUE_SIZE)
|
||
|
||
t = threading.Thread(target=sender_thread, args=(frame_queue,), daemon=True)
|
||
t.start()
|
||
|
||
run_agw_reader(frame_queue) # blocks forever, reconnects on error
|