192 lines
4.9 KiB
SQL
192 lines
4.9 KiB
SQL
CREATE EXTENSION IF NOT EXISTS timescaledb;
|
|
|
|
-- WS90 observations (as received)
|
|
CREATE TABLE IF NOT EXISTS observations_ws90 (
|
|
ts TIMESTAMPTZ NOT NULL,
|
|
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
|
site TEXT NOT NULL,
|
|
station_id BIGINT NOT NULL,
|
|
|
|
model TEXT,
|
|
battery_ok SMALLINT,
|
|
battery_mv INT,
|
|
|
|
temperature_c DOUBLE PRECISION,
|
|
humidity DOUBLE PRECISION,
|
|
|
|
wind_dir_deg DOUBLE PRECISION,
|
|
wind_avg_m_s DOUBLE PRECISION,
|
|
wind_max_m_s DOUBLE PRECISION,
|
|
|
|
uvi DOUBLE PRECISION,
|
|
light_lux DOUBLE PRECISION,
|
|
|
|
flags INT,
|
|
rain_mm DOUBLE PRECISION,
|
|
rain_start BIGINT,
|
|
|
|
supercap_v DOUBLE PRECISION,
|
|
firmware INT,
|
|
raw_data TEXT,
|
|
mic TEXT,
|
|
protocol TEXT,
|
|
rssi INT,
|
|
duration BIGINT,
|
|
|
|
payload_json JSONB
|
|
);
|
|
|
|
SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC);
|
|
CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC);
|
|
|
|
-- Open-Meteo hourly baseline forecasts
|
|
CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly (
|
|
ts TIMESTAMPTZ NOT NULL,
|
|
retrieved_at TIMESTAMPTZ NOT NULL,
|
|
site TEXT NOT NULL,
|
|
model TEXT NOT NULL,
|
|
|
|
temp_c DOUBLE PRECISION,
|
|
rh DOUBLE PRECISION,
|
|
pressure_msl_hpa DOUBLE PRECISION,
|
|
|
|
wind_m_s DOUBLE PRECISION,
|
|
wind_gust_m_s DOUBLE PRECISION,
|
|
wind_dir_deg DOUBLE PRECISION,
|
|
|
|
precip_mm DOUBLE PRECISION,
|
|
precip_prob DOUBLE PRECISION,
|
|
cloud_cover DOUBLE PRECISION,
|
|
|
|
source_json JSONB,
|
|
|
|
PRIMARY KEY (site, model, retrieved_at, ts)
|
|
);
|
|
|
|
SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRUE);
|
|
|
|
CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC);
|
|
|
|
-- Raw retention: 90 days
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1
|
|
FROM timescaledb_information.jobs j
|
|
WHERE j.proc_name = 'policy_retention'
|
|
AND j.hypertable_name = 'observations_ws90'
|
|
) THEN
|
|
PERFORM add_retention_policy('observations_ws90', INTERVAL '90 days');
|
|
END IF;
|
|
END$$;
|
|
|
|
-- Compression after 7 days (recommended)
|
|
ALTER TABLE observations_ws90 SET (
|
|
timescaledb.compress,
|
|
timescaledb.compress_segmentby = 'site,station_id'
|
|
);
|
|
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1
|
|
FROM timescaledb_information.jobs
|
|
WHERE proc_name = 'policy_compression'
|
|
AND hypertable_name = 'observations_ws90'
|
|
) THEN
|
|
PERFORM add_compression_policy('observations_ws90', INTERVAL '7 days');
|
|
END IF;
|
|
END$$;
|
|
|
|
-- 1-minute continuous aggregate
|
|
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m
|
|
WITH (timescaledb.continuous) AS
|
|
SELECT
|
|
time_bucket(INTERVAL '1 minute', ts) AS bucket,
|
|
site,
|
|
station_id,
|
|
|
|
avg(temperature_c) AS temp_c_avg,
|
|
min(temperature_c) AS temp_c_min,
|
|
max(temperature_c) AS temp_c_max,
|
|
|
|
avg(humidity) AS rh_avg,
|
|
min(humidity) AS rh_min,
|
|
max(humidity) AS rh_max,
|
|
|
|
avg(wind_avg_m_s) AS wind_avg_ms_avg,
|
|
max(wind_max_m_s) AS wind_gust_ms_max,
|
|
avg(wind_dir_deg) AS wind_dir_deg_avg,
|
|
|
|
max(uvi) AS uvi_max,
|
|
max(light_lux) AS light_lux_max,
|
|
|
|
min(rain_mm) AS rain_mm_min,
|
|
max(rain_mm) AS rain_mm_max
|
|
|
|
FROM observations_ws90
|
|
GROUP BY 1,2,3;
|
|
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1
|
|
FROM timescaledb_information.jobs
|
|
WHERE proc_name = 'policy_refresh_continuous_aggregate'
|
|
AND hypertable_name = 'cagg_ws90_1m'
|
|
) THEN
|
|
PERFORM add_continuous_aggregate_policy('cagg_ws90_1m',
|
|
start_offset => INTERVAL '2 hours',
|
|
end_offset => INTERVAL '2 minutes',
|
|
schedule_interval => INTERVAL '5 minutes'
|
|
);
|
|
END IF;
|
|
END$$;
|
|
|
|
-- 5-minute continuous aggregate (optional but useful)
|
|
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_5m
|
|
WITH (timescaledb.continuous) AS
|
|
SELECT
|
|
time_bucket(INTERVAL '5 minutes', ts) AS bucket,
|
|
site,
|
|
station_id,
|
|
|
|
avg(temperature_c) AS temp_c_avg,
|
|
min(temperature_c) AS temp_c_min,
|
|
max(temperature_c) AS temp_c_max,
|
|
|
|
avg(humidity) AS rh_avg,
|
|
min(humidity) AS rh_min,
|
|
max(humidity) AS rh_max,
|
|
|
|
avg(wind_avg_m_s) AS wind_avg_ms_avg,
|
|
max(wind_max_m_s) AS wind_gust_ms_max,
|
|
avg(wind_dir_deg) AS wind_dir_deg_avg,
|
|
|
|
max(uvi) AS uvi_max,
|
|
max(light_lux) AS light_lux_max,
|
|
|
|
min(rain_mm) AS rain_mm_min,
|
|
max(rain_mm) AS rain_mm_max
|
|
|
|
FROM observations_ws90
|
|
GROUP BY 1,2,3;
|
|
|
|
DO $$
|
|
BEGIN
|
|
IF NOT EXISTS (
|
|
SELECT 1
|
|
FROM timescaledb_information.jobs
|
|
WHERE proc_name = 'policy_refresh_continuous_aggregate'
|
|
AND hypertable_name = 'cagg_ws90_5m'
|
|
) THEN
|
|
PERFORM add_continuous_aggregate_policy('cagg_ws90_5m',
|
|
start_offset => INTERVAL '24 hours',
|
|
end_offset => INTERVAL '10 minutes',
|
|
schedule_interval => INTERVAL '15 minutes'
|
|
);
|
|
END IF;
|
|
END$$;
|