diff --git a/README.md b/README.md
index ff30bd1..95b3ad4 100644
--- a/README.md
+++ b/README.md
@@ -76,7 +76,8 @@ TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes:
- `observations_ws90` (hypertable): raw WS90 observations with payload metadata, plus the full JSON payload (`payload_json`).
- `observations_baro` (hypertable): barometric pressure observations from other MQTT topics.
- `forecast_openmeteo_hourly` (hypertable): hourly forecast points keyed by `(site, model, retrieved_at, ts)`.
-- `predictions_rain_1h` (hypertable): model probability + decision + realized outcome fields.
+- `predictions_rain_1h` (hypertable): legacy 1-hour model probability + decision + realized outcome fields.
+- `predictions_rain_4h` (hypertable): 4-hour model probability + decision + realized outcome fields.
- Continuous aggregates:
- `cagg_ws90_1m`: 1‑minute rollups (avg/min/max for temp, humidity, wind, uvi, light, rain).
- `cagg_ws90_5m`: 5‑minute rollups (same metrics as `cagg_ws90_1m`).
@@ -86,14 +87,18 @@ Retention/compression:
- `observations_baro` has a 90‑day retention policy and compression after 7 days.
### Existing databases
-If you’re on an existing database, you’ll need to apply the new table definition once (the init SQL only runs on a fresh DB). Example:
+If you’re on an existing database, apply the 4-hour prediction table migration once (the init SQL only runs on a fresh DB):
+
+```sh
+docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql
+```
+
+If you also need to backfill older schema objects, you can still re-run:
```sh
docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/001_schema.sql
```
-Or copy just the `observations_baro` section into a manual `psql -c`.
-
For rain-model monitoring views, also apply `db/init/002_rain_monitoring_views.sql` on existing DBs:
```sh
diff --git a/cmd/ingestd/web.go b/cmd/ingestd/web.go
index f54841f..ebb616e 100644
--- a/cmd/ingestd/web.go
+++ b/cmd/ingestd/web.go
@@ -179,16 +179,29 @@ func (s *webServer) handleDashboard(w http.ResponseWriter, r *http.Request) {
return
}
- const rainModelName = "rain_next_1h"
+ const rainModelName = "rain_next_4h"
+ const rainModelHorizonHours = 4
- latestRainPrediction, err := s.db.LatestRainPrediction(r.Context(), s.site.Name, rainModelName)
+ latestRainPrediction, err := s.db.LatestRainPrediction(
+ r.Context(),
+ s.site.Name,
+ rainModelName,
+ rainModelHorizonHours,
+ )
if err != nil {
http.Error(w, "failed to query latest rain prediction", http.StatusInternalServerError)
log.Printf("web dashboard latest rain prediction error: %v", err)
return
}
- rainPredictionRange, err := s.db.RainPredictionSeriesRange(r.Context(), s.site.Name, rainModelName, start, end)
+ rainPredictionRange, err := s.db.RainPredictionSeriesRange(
+ r.Context(),
+ s.site.Name,
+ rainModelName,
+ rainModelHorizonHours,
+ start,
+ end,
+ )
if err != nil {
http.Error(w, "failed to query rain predictions", http.StatusInternalServerError)
log.Printf("web dashboard rain prediction range error: %v", err)
diff --git a/cmd/ingestd/web/app.js b/cmd/ingestd/web/app.js
index 9988a73..577547a 100644
--- a/cmd/ingestd/web/app.js
+++ b/cmd/ingestd/web/app.js
@@ -979,7 +979,7 @@ function renderDashboard(data) {
data: {
datasets: [
{
- label: rainPredictions.length ? "model rain probability (%)" : "heuristic rain probability (%)",
+ label: rainPredictions.length ? "model rain probability next 4h (%)" : "heuristic rain probability (%)",
data: rainPredictions.length ? buildRainProbabilitySeriesFromPredictions(rainPredictions) : buildRainProbabilitySeries(obsFiltered),
borderColor: colors.rain,
backgroundColor: "rgba(78, 168, 222, 0.18)",
diff --git a/cmd/ingestd/web/index.html b/cmd/ingestd/web/index.html
index 7bbf807..34dd1d2 100644
--- a/cmd/ingestd/web/index.html
+++ b/cmd/ingestd/web/index.html
@@ -70,7 +70,7 @@
--
-
Rain 1h %
+
Rain 4h %
--
diff --git a/db/init/001_schema.sql b/db/init/001_schema.sql
index b8d85fd..1117141 100644
--- a/db/init/001_schema.sql
+++ b/db/init/001_schema.sql
@@ -112,6 +112,34 @@ CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_site_ts
CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_pending_eval
ON predictions_rain_1h(site, evaluated_at, ts DESC);
+-- Rain model predictions (next 4h)
+CREATE TABLE IF NOT EXISTS predictions_rain_4h (
+ ts TIMESTAMPTZ NOT NULL,
+ generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ site TEXT NOT NULL,
+ model_name TEXT NOT NULL,
+ model_version TEXT NOT NULL,
+ threshold DOUBLE PRECISION NOT NULL,
+ probability DOUBLE PRECISION NOT NULL,
+ predict_rain BOOLEAN NOT NULL,
+
+ rain_next_4h_mm_actual DOUBLE PRECISION,
+ rain_next_4h_actual BOOLEAN,
+ evaluated_at TIMESTAMPTZ,
+
+ metadata JSONB,
+
+ PRIMARY KEY (site, model_name, model_version, ts)
+);
+
+SELECT create_hypertable('predictions_rain_4h', 'ts', if_not_exists => TRUE);
+
+CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_site_ts
+ ON predictions_rain_4h(site, ts DESC);
+
+CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_pending_eval
+ ON predictions_rain_4h(site, evaluated_at, ts DESC);
+
-- Raw retention: 90 days
DO $$
BEGIN
diff --git a/db/init/002_rain_monitoring_views.sql b/db/init/002_rain_monitoring_views.sql
index 5945af8..1a06467 100644
--- a/db/init/002_rain_monitoring_views.sql
+++ b/db/init/002_rain_monitoring_views.sql
@@ -100,44 +100,104 @@ FROM baseline;
CREATE OR REPLACE VIEW rain_prediction_drift_daily AS
+WITH all_predictions AS (
+ SELECT
+ 1::INT AS horizon_hours,
+ ts,
+ site,
+ model_name,
+ model_version,
+ threshold,
+ probability,
+ predict_rain
+ FROM predictions_rain_1h
+ UNION ALL
+ SELECT
+ 4::INT AS horizon_hours,
+ ts,
+ site,
+ model_name,
+ model_version,
+ threshold,
+ probability,
+ predict_rain
+ FROM predictions_rain_4h
+)
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
model_name,
model_version,
+ horizon_hours,
count(*) AS prediction_rows,
avg(probability) AS probability_mean,
stddev_samp(probability) AS probability_stddev,
avg(CASE WHEN predict_rain THEN 1.0 ELSE 0.0 END) AS predicted_positive_rate,
avg(threshold) AS threshold_mean
-FROM predictions_rain_1h
-GROUP BY 1,2,3,4;
+FROM all_predictions
+GROUP BY 1,2,3,4,5;
CREATE OR REPLACE VIEW rain_calibration_drift_daily AS
+WITH all_predictions AS (
+ SELECT
+ 1::INT AS horizon_hours,
+ ts,
+ site,
+ model_name,
+ model_version,
+ probability,
+ predict_rain,
+ rain_next_1h_actual AS rain_actual
+ FROM predictions_rain_1h
+ UNION ALL
+ SELECT
+ 4::INT AS horizon_hours,
+ ts,
+ site,
+ model_name,
+ model_version,
+ probability,
+ predict_rain,
+ rain_next_4h_actual AS rain_actual
+ FROM predictions_rain_4h
+)
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
model_name,
model_version,
- count(*) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS evaluated_rows,
- avg(probability) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS mean_probability,
- avg(CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS observed_positive_rate,
- avg(power(probability - CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END, 2.0))
- FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS brier_score,
+ horizon_hours,
+ count(*) FILTER (WHERE rain_actual IS NOT NULL) AS evaluated_rows,
+ avg(probability) FILTER (WHERE rain_actual IS NOT NULL) AS mean_probability,
+ avg(CASE WHEN rain_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_actual IS NOT NULL) AS observed_positive_rate,
+ avg(power(probability - CASE WHEN rain_actual THEN 1.0 ELSE 0.0 END, 2.0))
+ FILTER (WHERE rain_actual IS NOT NULL) AS brier_score,
avg(
CASE
- WHEN rain_next_1h_actual IS NULL THEN NULL
- WHEN predict_rain = rain_next_1h_actual THEN 1.0
+ WHEN rain_actual IS NULL THEN NULL
+ WHEN predict_rain = rain_actual THEN 1.0
ELSE 0.0
END
) AS decision_accuracy
-FROM predictions_rain_1h
-GROUP BY 1,2,3,4;
+FROM all_predictions
+GROUP BY 1,2,3,4,5;
CREATE OR REPLACE VIEW rain_pipeline_health AS
-WITH sites AS (
+WITH prediction_latest AS (
+ SELECT
+ site,
+ max(generated_at) AS prediction_generated_latest_ts,
+ max(evaluated_at) AS prediction_evaluated_latest_ts
+ FROM (
+ SELECT site, generated_at, evaluated_at FROM predictions_rain_1h
+ UNION ALL
+ SELECT site, generated_at, evaluated_at FROM predictions_rain_4h
+ ) p
+ GROUP BY site
+),
+sites AS (
SELECT DISTINCT site FROM observations_ws90
UNION
SELECT DISTINCT site FROM observations_baro
@@ -145,12 +205,16 @@ WITH sites AS (
SELECT DISTINCT site FROM forecast_openmeteo_hourly
UNION
SELECT DISTINCT site FROM predictions_rain_1h
+ UNION
+ SELECT DISTINCT site FROM predictions_rain_4h
)
SELECT
s.site,
(SELECT max(ts) FROM observations_ws90 w WHERE w.site = s.site) AS ws90_latest_ts,
(SELECT max(ts) FROM observations_baro b WHERE b.site = s.site) AS baro_latest_ts,
(SELECT max(ts) FROM forecast_openmeteo_hourly f WHERE f.site = s.site) AS forecast_latest_ts,
- (SELECT max(generated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_generated_latest_ts,
- (SELECT max(evaluated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_evaluated_latest_ts
-FROM sites s;
+ p.prediction_generated_latest_ts,
+ p.prediction_evaluated_latest_ts
+FROM sites s
+LEFT JOIN prediction_latest p
+ ON p.site = s.site;
diff --git a/db/init/003_rain_predictions_4h.sql b/db/init/003_rain_predictions_4h.sql
new file mode 100644
index 0000000..8de62a0
--- /dev/null
+++ b/db/init/003_rain_predictions_4h.sql
@@ -0,0 +1,29 @@
+-- Add 4-hour rain prediction storage table.
+-- Safe to re-run.
+
+CREATE TABLE IF NOT EXISTS predictions_rain_4h (
+ ts TIMESTAMPTZ NOT NULL,
+ generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
+ site TEXT NOT NULL,
+ model_name TEXT NOT NULL,
+ model_version TEXT NOT NULL,
+ threshold DOUBLE PRECISION NOT NULL,
+ probability DOUBLE PRECISION NOT NULL,
+ predict_rain BOOLEAN NOT NULL,
+
+ rain_next_4h_mm_actual DOUBLE PRECISION,
+ rain_next_4h_actual BOOLEAN,
+ evaluated_at TIMESTAMPTZ,
+
+ metadata JSONB,
+
+ PRIMARY KEY (site, model_name, model_version, ts)
+);
+
+SELECT create_hypertable('predictions_rain_4h', 'ts', if_not_exists => TRUE);
+
+CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_site_ts
+ ON predictions_rain_4h(site, ts DESC);
+
+CREATE INDEX IF NOT EXISTS idx_predictions_rain_4h_pending_eval
+ ON predictions_rain_4h(site, evaluated_at, ts DESC);
diff --git a/docker-compose.yml b/docker-compose.yml
index 5b76f5c..b275c4f 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -34,8 +34,9 @@ services:
environment:
DATABASE_URL: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
RAIN_SITE: "home"
- RAIN_MODEL_NAME: "rain_next_1h"
- RAIN_MODEL_VERSION_BASE: "rain-auto-v1-extended"
+ RAIN_HORIZON_HOURS: "4"
+ RAIN_MODEL_NAME: "rain_next_4h"
+ RAIN_MODEL_VERSION_BASE: "rain-auto-v2-extended-4h"
RAIN_MODEL_FAMILY: "auto"
RAIN_FEATURE_SET: "extended"
RAIN_FORECAST_MODEL: "ecmwf"
diff --git a/docs/rain_model_runbook.md b/docs/rain_model_runbook.md
index 0c01a0a..24f3798 100644
--- a/docs/rain_model_runbook.md
+++ b/docs/rain_model_runbook.md
@@ -4,6 +4,14 @@ Operational guide for training, evaluating, deploying, monitoring, and rolling b
## 1) One-time Setup
+Apply 4-hour prediction table migration:
+
+```sh
+docker compose exec -T timescaledb \
+ psql -U postgres -d micrometeo \
+ -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql
+```
+
Apply monitoring views:
```sh
@@ -21,6 +29,7 @@ python scripts/train_rain_model.py \
--site "home" \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
+ --horizon-hours 4 \
--feature-set "extended" \
--model-family "auto" \
--forecast-model "ecmwf" \
@@ -29,7 +38,7 @@ python scripts/train_rain_model.py \
--calibration-methods "none,sigmoid,isotonic" \
--threshold-policy "walk_forward" \
--walk-forward-folds 4 \
- --model-version "rain-auto-v1-extended" \
+ --model-version "rain-auto-v2-extended-4h" \
--out "models/rain_model.pkl" \
--report-out "models/rain_model_report.json" \
--model-card-out "models/model_card_{model_version}.md" \
@@ -53,7 +62,8 @@ Review in report:
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
- --model-name "rain_next_1h" \
+ --model-name "rain_next_4h" \
+ --horizon-hours 4 \
--dry-run
```
@@ -63,7 +73,8 @@ python scripts/predict_rain_model.py \
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
- --model-name "rain_next_1h"
+ --model-name "rain_next_4h" \
+ --horizon-hours 4
```
## 4) Rollback
@@ -72,6 +83,7 @@ python scripts/predict_rain_model.py \
2. If promotion fails or no candidate model is produced, the worker keeps the active model unchanged.
3. If inference starts without `RAIN_MODEL_PATH` but backup exists, the worker restores from backup automatically.
4. Keep failed candidate artifacts for postmortem.
+5. During 4-hour rollout stabilization, keep `predictions_rain_1h` and `rain_next_1h` model artifacts available for immediate fallback.
## 5) Monitoring
@@ -118,12 +130,13 @@ Use the health-check script in cron, systemd timer, or your alerting scheduler:
```sh
python scripts/check_rain_pipeline_health.py \
--site home \
- --model-name rain_next_1h \
+ --model-name rain_next_4h \
+ --horizon-hours 4 \
--max-ws90-age 20m \
--max-baro-age 30m \
--max-forecast-age 3h \
--max-prediction-age 30m \
- --max-pending-eval-age 3h \
+ --max-pending-eval-age 6h \
--max-pending-eval-rows 200
```
@@ -138,6 +151,7 @@ The script exits non-zero on failure, so it can directly drive alerting.
- `RAIN_THRESHOLD_POLICY`
- `RAIN_WALK_FORWARD_FOLDS`
- `RAIN_ALLOW_EMPTY_DATA`
+- `RAIN_HORIZON_HOURS`
- `RAIN_MODEL_BACKUP_PATH`
- `RAIN_MODEL_CARD_PATH`
@@ -156,3 +170,88 @@ python scripts/recommend_rain_model.py \
--top-k 5 \
--json-out "models/rain_model_recommendation.json"
```
+
+## 9) Staged 4h Rollout Checklist
+
+Run this sequence in production/staging to satisfy the 4h cutover gate:
+
+1. Apply schema migration for 4h predictions:
+
+```sh
+docker compose exec -T timescaledb \
+ psql -U postgres -d micrometeo \
+ -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql
+```
+
+2. Re-apply monitoring views (these now union 1h and 4h predictions):
+
+```sh
+docker compose exec -T timescaledb \
+ psql -U postgres -d micrometeo \
+ -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql
+```
+
+3. Run a full 4h training/evaluation cycle and save report:
+
+```sh
+python scripts/train_rain_model.py \
+ --site "home" \
+ --start "2026-02-01T00:00:00Z" \
+ --end "2026-03-03T23:55:00Z" \
+ --horizon-hours 4 \
+ --feature-set "extended" \
+ --model-family "auto" \
+ --forecast-model "ecmwf" \
+ --tune-hyperparameters \
+ --threshold-policy "walk_forward" \
+ --walk-forward-folds 4 \
+ --model-version "rain-auto-v2-extended-4h" \
+ --out "models/rain_model_4h.pkl" \
+ --report-out "models/rain_model_report_4h.json"
+```
+
+4. Compare 4h metrics against the latest 1h benchmark report before switching dashboard defaults:
+
+```sh
+python scripts/compare_rain_reports.py \
+ --baseline "models/rain_model_report_1h.json" \
+ --candidate "models/rain_model_report_4h.json"
+```
+5. Run dry-run inference, then live inference with 4h model name/horizon:
+
+```sh
+python scripts/predict_rain_model.py \
+ --site home \
+ --model-path "models/rain_model_4h.pkl" \
+ --model-name "rain_next_4h" \
+ --horizon-hours 4 \
+ --dry-run
+
+python scripts/predict_rain_model.py \
+ --site home \
+ --model-path "models/rain_model_4h.pkl" \
+ --model-name "rain_next_4h" \
+ --horizon-hours 4
+```
+
+6. Validate health checks and dashboard data path for 4h:
+
+```sh
+python scripts/check_rain_pipeline_health.py \
+ --site home \
+ --model-name rain_next_4h \
+ --horizon-hours 4 \
+ --max-pending-eval-age 6h
+```
+
+7. Keep 1h path live in parallel until 4h drift/calibration remains stable for at least 7 days.
+
+### Fast rollback to 1h
+
+If 4h performance or pipeline health regresses:
+
+1. Set worker env back to:
+ `RAIN_HORIZON_HOURS=1`, `RAIN_MODEL_NAME=rain_next_1h`, and a known-good 1h model path/version.
+2. Restart `rainml` service.
+3. Confirm `check_rain_pipeline_health.py --horizon-hours 1 --model-name rain_next_1h` returns `ok`.
+4. Keep `predictions_rain_4h` data for postmortem; do not drop tables during rollback.
diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md
index bc0277a..6cae8a0 100644
--- a/docs/rain_prediction.md
+++ b/docs/rain_prediction.md
@@ -1,14 +1,14 @@
-# Rain Prediction (Next 1 Hour)
+# Rain Prediction (Next 4 Hours)
This project includes a baseline workflow for **binary rain prediction**:
-> **Will we see >= 0.2 mm of rain in the next hour?**
+> **Will we see >= 0.2 mm of rain in the next 4 hours?**
It uses local observations (WS90 + barometer), trains a logistic regression
baseline, and writes model-driven predictions back to TimescaleDB.
## P0 Decisions (Locked)
-- Target: `rain_next_1h_mm >= 0.2`.
+- Target: `rain_next_4h_mm >= 0.2`.
- Primary use-case: low-noise rain heads-up signal for dashboard + alert candidate.
- Frozen v1 training window (UTC): `2026-02-01T00:00:00Z` to `2026-03-03T23:55:00Z`.
- Threshold policy: choose threshold on validation set by maximizing recall under
@@ -40,7 +40,7 @@ pip install -r scripts/requirements.txt
- `scripts/train_rain_model.py`: strict time-based split training and metrics report, with optional
validation-only hyperparameter tuning, calibration comparison, naive baseline comparison, and walk-forward folds.
- `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into
- `predictions_rain_1h`.
+ `predictions_rain_4h`.
- `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction.
- `scripts/check_rain_pipeline_health.py`: freshness/failure check for alerting.
- `scripts/recommend_rain_model.py`: rank saved training reports and recommend a deployment candidate.
@@ -60,7 +60,15 @@ Model-family options (`train_rain_model.py`):
## Usage
### 1) Apply schema update (existing DBs)
-`001_schema.sql` includes `predictions_rain_1h`.
+`003_rain_predictions_4h.sql` adds `predictions_rain_4h`.
+
+```sh
+docker compose exec -T timescaledb \
+ psql -U postgres -d micrometeo \
+ -f /docker-entrypoint-initdb.d/003_rain_predictions_4h.sql
+```
+
+`001_schema.sql` remains safe to re-run for full schema parity.
```sh
docker compose exec -T timescaledb \
@@ -76,6 +84,8 @@ docker compose exec -T timescaledb \
-f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql
```
+All examples below assume a 4-hour horizon (`--horizon-hours 4`) and `--model-name rain_next_4h`.
+
### 2) Run data audit
```sh
export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable"
@@ -203,7 +213,8 @@ python scripts/train_rain_model.py \
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
- --model-name "rain_next_1h"
+ --model-name "rain_next_4h" \
+ --horizon-hours 4
```
### 5) One-command P0 workflow
@@ -236,7 +247,7 @@ docker compose logs -f rainml
- Model card: `models/model_card_.md`
- Model artifact: `models/rain_model.pkl`
- Dataset snapshot: `models/datasets/rain_dataset__.csv`
-- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized
+- Prediction rows: `predictions_rain_4h` (probability + threshold decision + realized
outcome fields once available)
### 7) Recommend deploy candidate from saved reports
diff --git a/internal/db/series.go b/internal/db/series.go
index 404dec0..0ffe848 100644
--- a/internal/db/series.go
+++ b/internal/db/series.go
@@ -51,11 +51,14 @@ type RainPredictionPoint struct {
GeneratedAt time.Time `json:"generated_at"`
ModelName string `json:"model_name"`
ModelVersion string `json:"model_version"`
+ HorizonHours int `json:"horizon_hours"`
Threshold float64 `json:"threshold"`
Probability float64 `json:"probability"`
PredictRain bool `json:"predict_rain"`
- RainNext1hMM *float64 `json:"rain_next_1h_mm_actual,omitempty"`
- RainNext1hActual *bool `json:"rain_next_1h_actual,omitempty"`
+ RainNextMM *float64 `json:"rain_next_mm_actual,omitempty"`
+ RainNextActual *bool `json:"rain_next_actual,omitempty"`
+ RainNext1hMM *float64 `json:"rain_next_1h_mm_actual,omitempty"` // backward-compatible alias
+ RainNext1hActual *bool `json:"rain_next_1h_actual,omitempty"` // backward-compatible alias
EvaluatedAt *time.Time `json:"evaluated_at,omitempty"`
}
@@ -365,8 +368,24 @@ func (d *DB) ForecastSeriesRange(ctx context.Context, site, model string, start,
}, nil
}
-func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) (*RainPredictionPoint, error) {
- query := `
+func predictionStorageForHorizon(horizonHours int) (table string, mmCol string, flagCol string, err error) {
+ switch horizonHours {
+ case 1:
+ return "predictions_rain_1h", "rain_next_1h_mm_actual", "rain_next_1h_actual", nil
+ case 4:
+ return "predictions_rain_4h", "rain_next_4h_mm_actual", "rain_next_4h_actual", nil
+ default:
+ return "", "", "", fmt.Errorf("unsupported rain prediction horizon: %d", horizonHours)
+ }
+}
+
+func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string, horizonHours int) (*RainPredictionPoint, error) {
+ table, mmCol, flagCol, err := predictionStorageForHorizon(horizonHours)
+ if err != nil {
+ return nil, err
+ }
+
+ query := fmt.Sprintf(`
SELECT
ts,
generated_at,
@@ -375,15 +394,15 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) (
threshold,
probability,
predict_rain,
- rain_next_1h_mm_actual,
- rain_next_1h_actual,
+ %s,
+ %s,
evaluated_at
- FROM predictions_rain_1h
+ FROM %s
WHERE site = $1
AND model_name = $2
ORDER BY ts DESC, generated_at DESC
LIMIT 1
- `
+ `, mmCol, flagCol, table)
var (
p RainPredictionPoint
@@ -393,7 +412,7 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) (
predictRain sql.NullBool
)
- err := d.Pool.QueryRow(ctx, query, site, modelName).Scan(
+ err = d.Pool.QueryRow(ctx, query, site, modelName).Scan(
&p.TS,
&p.GeneratedAt,
&p.ModelName,
@@ -421,14 +440,22 @@ func (d *DB) LatestRainPrediction(ctx context.Context, site, modelName string) (
if predictRain.Valid {
p.PredictRain = predictRain.Bool
}
- p.RainNext1hMM = nullFloatPtr(rainMM)
- p.RainNext1hActual = nullBoolPtr(rainActual)
+ p.HorizonHours = horizonHours
+ p.RainNextMM = nullFloatPtr(rainMM)
+ p.RainNextActual = nullBoolPtr(rainActual)
+ p.RainNext1hMM = p.RainNextMM
+ p.RainNext1hActual = p.RainNextActual
p.EvaluatedAt = nullTimePtr(evaluatedAt)
return &p, nil
}
-func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName string, start, end time.Time) ([]RainPredictionPoint, error) {
- query := `
+func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName string, horizonHours int, start, end time.Time) ([]RainPredictionPoint, error) {
+ table, mmCol, flagCol, err := predictionStorageForHorizon(horizonHours)
+ if err != nil {
+ return nil, err
+ }
+
+ query := fmt.Sprintf(`
SELECT DISTINCT ON (ts)
ts,
generated_at,
@@ -437,16 +464,16 @@ func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName stri
threshold,
probability,
predict_rain,
- rain_next_1h_mm_actual,
- rain_next_1h_actual,
+ %s,
+ %s,
evaluated_at
- FROM predictions_rain_1h
+ FROM %s
WHERE site = $1
AND model_name = $2
AND ts >= $3
AND ts <= $4
ORDER BY ts ASC, generated_at DESC
- `
+ `, mmCol, flagCol, table)
rows, err := d.Pool.Query(ctx, query, site, modelName, start, end)
if err != nil {
@@ -486,8 +513,11 @@ func (d *DB) RainPredictionSeriesRange(ctx context.Context, site, modelName stri
if predictRain.Valid {
p.PredictRain = predictRain.Bool
}
- p.RainNext1hMM = nullFloatPtr(rainMM)
- p.RainNext1hActual = nullBoolPtr(rainActual)
+ p.HorizonHours = horizonHours
+ p.RainNextMM = nullFloatPtr(rainMM)
+ p.RainNextActual = nullBoolPtr(rainActual)
+ p.RainNext1hMM = p.RainNextMM
+ p.RainNext1hActual = p.RainNextActual
p.EvaluatedAt = nullTimePtr(evaluatedAt)
points = append(points, p)
}
diff --git a/scripts/audit_rain_data.py b/scripts/audit_rain_data.py
index 6dd60b6..4f96541 100644
--- a/scripts/audit_rain_data.py
+++ b/scripts/audit_rain_data.py
@@ -10,6 +10,7 @@ import psycopg2
from rain_model_common import (
AVAILABLE_FEATURE_SETS,
+ DEFAULT_HORIZON_HOURS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
feature_columns_for_set,
@@ -17,8 +18,11 @@ from rain_model_common import (
fetch_baro,
fetch_forecast,
fetch_ws90,
+ normalize_horizon_hours,
model_frame,
parse_time,
+ rain_next_flag_col,
+ rain_next_mm_col,
to_builtin,
)
@@ -29,6 +33,12 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).")
+ parser.add_argument(
+ "--horizon-hours",
+ type=int,
+ default=DEFAULT_HORIZON_HOURS,
+ help="Prediction horizon in hours for target/label auditing.",
+ )
parser.add_argument(
"--feature-set",
default="baseline",
@@ -57,13 +67,13 @@ def longest_zero_run(counts: np.ndarray) -> int:
return best
-def build_weekly_balance(model_df):
+def build_weekly_balance(model_df, target_col: str):
weekly = model_df.copy()
iso = weekly.index.to_series().dt.isocalendar()
weekly["year_week"] = iso["year"].astype(str) + "-W" + iso["week"].astype(str).str.zfill(2)
grouped = (
- weekly.groupby("year_week")["rain_next_1h"]
+ weekly.groupby("year_week")[target_col]
.agg(total_rows="count", positive_rows="sum")
.reset_index()
.sort_values("year_week")
@@ -79,6 +89,9 @@ def main() -> int:
start = parse_time(args.start) if args.start else ""
end = parse_time(args.end) if args.end else ""
+ horizon_hours = normalize_horizon_hours(args.horizon_hours)
+ target_col = rain_next_flag_col(horizon_hours)
+ target_mm_col = rain_next_mm_col(horizon_hours)
feature_cols = feature_columns_for_set(args.feature_set)
needs_forecast = feature_columns_need_forecast(feature_cols)
@@ -89,8 +102,14 @@ def main() -> int:
if needs_forecast:
forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model)
- df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
- model_df = model_frame(df, feature_cols, require_target=True)
+ df = build_dataset(
+ ws90,
+ baro,
+ forecast=forecast,
+ rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM,
+ horizon_hours=horizon_hours,
+ )
+ model_df = model_frame(df, feature_cols, require_target=True, target_col=target_col)
ws90_dupes = int(ws90.duplicated(subset=["ts", "station_id"]).sum()) if not ws90.empty else 0
baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0
@@ -114,7 +133,7 @@ def main() -> int:
baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0
missingness = {}
- for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]:
+ for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", target_mm_col]:
if col in df.columns:
missingness[col] = float(df[col].isna().mean())
@@ -125,9 +144,12 @@ def main() -> int:
report = {
"site": args.site,
"feature_set": args.feature_set,
+ "horizon_hours": horizon_hours,
+ "target_column": target_col,
+ "target_mm_column": target_mm_col,
"feature_columns": feature_cols,
"forecast_model": args.forecast_model if needs_forecast else None,
- "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
+ "target_definition": f"{target_mm_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"requested_window": {
"start": start or None,
"end": end or None,
@@ -167,8 +189,8 @@ def main() -> int:
"max_rain_increment_5m_mm": max_rain_inc,
},
"class_balance": {
- "overall_positive_rate": float(model_df["rain_next_1h"].mean()) if not model_df.empty else None,
- "weekly": build_weekly_balance(model_df) if not model_df.empty else [],
+ "overall_positive_rate": float(model_df[target_col].mean()) if not model_df.empty else None,
+ "weekly": build_weekly_balance(model_df, target_col=target_col) if not model_df.empty else [],
},
}
report = to_builtin(report)
diff --git a/scripts/check_rain_pipeline_health.py b/scripts/check_rain_pipeline_health.py
index 92186ff..3bb400e 100644
--- a/scripts/check_rain_pipeline_health.py
+++ b/scripts/check_rain_pipeline_health.py
@@ -9,6 +9,24 @@ from typing import Any
import psycopg2
+DEFAULT_HORIZON_HOURS = 4
+
+
+def normalize_horizon_hours(horizon_hours: int) -> int:
+ out = int(horizon_hours)
+ if out <= 0:
+ raise ValueError("horizon_hours must be > 0")
+ return out
+
+
+def prediction_table_for_horizon(horizon_hours: int) -> str:
+ horizon = normalize_horizon_hours(horizon_hours)
+ if horizon == 1:
+ return "predictions_rain_1h"
+ if horizon == 4:
+ return "predictions_rain_4h"
+ raise ValueError(f"unsupported prediction-table horizon: {horizon_hours}")
+
def parse_duration(value: str) -> timedelta:
raw = value.strip().lower()
@@ -27,14 +45,20 @@ def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name.")
- parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.")
+ parser.add_argument("--model-name", default="rain_next_4h", help="Prediction model_name to check.")
+ parser.add_argument(
+ "--horizon-hours",
+ type=int,
+ default=DEFAULT_HORIZON_HOURS,
+ help="Prediction horizon in hours used to select prediction storage table.",
+ )
parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.")
parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.")
parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.")
parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.")
parser.add_argument(
"--max-pending-eval-age",
- default="3h",
+ default="6h",
help="Pending evaluations older than this count toward alert.",
)
parser.add_argument(
@@ -91,6 +115,8 @@ def main() -> int:
max_forecast_age = parse_duration(args.max_forecast_age)
max_prediction_age = parse_duration(args.max_prediction_age)
max_pending_eval_age = parse_duration(args.max_pending_eval_age)
+ horizon_hours = normalize_horizon_hours(args.horizon_hours)
+ prediction_table = prediction_table_for_horizon(horizon_hours)
with psycopg2.connect(args.db_url) as conn:
ws90_latest = fetch_latest_ts(
@@ -110,9 +136,9 @@ def main() -> int:
)
prediction_latest = fetch_latest_ts(
conn,
- """
+ f"""
SELECT max(generated_at)
- FROM predictions_rain_1h
+ FROM {prediction_table}
WHERE site = %s
AND model_name = %s
""",
@@ -120,9 +146,9 @@ def main() -> int:
)
pending_eval_rows = fetch_count(
conn,
- """
+ f"""
SELECT count(*)
- FROM predictions_rain_1h
+ FROM {prediction_table}
WHERE site = %s
AND model_name = %s
AND evaluated_at IS NULL
@@ -188,6 +214,8 @@ def main() -> int:
"generated_at": now.isoformat(),
"site": args.site,
"model_name": args.model_name,
+ "horizon_hours": horizon_hours,
+ "prediction_table": prediction_table,
"status": overall_status,
"failing_checks": failing,
"checks": checks,
diff --git a/scripts/compare_rain_reports.py b/scripts/compare_rain_reports.py
new file mode 100644
index 0000000..712486b
--- /dev/null
+++ b/scripts/compare_rain_reports.py
@@ -0,0 +1,78 @@
+#!/usr/bin/env python3
+from __future__ import annotations
+
+import argparse
+import json
+from pathlib import Path
+from typing import Any
+
+
+def parse_args() -> argparse.Namespace:
+ parser = argparse.ArgumentParser(description="Compare two rain-model training reports.")
+ parser.add_argument("--baseline", required=True, help="Baseline report path (for example 1h).")
+ parser.add_argument("--candidate", required=True, help="Candidate report path (for example 4h).")
+ return parser.parse_args()
+
+
+def load_json(path: str) -> dict[str, Any]:
+ p = Path(path)
+ with p.open("r", encoding="utf-8") as f:
+ return json.load(f)
+
+
+def to_float(v: Any) -> float | None:
+ if v is None:
+ return None
+ try:
+ return float(v)
+ except (TypeError, ValueError):
+ return None
+
+
+def metric(report: dict[str, Any], split: str, key: str) -> float | None:
+ return to_float(report.get(split, {}).get(key))
+
+
+def delta_str(base: float | None, cand: float | None) -> str:
+ if base is None or cand is None:
+ return "n/a"
+ d = cand - base
+ return f"{d:+.4f}"
+
+
+def main() -> int:
+ args = parse_args()
+ baseline = load_json(args.baseline)
+ candidate = load_json(args.candidate)
+
+ pairs = [
+ ("precision", metric(baseline, "test_metrics", "precision"), metric(candidate, "test_metrics", "precision")),
+ ("recall", metric(baseline, "test_metrics", "recall"), metric(candidate, "test_metrics", "recall")),
+ ("f1", metric(baseline, "test_metrics", "f1"), metric(candidate, "test_metrics", "f1")),
+ ("pr_auc", metric(baseline, "test_metrics", "pr_auc"), metric(candidate, "test_metrics", "pr_auc")),
+ ("roc_auc", metric(baseline, "test_metrics", "roc_auc"), metric(candidate, "test_metrics", "roc_auc")),
+ ("brier", metric(baseline, "test_metrics", "brier"), metric(candidate, "test_metrics", "brier")),
+ ]
+
+ print("Rain report comparison:")
+ print(
+ f" baseline: version={baseline.get('model_version')} "
+ f"horizon={baseline.get('horizon_hours')}h "
+ f"target={baseline.get('target_definition')}"
+ )
+ print(
+ f" candidate: version={candidate.get('model_version')} "
+ f"horizon={candidate.get('horizon_hours')}h "
+ f"target={candidate.get('target_definition')}"
+ )
+ print(" metrics (candidate - baseline):")
+ for name, base, cand in pairs:
+ base_txt = "n/a" if base is None else f"{base:.4f}"
+ cand_txt = "n/a" if cand is None else f"{cand:.4f}"
+ print(f" {name}: baseline={base_txt} candidate={cand_txt} delta={delta_str(base, cand)}")
+
+ return 0
+
+
+if __name__ == "__main__":
+ raise SystemExit(main())
diff --git a/scripts/predict_rain_model.py b/scripts/predict_rain_model.py
index 9d81cb2..571bb62 100644
--- a/scripts/predict_rain_model.py
+++ b/scripts/predict_rain_model.py
@@ -9,13 +9,18 @@ import psycopg2
from psycopg2.extras import Json
from rain_model_common import (
+ DEFAULT_HORIZON_HOURS,
build_dataset,
feature_columns_need_forecast,
fetch_baro,
fetch_forecast,
fetch_ws90,
model_frame,
+ normalize_horizon_hours,
parse_time,
+ prediction_table_for_horizon,
+ rain_next_flag_col,
+ rain_next_mm_col,
to_builtin,
)
@@ -30,8 +35,13 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--model-path", default="models/rain_model.pkl", help="Path to trained model artifact.")
- parser.add_argument("--model-name", default="rain_next_1h", help="Logical prediction model name.")
+ parser.add_argument("--model-name", default="rain_next_4h", help="Logical prediction model name.")
parser.add_argument("--model-version", help="Override artifact model_version.")
+ parser.add_argument(
+ "--horizon-hours",
+ type=int,
+ help="Prediction horizon in hours. Defaults to artifact horizon when present, else 4.",
+ )
parser.add_argument(
"--at",
help="Prediction timestamp (RFC3339 or YYYY-MM-DD). Default: current UTC time.",
@@ -98,9 +108,21 @@ def main() -> int:
threshold = float(artifact.get("threshold", 0.5))
model_version = args.model_version or artifact.get("model_version") or "unknown"
forecast_model = str(artifact.get("forecast_model") or args.forecast_model)
+ artifact_horizon = artifact.get("horizon_hours")
+ if args.horizon_hours is not None:
+ horizon_hours = normalize_horizon_hours(args.horizon_hours)
+ elif artifact_horizon is not None:
+ horizon_hours = normalize_horizon_hours(int(artifact_horizon))
+ else:
+ horizon_hours = DEFAULT_HORIZON_HOURS
+ target_col = str(artifact.get("target_col") or rain_next_flag_col(horizon_hours))
+ target_mm_col = str(artifact.get("target_mm_col") or rain_next_mm_col(horizon_hours))
+ prediction_table = prediction_table_for_horizon(horizon_hours)
+ actual_mm_col = f"{target_mm_col}_actual"
+ actual_flag_col = f"{target_col}_actual"
fetch_start = (at - timedelta(hours=args.history_hours)).isoformat()
- fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat()
+ fetch_end = (at + timedelta(hours=horizon_hours, minutes=5)).isoformat()
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end)
@@ -122,7 +144,7 @@ def main() -> int:
return 0
raise RuntimeError(message)
- full_df = build_dataset(ws90, baro, forecast=forecast)
+ full_df = build_dataset(ws90, baro, forecast=forecast, horizon_hours=horizon_hours)
feature_df = model_frame(full_df, feature_cols=features, require_target=False)
candidates = feature_df.loc[feature_df.index <= at]
if candidates.empty:
@@ -143,9 +165,9 @@ def main() -> int:
actual_flag = None
evaluated_at = None
latest_available = full_df.index.max().to_pydatetime()
- if pred_ts + timedelta(hours=1) <= latest_available:
- next_mm = full_df.loc[pred_ts, "rain_next_1h_mm"]
- next_flag = full_df.loc[pred_ts, "rain_next_1h"]
+ if pred_ts + timedelta(hours=horizon_hours) <= latest_available:
+ next_mm = full_df.loc[pred_ts, target_mm_col]
+ next_flag = full_df.loc[pred_ts, target_col]
if next_mm == next_mm: # NaN-safe check
actual_mm = float(next_mm)
if next_flag == next_flag:
@@ -156,6 +178,10 @@ def main() -> int:
"artifact_path": args.model_path,
"artifact_model_version": artifact.get("model_version"),
"artifact_feature_set": feature_set,
+ "horizon_hours": horizon_hours,
+ "target_col": target_col,
+ "target_mm_col": target_mm_col,
+ "prediction_table": prediction_table,
"forecast_model": forecast_model if needs_forecast else None,
"needs_forecast_features": needs_forecast,
"feature_values": {col: float(row.iloc[0][col]) for col in features},
@@ -170,6 +196,7 @@ def main() -> int:
print(f" site: {args.site}")
print(f" model_name: {args.model_name}")
print(f" model_version: {model_version}")
+ print(f" horizon_hours: {horizon_hours}")
if feature_set:
print(f" feature_set: {feature_set}")
print(f" pred_ts: {pred_ts.isoformat()}")
@@ -182,10 +209,8 @@ def main() -> int:
print("dry-run enabled; skipping DB upsert.")
return 0
- with conn.cursor() as cur:
- cur.execute(
- """
- INSERT INTO predictions_rain_1h (
+ query = f"""
+ INSERT INTO {prediction_table} (
ts,
generated_at,
site,
@@ -194,8 +219,8 @@ def main() -> int:
threshold,
probability,
predict_rain,
- rain_next_1h_mm_actual,
- rain_next_1h_actual,
+ {actual_mm_col},
+ {actual_flag_col},
evaluated_at,
metadata
) VALUES (
@@ -207,11 +232,14 @@ def main() -> int:
threshold = EXCLUDED.threshold,
probability = EXCLUDED.probability,
predict_rain = EXCLUDED.predict_rain,
- rain_next_1h_mm_actual = COALESCE(EXCLUDED.rain_next_1h_mm_actual, predictions_rain_1h.rain_next_1h_mm_actual),
- rain_next_1h_actual = COALESCE(EXCLUDED.rain_next_1h_actual, predictions_rain_1h.rain_next_1h_actual),
- evaluated_at = COALESCE(EXCLUDED.evaluated_at, predictions_rain_1h.evaluated_at),
+ {actual_mm_col} = COALESCE(EXCLUDED.{actual_mm_col}, {prediction_table}.{actual_mm_col}),
+ {actual_flag_col} = COALESCE(EXCLUDED.{actual_flag_col}, {prediction_table}.{actual_flag_col}),
+ evaluated_at = COALESCE(EXCLUDED.evaluated_at, {prediction_table}.evaluated_at),
metadata = EXCLUDED.metadata
- """,
+ """
+ with conn.cursor() as cur:
+ cur.execute(
+ query,
(
pred_ts,
args.site,
@@ -227,7 +255,7 @@ def main() -> int:
),
)
conn.commit()
- print("Prediction upserted into predictions_rain_1h.")
+ print(f"Prediction upserted into {prediction_table}.")
return 0
diff --git a/scripts/rain_model_common.py b/scripts/rain_model_common.py
index 86b1b12..997607a 100644
--- a/scripts/rain_model_common.py
+++ b/scripts/rain_model_common.py
@@ -86,7 +86,46 @@ FEATURE_COLUMNS = BASELINE_FEATURE_COLUMNS
RAIN_EVENT_THRESHOLD_MM = 0.2
RAIN_SPIKE_THRESHOLD_MM_5M = 5.0
-RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h
+BUCKET_MINUTES = 5
+DEFAULT_HORIZON_HOURS = 4
+SUPPORTED_PREDICTION_HORIZONS = (1, 4)
+
+
+def normalize_horizon_hours(horizon_hours: int) -> int:
+ out = int(horizon_hours)
+ if out <= 0:
+ raise ValueError("horizon_hours must be > 0")
+ return out
+
+
+def horizon_suffix(horizon_hours: int) -> str:
+ return f"{normalize_horizon_hours(horizon_hours)}h"
+
+
+def horizon_buckets(horizon_hours: int) -> int:
+ hours = normalize_horizon_hours(horizon_hours)
+ return (hours * 60) // BUCKET_MINUTES
+
+
+def rain_last_mm_col(horizon_hours: int) -> str:
+ return f"rain_last_{horizon_suffix(horizon_hours)}_mm"
+
+
+def rain_next_mm_col(horizon_hours: int) -> str:
+ return f"rain_next_{horizon_suffix(horizon_hours)}_mm"
+
+
+def rain_next_flag_col(horizon_hours: int) -> str:
+ return f"rain_next_{horizon_suffix(horizon_hours)}"
+
+
+def prediction_table_for_horizon(horizon_hours: int) -> str:
+ horizon = normalize_horizon_hours(horizon_hours)
+ if horizon == 1:
+ return "predictions_rain_1h"
+ if horizon == 4:
+ return "predictions_rain_4h"
+ raise ValueError(f"unsupported prediction-table horizon: {horizon_hours}")
def parse_time(value: str) -> str:
@@ -232,6 +271,7 @@ def build_dataset(
baro: pd.DataFrame,
forecast: pd.DataFrame | None = None,
rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM,
+ horizon_hours: int = 1,
) -> pd.DataFrame:
if ws90.empty:
raise RuntimeError("no ws90 observations found")
@@ -261,12 +301,20 @@ def build_dataset(
df["rain_inc"] = df["rain_inc_raw"].clip(lower=0)
df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M
- window = RAIN_HORIZON_BUCKETS
- df["rain_last_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum()
- df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
- df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm
+ windows: dict[int, int] = {
+ 1: horizon_buckets(1),
+ normalize_horizon_hours(horizon_hours): horizon_buckets(horizon_hours),
+ }
+ for hours, window in windows.items():
+ rain_last_col = rain_last_mm_col(hours)
+ rain_next_mm = rain_next_mm_col(hours)
+ rain_next_flag = rain_next_flag_col(hours)
+ df[rain_last_col] = df["rain_inc"].rolling(window=window, min_periods=1).sum()
+ df[rain_next_mm] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
+ df[rain_next_flag] = df[rain_next_mm] >= rain_event_threshold_mm
- df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window)
+ window_1h = horizon_buckets(1)
+ df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window_1h)
# Wind direction cyclical encoding.
radians = np.deg2rad(df["wind_dir_deg"] % 360.0)
@@ -279,14 +327,14 @@ def build_dataset(
df["wind_avg_lag_5m"] = df["wind_avg_m_s"].shift(1)
df["pressure_lag_5m"] = df["pressure_hpa"].shift(1)
- df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window, min_periods=3).mean()
- df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window, min_periods=3).std()
- df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window, min_periods=3).mean()
- df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window, min_periods=3).std()
- df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window, min_periods=3).mean()
- df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window, min_periods=3).max()
- df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window, min_periods=3).mean()
- df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window, min_periods=3).std()
+ df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window_1h, min_periods=3).mean()
+ df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window_1h, min_periods=3).std()
+ df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window_1h, min_periods=3).mean()
+ df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window_1h, min_periods=3).std()
+ df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window_1h, min_periods=3).mean()
+ df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window_1h, min_periods=3).max()
+ df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window_1h, min_periods=3).mean()
+ df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window_1h, min_periods=3).std()
# Calendar/seasonality features (UTC based).
hour_of_day = df.index.hour + (df.index.minute / 60.0)
@@ -304,11 +352,16 @@ def build_dataset(
return df
-def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require_target: bool = True) -> pd.DataFrame:
+def model_frame(
+ df: pd.DataFrame,
+ feature_cols: list[str] | None = None,
+ require_target: bool = True,
+ target_col: str | None = None,
+) -> pd.DataFrame:
features = feature_cols or FEATURE_COLUMNS
required = list(features)
if require_target:
- required.append("rain_next_1h")
+ required.append(target_col or rain_next_flag_col(1))
out = df.dropna(subset=required).copy()
return out.sort_index()
diff --git a/scripts/run_p0_rain_workflow.sh b/scripts/run_p0_rain_workflow.sh
index 478fd6a..3f06201 100644
--- a/scripts/run_p0_rain_workflow.sh
+++ b/scripts/run_p0_rain_workflow.sh
@@ -4,7 +4,9 @@ set -euo pipefail
SITE="${SITE:-home}"
START="${START:-2026-02-01T00:00:00Z}"
END="${END:-2026-03-03T23:55:00Z}"
-MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}"
+HORIZON_HOURS="${HORIZON_HOURS:-4}"
+MODEL_NAME="${MODEL_NAME:-rain_next_${HORIZON_HOURS}h}"
+MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v2-${HORIZON_HOURS}h}"
MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}"
REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}"
AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}"
@@ -24,6 +26,7 @@ python scripts/audit_rain_data.py \
--site "$SITE" \
--start "$START" \
--end "$END" \
+ --horizon-hours "$HORIZON_HOURS" \
--feature-set "$FEATURE_SET" \
--forecast-model "$FORECAST_MODEL" \
--out "$AUDIT_PATH"
@@ -33,6 +36,7 @@ python scripts/train_rain_model.py \
--site "$SITE" \
--start "$START" \
--end "$END" \
+ --horizon-hours "$HORIZON_HOURS" \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
@@ -50,7 +54,8 @@ echo "Writing current prediction..."
python scripts/predict_rain_model.py \
--site "$SITE" \
--model-path "$MODEL_PATH" \
- --model-name "rain_next_1h" \
+ --model-name "$MODEL_NAME" \
+ --horizon-hours "$HORIZON_HOURS" \
--forecast-model "$FORECAST_MODEL"
echo "P0 rain workflow complete."
diff --git a/scripts/run_rain_ml_worker.py b/scripts/run_rain_ml_worker.py
index 7ce231b..8865054 100644
--- a/scripts/run_rain_ml_worker.py
+++ b/scripts/run_rain_ml_worker.py
@@ -40,6 +40,7 @@ def read_env_bool(name: str, default: bool) -> bool:
class WorkerConfig:
database_url: str
site: str
+ horizon_hours: int
model_name: str
model_version_base: str
model_family: str
@@ -166,6 +167,8 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
start,
"--end",
end,
+ "--horizon-hours",
+ str(cfg.horizon_hours),
"--feature-set",
cfg.feature_set,
"--forecast-model",
@@ -185,6 +188,8 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
start,
"--end",
end,
+ "--horizon-hours",
+ str(cfg.horizon_hours),
"--train-ratio",
str(cfg.train_ratio),
"--val-ratio",
@@ -269,6 +274,8 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None:
str(cfg.model_path),
"--model-name",
cfg.model_name,
+ "--horizon-hours",
+ str(cfg.horizon_hours),
"--forecast-model",
cfg.forecast_model,
*(["--allow-empty"] if cfg.allow_empty_data else ["--strict-source-data"]),
@@ -289,8 +296,9 @@ def load_config() -> WorkerConfig:
return WorkerConfig(
database_url=database_url,
site=read_env("RAIN_SITE", "home"),
- model_name=read_env("RAIN_MODEL_NAME", "rain_next_1h"),
- model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-auto-v1-extended"),
+ horizon_hours=read_env_int("RAIN_HORIZON_HOURS", 4),
+ model_name=read_env("RAIN_MODEL_NAME", "rain_next_4h"),
+ model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-auto-v2-extended-4h"),
model_family=read_env("RAIN_MODEL_FAMILY", "auto"),
feature_set=read_env("RAIN_FEATURE_SET", "extended"),
forecast_model=read_env("RAIN_FORECAST_MODEL", "ecmwf"),
@@ -338,6 +346,7 @@ def main() -> int:
print(
"[rain-ml] worker start "
f"site={cfg.site} "
+ f"horizon_hours={cfg.horizon_hours} "
f"model_name={cfg.model_name} "
f"model_family={cfg.model_family} "
f"feature_set={cfg.feature_set} "
diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py
index 96e2d53..4662f1b 100644
--- a/scripts/train_rain_model.py
+++ b/scripts/train_rain_model.py
@@ -20,6 +20,7 @@ from sklearn.preprocessing import StandardScaler
from rain_model_common import (
AVAILABLE_FEATURE_SETS,
+ DEFAULT_HORIZON_HOURS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
evaluate_probs,
@@ -28,8 +29,13 @@ from rain_model_common import (
fetch_ws90,
feature_columns_for_set,
feature_columns_need_forecast,
+ horizon_suffix,
model_frame,
+ normalize_horizon_hours,
parse_time,
+ rain_last_mm_col,
+ rain_next_flag_col,
+ rain_next_mm_col,
safe_pr_auc,
safe_roc_auc,
select_threshold,
@@ -49,11 +55,17 @@ THRESHOLD_POLICIES = ("validation", "walk_forward")
def parse_args() -> argparse.Namespace:
- parser = argparse.ArgumentParser(description="Train a rain prediction model (next 1h >= 0.2mm).")
+ parser = argparse.ArgumentParser(description="Train a rain prediction model (next Nh >= threshold).")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).")
+ parser.add_argument(
+ "--horizon-hours",
+ type=int,
+ default=DEFAULT_HORIZON_HOURS,
+ help="Prediction horizon in hours (for example 1 or 4).",
+ )
parser.add_argument("--train-ratio", type=float, default=0.7, help="Time-ordered train split ratio.")
parser.add_argument("--val-ratio", type=float, default=0.15, help="Time-ordered validation split ratio.")
parser.add_argument(
@@ -464,14 +476,18 @@ def evaluate_calibration_methods(
return selected, results
-def evaluate_naive_baselines(test_df, y_test: np.ndarray) -> dict[str, Any]:
+def evaluate_naive_baselines(
+ test_df,
+ y_test: np.ndarray,
+ persistence_context_col: str,
+) -> dict[str, Any]:
out: dict[str, Any] = {}
- if "rain_last_1h_mm" in test_df.columns:
- rain_last = test_df["rain_last_1h_mm"].to_numpy(dtype=float)
+ if persistence_context_col in test_df.columns:
+ rain_last = test_df[persistence_context_col].to_numpy(dtype=float)
persistence_prob = (rain_last >= RAIN_EVENT_THRESHOLD_MM).astype(float)
- out["persistence_last_1h"] = {
- "rule": f"predict rain when rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
+ out[f"persistence_{persistence_context_col}"] = {
+ "rule": f"predict rain when {persistence_context_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"metrics": evaluate_probs(y_true=y_test, y_prob=persistence_prob, threshold=0.5),
}
@@ -512,6 +528,8 @@ def evaluate_sliced_performance(
y_true: np.ndarray,
y_prob: np.ndarray,
threshold: float,
+ context_col: str,
+ context_label: str,
min_rows_per_slice: int = 30,
) -> dict[str, Any]:
frame = pd.DataFrame(
@@ -530,7 +548,11 @@ def evaluate_sliced_performance(
weekly_positive_rate = frame.groupby(week_label)["y_true"].transform("mean")
rainy_week = weekly_positive_rate >= overall_rate
- rain_context = test_df["rain_last_1h_mm"].to_numpy(dtype=float) if "rain_last_1h_mm" in test_df.columns else np.zeros(len(test_df))
+ rain_context = (
+ test_df[context_col].to_numpy(dtype=float)
+ if context_col in test_df.columns
+ else np.zeros(len(test_df))
+ )
wet_context = rain_context >= RAIN_EVENT_THRESHOLD_MM
wind_values = test_df["wind_max_m_s"].to_numpy(dtype=float) if "wind_max_m_s" in test_df.columns else np.full(len(test_df), np.nan)
@@ -545,8 +567,8 @@ def evaluate_sliced_performance(
("nighttime_utc", np.asarray(~is_day, dtype=bool), "18:00-05:59 UTC"),
("rainy_weeks", np.asarray(rainy_week, dtype=bool), "weeks with positive-rate >= test positive-rate"),
("non_rainy_weeks", np.asarray(~rainy_week, dtype=bool), "weeks with positive-rate < test positive-rate"),
- ("wet_context_last_1h", np.asarray(wet_context, dtype=bool), f"rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}"),
- ("dry_context_last_1h", np.asarray(~wet_context, dtype=bool), f"rain_last_1h_mm < {RAIN_EVENT_THRESHOLD_MM:.2f}"),
+ ("wet_context_recent_rain", np.asarray(wet_context, dtype=bool), f"{context_label} >= {RAIN_EVENT_THRESHOLD_MM:.2f}"),
+ ("dry_context_recent_rain", np.asarray(~wet_context, dtype=bool), f"{context_label} < {RAIN_EVENT_THRESHOLD_MM:.2f}"),
("windy_q75", np.asarray(windy, dtype=bool), "wind_max_m_s >= test 75th percentile"),
("calm_below_q75", np.asarray(~windy, dtype=bool), "wind_max_m_s < test 75th percentile"),
]
@@ -585,6 +607,7 @@ def evaluate_sliced_performance(
def tune_threshold_walk_forward(
model_df,
feature_cols: list[str],
+ target_col: str,
model_family: str,
model_params: dict[str, Any],
calibration_method: str,
@@ -627,8 +650,8 @@ def tune_threshold_walk_forward(
if len(fold_train) < 160 or len(fold_test) < 25:
continue
- y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy()
- y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy()
+ y_fold_train = fold_train[target_col].astype(int).to_numpy()
+ y_fold_test = fold_test[target_col].astype(int).to_numpy()
if len(np.unique(y_fold_train)) < 2:
continue
@@ -706,6 +729,7 @@ def tune_threshold_walk_forward(
def walk_forward_backtest(
model_df,
feature_cols: list[str],
+ target_col: str,
model_family: str,
model_params: dict[str, Any],
calibration_method: str,
@@ -745,8 +769,8 @@ def walk_forward_backtest(
if len(fold_train) < 160 or len(fold_test) < 25:
continue
- y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy()
- y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy()
+ y_fold_train = fold_train[target_col].astype(int).to_numpy()
+ y_fold_test = fold_test[target_col].astype(int).to_numpy()
if len(np.unique(y_fold_train)) < 2:
continue
@@ -755,8 +779,8 @@ def walk_forward_backtest(
continue
inner_train = fold_train.iloc[:-inner_val_rows]
inner_val = fold_train.iloc[-inner_val_rows:]
- y_inner_train = inner_train["rain_next_1h"].astype(int).to_numpy()
- y_inner_val = inner_val["rain_next_1h"].astype(int).to_numpy()
+ y_inner_train = inner_train[target_col].astype(int).to_numpy()
+ y_inner_val = inner_val[target_col].astype(int).to_numpy()
if len(np.unique(y_inner_train)) < 2:
continue
@@ -987,6 +1011,11 @@ def main() -> int:
start = parse_time(args.start) if args.start else ""
end = parse_time(args.end) if args.end else ""
+ horizon_hours = normalize_horizon_hours(args.horizon_hours)
+ horizon_label = horizon_suffix(horizon_hours)
+ target_col = rain_next_flag_col(horizon_hours)
+ target_mm_col = rain_next_mm_col(horizon_hours)
+ persistence_context_col = rain_last_mm_col(horizon_hours)
feature_cols = feature_columns_for_set(args.feature_set)
needs_forecast = feature_columns_need_forecast(feature_cols)
calibration_methods = parse_calibration_methods(args.calibration_methods)
@@ -1011,8 +1040,21 @@ def main() -> int:
return 0
raise RuntimeError(message)
- full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
- model_df = model_frame(full_df, feature_cols, require_target=True)
+ full_df = build_dataset(
+ ws90,
+ baro,
+ forecast=forecast,
+ rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM,
+ horizon_hours=horizon_hours,
+ )
+ if persistence_context_col not in full_df.columns:
+ persistence_context_col = rain_last_mm_col(1)
+ model_df = model_frame(
+ full_df,
+ feature_cols,
+ require_target=True,
+ target_col=target_col,
+ )
if len(model_df) < args.min_rows:
message = f"not enough model-ready rows after filtering (need >= {args.min_rows})"
if args.allow_empty:
@@ -1027,11 +1069,11 @@ def main() -> int:
)
x_train = train_df[feature_cols]
- y_train = train_df["rain_next_1h"].astype(int).to_numpy()
+ y_train = train_df[target_col].astype(int).to_numpy()
x_val = val_df[feature_cols]
- y_val = val_df["rain_next_1h"].astype(int).to_numpy()
+ y_val = val_df[target_col].astype(int).to_numpy()
x_test = test_df[feature_cols]
- y_test = test_df["rain_next_1h"].astype(int).to_numpy()
+ y_test = test_df[target_col].astype(int).to_numpy()
if len(np.unique(y_train)) < 2:
raise RuntimeError("training split does not contain both classes; cannot train classifier")
@@ -1073,6 +1115,7 @@ def main() -> int:
threshold_tuning_walk_forward = tune_threshold_walk_forward(
model_df=model_df.iloc[: len(train_df) + len(val_df)],
feature_cols=feature_cols,
+ target_col=target_col,
model_family=selected_model_family,
model_params=selected_model_params,
calibration_method=selected_calibration_method,
@@ -1093,7 +1136,7 @@ def main() -> int:
train_val_df = model_df.iloc[: len(train_df) + len(val_df)]
x_train_val = train_val_df[feature_cols]
- y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy()
+ y_train_val = train_val_df[target_col].astype(int).to_numpy()
final_model, final_fit_info = fit_with_optional_calibration(
model_family=selected_model_family,
@@ -1109,16 +1152,23 @@ def main() -> int:
test_calibration = {
"ece_10": expected_calibration_error(y_true=y_test, y_prob=y_test_prob, bins=10),
}
- naive_baselines_test = evaluate_naive_baselines(test_df=test_df, y_test=y_test)
+ naive_baselines_test = evaluate_naive_baselines(
+ test_df=test_df,
+ y_test=y_test,
+ persistence_context_col=persistence_context_col,
+ )
sliced_performance = evaluate_sliced_performance(
test_df=test_df,
y_true=y_test,
y_prob=y_test_prob,
threshold=chosen_threshold,
+ context_col=persistence_context_col,
+ context_label=persistence_context_col,
)
walk_forward = walk_forward_backtest(
model_df=model_df,
feature_cols=feature_cols,
+ target_col=target_col,
model_family=selected_model_family,
model_params=selected_model_params,
calibration_method=selected_calibration_method,
@@ -1135,8 +1185,12 @@ def main() -> int:
"model_family_requested": args.model_family,
"model_family": selected_model_family,
"model_params": selected_model_params,
+ "horizon_hours": horizon_hours,
+ "horizon_label": horizon_label,
"feature_set": args.feature_set,
- "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
+ "target_column": target_col,
+ "target_mm_column": target_mm_col,
+ "target_definition": f"{target_mm_col} >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"feature_columns": feature_cols,
"forecast_model": args.forecast_model if needs_forecast else None,
"calibration_method_requested": calibration_methods,
@@ -1207,6 +1261,7 @@ def main() -> int:
print(f" site: {args.site}")
print(f" model_version: {args.model_version}")
print(f" model_family: {selected_model_family} (requested={args.model_family})")
+ print(f" horizon: {horizon_hours}h")
print(f" model_params: {selected_model_params}")
print(f" calibration_method: {report['calibration_method']}")
print(
@@ -1298,7 +1353,7 @@ def main() -> int:
dataset_dir = os.path.dirname(dataset_out)
if dataset_dir:
os.makedirs(dataset_dir, exist_ok=True)
- snapshot_cols = list(dict.fromkeys(feature_cols + ["rain_next_1h", "rain_next_1h_mm"]))
+ snapshot_cols = list(dict.fromkeys(feature_cols + [target_col, target_mm_col]))
model_df[snapshot_cols].to_csv(dataset_out, index=True, index_label="ts")
print(f"Saved dataset snapshot to {dataset_out}")
@@ -1320,6 +1375,10 @@ def main() -> int:
"forecast_model": args.forecast_model if needs_forecast else None,
"threshold": float(chosen_threshold),
"target_mm": float(RAIN_EVENT_THRESHOLD_MM),
+ "horizon_hours": horizon_hours,
+ "target_col": target_col,
+ "target_mm_col": target_mm_col,
+ "persistence_context_col": persistence_context_col,
"model_version": args.model_version,
"trained_at": datetime.now(timezone.utc).isoformat(),
"split": report["split"],
diff --git a/todo.md b/todo.md
index 894bf38..43111e6 100644
--- a/todo.md
+++ b/todo.md
@@ -55,3 +55,25 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
- [x] [P0] Train baseline model on full available history and capture metrics. (completed on runtime machine)
- [x] [P1] Add one expanded feature set and rerun evaluation. (completed on runtime machine 2026-03-12 with `feature_set=extended`, `model_version=rain-auto-v1-extended-202603120932`)
- [x] [P0] Decide v1 threshold and define deployment interface.
+
+## 9) Extension Plan: 4-Hour Precipitation Window (Not Started)
+- [x] [P0] Lock v2 target definition for horizon extension: `rain_next_4h_mm >= 0.2mm` and explicitly decide whether the threshold remains `0.2mm` or is increased for 4-hour labeling. (implemented with `0.2mm` carry-forward)
+- [x] [P0] Decide rollout strategy: additive dual-horizon support (`1h` + `4h`) vs direct replacement; prefer dual-horizon for safe cutover. (implemented as additive dual-horizon)
+- [x] [P0] Parameterize label horizon in shared ML code (`scripts/rain_model_common.py`) so target columns are generated for 4-hour windows (48 x 5-minute buckets) instead of hard-coded 1-hour columns.
+- [x] [P1] Revisit persistence/context features currently tied to `rain_last_1h_mm`; decide whether to keep 1-hour context, add 4-hour context, or both for the 4-hour target. (implemented horizon-aware context column selection)
+- [x] [P0] Update training pipeline (`scripts/train_rain_model.py`) to train against the 4-hour target column, including reports, model-card content, dataset snapshot columns, and artifact metadata.
+- [x] [P0] Update audit pipeline (`scripts/audit_rain_data.py`) to report class balance and target definition for 4-hour labels.
+- [x] [P0] Update inference pipeline (`scripts/predict_rain_model.py`) to use the 4-hour target, including realized-outcome availability checks (`pred_ts + 4h`) and metadata/reporting fields.
+- [x] [P0] Finalize DB storage design for 4-hour predictions (new `predictions_rain_4h` table vs generic horizon column strategy) before migrations. (implemented dedicated `predictions_rain_4h` table)
+- [x] [P0] Create schema migration (recommended: new hypertable `predictions_rain_4h` with `rain_next_4h_mm_actual` and `rain_next_4h_actual` fields) and matching indexes.
+- [x] [P0] Update prediction upsert SQL to write to the 4-hour prediction table/columns.
+- [x] [P0] Update monitoring views in `db/init/002_rain_monitoring_views.sql` so drift/calibration/pipeline-health views include the 4-hour prediction path.
+- [x] [P0] Update Go DB query layer (`internal/db/series.go`) to read 4-hour prediction rows/fields.
+- [x] [P1] Update dashboard API defaults (`cmd/ingestd/web.go`) from `rain_next_1h` to the selected 4-hour model name (or make model name configurable).
+- [x] [P1] Update web UI labels/semantics (`cmd/ingestd/web/index.html`, `cmd/ingestd/web/app.js`) from “Rain 1h %” to “Rain 4h %” and verify chart legends/tooltips match the new horizon.
+- [x] [P1] Update worker/runtime defaults (`docker-compose.yml`, `scripts/run_rain_ml_worker.py`, `scripts/run_p0_rain_workflow.sh`) to use `rain_next_4h` naming/versioning.
+- [x] [P0] Update health-check defaults (`scripts/check_rain_pipeline_health.py`) for 4-hour evaluation latency (e.g., pending-eval age threshold > 4h).
+- [x] [P1] Update docs and runbooks (`README.md`, `docs/rain_prediction.md`, `docs/rain_model_runbook.md`) so commands, table names, and target definitions match the 4-hour system.
+- [ ] [P0] Run full retraining/evaluation for the 4-hour target and compare against current 1-hour model metrics before production cutover.
+- [ ] [P0] Execute staged rollout: deploy schema + views, deploy model + inference, verify dashboard/health checks, then switch default model name.
+- [x] [P1] Keep rollback path documented: retain `rain_next_1h` artifacts/table access until 4-hour monitoring is stable. (documented in `docs/rain_model_runbook.md` staged rollout/rollback section)