From 3a7309b2cfea677ed2ca8cd993473fad77a80609 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Mon, 6 Apr 2026 18:32:33 +1000 Subject: [PATCH] update for 4 hour rain forecast --- README.md | 13 ++- cmd/ingestd/web.go | 19 ++++- cmd/ingestd/web/app.js | 2 +- cmd/ingestd/web/index.html | 2 +- db/init/001_schema.sql | 28 +++++++ db/init/002_rain_monitoring_views.sql | 94 ++++++++++++++++++---- db/init/003_rain_predictions_4h.sql | 29 +++++++ docker-compose.yml | 5 +- docs/rain_model_runbook.md | 109 ++++++++++++++++++++++++-- docs/rain_prediction.md | 25 ++++-- internal/db/series.go | 68 +++++++++++----- scripts/audit_rain_data.py | 38 +++++++-- scripts/check_rain_pipeline_health.py | 40 ++++++++-- scripts/compare_rain_reports.py | 78 ++++++++++++++++++ scripts/predict_rain_model.py | 62 +++++++++++---- scripts/rain_model_common.py | 85 ++++++++++++++++---- scripts/run_p0_rain_workflow.sh | 9 ++- scripts/run_rain_ml_worker.py | 13 ++- scripts/train_rain_model.py | 107 +++++++++++++++++++------ todo.md | 22 ++++++ 20 files changed, 716 insertions(+), 132 deletions(-) create mode 100644 db/init/003_rain_predictions_4h.sql create mode 100644 scripts/compare_rain_reports.py diff --git a/README.md b/README.md index ff30bd1..95b3ad4 100644 --- a/README.md +++ b/README.md @@ -76,7 +76,8 @@ TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes: - `observations_ws90` (hypertable): raw WS90 observations with payload metadata, plus the full JSON payload (`payload_json`). - `observations_baro` (hypertable): barometric pressure observations from other MQTT topics. - `forecast_openmeteo_hourly` (hypertable): hourly forecast points keyed by `(site, model, retrieved_at, ts)`. -- `predictions_rain_1h` (hypertable): model probability + decision + realized outcome fields. +- `predictions_rain_1h` (hypertable): legacy 1-hour model probability + decision + realized outcome fields. 
+- `predictions_rain_4h` (hypertable): 4-hour model probability + decision + realized outcome fields. - Continuous aggregates: - `cagg_ws90_1m`: 1‑minute rollups (avg/min/max for temp, humidity, wind, uvi, light, rain). - `cagg_ws90_5m`: 5‑minute rollups (same metrics as `cagg_ws90_1m`). @@ -86,14 +87,18 @@ Retention/compression: - `observations_baro` has a 90‑day retention policy and compression after 7 days. ### Existing databases -If you’re on an existing database, you’ll need to apply the new table definition once (the init SQL only runs on a fresh DB). Example: +If you’re on an existing database, apply the 4-hour prediction table migration: + +```sh +docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql +``` + +If you also need to backfill older schema objects, you can still re-run: ```sh docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/001_schema.sql ``` -Or copy just the `observations_baro` section into a manual `psql -c`. 
- For rain-model monitoring views, also apply `db/init/002_rain_monitoring_views.sql` on existing DBs: ```sh diff --git a/cmd/ingestd/web.go b/cmd/ingestd/web.go index f54841f..ebb616e 100644 --- a/cmd/ingestd/web.go +++ b/cmd/ingestd/web.go @@ -179,16 +179,29 @@ func (s *webServer) handleDashboard(w http.ResponseWriter, r *http.Request) { return } - const rainModelName = "rain_next_1h" + const rainModelName = "rain_next_4h" + const rainModelHorizonHours = 4 - latestRainPrediction, err := s.db.LatestRainPrediction(r.Context(), s.site.Name, rainModelName) + latestRainPrediction, err := s.db.LatestRainPrediction( + r.Context(), + s.site.Name, + rainModelName, + rainModelHorizonHours, + ) if err != nil { http.Error(w, "failed to query latest rain prediction", http.StatusInternalServerError) log.Printf("web dashboard latest rain prediction error: %v", err) return } - rainPredictionRange, err := s.db.RainPredictionSeriesRange(r.Context(), s.site.Name, rainModelName, start, end) + rainPredictionRange, err := s.db.RainPredictionSeriesRange( + r.Context(), + s.site.Name, + rainModelName, + rainModelHorizonHours, + start, + end, + ) if err != nil { http.Error(w, "failed to query rain predictions", http.StatusInternalServerError) log.Printf("web dashboard rain prediction range error: %v", err) diff --git a/cmd/ingestd/web/app.js b/cmd/ingestd/web/app.js index 9988a73..577547a 100644 --- a/cmd/ingestd/web/app.js +++ b/cmd/ingestd/web/app.js @@ -979,7 +979,7 @@ function renderDashboard(data) { data: { datasets: [ { - label: rainPredictions.length ? "model rain probability (%)" : "heuristic rain probability (%)", + label: rainPredictions.length ? "model rain probability next 4h (%)" : "heuristic rain probability (%)", data: rainPredictions.length ? 
buildRainProbabilitySeriesFromPredictions(rainPredictions) : buildRainProbabilitySeries(obsFiltered), borderColor: colors.rain, backgroundColor: "rgba(78, 168, 222, 0.18)", diff --git a/cmd/ingestd/web/index.html b/cmd/ingestd/web/index.html index 7bbf807..34dd1d2 100644 --- a/cmd/ingestd/web/index.html +++ b/cmd/ingestd/web/index.html @@ -70,7 +70,7 @@
--
-
Rain 1h %
+
Rain 4h %
--
diff --git a/db/init/001_schema.sql b/db/init/001_schema.sql index b8d85fd..1117141 100644 --- a/db/init/001_schema.sql +++ b/db/init/001_schema.sql @@ -112,6 +112,34 @@ CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_site_ts CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_pending_eval ON predictions_rain_1h(site, evaluated_at, ts DESC); +-- Rain model predictions (next 4h) +CREATE TABLE IF NOT EXISTS predictions_rain_4h ( + ts TIMESTAMPTZ NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + site TEXT NOT NULL, + model_name TEXT NOT NULL, + model_version TEXT NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + probability DOUBLE PRECISION NOT NULL, + predict_rain BOOLEAN NOT NULL, + + rain_next_4h_mm_actual DOUBLE PRECISION, + rain_next_4h_actual BOOLEAN, + evaluated_at TIMESTAMPTZ, + + metadata JSONB, + + PRIMARY KEY (site, model_name, model_version, ts) +); + +SELECT create_hypertable('predictions_rain_4h', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_site_ts + ON predictions_rain_4h(site, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_pending_eval + ON predictions_rain_4h(site, evaluated_at, ts DESC); + -- Raw retention: 90 days DO $$ BEGIN diff --git a/db/init/002_rain_monitoring_views.sql b/db/init/002_rain_monitoring_views.sql index 5945af8..1a06467 100644 --- a/db/init/002_rain_monitoring_views.sql +++ b/db/init/002_rain_monitoring_views.sql @@ -100,44 +100,104 @@ FROM baseline; CREATE OR REPLACE VIEW rain_prediction_drift_daily AS +WITH all_predictions AS ( + SELECT + 1::INT AS horizon_hours, + ts, + site, + model_name, + model_version, + threshold, + probability, + predict_rain + FROM predictions_rain_1h + UNION ALL + SELECT + 4::INT AS horizon_hours, + ts, + site, + model_name, + model_version, + threshold, + probability, + predict_rain + FROM predictions_rain_4h +) SELECT time_bucket(INTERVAL '1 day', ts) AS day, site, model_name, model_version, + horizon_hours, count(*) AS 
prediction_rows, avg(probability) AS probability_mean, stddev_samp(probability) AS probability_stddev, avg(CASE WHEN predict_rain THEN 1.0 ELSE 0.0 END) AS predicted_positive_rate, avg(threshold) AS threshold_mean -FROM predictions_rain_1h -GROUP BY 1,2,3,4; +FROM all_predictions +GROUP BY 1,2,3,4,5; CREATE OR REPLACE VIEW rain_calibration_drift_daily AS +WITH all_predictions AS ( + SELECT + 1::INT AS horizon_hours, + ts, + site, + model_name, + model_version, + probability, + predict_rain, + rain_next_1h_actual AS rain_actual + FROM predictions_rain_1h + UNION ALL + SELECT + 4::INT AS horizon_hours, + ts, + site, + model_name, + model_version, + probability, + predict_rain, + rain_next_4h_actual AS rain_actual + FROM predictions_rain_4h +) SELECT time_bucket(INTERVAL '1 day', ts) AS day, site, model_name, model_version, - count(*) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS evaluated_rows, - avg(probability) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS mean_probability, - avg(CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS observed_positive_rate, - avg(power(probability - CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END, 2.0)) - FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS brier_score, + horizon_hours, + count(*) FILTER (WHERE rain_actual IS NOT NULL) AS evaluated_rows, + avg(probability) FILTER (WHERE rain_actual IS NOT NULL) AS mean_probability, + avg(CASE WHEN rain_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_actual IS NOT NULL) AS observed_positive_rate, + avg(power(probability - CASE WHEN rain_actual THEN 1.0 ELSE 0.0 END, 2.0)) + FILTER (WHERE rain_actual IS NOT NULL) AS brier_score, avg( CASE - WHEN rain_next_1h_actual IS NULL THEN NULL - WHEN predict_rain = rain_next_1h_actual THEN 1.0 + WHEN rain_actual IS NULL THEN NULL + WHEN predict_rain = rain_actual THEN 1.0 ELSE 0.0 END ) AS decision_accuracy -FROM predictions_rain_1h -GROUP BY 1,2,3,4; +FROM all_predictions +GROUP BY 
1,2,3,4,5; CREATE OR REPLACE VIEW rain_pipeline_health AS -WITH sites AS ( +WITH prediction_latest AS ( + SELECT + site, + max(generated_at) AS prediction_generated_latest_ts, + max(evaluated_at) AS prediction_evaluated_latest_ts + FROM ( + SELECT site, generated_at, evaluated_at FROM predictions_rain_1h + UNION ALL + SELECT site, generated_at, evaluated_at FROM predictions_rain_4h + ) p + GROUP BY site +), +sites AS ( SELECT DISTINCT site FROM observations_ws90 UNION SELECT DISTINCT site FROM observations_baro @@ -145,12 +205,16 @@ WITH sites AS ( SELECT DISTINCT site FROM forecast_openmeteo_hourly UNION SELECT DISTINCT site FROM predictions_rain_1h + UNION + SELECT DISTINCT site FROM predictions_rain_4h ) SELECT s.site, (SELECT max(ts) FROM observations_ws90 w WHERE w.site = s.site) AS ws90_latest_ts, (SELECT max(ts) FROM observations_baro b WHERE b.site = s.site) AS baro_latest_ts, (SELECT max(ts) FROM forecast_openmeteo_hourly f WHERE f.site = s.site) AS forecast_latest_ts, - (SELECT max(generated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_generated_latest_ts, - (SELECT max(evaluated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_evaluated_latest_ts -FROM sites s; + p.prediction_generated_latest_ts, + p.prediction_evaluated_latest_ts +FROM sites s +LEFT JOIN prediction_latest p + ON p.site = s.site; diff --git a/db/init/003_rain_predictions_4h.sql b/db/init/003_rain_predictions_4h.sql new file mode 100644 index 0000000..8de62a0 --- /dev/null +++ b/db/init/003_rain_predictions_4h.sql @@ -0,0 +1,29 @@ +-- Add 4-hour rain prediction storage table. +-- Safe to re-run. 
+ +CREATE TABLE IF NOT EXISTS predictions_rain_4h ( + ts TIMESTAMPTZ NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + site TEXT NOT NULL, + model_name TEXT NOT NULL, + model_version TEXT NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + probability DOUBLE PRECISION NOT NULL, + predict_rain BOOLEAN NOT NULL, + + rain_next_4h_mm_actual DOUBLE PRECISION, + rain_next_4h_actual BOOLEAN, + evaluated_at TIMESTAMPTZ, + + metadata JSONB, + + PRIMARY KEY (site, model_name, model_version, ts) +); + +SELECT create_hypertable('predictions_rain_4h', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_site_ts + ON predictions_rain_4h(site, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_pending_eval + ON predictions_rain_4h(site, evaluated_at, ts DESC); diff --git a/docker-compose.yml b/docker-compose.yml index 5b76f5c..b275c4f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -34,8 +34,9 @@ services: environment: DATABASE_URL: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable" RAIN_SITE: "home" - RAIN_MODEL_NAME: "rain_next_1h" - RAIN_MODEL_VERSION_BASE: "rain-auto-v1-extended" + RAIN_HORIZON_HOURS: "4" + RAIN_MODEL_NAME: "rain_next_4h" + RAIN_MODEL_VERSION_BASE: "rain-auto-v2-extended-4h" RAIN_MODEL_FAMILY: "auto" RAIN_FEATURE_SET: "extended" RAIN_FORECAST_MODEL: "ecmwf" diff --git a/docs/rain_model_runbook.md b/docs/rain_model_runbook.md index 0c01a0a..24f3798 100644 --- a/docs/rain_model_runbook.md +++ b/docs/rain_model_runbook.md @@ -4,6 +4,14 @@ Operational guide for training, evaluating, deploying, monitoring, and rolling b ## 1) One-time Setup +Apply 4-hour prediction table migration: + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql +``` + Apply monitoring views: ```sh @@ -21,6 +29,7 @@ python scripts/train_rain_model.py \ --site "home" \ --start "2026-02-01T00:00:00Z" \ --end 
"2026-03-03T23:55:00Z" \ + --horizon-hours 4 \ --feature-set "extended" \ --model-family "auto" \ --forecast-model "ecmwf" \ @@ -29,7 +38,7 @@ python scripts/train_rain_model.py \ --calibration-methods "none,sigmoid,isotonic" \ --threshold-policy "walk_forward" \ --walk-forward-folds 4 \ - --model-version "rain-auto-v1-extended" \ + --model-version "rain-auto-v2-extended-4h" \ --out "models/rain_model.pkl" \ --report-out "models/rain_model_report.json" \ --model-card-out "models/model_card_{model_version}.md" \ @@ -53,7 +62,8 @@ Review in report: python scripts/predict_rain_model.py \ --site home \ --model-path "models/rain_model.pkl" \ - --model-name "rain_next_1h" \ + --model-name "rain_next_4h" \ + --horizon-hours 4 \ --dry-run ``` @@ -63,7 +73,8 @@ python scripts/predict_rain_model.py \ python scripts/predict_rain_model.py \ --site home \ --model-path "models/rain_model.pkl" \ - --model-name "rain_next_1h" + --model-name "rain_next_4h" \ + --horizon-hours 4 ``` ## 4) Rollback @@ -72,6 +83,7 @@ python scripts/predict_rain_model.py \ 2. If promotion fails or no candidate model is produced, the worker keeps the active model unchanged. 3. If inference starts without `RAIN_MODEL_PATH` but backup exists, the worker restores from backup automatically. 4. Keep failed candidate artifacts for postmortem. +5. During 4-hour rollout stabilization, keep `predictions_rain_1h` and `rain_next_1h` model artifacts available for immediate fallback. 
## 5) Monitoring @@ -118,12 +130,13 @@ Use the health-check script in cron, systemd timer, or your alerting scheduler: ```sh python scripts/check_rain_pipeline_health.py \ --site home \ - --model-name rain_next_1h \ + --model-name rain_next_4h \ + --horizon-hours 4 \ --max-ws90-age 20m \ --max-baro-age 30m \ --max-forecast-age 3h \ --max-prediction-age 30m \ - --max-pending-eval-age 3h \ + --max-pending-eval-age 6h \ --max-pending-eval-rows 200 ``` @@ -138,6 +151,7 @@ The script exits non-zero on failure, so it can directly drive alerting. - `RAIN_THRESHOLD_POLICY` - `RAIN_WALK_FORWARD_FOLDS` - `RAIN_ALLOW_EMPTY_DATA` +- `RAIN_HORIZON_HOURS` - `RAIN_MODEL_BACKUP_PATH` - `RAIN_MODEL_CARD_PATH` @@ -156,3 +170,88 @@ python scripts/recommend_rain_model.py \ --top-k 5 \ --json-out "models/rain_model_recommendation.json" ``` + +## 9) Staged 4h Rollout Checklist + +Run this sequence in production/staging to satisfy the 4h cutover gate: + +1. Apply schema migration for 4h predictions: + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql +``` + +2. Re-apply monitoring views (now include 1h + 4h unions): + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql +``` + +3. Run a full 4h training/evaluation cycle and save report: + +```sh +python scripts/train_rain_model.py \ + --site "home" \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --horizon-hours 4 \ + --feature-set "extended" \ + --model-family "auto" \ + --forecast-model "ecmwf" \ + --tune-hyperparameters \ + --threshold-policy "walk_forward" \ + --walk-forward-folds 4 \ + --model-version "rain-auto-v2-extended-4h" \ + --out "models/rain_model_4h.pkl" \ + --report-out "models/rain_model_report_4h.json" +``` + +4. 
Compare 4h metrics against the latest 1h benchmark report before switching dashboard defaults: + +```sh +python scripts/compare_rain_reports.py \ + --baseline "models/rain_model_report_1h.json" \ + --candidate "models/rain_model_report_4h.json" +``` +5. Run dry-run inference, then live inference with 4h model name/horizon: + +```sh +python scripts/predict_rain_model.py \ + --site home \ + --model-path "models/rain_model_4h.pkl" \ + --model-name "rain_next_4h" \ + --horizon-hours 4 \ + --dry-run + +python scripts/predict_rain_model.py \ + --site home \ + --model-path "models/rain_model_4h.pkl" \ + --model-name "rain_next_4h" \ + --horizon-hours 4 +``` + +6. Validate health checks and dashboard data path for 4h: + +```sh +python scripts/check_rain_pipeline_health.py \ + --site home \ + --model-name rain_next_4h \ + --horizon-hours 4 \ + --max-pending-eval-age 6h +``` + +7. Keep 1h path live in parallel until 4h drift/calibration remains stable for at least 7 days. + +### Fast rollback to 1h + +If 4h performance or pipeline health regresses: + +1. Set worker env back to: + `RAIN_HORIZON_HOURS=1`, `RAIN_MODEL_NAME=rain_next_1h`, and a known-good 1h model path/version. +2. Restart `rainml` service. +3. Confirm `check_rain_pipeline_health.py --horizon-hours 1 --model-name rain_next_1h` returns `ok`. +4. Keep `predictions_rain_4h` data for postmortem; do not drop tables during rollback. diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md index bc0277a..6cae8a0 100644 --- a/docs/rain_prediction.md +++ b/docs/rain_prediction.md @@ -1,14 +1,14 @@ -# Rain Prediction (Next 1 Hour) +# Rain Prediction (Next 4 Hours) This project includes a baseline workflow for **binary rain prediction**: -> **Will we see >= 0.2 mm of rain in the next hour?** +> **Will we see >= 0.2 mm of rain in the next 4 hours?** It uses local observations (WS90 + barometer), trains a logistic regression baseline, and writes model-driven predictions back to TimescaleDB. 
## P0 Decisions (Locked) -- Target: `rain_next_1h_mm >= 0.2`. +- Target: `rain_next_4h_mm >= 0.2`. - Primary use-case: low-noise rain heads-up signal for dashboard + alert candidate. - Frozen v1 training window (UTC): `2026-02-01T00:00:00Z` to `2026-03-03T23:55:00Z`. - Threshold policy: choose threshold on validation set by maximizing recall under @@ -40,7 +40,7 @@ pip install -r scripts/requirements.txt - `scripts/train_rain_model.py`: strict time-based split training and metrics report, with optional validation-only hyperparameter tuning, calibration comparison, naive baseline comparison, and walk-forward folds. - `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into - `predictions_rain_1h`. + `predictions_rain_4h`. - `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction. - `scripts/check_rain_pipeline_health.py`: freshness/failure check for alerting. - `scripts/recommend_rain_model.py`: rank saved training reports and recommend a deployment candidate. @@ -60,7 +60,15 @@ Model-family options (`train_rain_model.py`): ## Usage ### 1) Apply schema update (existing DBs) -`001_schema.sql` includes `predictions_rain_1h`. +`003_rain_predictions_4h.sql` adds `predictions_rain_4h`. + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql +``` + +`001_schema.sql` still remains safe to re-run for full schema parity. ```sh docker compose exec -T timescaledb \ @@ -76,6 +84,8 @@ docker compose exec -T timescaledb \ -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql ``` +All examples below assume a 4-hour horizon (`--horizon-hours 4`) and `model-name=rain_next_4h`. 
+ ### 2) Run data audit ```sh export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable" @@ -203,7 +213,8 @@ python scripts/train_rain_model.py \ python scripts/predict_rain_model.py \ --site home \ --model-path "models/rain_model.pkl" \ - --model-name "rain_next_1h" + --model-name "rain_next_4h" \ + --horizon-hours 4 ``` ### 5) One-command P0 workflow @@ -236,7 +247,7 @@ docker compose logs -f rainml - Model card: `models/model_card_.md` - Model artifact: `models/rain_model.pkl` - Dataset snapshot: `models/datasets/rain_dataset__.csv` -- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized +- Prediction rows: `predictions_rain_4h` (probability + threshold decision + realized outcome fields once available) ### 7) Recommend deploy candidate from saved reports diff --git a/internal/db/series.go b/internal/db/series.go index 404dec0..0ffe848 100644 --- a/internal/db/series.go +++ b/internal/db/series.go @@ -51,11 +51,14 @@ type RainPredictionPoint struct { GeneratedAt time.Time `json:"generated_at"` ModelName string `json:"model_name"` ModelVersion string `json:"model_version"` + HorizonHours int `json:"horizon_hours"` Threshold float64 `json:"threshold"` Probability float64 `json:"probability"` PredictRain bool `json:"predict_rain"` - RainNext1hMM *float64 `json:"rain_next_1h_mm_actual,omitempty"` - RainNext1hActual *bool `json:"rain_next_1h_actual,omitempty"` + RainNextMM *float64 `json:"rain_next_mm_actual,omitempty"` + RainNextActual *bool `json:"rain_next_actual,omitempty"` + RainNext1hMM *float64 `json:"rain_next_1h_mm_actual,omitempty"` // backward-compatible alias + RainNext1hActual *bool `json:"rain_next_1h_actual,omitempty"` // backward-compatible alias EvaluatedAt *time.Time `json:"evaluated_at,omitempty"` } @@ -365,8 +368,24 @@ func (d *DB) ForecastSeriesRange(ctx context.Context, site, model string, start, }, nil } -func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName 
string) (*RainPredictionPoint, error) { - query := ` +func predictionStorageForHorizon(horizonHours int) (table string, mmCol string, flagCol string, err error) { + switch horizonHours { + case 1: + return "predictions_rain_1h", "rain_next_1h_mm_actual", "rain_next_1h_actual", nil + case 4: + return "predictions_rain_4h", "rain_next_4h_mm_actual", "rain_next_4h_actual", nil + default: + return "", "", "", fmt.Errorf("unsupported rain prediction horizon: %d", horizonHours) + } +} + +func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string, horizonHours int) (*RainPredictionPoint, error) { + table, mmCol, flagCol, err := predictionStorageForHorizon(horizonHours) + if err != nil { + return nil, err + } + + query := fmt.Sprintf(` SELECT ts, generated_at, @@ -375,15 +394,15 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) ( threshold, probability, predict_rain, - rain_next_1h_mm_actual, - rain_next_1h_actual, + %s, + %s, evaluated_at - FROM predictions_rain_1h + FROM %s WHERE site = $1 AND model_name = $2 ORDER BY ts DESC, generated_at DESC LIMIT 1 - ` + `, mmCol, flagCol, table) var ( p RainPredictionPoint @@ -393,7 +412,7 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) ( predictRain sql.NullBool ) - err := d.Pool.QueryRow(ctx, query, site, modelName).Scan( + err = d.Pool.QueryRow(ctx, query, site, modelName).Scan( &p.TS, &p.GeneratedAt, &p.ModelName, @@ -421,14 +440,22 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) ( if predictRain.Valid { p.PredictRain = predictRain.Bool } - p.RainNext1hMM = nullFloatPtr(rainMM) - p.RainNext1hActual = nullBoolPtr(rainActual) + p.HorizonHours = horizonHours + p.RainNextMM = nullFloatPtr(rainMM) + p.RainNextActual = nullBoolPtr(rainActual) + p.RainNext1hMM = p.RainNextMM + p.RainNext1hActual = p.RainNextActual p.EvaluatedAt = nullTimePtr(evaluatedAt) return &p, nil } -func (d *DB) RainPredictionSeriesRange(ctx 
context.Context, site, modelName string, start, end time.Time) ([]RainPredictionPoint, error) { - query := ` +func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName string, horizonHours int, start, end time.Time) ([]RainPredictionPoint, error) { + table, mmCol, flagCol, err := predictionStorageForHorizon(horizonHours) + if err != nil { + return nil, err + } + + query := fmt.Sprintf(` SELECT DISTINCT ON (ts) ts, generated_at, @@ -437,16 +464,16 @@ func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName stri threshold, probability, predict_rain, - rain_next_1h_mm_actual, - rain_next_1h_actual, + %s, + %s, evaluated_at - FROM predictions_rain_1h + FROM %s WHERE site = $1 AND model_name = $2 AND ts >= $3 AND ts <= $4 ORDER BY ts ASC, generated_at DESC - ` + `, mmCol, flagCol, table) rows, err := d.Pool.Query(ctx, query, site, modelName, start, end) if err != nil { @@ -486,8 +513,11 @@ func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName stri if predictRain.Valid { p.PredictRain = predictRain.Bool } - p.RainNext1hMM = nullFloatPtr(rainMM) - p.RainNext1hActual = nullBoolPtr(rainActual) + p.HorizonHours = horizonHours + p.RainNextMM = nullFloatPtr(rainMM) + p.RainNextActual = nullBoolPtr(rainActual) + p.RainNext1hMM = p.RainNextMM + p.RainNext1hActual = p.RainNextActual p.EvaluatedAt = nullTimePtr(evaluatedAt) points = append(points, p) } diff --git a/scripts/audit_rain_data.py b/scripts/audit_rain_data.py index 6dd60b6..4f96541 100644 --- a/scripts/audit_rain_data.py +++ b/scripts/audit_rain_data.py @@ -10,6 +10,7 @@ import psycopg2 from rain_model_common import ( AVAILABLE_FEATURE_SETS, + DEFAULT_HORIZON_HOURS, RAIN_EVENT_THRESHOLD_MM, build_dataset, feature_columns_for_set, @@ -17,8 +18,11 @@ from rain_model_common import ( fetch_baro, fetch_forecast, fetch_ws90, + normalize_horizon_hours, model_frame, parse_time, + rain_next_flag_col, + rain_next_mm_col, to_builtin, ) @@ -29,6 +33,12 @@ def parse_args() 
-> argparse.Namespace: parser.add_argument("--site", required=True, help="Site name (e.g. home).") parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).") parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).") + parser.add_argument( + "--horizon-hours", + type=int, + default=DEFAULT_HORIZON_HOURS, + help="Prediction horizon in hours for target/label auditing.", + ) parser.add_argument( "--feature-set", default="baseline", @@ -57,13 +67,13 @@ def longest_zero_run(counts: np.ndarray) -> int: return best -def build_weekly_balance(model_df): +def build_weekly_balance(model_df, target_col: str): weekly = model_df.copy() iso = weekly.index.to_series().dt.isocalendar() weekly["year_week"] = iso["year"].astype(str) + "-W" + iso["week"].astype(str).str.zfill(2) grouped = ( - weekly.groupby("year_week")["rain_next_1h"] + weekly.groupby("year_week")[target_col] .agg(total_rows="count", positive_rows="sum") .reset_index() .sort_values("year_week") @@ -79,6 +89,9 @@ def main() -> int: start = parse_time(args.start) if args.start else "" end = parse_time(args.end) if args.end else "" + horizon_hours = normalize_horizon_hours(args.horizon_hours) + target_col = rain_next_flag_col(horizon_hours) + target_mm_col = rain_next_mm_col(horizon_hours) feature_cols = feature_columns_for_set(args.feature_set) needs_forecast = feature_columns_need_forecast(feature_cols) @@ -89,8 +102,14 @@ def main() -> int: if needs_forecast: forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model) - df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) - model_df = model_frame(df, feature_cols, require_target=True) + df = build_dataset( + ws90, + baro, + forecast=forecast, + rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM, + horizon_hours=horizon_hours, + ) + model_df = model_frame(df, feature_cols, require_target=True, target_col=target_col) ws90_dupes = int(ws90.duplicated(subset=["ts", 
"station_id"]).sum()) if not ws90.empty else 0 baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0 @@ -114,7 +133,7 @@ def main() -> int: baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0 missingness = {} - for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]: + for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", target_mm_col]: if col in df.columns: missingness[col] = float(df[col].isna().mean()) @@ -125,9 +144,12 @@ def main() -> int: report = { "site": args.site, "feature_set": args.feature_set, + "horizon_hours": horizon_hours, + "target_column": target_col, + "target_mm_column": target_mm_col, "feature_columns": feature_cols, "forecast_model": args.forecast_model if needs_forecast else None, - "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + "target_definition": f"{target_mm_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}", "requested_window": { "start": start or None, "end": end or None, @@ -167,8 +189,8 @@ def main() -> int: "max_rain_increment_5m_mm": max_rain_inc, }, "class_balance": { - "overall_positive_rate": float(model_df["rain_next_1h"].mean()) if not model_df.empty else None, - "weekly": build_weekly_balance(model_df) if not model_df.empty else [], + "overall_positive_rate": float(model_df[target_col].mean()) if not model_df.empty else None, + "weekly": build_weekly_balance(model_df, target_col=target_col) if not model_df.empty else [], }, } report = to_builtin(report) diff --git a/scripts/check_rain_pipeline_health.py b/scripts/check_rain_pipeline_health.py index 92186ff..3bb400e 100644 --- a/scripts/check_rain_pipeline_health.py +++ b/scripts/check_rain_pipeline_health.py @@ -9,6 +9,24 @@ from typing import Any import psycopg2 +DEFAULT_HORIZON_HOURS = 4 + + +def normalize_horizon_hours(horizon_hours: int) -> int: + out = int(horizon_hours) + if out <= 0: + raise ValueError("horizon_hours must be > 0") + 
return out + + +def prediction_table_for_horizon(horizon_hours: int) -> str: + horizon = normalize_horizon_hours(horizon_hours) + if horizon == 1: + return "predictions_rain_1h" + if horizon == 4: + return "predictions_rain_4h" + raise ValueError(f"unsupported prediction-table horizon: {horizon_hours}") + def parse_duration(value: str) -> timedelta: raw = value.strip().lower() @@ -27,14 +45,20 @@ def parse_args() -> argparse.Namespace: parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.") parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") parser.add_argument("--site", required=True, help="Site name.") - parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.") + parser.add_argument("--model-name", default="rain_next_4h", help="Prediction model_name to check.") + parser.add_argument( + "--horizon-hours", + type=int, + default=DEFAULT_HORIZON_HOURS, + help="Prediction horizon in hours used to select prediction storage table.", + ) parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.") parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.") parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.") parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.") parser.add_argument( "--max-pending-eval-age", - default="3h", + default="6h", help="Pending evaluations older than this count toward alert.", ) parser.add_argument( @@ -91,6 +115,8 @@ def main() -> int: max_forecast_age = parse_duration(args.max_forecast_age) max_prediction_age = parse_duration(args.max_prediction_age) max_pending_eval_age = parse_duration(args.max_pending_eval_age) + horizon_hours = normalize_horizon_hours(args.horizon_hours) + prediction_table = 
prediction_table_for_horizon(horizon_hours) with psycopg2.connect(args.db_url) as conn: ws90_latest = fetch_latest_ts( @@ -110,9 +136,9 @@ def main() -> int: ) prediction_latest = fetch_latest_ts( conn, - """ + f""" SELECT max(generated_at) - FROM predictions_rain_1h + FROM {prediction_table} WHERE site = %s AND model_name = %s """, @@ -120,9 +146,9 @@ def main() -> int: ) pending_eval_rows = fetch_count( conn, - """ + f""" SELECT count(*) - FROM predictions_rain_1h + FROM {prediction_table} WHERE site = %s AND model_name = %s AND evaluated_at IS NULL @@ -188,6 +214,8 @@ def main() -> int: "generated_at": now.isoformat(), "site": args.site, "model_name": args.model_name, + "horizon_hours": horizon_hours, + "prediction_table": prediction_table, "status": overall_status, "failing_checks": failing, "checks": checks, diff --git a/scripts/compare_rain_reports.py b/scripts/compare_rain_reports.py new file mode 100644 index 0000000..712486b --- /dev/null +++ b/scripts/compare_rain_reports.py @@ -0,0 +1,78 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +from pathlib import Path +from typing import Any + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Compare two rain-model training reports.") + parser.add_argument("--baseline", required=True, help="Baseline report path (for example 1h).") + parser.add_argument("--candidate", required=True, help="Candidate report path (for example 4h).") + return parser.parse_args() + + +def load_json(path: str) -> dict[str, Any]: + p = Path(path) + with p.open("r", encoding="utf-8") as f: + return json.load(f) + + +def to_float(v: Any) -> float | None: + if v is None: + return None + try: + return float(v) + except (TypeError, ValueError): + return None + + +def metric(report: dict[str, Any], split: str, key: str) -> float | None: + return to_float(report.get(split, {}).get(key)) + + +def delta_str(base: float | None, cand: float | None) -> str: + if 
base is None or cand is None: + return "n/a" + d = cand - base + return f"{d:+.4f}" + + +def main() -> int: + args = parse_args() + baseline = load_json(args.baseline) + candidate = load_json(args.candidate) + + pairs = [ + ("precision", metric(baseline, "test_metrics", "precision"), metric(candidate, "test_metrics", "precision")), + ("recall", metric(baseline, "test_metrics", "recall"), metric(candidate, "test_metrics", "recall")), + ("f1", metric(baseline, "test_metrics", "f1"), metric(candidate, "test_metrics", "f1")), + ("pr_auc", metric(baseline, "test_metrics", "pr_auc"), metric(candidate, "test_metrics", "pr_auc")), + ("roc_auc", metric(baseline, "test_metrics", "roc_auc"), metric(candidate, "test_metrics", "roc_auc")), + ("brier", metric(baseline, "test_metrics", "brier"), metric(candidate, "test_metrics", "brier")), + ] + + print("Rain report comparison:") + print( + f" baseline: version={baseline.get('model_version')} " + f"horizon={baseline.get('horizon_hours')}h " + f"target={baseline.get('target_definition')}" + ) + print( + f" candidate: version={candidate.get('model_version')} " + f"horizon={candidate.get('horizon_hours')}h " + f"target={candidate.get('target_definition')}" + ) + print(" metrics (candidate - baseline):") + for name, base, cand in pairs: + base_txt = "n/a" if base is None else f"{base:.4f}" + cand_txt = "n/a" if cand is None else f"{cand:.4f}" + print(f" {name}: baseline={base_txt} candidate={cand_txt} delta={delta_str(base, cand)}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/predict_rain_model.py b/scripts/predict_rain_model.py index 9d81cb2..571bb62 100644 --- a/scripts/predict_rain_model.py +++ b/scripts/predict_rain_model.py @@ -9,13 +9,18 @@ import psycopg2 from psycopg2.extras import Json from rain_model_common import ( + DEFAULT_HORIZON_HOURS, build_dataset, feature_columns_need_forecast, fetch_baro, fetch_forecast, fetch_ws90, model_frame, + normalize_horizon_hours, parse_time, 
+ prediction_table_for_horizon, + rain_next_flag_col, + rain_next_mm_col, to_builtin, ) @@ -30,8 +35,13 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") parser.add_argument("--site", required=True, help="Site name (e.g. home).") parser.add_argument("--model-path", default="models/rain_model.pkl", help="Path to trained model artifact.") - parser.add_argument("--model-name", default="rain_next_1h", help="Logical prediction model name.") + parser.add_argument("--model-name", default="rain_next_4h", help="Logical prediction model name.") parser.add_argument("--model-version", help="Override artifact model_version.") + parser.add_argument( + "--horizon-hours", + type=int, + help="Prediction horizon in hours. Defaults to artifact horizon when present, else 4.", + ) parser.add_argument( "--at", help="Prediction timestamp (RFC3339 or YYYY-MM-DD). Default: current UTC time.", @@ -98,9 +108,21 @@ def main() -> int: threshold = float(artifact.get("threshold", 0.5)) model_version = args.model_version or artifact.get("model_version") or "unknown" forecast_model = str(artifact.get("forecast_model") or args.forecast_model) + artifact_horizon = artifact.get("horizon_hours") + if args.horizon_hours is not None: + horizon_hours = normalize_horizon_hours(args.horizon_hours) + elif artifact_horizon is not None: + horizon_hours = normalize_horizon_hours(int(artifact_horizon)) + else: + horizon_hours = DEFAULT_HORIZON_HOURS + target_col = str(artifact.get("target_col") or rain_next_flag_col(horizon_hours)) + target_mm_col = str(artifact.get("target_mm_col") or rain_next_mm_col(horizon_hours)) + prediction_table = prediction_table_for_horizon(horizon_hours) + actual_mm_col = f"{target_mm_col}_actual" + actual_flag_col = f"{target_col}_actual" fetch_start = (at - timedelta(hours=args.history_hours)).isoformat() - fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat() + fetch_end = (at + 
timedelta(hours=horizon_hours, minutes=5)).isoformat() with psycopg2.connect(args.db_url) as conn: ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end) @@ -122,7 +144,7 @@ def main() -> int: return 0 raise RuntimeError(message) - full_df = build_dataset(ws90, baro, forecast=forecast) + full_df = build_dataset(ws90, baro, forecast=forecast, horizon_hours=horizon_hours) feature_df = model_frame(full_df, feature_cols=features, require_target=False) candidates = feature_df.loc[feature_df.index <= at] if candidates.empty: @@ -143,9 +165,9 @@ def main() -> int: actual_flag = None evaluated_at = None latest_available = full_df.index.max().to_pydatetime() - if pred_ts + timedelta(hours=1) <= latest_available: - next_mm = full_df.loc[pred_ts, "rain_next_1h_mm"] - next_flag = full_df.loc[pred_ts, "rain_next_1h"] + if pred_ts + timedelta(hours=horizon_hours) <= latest_available: + next_mm = full_df.loc[pred_ts, target_mm_col] + next_flag = full_df.loc[pred_ts, target_col] if next_mm == next_mm: # NaN-safe check actual_mm = float(next_mm) if next_flag == next_flag: @@ -156,6 +178,10 @@ def main() -> int: "artifact_path": args.model_path, "artifact_model_version": artifact.get("model_version"), "artifact_feature_set": feature_set, + "horizon_hours": horizon_hours, + "target_col": target_col, + "target_mm_col": target_mm_col, + "prediction_table": prediction_table, "forecast_model": forecast_model if needs_forecast else None, "needs_forecast_features": needs_forecast, "feature_values": {col: float(row.iloc[0][col]) for col in features}, @@ -170,6 +196,7 @@ def main() -> int: print(f" site: {args.site}") print(f" model_name: {args.model_name}") print(f" model_version: {model_version}") + print(f" horizon_hours: {horizon_hours}") if feature_set: print(f" feature_set: {feature_set}") print(f" pred_ts: {pred_ts.isoformat()}") @@ -182,10 +209,8 @@ def main() -> int: print("dry-run enabled; skipping DB upsert.") return 0 - with conn.cursor() as cur: - cur.execute( - """ - INSERT 
INTO predictions_rain_1h ( + query = f""" + INSERT INTO {prediction_table} ( ts, generated_at, site, @@ -194,8 +219,8 @@ def main() -> int: threshold, probability, predict_rain, - rain_next_1h_mm_actual, - rain_next_1h_actual, + {actual_mm_col}, + {actual_flag_col}, evaluated_at, metadata ) VALUES ( @@ -207,11 +232,14 @@ def main() -> int: threshold = EXCLUDED.threshold, probability = EXCLUDED.probability, predict_rain = EXCLUDED.predict_rain, - rain_next_1h_mm_actual = COALESCE(EXCLUDED.rain_next_1h_mm_actual, predictions_rain_1h.rain_next_1h_mm_actual), - rain_next_1h_actual = COALESCE(EXCLUDED.rain_next_1h_actual, predictions_rain_1h.rain_next_1h_actual), - evaluated_at = COALESCE(EXCLUDED.evaluated_at, predictions_rain_1h.evaluated_at), + {actual_mm_col} = COALESCE(EXCLUDED.{actual_mm_col}, {prediction_table}.{actual_mm_col}), + {actual_flag_col} = COALESCE(EXCLUDED.{actual_flag_col}, {prediction_table}.{actual_flag_col}), + evaluated_at = COALESCE(EXCLUDED.evaluated_at, {prediction_table}.evaluated_at), metadata = EXCLUDED.metadata - """, + """ + with conn.cursor() as cur: + cur.execute( + query, ( pred_ts, args.site, @@ -227,7 +255,7 @@ def main() -> int: ), ) conn.commit() - print("Prediction upserted into predictions_rain_1h.") + print(f"Prediction upserted into {prediction_table}.") return 0 diff --git a/scripts/rain_model_common.py b/scripts/rain_model_common.py index 86b1b12..997607a 100644 --- a/scripts/rain_model_common.py +++ b/scripts/rain_model_common.py @@ -86,7 +86,46 @@ FEATURE_COLUMNS = BASELINE_FEATURE_COLUMNS RAIN_EVENT_THRESHOLD_MM = 0.2 RAIN_SPIKE_THRESHOLD_MM_5M = 5.0 -RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h +BUCKET_MINUTES = 5 +DEFAULT_HORIZON_HOURS = 4 +SUPPORTED_PREDICTION_HORIZONS = (1, 4) + + +def normalize_horizon_hours(horizon_hours: int) -> int: + out = int(horizon_hours) + if out <= 0: + raise ValueError("horizon_hours must be > 0") + return out + + +def horizon_suffix(horizon_hours: int) -> str: + return 
f"{normalize_horizon_hours(horizon_hours)}h" + + +def horizon_buckets(horizon_hours: int) -> int: + hours = normalize_horizon_hours(horizon_hours) + return (hours * 60) // BUCKET_MINUTES + + +def rain_last_mm_col(horizon_hours: int) -> str: + return f"rain_last_{horizon_suffix(horizon_hours)}_mm" + + +def rain_next_mm_col(horizon_hours: int) -> str: + return f"rain_next_{horizon_suffix(horizon_hours)}_mm" + + +def rain_next_flag_col(horizon_hours: int) -> str: + return f"rain_next_{horizon_suffix(horizon_hours)}" + + +def prediction_table_for_horizon(horizon_hours: int) -> str: + horizon = normalize_horizon_hours(horizon_hours) + if horizon == 1: + return "predictions_rain_1h" + if horizon == 4: + return "predictions_rain_4h" + raise ValueError(f"unsupported prediction-table horizon: {horizon_hours}") def parse_time(value: str) -> str: @@ -232,6 +271,7 @@ def build_dataset( baro: pd.DataFrame, forecast: pd.DataFrame | None = None, rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM, + horizon_hours: int = 1, ) -> pd.DataFrame: if ws90.empty: raise RuntimeError("no ws90 observations found") @@ -261,12 +301,20 @@ def build_dataset( df["rain_inc"] = df["rain_inc_raw"].clip(lower=0) df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M - window = RAIN_HORIZON_BUCKETS - df["rain_last_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum() - df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1)) - df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm + windows: dict[int, int] = { + 1: horizon_buckets(1), + normalize_horizon_hours(horizon_hours): horizon_buckets(horizon_hours), + } + for hours, window in windows.items(): + rain_last_col = rain_last_mm_col(hours) + rain_next_mm = rain_next_mm_col(hours) + rain_next_flag = rain_next_flag_col(hours) + df[rain_last_col] = df["rain_inc"].rolling(window=window, min_periods=1).sum() + df[rain_next_mm] = 
df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1)) + df[rain_next_flag] = df[rain_next_mm] >= rain_event_threshold_mm - df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window) + window_1h = horizon_buckets(1) + df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window_1h) # Wind direction cyclical encoding. radians = np.deg2rad(df["wind_dir_deg"] % 360.0) @@ -279,14 +327,14 @@ def build_dataset( df["wind_avg_lag_5m"] = df["wind_avg_m_s"].shift(1) df["pressure_lag_5m"] = df["pressure_hpa"].shift(1) - df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window, min_periods=3).mean() - df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window, min_periods=3).std() - df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window, min_periods=3).mean() - df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window, min_periods=3).std() - df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window, min_periods=3).mean() - df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window, min_periods=3).max() - df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window, min_periods=3).mean() - df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window, min_periods=3).std() + df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window_1h, min_periods=3).mean() + df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window_1h, min_periods=3).std() + df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window_1h, min_periods=3).mean() + df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window_1h, min_periods=3).std() + df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window_1h, min_periods=3).mean() + df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window_1h, min_periods=3).max() + df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window_1h, min_periods=3).mean() + 
df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window_1h, min_periods=3).std() # Calendar/seasonality features (UTC based). hour_of_day = df.index.hour + (df.index.minute / 60.0) @@ -304,11 +352,16 @@ def build_dataset( return df -def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require_target: bool = True) -> pd.DataFrame: +def model_frame( + df: pd.DataFrame, + feature_cols: list[str] | None = None, + require_target: bool = True, + target_col: str | None = None, +) -> pd.DataFrame: features = feature_cols or FEATURE_COLUMNS required = list(features) if require_target: - required.append("rain_next_1h") + required.append(target_col or rain_next_flag_col(1)) out = df.dropna(subset=required).copy() return out.sort_index() diff --git a/scripts/run_p0_rain_workflow.sh b/scripts/run_p0_rain_workflow.sh index 478fd6a..3f06201 100644 --- a/scripts/run_p0_rain_workflow.sh +++ b/scripts/run_p0_rain_workflow.sh @@ -4,7 +4,9 @@ set -euo pipefail SITE="${SITE:-home}" START="${START:-2026-02-01T00:00:00Z}" END="${END:-2026-03-03T23:55:00Z}" -MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}" +HORIZON_HOURS="${HORIZON_HOURS:-4}" +MODEL_NAME="${MODEL_NAME:-rain_next_${HORIZON_HOURS}h}" +MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v2-${HORIZON_HOURS}h}" MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}" REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}" AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}" @@ -24,6 +26,7 @@ python scripts/audit_rain_data.py \ --site "$SITE" \ --start "$START" \ --end "$END" \ + --horizon-hours "$HORIZON_HOURS" \ --feature-set "$FEATURE_SET" \ --forecast-model "$FORECAST_MODEL" \ --out "$AUDIT_PATH" @@ -33,6 +36,7 @@ python scripts/train_rain_model.py \ --site "$SITE" \ --start "$START" \ --end "$END" \ + --horizon-hours "$HORIZON_HOURS" \ --train-ratio 0.7 \ --val-ratio 0.15 \ --min-precision 0.70 \ @@ -50,7 +54,8 @@ echo "Writing current prediction..." 
python scripts/predict_rain_model.py \ --site "$SITE" \ --model-path "$MODEL_PATH" \ - --model-name "rain_next_1h" \ + --model-name "$MODEL_NAME" \ + --horizon-hours "$HORIZON_HOURS" \ --forecast-model "$FORECAST_MODEL" echo "P0 rain workflow complete." diff --git a/scripts/run_rain_ml_worker.py b/scripts/run_rain_ml_worker.py index 7ce231b..8865054 100644 --- a/scripts/run_rain_ml_worker.py +++ b/scripts/run_rain_ml_worker.py @@ -40,6 +40,7 @@ def read_env_bool(name: str, default: bool) -> bool: class WorkerConfig: database_url: str site: str + horizon_hours: int model_name: str model_version_base: str model_family: str @@ -166,6 +167,8 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: start, "--end", end, + "--horizon-hours", + str(cfg.horizon_hours), "--feature-set", cfg.feature_set, "--forecast-model", @@ -185,6 +188,8 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: start, "--end", end, + "--horizon-hours", + str(cfg.horizon_hours), "--train-ratio", str(cfg.train_ratio), "--val-ratio", @@ -269,6 +274,8 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None: str(cfg.model_path), "--model-name", cfg.model_name, + "--horizon-hours", + str(cfg.horizon_hours), "--forecast-model", cfg.forecast_model, *(["--allow-empty"] if cfg.allow_empty_data else ["--strict-source-data"]), @@ -289,8 +296,9 @@ def load_config() -> WorkerConfig: return WorkerConfig( database_url=database_url, site=read_env("RAIN_SITE", "home"), - model_name=read_env("RAIN_MODEL_NAME", "rain_next_1h"), - model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-auto-v1-extended"), + horizon_hours=read_env_int("RAIN_HORIZON_HOURS", 4), + model_name=read_env("RAIN_MODEL_NAME", "rain_next_4h"), + model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-auto-v2-extended-4h"), model_family=read_env("RAIN_MODEL_FAMILY", "auto"), feature_set=read_env("RAIN_FEATURE_SET", "extended"), forecast_model=read_env("RAIN_FORECAST_MODEL", 
"ecmwf"), @@ -338,6 +346,7 @@ def main() -> int: print( "[rain-ml] worker start " f"site={cfg.site} " + f"horizon_hours={cfg.horizon_hours} " f"model_name={cfg.model_name} " f"model_family={cfg.model_family} " f"feature_set={cfg.feature_set} " diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py index 96e2d53..4662f1b 100644 --- a/scripts/train_rain_model.py +++ b/scripts/train_rain_model.py @@ -20,6 +20,7 @@ from sklearn.preprocessing import StandardScaler from rain_model_common import ( AVAILABLE_FEATURE_SETS, + DEFAULT_HORIZON_HOURS, RAIN_EVENT_THRESHOLD_MM, build_dataset, evaluate_probs, @@ -28,8 +29,13 @@ from rain_model_common import ( fetch_ws90, feature_columns_for_set, feature_columns_need_forecast, + horizon_suffix, model_frame, + normalize_horizon_hours, parse_time, + rain_last_mm_col, + rain_next_flag_col, + rain_next_mm_col, safe_pr_auc, safe_roc_auc, select_threshold, @@ -49,11 +55,17 @@ THRESHOLD_POLICIES = ("validation", "walk_forward") def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Train a rain prediction model (next 1h >= 0.2mm).") + parser = argparse.ArgumentParser(description="Train a rain prediction model (next Nh >= threshold).") parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") parser.add_argument("--site", required=True, help="Site name (e.g. 
home).") parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).") parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).") + parser.add_argument( + "--horizon-hours", + type=int, + default=DEFAULT_HORIZON_HOURS, + help="Prediction horizon in hours (for example 1 or 4).", + ) parser.add_argument("--train-ratio", type=float, default=0.7, help="Time-ordered train split ratio.") parser.add_argument("--val-ratio", type=float, default=0.15, help="Time-ordered validation split ratio.") parser.add_argument( @@ -464,14 +476,18 @@ def evaluate_calibration_methods( return selected, results -def evaluate_naive_baselines(test_df, y_test: np.ndarray) -> dict[str, Any]: +def evaluate_naive_baselines( + test_df, + y_test: np.ndarray, + persistence_context_col: str, +) -> dict[str, Any]: out: dict[str, Any] = {} - if "rain_last_1h_mm" in test_df.columns: - rain_last = test_df["rain_last_1h_mm"].to_numpy(dtype=float) + if persistence_context_col in test_df.columns: + rain_last = test_df[persistence_context_col].to_numpy(dtype=float) persistence_prob = (rain_last >= RAIN_EVENT_THRESHOLD_MM).astype(float) - out["persistence_last_1h"] = { - "rule": f"predict rain when rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + out[f"persistence_{persistence_context_col}"] = { + "rule": f"predict rain when {persistence_context_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}", "metrics": evaluate_probs(y_true=y_test, y_prob=persistence_prob, threshold=0.5), } @@ -512,6 +528,8 @@ def evaluate_sliced_performance( y_true: np.ndarray, y_prob: np.ndarray, threshold: float, + context_col: str, + context_label: str, min_rows_per_slice: int = 30, ) -> dict[str, Any]: frame = pd.DataFrame( @@ -530,7 +548,11 @@ def evaluate_sliced_performance( weekly_positive_rate = frame.groupby(week_label)["y_true"].transform("mean") rainy_week = weekly_positive_rate >= overall_rate - rain_context = test_df["rain_last_1h_mm"].to_numpy(dtype=float) if "rain_last_1h_mm" in test_df.columns else 
np.zeros(len(test_df)) + rain_context = ( + test_df[context_col].to_numpy(dtype=float) + if context_col in test_df.columns + else np.zeros(len(test_df)) + ) wet_context = rain_context >= RAIN_EVENT_THRESHOLD_MM wind_values = test_df["wind_max_m_s"].to_numpy(dtype=float) if "wind_max_m_s" in test_df.columns else np.full(len(test_df), np.nan) @@ -545,8 +567,8 @@ def evaluate_sliced_performance( ("nighttime_utc", np.asarray(~is_day, dtype=bool), "18:00-05:59 UTC"), ("rainy_weeks", np.asarray(rainy_week, dtype=bool), "weeks with positive-rate >= test positive-rate"), ("non_rainy_weeks", np.asarray(~rainy_week, dtype=bool), "weeks with positive-rate < test positive-rate"), - ("wet_context_last_1h", np.asarray(wet_context, dtype=bool), f"rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}"), - ("dry_context_last_1h", np.asarray(~wet_context, dtype=bool), f"rain_last_1h_mm < {RAIN_EVENT_THRESHOLD_MM:.2f}"), + ("wet_context_recent_rain", np.asarray(wet_context, dtype=bool), f"{context_label} >= {RAIN_EVENT_THRESHOLD_MM:.2f}"), + ("dry_context_recent_rain", np.asarray(~wet_context, dtype=bool), f"{context_label} < {RAIN_EVENT_THRESHOLD_MM:.2f}"), ("windy_q75", np.asarray(windy, dtype=bool), "wind_max_m_s >= test 75th percentile"), ("calm_below_q75", np.asarray(~windy, dtype=bool), "wind_max_m_s < test 75th percentile"), ] @@ -585,6 +607,7 @@ def evaluate_sliced_performance( def tune_threshold_walk_forward( model_df, feature_cols: list[str], + target_col: str, model_family: str, model_params: dict[str, Any], calibration_method: str, @@ -627,8 +650,8 @@ def tune_threshold_walk_forward( if len(fold_train) < 160 or len(fold_test) < 25: continue - y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy() - y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy() + y_fold_train = fold_train[target_col].astype(int).to_numpy() + y_fold_test = fold_test[target_col].astype(int).to_numpy() if len(np.unique(y_fold_train)) < 2: continue @@ -706,6 +729,7 @@ def 
tune_threshold_walk_forward( def walk_forward_backtest( model_df, feature_cols: list[str], + target_col: str, model_family: str, model_params: dict[str, Any], calibration_method: str, @@ -745,8 +769,8 @@ def walk_forward_backtest( if len(fold_train) < 160 or len(fold_test) < 25: continue - y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy() - y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy() + y_fold_train = fold_train[target_col].astype(int).to_numpy() + y_fold_test = fold_test[target_col].astype(int).to_numpy() if len(np.unique(y_fold_train)) < 2: continue @@ -755,8 +779,8 @@ def walk_forward_backtest( continue inner_train = fold_train.iloc[:-inner_val_rows] inner_val = fold_train.iloc[-inner_val_rows:] - y_inner_train = inner_train["rain_next_1h"].astype(int).to_numpy() - y_inner_val = inner_val["rain_next_1h"].astype(int).to_numpy() + y_inner_train = inner_train[target_col].astype(int).to_numpy() + y_inner_val = inner_val[target_col].astype(int).to_numpy() if len(np.unique(y_inner_train)) < 2: continue @@ -987,6 +1011,11 @@ def main() -> int: start = parse_time(args.start) if args.start else "" end = parse_time(args.end) if args.end else "" + horizon_hours = normalize_horizon_hours(args.horizon_hours) + horizon_label = horizon_suffix(horizon_hours) + target_col = rain_next_flag_col(horizon_hours) + target_mm_col = rain_next_mm_col(horizon_hours) + persistence_context_col = rain_last_mm_col(horizon_hours) feature_cols = feature_columns_for_set(args.feature_set) needs_forecast = feature_columns_need_forecast(feature_cols) calibration_methods = parse_calibration_methods(args.calibration_methods) @@ -1011,8 +1040,21 @@ def main() -> int: return 0 raise RuntimeError(message) - full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) - model_df = model_frame(full_df, feature_cols, require_target=True) + full_df = build_dataset( + ws90, + baro, + forecast=forecast, + 
rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM, + horizon_hours=horizon_hours, + ) + if persistence_context_col not in full_df.columns: + persistence_context_col = rain_last_mm_col(1) + model_df = model_frame( + full_df, + feature_cols, + require_target=True, + target_col=target_col, + ) if len(model_df) < args.min_rows: message = f"not enough model-ready rows after filtering (need >= {args.min_rows})" if args.allow_empty: @@ -1027,11 +1069,11 @@ def main() -> int: ) x_train = train_df[feature_cols] - y_train = train_df["rain_next_1h"].astype(int).to_numpy() + y_train = train_df[target_col].astype(int).to_numpy() x_val = val_df[feature_cols] - y_val = val_df["rain_next_1h"].astype(int).to_numpy() + y_val = val_df[target_col].astype(int).to_numpy() x_test = test_df[feature_cols] - y_test = test_df["rain_next_1h"].astype(int).to_numpy() + y_test = test_df[target_col].astype(int).to_numpy() if len(np.unique(y_train)) < 2: raise RuntimeError("training split does not contain both classes; cannot train classifier") @@ -1073,6 +1115,7 @@ def main() -> int: threshold_tuning_walk_forward = tune_threshold_walk_forward( model_df=model_df.iloc[: len(train_df) + len(val_df)], feature_cols=feature_cols, + target_col=target_col, model_family=selected_model_family, model_params=selected_model_params, calibration_method=selected_calibration_method, @@ -1093,7 +1136,7 @@ def main() -> int: train_val_df = model_df.iloc[: len(train_df) + len(val_df)] x_train_val = train_val_df[feature_cols] - y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy() + y_train_val = train_val_df[target_col].astype(int).to_numpy() final_model, final_fit_info = fit_with_optional_calibration( model_family=selected_model_family, @@ -1109,16 +1152,23 @@ def main() -> int: test_calibration = { "ece_10": expected_calibration_error(y_true=y_test, y_prob=y_test_prob, bins=10), } - naive_baselines_test = evaluate_naive_baselines(test_df=test_df, y_test=y_test) + naive_baselines_test = 
evaluate_naive_baselines( + test_df=test_df, + y_test=y_test, + persistence_context_col=persistence_context_col, + ) sliced_performance = evaluate_sliced_performance( test_df=test_df, y_true=y_test, y_prob=y_test_prob, threshold=chosen_threshold, + context_col=persistence_context_col, + context_label=persistence_context_col, ) walk_forward = walk_forward_backtest( model_df=model_df, feature_cols=feature_cols, + target_col=target_col, model_family=selected_model_family, model_params=selected_model_params, calibration_method=selected_calibration_method, @@ -1135,8 +1185,12 @@ def main() -> int: "model_family_requested": args.model_family, "model_family": selected_model_family, "model_params": selected_model_params, + "horizon_hours": horizon_hours, + "horizon_label": horizon_label, "feature_set": args.feature_set, - "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + "target_column": target_col, + "target_mm_column": target_mm_col, + "target_definition": f"{target_mm_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}", "feature_columns": feature_cols, "forecast_model": args.forecast_model if needs_forecast else None, "calibration_method_requested": calibration_methods, @@ -1207,6 +1261,7 @@ def main() -> int: print(f" site: {args.site}") print(f" model_version: {args.model_version}") print(f" model_family: {selected_model_family} (requested={args.model_family})") + print(f" horizon: {horizon_hours}h") print(f" model_params: {selected_model_params}") print(f" calibration_method: {report['calibration_method']}") print( @@ -1298,7 +1353,7 @@ def main() -> int: dataset_dir = os.path.dirname(dataset_out) if dataset_dir: os.makedirs(dataset_dir, exist_ok=True) - snapshot_cols = list(dict.fromkeys(feature_cols + ["rain_next_1h", "rain_next_1h_mm"])) + snapshot_cols = list(dict.fromkeys(feature_cols + [target_col, target_mm_col])) model_df[snapshot_cols].to_csv(dataset_out, index=True, index_label="ts") print(f"Saved dataset snapshot to {dataset_out}") @@ -1320,6 
+1375,10 @@ def main() -> int:
 "forecast_model": args.forecast_model if needs_forecast else None,
 "threshold": float(chosen_threshold),
 "target_mm": float(RAIN_EVENT_THRESHOLD_MM),
+ "horizon_hours": horizon_hours,
+ "target_col": target_col,
+ "target_mm_col": target_mm_col,
+ "persistence_context_col": persistence_context_col,
 "model_version": args.model_version,
 "trained_at": datetime.now(timezone.utc).isoformat(),
 "split": report["split"],
diff --git a/todo.md b/todo.md
index 894bf38..43111e6 100644
--- a/todo.md
+++ b/todo.md
@@ -55,3 +55,25 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
 - [x] [P0] Train baseline model on full available history and capture metrics. (completed on runtime machine)
 - [x] [P1] Add one expanded feature set and rerun evaluation. (completed on runtime machine 2026-03-12 with `feature_set=extended`, `model_version=rain-auto-v1-extended-202603120932`)
 - [x] [P0] Decide v1 threshold and define deployment interface.
+
+## 9) Extension Plan: 4-Hour Precipitation Window (In Progress)
+- [x] [P0] Lock v2 target definition for horizon extension: `rain_next_4h_mm >= 0.2mm` and explicitly decide whether the threshold remains `0.2mm` or is increased for 4-hour labeling. (implemented with `0.2mm` carry-forward)
+- [x] [P0] Decide rollout strategy: additive dual-horizon support (`1h` + `4h`) vs direct replacement; prefer dual-horizon for safe cutover. (implemented as additive dual-horizon)
+- [x] [P0] Parameterize label horizon in shared ML code (`scripts/rain_model_common.py`) so target columns are generated for 4-hour windows (48 x 5-minute buckets) instead of hard-coded 1-hour columns.
+- [x] [P1] Revisit persistence/context features currently tied to `rain_last_1h_mm`; decide whether to keep 1-hour context, add 4-hour context, or both for the 4-hour target. 
(implemented horizon-aware context column selection) +- [x] [P0] Update training pipeline (`scripts/train_rain_model.py`) to train against the 4-hour target column, including reports, model-card content, dataset snapshot columns, and artifact metadata. +- [x] [P0] Update audit pipeline (`scripts/audit_rain_data.py`) to report class balance and target definition for 4-hour labels. +- [x] [P0] Update inference pipeline (`scripts/predict_rain_model.py`) to use the 4-hour target, including realized-outcome availability checks (`pred_ts + 4h`) and metadata/reporting fields. +- [x] [P0] Finalize DB storage design for 4-hour predictions (new `predictions_rain_4h` table vs generic horizon column strategy) before migrations. (implemented dedicated `predictions_rain_4h` table) +- [x] [P0] Create schema migration (recommended: new hypertable `predictions_rain_4h` with `rain_next_4h_mm_actual` and `rain_next_4h_actual` fields) and matching indexes. +- [x] [P0] Update prediction upsert SQL to write to the 4-hour prediction table/columns. +- [x] [P0] Update monitoring views in `db/init/002_rain_monitoring_views.sql` so drift/calibration/pipeline-health views include the 4-hour prediction path. +- [x] [P0] Update Go DB query layer (`internal/db/series.go`) to read 4-hour prediction rows/fields. +- [x] [P1] Update dashboard API defaults (`cmd/ingestd/web.go`) from `rain_next_1h` to the selected 4-hour model name (or make model name configurable). +- [x] [P1] Update web UI labels/semantics (`cmd/ingestd/web/index.html`, `cmd/ingestd/web/app.js`) from “Rain 1h %” to “Rain 4h %” and verify chart legends/tooltips match the new horizon. +- [x] [P1] Update worker/runtime defaults (`docker-compose.yml`, `scripts/run_rain_ml_worker.py`, `scripts/run_p0_rain_workflow.sh`) to use `rain_next_4h` naming/versioning. +- [x] [P0] Update health-check defaults (`scripts/check_rain_pipeline_health.py`) for 4-hour evaluation latency (e.g., pending-eval age threshold > 4h). 
+- [x] [P1] Update docs and runbooks (`README.md`, `docs/rain_prediction.md`, `docs/rain_model_runbook.md`) so commands, table names, and target definitions match the 4-hour system. +- [ ] [P0] Run full retraining/evaluation for the 4-hour target and compare against current 1-hour model metrics before production cutover. +- [ ] [P0] Execute staged rollout: deploy schema + views, deploy model + inference, verify dashboard/health checks, then switch default model name. +- [x] [P1] Keep rollback path documented: retain `rain_next_1h` artifacts/table access until 4-hour monitoring is stable. (documented in `docs/rain_model_runbook.md` staged rollout/rollback section)