import logging from datetime import datetime from typing import Optional import asyncpg logger = logging.getLogger(__name__) _DDL = """ CREATE EXTENSION IF NOT EXISTS timescaledb; CREATE EXTENSION IF NOT EXISTS postgis; CREATE TABLE IF NOT EXISTS rf_frames ( ts TIMESTAMPTZ NOT NULL, rx_station TEXT NOT NULL, src_call TEXT NOT NULL, dst_call TEXT, lat DOUBLE PRECISION, lon DOUBLE PRECISION, heard_direct BOOLEAN NOT NULL DEFAULT FALSE, path TEXT, info TEXT, raw TEXT ); SELECT create_hypertable('rf_frames', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS rf_frames_src_ts ON rf_frames (src_call, ts DESC); CREATE INDEX IF NOT EXISTS rf_frames_direct ON rf_frames (heard_direct, ts DESC); CREATE INDEX IF NOT EXISTS rf_frames_geo ON rf_frames USING GIST (ST_MakePoint(lon, lat)) WHERE heard_direct = TRUE AND lat IS NOT NULL AND lon IS NOT NULL; CREATE TABLE IF NOT EXISTS is_frames ( ts TIMESTAMPTZ NOT NULL, src_call TEXT NOT NULL, lat DOUBLE PRECISION, lon DOUBLE PRECISION, path TEXT, comment TEXT, raw TEXT ); SELECT create_hypertable('is_frames', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS is_frames_src_ts ON is_frames (src_call, ts DESC); CREATE INDEX IF NOT EXISTS is_frames_geo ON is_frames USING GIST (ST_MakePoint(lon, lat)) WHERE lat IS NOT NULL AND lon IS NOT NULL; """ async def init_db(url: str) -> asyncpg.Pool: pool = await asyncpg.create_pool(url, min_size=2, max_size=10) async with pool.acquire() as conn: await conn.execute(_DDL) logger.info("Database schema ready") return pool def _clean(s: Optional[str]) -> Optional[str]: """Strip null bytes ??? Postgres UTF8 rejects 0x00.""" if s is None: return None return s.replace("\x00", "") async def insert_rf_frame( pool: asyncpg.Pool, ts: datetime, rx_station: str, src_call: str, dst_call: Optional[str], lat: Optional[float], lon: Optional[float], heard_direct: bool, path: str, info: str, raw: str, ) -> None: async with pool.acquire() as conn: await conn.execute( """ INSERT INTO rf_frames (ts, rx_station, src_call, dst_call, lat, lon, heard_direct, path, info, raw) VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10) """, ts, rx_station, _clean(src_call), _clean(dst_call), lat, lon, heard_direct, _clean(path), _clean(info), _clean(raw), ) async def insert_is_frame( pool: asyncpg.Pool, ts: datetime, src_call: str, lat: Optional[float], lon: Optional[float], path: str, comment: Optional[str], raw: str, ) -> None: async with pool.acquire() as conn: await conn.execute( """ INSERT INTO is_frames (ts, src_call, lat, lon, path, comment, raw) VALUES ($1,$2,$3,$4,$5,$6,$7) """, ts, _clean(src_call), lat, lon, _clean(path), _clean(comment), _clean(raw), )