Files
heardlog/collector/main.py~
2026-05-02 18:16:56 +00:00

443 lines
15 KiB
Python

"""
APRS collector - DMZ service.
POST /ingest/rf - the agw-forwarder on the shack machine pushes RF frames here
GET /health - Docker / uptime check
The APRS-IS collector runs as a background asyncio task alongside uvicorn.
"""
import asyncio
import logging
import math
import re as _re
import secrets
import sys
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from typing import Annotated, Optional

import aprslib
import uvicorn
from fastapi import Depends, FastAPI, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer
from pydantic import BaseModel

from aprs_is import run_aprs_is_collector
from config import Config
from db import init_db, insert_is_frame, insert_rf_frame
from maidenhead import latlon_to_subsquare, subsquare_bounds
def _extract_digis(path: str) -> str:
"""
Reduce a raw APRS path to the actual physical digipeaters that handled it.
WIDE*, RELAY, TRACE etc are aliases and are stripped.
Returns 'DIRECT' if no real digi hops remain.
"""
if not path or path == 'Direkt':
return 'DIRECT'
_ALIAS = _re.compile(r'^(WIDE|RELAY|TRACE|ECHO|GATE|TCPIP|TCPXX|NOGATE|RFONLY|qA[A-Z])', _re.IGNORECASE)
hops = [h.strip() for h in path.split(',')]
digis = [h for h in hops if h and not _ALIAS.match(h)]
return ','.join(digis) if digis else 'DIRECT'
# Module-level logger; its level and output stream are configured in lifespan().
logger = logging.getLogger(__name__)
# ---------------------------------------------------------------------------
# App state - populated during startup
# ---------------------------------------------------------------------------
# Database pool returned by init_db(); stays None until lifespan() has run.
_pool = None
# Settings loaded by Config.from_env(); stays None until lifespan() has run.
_config: Optional[Config] = None
# ---------------------------------------------------------------------------
# Startup / shutdown
# ---------------------------------------------------------------------------
@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    FastAPI lifespan handler: populate shared state, run the app, clean up.

    Startup:
      1. Load configuration from the environment and configure logging.
      2. Connect to the database, retrying every 5 seconds for up to 60
         attempts.  If it never becomes reachable the process exits with
         status 1.
      3. Start the APRS-IS collector as a background asyncio task.

    Shutdown: cancel the APRS-IS task and wait for it to finish.  This runs
    in a ``finally`` so the task is also stopped when an exception is thrown
    into the context manager at the ``yield``.
    """
    global _pool, _config
    _config = Config.from_env()
    logging.basicConfig(
        level=getattr(logging, _config.log_level.upper(), logging.INFO),
        format="%(asctime)s %(levelname)-8s %(name)s %(message)s",
        stream=sys.stdout,
    )

    # Wait for Postgres
    for attempt in range(1, 61):
        try:
            _pool = await init_db(_config.db_url)
            break
        except Exception as exc:
            logger.warning("DB not ready (%d/60): %s", attempt, exc)
            await asyncio.sleep(5)
    else:
        # Loop ended without `break`: every attempt failed.
        logger.error("Could not connect to database ??? giving up")
        sys.exit(1)

    # Start APRS-IS collector as background task
    is_task = asyncio.create_task(_run_aprs_is())
    logger.info("Startup complete ??? listening for RF frames")
    try:
        yield  # app runs here
    finally:
        is_task.cancel()
        try:
            await is_task
        except asyncio.CancelledError:
            pass
        except Exception:
            # The collector had already died on its own; awaiting it re-raises
            # that error.  Log it instead of failing the shutdown sequence.
            logger.exception("APRS-IS collector task terminated with an error")
async def _run_aprs_is() -> None:
    """
    Run the APRS-IS collector using the connection settings from ``_config``.

    Every frame delivered by the collector is parsed for position and path and
    stored via ``insert_is_frame``.  This coroutine only returns when
    ``run_aprs_is_collector`` returns (or is cancelled).
    """

    async def on_is_frame(*, ts: datetime, raw: str) -> None:
        """Store one raw APRS-IS line; on any error log it and skip the frame."""
        try:
            # Source callsign is everything before the first '>' of the header.
            head, sep, _ = raw.partition(">")
            src_call = head if sep else ""
            lat, lon, comment = _parse_position(raw)
            path = _extract_path(raw)
            await insert_is_frame(_pool, ts, src_call, lat, lon, path, comment, raw)
            # Test against None, not truthiness: 0.0 is a valid coordinate
            # (equator / prime meridian) and must not be logged as missing.
            logger.debug(
                "IS %-9s lat=%s lon=%s",
                src_call,
                "-" if lat is None else f"{lat:.4f}",
                "-" if lon is None else f"{lon:.4f}",
            )
        except Exception as exc:
            # Best effort: one bad frame must not stop the collector.
            logger.warning("IS frame error: %s raw=%r", exc, raw[:80])

    await run_aprs_is_collector(
        host=_config.aprs_is_host,
        port=_config.aprs_is_port,
        callsign=_config.aprs_is_callsign,
        passcode=_config.aprs_is_passcode,
        filter_str=_config.aprs_is_filter,
        on_frame=on_is_frame,
    )
# ---------------------------------------------------------------------------
# FastAPI app
# ---------------------------------------------------------------------------
# ASGI application; startup and shutdown work is delegated to lifespan().
app = FastAPI(title="aprs-collector", lifespan=lifespan)
# CORS is fully open: any origin, method and header is accepted.
# NOTE(review): presumably so a browser front-end on another origin can call
# the coverage endpoints - confirm, and consider restricting allow_origins.
app.add_middleware(CORSMiddleware, allow_origins=["*"], allow_methods=["*"], allow_headers=["*"])
# Bearer-token extractor consumed by _require_api_key().
_bearer = HTTPBearer()
def _require_api_key(
    creds: Annotated[HTTPAuthorizationCredentials, Depends(_bearer)],
) -> None:
    """
    FastAPI dependency that only lets a request through if its bearer token
    equals the configured API key.

    Args:
        creds: Bearer credentials extracted from the request by ``_bearer``.

    Raises:
        HTTPException: 401 when the token does not match ``_config.api_key``
            (including when no string key is configured).
    """
    expected = _config.api_key
    # Constant-time comparison so response timing does not leak how much of
    # the key matched.  Compare bytes: compare_digest() rejects str values
    # containing non-ASCII characters.
    authorised = isinstance(expected, str) and secrets.compare_digest(
        creds.credentials.encode("utf-8"),
        expected.encode("utf-8"),
    )
    if not authorised:
        raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED,
                            detail="Invalid API key")
# ---------------------------------------------------------------------------
# Models
# ---------------------------------------------------------------------------
class RFFrameIn(BaseModel):
    # Request body of POST /ingest/rf: one APRS frame heard on RF.
    # ingest_rf() rebuilds the frame as "src_call>dst_call[,via_path]:info".

    # UTC timestamp from forwarder
    ts: datetime
    # Source callsign (part before '>' in the rebuilt frame)
    src_call: str
    # Destination field (part after '>' in the rebuilt frame)
    dst_call: str
    # Digipeater path: comma-separated, may contain '*'; may be empty
    via_path: str
    # APRS info field (everything after ':' in the rebuilt frame)
    info: str
    # Logged as "DIRECT" when true, "VIA" when false, and stored with the frame
    heard_direct: bool
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def _parse_position(packet: str) -> tuple[Optional[float], Optional[float], Optional[str]]:
try:
p = aprslib.parse(packet)
return p.get("latitude"), p.get("longitude"), p.get("comment")
except Exception:
return None, None, None
def _extract_path(raw: str) -> str:
try:
header = raw.split(":")[0]
parts = header.split(",")
return ",".join(parts[1:]) if len(parts) > 1 else ""
except Exception:
return ""
# ---------------------------------------------------------------------------
# Endpoints
# ---------------------------------------------------------------------------
@app.get("/health")
async def health():
    """Liveness probe: always answers with a constant OK payload."""
    payload = {"status": "ok"}
    return payload
@app.post("/ingest/rf", status_code=status.HTTP_204_NO_CONTENT,
          dependencies=[Depends(_require_api_key)])
async def ingest_rf(frame: RFFrameIn) -> None:
    """
    Receive a single APRS RF frame from the shack forwarder.

    The frame is rebuilt as a TNC2 line, parsed for a position, logged and
    stored via ``insert_rf_frame``.  Any failure is logged and the frame is
    dropped; the endpoint still answers 204.

    Args:
        frame: The validated request body.
    """
    try:
        # Build TNC2 string so aprslib can parse position
        header = f"{frame.src_call}>{frame.dst_call}"
        if frame.via_path:
            header += f",{frame.via_path}"
        tnc2 = f"{header}:{frame.info}"
        lat, lon, _ = _parse_position(tnc2)
        # Test against None, not truthiness: 0.0 is a valid coordinate
        # (equator / prime meridian) and must not be logged as missing.
        logger.info(
            "RF %-6s %-9s [%s] lat=%s lon=%s",
            "DIRECT" if frame.heard_direct else "VIA",
            frame.src_call,
            frame.via_path,
            "-" if lat is None else f"{lat:.4f}",
            "-" if lon is None else f"{lon:.4f}",
        )
        await insert_rf_frame(
            pool=_pool,
            ts=frame.ts,
            rx_station=_config.station_call,
            src_call=frame.src_call,
            dst_call=frame.dst_call,
            lat=lat,
            lon=lon,
            heard_direct=frame.heard_direct,
            path=frame.via_path,
            info=frame.info,
            raw=tnc2,
        )
    except Exception as exc:
        # Deliberate best effort: the forwarder is not told about failures.
        logger.warning("ingest_rf error (frame dropped): %s", exc)
# ---------------------------------------------------------------------------
# Coverage endpoints
# ---------------------------------------------------------------------------
# Receiver position used as the origin for the per-square distance.
# NOTE(review): hard-coded; presumably the shack location - consider moving
# it into Config.
_RX_LAT_DEG = 58.35
_RX_LON_DEG = 14.05

# Hit count per exact position and raw path.
_SQUARE_PATHS_SQL = """
SELECT lat, lon, path, COUNT(*) AS hits
FROM rf_frames
WHERE lat IS NOT NULL
AND lon IS NOT NULL
GROUP BY lat, lon, path
"""

# Earliest timestamp per exact position.
_SQUARE_FIRST_SEEN_SQL = """
SELECT lat, lon, MIN(ts) AS first_seen
FROM rf_frames
WHERE lat IS NOT NULL AND lon IS NOT NULL
GROUP BY lat, lon
"""

# Hit count per exact position and source callsign.
_SQUARE_STATIONS_SQL = """
SELECT lat, lon, src_call, COUNT(*) AS hits
FROM rf_frames
WHERE lat IS NOT NULL AND lon IS NOT NULL
GROUP BY lat, lon, src_call
"""


def _km_from_rx(lat: float, lon: float) -> float:
    """Return the haversine distance in km from the RX position to (lat, lon)."""
    dlat = math.radians(lat - _RX_LAT_DEG)
    dlon = math.radians(lon - _RX_LON_DEG)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(_RX_LAT_DEG)) * math.cos(math.radians(lat)) * math.sin(dlon/2)**2
    # Rounding can push `a` marginally above 1 for near-antipodal points,
    # which would make sqrt/asin raise ValueError; clamp it.
    return 6371 * 2 * math.asin(math.sqrt(min(1.0, a)))


@app.get("/coverage/squares")
async def coverage_squares():
    """
    Return all Maidenhead subsquares with path breakdown per square.

    Returns:
        ``{"squares": [...]}`` where each entry holds the square name, total
        hit count, ISO timestamp of the first frame seen in the square,
        distance in km from the RX position to the square centre, per-path
        and per-station hit counts (most frequent first), and the square's
        bounds as returned by ``subsquare_bounds``.
    """
    # One connection for all three queries instead of three pool round trips.
    async with _pool.acquire() as conn:
        rows = await conn.fetch(_SQUARE_PATHS_SQL)
        firstseen_rows = await conn.fetch(_SQUARE_FIRST_SEEN_SQL)
        station_rows = await conn.fetch(_SQUARE_STATIONS_SQL)

    # Earliest timestamp per square.  Compared as datetime objects; the
    # result is only converted to ISO text when the response is built.
    sq_first_seen: dict[str, datetime] = {}
    for row in firstseen_rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        ts = row["first_seen"]
        if sq not in sq_first_seen or ts < sq_first_seen[sq]:
            sq_first_seen[sq] = ts

    squares: dict[str, dict] = {}
    for row in rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        paths = squares.setdefault(sq, {"paths": {}, "stations": {}})["paths"]
        p = _extract_digis(row["path"] or "")
        paths[p] = paths.get(p, 0) + row["hits"]
    for row in station_rows:
        sq = latlon_to_subsquare(row["lat"], row["lon"])
        stations = squares.setdefault(sq, {"paths": {}, "stations": {}})["stations"]
        call = row["src_call"]
        stations[call] = stations.get(call, 0) + row["hits"]

    result = []
    for sq, data in squares.items():
        bounds = subsquare_bounds(sq)
        sorted_paths = sorted(data["paths"].items(), key=lambda x: x[1], reverse=True)
        sorted_stations = sorted(data["stations"].items(), key=lambda x: x[1], reverse=True)
        # Distance is measured to the centre of the square.
        sq_lat = (bounds['lat_min'] + bounds['lat_max']) / 2
        sq_lon = (bounds['lon_min'] + bounds['lon_max']) / 2
        first_seen = sq_first_seen.get(sq)
        result.append({
            "square": sq,
            "hits": sum(data["paths"].values()),
            "first_seen": first_seen.isoformat() if first_seen is not None else None,
            "dist_km": round(_km_from_rx(sq_lat, sq_lon), 1),
            "paths": [{"path": p, "count": c} for p, c in sorted_paths],
            "stations": [{"call": s, "count": c} for s, c in sorted_stations],
            **bounds,
        })
    return {"squares": result}
# Latest heard-direct frame per (callsign, position rounded to 4 decimals).
_COVERAGE_POINTS_SQL = """
SELECT DISTINCT ON (src_call, ROUND(lat::numeric,4), ROUND(lon::numeric,4))
src_call,
ROUND(lat::numeric, 4) AS lat,
ROUND(lon::numeric, 4) AS lon,
ts
FROM rf_frames
WHERE heard_direct = TRUE
AND lat IS NOT NULL
AND lon IS NOT NULL
ORDER BY src_call, ROUND(lat::numeric,4), ROUND(lon::numeric,4), ts DESC
"""


@app.get("/coverage/points")
async def coverage_points():
    """
    Return individual heard-direct observations with position.
    Used for the zoomed-in map layer.

    Each point carries the callsign, the rounded latitude and longitude as
    floats, and the ISO timestamp of the most recent matching frame.
    """
    async with _pool.acquire() as conn:
        records = await conn.fetch(_COVERAGE_POINTS_SQL)

    points = []
    for rec in records:
        point = {
            "call": rec["src_call"],
            "lat": float(rec["lat"]),
            "lon": float(rec["lon"]),
            "ts": rec["ts"].isoformat(),
        }
        points.append(point)
    return {"points": points}
# Aggregate figures over all positioned frames of one station.
_STATION_SUMMARY_SQL = """
SELECT
COUNT(*) AS total_frames,
MIN(ts) AS first_heard,
MAX(ts) AS last_heard,
COUNT(DISTINCT ROUND(lat::numeric,3) || ',' || ROUND(lon::numeric,3))
AS unique_positions,
COUNT(DISTINCT path) AS unique_paths,
AVG(lat) AS avg_lat,
AVG(lon) AS avg_lon
FROM rf_frames
WHERE src_call = $1
AND lat IS NOT NULL
"""

# The five most frequent raw paths of one station.
_STATION_TOP_PATHS_SQL = """
SELECT path, COUNT(*) AS cnt
FROM rf_frames
WHERE src_call = $1 AND lat IS NOT NULL
GROUP BY path
ORDER BY cnt DESC
LIMIT 5
"""

# Every exact position reported by one station, most frequent first.
_STATION_POSITIONS_SQL = """
SELECT lat, lon, COUNT(*) AS cnt
FROM rf_frames
WHERE src_call = $1 AND lat IS NOT NULL
GROUP BY lat, lon
ORDER BY cnt DESC
"""


def _haversine_km(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the haversine distance in km between two lat/lon points (degrees)."""
    dlat = math.radians(lat2 - lat1)
    dlon = math.radians(lon2 - lon1)
    a = math.sin(dlat/2)**2 + math.cos(math.radians(lat1)) * math.cos(math.radians(lat2)) * math.sin(dlon/2)**2
    # Rounding can push `a` marginally above 1 for near-antipodal points,
    # which would make sqrt/asin raise ValueError; clamp it.
    return 6371 * 2 * math.asin(math.sqrt(min(1.0, a)))


@app.get("/coverage/station/{callsign}")
async def coverage_station(callsign: str):
    """
    Detailed statistics for a single station.

    Args:
        callsign: Source callsign, matched exactly against ``src_call``.

    Returns:
        Frame count, first/last heard ISO timestamps, number of unique
        positions and paths, the Maidenhead subsquares the station was in,
        min/max/average distance in km from the RX position over its distinct
        positions, and its top five paths reduced to real digipeaters.

    Raises:
        HTTPException: 404 when no positioned frame exists for the callsign.
    """
    # Receiver position.  Previously the latitude was written as
    # float(_config.station_call and 58.35), which raised when station_call
    # was empty or None and otherwise just evaluated to 58.35.
    # NOTE(review): hard-coded; presumably the shack location - consider
    # moving it into Config.
    rx_lat = 58.35
    rx_lon = 14.05

    async with _pool.acquire() as conn:
        row = await conn.fetchrow(_STATION_SUMMARY_SQL, callsign)
        path_rows = await conn.fetch(_STATION_TOP_PATHS_SQL, callsign)
        square_rows = await conn.fetch(_STATION_POSITIONS_SQL, callsign)

    if not row or row["total_frames"] == 0:
        raise HTTPException(status_code=404, detail="Station not found")

    # Distance from each unique position to RX
    distances = []
    squares_seen = set()
    for r in square_rows:
        # The queries only filter on lat; skip a row whose lon is NULL rather
        # than crash in the distance calculation.
        if r["lon"] is None:
            continue
        distances.append(_haversine_km(rx_lat, rx_lon, r["lat"], r["lon"]))
        squares_seen.add(latlon_to_subsquare(r["lat"], r["lon"]))

    return {
        "callsign": callsign,
        "total_frames": row["total_frames"],
        "first_heard": row["first_heard"].isoformat(),
        "last_heard": row["last_heard"].isoformat(),
        "unique_positions": row["unique_positions"],
        "unique_paths": row["unique_paths"],
        "squares": sorted(squares_seen),
        "distance_min_km": round(min(distances), 1) if distances else None,
        "distance_max_km": round(max(distances), 1) if distances else None,
        "distance_avg_km": round(sum(distances)/len(distances), 1) if distances else None,
        "top_paths": [{"path": _extract_digis(r["path"] or ""), "count": r["cnt"]} for r in path_rows],
    }
# Most recent positioned frame per source callsign.
_COVERAGE_DIGIS_SQL = """
SELECT DISTINCT ON (src_call)
src_call, lat, lon, ts
FROM rf_frames
WHERE lat IS NOT NULL AND lon IS NOT NULL
ORDER BY src_call, ts DESC
"""


@app.get("/coverage/digis")
async def coverage_digis():
    """
    Return latest known position for every callsign we have heard.
    Used client-side to resolve digi positions for path drawing.

    The result maps each callsign to its latitude, longitude and the ISO
    timestamp of the frame that position was taken from.
    """
    async with _pool.acquire() as conn:
        records = await conn.fetch(_COVERAGE_DIGIS_SQL)

    positions = {}
    for rec in records:
        positions[rec["src_call"]] = {
            "lat": rec["lat"],
            "lon": rec["lon"],
            "ts": rec["ts"].isoformat(),
        }
    return {"digis": positions}