From 42ba6feed478101b7e5a45f780f9d83b87ccca79 Mon Sep 17 00:00:00 2001 From: Joakim Svensson Date: Sun, 26 Apr 2026 17:20:58 +0200 Subject: [PATCH] first commit --- .env.example | 21 +++ .gitignore | 26 +++ README.md | 104 ++++++++++++ agw-forwarder/agw-forwarder.service | 18 ++ agw-forwarder/agw_forwarder.py | 244 ++++++++++++++++++++++++++++ agw-forwarder/requirements.txt | 1 + collector/Dockerfile | 10 ++ collector/aprs_is.py | 72 ++++++++ collector/config.py | 32 ++++ collector/db.py | 107 ++++++++++++ collector/main.py | 194 ++++++++++++++++++++++ collector/requirements.txt | 4 + docker-compose.yml | 32 ++++ 13 files changed, 865 insertions(+) create mode 100644 .env.example create mode 100644 .gitignore create mode 100644 README.md create mode 100644 agw-forwarder/agw-forwarder.service create mode 100644 agw-forwarder/agw_forwarder.py create mode 100644 agw-forwarder/requirements.txt create mode 100644 collector/Dockerfile create mode 100644 collector/aprs_is.py create mode 100644 collector/config.py create mode 100644 collector/db.py create mode 100644 collector/main.py create mode 100644 collector/requirements.txt create mode 100644 docker-compose.yml diff --git a/.env.example b/.env.example new file mode 100644 index 0000000..436d180 --- /dev/null +++ b/.env.example @@ -0,0 +1,21 @@ +# --- DMZ (docker-compose) --- + +STATION_CALL=SA6ANW-1 +DB_PASSWORD=changeme + +# Generate with: python3 -c "import secrets; print(secrets.token_hex(32))" +API_KEY=replace-with-random-secret + +# APRS-IS receive-only (-1) is fine for data collection +APRS_IS_PASSCODE=-1 + +# 200 km radius around JO68II / Bor??s +APRS_IS_FILTER=r/58.35/14.05/200 + +LOG_LEVEL=INFO + + +# --- Shack (agw-forwarder) ??? copy relevant vars to shack machine --- +# COLLECTOR_URL=http://:8080 +# API_KEY= +# STATION_CALL=SA6ANW-1 diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..44c9860 --- /dev/null +++ b/.gitignore @@ -0,0 +1,26 @@ +# OS +.DS_Store +Thumbs.db + +# Node +node_modules/ +dist/ +dist-ssr/ +*.local +.npm +.eslintcache + +# Python +__pycache__/ +*.py[cod] +*$py.class +.venv/ +venv/ +ENV/ +.env + +# Docker +.dockerignore + +# Git +.git/ diff --git a/README.md b/README.md new file mode 100644 index 0000000..21f923c --- /dev/null +++ b/README.md @@ -0,0 +1,104 @@ +# aprs-collector + +``` +[Shack-LAN] [DMZ] +Direwolf AGW:8000 + ??? +agw_forwarder.py ??????POST/Bearer????????? FastAPI :8080/ingest/rf ????????? TimescaleDB + ??? + rotate.aprs2.net??? (APRS-IS, outbound) +``` + +--- + +## DMZ ??? snabbstart + +```bash +cp .env.example .env + +# Generera en API-nyckel +python3 -c "import secrets; print(secrets.token_hex(32))" +# Klistra in i .env som API_KEY + +docker compose up -d +docker compose logs -f collector +``` + +S??tt en reverse proxy (Caddy/nginx) framf??r port 8080 om du vill ha TLS. + +--- + +## Shack ??? agw-forwarder + +Kopiera mappen `agw-forwarder/` till Direwolf-datorn. + +```bash +pip3 install requests + +export AGW_HOST=localhost +export AGW_PORT=8000 +export COLLECTOR_URL=http://:8080 +export API_KEY= +export STATION_CALL=SA6ANW-1 + +python3 agw_forwarder.py +``` + +### K??r som systemd-tj??nst + +```bash +# Redigera agw-forwarder.service ??? fyll i COLLECTOR_URL och API_KEY +sudo cp agw-forwarder.service /etc/systemd/system/ +sudo systemctl daemon-reload +sudo systemctl enable --now agw-forwarder +journalctl -u agw-forwarder -f +``` + +--- + +## Resiliens + +Forwardern har en intern k?? (2000 frames). Om DMZ ??r tillf??lligt on??bar buffras +frames i minnet och skickas n??r anslutningen ??terkommer. Vid omstart av forwardern +f??rsvinner buffrade frames ??? tillr??ckligt f??r de flesta avbrott. + +--- + +## API + +| Endpoint | Metod | Auth | Beskrivning | +|---|---|---|---| +| `/ingest/rf` | POST | Bearer | RF-frame fr??n forwarder | +| `/health` | GET | ??? | Liveness check | + +Swagger UI: `http://:8080/docs` + +--- + +## Nyttiga queries + +```sql +-- Direkt-h??rda stationer senaste 7 dagarna +SELECT ts, src_call, lat, lon, path +FROM rf_frames +WHERE heard_direct = TRUE + AND ts > NOW() - INTERVAL '7 days' +ORDER BY ts DESC; + +-- Digis som h??rts senaste 30 min (= "online") +SELECT src_call, MAX(ts) AS last_seen, COUNT(*) AS frames +FROM rf_frames +WHERE ts > NOW() - INTERVAL '30 minutes' +GROUP BY src_call +ORDER BY last_seen DESC; + +-- T??ckningspunkter per rutn??tscell +SELECT + ROUND(lat::numeric, 2) AS grid_lat, + ROUND(lon::numeric, 2) AS grid_lon, + COUNT(*) AS hits +FROM rf_frames +WHERE heard_direct = TRUE + AND lat IS NOT NULL +GROUP BY grid_lat, grid_lon; +``` diff --git a/agw-forwarder/agw-forwarder.service b/agw-forwarder/agw-forwarder.service new file mode 100644 index 0000000..1c56b26 --- /dev/null +++ b/agw-forwarder/agw-forwarder.service @@ -0,0 +1,18 @@ +[Unit] +Description=APRS AGW Forwarder +After=network.target direwolf.service +Wants=direwolf.service + +[Service] +Type=simple +User=joakim +WorkingDirectory=/home/joakim/heardlog/agw-forwarder + +EnvironmentFile=/home/joakim/heardlog/.env + +ExecStart=/usr/bin/python3 /home/joakim/heardlog/agw-forwarder/agw_forwarder.py +Restart=always +RestartSec=10 + +[Install] +WantedBy=multi-user.target diff --git a/agw-forwarder/agw_forwarder.py b/agw-forwarder/agw_forwarder.py new file mode 100644 index 0000000..bb8ab52 --- /dev/null +++ b/agw-forwarder/agw_forwarder.py @@ -0,0 +1,244 @@ +#!/usr/bin/env python3 +""" +agw_forwarder.py – Reads APRS frames from Direwolf AGW port and forwards +them to the aprs-collector API in DMZ. + +Usage: + python3 agw_forwarder.py + +Configuration via environment variables (or edit DEFAULTS below): + AGW_HOST Direwolf host (default: localhost) + AGW_PORT Direwolf AGW port (default: 8000) + COLLECTOR_URL aprs-collector API URL (default: http://localhost:8080) + API_KEY shared secret (required) + STATION_CALL your callsign (default: SA6ANW-1) + LOG_LEVEL DEBUG / INFO (default: INFO) + +Install deps: + pip3 install requests + +Run as systemd service: see agw-forwarder.service +""" + +import json +import logging +import os +import queue +import re +import socket +import struct +import threading +import time +from datetime import datetime, timezone + +import requests + +# --------------------------------------------------------------------------- +# Configuration +# --------------------------------------------------------------------------- +AGW_HOST = os.getenv("AGW_HOST", "localhost") +AGW_PORT = int(os.getenv("AGW_PORT", "8000")) +COLLECTOR_URL = os.getenv("COLLECTOR_URL", "http://localhost:8080") +API_KEY = os.getenv("API_KEY", "") +STATION_CALL = os.getenv("STATION_CALL", "SA6ANW-1") +LOG_LEVEL = os.getenv("LOG_LEVEL", "INFO") + +INGEST_URL = f"{COLLECTOR_URL.rstrip('/')}/ingest/rf" +RECONNECT_DELAY = 10 # seconds between AGW reconnect attempts +MAX_QUEUE_SIZE = 2000 # frames buffered while API is unreachable +HTTP_TIMEOUT = 5 # seconds per POST +RETRY_DELAY = 5 # seconds between failed POST retries + +logging.basicConfig( + level=getattr(logging, LOG_LEVEL.upper(), logging.INFO), + format="%(asctime)s %(levelname)-8s %(message)s", + datefmt="%Y-%m-%dT%H:%M:%SZ", +) +logger = logging.getLogger(__name__) + +if not API_KEY: + raise SystemExit("ERROR: API_KEY environment variable is required") + +# --------------------------------------------------------------------------- +# AGW protocol helpers +# --------------------------------------------------------------------------- +_HEADER_SIZE = 36 + + +def _build_frame(kind: str, pid: int = 0, data: bytes = b"", port: int = 0, + call_from: str = "", call_to: str = "") -> bytes: + cf = call_from.encode("ascii").ljust(10, b"\x00")[:10] + ct = call_to.encode("ascii").ljust(10, b"\x00")[:10] + return ( + bytes([port, 0, 0, 0]) + + bytes([ord(kind), 0, pid, 0]) + + cf + ct + + struct.pack(" tuple: + """Return (port, kind, call_from, call_to, data_len).""" + port = raw[0] + kind = chr(raw[4]) + call_from = raw[8:18].rstrip(b"\x00").decode("ascii", errors="replace").strip() + call_to = raw[18:28].rstrip(b"\x00").decode("ascii", errors="replace").strip() + data_len = struct.unpack_from(" str: + """ + Extract the via path from a Direwolf AGW monitoring header. + + Direwolf sends 'U' frame data as: + [port:Fm CALL To CALL Via PATH [HH:MM:SS]]\r + + Extract PATH from between 'Via ' and ' bool: + """No '*' anywhere in path → frame arrived directly from the transmitter.""" + if not via_path: + return True + return not any("*" in hop for hop in via_path.split(",")) + + +def _recv_exact(sock: socket.socket, n: int) -> bytes: + buf = b"" + while len(buf) < n: + chunk = sock.recv(n - len(buf)) + if not chunk: + raise ConnectionError("AGW socket closed") + buf += chunk + return buf + + +# --------------------------------------------------------------------------- +# Sender thread – dequeues frames and POSTs to collector API +# --------------------------------------------------------------------------- + +def sender_thread(q: queue.Queue) -> None: + session = requests.Session() + session.headers.update({"Authorization": f"Bearer {API_KEY}", + "Content-Type": "application/json"}) + while True: + payload = q.get() + while True: + try: + r = session.post(INGEST_URL, data=payload, timeout=HTTP_TIMEOUT) + if r.status_code == 204: + break + elif r.status_code == 401: + logger.error("API_KEY rejected – check configuration") + time.sleep(30) + else: + logger.warning("POST returned %d – retrying", r.status_code) + time.sleep(RETRY_DELAY) + except requests.exceptions.RequestException as exc: + logger.warning("POST failed (%s) – retrying in %ds", exc, RETRY_DELAY) + time.sleep(RETRY_DELAY) + + +# --------------------------------------------------------------------------- +# AGW reader – main loop +# --------------------------------------------------------------------------- + +def run_agw_reader(q: queue.Queue) -> None: + while True: + sock = None + try: + logger.info("AGW: connecting to %s:%d", AGW_HOST, AGW_PORT) + sock = socket.create_connection((AGW_HOST, AGW_PORT), timeout=10) + sock.settimeout(None) # blocking reads after connect + + # Capabilities request (warms up connection) + sock.sendall(_build_frame("G")) + # Enable monitoring mode → receive all UI frames + sock.sendall(_build_frame("m")) + + logger.info("AGW: connected, monitoring enabled") + + while True: + hdr = _recv_exact(sock, _HEADER_SIZE) + _, kind, call_from, call_to, data_len = _parse_header(hdr) + + data = b"" + if data_len > 0: + data = _recv_exact(sock, data_len) + + if kind != "U": + continue # skip control frames + + try: + if b"\r" in data: + monitoring_raw, info_raw = data.split(b"\r", 1) + else: + monitoring_raw, info_raw = b"", data + + monitoring = monitoring_raw.decode("ascii", errors="replace").strip() + info = info_raw.decode("ascii", errors="replace") + + # monitoring line is "[port:Fm X To Y Via PATH ]" + # extract just the via hops from it + via_path = _parse_monitoring_via(monitoring) if "[" in monitoring else monitoring + direct = _is_heard_direct(via_path) + ts = datetime.now(timezone.utc).isoformat() + + payload = json.dumps({ + "ts": ts, + "src_call": call_from, + "dst_call": call_to, + "via_path": via_path, + "info": info, + "heard_direct": direct, + }) + + logger.info("RF %-6s %-9s [%s]", + "DIRECT" if direct else "VIA", call_from, via_path) + + if q.full(): + logger.warning("Queue full – dropping oldest frame") + try: + q.get_nowait() + except queue.Empty: + pass + q.put(payload) + + except Exception as exc: + logger.warning("Frame parse error: %s", exc) + + except (ConnectionError, OSError, TimeoutError) as exc: + logger.warning("AGW: %s – reconnecting in %ds", exc, RECONNECT_DELAY) + finally: + if sock: + try: + sock.close() + except Exception: + pass + + time.sleep(RECONNECT_DELAY) + + +# --------------------------------------------------------------------------- +# Entry point +# --------------------------------------------------------------------------- + +if __name__ == "__main__": + logger.info("agw-forwarder starting station=%s agw=%s:%d api=%s", + STATION_CALL, AGW_HOST, AGW_PORT, INGEST_URL) + + frame_queue: queue.Queue = queue.Queue(maxsize=MAX_QUEUE_SIZE) + + t = threading.Thread(target=sender_thread, args=(frame_queue,), daemon=True) + t.start() + + run_agw_reader(frame_queue) # blocks forever, reconnects on error diff --git a/agw-forwarder/requirements.txt b/agw-forwarder/requirements.txt new file mode 100644 index 0000000..0eb8cae --- /dev/null +++ b/agw-forwarder/requirements.txt @@ -0,0 +1 @@ +requests>=2.31.0 diff --git a/collector/Dockerfile b/collector/Dockerfile new file mode 100644 index 0000000..a72838a --- /dev/null +++ b/collector/Dockerfile @@ -0,0 +1,10 @@ +FROM python:3.12-slim + +WORKDIR /app + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +COPY . . + +CMD ["uvicorn", "main:app", "--host", "0.0.0.0", "--port", "8080"] diff --git a/collector/aprs_is.py b/collector/aprs_is.py new file mode 100644 index 0000000..7d4632b --- /dev/null +++ b/collector/aprs_is.py @@ -0,0 +1,72 @@ +import asyncio +import logging +from datetime import datetime, timezone +from typing import Awaitable, Callable + +logger = logging.getLogger(__name__) + +_KEEPALIVE_INTERVAL = 60 +_RECONNECT_DELAY = 30 + + +async def run_aprs_is_collector( + host: str, + port: int, + callsign: str, + passcode: str, + filter_str: str, + on_frame: Callable[..., Awaitable[None]], +) -> None: + writer = None + while True: + try: + logger.info("APRS-IS: connecting to %s:%d", host, port) + reader, writer = await asyncio.open_connection(host, port) + + banner = await asyncio.wait_for(reader.readline(), timeout=15) + logger.info("APRS-IS: %s", banner.decode(errors="replace").strip()) + + login_line = ( + f"user {callsign} pass {passcode} " + f"vers aprs-collector 0.1 " + f"filter {filter_str}\r\n" + ) + writer.write(login_line.encode()) + await writer.drain() + + ack = await asyncio.wait_for(reader.readline(), timeout=15) + logger.info("APRS-IS login: %s", ack.decode(errors="replace").strip()) + + async def _keepalive() -> None: + while True: + await asyncio.sleep(_KEEPALIVE_INTERVAL) + writer.write(b"#ping\r\n") + await writer.drain() + + ka_task = asyncio.create_task(_keepalive()) + try: + while True: + line = await reader.readline() + if not line: + logger.warning("APRS-IS: server closed connection") + break + decoded = line.decode("utf-8", errors="replace").strip() + if not decoded or decoded.startswith("#"): + continue + await on_frame(ts=datetime.now(timezone.utc), raw=decoded) + finally: + ka_task.cancel() + + except asyncio.TimeoutError: + logger.warning("APRS-IS: timeout during handshake") + except Exception as exc: + logger.error("APRS-IS: %s", exc) + finally: + if writer: + try: + writer.close() + except Exception: + pass + + logger.info("APRS-IS: reconnecting in %ds...", _RECONNECT_DELAY) + await asyncio.sleep(_RECONNECT_DELAY) diff --git a/collector/config.py b/collector/config.py new file mode 100644 index 0000000..82bfce6 --- /dev/null +++ b/collector/config.py @@ -0,0 +1,32 @@ +import os +from dataclasses import dataclass + + +@dataclass +class Config: + db_url: str + api_key: str + station_call: str + aprs_is_host: str + aprs_is_port: int + aprs_is_callsign: str + aprs_is_passcode: str + aprs_is_filter: str + log_level: str + + @classmethod + def from_env(cls) -> "Config": + api_key = os.environ.get("API_KEY", "") + if not api_key: + raise RuntimeError("API_KEY environment variable is required") + return cls( + db_url=os.getenv("DATABASE_URL", "postgresql://aprs:aprs@db:5432/aprs"), + api_key=api_key, + station_call=os.getenv("STATION_CALL", "SA6ANW-1"), + aprs_is_host=os.getenv("APRS_IS_HOST", "rotate.aprs2.net"), + aprs_is_port=int(os.getenv("APRS_IS_PORT", "14580")), + aprs_is_callsign=os.getenv("APRS_IS_CALLSIGN", "SA6ANW-1"), + aprs_is_passcode=os.getenv("APRS_IS_PASSCODE", "-1"), + aprs_is_filter=os.getenv("APRS_IS_FILTER", "r/58.35/14.05/200"), + log_level=os.getenv("LOG_LEVEL", "INFO"), + ) diff --git a/collector/db.py b/collector/db.py new file mode 100644 index 0000000..8b50040 --- /dev/null +++ b/collector/db.py @@ -0,0 +1,107 @@ +import logging +from datetime import datetime +from typing import Optional + +import asyncpg + +logger = logging.getLogger(__name__) + +_DDL = """ +CREATE EXTENSION IF NOT EXISTS timescaledb; +CREATE EXTENSION IF NOT EXISTS postgis; + +CREATE TABLE IF NOT EXISTS rf_frames ( + ts TIMESTAMPTZ NOT NULL, + rx_station TEXT NOT NULL, + src_call TEXT NOT NULL, + dst_call TEXT, + lat DOUBLE PRECISION, + lon DOUBLE PRECISION, + heard_direct BOOLEAN NOT NULL DEFAULT FALSE, + path TEXT, + info TEXT, + raw TEXT +); +SELECT create_hypertable('rf_frames', 'ts', if_not_exists => TRUE); +CREATE INDEX IF NOT EXISTS rf_frames_src_ts ON rf_frames (src_call, ts DESC); +CREATE INDEX IF NOT EXISTS rf_frames_direct ON rf_frames (heard_direct, ts DESC); +CREATE INDEX IF NOT EXISTS rf_frames_geo + ON rf_frames USING GIST (ST_MakePoint(lon, lat)) + WHERE heard_direct = TRUE AND lat IS NOT NULL AND lon IS NOT NULL; + +CREATE TABLE IF NOT EXISTS is_frames ( + ts TIMESTAMPTZ NOT NULL, + src_call TEXT NOT NULL, + lat DOUBLE PRECISION, + lon DOUBLE PRECISION, + path TEXT, + comment TEXT, + raw TEXT +); +SELECT create_hypertable('is_frames', 'ts', if_not_exists => TRUE); +CREATE INDEX IF NOT EXISTS is_frames_src_ts ON is_frames (src_call, ts DESC); +CREATE INDEX IF NOT EXISTS is_frames_geo + ON is_frames USING GIST (ST_MakePoint(lon, lat)) + WHERE lat IS NOT NULL AND lon IS NOT NULL; +""" + + +async def init_db(url: str) -> asyncpg.Pool: + pool = await asyncpg.create_pool(url, min_size=2, max_size=10) + async with pool.acquire() as conn: + await conn.execute(_DDL) + logger.info("Database schema ready") + return pool + + +def _clean(s: Optional[str]) -> Optional[str]: + """Strip null bytes ??? Postgres UTF8 rejects 0x00.""" + if s is None: + return None + return s.replace("\x00", "") + + +async def insert_rf_frame( + pool: asyncpg.Pool, + ts: datetime, + rx_station: str, + src_call: str, + dst_call: Optional[str], + lat: Optional[float], + lon: Optional[float], + heard_direct: bool, + path: str, + info: str, + raw: str, +) -> None: + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO rf_frames + (ts, rx_station, src_call, dst_call, lat, lon, + heard_direct, path, info, raw) + VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) + """, + ts, rx_station, _clean(src_call), _clean(dst_call), lat, lon, + heard_direct, _clean(path), _clean(info), _clean(raw), + ) + + +async def insert_is_frame( + pool: asyncpg.Pool, + ts: datetime, + src_call: str, + lat: Optional[float], + lon: Optional[float], + path: str, + comment: Optional[str], + raw: str, +) -> None: + async with pool.acquire() as conn: + await conn.execute( + """ + INSERT INTO is_frames (ts, src_call, lat, lon, path, comment, raw) + VALUES ($1,$2,$3,$4,$5,$6,$7) + """, + ts, _clean(src_call), lat, lon, _clean(path), _clean(comment), _clean(raw), + ) diff --git a/collector/main.py b/collector/main.py new file mode 100644 index 0000000..ebfe63a --- /dev/null +++ b/collector/main.py @@ -0,0 +1,194 @@ +""" +APRS collector ??? DMZ service. + + POST /ingest/rf ??? agw-forwarder on shack machine pushes RF frames here + GET /health ??? Docker / uptime check + +APRS-IS collector runs as a background asyncio task alongside uvicorn. +""" + +import asyncio +import logging +import sys +from contextlib import asynccontextmanager +from datetime import datetime, timezone +from typing import Annotated, Optional + +import aprslib +import uvicorn +from fastapi import Depends, FastAPI, HTTPException, status +from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer +from pydantic import BaseModel + +from aprs_is import run_aprs_is_collector +from config import Config +from db import init_db, insert_is_frame, insert_rf_frame + +logger = logging.getLogger(__name__) + +# --------------------------------------------------------------------------- +# App state ??? populated during startup +# --------------------------------------------------------------------------- +_pool = None +_config: Optional[Config] = None + + +# --------------------------------------------------------------------------- +# Startup / shutdown +# --------------------------------------------------------------------------- + +@asynccontextmanager +async def lifespan(app: FastAPI): + global _pool, _config + + _config = Config.from_env() + logging.basicConfig( + level=getattr(logging, _config.log_level.upper(), logging.INFO), + format="%(asctime)s %(levelname)-8s %(name)s %(message)s", + stream=sys.stdout, + ) + + # Wait for Postgres + for attempt in range(1, 61): + try: + _pool = await init_db(_config.db_url) + break + except Exception as exc: + logger.warning("DB not ready (%d/60): %s", attempt, exc) + await asyncio.sleep(5) + else: + logger.error("Could not connect to database ??? giving up") + sys.exit(1) + + # Start APRS-IS collector as background task + is_task = asyncio.create_task(_run_aprs_is()) + logger.info("Startup complete ??? listening for RF frames") + + yield # ??? app runs here + + is_task.cancel() + try: + await is_task + except asyncio.CancelledError: + pass + + +async def _run_aprs_is() -> None: + async def on_is_frame(*, ts: datetime, raw: str) -> None: + try: + src_call = raw.split(">")[0] if ">" in raw else "" + lat, lon, comment = _parse_position(raw) + path = _extract_path(raw) + await insert_is_frame(_pool, ts, src_call, lat, lon, path, comment, raw) + logger.debug("IS %-9s lat=%s lon=%s", src_call, + f"{lat:.4f}" if lat else "-", + f"{lon:.4f}" if lon else "-") + except Exception as exc: + logger.warning("IS frame error: %s raw=%r", exc, raw[:80]) + + await run_aprs_is_collector( + host=_config.aprs_is_host, + port=_config.aprs_is_port, + callsign=_config.aprs_is_callsign, + passcode=_config.aprs_is_passcode, + filter_str=_config.aprs_is_filter, + on_frame=on_is_frame, + ) + + +# --------------------------------------------------------------------------- +# FastAPI app +# --------------------------------------------------------------------------- + +app = FastAPI(title="aprs-collector", lifespan=lifespan) +_bearer = HTTPBearer() + + +def _require_api_key( + creds: Annotated[HTTPAuthorizationCredentials, Depends(_bearer)], +) -> None: + if creds.credentials != _config.api_key: + raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, + detail="Invalid API key") + + +# --------------------------------------------------------------------------- +# Models +# --------------------------------------------------------------------------- + +class RFFrameIn(BaseModel): + ts: datetime # UTC timestamp from forwarder + src_call: str + dst_call: str + via_path: str # comma-separated, may contain '*' + info: str # APRS info field + heard_direct: bool + + +# --------------------------------------------------------------------------- +# Helpers +# --------------------------------------------------------------------------- + +def _parse_position(packet: str) -> tuple[Optional[float], Optional[float], Optional[str]]: + try: + p = aprslib.parse(packet) + return p.get("latitude"), p.get("longitude"), p.get("comment") + except Exception: + return None, None, None + + +def _extract_path(raw: str) -> str: + try: + header = raw.split(":")[0] + parts = header.split(",") + return ",".join(parts[1:]) if len(parts) > 1 else "" + except Exception: + return "" + + +# --------------------------------------------------------------------------- +# Endpoints +# --------------------------------------------------------------------------- + +@app.get("/health") +async def health(): + return {"status": "ok"} + + +@app.post("/ingest/rf", status_code=status.HTTP_204_NO_CONTENT, + dependencies=[Depends(_require_api_key)]) +async def ingest_rf(frame: RFFrameIn) -> None: + """Receive a single APRS RF frame from the shack forwarder.""" + try: + # Build TNC2 string so aprslib can parse position + tnc2 = f"{frame.src_call}>{frame.dst_call}" + if frame.via_path: + tnc2 += f",{frame.via_path}" + tnc2 += f":{frame.info}" + + lat, lon, _ = _parse_position(tnc2) + + logger.info( + "RF %-6s %-9s [%s] lat=%s lon=%s", + "DIRECT" if frame.heard_direct else "VIA", + frame.src_call, + frame.via_path, + f"{lat:.4f}" if lat else "-", + f"{lon:.4f}" if lon else "-", + ) + + await insert_rf_frame( + pool=_pool, + ts=frame.ts, + rx_station=_config.station_call, + src_call=frame.src_call, + dst_call=frame.dst_call, + lat=lat, + lon=lon, + heard_direct=frame.heard_direct, + path=frame.via_path, + info=frame.info, + raw=tnc2, + ) + except Exception as exc: + logger.warning("ingest_rf error (frame dropped): %s", exc) diff --git a/collector/requirements.txt b/collector/requirements.txt new file mode 100644 index 0000000..83cf3d6 --- /dev/null +++ b/collector/requirements.txt @@ -0,0 +1,4 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +asyncpg>=0.29.0 +aprslib>=0.7.0 diff --git a/docker-compose.yml b/docker-compose.yml new file mode 100644 index 0000000..0724f15 --- /dev/null +++ b/docker-compose.yml @@ -0,0 +1,32 @@ +services: + db: + image: timescale/timescaledb-ha:pg16-all + environment: + POSTGRES_DB: aprs + POSTGRES_USER: aprs + POSTGRES_PASSWORD: ${DB_PASSWORD:-aprs} + volumes: + - ./data/postgres:/var/lib/postgresql/data + restart: unless-stopped + healthcheck: + test: ["CMD-SHELL", "pg_isready -U aprs -d aprs"] + interval: 10s + timeout: 5s + retries: 10 + + collector: + build: ./collector + ports: + - "8085:8080" # expose API ??? put Caddy/nginx in front for TLS + environment: + DATABASE_URL: postgresql://aprs:${DB_PASSWORD:-aprs}@db:5432/aprs + API_KEY: ${API_KEY} # shared secret with forwarder + STATION_CALL: ${STATION_CALL:-SA6ANW-1} + APRS_IS_CALLSIGN: ${STATION_CALL:-SA6ANW-1} + APRS_IS_PASSCODE: ${APRS_IS_PASSCODE:--1} + APRS_IS_FILTER: ${APRS_IS_FILTER:-r/58.35/14.05/200} + LOG_LEVEL: ${LOG_LEVEL:-INFO} + restart: unless-stopped + depends_on: + db: + condition: service_healthy