108 lines
3.1 KiB
Python
108 lines
3.1 KiB
Python
import logging
|
|
from datetime import datetime
|
|
from typing import Optional
|
|
|
|
import asyncpg
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
_DDL = """
|
|
CREATE EXTENSION IF NOT EXISTS timescaledb;
|
|
CREATE EXTENSION IF NOT EXISTS postgis;
|
|
|
|
CREATE TABLE IF NOT EXISTS rf_frames (
|
|
ts TIMESTAMPTZ NOT NULL,
|
|
rx_station TEXT NOT NULL,
|
|
src_call TEXT NOT NULL,
|
|
dst_call TEXT,
|
|
lat DOUBLE PRECISION,
|
|
lon DOUBLE PRECISION,
|
|
heard_direct BOOLEAN NOT NULL DEFAULT FALSE,
|
|
path TEXT,
|
|
info TEXT,
|
|
raw TEXT
|
|
);
|
|
SELECT create_hypertable('rf_frames', 'ts', if_not_exists => TRUE);
|
|
CREATE INDEX IF NOT EXISTS rf_frames_src_ts ON rf_frames (src_call, ts DESC);
|
|
CREATE INDEX IF NOT EXISTS rf_frames_direct ON rf_frames (heard_direct, ts DESC);
|
|
CREATE INDEX IF NOT EXISTS rf_frames_geo
|
|
ON rf_frames USING GIST (ST_MakePoint(lon, lat))
|
|
WHERE heard_direct = TRUE AND lat IS NOT NULL AND lon IS NOT NULL;
|
|
|
|
CREATE TABLE IF NOT EXISTS is_frames (
|
|
ts TIMESTAMPTZ NOT NULL,
|
|
src_call TEXT NOT NULL,
|
|
lat DOUBLE PRECISION,
|
|
lon DOUBLE PRECISION,
|
|
path TEXT,
|
|
comment TEXT,
|
|
raw TEXT
|
|
);
|
|
SELECT create_hypertable('is_frames', 'ts', if_not_exists => TRUE);
|
|
CREATE INDEX IF NOT EXISTS is_frames_src_ts ON is_frames (src_call, ts DESC);
|
|
CREATE INDEX IF NOT EXISTS is_frames_geo
|
|
ON is_frames USING GIST (ST_MakePoint(lon, lat))
|
|
WHERE lat IS NOT NULL AND lon IS NOT NULL;
|
|
"""
|
|
|
|
|
|
async def init_db(url: str) -> asyncpg.Pool:
    """Create the connection pool and apply the schema in ``_DDL``.

    Args:
        url: Connection string handed to ``asyncpg.create_pool``.

    Returns:
        The open pool (created with ``min_size=2``, ``max_size=10``) once
        the schema script has run successfully.

    Raises:
        Any exception raised while creating the pool or executing the DDL.
        If the DDL step fails, the pool is closed before the exception
        propagates.
    """
    pool = await asyncpg.create_pool(url, min_size=2, max_size=10)
    try:
        async with pool.acquire() as conn:
            await conn.execute(_DDL)
    except BaseException:
        # On failure the caller never receives the pool, so nobody else can
        # close it; release its connections here rather than leaking them.
        await pool.close()
        raise
    logger.info("Database schema ready")
    return pool
|
|
|
|
|
|
def _clean(s: Optional[str]) -> Optional[str]:
|
|
"""Strip null bytes ??? Postgres UTF8 rejects 0x00."""
|
|
if s is None:
|
|
return None
|
|
return s.replace("\x00", "")
|
|
|
|
|
|
async def insert_rf_frame(
    pool: asyncpg.Pool,
    ts: datetime,
    rx_station: str,
    src_call: str,
    dst_call: Optional[str],
    lat: Optional[float],
    lon: Optional[float],
    heard_direct: bool,
    path: str,
    info: str,
    raw: str,
) -> None:
    """Insert a single row into ``rf_frames``.

    Each argument after ``pool`` maps to the column of the same name. All
    text arguments, including ``rx_station``, go through ``_clean`` so that
    an embedded NUL character cannot make the INSERT fail.

    Args:
        pool: Pool from which a connection is borrowed for the statement.
        ts: Value for the ``ts`` column.
        rx_station: Value for the ``rx_station`` column.
        src_call: Value for the ``src_call`` column.
        dst_call: Value for the ``dst_call`` column; ``None`` is passed
            through unchanged.
        lat: Latitude, or ``None`` when unknown.
        lon: Longitude, or ``None`` when unknown.
        heard_direct: Value for the ``heard_direct`` column.
        path: Value for the ``path`` column.
        info: Value for the ``info`` column.
        raw: Value for the ``raw`` column.
    """
    async with pool.acquire() as conn:
        await conn.execute(
            """
            INSERT INTO rf_frames
            (ts, rx_station, src_call, dst_call, lat, lon,
            heard_direct, path, info, raw)
            VALUES ($1,$2,$3,$4,$5,$6,$7,$8,$9,$10)
            """,
            ts, _clean(rx_station), _clean(src_call), _clean(dst_call), lat, lon,
            heard_direct, _clean(path), _clean(info), _clean(raw),
        )
|
|
|
|
|
|
async def insert_is_frame(
    pool: asyncpg.Pool,
    ts: datetime,
    src_call: str,
    lat: Optional[float],
    lon: Optional[float],
    path: str,
    comment: Optional[str],
    raw: str,
) -> None:
    """Insert a single row into ``is_frames``.

    Each argument after ``pool`` maps to the column of the same name. The
    text arguments are passed through ``_clean`` first; ``lat`` and ``lon``
    are forwarded as given and may be ``None``.
    """
    # Positional values in the same order as the $1..$7 placeholders.
    row = (
        ts,
        _clean(src_call),
        lat,
        lon,
        _clean(path),
        _clean(comment),
        _clean(raw),
    )
    async with pool.acquire() as db:
        await db.execute(
            """
            INSERT INTO is_frames (ts, src_call, lat, lon, path, comment, raw)
            VALUES ($1,$2,$3,$4,$5,$6,$7)
            """,
            *row,
        )
|