-- File: go-weatherstation/db/init/001_schema.sql
-- (viewer metadata: 238 lines, 6.3 KiB, SQL)

-- TimescaleDB provides create_hypertable(), the policy functions and the
-- continuous aggregates used by this schema.
CREATE EXTENSION IF NOT EXISTS timescaledb;

-- WS90 observations (as received).
-- Only the timing/identity columns are mandatory; every measurement and
-- metadata column is nullable. Units are the ones encoded in the column-name
-- suffixes (_c, _deg, _m_s, _mm, _mv, _v, _lux).
CREATE TABLE IF NOT EXISTS observations_ws90 (
    -- Timing and identity
    ts            TIMESTAMPTZ      NOT NULL,               -- hypertable time column
    received_at   TIMESTAMPTZ      NOT NULL DEFAULT now(), -- filled in at insert when omitted
    site          TEXT             NOT NULL,
    station_id    BIGINT           NOT NULL,
    model         TEXT,

    -- Power status
    battery_ok    SMALLINT,
    battery_mv    INT,

    -- Measurements
    temperature_c DOUBLE PRECISION,
    humidity      DOUBLE PRECISION,
    wind_dir_deg  DOUBLE PRECISION,
    wind_avg_m_s  DOUBLE PRECISION,
    wind_max_m_s  DOUBLE PRECISION,
    uvi           DOUBLE PRECISION,
    light_lux     DOUBLE PRECISION,
    flags         INT,
    rain_mm       DOUBLE PRECISION,
    rain_start    BIGINT,
    supercap_v    DOUBLE PRECISION,
    firmware      INT,

    -- Message-level metadata (presumably from the radio decoder -- confirm
    -- against the ingester) plus the full original payload
    raw_data      TEXT,
    mic           TEXT,
    protocol      TEXT,
    rssi          INT,
    duration      BIGINT,
    payload_json  JSONB
);

-- Partition by observation time; if_not_exists keeps a re-run from failing.
SELECT create_hypertable(
    'observations_ws90',
    'ts',
    if_not_exists => TRUE
);

-- Newest-first lookups, per site and per station.
CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts
    ON observations_ws90 (site, ts DESC);
CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts
    ON observations_ws90 (station_id, ts DESC);
-- Barometric pressure observations (from other MQTT sources).
-- `source` names the feed a reading came from; pressure_hpa is nullable and
-- the original message is kept in payload_json.
CREATE TABLE IF NOT EXISTS observations_baro (
    ts           TIMESTAMPTZ      NOT NULL,               -- hypertable time column
    received_at  TIMESTAMPTZ      NOT NULL DEFAULT now(), -- filled in at insert when omitted
    site         TEXT             NOT NULL,
    source       TEXT             NOT NULL,
    pressure_hpa DOUBLE PRECISION,
    payload_json JSONB
);

-- Partition by observation time; if_not_exists keeps a re-run from failing.
SELECT create_hypertable(
    'observations_baro',
    'ts',
    if_not_exists => TRUE
);

-- Newest-first lookups, per site and per source.
CREATE INDEX IF NOT EXISTS idx_observations_baro_site_ts
    ON observations_baro (site, ts DESC);
CREATE INDEX IF NOT EXISTS idx_observations_baro_source_ts
    ON observations_baro (source, ts DESC);
-- Open-Meteo hourly baseline forecasts.
-- `ts` is the hour being forecast and `retrieved_at` is when that forecast was
-- fetched. Both are part of the primary key, so several retrievals of the same
-- forecast hour can be stored side by side for one (site, model).
CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly (
    -- Key columns
    ts               TIMESTAMPTZ      NOT NULL,  -- hypertable time column
    retrieved_at     TIMESTAMPTZ      NOT NULL,
    site             TEXT             NOT NULL,
    model            TEXT             NOT NULL,

    -- Forecast values (all nullable)
    temp_c           DOUBLE PRECISION,
    rh               DOUBLE PRECISION,
    pressure_msl_hpa DOUBLE PRECISION,
    wind_m_s         DOUBLE PRECISION,
    wind_gust_m_s    DOUBLE PRECISION,
    wind_dir_deg     DOUBLE PRECISION,
    precip_mm        DOUBLE PRECISION,
    precip_prob      DOUBLE PRECISION,
    cloud_cover      DOUBLE PRECISION,

    -- Original API response fragment
    source_json      JSONB,

    -- Includes ts, as required for a unique index on a hypertable.
    PRIMARY KEY (site, model, retrieved_at, ts)
);

-- Partition by forecast time; if_not_exists keeps a re-run from failing.
SELECT create_hypertable(
    'forecast_openmeteo_hourly',
    'ts',
    if_not_exists => TRUE
);

-- Newest-first lookups per site, independent of model and retrieval time.
CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts
    ON forecast_openmeteo_hourly (site, ts DESC);
-- Raw retention: 90 days.
-- Chunks of observations_ws90 older than 90 days are dropped by a background
-- job. The built-in if_not_exists flag replaces a manual lookup in
-- timescaledb_information.jobs: that lookup matched on the table name only
-- (no schema), so a same-named hypertable in another schema could suppress
-- the policy. With the flag, an existing policy is skipped with a notice.
SELECT add_retention_policy(
    'observations_ws90',
    INTERVAL '90 days',
    if_not_exists => TRUE
);
-- Compression after 7 days (recommended).
-- Enable native compression, segmented by the same columns that lead the
-- table's lookup indexes (site, station_id).
-- NOTE(review): re-applying this ALTER once chunks have been compressed may
-- be rejected by some TimescaleDB versions -- confirm if this script is ever
-- re-run against a populated database.
ALTER TABLE observations_ws90 SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'site,station_id'
);

-- Schedule compression of chunks older than 7 days. The built-in
-- if_not_exists flag replaces a manual, schema-unqualified lookup in
-- timescaledb_information.jobs; an existing policy is skipped with a notice.
SELECT add_compression_policy(
    'observations_ws90',
    INTERVAL '7 days',
    if_not_exists => TRUE
);
-- Raw retention: 90 days (barometric pressure).
-- Chunks of observations_baro older than 90 days are dropped by a background
-- job. The built-in if_not_exists flag replaces a manual lookup in
-- timescaledb_information.jobs: that lookup matched on the table name only
-- (no schema), so a same-named hypertable in another schema could suppress
-- the policy. With the flag, an existing policy is skipped with a notice.
SELECT add_retention_policy(
    'observations_baro',
    INTERVAL '90 days',
    if_not_exists => TRUE
);
-- Compression after 7 days.
-- Enable native compression, segmented by the same columns that lead the
-- table's lookup indexes (site, source).
-- NOTE(review): re-applying this ALTER once chunks have been compressed may
-- be rejected by some TimescaleDB versions -- confirm if this script is ever
-- re-run against a populated database.
ALTER TABLE observations_baro SET (
    timescaledb.compress,
    timescaledb.compress_segmentby = 'site,source'
);

-- Schedule compression of chunks older than 7 days. The built-in
-- if_not_exists flag replaces a manual, schema-unqualified lookup in
-- timescaledb_information.jobs; an existing policy is skipped with a notice.
SELECT add_compression_policy(
    'observations_baro',
    INTERVAL '7 days',
    if_not_exists => TRUE
);
-- 1-minute continuous aggregate.
-- Rolls observations_ws90 up to one row per (1-minute bucket, site, station_id).
-- rain_mm_min / rain_mm_max keep both extremes of rain_mm inside the bucket
-- (presumably rain_mm is a running total, so max - min is the rainfall in the
-- bucket -- confirm against the consumer).
-- NOTE(review): wind_dir_deg_avg is a plain arithmetic mean of an angle, so
-- readings on either side of north (e.g. 350 and 10) average to 180. It is
-- left unchanged here because consumers read this column; consider a vector
-- mean if the value is used for display or modelling.
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '1 minute', ts) AS bucket,
    site,
    station_id,
    avg(temperature_c) AS temp_c_avg,
    min(temperature_c) AS temp_c_min,
    max(temperature_c) AS temp_c_max,
    avg(humidity)      AS rh_avg,
    min(humidity)      AS rh_min,
    max(humidity)      AS rh_max,
    avg(wind_avg_m_s)  AS wind_avg_ms_avg,
    max(wind_max_m_s)  AS wind_gust_ms_max,
    avg(wind_dir_deg)  AS wind_dir_deg_avg,
    max(uvi)           AS uvi_max,
    max(light_lux)     AS light_lux_max,
    min(rain_mm)       AS rain_mm_min,
    max(rain_mm)       AS rain_mm_max
FROM observations_ws90
-- Grouping keys are spelled out instead of referenced by position.
GROUP BY time_bucket(INTERVAL '1 minute', ts), site, station_id;

-- Refresh policy: every 5 minutes, re-materialise the buckets that are
-- between 2 hours and 2 minutes old.
-- if_not_exists is used instead of looking the job up in
-- timescaledb_information.jobs. For refresh jobs that view reports the
-- internal materialization hypertable as hypertable_name, not the view name,
-- so a lookup by 'cagg_ws90_1m' never matches and a second run of this script
-- would fail because the policy already exists.
SELECT add_continuous_aggregate_policy(
    'cagg_ws90_1m',
    start_offset      => INTERVAL '2 hours',
    end_offset        => INTERVAL '2 minutes',
    schedule_interval => INTERVAL '5 minutes',
    if_not_exists     => TRUE
);
-- 5-minute continuous aggregate (optional but useful).
-- Rolls observations_ws90 up to one row per (5-minute bucket, site, station_id).
-- rain_mm_min / rain_mm_max keep both extremes of rain_mm inside the bucket
-- (presumably rain_mm is a running total, so max - min is the rainfall in the
-- bucket -- confirm against the consumer).
-- NOTE(review): wind_dir_deg_avg is a plain arithmetic mean of an angle, so
-- readings on either side of north (e.g. 350 and 10) average to 180. It is
-- left unchanged here because consumers read this column; consider a vector
-- mean if the value is used for display or modelling.
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_5m
WITH (timescaledb.continuous) AS
SELECT
    time_bucket(INTERVAL '5 minutes', ts) AS bucket,
    site,
    station_id,
    avg(temperature_c) AS temp_c_avg,
    min(temperature_c) AS temp_c_min,
    max(temperature_c) AS temp_c_max,
    avg(humidity)      AS rh_avg,
    min(humidity)      AS rh_min,
    max(humidity)      AS rh_max,
    avg(wind_avg_m_s)  AS wind_avg_ms_avg,
    max(wind_max_m_s)  AS wind_gust_ms_max,
    avg(wind_dir_deg)  AS wind_dir_deg_avg,
    max(uvi)           AS uvi_max,
    max(light_lux)     AS light_lux_max,
    min(rain_mm)       AS rain_mm_min,
    max(rain_mm)       AS rain_mm_max
FROM observations_ws90
-- Grouping keys are spelled out instead of referenced by position.
GROUP BY time_bucket(INTERVAL '5 minutes', ts), site, station_id;

-- Refresh policy: every 15 minutes, re-materialise the buckets that are
-- between 24 hours and 10 minutes old.
-- if_not_exists is used instead of looking the job up in
-- timescaledb_information.jobs. For refresh jobs that view reports the
-- internal materialization hypertable as hypertable_name, not the view name,
-- so a lookup by 'cagg_ws90_5m' never matches and a second run of this script
-- would fail because the policy already exists.
SELECT add_continuous_aggregate_policy(
    'cagg_ws90_5m',
    start_offset      => INTERVAL '24 hours',
    end_offset        => INTERVAL '10 minutes',
    schedule_interval => INTERVAL '15 minutes',
    if_not_exists     => TRUE
);