"""APRS collector - DMZ service.

POST /ingest/rf  - agw-forwarder on the shack machine pushes RF frames here
GET  /health     - Docker / uptime check

The APRS-IS collector runs as a background asyncio task alongside uvicorn.
"""

import asyncio
import logging
import math
import re as _re
import secrets
import sys
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Annotated, Optional

import aprslib
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel

from aprs_is import run_aprs_is_collector
from config import Config
from db import init_db, insert_is_frame, insert_rf_frame
from maidenhead import latlon_to_subsquare, subsquare_bounds

logger = logging.getLogger(__name__)

# Path elements that are aliases / routing markers rather than physical
# digipeaters.  Compiled once at import time instead of on every call.
_ALIAS_RE = _re.compile(
    r'^(WIDE|RELAY|TRACE|ECHO|GATE|TCPIP|TCPXX|NOGATE|RFONLY|qA[A-Z])',
    _re.IGNORECASE,
)

# Reference position used by the coverage endpoints for distance
# calculations.
# NOTE(review): presumably the receiving station's location; it is
# hard-coded here - consider moving it into Config.
_RX_LAT = 58.35
_RX_LON = 14.05

_EARTH_RADIUS_KM = 6371


def _extract_digis(path: str) -> str:
    """Reduce a raw APRS path to the physical digipeaters that handled it.

    Elements starting with WIDE, RELAY, TRACE, ECHO, GATE, TCPIP, TCPXX,
    NOGATE, RFONLY or a q-construct (qA?) are treated as aliases and dropped
    (case-insensitive).

    Args:
        path: Comma-separated path string.  An empty value or the literal
            ``'Direkt'`` is treated as "no path".

    Returns:
        The remaining hops joined by commas, or ``'DIRECT'`` when no real
        digipeater hop remains.
    """
    if not path or path == 'Direkt':
        return 'DIRECT'
    hops = (hop.strip() for hop in path.split(','))
    digis = [hop for hop in hops if hop and not _ALIAS_RE.match(hop)]
    return ','.join(digis) if digis else 'DIRECT'


def _fmt_coord(value: Optional[float]) -> str:
    """Format a coordinate for log output; ``'-'`` when it is missing.

    Tests against ``None`` explicitly so that a legitimate 0.0 (equator /
    prime meridian) is not rendered as missing.
    """
    return f"{value:.4f}" if value is not None else "-"


def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in km between two lat/lon points."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(math.radians(lat1))
        * math.cos(math.radians(lat2))
        * math.sin(dlon / 2) ** 2
    )
    # Clamp so floating-point rounding can never push asin() out of domain.
    return _EARTH_RADIUS_KM * 2 * math.asin(min(1.0, math.sqrt(a)))


# ---------------------------------------------------------------------------
# App state - populated during startup
# ---------------------------------------------------------------------------

_pool = None
_config: Optional[Config] = None


# ---------------------------------------------------------------------------
# Startup / shutdown
# ---------------------------------------------------------------------------

@asynccontextmanager
async def lifespan(app: FastAPI):
    """FastAPI lifespan handler.

    On startup: loads the configuration, configures logging, waits for the
    database (up to 60 attempts, 5 s apart; exits the process with status 1
    if it never becomes available) and starts the APRS-IS collector as a
    background task.  On shutdown: cancels that task and waits for it.
    """
    global _pool, _config

    _config = Config.from_env()

    logging.basicConfig(
        level=getattr(logging, _config.log_level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
        stream=sys.stdout,
    )

    # Wait for Postgres
    for attempt in range(1, 61):
        try:
            _pool = await init_db(_config.db_url)
            break
        except Exception as exc:
            logger.warning("DB not ready (%d/60): %s", attempt, exc)
            await asyncio.sleep(5)
    else:
        logger.error("Could not connect to database - giving up")
        sys.exit(1)

    # Start APRS-IS collector as background task
    is_task = asyncio.create_task(_run_aprs_is())

    logger.info("Startup complete - listening for RF frames")
    yield  # <- app runs here

    # NOTE(review): the DB pool is not closed here; confirm whether the
    # object returned by init_db() needs an explicit close on shutdown.
    is_task.cancel()
    try:
        await is_task
    except asyncio.CancelledError:
        pass
    except Exception:
        # The collector had already died on its own; awaiting the finished
        # task re-raises its exception.  Log it rather than letting it turn
        # a normal shutdown into an error.
        logger.exception("APRS-IS collector task terminated with an error")


async def _run_aprs_is() -> None:
    """Run the APRS-IS collector, storing every received frame in the DB."""

    async def on_is_frame(*, ts: datetime, raw: str) -> None:
        """Parse one raw APRS-IS frame and insert it; errors are logged only."""
        try:
            src_call = raw.split(">")[0] if ">" in raw else ""
            lat, lon, comment = _parse_position(raw)
            path = _extract_path(raw)
            await insert_is_frame(_pool, ts, src_call, lat, lon, path, comment, raw)
            logger.debug(
                "IS  %-9s  lat=%s lon=%s",
                src_call,
                _fmt_coord(lat),
                _fmt_coord(lon),
            )
        except Exception as exc:
            # Best effort: one bad frame must not stop the collector.
            logger.warning("IS frame error: %s  raw=%r", exc, raw[:80])

    await run_aprs_is_collector(
        host=_config.aprs_is_host,
        port=_config.aprs_is_port,
        callsign=_config.aprs_is_callsign,
        passcode=_config.aprs_is_passcode,
        filter_str=_config.aprs_is_filter,
        on_frame=on_is_frame,
    )


# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------

app = FastAPI(title="aprs-collector", lifespan=lifespan)

# CORS is wide open: any origin, method and header is accepted.
app.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

_bearer = HTTPBearer()


def _require_api_key(
    creds: Annotated[HTTPAuthorizationCredentials, Depends(_bearer)],
) -> None:
    """Dependency that rejects requests whose bearer token is not the API key.

    Raises:
        HTTPException: 401 when the supplied token does not match
            ``_config.api_key`` (or no key is configured).
    """
    expected = _config.api_key
    # Constant-time comparison so the key cannot be probed via response
    # timing.  Compared as bytes because compare_digest() rejects non-ASCII
    # str arguments.
    if expected is None or not secrets.compare_digest(
        creds.credentials.encode("utf-8"), expected.encode("utf-8")
    ):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid API key",
        )


# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------

class RFFrameIn(BaseModel):
    """Request body of POST /ingest/rf: one RF frame from the forwarder."""

    ts: datetime        # UTC timestamp from forwarder
    src_call: str
    dst_call: str
    via_path: str       # comma-separated, may contain '*'
    info: str           # APRS info field
    heard_direct: bool


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

def _parse_position(packet: str) -> tuple[Optional[float], Optional[float], Optional[str]]:
    """Parse a TNC2 packet with aprslib.

    Returns:
        ``(latitude, longitude, comment)`` taken from the parsed packet; each
        element is ``None`` when absent.  Any parse failure yields
        ``(None, None, None)``.
    """
    try:
        parsed = aprslib.parse(packet)
    except Exception:
        # Best effort: frames aprslib cannot parse simply carry no position.
        return None, None, None
    return parsed.get("latitude"), parsed.get("longitude"), parsed.get("comment")


def _extract_path(raw: str) -> str:
    """Return the comma-separated path portion of a raw TNC2 header.

    The header is everything before the first ``:``; its first
    comma-separated element (``SRC>DST``) is dropped.  Returns ``""`` when
    there is no path or *raw* is not a string.
    """
    try:
        header = raw.split(":")[0]
        parts = header.split(",")
    except (AttributeError, TypeError):
        return ""
    return ",".join(parts[1:]) if len(parts) > 1 else ""


# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------

@app.get("/health")
async def health():
    """Liveness probe; always reports ``{"status": "ok"}``."""
    return {"status": "ok"}


@app.post(
    "/ingest/rf",
    status_code=status.HTTP_204_NO_CONTENT,
    dependencies=[Depends(_require_api_key)],
)
async def ingest_rf(frame: RFFrameIn) -> None:
    """Receive a single APRS RF frame from the shack forwarder.

    Any error while parsing or storing the frame is logged and the frame is
    dropped; the endpoint still answers 204.
    """
    try:
        # Build TNC2 string so aprslib can parse position
        tnc2 = f"{frame.src_call}>{frame.dst_call}"
        if frame.via_path:
            tnc2 += f",{frame.via_path}"
        tnc2 += f":{frame.info}"

        lat, lon, _ = _parse_position(tnc2)

        logger.info(
            "RF  %-6s %-9s  [%s]  lat=%s lon=%s",
            "DIRECT" if frame.heard_direct else "VIA",
            frame.src_call,
            frame.via_path,
            _fmt_coord(lat),
            _fmt_coord(lon),
        )

        await insert_rf_frame(
            pool=_pool,
            ts=frame.ts,
            rx_station=_config.station_call,
            src_call=frame.src_call,
            dst_call=frame.dst_call,
            lat=lat,
            lon=lon,
            heard_direct=frame.heard_direct,
            path=frame.via_path,
            info=frame.info,
            raw=tnc2,
        )
    except Exception as exc:
        # Deliberate best effort: the forwarder is not asked to retry.
        logger.warning("ingest_rf error (frame dropped): %s", exc)


# ---------------------------------------------------------------------------
# Coverage endpoints
# ---------------------------------------------------------------------------

@app.get("/coverage/squares")
async def coverage_squares():
    """Return all Maidenhead subsquares with path breakdown per square.

    Each entry carries the square id, total hit count, earliest timestamp,
    distance from the reference position to the square's centre, the
    per-path and per-station hit counts (both sorted by count, descending)
    and the square's bounds.
    """
    # The three queries run back to back, so one connection serves them all.
    async with _pool.acquire() as conn:
        path_rows = await conn.fetch(
            """
            SELECT lat, lon, path, COUNT(*) AS hits
            FROM rf_frames
            WHERE lat IS NOT NULL AND lon IS NOT NULL
            GROUP BY lat, lon, path
            """
        )
        # first_seen per lat/lon
        firstseen_rows = await conn.fetch(
            """
            SELECT lat, lon, MIN(ts) AS first_seen
            FROM rf_frames
            WHERE lat IS NOT NULL AND lon IS NOT NULL
            GROUP BY lat, lon
            """
        )
        # stations per lat/lon
        station_rows = await conn.fetch(
            """
            SELECT lat, lon, src_call, COUNT(*) AS hits
            FROM rf_frames
            WHERE lat IS NOT NULL AND lon IS NOT NULL
            GROUP BY lat, lon, src_call
            """
        )

    # Earliest timestamp per square.  Compared as datetimes (not as ISO
    # strings) so ordering never depends on string formatting.
    sq_first_seen: dict[str, datetime] = {}
    for row in firstseen_rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        seen = row["first_seen"]
        if sq not in sq_first_seen or seen < sq_first_seen[sq]:
            sq_first_seen[sq] = seen

    squares: dict[str, dict] = {}

    for row in path_rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        entry = squares.setdefault(sq, {"paths": {}, "stations": {}})
        digis = _extract_digis(row["path"] or "")
        entry["paths"][digis] = entry["paths"].get(digis, 0) + row["hits"]

    for row in station_rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        entry = squares.setdefault(sq, {"paths": {}, "stations": {}})
        call = row["src_call"]
        entry["stations"][call] = entry["stations"].get(call, 0) + row["hits"]

    result = []
    for sq, data in squares.items():
        bounds = subsquare_bounds(sq)
        total = sum(data["paths"].values())
        sorted_paths = sorted(
            data["paths"].items(), key=lambda item: item[1], reverse=True
        )
        sorted_stations = sorted(
            data["stations"].items(), key=lambda item: item[1], reverse=True
        )

        # Distance is measured to the centre of the square.
        sq_lat = (bounds['lat_min'] + bounds['lat_max']) / 2
        sq_lon = (bounds['lon_min'] + bounds['lon_max']) / 2
        dist_km = round(_haversine_km(_RX_LAT, _RX_LON, sq_lat, sq_lon), 1)

        first_seen = sq_first_seen.get(sq)
        result.append({
            "square": sq,
            "hits": total,
            "first_seen": first_seen.isoformat() if first_seen is not None else None,
            "dist_km": dist_km,
            "paths": [{"path": p, "count": c} for p, c in sorted_paths],
            "stations": [{"call": s, "count": c} for s, c in sorted_stations],
            **bounds,
        })

    return {"squares": result}


@app.get("/coverage/points")
async def coverage_points():
    """Return individual heard-direct observations with position.

    Used for the zoomed-in map layer.  One row per (callsign, lat, lon)
    with coordinates rounded to 4 decimals, keeping the most recent
    timestamp.
    """
    async with _pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT DISTINCT ON (src_call, ROUND(lat::numeric,4), ROUND(lon::numeric,4))
                src_call,
                ROUND(lat::numeric, 4) AS lat,
                ROUND(lon::numeric, 4) AS lon,
                ts
            FROM rf_frames
            WHERE heard_direct = TRUE
              AND lat IS NOT NULL
              AND lon IS NOT NULL
            ORDER BY src_call, ROUND(lat::numeric,4), ROUND(lon::numeric,4), ts DESC
            """
        )

    return {
        "points": [
            {
                "call": row["src_call"],
                "lat": float(row["lat"]),
                "lon": float(row["lon"]),
                "ts": row["ts"].isoformat(),
            }
            for row in rows
        ]
    }


@app.get("/coverage/station/{callsign}")
async def coverage_station(callsign: str):
    """Detailed statistics for a single station.

    Raises:
        HTTPException: 404 when no positioned frame from *callsign* exists.
    """
    async with _pool.acquire() as conn:
        row = await conn.fetchrow(
            """
            SELECT
                COUNT(*) AS total_frames,
                MIN(ts) AS first_heard,
                MAX(ts) AS last_heard,
                COUNT(DISTINCT ROUND(lat::numeric,3) || ',' || ROUND(lon::numeric,3)) AS unique_positions,
                COUNT(DISTINCT path) AS unique_paths,
                AVG(lat) AS avg_lat,
                AVG(lon) AS avg_lon
            FROM rf_frames
            WHERE src_call = $1 AND lat IS NOT NULL
            """,
            callsign,
        )

        # Unknown station: skip the remaining queries.
        if not row or row["total_frames"] == 0:
            raise HTTPException(status_code=404, detail="Station not found")

        path_rows = await conn.fetch(
            """
            SELECT path, COUNT(*) AS cnt
            FROM rf_frames
            WHERE src_call = $1 AND lat IS NOT NULL
            GROUP BY path
            ORDER BY cnt DESC
            LIMIT 5
            """,
            callsign,
        )

        square_rows = await conn.fetch(
            """
            SELECT lat, lon, COUNT(*) AS cnt
            FROM rf_frames
            WHERE src_call = $1 AND lat IS NOT NULL
            GROUP BY lat, lon
            ORDER BY cnt DESC
            """,
            callsign,
        )

    # Distance from each unique position to the reference position
    distances = []
    squares_seen = set()
    for pos in square_rows:
        # The queries filter on lat only; a row without lon cannot be placed.
        if pos["lon"] is None:
            continue
        distances.append(_haversine_km(_RX_LAT, _RX_LON, pos["lat"], pos["lon"]))
        squares_seen.add(latlon_to_subsquare(pos["lat"], pos["lon"]))

    return {
        "callsign": callsign,
        "total_frames": row["total_frames"],
        "first_heard": row["first_heard"].isoformat(),
        "last_heard": row["last_heard"].isoformat(),
        "unique_positions": row["unique_positions"],
        "unique_paths": row["unique_paths"],
        "squares": sorted(squares_seen),
        "distance_min_km": round(min(distances), 1) if distances else None,
        "distance_max_km": round(max(distances), 1) if distances else None,
        "distance_avg_km": round(sum(distances) / len(distances), 1) if distances else None,
        "top_paths": [
            {"path": _extract_digis(r["path"] or ""), "count": r["cnt"]}
            for r in path_rows
        ],
    }


@app.get("/coverage/digis")
async def coverage_digis():
    """Return latest known position for every callsign we have heard.

    Used client-side to resolve digi positions for path drawing.
    """
    async with _pool.acquire() as conn:
        rows = await conn.fetch(
            """
            SELECT DISTINCT ON (src_call)
                src_call, lat, lon, ts
            FROM rf_frames
            WHERE lat IS NOT NULL AND lon IS NOT NULL
            ORDER BY src_call, ts DESC
            """
        )

    return {
        "digis": {
            row["src_call"]: {
                "lat": row["lat"],
                "lon": row["lon"],
                "ts": row["ts"].isoformat(),
            }
            for row in rows
        }
    }