Files
heardlog/collector/db.py
Joakim Svensson 42ba6feed4 first commit
2026-04-26 17:20:58 +02:00

108 lines
3.1 KiB
Python

import logging
from datetime import datetime
from typing import Optional
import asyncpg
logger = logging.getLogger(__name__)
_DDL = """
CREATE EXTENSION IF NOT EXISTS timescaledb;
CREATE EXTENSION IF NOT EXISTS postgis;
CREATE TABLE IF NOT EXISTS rf_frames (
ts TIMESTAMPTZ NOT NULL,
rx_station TEXT NOT NULL,
src_call TEXT NOT NULL,
dst_call TEXT,
lat DOUBLE PRECISION,
lon DOUBLE PRECISION,
heard_direct BOOLEAN NOT NULL DEFAULT FALSE,
path TEXT,
info TEXT,
raw TEXT
);
SELECT create_hypertable('rf_frames', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS rf_frames_src_ts ON rf_frames (src_call, ts DESC);
CREATE INDEX IF NOT EXISTS rf_frames_direct ON rf_frames (heard_direct, ts DESC);
CREATE INDEX IF NOT EXISTS rf_frames_geo
ON rf_frames USING GIST (ST_MakePoint(lon, lat))
WHERE heard_direct = TRUE AND lat IS NOT NULL AND lon IS NOT NULL;
CREATE TABLE IF NOT EXISTS is_frames (
ts TIMESTAMPTZ NOT NULL,
src_call TEXT NOT NULL,
lat DOUBLE PRECISION,
lon DOUBLE PRECISION,
path TEXT,
comment TEXT,
raw TEXT
);
SELECT create_hypertable('is_frames', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS is_frames_src_ts ON is_frames (src_call, ts DESC);
CREATE INDEX IF NOT EXISTS is_frames_geo
ON is_frames USING GIST (ST_MakePoint(lon, lat))
WHERE lat IS NOT NULL AND lon IS NOT NULL;
"""
async def init_db(url: str) -> asyncpg.Pool:
pool = await asyncpg.create_pool(url, min_size=2, max_size=10)
async with pool.acquire() as conn:
await conn.execute(_DDL)
logger.info("Database schema ready")
return pool
def _clean(s: Optional[str]) -> Optional[str]:
"""Strip null bytes ??? Postgres UTF8 rejects 0x00."""
if s is None:
return None
return s.replace("\x00", "")
async def insert_rf_frame(
pool: asyncpg.Pool,
ts: datetime,
rx_station: str,
src_call: str,
dst_call: Optional[str],
lat: Optional[float],
lon: Optional[float],
heard_direct: bool,
path: str,
info: str,
raw: str,
) -> None:
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO rf_frames
(ts, rx_station, src_call, dst_call, lat, lon,
heard_direct, path, info, raw)
VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
""",
ts, rx_station, _clean(src_call), _clean(dst_call), lat, lon,
heard_direct, _clean(path), _clean(info), _clean(raw),
)
async def insert_is_frame(
pool: asyncpg.Pool,
ts: datetime,
src_call: str,
lat: Optional[float],
lon: Optional[float],
path: str,
comment: Optional[str],
raw: str,
) -> None:
async with pool.acquire() as conn:
await conn.execute(
"""
INSERT INTO is_frames (ts, src_call, lat, lon, path, comment, raw)
VALUES ($1,$2,$3,$4,$5,$6,$7)
""",
ts, _clean(src_call), lat, lon, _clean(path), _clean(comment), _clean(raw),
)