CREATE EXTENSION IF NOT EXISTS timescaledb; -- WS90 observations (as received) CREATE TABLE IF NOT EXISTS observations_ws90 ( ts TIMESTAMPTZ NOT NULL, received_at TIMESTAMPTZ NOT NULL DEFAULT now(), site TEXT NOT NULL, station_id BIGINT NOT NULL, model TEXT, battery_ok SMALLINT, battery_mv INT, temperature_c DOUBLE PRECISION, humidity DOUBLE PRECISION, wind_dir_deg DOUBLE PRECISION, wind_avg_m_s DOUBLE PRECISION, wind_max_m_s DOUBLE PRECISION, uvi DOUBLE PRECISION, light_lux DOUBLE PRECISION, flags INT, rain_mm DOUBLE PRECISION, rain_start BIGINT, supercap_v DOUBLE PRECISION, firmware INT, raw_data TEXT, mic TEXT, protocol TEXT, rssi INT, duration BIGINT, payload_json JSONB ); SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC); CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC); -- Open-Meteo hourly baseline forecasts CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly ( ts TIMESTAMPTZ NOT NULL, retrieved_at TIMESTAMPTZ NOT NULL, site TEXT NOT NULL, model TEXT NOT NULL, temp_c DOUBLE PRECISION, rh DOUBLE PRECISION, pressure_msl_hpa DOUBLE PRECISION, wind_m_s DOUBLE PRECISION, wind_gust_m_s DOUBLE PRECISION, wind_dir_deg DOUBLE PRECISION, precip_mm DOUBLE PRECISION, precip_prob DOUBLE PRECISION, cloud_cover DOUBLE PRECISION, source_json JSONB, PRIMARY KEY (site, model, retrieved_at, ts) ); SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC); -- Raw retention: 90 days DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM timescaledb_information.jobs j WHERE j.proc_name = 'policy_retention' AND j.hypertable_name = 'observations_ws90' ) THEN PERFORM add_retention_policy('observations_ws90', INTERVAL '90 days'); END IF; END$$; -- Compression after 7 days (recommended) ALTER TABLE observations_ws90 SET ( timescaledb.compress, timescaledb.compress_segmentby = 'site,station_id' ); DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM timescaledb_information.jobs WHERE proc_name = 'policy_compression' AND hypertable_name = 'observations_ws90' ) THEN PERFORM add_compression_policy('observations_ws90', INTERVAL '7 days'); END IF; END$$; -- 1-minute continuous aggregate CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m WITH (timescaledb.continuous) AS SELECT time_bucket(INTERVAL '1 minute', ts) AS bucket, site, station_id, avg(temperature_c) AS temp_c_avg, min(temperature_c) AS temp_c_min, max(temperature_c) AS temp_c_max, avg(humidity) AS rh_avg, min(humidity) AS rh_min, max(humidity) AS rh_max, avg(wind_avg_m_s) AS wind_avg_ms_avg, max(wind_max_m_s) AS wind_gust_ms_max, avg(wind_dir_deg) AS wind_dir_deg_avg, max(uvi) AS uvi_max, max(light_lux) AS light_lux_max, min(rain_mm) AS rain_mm_min, max(rain_mm) AS rain_mm_max FROM observations_ws90 GROUP BY 1,2,3; DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM timescaledb_information.jobs WHERE proc_name = 'policy_refresh_continuous_aggregate' AND hypertable_name = 'cagg_ws90_1m' ) THEN PERFORM add_continuous_aggregate_policy('cagg_ws90_1m', start_offset => INTERVAL '2 hours', end_offset => INTERVAL '2 minutes', schedule_interval => INTERVAL '5 minutes' ); END IF; END$$; -- 5-minute continuous aggregate (optional but useful) CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_5m WITH (timescaledb.continuous) AS SELECT time_bucket(INTERVAL '5 minutes', ts) AS bucket, site, station_id, avg(temperature_c) AS temp_c_avg, min(temperature_c) AS temp_c_min, max(temperature_c) AS temp_c_max, avg(humidity) AS rh_avg, min(humidity) AS rh_min, max(humidity) AS rh_max, avg(wind_avg_m_s) AS wind_avg_ms_avg, max(wind_max_m_s) AS wind_gust_ms_max, avg(wind_dir_deg) AS wind_dir_deg_avg, max(uvi) AS uvi_max, max(light_lux) AS light_lux_max, min(rain_mm) AS rain_mm_min, max(rain_mm) AS rain_mm_max FROM observations_ws90 GROUP BY 1,2,3; DO $$ BEGIN IF NOT EXISTS ( SELECT 1 FROM timescaledb_information.jobs WHERE proc_name = 'policy_refresh_continuous_aggregate' AND hypertable_name = 'cagg_ws90_5m' ) THEN PERFORM add_continuous_aggregate_policy('cagg_ws90_5m', start_offset => INTERVAL '24 hours', end_offset => INTERVAL '10 minutes', schedule_interval => INTERVAL '15 minutes' ); END IF; END$$;