diff --git a/README.md b/README.md index 1da5357..6bb6710 100644 --- a/README.md +++ b/README.md @@ -94,6 +94,17 @@ docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-ent Or copy just the `observations_baro` section into a manual `psql -c`. +For rain-model monitoring views, also apply `db/init/002_rain_monitoring_views.sql` on existing DBs: + +```sh +docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql +``` + +Runbook/docs: +- `docs/rain_prediction.md` +- `docs/rain_data_issues.md` +- `docs/rain_model_runbook.md` + ## Publish a test WS90 payload ```sh mosquitto_pub -h localhost -t ecowitt/ws90 -m '{"model":"Fineoffset-WS90","id":70618,"battery_ok":1,"battery_mV":3180,"temperature_C":24.2,"humidity":60,"wind_dir_deg":129,"wind_avg_m_s":0,"wind_max_m_s":0,"uvi":0,"light_lux":0,"flags":130,"rain_mm":0,"rain_start":0,"supercap_V":0.5,"firmware":160,"data":"3fff000000------0000ff7ff70000","mic":"CRC","protocol":"Fine Offset Electronics WS90 weather station","rssi":-44,"duration":32996}' diff --git a/db/init/002_rain_monitoring_views.sql b/db/init/002_rain_monitoring_views.sql new file mode 100644 index 0000000..5945af8 --- /dev/null +++ b/db/init/002_rain_monitoring_views.sql @@ -0,0 +1,156 @@ +-- Rain-model monitoring views (feature/prediction/calibration drift + pipeline freshness). +-- Safe to re-run. + +CREATE OR REPLACE VIEW rain_feature_drift_daily AS +WITH ws90_daily AS ( + SELECT + time_bucket(INTERVAL '1 day', ts) AS day, + site, + count(*) AS ws90_rows, + avg(temperature_c) AS temp_mean, + avg(humidity) AS humidity_mean, + avg(wind_avg_m_s) AS wind_avg_mean, + avg(wind_max_m_s) AS wind_gust_mean + FROM observations_ws90 + GROUP BY 1,2 +), +baro_daily AS ( + SELECT + time_bucket(INTERVAL '1 day', ts) AS day, + site, + count(*) AS baro_rows, + avg(pressure_hpa) AS pressure_mean + FROM observations_baro + GROUP BY 1,2 +), +joined AS ( + SELECT + w.day, + w.site, + w.ws90_rows, + b.baro_rows, + w.temp_mean, + w.humidity_mean, + w.wind_avg_mean, + w.wind_gust_mean, + b.pressure_mean + FROM ws90_daily w + LEFT JOIN baro_daily b + ON b.day = w.day + AND b.site = w.site +), +baseline AS ( + SELECT + j1.day, + j1.site, + j1.ws90_rows, + j1.baro_rows, + j1.temp_mean, + j1.humidity_mean, + j1.wind_avg_mean, + j1.wind_gust_mean, + j1.pressure_mean, + avg(j2.temp_mean) AS temp_mean_30d, + stddev_samp(j2.temp_mean) AS temp_std_30d, + avg(j2.humidity_mean) AS humidity_mean_30d, + stddev_samp(j2.humidity_mean) AS humidity_std_30d, + avg(j2.wind_avg_mean) AS wind_avg_mean_30d, + stddev_samp(j2.wind_avg_mean) AS wind_avg_std_30d, + avg(j2.wind_gust_mean) AS wind_gust_mean_30d, + stddev_samp(j2.wind_gust_mean) AS wind_gust_std_30d, + avg(j2.pressure_mean) AS pressure_mean_30d, + stddev_samp(j2.pressure_mean) AS pressure_std_30d + FROM joined j1 + LEFT JOIN joined j2 + ON j2.site = j1.site + AND j2.day < j1.day + AND j2.day >= j1.day - INTERVAL '30 days' + GROUP BY + j1.day, + j1.site, + j1.ws90_rows, + j1.baro_rows, + j1.temp_mean, + j1.humidity_mean, + j1.wind_avg_mean, + j1.wind_gust_mean, + j1.pressure_mean +) +SELECT + day, + site, + ws90_rows, + baro_rows, + temp_mean, + humidity_mean, + wind_avg_mean, + wind_gust_mean, + pressure_mean, + temp_mean_30d, + humidity_mean_30d, + wind_avg_mean_30d, + wind_gust_mean_30d, + pressure_mean_30d, + CASE WHEN temp_std_30d > 0 THEN (temp_mean - temp_mean_30d) / temp_std_30d END AS temp_zscore_30d, + CASE WHEN humidity_std_30d > 0 THEN (humidity_mean - humidity_mean_30d) / humidity_std_30d END AS humidity_zscore_30d, + CASE WHEN wind_avg_std_30d > 0 THEN (wind_avg_mean - wind_avg_mean_30d) / wind_avg_std_30d END AS wind_avg_zscore_30d, + CASE WHEN wind_gust_std_30d > 0 THEN (wind_gust_mean - wind_gust_mean_30d) / wind_gust_std_30d END AS wind_gust_zscore_30d, + CASE WHEN pressure_std_30d > 0 THEN (pressure_mean - pressure_mean_30d) / pressure_std_30d END AS pressure_zscore_30d +FROM baseline; + + +CREATE OR REPLACE VIEW rain_prediction_drift_daily AS +SELECT + time_bucket(INTERVAL '1 day', ts) AS day, + site, + model_name, + model_version, + count(*) AS prediction_rows, + avg(probability) AS probability_mean, + stddev_samp(probability) AS probability_stddev, + avg(CASE WHEN predict_rain THEN 1.0 ELSE 0.0 END) AS predicted_positive_rate, + avg(threshold) AS threshold_mean +FROM predictions_rain_1h +GROUP BY 1,2,3,4; + + +CREATE OR REPLACE VIEW rain_calibration_drift_daily AS +SELECT + time_bucket(INTERVAL '1 day', ts) AS day, + site, + model_name, + model_version, + count(*) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS evaluated_rows, + avg(probability) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS mean_probability, + avg(CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS observed_positive_rate, + avg(power(probability - CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END, 2.0)) + FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS brier_score, + avg( + CASE + WHEN rain_next_1h_actual IS NULL THEN NULL + WHEN predict_rain = rain_next_1h_actual THEN 1.0 + ELSE 0.0 + END + ) AS decision_accuracy +FROM predictions_rain_1h +GROUP BY 1,2,3,4; + + +CREATE OR REPLACE VIEW rain_pipeline_health AS +WITH sites AS ( + SELECT DISTINCT site FROM observations_ws90 + UNION + SELECT DISTINCT site FROM observations_baro + UNION + SELECT DISTINCT site FROM forecast_openmeteo_hourly + UNION + SELECT DISTINCT site FROM predictions_rain_1h +) +SELECT + s.site, + (SELECT max(ts) FROM observations_ws90 w WHERE w.site = s.site) AS ws90_latest_ts, + (SELECT max(ts) FROM observations_baro b WHERE b.site = s.site) AS baro_latest_ts, + (SELECT max(ts) FROM forecast_openmeteo_hourly f WHERE f.site = s.site) AS forecast_latest_ts, + (SELECT max(generated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_generated_latest_ts, + (SELECT max(evaluated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_evaluated_latest_ts +FROM sites s; diff --git a/docker-compose.yml b/docker-compose.yml index 7bec4ad..6b14767 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -43,10 +43,16 @@ services: RAIN_TRAIN_INTERVAL_HOURS: "24" RAIN_PREDICT_INTERVAL_MINUTES: "10" RAIN_MIN_PRECISION: "0.70" + RAIN_TUNE_HYPERPARAMETERS: "true" + RAIN_MAX_HYPERPARAM_TRIALS: "12" + RAIN_CALIBRATION_METHODS: "none,sigmoid,isotonic" + RAIN_WALK_FORWARD_FOLDS: "0" + RAIN_ALLOW_EMPTY_DATA: "true" RAIN_MODEL_PATH: "/app/models/rain_model.pkl" RAIN_REPORT_PATH: "/app/models/rain_model_report.json" RAIN_AUDIT_PATH: "/app/models/rain_data_audit.json" RAIN_DATASET_PATH: "/app/models/datasets/rain_dataset_{model_version}_{feature_set}.csv" + RAIN_MODEL_CARD_PATH: "/app/models/model_card_{model_version}.md" volumes: - ./models:/app/models diff --git a/docs/rain_data_issues.md b/docs/rain_data_issues.md new file mode 100644 index 0000000..4a12ed5 --- /dev/null +++ b/docs/rain_data_issues.md @@ -0,0 +1,35 @@ +# Rain Model Data Issues and Mitigations + +This document captures known data-quality issues observed in the rain-model pipeline and the mitigation rules used in code. + +## Issue Register + +| Area | Known issue | Mitigation in code/workflow | +|---|---|---| +| WS90 rain counter (`rain_mm`) | Counter resets can produce negative deltas. | `rain_inc_raw = diff(rain_mm)` then `rain_inc = clip(lower=0)`; reset events tracked as `rain_reset`. | +| WS90 rain spikes | Isolated large 5-minute jumps may be sensor/transmission anomalies. | Spikes flagged as `rain_spike_5m` when increment >= `5.0mm/5m`; counts tracked in audit/training report. | +| Sensor gaps | Missing 5-minute buckets from WS90/barometer ingestion. | Resample to 5-minute grid; barometer interpolated with short limit (`limit=3`); gap lengths tracked by audit. | +| Out-of-order arrivals | Late MQTT events can arrive with older `ts`. | Audit reports out-of-order count by sorting on `received_at` and checking `ts` monotonicity. | +| Duplicate rows | Replays/reconnects can duplicate sensor rows. | Audit reports duplicate counts by `(ts, station_id)` for WS90 and `(ts, source)` for barometer. | +| Forecast sparsity/jitter | Hourly forecast retrieval cadence does not always align with 5-minute features. | Select latest forecast per `ts` (`DISTINCT ON` + `retrieved_at DESC`), resample to 5 minutes, short forward/backfill windows, and clip `fc_precip_prob` to `[0,1]`. | +| Local vs UTC day boundary | Daily rainfall resets can look wrong when local timezone is not respected. | Station timezone is configured via `site.timezone` and used by Wunderground uploader; model training/inference stays UTC-based for split consistency. | + +## Audit Command + +Run this regularly and retain JSON reports for comparison: + +```sh +python scripts/audit_rain_data.py \ + --site home \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --feature-set "extended" \ + --forecast-model "ecmwf" \ + --out "models/rain_data_audit.json" +``` + +## Operational Rules + +- Treat large jumps in `rain_reset_count` or `rain_spike_5m_count` as data-quality incidents. +- If `gaps_5m.ws90_max_gap_minutes` or `gaps_5m.baro_max_gap_minutes` exceeds one hour, avoid model refresh until ingestion stabilizes. +- If forecast rows are unexpectedly low for an `extended` feature run, either fix forecast ingestion first or temporarily fall back to `baseline` feature set. diff --git a/docs/rain_model_runbook.md b/docs/rain_model_runbook.md new file mode 100644 index 0000000..b649c5a --- /dev/null +++ b/docs/rain_model_runbook.md @@ -0,0 +1,141 @@ +# Rain Model Runbook + +Operational guide for training, evaluating, deploying, monitoring, and rolling back the rain model. + +## 1) One-time Setup + +Apply monitoring views: + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql +``` + +## 2) Train + Evaluate + +Recommended evaluation run (includes validation-only tuning, calibration comparison, naive baselines, and walk-forward folds): + +```sh +python scripts/train_rain_model.py \ + --site "home" \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --feature-set "extended" \ + --model-family "auto" \ + --forecast-model "ecmwf" \ + --tune-hyperparameters \ + --max-hyperparam-trials 12 \ + --calibration-methods "none,sigmoid,isotonic" \ + --walk-forward-folds 4 \ + --model-version "rain-auto-v1-extended" \ + --out "models/rain_model.pkl" \ + --report-out "models/rain_model_report.json" \ + --model-card-out "models/model_card_{model_version}.md" \ + --dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv" +``` + +Review in report: +- `candidate_models[*].hyperparameter_tuning` +- `candidate_models[*].calibration_comparison` +- `naive_baselines_test` +- `walk_forward_backtest` + +## 3) Deploy + +1. Promote the selected artifact path to the inference worker (`RAIN_MODEL_PATH` or CLI `--model-path`). +2. Run one dry-run inference: + +```sh +python scripts/predict_rain_model.py \ + --site home \ + --model-path "models/rain_model.pkl" \ + --model-name "rain_next_1h" \ + --dry-run +``` + +3. Run live inference: + +```sh +python scripts/predict_rain_model.py \ + --site home \ + --model-path "models/rain_model.pkl" \ + --model-name "rain_next_1h" +``` + +## 4) Rollback + +1. Identify the last known-good model artifact in `models/`. +2. Point deployment to that artifact (worker env `RAIN_MODEL_PATH` or manual inference path). +3. Re-run inference command and verify writes in `predictions_rain_1h`. +4. Keep the failed artifact/report for postmortem. + +## 5) Monitoring + +### Feature drift + +```sql +SELECT * +FROM rain_feature_drift_daily +WHERE site = 'home' +ORDER BY day DESC +LIMIT 30; +``` + +Alert heuristic: any absolute z-score > 3 for 2+ consecutive days. + +### Prediction drift + +```sql +SELECT * +FROM rain_prediction_drift_daily +WHERE site = 'home' +ORDER BY day DESC +LIMIT 30; +``` + +Alert heuristic: `predicted_positive_rate` shifts by > 2x relative to trailing 14-day median. + +### Calibration/performance drift + +```sql +SELECT * +FROM rain_calibration_drift_daily +WHERE site = 'home' +ORDER BY day DESC +LIMIT 30; +``` + +Alert heuristic: sustained Brier-score increase > 25% from trailing 30-day average. + +## 6) Pipeline Failure Alerts + +Use the health-check script in cron, systemd timer, or your alerting scheduler: + +```sh +python scripts/check_rain_pipeline_health.py \ + --site home \ + --model-name rain_next_1h \ + --max-ws90-age 20m \ + --max-baro-age 30m \ + --max-forecast-age 3h \ + --max-prediction-age 30m \ + --max-pending-eval-age 3h \ + --max-pending-eval-rows 200 +``` + +The script exits non-zero on failure, so it can directly drive alerting. + +## 7) Continuous Worker Defaults + +`docker-compose.yml` provides these controls for `rainml`: +- `RAIN_TUNE_HYPERPARAMETERS` +- `RAIN_MAX_HYPERPARAM_TRIALS` +- `RAIN_CALIBRATION_METHODS` +- `RAIN_WALK_FORWARD_FOLDS` +- `RAIN_ALLOW_EMPTY_DATA` +- `RAIN_MODEL_CARD_PATH` + +Recommended production defaults: +- Enable tuning daily or weekly (`RAIN_TUNE_HYPERPARAMETERS=true`) +- Keep walk-forward folds `0` in continuous mode, run fold backtests in scheduled evaluation jobs diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md index 4465546..64fb746 100644 --- a/docs/rain_prediction.md +++ b/docs/rain_prediction.md @@ -37,10 +37,12 @@ pip install -r scripts/requirements.txt ## Scripts - `scripts/audit_rain_data.py`: data quality + label quality + class balance audit. -- `scripts/train_rain_model.py`: strict time-based split training and metrics report. +- `scripts/train_rain_model.py`: strict time-based split training and metrics report, with optional + validation-only hyperparameter tuning, calibration comparison, naive baseline comparison, and walk-forward folds. - `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into `predictions_rain_1h`. - `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction. +- `scripts/check_rain_pipeline_health.py`: freshness/failure check for alerting. Feature-set options: - `baseline`: original 5 local observation features. @@ -55,7 +57,7 @@ Model-family options (`train_rain_model.py`): ## Usage ### 1) Apply schema update (existing DBs) -`001_schema.sql` now includes `predictions_rain_1h`. +`001_schema.sql` includes `predictions_rain_1h`. ```sh docker compose exec -T timescaledb \ @@ -63,6 +65,14 @@ docker compose exec -T timescaledb \ -f /docker-entrypoint-initdb.d/001_schema.sql ``` +Apply monitoring views: + +```sh +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql +``` + ### 2) Run data audit ```sh export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable" @@ -136,6 +146,25 @@ python scripts/train_rain_model.py \ --report-out "models/rain_model_report_auto.json" ``` +### 3e) Full P1 evaluation (tuning + calibration + walk-forward) +```sh +python scripts/train_rain_model.py \ + --site "home" \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --feature-set "extended" \ + --model-family "auto" \ + --forecast-model "ecmwf" \ + --tune-hyperparameters \ + --max-hyperparam-trials 12 \ + --calibration-methods "none,sigmoid,isotonic" \ + --walk-forward-folds 4 \ + --model-version "rain-auto-v1-extended-eval" \ + --out "models/rain_model_auto.pkl" \ + --report-out "models/rain_model_report_auto.json" \ + --model-card-out "models/model_card_{model_version}.md" +``` + ### 4) Run inference and store prediction ```sh python scripts/predict_rain_model.py \ @@ -154,6 +183,10 @@ bash scripts/run_p0_rain_workflow.sh The `rainml` service in `docker-compose.yml` now runs: - periodic retraining (default every 24 hours) - periodic prediction writes (default every 10 minutes) +- configurable tuning/calibration behavior (`RAIN_TUNE_HYPERPARAMETERS`, + `RAIN_MAX_HYPERPARAM_TRIALS`, `RAIN_CALIBRATION_METHODS`) +- graceful gap handling for temporary source outages (`RAIN_ALLOW_EMPTY_DATA=true`) +- optional model-card output (`RAIN_MODEL_CARD_PATH`) Artifacts are persisted to `./models` on the host. @@ -165,6 +198,7 @@ docker compose logs -f rainml ## Output - Audit report: `models/rain_data_audit.json` - Training report: `models/rain_model_report.json` +- Model card: `models/model_card_.md` - Model artifact: `models/rain_model.pkl` - Dataset snapshot: `models/datasets/rain_dataset__.csv` - Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized @@ -192,3 +226,5 @@ docker compose logs -f rainml - Data is resampled into 5-minute buckets. - Label is derived from incremental rain from WS90 cumulative `rain_mm`. - Timestamps are handled as UTC in training/inference workflow. +- See [Data issues and mitigation rules](./rain_data_issues.md) and + [runbook/monitoring guidance](./rain_model_runbook.md). diff --git a/scripts/check_rain_pipeline_health.py b/scripts/check_rain_pipeline_health.py new file mode 100644 index 0000000..92186ff --- /dev/null +++ b/scripts/check_rain_pipeline_health.py @@ -0,0 +1,214 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os +from datetime import datetime, timedelta, timezone +from typing import Any + +import psycopg2 + + +def parse_duration(value: str) -> timedelta: + raw = value.strip().lower() + if not raw: + raise ValueError("duration is empty") + + units = {"s": 1, "m": 60, "h": 3600, "d": 86400} + suffix = raw[-1] + if suffix not in units: + raise ValueError(f"invalid duration suffix: {value} (expected one of s,m,h,d)") + amount = float(raw[:-1]) + return timedelta(seconds=amount * units[suffix]) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.") + parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") + parser.add_argument("--site", required=True, help="Site name.") + parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.") + parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.") + parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.") + parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.") + parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.") + parser.add_argument( + "--max-pending-eval-age", + default="3h", + help="Pending evaluations older than this count toward alert.", + ) + parser.add_argument( + "--max-pending-eval-rows", + type=int, + default=200, + help="Alert if pending evaluation rows older than max-pending-eval-age exceed this count.", + ) + parser.add_argument( + "--json-out", + help="Optional path to save JSON health report.", + ) + return parser.parse_args() + + +def fetch_latest_ts(conn, sql: str, params: tuple[Any, ...]): + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + return row[0] if row else None + + +def fetch_count(conn, sql: str, params: tuple[Any, ...]) -> int: + with conn.cursor() as cur: + cur.execute(sql, params) + row = cur.fetchone() + if not row: + return 0 + return int(row[0] or 0) + + +def age_seconds(now: datetime, ts: datetime | None) -> float | None: + if ts is None: + return None + return float((now - ts.astimezone(timezone.utc)).total_seconds()) + + +def status_for_age(age_s: float | None, max_age: timedelta) -> tuple[str, str]: + if age_s is None: + return "error", "missing" + if age_s <= max_age.total_seconds(): + return "ok", "fresh" + return "error", "stale" + + +def main() -> int: + args = parse_args() + if not args.db_url: + raise SystemExit("missing --db-url or DATABASE_URL") + + now = datetime.now(timezone.utc) + max_ws90_age = parse_duration(args.max_ws90_age) + max_baro_age = parse_duration(args.max_baro_age) + max_forecast_age = parse_duration(args.max_forecast_age) + max_prediction_age = parse_duration(args.max_prediction_age) + max_pending_eval_age = parse_duration(args.max_pending_eval_age) + + with psycopg2.connect(args.db_url) as conn: + ws90_latest = fetch_latest_ts( + conn, + "SELECT max(ts) FROM observations_ws90 WHERE site = %s", + (args.site,), + ) + baro_latest = fetch_latest_ts( + conn, + "SELECT max(ts) FROM observations_baro WHERE site = %s", + (args.site,), + ) + forecast_latest = fetch_latest_ts( + conn, + "SELECT max(ts) FROM forecast_openmeteo_hourly WHERE site = %s", + (args.site,), + ) + prediction_latest = fetch_latest_ts( + conn, + """ + SELECT max(generated_at) + FROM predictions_rain_1h + WHERE site = %s + AND model_name = %s + """, + (args.site, args.model_name), + ) + pending_eval_rows = fetch_count( + conn, + """ + SELECT count(*) + FROM predictions_rain_1h + WHERE site = %s + AND model_name = %s + AND evaluated_at IS NULL + AND ts < (now() - %s::interval) + """, + (args.site, args.model_name, f"{int(max_pending_eval_age.total_seconds())} seconds"), + ) + + ws90_age_s = age_seconds(now, ws90_latest) + baro_age_s = age_seconds(now, baro_latest) + forecast_age_s = age_seconds(now, forecast_latest) + prediction_age_s = age_seconds(now, prediction_latest) + + ws90_status, ws90_reason = status_for_age(ws90_age_s, max_ws90_age) + baro_status, baro_reason = status_for_age(baro_age_s, max_baro_age) + forecast_status, forecast_reason = status_for_age(forecast_age_s, max_forecast_age) + prediction_status, prediction_reason = status_for_age(prediction_age_s, max_prediction_age) + pending_status = "ok" if pending_eval_rows <= args.max_pending_eval_rows else "error" + + checks = { + "ws90_freshness": { + "status": ws90_status, + "reason": ws90_reason, + "latest_ts": ws90_latest, + "age_seconds": ws90_age_s, + "max_age_seconds": max_ws90_age.total_seconds(), + }, + "baro_freshness": { + "status": baro_status, + "reason": baro_reason, + "latest_ts": baro_latest, + "age_seconds": baro_age_s, + "max_age_seconds": max_baro_age.total_seconds(), + }, + "forecast_freshness": { + "status": forecast_status, + "reason": forecast_reason, + "latest_ts": forecast_latest, + "age_seconds": forecast_age_s, + "max_age_seconds": max_forecast_age.total_seconds(), + }, + "prediction_freshness": { + "status": prediction_status, + "reason": prediction_reason, + "latest_ts": prediction_latest, + "age_seconds": prediction_age_s, + "max_age_seconds": max_prediction_age.total_seconds(), + }, + "pending_evaluations": { + "status": pending_status, + "rows": pending_eval_rows, + "max_rows": args.max_pending_eval_rows, + "pending_older_than_seconds": max_pending_eval_age.total_seconds(), + }, + } + + overall_status = "ok" + failing = [name for name, item in checks.items() if item["status"] != "ok"] + if failing: + overall_status = "error" + + report = { + "generated_at": now.isoformat(), + "site": args.site, + "model_name": args.model_name, + "status": overall_status, + "failing_checks": failing, + "checks": checks, + } + + print(f"rain pipeline health: {overall_status}") + for name, item in checks.items(): + print(f" {name}: {item['status']}") + if failing: + print(f" failing: {', '.join(failing)}") + + if args.json_out: + out_dir = os.path.dirname(args.json_out) + if out_dir: + os.makedirs(out_dir, exist_ok=True) + with open(args.json_out, "w", encoding="utf-8") as f: + json.dump(report, f, indent=2, default=str) + print(f"saved health report to {args.json_out}") + + return 0 if overall_status == "ok" else 1 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/predict_rain_model.py b/scripts/predict_rain_model.py index 63cb9ab..9d81cb2 100644 --- a/scripts/predict_rain_model.py +++ b/scripts/predict_rain_model.py @@ -47,6 +47,19 @@ def parse_args() -> argparse.Namespace: default="ecmwf", help="Forecast model name when inference features require forecast columns.", ) + parser.set_defaults(allow_empty=True) + parser.add_argument( + "--allow-empty", + dest="allow_empty", + action="store_true", + help="Exit successfully when required source rows are temporarily unavailable (default: enabled).", + ) + parser.add_argument( + "--strict-source-data", + dest="allow_empty", + action="store_false", + help="Fail when required source rows are unavailable.", + ) parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.") return parser.parse_args() @@ -96,11 +109,28 @@ def main() -> int: if needs_forecast: forecast = fetch_forecast(conn, args.site, fetch_start, fetch_end, model=forecast_model) + if ws90.empty: + message = "no ws90 observations found in source window" + if args.allow_empty: + print(f"Rain inference skipped: {message}.") + return 0 + raise RuntimeError(message) + if baro.empty: + message = "no barometer observations found in source window" + if args.allow_empty: + print(f"Rain inference skipped: {message}.") + return 0 + raise RuntimeError(message) + full_df = build_dataset(ws90, baro, forecast=forecast) feature_df = model_frame(full_df, feature_cols=features, require_target=False) candidates = feature_df.loc[feature_df.index <= at] if candidates.empty: - raise RuntimeError("no feature-complete row available at or before requested timestamp") + message = "no feature-complete row available at or before requested timestamp" + if args.allow_empty: + print(f"Rain inference skipped: {message}.") + return 0 + raise RuntimeError(message) row = candidates.tail(1) pred_ts = row.index[0].to_pydatetime() diff --git a/scripts/run_p0_rain_workflow.sh b/scripts/run_p0_rain_workflow.sh index 018ef6c..478fd6a 100644 --- a/scripts/run_p0_rain_workflow.sh +++ b/scripts/run_p0_rain_workflow.sh @@ -36,6 +36,8 @@ python scripts/train_rain_model.py \ --train-ratio 0.7 \ --val-ratio 0.15 \ --min-precision 0.70 \ + --walk-forward-folds 0 \ + --calibration-methods "none" \ --feature-set "$FEATURE_SET" \ --model-family "$MODEL_FAMILY" \ --forecast-model "$FORECAST_MODEL" \ diff --git a/scripts/run_rain_ml_worker.py b/scripts/run_rain_ml_worker.py index b15fa6b..5548beb 100644 --- a/scripts/run_rain_ml_worker.py +++ b/scripts/run_rain_ml_worker.py @@ -50,7 +50,13 @@ class WorkerConfig: train_ratio: float val_ratio: float min_precision: float + tune_hyperparameters: bool + max_hyperparam_trials: int + calibration_methods: str + walk_forward_folds: int + allow_empty_data: bool dataset_path_template: str + model_card_path_template: str model_path: Path report_path: Path audit_path: Path @@ -86,12 +92,15 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: start, end = training_window(cfg.lookback_days) model_version = f"{cfg.model_version_base}-{now_utc().strftime('%Y%m%d%H%M')}" dataset_out = cfg.dataset_path_template.format(model_version=model_version, feature_set=cfg.feature_set) + model_card_out = cfg.model_card_path_template.format(model_version=model_version) ensure_parent(cfg.audit_path) ensure_parent(cfg.report_path) ensure_parent(cfg.model_path) if dataset_out: ensure_parent(Path(dataset_out)) + if model_card_out: + ensure_parent(Path(model_card_out)) run_cmd( [ @@ -113,39 +122,51 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: env, ) - run_cmd( - [ - sys.executable, - "scripts/train_rain_model.py", - "--site", - cfg.site, - "--start", - start, - "--end", - end, - "--train-ratio", - str(cfg.train_ratio), - "--val-ratio", - str(cfg.val_ratio), - "--min-precision", - str(cfg.min_precision), - "--feature-set", - cfg.feature_set, - "--model-family", - cfg.model_family, - "--forecast-model", - cfg.forecast_model, - "--model-version", - model_version, - "--out", - str(cfg.model_path), - "--report-out", - str(cfg.report_path), - "--dataset-out", - dataset_out, - ], - env, - ) + train_cmd = [ + sys.executable, + "scripts/train_rain_model.py", + "--site", + cfg.site, + "--start", + start, + "--end", + end, + "--train-ratio", + str(cfg.train_ratio), + "--val-ratio", + str(cfg.val_ratio), + "--min-precision", + str(cfg.min_precision), + "--max-hyperparam-trials", + str(cfg.max_hyperparam_trials), + "--calibration-methods", + cfg.calibration_methods, + "--walk-forward-folds", + str(cfg.walk_forward_folds), + "--feature-set", + cfg.feature_set, + "--model-family", + cfg.model_family, + "--forecast-model", + cfg.forecast_model, + "--model-version", + model_version, + "--out", + str(cfg.model_path), + "--report-out", + str(cfg.report_path), + "--model-card-out", + model_card_out, + "--dataset-out", + dataset_out, + ] + if cfg.tune_hyperparameters: + train_cmd.append("--tune-hyperparameters") + if cfg.allow_empty_data: + train_cmd.append("--allow-empty") + else: + train_cmd.append("--strict-source-data") + run_cmd(train_cmd, env) def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None: @@ -164,6 +185,7 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None: cfg.model_name, "--forecast-model", cfg.forecast_model, + *(["--allow-empty"] if cfg.allow_empty_data else ["--strict-source-data"]), ], env, ) @@ -188,10 +210,19 @@ def load_config() -> WorkerConfig: train_ratio=read_env_float("RAIN_TRAIN_RATIO", 0.7), val_ratio=read_env_float("RAIN_VAL_RATIO", 0.15), min_precision=read_env_float("RAIN_MIN_PRECISION", 0.70), + tune_hyperparameters=read_env_bool("RAIN_TUNE_HYPERPARAMETERS", False), + max_hyperparam_trials=read_env_int("RAIN_MAX_HYPERPARAM_TRIALS", 12), + calibration_methods=read_env("RAIN_CALIBRATION_METHODS", "none,sigmoid,isotonic"), + walk_forward_folds=read_env_int("RAIN_WALK_FORWARD_FOLDS", 0), + allow_empty_data=read_env_bool("RAIN_ALLOW_EMPTY_DATA", True), dataset_path_template=read_env( "RAIN_DATASET_PATH", "models/datasets/rain_dataset_{model_version}_{feature_set}.csv", ), + model_card_path_template=read_env( + "RAIN_MODEL_CARD_PATH", + "models/model_card_{model_version}.md", + ), model_path=Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl")), report_path=Path(read_env("RAIN_REPORT_PATH", "models/rain_model_report.json")), audit_path=Path(read_env("RAIN_AUDIT_PATH", "models/rain_data_audit.json")), @@ -220,7 +251,10 @@ def main() -> int: f"feature_set={cfg.feature_set} " f"forecast_model={cfg.forecast_model} " f"train_interval_hours={cfg.train_interval_hours} " - f"predict_interval_minutes={cfg.predict_interval_minutes}", + f"predict_interval_minutes={cfg.predict_interval_minutes} " + f"tune_hyperparameters={cfg.tune_hyperparameters} " + f"walk_forward_folds={cfg.walk_forward_folds} " + f"allow_empty_data={cfg.allow_empty_data}", flush=True, ) diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py index 9ee57d0..b690172 100644 --- a/scripts/train_rain_model.py +++ b/scripts/train_rain_model.py @@ -1,5 +1,8 @@ #!/usr/bin/env python3 +from __future__ import annotations + import argparse +import itertools import json import os from datetime import datetime, timezone @@ -7,8 +10,10 @@ from typing import Any import numpy as np import psycopg2 +from sklearn.calibration import CalibratedClassifierCV from sklearn.ensemble import HistGradientBoostingClassifier from sklearn.linear_model import LogisticRegression +from sklearn.model_selection import TimeSeriesSplit from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler @@ -38,6 +43,7 @@ except ImportError: # pragma: no cover - optional dependency MODEL_FAMILIES = ("logreg", "hist_gb", "auto") +CALIBRATION_METHODS = ("none", "sigmoid", "isotonic") def parse_args() -> argparse.Namespace: @@ -56,6 +62,19 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.") parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.") + parser.set_defaults(allow_empty=True) + parser.add_argument( + "--allow-empty", + dest="allow_empty", + action="store_true", + help="Exit successfully when source/model-ready rows are temporarily unavailable (default: enabled).", + ) + parser.add_argument( + "--strict-source-data", + dest="allow_empty", + action="store_false", + help="Fail when source/model-ready rows are unavailable.", + ) parser.add_argument( "--feature-set", default="baseline", @@ -76,6 +95,28 @@ def parse_args() -> argparse.Namespace: "'auto' compares logreg and hist_gb on validation and selects best by PR-AUC/ROC-AUC/F1." ), ) + parser.add_argument( + "--tune-hyperparameters", + action="store_true", + help="Run a lightweight validation-only hyperparameter search before final training.", + ) + parser.add_argument( + "--max-hyperparam-trials", + type=int, + default=12, + help="Maximum hyperparameter trials per model family when tuning is enabled.", + ) + parser.add_argument( + "--calibration-methods", + default="none,sigmoid,isotonic", + help="Comma-separated methods from: none,sigmoid,isotonic. Best method selected by validation Brier/ECE.", + ) + parser.add_argument( + "--walk-forward-folds", + type=int, + default=4, + help="Number of temporal folds for walk-forward backtest (0 to disable).", + ) parser.add_argument( "--random-state", type=int, @@ -88,6 +129,11 @@ def parse_args() -> argparse.Namespace: default="models/rain_model_report.json", help="Path to save JSON training report.", ) + parser.add_argument( + "--model-card-out", + default="models/model_card_{model_version}.md", + help="Path (or template) for markdown model card. Supports {model_version}. Use empty string to disable.", + ) parser.add_argument( "--dataset-out", default="models/datasets/rain_dataset_{model_version}_{feature_set}.csv", @@ -104,25 +150,556 @@ def parse_args() -> argparse.Namespace: return parser.parse_args() -def make_model(model_family: str, random_state: int): +def parse_calibration_methods(value: str) -> list[str]: + methods: list[str] = [] + for token in (value or "").split(","): + method = token.strip().lower() + if not method: + continue + if method not in CALIBRATION_METHODS: + raise ValueError(f"unknown calibration method: {method}") + if method not in methods: + methods.append(method) + if not methods: + return ["none"] + return methods + + +def default_model_params(model_family: str) -> dict[str, Any]: if model_family == "logreg": + return {"c": 1.0} + if model_family == "hist_gb": + return { + "max_iter": 300, + "learning_rate": 0.05, + "max_depth": 5, + "min_samples_leaf": 20, + } + raise ValueError(f"unknown model_family: {model_family}") + + +def model_param_grid(model_family: str) -> list[dict[str, Any]]: + if model_family == "logreg": + return [{"c": c} for c in [0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]] + + if model_family == "hist_gb": + grid: list[dict[str, Any]] = [] + for learning_rate, max_depth, max_iter, min_samples_leaf in itertools.product( + [0.03, 0.05, 0.08], + [3, 5], + [200, 300], + [10, 20], + ): + grid.append( + { + "learning_rate": learning_rate, + "max_depth": max_depth, + "max_iter": max_iter, + "min_samples_leaf": min_samples_leaf, + } + ) + return grid + raise ValueError(f"unknown model_family: {model_family}") + + +def limit_trials(grid: list[dict[str, Any]], max_trials: int) -> list[dict[str, Any]]: + if max_trials <= 0 or len(grid) <= max_trials: + return grid + step = max(1, len(grid) // max_trials) + return grid[::step][:max_trials] + + +def make_model(model_family: str, random_state: int, params: dict[str, Any] | None = None): + model_params = default_model_params(model_family) + if params: + model_params.update(params) + + if model_family == "logreg": + c = float(model_params["c"]) return Pipeline( [ ("scaler", StandardScaler()), - ("clf", LogisticRegression(max_iter=1000, class_weight="balanced", random_state=random_state)), + ( + "clf", + LogisticRegression( + C=c, + max_iter=1000, + class_weight="balanced", + random_state=random_state, + ), + ), ] ) if model_family == "hist_gb": return HistGradientBoostingClassifier( - max_iter=300, - learning_rate=0.05, - max_depth=5, - min_samples_leaf=20, + max_iter=int(model_params["max_iter"]), + learning_rate=float(model_params["learning_rate"]), + max_depth=int(model_params["max_depth"]), + min_samples_leaf=int(model_params["min_samples_leaf"]), random_state=random_state, ) raise ValueError(f"unknown model_family: {model_family}") +def threshold_from_probs( + y_true: np.ndarray, + y_prob: np.ndarray, + min_precision: float, + fixed_threshold: float | None, +) -> tuple[float, dict[str, Any]]: + if fixed_threshold is not None: + return float(fixed_threshold), { + "selection_rule": "fixed_cli_threshold", + "threshold": float(fixed_threshold), + } + return select_threshold(y_true=y_true, y_prob=y_prob, min_precision=min_precision) + + +def metric_key(metrics: dict[str, Any]) -> tuple[float, float, float]: + return ( + safe_pr_auc(metrics), + safe_roc_auc(metrics), + float(metrics["f1"]), + ) + + +def expected_calibration_error(y_true: np.ndarray, y_prob: np.ndarray, bins: int = 10) -> float: + if len(y_true) == 0: + return float("nan") + + edges = np.linspace(0.0, 1.0, bins + 1) + total = float(len(y_true)) + ece = 0.0 + for i in range(bins): + lo = edges[i] + hi = edges[i + 1] + if i == bins - 1: + mask = (y_prob >= lo) & (y_prob <= hi) + else: + mask = (y_prob >= lo) & (y_prob < hi) + if not np.any(mask): + continue + bucket_weight = float(np.sum(mask)) / total + bucket_confidence = float(np.mean(y_prob[mask])) + bucket_frequency = float(np.mean(y_true[mask])) + ece += bucket_weight * abs(bucket_frequency - bucket_confidence) + return float(ece) + + +def calibration_cv_splits(rows: int) -> int: + if rows >= 800: + return 5 + if rows >= 400: + return 4 + if rows >= 240: + return 3 + if rows >= 140: + return 2 + return 0 + + +def fit_with_optional_calibration( + model_family: str, + model_params: dict[str, Any], + random_state: int, + x_train, + y_train: np.ndarray, + calibration_method: str, + fallback_to_none: bool = True, +): + base = make_model(model_family=model_family, random_state=random_state, params=model_params) + if calibration_method == "none": + base.fit(x_train, y_train) + return base, { + "requested_method": "none", + "effective_method": "none", + "cv_splits": 0, + } + + splits = calibration_cv_splits(len(y_train)) + if splits < 2: + if not fallback_to_none: + raise RuntimeError("not enough rows for calibration folds") + base.fit(x_train, y_train) + return base, { + "requested_method": calibration_method, + "effective_method": "none", + "cv_splits": splits, + "warning": "insufficient rows for calibration folds; falling back to uncalibrated model", + } + + try: + calibrated = CalibratedClassifierCV( + estimator=base, + method=calibration_method, + cv=TimeSeriesSplit(n_splits=splits), + ) + calibrated.fit(x_train, y_train) + return calibrated, { + "requested_method": calibration_method, + "effective_method": calibration_method, + "cv_splits": splits, + } + except Exception as exc: + if not fallback_to_none: + raise + base.fit(x_train, y_train) + return base, { + "requested_method": calibration_method, + "effective_method": "none", + "cv_splits": splits, + "warning": f"calibration failed ({exc}); falling back to uncalibrated model", + } + + +def hyperparameter_search( + model_family: str, + x_train, + y_train: np.ndarray, + x_val, + y_val: np.ndarray, + random_state: int, + min_precision: float, + fixed_threshold: float | None, + enabled: bool, + max_trials: int, +) -> tuple[dict[str, Any], list[dict[str, Any]]]: + if enabled: + grid = limit_trials(model_param_grid(model_family), max_trials=max_trials) + else: + grid = [default_model_params(model_family)] + + trials: list[dict[str, Any]] = [] + for params in grid: + model = make_model(model_family=model_family, random_state=random_state, params=params) + model.fit(x_train, y_train) + y_val_prob = model.predict_proba(x_val)[:, 1] + threshold, threshold_info = threshold_from_probs( + y_true=y_val, + y_prob=y_val_prob, + min_precision=min_precision, + fixed_threshold=fixed_threshold, + ) + metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=threshold) + trials.append( + { + "model_family": model_family, + "model_params": params, + "threshold": float(threshold), + "threshold_info": threshold_info, + "validation_metrics": metrics, + } + ) + + best = max(trials, key=lambda v: metric_key(v["validation_metrics"])) + return best["model_params"], trials + + +def calibration_selection_key(result: dict[str, Any]) -> tuple[float, float, float, float, float]: + metrics = result["validation_metrics"] + brier = float(metrics["brier"]) + ece_10 = float(result["calibration_quality"]["ece_10"]) + return ( + brier, + ece_10, + -safe_pr_auc(metrics), + -safe_roc_auc(metrics), + -float(metrics["f1"]), + ) + + +def evaluate_calibration_methods( + model_family: str, + model_params: dict[str, Any], + calibration_methods: list[str], + x_train, + y_train: np.ndarray, + x_val, + y_val: np.ndarray, + random_state: int, + min_precision: float, + fixed_threshold: float | None, +) -> tuple[dict[str, Any], list[dict[str, Any]]]: + results: list[dict[str, Any]] = [] + for method in calibration_methods: + fitted, fit_info = fit_with_optional_calibration( + model_family=model_family, + model_params=model_params, + random_state=random_state, + x_train=x_train, + y_train=y_train, + calibration_method=method, + fallback_to_none=True, + ) + y_val_prob = fitted.predict_proba(x_val)[:, 1] + threshold, threshold_info = threshold_from_probs( + y_true=y_val, + y_prob=y_val_prob, + min_precision=min_precision, + fixed_threshold=fixed_threshold, + ) + metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=threshold) + results.append( + { + "calibration_method": method, + "fit": fit_info, + "threshold": float(threshold), + "threshold_info": threshold_info, + "validation_metrics": metrics, + "calibration_quality": { + "ece_10": expected_calibration_error(y_true=y_val, y_prob=y_val_prob, bins=10), + }, + } + ) + + selected = min(results, key=calibration_selection_key) + return selected, results + + +def evaluate_naive_baselines(test_df, y_test: np.ndarray) -> dict[str, Any]: + out: dict[str, Any] = {} + + if "rain_last_1h_mm" in test_df.columns: + rain_last = test_df["rain_last_1h_mm"].to_numpy(dtype=float) + persistence_prob = (rain_last >= RAIN_EVENT_THRESHOLD_MM).astype(float) + out["persistence_last_1h"] = { + "rule": f"predict rain when rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + "metrics": evaluate_probs(y_true=y_test, y_prob=persistence_prob, threshold=0.5), + } + + has_fc_prob = "fc_precip_prob" in test_df.columns and test_df["fc_precip_prob"].notna().any() + has_fc_mm = "fc_precip_mm" in test_df.columns and test_df["fc_precip_mm"].notna().any() + if has_fc_prob: + fc_prob = test_df["fc_precip_prob"].fillna(0.0).clip(lower=0.0, upper=1.0).to_numpy(dtype=float) + out["forecast_precip_prob"] = { + "rule": "use fc_precip_prob directly as baseline probability", + "metrics": evaluate_probs(y_true=y_test, y_prob=fc_prob, threshold=0.5), + } + + if has_fc_prob or has_fc_mm: + fc_prob = ( + test_df["fc_precip_prob"].fillna(0.0).clip(lower=0.0, upper=1.0).to_numpy(dtype=float) + if "fc_precip_prob" in test_df.columns + else np.zeros(len(test_df), dtype=float) + ) + fc_mm = ( + test_df["fc_precip_mm"].fillna(0.0).to_numpy(dtype=float) + if "fc_precip_mm" in test_df.columns + else np.zeros(len(test_df), dtype=float) + ) + rule_prob = ((fc_prob >= 0.5) | (fc_mm >= RAIN_EVENT_THRESHOLD_MM)).astype(float) + out["forecast_threshold_rule"] = { + "rule": ( + "predict rain when (fc_precip_prob >= 0.50) " + f"OR (fc_precip_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f})" + ), + "metrics": evaluate_probs(y_true=y_test, y_prob=rule_prob, threshold=0.5), + } + + return out + + +def walk_forward_backtest( + model_df, + feature_cols: list[str], + model_family: str, + model_params: dict[str, Any], + calibration_method: str, + random_state: int, + min_precision: float, + fixed_threshold: float | None, + folds: int, +) -> dict[str, Any]: + if folds <= 0: + return {"enabled": False, "folds": [], "summary": None} + + n = len(model_df) + min_train_rows = max(200, int(0.4 * n)) + remaining = n - min_train_rows + if remaining < 50: + return { + "enabled": True, + "folds": [], + "summary": { + "error": "not enough rows for walk-forward folds", + "requested_folds": folds, + "min_train_rows": min_train_rows, + }, + } + + fold_size = max(25, remaining // folds) + results: list[dict[str, Any]] = [] + + for idx in range(folds): + train_end = min_train_rows + idx * fold_size + test_end = n if idx == folds - 1 else min(min_train_rows + (idx + 1) * fold_size, n) + if train_end >= test_end: + continue + + fold_train = model_df.iloc[:train_end] + fold_test = model_df.iloc[train_end:test_end] + if len(fold_train) < 160 or len(fold_test) < 25: + continue + + y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy() + y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy() + if len(np.unique(y_fold_train)) < 2: + continue + + inner_val_rows = max(40, int(0.2 * len(fold_train))) + if len(fold_train) - inner_val_rows < 80: + continue + inner_train = fold_train.iloc[:-inner_val_rows] + inner_val = fold_train.iloc[-inner_val_rows:] + y_inner_train = inner_train["rain_next_1h"].astype(int).to_numpy() + y_inner_val = inner_val["rain_next_1h"].astype(int).to_numpy() + if len(np.unique(y_inner_train)) < 2: + continue + + try: + threshold_model, threshold_fit = fit_with_optional_calibration( + model_family=model_family, + model_params=model_params, + random_state=random_state, + x_train=inner_train[feature_cols], + y_train=y_inner_train, + calibration_method=calibration_method, + fallback_to_none=True, + ) + inner_val_prob = threshold_model.predict_proba(inner_val[feature_cols])[:, 1] + fold_threshold, fold_threshold_info = threshold_from_probs( + y_true=y_inner_val, + y_prob=inner_val_prob, + min_precision=min_precision, + fixed_threshold=fixed_threshold, + ) + + fold_model, fold_fit = fit_with_optional_calibration( + model_family=model_family, + model_params=model_params, + random_state=random_state, + x_train=fold_train[feature_cols], + y_train=y_fold_train, + calibration_method=calibration_method, + fallback_to_none=True, + ) + fold_test_prob = fold_model.predict_proba(fold_test[feature_cols])[:, 1] + fold_metrics = evaluate_probs(y_true=y_fold_test, y_prob=fold_test_prob, threshold=fold_threshold) + fold_result = { + "fold_index": idx + 1, + "train_rows": len(fold_train), + "test_rows": len(fold_test), + "train_start": fold_train.index.min(), + "train_end": fold_train.index.max(), + "test_start": fold_test.index.min(), + "test_end": fold_test.index.max(), + "threshold": float(fold_threshold), + "threshold_selection": fold_threshold_info, + "threshold_fit": threshold_fit, + "model_fit": fold_fit, + "metrics": fold_metrics, + } + results.append(fold_result) + except Exception as exc: + results.append( + { + "fold_index": idx + 1, + "train_rows": len(fold_train), + "test_rows": len(fold_test), + "error": str(exc), + } + ) + + good = [r for r in results if "metrics" in r] + if not good: + return { + "enabled": True, + "folds": results, + "summary": { + "error": "no successful fold evaluations", + "requested_folds": folds, + }, + } + + summary = { + "successful_folds": len(good), + "requested_folds": folds, + "mean_precision": float(np.mean([f["metrics"]["precision"] for f in good])), + "mean_recall": float(np.mean([f["metrics"]["recall"] for f in good])), + "mean_f1": float(np.mean([f["metrics"]["f1"] for f in good])), + "mean_brier": float(np.mean([f["metrics"]["brier"] for f in good])), + "mean_pr_auc": float(np.mean([f["metrics"]["pr_auc"] for f in good if f["metrics"]["pr_auc"] is not None])) + if any(f["metrics"]["pr_auc"] is not None for f in good) + else None, + "mean_roc_auc": float(np.mean([f["metrics"]["roc_auc"] for f in good if f["metrics"]["roc_auc"] is not None])) + if any(f["metrics"]["roc_auc"] is not None for f in good) + else None, + } + return {"enabled": True, "folds": results, "summary": summary} + + +def write_model_card(path: str, report: dict[str, Any]) -> None: + lines = [ + "# Rain Model Card", + "", + f"- Model version: `{report['model_version']}`", + f"- Generated at (UTC): `{report['generated_at']}`", + f"- Site: `{report['site']}`", + f"- Target: `{report['target_definition']}`", + f"- Feature set: `{report['feature_set']}`", + f"- Model family: `{report['model_family']}`", + f"- Model params: `{report['model_params']}`", + f"- Calibration method: `{report['calibration_method']}`", + f"- Operating threshold: `{report['threshold_selection']['threshold']:.3f}`", + "", + "## Data Window", + "", + f"- Requested start: `{report['data_window']['requested_start']}`", + f"- Requested end: `{report['data_window']['requested_end']}`", + f"- Actual start: `{report['data_window']['actual_start']}`", + f"- Actual end: `{report['data_window']['actual_end']}`", + f"- Rows: train `{report['split']['train_rows']}`, val `{report['split']['val_rows']}`, test `{report['split']['test_rows']}`", + "", + "## Features", + "", + ] + for col in report["feature_columns"]: + lines.append(f"- `{col}`") + + lines.extend( + [ + "", + "## Performance", + "", + "- Validation:", + f" precision `{report['validation_metrics']['precision']:.3f}`, " + f"recall `{report['validation_metrics']['recall']:.3f}`, " + f"PR-AUC `{report['validation_metrics']['pr_auc']}`, " + f"ROC-AUC `{report['validation_metrics']['roc_auc']}`, " + f"Brier `{report['validation_metrics']['brier']:.4f}`", + "- Test:", + f" precision `{report['test_metrics']['precision']:.3f}`, " + f"recall `{report['test_metrics']['recall']:.3f}`, " + f"PR-AUC `{report['test_metrics']['pr_auc']}`, " + f"ROC-AUC `{report['test_metrics']['roc_auc']}`, " + f"Brier `{report['test_metrics']['brier']:.4f}`", + "", + "## Known Limitations", + "", + "- Sensor rain counter resets are clipped at zero increment; extreme spikes are flagged but not fully removed.", + "- Forecast feature availability can vary by upstream model freshness.", + "- Performance may drift seasonally and should be monitored with the drift queries in docs.", + "", + ] + ) + + card_dir = os.path.dirname(path) + if card_dir: + os.makedirs(card_dir, exist_ok=True) + with open(path, "w", encoding="utf-8") as f: + f.write("\n".join(lines)) + + def train_candidate( model_family: str, x_train, @@ -132,30 +709,51 @@ def train_candidate( random_state: int, min_precision: float, fixed_threshold: float | None, + tune_hyperparameters: bool, + max_hyperparam_trials: int, + calibration_methods: list[str], ) -> dict[str, Any]: - model = make_model(model_family=model_family, random_state=random_state) - model.fit(x_train, y_train) - y_val_prob = model.predict_proba(x_val)[:, 1] + best_params, tuning_trials = hyperparameter_search( + model_family=model_family, + x_train=x_train, + y_train=y_train, + x_val=x_val, + y_val=y_val, + random_state=random_state, + min_precision=min_precision, + fixed_threshold=fixed_threshold, + enabled=tune_hyperparameters, + max_trials=max_hyperparam_trials, + ) - if fixed_threshold is not None: - threshold = fixed_threshold - threshold_info = { - "selection_rule": "fixed_cli_threshold", - "threshold": float(fixed_threshold), - } - else: - threshold, threshold_info = select_threshold( - y_true=y_val, - y_prob=y_val_prob, - min_precision=min_precision, - ) + selected_calibration, calibration_trials = evaluate_calibration_methods( + model_family=model_family, + model_params=best_params, + calibration_methods=calibration_methods, + x_train=x_train, + y_train=y_train, + x_val=x_val, + y_val=y_val, + random_state=random_state, + min_precision=min_precision, + fixed_threshold=fixed_threshold, + ) - val_metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=threshold) return { "model_family": model_family, - "threshold": float(threshold), - "threshold_info": threshold_info, - "validation_metrics": val_metrics, + "model_params": best_params, + "hyperparameter_tuning": { + "enabled": tune_hyperparameters, + "trial_count": len(tuning_trials), + "trials": tuning_trials, + }, + "calibration_comparison": calibration_trials, + "calibration_method": selected_calibration["calibration_method"], + "calibration_fit": selected_calibration["fit"], + "threshold": float(selected_calibration["threshold"]), + "threshold_info": selected_calibration["threshold_info"], + "validation_metrics": selected_calibration["validation_metrics"], + "calibration_quality": selected_calibration["calibration_quality"], } @@ -168,6 +766,7 @@ def main() -> int: end = parse_time(args.end) if args.end else "" feature_cols = feature_columns_for_set(args.feature_set) needs_forecast = feature_columns_need_forecast(feature_cols) + calibration_methods = parse_calibration_methods(args.calibration_methods) with psycopg2.connect(args.db_url) as conn: ws90 = fetch_ws90(conn, args.site, start, end) @@ -176,10 +775,27 @@ def main() -> int: if needs_forecast: forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model) + if ws90.empty: + message = "no ws90 observations found in requested window" + if args.allow_empty: + print(f"Rain model training skipped: {message}.") + return 0 + raise RuntimeError(message) + if baro.empty: + message = "no barometer observations found in requested window" + if args.allow_empty: + print(f"Rain model training skipped: {message}.") + return 0 + raise RuntimeError(message) + full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) model_df = model_frame(full_df, feature_cols, require_target=True) if len(model_df) < args.min_rows: - raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})") + message = f"not enough model-ready rows after filtering (need >= {args.min_rows})" + if args.allow_empty: + print(f"Rain model training skipped: {message}.") + return 0 + raise RuntimeError(message) train_df, val_df, test_df = split_time_ordered( model_df, @@ -194,6 +810,11 @@ def main() -> int: x_test = test_df[feature_cols] y_test = test_df["rain_next_1h"].astype(int).to_numpy() + if len(np.unique(y_train)) < 2: + raise RuntimeError("training split does not contain both classes; cannot train classifier") + if len(np.unique(y_val)) < 2: + print("warning: validation split has one class; AUC metrics may be unavailable", flush=True) + candidate_families = ["logreg", "hist_gb"] if args.model_family == "auto" else [args.model_family] candidates = [ train_candidate( @@ -205,18 +826,19 @@ def main() -> int: random_state=args.random_state, min_precision=args.min_precision, fixed_threshold=args.threshold, + tune_hyperparameters=args.tune_hyperparameters, + max_hyperparam_trials=args.max_hyperparam_trials, + calibration_methods=calibration_methods, ) for family in candidate_families ] best_candidate = max( candidates, - key=lambda c: ( - safe_pr_auc(c["validation_metrics"]), - safe_roc_auc(c["validation_metrics"]), - float(c["validation_metrics"]["f1"]), - ), + key=lambda c: metric_key(c["validation_metrics"]), ) selected_model_family = str(best_candidate["model_family"]) + selected_model_params = best_candidate["model_params"] + selected_calibration_method = str(best_candidate["calibration_method"]) chosen_threshold = float(best_candidate["threshold"]) threshold_info = best_candidate["threshold_info"] val_metrics = best_candidate["validation_metrics"] @@ -225,10 +847,32 @@ def main() -> int: x_train_val = train_val_df[feature_cols] y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy() - final_model = make_model(model_family=selected_model_family, random_state=args.random_state) - final_model.fit(x_train_val, y_train_val) + final_model, final_fit_info = fit_with_optional_calibration( + model_family=selected_model_family, + model_params=selected_model_params, + random_state=args.random_state, + x_train=x_train_val, + y_train=y_train_val, + calibration_method=selected_calibration_method, + fallback_to_none=True, + ) y_test_prob = final_model.predict_proba(x_test)[:, 1] test_metrics = evaluate_probs(y_true=y_test, y_prob=y_test_prob, threshold=chosen_threshold) + test_calibration = { + "ece_10": expected_calibration_error(y_true=y_test, y_prob=y_test_prob, bins=10), + } + naive_baselines_test = evaluate_naive_baselines(test_df=test_df, y_test=y_test) + walk_forward = walk_forward_backtest( + model_df=model_df, + feature_cols=feature_cols, + model_family=selected_model_family, + model_params=selected_model_params, + calibration_method=selected_calibration_method, + random_state=args.random_state, + min_precision=args.min_precision, + fixed_threshold=args.threshold, + folds=args.walk_forward_folds, + ) report = { "generated_at": datetime.now(timezone.utc).isoformat(), @@ -236,10 +880,14 @@ def main() -> int: "model_version": args.model_version, "model_family_requested": args.model_family, "model_family": selected_model_family, + "model_params": selected_model_params, "feature_set": args.feature_set, "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", "feature_columns": feature_cols, "forecast_model": args.forecast_model if needs_forecast else None, + "calibration_method_requested": calibration_methods, + "calibration_method": selected_calibration_method, + "calibration_fit": final_fit_info, "data_window": { "requested_start": start or None, "requested_end": end or None, @@ -275,16 +923,25 @@ def main() -> int: "candidate_models": [ { "model_family": c["model_family"], + "model_params": c["model_params"], + "hyperparameter_tuning": c["hyperparameter_tuning"], + "calibration_method": c["calibration_method"], + "calibration_fit": c["calibration_fit"], + "calibration_comparison": c["calibration_comparison"], "threshold_selection": { **c["threshold_info"], "min_precision_constraint": args.min_precision, }, + "calibration_quality": c["calibration_quality"], "validation_metrics": c["validation_metrics"], } for c in candidates ], "validation_metrics": val_metrics, "test_metrics": test_metrics, + "test_calibration_quality": test_calibration, + "naive_baselines_test": naive_baselines_test, + "walk_forward_backtest": walk_forward, } report = to_builtin(report) @@ -292,8 +949,16 @@ def main() -> int: print(f" site: {args.site}") print(f" model_version: {args.model_version}") print(f" model_family: {selected_model_family} (requested={args.model_family})") + print(f" model_params: {selected_model_params}") + print(f" calibration_method: {report['calibration_method']}") print(f" feature_set: {args.feature_set} ({len(feature_cols)} features)") - print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}") + print( + " rows: " + f"total={report['data_window']['model_rows']} " + f"train={report['split']['train_rows']} " + f"val={report['split']['val_rows']} " + f"test={report['split']['test_rows']}" + ) print( " threshold: " f"{report['threshold_selection']['threshold']:.3f} " @@ -311,8 +976,32 @@ def main() -> int: f"precision={report['test_metrics']['precision']:.3f} " f"recall={report['test_metrics']['recall']:.3f} " f"roc_auc={report['test_metrics']['roc_auc'] if report['test_metrics']['roc_auc'] is not None else 'n/a'} " - f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'}" + f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'} " + f"brier={report['test_metrics']['brier']:.4f} " + f"ece10={report['test_calibration_quality']['ece_10']:.4f}" ) + if report["walk_forward_backtest"]["summary"] is not None: + summary = report["walk_forward_backtest"]["summary"] + if "error" in summary: + print(f" walk-forward: {summary['error']}") + else: + print( + " walk-forward: " + f"folds={summary['successful_folds']}/{summary['requested_folds']} " + f"mean_precision={summary['mean_precision']:.3f} " + f"mean_recall={summary['mean_recall']:.3f} " + f"mean_pr_auc={summary['mean_pr_auc'] if summary['mean_pr_auc'] is not None else 'n/a'}" + ) + if report["naive_baselines_test"]: + print(" naive baselines (test):") + for name, baseline in report["naive_baselines_test"].items(): + m = baseline["metrics"] + print( + f" {name}: " + f"precision={m['precision']:.3f} recall={m['recall']:.3f} " + f"pr_auc={m['pr_auc'] if m['pr_auc'] is not None else 'n/a'} " + f"brier={m['brier']:.4f}" + ) if args.report_out: report_dir = os.path.dirname(args.report_out) @@ -322,6 +1011,11 @@ def main() -> int: json.dump(report, f, indent=2) print(f"Saved report to {args.report_out}") + if args.model_card_out: + model_card_out = args.model_card_out.format(model_version=args.model_version) + write_model_card(model_card_out, report) + print(f"Saved model card to {model_card_out}") + if args.dataset_out: dataset_out = args.dataset_out.format(model_version=args.model_version, feature_set=args.feature_set) dataset_dir = os.path.dirname(dataset_out) @@ -341,6 +1035,9 @@ def main() -> int: artifact = { "model": final_model, "model_family": selected_model_family, + "model_params": selected_model_params, + "calibration_method": selected_calibration_method, + "calibration_fit": final_fit_info, "features": feature_cols, "feature_set": args.feature_set, "forecast_model": args.forecast_model if needs_forecast else None, diff --git a/todo.md b/todo.md index 84266bb..84d4677 100644 --- a/todo.md +++ b/todo.md @@ -12,7 +12,7 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat - [x] [P0] Audit `observations_ws90` and `observations_baro` for missingness, gaps, duplicates, and out-of-order rows. (completed on runtime machine) - [x] [P0] Validate rain label construction from `rain_mm` (counter resets, negative deltas, spikes). (completed on runtime machine) - [x] [P0] Measure class balance by week (rain-positive vs rain-negative). (completed on runtime machine) -- [ ] [P1] Document known data issues and mitigation rules. +- [x] [P1] Document known data issues and mitigation rules. (see `docs/rain_data_issues.md`) ## 3) Dataset and Feature Engineering - [x] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow. @@ -26,16 +26,16 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat - [x] [P0] Keep logistic regression as baseline. - [x] [P1] Add at least one tree-based baseline (e.g. gradient boosting). (implemented via `hist_gb`; runtime evaluation pending local Python deps) - [x] [P0] Use strict time-based train/validation/test splits (no random shuffling). -- [ ] [P1] Add walk-forward backtesting across multiple temporal folds. -- [ ] [P1] Tune hyperparameters on validation data only. -- [ ] [P1] Calibrate probabilities (Platt or isotonic) and compare calibration quality. +- [x] [P1] Add walk-forward backtesting across multiple temporal folds. (`train_rain_model.py --walk-forward-folds`) +- [x] [P1] Tune hyperparameters on validation data only. (`train_rain_model.py --tune-hyperparameters`) +- [x] [P1] Calibrate probabilities (Platt or isotonic) and compare calibration quality. (`--calibration-methods`) - [x] [P0] Choose and lock the operating threshold based on use-case costs. ## 5) Evaluation and Reporting - [x] [P0] Report ROC-AUC, PR-AUC, confusion matrix, precision, recall, and Brier score. -- [ ] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules). +- [x] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules). - [ ] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.). -- [ ] [P1] Produce a short model card (data window, features, metrics, known limitations). +- [x] [P1] Produce a short model card (data window, features, metrics, known limitations). (`--model-card-out`) ## 6) Packaging and Deployment - [x] [P1] Version model artifacts and feature schema together. @@ -45,10 +45,10 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat - [ ] [P2] Add scheduled retraining with rollback to last-known-good model. ## 7) Monitoring and Operations -- [ ] [P1] Track feature drift and prediction drift over time. -- [ ] [P1] Track calibration drift and realized performance after deployment. -- [ ] [P1] Add alerts for training/inference/data pipeline failures. -- [ ] [P1] Document runbook for train/evaluate/deploy/rollback. +- [x] [P1] Track feature drift and prediction drift over time. (view: `rain_feature_drift_daily`, `rain_prediction_drift_daily`) +- [x] [P1] Track calibration drift and realized performance after deployment. (view: `rain_calibration_drift_daily`) +- [x] [P1] Add alerts for training/inference/data pipeline failures. (`scripts/check_rain_pipeline_health.py`) +- [x] [P1] Document runbook for train/evaluate/deploy/rollback. (see `docs/rain_model_runbook.md`) ## 8) Immediate Next Steps (This Week) - [x] [P0] Run first full data audit and label-quality checks. (completed on runtime machine)