2026-03-12 19:55:51 +11:00
parent 76851f0816
commit d1237eed44
12 changed files with 1444 additions and 82 deletions


@@ -94,6 +94,17 @@ docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-ent
Or copy just the `observations_baro` section into a manual `psql -c`.
For rain-model monitoring views, also apply `db/init/002_rain_monitoring_views.sql` on existing DBs:
```sh
docker compose exec -T timescaledb psql -U postgres -d micrometeo -f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql
```
Runbook/docs:
- `docs/rain_prediction.md`
- `docs/rain_data_issues.md`
- `docs/rain_model_runbook.md`
## Publish a test WS90 payload
```sh
mosquitto_pub -h localhost -t ecowitt/ws90 -m '{"model":"Fineoffset-WS90","id":70618,"battery_ok":1,"battery_mV":3180,"temperature_C":24.2,"humidity":60,"wind_dir_deg":129,"wind_avg_m_s":0,"wind_max_m_s":0,"uvi":0,"light_lux":0,"flags":130,"rain_mm":0,"rain_start":0,"supercap_V":0.5,"firmware":160,"data":"3fff000000------0000ff7ff70000","mic":"CRC","protocol":"Fine Offset Electronics WS90 weather station","rssi":-44,"duration":32996}'
```


@@ -0,0 +1,156 @@
-- Rain-model monitoring views (feature/prediction/calibration drift + pipeline freshness).
-- Safe to re-run.
CREATE OR REPLACE VIEW rain_feature_drift_daily AS
WITH ws90_daily AS (
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
count(*) AS ws90_rows,
avg(temperature_c) AS temp_mean,
avg(humidity) AS humidity_mean,
avg(wind_avg_m_s) AS wind_avg_mean,
avg(wind_max_m_s) AS wind_gust_mean
FROM observations_ws90
GROUP BY 1,2
),
baro_daily AS (
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
count(*) AS baro_rows,
avg(pressure_hpa) AS pressure_mean
FROM observations_baro
GROUP BY 1,2
),
joined AS (
SELECT
w.day,
w.site,
w.ws90_rows,
b.baro_rows,
w.temp_mean,
w.humidity_mean,
w.wind_avg_mean,
w.wind_gust_mean,
b.pressure_mean
FROM ws90_daily w
LEFT JOIN baro_daily b
ON b.day = w.day
AND b.site = w.site
),
baseline AS (
SELECT
j1.day,
j1.site,
j1.ws90_rows,
j1.baro_rows,
j1.temp_mean,
j1.humidity_mean,
j1.wind_avg_mean,
j1.wind_gust_mean,
j1.pressure_mean,
avg(j2.temp_mean) AS temp_mean_30d,
stddev_samp(j2.temp_mean) AS temp_std_30d,
avg(j2.humidity_mean) AS humidity_mean_30d,
stddev_samp(j2.humidity_mean) AS humidity_std_30d,
avg(j2.wind_avg_mean) AS wind_avg_mean_30d,
stddev_samp(j2.wind_avg_mean) AS wind_avg_std_30d,
avg(j2.wind_gust_mean) AS wind_gust_mean_30d,
stddev_samp(j2.wind_gust_mean) AS wind_gust_std_30d,
avg(j2.pressure_mean) AS pressure_mean_30d,
stddev_samp(j2.pressure_mean) AS pressure_std_30d
FROM joined j1
LEFT JOIN joined j2
ON j2.site = j1.site
AND j2.day < j1.day
AND j2.day >= j1.day - INTERVAL '30 days'
GROUP BY
j1.day,
j1.site,
j1.ws90_rows,
j1.baro_rows,
j1.temp_mean,
j1.humidity_mean,
j1.wind_avg_mean,
j1.wind_gust_mean,
j1.pressure_mean
)
SELECT
day,
site,
ws90_rows,
baro_rows,
temp_mean,
humidity_mean,
wind_avg_mean,
wind_gust_mean,
pressure_mean,
temp_mean_30d,
humidity_mean_30d,
wind_avg_mean_30d,
wind_gust_mean_30d,
pressure_mean_30d,
CASE WHEN temp_std_30d > 0 THEN (temp_mean - temp_mean_30d) / temp_std_30d END AS temp_zscore_30d,
CASE WHEN humidity_std_30d > 0 THEN (humidity_mean - humidity_mean_30d) / humidity_std_30d END AS humidity_zscore_30d,
CASE WHEN wind_avg_std_30d > 0 THEN (wind_avg_mean - wind_avg_mean_30d) / wind_avg_std_30d END AS wind_avg_zscore_30d,
CASE WHEN wind_gust_std_30d > 0 THEN (wind_gust_mean - wind_gust_mean_30d) / wind_gust_std_30d END AS wind_gust_zscore_30d,
CASE WHEN pressure_std_30d > 0 THEN (pressure_mean - pressure_mean_30d) / pressure_std_30d END AS pressure_zscore_30d
FROM baseline;
CREATE OR REPLACE VIEW rain_prediction_drift_daily AS
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
model_name,
model_version,
count(*) AS prediction_rows,
avg(probability) AS probability_mean,
stddev_samp(probability) AS probability_stddev,
avg(CASE WHEN predict_rain THEN 1.0 ELSE 0.0 END) AS predicted_positive_rate,
avg(threshold) AS threshold_mean
FROM predictions_rain_1h
GROUP BY 1,2,3,4;
CREATE OR REPLACE VIEW rain_calibration_drift_daily AS
SELECT
time_bucket(INTERVAL '1 day', ts) AS day,
site,
model_name,
model_version,
count(*) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS evaluated_rows,
avg(probability) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS mean_probability,
avg(CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END) FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS observed_positive_rate,
avg(power(probability - CASE WHEN rain_next_1h_actual THEN 1.0 ELSE 0.0 END, 2.0))
FILTER (WHERE rain_next_1h_actual IS NOT NULL) AS brier_score,
avg(
CASE
WHEN rain_next_1h_actual IS NULL THEN NULL
WHEN predict_rain = rain_next_1h_actual THEN 1.0
ELSE 0.0
END
) AS decision_accuracy
FROM predictions_rain_1h
GROUP BY 1,2,3,4;
CREATE OR REPLACE VIEW rain_pipeline_health AS
WITH sites AS (
SELECT DISTINCT site FROM observations_ws90
UNION
SELECT DISTINCT site FROM observations_baro
UNION
SELECT DISTINCT site FROM forecast_openmeteo_hourly
UNION
SELECT DISTINCT site FROM predictions_rain_1h
)
SELECT
s.site,
(SELECT max(ts) FROM observations_ws90 w WHERE w.site = s.site) AS ws90_latest_ts,
(SELECT max(ts) FROM observations_baro b WHERE b.site = s.site) AS baro_latest_ts,
(SELECT max(ts) FROM forecast_openmeteo_hourly f WHERE f.site = s.site) AS forecast_latest_ts,
(SELECT max(generated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_generated_latest_ts,
(SELECT max(evaluated_at) FROM predictions_rain_1h p WHERE p.site = s.site) AS prediction_evaluated_latest_ts
FROM sites s;


@@ -43,10 +43,16 @@ services:
RAIN_TRAIN_INTERVAL_HOURS: "24"
RAIN_PREDICT_INTERVAL_MINUTES: "10"
RAIN_MIN_PRECISION: "0.70"
RAIN_TUNE_HYPERPARAMETERS: "true"
RAIN_MAX_HYPERPARAM_TRIALS: "12"
RAIN_CALIBRATION_METHODS: "none,sigmoid,isotonic"
RAIN_WALK_FORWARD_FOLDS: "0"
RAIN_ALLOW_EMPTY_DATA: "true"
RAIN_MODEL_PATH: "/app/models/rain_model.pkl"
RAIN_REPORT_PATH: "/app/models/rain_model_report.json"
RAIN_AUDIT_PATH: "/app/models/rain_data_audit.json"
RAIN_DATASET_PATH: "/app/models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
RAIN_MODEL_CARD_PATH: "/app/models/model_card_{model_version}.md"
volumes:
- ./models:/app/models

docs/rain_data_issues.md Normal file

@@ -0,0 +1,35 @@
# Rain Model Data Issues and Mitigations
This document captures known data-quality issues observed in the rain-model pipeline and the mitigation rules used in code.
## Issue Register
| Area | Known issue | Mitigation in code/workflow |
|---|---|---|
| WS90 rain counter (`rain_mm`) | Counter resets can produce negative deltas. | `rain_inc_raw = diff(rain_mm)` then `rain_inc = clip(lower=0)`; reset events tracked as `rain_reset`. |
| WS90 rain spikes | Isolated large 5-minute jumps may be sensor/transmission anomalies. | Spikes are flagged as `rain_spike_5m` when a 5-minute increment is >= `5.0 mm`; counts are tracked in the audit/training report. |
| Sensor gaps | Missing 5-minute buckets from WS90/barometer ingestion. | Resample to 5-minute grid; barometer interpolated with short limit (`limit=3`); gap lengths tracked by audit. |
| Out-of-order arrivals | Late MQTT events can arrive with older `ts`. | Audit reports out-of-order count by sorting on `received_at` and checking `ts` monotonicity. |
| Duplicate rows | Replays/reconnects can duplicate sensor rows. | Audit reports duplicate counts by `(ts, station_id)` for WS90 and `(ts, source)` for barometer. |
| Forecast sparsity/jitter | Hourly forecast retrieval cadence does not always align with 5-minute features. | Select latest forecast per `ts` (`DISTINCT ON` + `retrieved_at DESC`), resample to 5 minutes, short forward/backfill windows, and clip `fc_precip_prob` to `[0,1]`. |
| Local vs UTC day boundary | Daily rainfall resets can look wrong when the local timezone is not respected. | Station timezone is configured via `site.timezone` and used by the Wunderground uploader; model training/inference stays UTC-based for split consistency. |
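The reset and spike rules in the table translate directly into a few pandas operations. A minimal sketch, assuming a cumulative `rain_mm` series already resampled to the 5-minute grid (function and column names here are illustrative, not the exact pipeline code):
```python
import pandas as pd

RAIN_SPIKE_5M_MM = 5.0  # spike threshold from the table above

def rain_increments(rain_mm: pd.Series) -> pd.DataFrame:
    """Derive per-bucket rain increments from the cumulative WS90 counter."""
    out = pd.DataFrame({"rain_mm": rain_mm})
    out["rain_inc_raw"] = out["rain_mm"].diff()
    # Counter resets show up as negative deltas; flag them, then clip to zero.
    out["rain_reset"] = out["rain_inc_raw"] < 0
    out["rain_inc"] = out["rain_inc_raw"].clip(lower=0.0)
    # Isolated large jumps are flagged for the audit, not silently removed.
    out["rain_spike_5m"] = out["rain_inc"] >= RAIN_SPIKE_5M_MM
    return out
```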
## Audit Command
Run this regularly and retain JSON reports for comparison:
```sh
python scripts/audit_rain_data.py \
--site home \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--feature-set "extended" \
--forecast-model "ecmwf" \
--out "models/rain_data_audit.json"
```
## Operational Rules
- Treat large jumps in `rain_reset_count` or `rain_spike_5m_count` as data-quality incidents.
- If `gaps_5m.ws90_max_gap_minutes` or `gaps_5m.baro_max_gap_minutes` exceeds one hour, avoid model refresh until ingestion stabilizes (a checker sketch follows this list).
- If forecast rows are unexpectedly low for an `extended` feature run, either fix forecast ingestion first or temporarily fall back to the `baseline` feature set.
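The gap rule can be checked mechanically against the audit JSON. A minimal sketch, assuming the report exposes the `gaps_5m` keys referenced above (the exact report layout may differ):
```python
import json

MAX_GAP_MINUTES = 60  # the "exceeds one hour" rule from above

with open("models/rain_data_audit.json", encoding="utf-8") as f:
    audit = json.load(f)

gaps = audit.get("gaps_5m", {})
for key in ("ws90_max_gap_minutes", "baro_max_gap_minutes"):
    value = gaps.get(key)
    if value is not None and value > MAX_GAP_MINUTES:
        print(f"hold model refresh: {key}={value} exceeds {MAX_GAP_MINUTES} minutes")
```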

docs/rain_model_runbook.md Normal file

@@ -0,0 +1,141 @@
# Rain Model Runbook
Operational guide for training, evaluating, deploying, monitoring, and rolling back the rain model.
## 1) One-time Setup
Apply monitoring views:
```sh
docker compose exec -T timescaledb \
psql -U postgres -d micrometeo \
-f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql
```
## 2) Train + Evaluate
Recommended evaluation run (includes validation-only tuning, calibration comparison, naive baselines, and walk-forward folds):
```sh
python scripts/train_rain_model.py \
--site "home" \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--feature-set "extended" \
--model-family "auto" \
--forecast-model "ecmwf" \
--tune-hyperparameters \
--max-hyperparam-trials 12 \
--calibration-methods "none,sigmoid,isotonic" \
--walk-forward-folds 4 \
--model-version "rain-auto-v1-extended" \
--out "models/rain_model.pkl" \
--report-out "models/rain_model_report.json" \
--model-card-out "models/model_card_{model_version}.md" \
--dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
```
Review these keys in the report (a loader sketch follows this list):
- `candidate_models[*].hyperparameter_tuning`
- `candidate_models[*].calibration_comparison`
- `naive_baselines_test`
- `walk_forward_backtest`
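A short loader can pull exactly these keys for review; a sketch, assuming the report path used in the command above:
```python
import json

with open("models/rain_model_report.json", encoding="utf-8") as f:
    report = json.load(f)

for candidate in report["candidate_models"]:
    tuning = candidate["hyperparameter_tuning"]
    methods = [c["calibration_method"] for c in candidate["calibration_comparison"]]
    print(f"{candidate['model_family']}: tuning_trials={tuning['trial_count']} calibration={methods}")
print("naive baselines:", list(report["naive_baselines_test"].keys()))
print("walk-forward summary:", report["walk_forward_backtest"]["summary"])
```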
## 3) Deploy
1. Promote the selected artifact path to the inference worker (`RAIN_MODEL_PATH` or CLI `--model-path`).
2. Run one dry-run inference:
```sh
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
--model-name "rain_next_1h" \
--dry-run
```
3. Run live inference:
```sh
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
--model-name "rain_next_1h"
```
## 4) Rollback
1. Identify the last known-good model artifact in `models/`.
2. Point deployment to that artifact (worker env `RAIN_MODEL_PATH` or manual inference path).
3. Re-run the inference command and verify writes in `predictions_rain_1h` (see the sketch after this list).
4. Keep the failed artifact/report for postmortem.
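The write verification in step 3 can be a single query for the latest prediction row; a minimal sketch, assuming `DATABASE_URL` is set as in the README:
```python
import os
import psycopg2

with psycopg2.connect(os.environ["DATABASE_URL"]) as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT ts, generated_at, probability, predict_rain
            FROM predictions_rain_1h
            WHERE site = %s AND model_name = %s
            ORDER BY generated_at DESC
            LIMIT 1
            """,
            ("home", "rain_next_1h"),
        )
        print(cur.fetchone())  # None means the rollback artifact is not writing
```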
## 5) Monitoring
### Feature drift
```sql
SELECT *
FROM rain_feature_drift_daily
WHERE site = 'home'
ORDER BY day DESC
LIMIT 30;
```
Alert heuristic: any absolute z-score > 3 for 2+ consecutive days.
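A minimal sketch of this heuristic as a scheduled check, assuming `DATABASE_URL` is set and using the column names from `002_rain_monitoring_views.sql`:
```python
import os
import psycopg2

Z_LIMIT = 3.0  # |z-score| threshold from the heuristic above

with psycopg2.connect(os.environ["DATABASE_URL"]) as conn:
    with conn.cursor() as cur:
        cur.execute(
            """
            SELECT day, temp_zscore_30d, humidity_zscore_30d, pressure_zscore_30d
            FROM rain_feature_drift_daily
            WHERE site = %s
            ORDER BY day DESC
            LIMIT 2
            """,
            ("home",),
        )
        rows = cur.fetchall()

def day_breaches(row) -> bool:
    # Any feature z-score beyond the limit counts as a breach for that day.
    return any(z is not None and abs(z) > Z_LIMIT for z in row[1:])

# Alert only when both of the two most recent days breach the limit.
if len(rows) == 2 and all(day_breaches(r) for r in rows):
    print("feature drift alert: |z| > 3 on 2 consecutive days")
```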
### Prediction drift
```sql
SELECT *
FROM rain_prediction_drift_daily
WHERE site = 'home'
ORDER BY day DESC
LIMIT 30;
```
Alert heuristic: `predicted_positive_rate` shifts by more than 2x relative to the trailing 14-day median.
### Calibration/performance drift
```sql
SELECT *
FROM rain_calibration_drift_daily
WHERE site = 'home'
ORDER BY day DESC
LIMIT 30;
```
Alert heuristic: a sustained Brier-score increase of more than 25% over the trailing 30-day average.
## 6) Pipeline Failure Alerts
Run the health-check script from cron, a systemd timer, or your alerting scheduler:
```sh
python scripts/check_rain_pipeline_health.py \
--site home \
--model-name rain_next_1h \
--max-ws90-age 20m \
--max-baro-age 30m \
--max-forecast-age 3h \
--max-prediction-age 30m \
--max-pending-eval-age 3h \
--max-pending-eval-rows 200
```
The script exits non-zero on failure, so it can directly drive alerting.
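Because the result is carried in the exit code, any scheduler can wrap the script. A minimal sketch that forwards failures to a webhook (the webhook URL and payload shape are placeholders, not part of this repo):
```python
import json
import subprocess
import sys
import urllib.request

# Hypothetical alert endpoint; replace with your alerting system's webhook.
WEBHOOK_URL = "https://alerts.example.invalid/hooks/rain-pipeline"

result = subprocess.run(
    [
        sys.executable,
        "scripts/check_rain_pipeline_health.py",
        "--site", "home",
        "--model-name", "rain_next_1h",
    ],
    capture_output=True,
    text=True,
)
if result.returncode != 0:
    # Non-zero exit means at least one check failed; forward the stdout summary.
    body = json.dumps({"text": f"rain pipeline unhealthy:\n{result.stdout}"}).encode("utf-8")
    req = urllib.request.Request(WEBHOOK_URL, data=body, headers={"Content-Type": "application/json"})
    urllib.request.urlopen(req)
```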
## 7) Continuous Worker Defaults
`docker-compose.yml` provides these controls for `rainml`:
- `RAIN_TUNE_HYPERPARAMETERS`
- `RAIN_MAX_HYPERPARAM_TRIALS`
- `RAIN_CALIBRATION_METHODS`
- `RAIN_WALK_FORWARD_FOLDS`
- `RAIN_ALLOW_EMPTY_DATA`
- `RAIN_MODEL_CARD_PATH`
Recommended production defaults:
- Enable tuning daily or weekly (`RAIN_TUNE_HYPERPARAMETERS=true`)
- Keep walk-forward folds at `0` in continuous mode; run fold backtests in scheduled evaluation jobs instead


@@ -37,10 +37,12 @@ pip install -r scripts/requirements.txt
## Scripts
- `scripts/audit_rain_data.py`: data quality + label quality + class balance audit.
- `scripts/train_rain_model.py`: strict time-based split training and metrics report, with optional
validation-only hyperparameter tuning, calibration comparison, naive baseline comparison, and walk-forward folds.
- `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into
`predictions_rain_1h`.
- `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction.
- `scripts/check_rain_pipeline_health.py`: freshness/failure check for alerting.
Feature-set options:
- `baseline`: original 5 local observation features.
@@ -55,7 +57,7 @@ Model-family options (`train_rain_model.py`):
## Usage
### 1) Apply schema update (existing DBs)
`001_schema.sql` includes `predictions_rain_1h`.
```sh
docker compose exec -T timescaledb \
@@ -63,6 +65,14 @@ docker compose exec -T timescaledb \
-f /docker-entrypoint-initdb.d/001_schema.sql
```
Apply monitoring views:
```sh
docker compose exec -T timescaledb \
psql -U postgres -d micrometeo \
-f /docker-entrypoint-initdb.d/002_rain_monitoring_views.sql
```
### 2) Run data audit
```sh
export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable"
@@ -136,6 +146,25 @@ python scripts/train_rain_model.py \
--report-out "models/rain_model_report_auto.json"
```
### 3e) Full P1 evaluation (tuning + calibration + walk-forward)
```sh
python scripts/train_rain_model.py \
--site "home" \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--feature-set "extended" \
--model-family "auto" \
--forecast-model "ecmwf" \
--tune-hyperparameters \
--max-hyperparam-trials 12 \
--calibration-methods "none,sigmoid,isotonic" \
--walk-forward-folds 4 \
--model-version "rain-auto-v1-extended-eval" \
--out "models/rain_model_auto.pkl" \
--report-out "models/rain_model_report_auto.json" \
--model-card-out "models/model_card_{model_version}.md"
```
### 4) Run inference and store prediction
```sh
python scripts/predict_rain_model.py \
@@ -154,6 +183,10 @@ bash scripts/run_p0_rain_workflow.sh
The `rainml` service in `docker-compose.yml` now runs:
- periodic retraining (default every 24 hours)
- periodic prediction writes (default every 10 minutes)
- configurable tuning/calibration behavior (`RAIN_TUNE_HYPERPARAMETERS`,
`RAIN_MAX_HYPERPARAM_TRIALS`, `RAIN_CALIBRATION_METHODS`)
- graceful gap handling for temporary source outages (`RAIN_ALLOW_EMPTY_DATA=true`)
- optional model-card output (`RAIN_MODEL_CARD_PATH`)
Artifacts are persisted to `./models` on the host.
@@ -165,6 +198,7 @@ docker compose logs -f rainml
## Output
- Audit report: `models/rain_data_audit.json`
- Training report: `models/rain_model_report.json`
- Model card: `models/model_card_<model_version>.md`
- Model artifact: `models/rain_model.pkl`
- Dataset snapshot: `models/datasets/rain_dataset_<model_version>_<feature_set>.csv`
- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized
@@ -192,3 +226,5 @@ docker compose logs -f rainml
- Data is resampled into 5-minute buckets.
- Label is derived from the incremental rain computed from the WS90 cumulative `rain_mm` counter.
- Timestamps are handled as UTC throughout the training/inference workflow.
- See [Data issues and mitigation rules](./rain_data_issues.md) and
[runbook/monitoring guidance](./rain_model_runbook.md).


@@ -0,0 +1,214 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any
import psycopg2
def parse_duration(value: str) -> timedelta:
raw = value.strip().lower()
if not raw:
raise ValueError("duration is empty")
units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
suffix = raw[-1]
if suffix not in units:
raise ValueError(f"invalid duration suffix: {value} (expected one of s,m,h,d)")
amount = float(raw[:-1])
return timedelta(seconds=amount * units[suffix])
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name.")
parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.")
parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.")
parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.")
parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.")
parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.")
parser.add_argument(
"--max-pending-eval-age",
default="3h",
help="Pending evaluations older than this count toward alert.",
)
parser.add_argument(
"--max-pending-eval-rows",
type=int,
default=200,
help="Alert if pending evaluation rows older than max-pending-eval-age exceed this count.",
)
parser.add_argument(
"--json-out",
help="Optional path to save JSON health report.",
)
return parser.parse_args()
def fetch_latest_ts(conn, sql: str, params: tuple[Any, ...]):
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
return row[0] if row else None
def fetch_count(conn, sql: str, params: tuple[Any, ...]) -> int:
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
if not row:
return 0
return int(row[0] or 0)
def age_seconds(now: datetime, ts: datetime | None) -> float | None:
if ts is None:
return None
return float((now - ts.astimezone(timezone.utc)).total_seconds())
def status_for_age(age_s: float | None, max_age: timedelta) -> tuple[str, str]:
if age_s is None:
return "error", "missing"
if age_s <= max_age.total_seconds():
return "ok", "fresh"
return "error", "stale"
def main() -> int:
args = parse_args()
if not args.db_url:
raise SystemExit("missing --db-url or DATABASE_URL")
now = datetime.now(timezone.utc)
max_ws90_age = parse_duration(args.max_ws90_age)
max_baro_age = parse_duration(args.max_baro_age)
max_forecast_age = parse_duration(args.max_forecast_age)
max_prediction_age = parse_duration(args.max_prediction_age)
max_pending_eval_age = parse_duration(args.max_pending_eval_age)
with psycopg2.connect(args.db_url) as conn:
ws90_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM observations_ws90 WHERE site = %s",
(args.site,),
)
baro_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM observations_baro WHERE site = %s",
(args.site,),
)
forecast_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM forecast_openmeteo_hourly WHERE site = %s",
(args.site,),
)
prediction_latest = fetch_latest_ts(
conn,
"""
SELECT max(generated_at)
FROM predictions_rain_1h
WHERE site = %s
AND model_name = %s
""",
(args.site, args.model_name),
)
pending_eval_rows = fetch_count(
conn,
"""
SELECT count(*)
FROM predictions_rain_1h
WHERE site = %s
AND model_name = %s
AND evaluated_at IS NULL
AND ts < (now() - %s::interval)
""",
(args.site, args.model_name, f"{int(max_pending_eval_age.total_seconds())} seconds"),
)
ws90_age_s = age_seconds(now, ws90_latest)
baro_age_s = age_seconds(now, baro_latest)
forecast_age_s = age_seconds(now, forecast_latest)
prediction_age_s = age_seconds(now, prediction_latest)
ws90_status, ws90_reason = status_for_age(ws90_age_s, max_ws90_age)
baro_status, baro_reason = status_for_age(baro_age_s, max_baro_age)
forecast_status, forecast_reason = status_for_age(forecast_age_s, max_forecast_age)
prediction_status, prediction_reason = status_for_age(prediction_age_s, max_prediction_age)
pending_status = "ok" if pending_eval_rows <= args.max_pending_eval_rows else "error"
checks = {
"ws90_freshness": {
"status": ws90_status,
"reason": ws90_reason,
"latest_ts": ws90_latest,
"age_seconds": ws90_age_s,
"max_age_seconds": max_ws90_age.total_seconds(),
},
"baro_freshness": {
"status": baro_status,
"reason": baro_reason,
"latest_ts": baro_latest,
"age_seconds": baro_age_s,
"max_age_seconds": max_baro_age.total_seconds(),
},
"forecast_freshness": {
"status": forecast_status,
"reason": forecast_reason,
"latest_ts": forecast_latest,
"age_seconds": forecast_age_s,
"max_age_seconds": max_forecast_age.total_seconds(),
},
"prediction_freshness": {
"status": prediction_status,
"reason": prediction_reason,
"latest_ts": prediction_latest,
"age_seconds": prediction_age_s,
"max_age_seconds": max_prediction_age.total_seconds(),
},
"pending_evaluations": {
"status": pending_status,
"rows": pending_eval_rows,
"max_rows": args.max_pending_eval_rows,
"pending_older_than_seconds": max_pending_eval_age.total_seconds(),
},
}
overall_status = "ok"
failing = [name for name, item in checks.items() if item["status"] != "ok"]
if failing:
overall_status = "error"
report = {
"generated_at": now.isoformat(),
"site": args.site,
"model_name": args.model_name,
"status": overall_status,
"failing_checks": failing,
"checks": checks,
}
print(f"rain pipeline health: {overall_status}")
for name, item in checks.items():
print(f" {name}: {item['status']}")
if failing:
print(f" failing: {', '.join(failing)}")
if args.json_out:
out_dir = os.path.dirname(args.json_out)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
with open(args.json_out, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, default=str)
print(f"saved health report to {args.json_out}")
return 0 if overall_status == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())


@@ -47,6 +47,19 @@ def parse_args() -> argparse.Namespace:
default="ecmwf",
help="Forecast model name when inference features require forecast columns.",
)
parser.set_defaults(allow_empty=True)
parser.add_argument(
"--allow-empty",
dest="allow_empty",
action="store_true",
help="Exit successfully when required source rows are temporarily unavailable (default: enabled).",
)
parser.add_argument(
"--strict-source-data",
dest="allow_empty",
action="store_false",
help="Fail when required source rows are unavailable.",
)
parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.")
return parser.parse_args()
@@ -96,11 +109,28 @@ def main() -> int:
if needs_forecast:
forecast = fetch_forecast(conn, args.site, fetch_start, fetch_end, model=forecast_model)
if ws90.empty:
message = "no ws90 observations found in source window"
if args.allow_empty:
print(f"Rain inference skipped: {message}.")
return 0
raise RuntimeError(message)
if baro.empty:
message = "no barometer observations found in source window"
if args.allow_empty:
print(f"Rain inference skipped: {message}.")
return 0
raise RuntimeError(message)
full_df = build_dataset(ws90, baro, forecast=forecast)
feature_df = model_frame(full_df, feature_cols=features, require_target=False)
candidates = feature_df.loc[feature_df.index <= at]
if candidates.empty:
raise RuntimeError("no feature-complete row available at or before requested timestamp")
message = "no feature-complete row available at or before requested timestamp"
if args.allow_empty:
print(f"Rain inference skipped: {message}.")
return 0
raise RuntimeError(message)
row = candidates.tail(1)
pred_ts = row.index[0].to_pydatetime()


@@ -36,6 +36,8 @@ python scripts/train_rain_model.py \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
--walk-forward-folds 0 \
--calibration-methods "none" \
--feature-set "$FEATURE_SET" \
--model-family "$MODEL_FAMILY" \
--forecast-model "$FORECAST_MODEL" \


@@ -50,7 +50,13 @@ class WorkerConfig:
train_ratio: float
val_ratio: float
min_precision: float
tune_hyperparameters: bool
max_hyperparam_trials: int
calibration_methods: str
walk_forward_folds: int
allow_empty_data: bool
dataset_path_template: str
model_card_path_template: str
model_path: Path
report_path: Path
audit_path: Path
@@ -86,12 +92,15 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
start, end = training_window(cfg.lookback_days)
model_version = f"{cfg.model_version_base}-{now_utc().strftime('%Y%m%d%H%M')}"
dataset_out = cfg.dataset_path_template.format(model_version=model_version, feature_set=cfg.feature_set)
model_card_out = cfg.model_card_path_template.format(model_version=model_version)
ensure_parent(cfg.audit_path)
ensure_parent(cfg.report_path)
ensure_parent(cfg.model_path)
if dataset_out:
ensure_parent(Path(dataset_out))
if model_card_out:
ensure_parent(Path(model_card_out))
run_cmd(
[
@@ -113,39 +122,51 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
env,
)
train_cmd = [
sys.executable,
"scripts/train_rain_model.py",
"--site",
cfg.site,
"--start",
start,
"--end",
end,
"--train-ratio",
str(cfg.train_ratio),
"--val-ratio",
str(cfg.val_ratio),
"--min-precision",
str(cfg.min_precision),
"--max-hyperparam-trials",
str(cfg.max_hyperparam_trials),
"--calibration-methods",
cfg.calibration_methods,
"--walk-forward-folds",
str(cfg.walk_forward_folds),
"--feature-set",
cfg.feature_set,
"--model-family",
cfg.model_family,
"--forecast-model",
cfg.forecast_model,
"--model-version",
model_version,
"--out",
str(cfg.model_path),
"--report-out",
str(cfg.report_path),
"--model-card-out",
model_card_out,
"--dataset-out",
dataset_out,
]
if cfg.tune_hyperparameters:
train_cmd.append("--tune-hyperparameters")
if cfg.allow_empty_data:
train_cmd.append("--allow-empty")
else:
train_cmd.append("--strict-source-data")
run_cmd(train_cmd, env)
def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None:
@@ -164,6 +185,7 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None:
cfg.model_name,
"--forecast-model",
cfg.forecast_model,
*(["--allow-empty"] if cfg.allow_empty_data else ["--strict-source-data"]),
],
env,
)
@@ -188,10 +210,19 @@ def load_config() -> WorkerConfig:
train_ratio=read_env_float("RAIN_TRAIN_RATIO", 0.7),
val_ratio=read_env_float("RAIN_VAL_RATIO", 0.15),
min_precision=read_env_float("RAIN_MIN_PRECISION", 0.70),
tune_hyperparameters=read_env_bool("RAIN_TUNE_HYPERPARAMETERS", False),
max_hyperparam_trials=read_env_int("RAIN_MAX_HYPERPARAM_TRIALS", 12),
calibration_methods=read_env("RAIN_CALIBRATION_METHODS", "none,sigmoid,isotonic"),
walk_forward_folds=read_env_int("RAIN_WALK_FORWARD_FOLDS", 0),
allow_empty_data=read_env_bool("RAIN_ALLOW_EMPTY_DATA", True),
dataset_path_template=read_env(
"RAIN_DATASET_PATH",
"models/datasets/rain_dataset_{model_version}_{feature_set}.csv",
),
model_card_path_template=read_env(
"RAIN_MODEL_CARD_PATH",
"models/model_card_{model_version}.md",
),
model_path=Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl")),
report_path=Path(read_env("RAIN_REPORT_PATH", "models/rain_model_report.json")),
audit_path=Path(read_env("RAIN_AUDIT_PATH", "models/rain_data_audit.json")),
@@ -220,7 +251,10 @@ def main() -> int:
f"feature_set={cfg.feature_set} "
f"forecast_model={cfg.forecast_model} "
f"train_interval_hours={cfg.train_interval_hours} "
f"predict_interval_minutes={cfg.predict_interval_minutes}",
f"predict_interval_minutes={cfg.predict_interval_minutes} "
f"tune_hyperparameters={cfg.tune_hyperparameters} "
f"walk_forward_folds={cfg.walk_forward_folds} "
f"allow_empty_data={cfg.allow_empty_data}",
flush=True,
)


@@ -1,5 +1,8 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import itertools
import json
import os
from datetime import datetime, timezone
@@ -7,8 +10,10 @@ from typing import Any
import numpy as np
import psycopg2
from sklearn.calibration import CalibratedClassifierCV
from sklearn.ensemble import HistGradientBoostingClassifier
from sklearn.linear_model import LogisticRegression
from sklearn.model_selection import TimeSeriesSplit
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
@@ -38,6 +43,7 @@ except ImportError: # pragma: no cover - optional dependency
MODEL_FAMILIES = ("logreg", "hist_gb", "auto")
CALIBRATION_METHODS = ("none", "sigmoid", "isotonic")
def parse_args() -> argparse.Namespace:
@@ -56,6 +62,19 @@ def parse_args() -> argparse.Namespace:
)
parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.")
parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.")
parser.set_defaults(allow_empty=True)
parser.add_argument(
"--allow-empty",
dest="allow_empty",
action="store_true",
help="Exit successfully when source/model-ready rows are temporarily unavailable (default: enabled).",
)
parser.add_argument(
"--strict-source-data",
dest="allow_empty",
action="store_false",
help="Fail when source/model-ready rows are unavailable.",
)
parser.add_argument(
"--feature-set",
default="baseline",
@@ -76,6 +95,28 @@ def parse_args() -> argparse.Namespace:
"'auto' compares logreg and hist_gb on validation and selects best by PR-AUC/ROC-AUC/F1."
),
)
parser.add_argument(
"--tune-hyperparameters",
action="store_true",
help="Run a lightweight validation-only hyperparameter search before final training.",
)
parser.add_argument(
"--max-hyperparam-trials",
type=int,
default=12,
help="Maximum hyperparameter trials per model family when tuning is enabled.",
)
parser.add_argument(
"--calibration-methods",
default="none,sigmoid,isotonic",
help="Comma-separated methods from: none,sigmoid,isotonic. Best method selected by validation Brier/ECE.",
)
parser.add_argument(
"--walk-forward-folds",
type=int,
default=4,
help="Number of temporal folds for walk-forward backtest (0 to disable).",
)
parser.add_argument(
"--random-state",
type=int,
@@ -88,6 +129,11 @@ def parse_args() -> argparse.Namespace:
default="models/rain_model_report.json",
help="Path to save JSON training report.",
)
parser.add_argument(
"--model-card-out",
default="models/model_card_{model_version}.md",
help="Path (or template) for markdown model card. Supports {model_version}. Use empty string to disable.",
)
parser.add_argument(
"--dataset-out",
default="models/datasets/rain_dataset_{model_version}_{feature_set}.csv",
@@ -104,25 +150,556 @@ def parse_args() -> argparse.Namespace:
return parser.parse_args()
def parse_calibration_methods(value: str) -> list[str]:
methods: list[str] = []
for token in (value or "").split(","):
method = token.strip().lower()
if not method:
continue
if method not in CALIBRATION_METHODS:
raise ValueError(f"unknown calibration method: {method}")
if method not in methods:
methods.append(method)
if not methods:
return ["none"]
return methods
def default_model_params(model_family: str) -> dict[str, Any]:
if model_family == "logreg":
return {"c": 1.0}
if model_family == "hist_gb":
return {
"max_iter": 300,
"learning_rate": 0.05,
"max_depth": 5,
"min_samples_leaf": 20,
}
raise ValueError(f"unknown model_family: {model_family}")
def model_param_grid(model_family: str) -> list[dict[str, Any]]:
if model_family == "logreg":
return [{"c": c} for c in [0.05, 0.1, 0.2, 0.5, 1.0, 2.0, 5.0]]
if model_family == "hist_gb":
grid: list[dict[str, Any]] = []
for learning_rate, max_depth, max_iter, min_samples_leaf in itertools.product(
[0.03, 0.05, 0.08],
[3, 5],
[200, 300],
[10, 20],
):
grid.append(
{
"learning_rate": learning_rate,
"max_depth": max_depth,
"max_iter": max_iter,
"min_samples_leaf": min_samples_leaf,
}
)
return grid
raise ValueError(f"unknown model_family: {model_family}")
def limit_trials(grid: list[dict[str, Any]], max_trials: int) -> list[dict[str, Any]]:
if max_trials <= 0 or len(grid) <= max_trials:
return grid
step = max(1, len(grid) // max_trials)
return grid[::step][:max_trials]
def make_model(model_family: str, random_state: int, params: dict[str, Any] | None = None):
model_params = default_model_params(model_family)
if params:
model_params.update(params)
if model_family == "logreg":
c = float(model_params["c"])
return Pipeline(
[
("scaler", StandardScaler()),
("clf", LogisticRegression(max_iter=1000, class_weight="balanced", random_state=random_state)),
(
"clf",
LogisticRegression(
C=c,
max_iter=1000,
class_weight="balanced",
random_state=random_state,
),
),
]
)
if model_family == "hist_gb":
return HistGradientBoostingClassifier(
max_iter=int(model_params["max_iter"]),
learning_rate=float(model_params["learning_rate"]),
max_depth=int(model_params["max_depth"]),
min_samples_leaf=int(model_params["min_samples_leaf"]),
random_state=random_state,
)
raise ValueError(f"unknown model_family: {model_family}")
def threshold_from_probs(
y_true: np.ndarray,
y_prob: np.ndarray,
min_precision: float,
fixed_threshold: float | None,
) -> tuple[float, dict[str, Any]]:
if fixed_threshold is not None:
return float(fixed_threshold), {
"selection_rule": "fixed_cli_threshold",
"threshold": float(fixed_threshold),
}
return select_threshold(y_true=y_true, y_prob=y_prob, min_precision=min_precision)
def metric_key(metrics: dict[str, Any]) -> tuple[float, float, float]:
return (
safe_pr_auc(metrics),
safe_roc_auc(metrics),
float(metrics["f1"]),
)
def expected_calibration_error(y_true: np.ndarray, y_prob: np.ndarray, bins: int = 10) -> float:
if len(y_true) == 0:
return float("nan")
edges = np.linspace(0.0, 1.0, bins + 1)
total = float(len(y_true))
ece = 0.0
for i in range(bins):
lo = edges[i]
hi = edges[i + 1]
if i == bins - 1:
mask = (y_prob >= lo) & (y_prob <= hi)
else:
mask = (y_prob >= lo) & (y_prob < hi)
if not np.any(mask):
continue
bucket_weight = float(np.sum(mask)) / total
bucket_confidence = float(np.mean(y_prob[mask]))
bucket_frequency = float(np.mean(y_true[mask]))
ece += bucket_weight * abs(bucket_frequency - bucket_confidence)
return float(ece)
def calibration_cv_splits(rows: int) -> int:
if rows >= 800:
return 5
if rows >= 400:
return 4
if rows >= 240:
return 3
if rows >= 140:
return 2
return 0
def fit_with_optional_calibration(
model_family: str,
model_params: dict[str, Any],
random_state: int,
x_train,
y_train: np.ndarray,
calibration_method: str,
fallback_to_none: bool = True,
):
base = make_model(model_family=model_family, random_state=random_state, params=model_params)
if calibration_method == "none":
base.fit(x_train, y_train)
return base, {
"requested_method": "none",
"effective_method": "none",
"cv_splits": 0,
}
splits = calibration_cv_splits(len(y_train))
if splits < 2:
if not fallback_to_none:
raise RuntimeError("not enough rows for calibration folds")
base.fit(x_train, y_train)
return base, {
"requested_method": calibration_method,
"effective_method": "none",
"cv_splits": splits,
"warning": "insufficient rows for calibration folds; falling back to uncalibrated model",
}
try:
calibrated = CalibratedClassifierCV(
estimator=base,
method=calibration_method,
cv=TimeSeriesSplit(n_splits=splits),
)
calibrated.fit(x_train, y_train)
return calibrated, {
"requested_method": calibration_method,
"effective_method": calibration_method,
"cv_splits": splits,
}
except Exception as exc:
if not fallback_to_none:
raise
base.fit(x_train, y_train)
return base, {
"requested_method": calibration_method,
"effective_method": "none",
"cv_splits": splits,
"warning": f"calibration failed ({exc}); falling back to uncalibrated model",
}
def hyperparameter_search(
model_family: str,
x_train,
y_train: np.ndarray,
x_val,
y_val: np.ndarray,
random_state: int,
min_precision: float,
fixed_threshold: float | None,
enabled: bool,
max_trials: int,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
if enabled:
grid = limit_trials(model_param_grid(model_family), max_trials=max_trials)
else:
grid = [default_model_params(model_family)]
trials: list[dict[str, Any]] = []
for params in grid:
model = make_model(model_family=model_family, random_state=random_state, params=params)
model.fit(x_train, y_train)
y_val_prob = model.predict_proba(x_val)[:, 1]
threshold, threshold_info = threshold_from_probs(
y_true=y_val,
y_prob=y_val_prob,
min_precision=min_precision,
fixed_threshold=fixed_threshold,
)
metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=threshold)
trials.append(
{
"model_family": model_family,
"model_params": params,
"threshold": float(threshold),
"threshold_info": threshold_info,
"validation_metrics": metrics,
}
)
best = max(trials, key=lambda v: metric_key(v["validation_metrics"]))
return best["model_params"], trials
def calibration_selection_key(result: dict[str, Any]) -> tuple[float, float, float, float, float]:
metrics = result["validation_metrics"]
brier = float(metrics["brier"])
ece_10 = float(result["calibration_quality"]["ece_10"])
return (
brier,
ece_10,
-safe_pr_auc(metrics),
-safe_roc_auc(metrics),
-float(metrics["f1"]),
)
def evaluate_calibration_methods(
model_family: str,
model_params: dict[str, Any],
calibration_methods: list[str],
x_train,
y_train: np.ndarray,
x_val,
y_val: np.ndarray,
random_state: int,
min_precision: float,
fixed_threshold: float | None,
) -> tuple[dict[str, Any], list[dict[str, Any]]]:
results: list[dict[str, Any]] = []
for method in calibration_methods:
fitted, fit_info = fit_with_optional_calibration(
model_family=model_family,
model_params=model_params,
random_state=random_state,
x_train=x_train,
y_train=y_train,
calibration_method=method,
fallback_to_none=True,
)
y_val_prob = fitted.predict_proba(x_val)[:, 1]
threshold, threshold_info = threshold_from_probs(
y_true=y_val,
y_prob=y_val_prob,
min_precision=min_precision,
fixed_threshold=fixed_threshold,
)
metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=threshold)
results.append(
{
"calibration_method": method,
"fit": fit_info,
"threshold": float(threshold),
"threshold_info": threshold_info,
"validation_metrics": metrics,
"calibration_quality": {
"ece_10": expected_calibration_error(y_true=y_val, y_prob=y_val_prob, bins=10),
},
}
)
selected = min(results, key=calibration_selection_key)
return selected, results
def evaluate_naive_baselines(test_df, y_test: np.ndarray) -> dict[str, Any]:
out: dict[str, Any] = {}
if "rain_last_1h_mm" in test_df.columns:
rain_last = test_df["rain_last_1h_mm"].to_numpy(dtype=float)
persistence_prob = (rain_last >= RAIN_EVENT_THRESHOLD_MM).astype(float)
out["persistence_last_1h"] = {
"rule": f"predict rain when rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"metrics": evaluate_probs(y_true=y_test, y_prob=persistence_prob, threshold=0.5),
}
has_fc_prob = "fc_precip_prob" in test_df.columns and test_df["fc_precip_prob"].notna().any()
has_fc_mm = "fc_precip_mm" in test_df.columns and test_df["fc_precip_mm"].notna().any()
if has_fc_prob:
fc_prob = test_df["fc_precip_prob"].fillna(0.0).clip(lower=0.0, upper=1.0).to_numpy(dtype=float)
out["forecast_precip_prob"] = {
"rule": "use fc_precip_prob directly as baseline probability",
"metrics": evaluate_probs(y_true=y_test, y_prob=fc_prob, threshold=0.5),
}
if has_fc_prob or has_fc_mm:
fc_prob = (
test_df["fc_precip_prob"].fillna(0.0).clip(lower=0.0, upper=1.0).to_numpy(dtype=float)
if "fc_precip_prob" in test_df.columns
else np.zeros(len(test_df), dtype=float)
)
fc_mm = (
test_df["fc_precip_mm"].fillna(0.0).to_numpy(dtype=float)
if "fc_precip_mm" in test_df.columns
else np.zeros(len(test_df), dtype=float)
)
rule_prob = ((fc_prob >= 0.5) | (fc_mm >= RAIN_EVENT_THRESHOLD_MM)).astype(float)
out["forecast_threshold_rule"] = {
"rule": (
"predict rain when (fc_precip_prob >= 0.50) "
f"OR (fc_precip_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f})"
),
"metrics": evaluate_probs(y_true=y_test, y_prob=rule_prob, threshold=0.5),
}
return out
def walk_forward_backtest(
model_df,
feature_cols: list[str],
model_family: str,
model_params: dict[str, Any],
calibration_method: str,
random_state: int,
min_precision: float,
fixed_threshold: float | None,
folds: int,
) -> dict[str, Any]:
if folds <= 0:
return {"enabled": False, "folds": [], "summary": None}
n = len(model_df)
min_train_rows = max(200, int(0.4 * n))
remaining = n - min_train_rows
if remaining < 50:
return {
"enabled": True,
"folds": [],
"summary": {
"error": "not enough rows for walk-forward folds",
"requested_folds": folds,
"min_train_rows": min_train_rows,
},
}
fold_size = max(25, remaining // folds)
results: list[dict[str, Any]] = []
for idx in range(folds):
train_end = min_train_rows + idx * fold_size
test_end = n if idx == folds - 1 else min(min_train_rows + (idx + 1) * fold_size, n)
if train_end >= test_end:
continue
fold_train = model_df.iloc[:train_end]
fold_test = model_df.iloc[train_end:test_end]
if len(fold_train) < 160 or len(fold_test) < 25:
continue
y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy()
y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy()
if len(np.unique(y_fold_train)) < 2:
continue
inner_val_rows = max(40, int(0.2 * len(fold_train)))
if len(fold_train) - inner_val_rows < 80:
continue
inner_train = fold_train.iloc[:-inner_val_rows]
inner_val = fold_train.iloc[-inner_val_rows:]
y_inner_train = inner_train["rain_next_1h"].astype(int).to_numpy()
y_inner_val = inner_val["rain_next_1h"].astype(int).to_numpy()
if len(np.unique(y_inner_train)) < 2:
continue
try:
threshold_model, threshold_fit = fit_with_optional_calibration(
model_family=model_family,
model_params=model_params,
random_state=random_state,
x_train=inner_train[feature_cols],
y_train=y_inner_train,
calibration_method=calibration_method,
fallback_to_none=True,
)
inner_val_prob = threshold_model.predict_proba(inner_val[feature_cols])[:, 1]
fold_threshold, fold_threshold_info = threshold_from_probs(
y_true=y_inner_val,
y_prob=inner_val_prob,
min_precision=min_precision,
fixed_threshold=fixed_threshold,
)
fold_model, fold_fit = fit_with_optional_calibration(
model_family=model_family,
model_params=model_params,
random_state=random_state,
x_train=fold_train[feature_cols],
y_train=y_fold_train,
calibration_method=calibration_method,
fallback_to_none=True,
)
fold_test_prob = fold_model.predict_proba(fold_test[feature_cols])[:, 1]
fold_metrics = evaluate_probs(y_true=y_fold_test, y_prob=fold_test_prob, threshold=fold_threshold)
fold_result = {
"fold_index": idx + 1,
"train_rows": len(fold_train),
"test_rows": len(fold_test),
"train_start": fold_train.index.min(),
"train_end": fold_train.index.max(),
"test_start": fold_test.index.min(),
"test_end": fold_test.index.max(),
"threshold": float(fold_threshold),
"threshold_selection": fold_threshold_info,
"threshold_fit": threshold_fit,
"model_fit": fold_fit,
"metrics": fold_metrics,
}
results.append(fold_result)
except Exception as exc:
results.append(
{
"fold_index": idx + 1,
"train_rows": len(fold_train),
"test_rows": len(fold_test),
"error": str(exc),
}
)
good = [r for r in results if "metrics" in r]
if not good:
return {
"enabled": True,
"folds": results,
"summary": {
"error": "no successful fold evaluations",
"requested_folds": folds,
},
}
summary = {
"successful_folds": len(good),
"requested_folds": folds,
"mean_precision": float(np.mean([f["metrics"]["precision"] for f in good])),
"mean_recall": float(np.mean([f["metrics"]["recall"] for f in good])),
"mean_f1": float(np.mean([f["metrics"]["f1"] for f in good])),
"mean_brier": float(np.mean([f["metrics"]["brier"] for f in good])),
"mean_pr_auc": float(np.mean([f["metrics"]["pr_auc"] for f in good if f["metrics"]["pr_auc"] is not None]))
if any(f["metrics"]["pr_auc"] is not None for f in good)
else None,
"mean_roc_auc": float(np.mean([f["metrics"]["roc_auc"] for f in good if f["metrics"]["roc_auc"] is not None]))
if any(f["metrics"]["roc_auc"] is not None for f in good)
else None,
}
return {"enabled": True, "folds": results, "summary": summary}
def write_model_card(path: str, report: dict[str, Any]) -> None:
lines = [
"# Rain Model Card",
"",
f"- Model version: `{report['model_version']}`",
f"- Generated at (UTC): `{report['generated_at']}`",
f"- Site: `{report['site']}`",
f"- Target: `{report['target_definition']}`",
f"- Feature set: `{report['feature_set']}`",
f"- Model family: `{report['model_family']}`",
f"- Model params: `{report['model_params']}`",
f"- Calibration method: `{report['calibration_method']}`",
f"- Operating threshold: `{report['threshold_selection']['threshold']:.3f}`",
"",
"## Data Window",
"",
f"- Requested start: `{report['data_window']['requested_start']}`",
f"- Requested end: `{report['data_window']['requested_end']}`",
f"- Actual start: `{report['data_window']['actual_start']}`",
f"- Actual end: `{report['data_window']['actual_end']}`",
f"- Rows: train `{report['split']['train_rows']}`, val `{report['split']['val_rows']}`, test `{report['split']['test_rows']}`",
"",
"## Features",
"",
]
for col in report["feature_columns"]:
lines.append(f"- `{col}`")
lines.extend(
[
"",
"## Performance",
"",
"- Validation:",
f" precision `{report['validation_metrics']['precision']:.3f}`, "
f"recall `{report['validation_metrics']['recall']:.3f}`, "
f"PR-AUC `{report['validation_metrics']['pr_auc']}`, "
f"ROC-AUC `{report['validation_metrics']['roc_auc']}`, "
f"Brier `{report['validation_metrics']['brier']:.4f}`",
"- Test:",
f" precision `{report['test_metrics']['precision']:.3f}`, "
f"recall `{report['test_metrics']['recall']:.3f}`, "
f"PR-AUC `{report['test_metrics']['pr_auc']}`, "
f"ROC-AUC `{report['test_metrics']['roc_auc']}`, "
f"Brier `{report['test_metrics']['brier']:.4f}`",
"",
"## Known Limitations",
"",
"- Sensor rain counter resets are clipped at zero increment; extreme spikes are flagged but not fully removed.",
"- Forecast feature availability can vary by upstream model freshness.",
"- Performance may drift seasonally and should be monitored with the drift queries in docs.",
"",
]
)
card_dir = os.path.dirname(path)
if card_dir:
os.makedirs(card_dir, exist_ok=True)
with open(path, "w", encoding="utf-8") as f:
f.write("\n".join(lines))
def train_candidate(
model_family: str,
x_train,
@@ -132,30 +709,51 @@ def train_candidate(
random_state: int,
min_precision: float,
fixed_threshold: float | None,
tune_hyperparameters: bool,
max_hyperparam_trials: int,
calibration_methods: list[str],
) -> dict[str, Any]:
best_params, tuning_trials = hyperparameter_search(
model_family=model_family,
x_train=x_train,
y_train=y_train,
x_val=x_val,
y_val=y_val,
random_state=random_state,
min_precision=min_precision,
fixed_threshold=fixed_threshold,
enabled=tune_hyperparameters,
max_trials=max_hyperparam_trials,
)
selected_calibration, calibration_trials = evaluate_calibration_methods(
model_family=model_family,
model_params=best_params,
calibration_methods=calibration_methods,
x_train=x_train,
y_train=y_train,
x_val=x_val,
y_val=y_val,
random_state=random_state,
min_precision=min_precision,
fixed_threshold=fixed_threshold,
)
return {
"model_family": model_family,
"threshold": float(threshold),
"threshold_info": threshold_info,
"validation_metrics": val_metrics,
"model_params": best_params,
"hyperparameter_tuning": {
"enabled": tune_hyperparameters,
"trial_count": len(tuning_trials),
"trials": tuning_trials,
},
"calibration_comparison": calibration_trials,
"calibration_method": selected_calibration["calibration_method"],
"calibration_fit": selected_calibration["fit"],
"threshold": float(selected_calibration["threshold"]),
"threshold_info": selected_calibration["threshold_info"],
"validation_metrics": selected_calibration["validation_metrics"],
"calibration_quality": selected_calibration["calibration_quality"],
}
@@ -168,6 +766,7 @@ def main() -> int:
end = parse_time(args.end) if args.end else ""
feature_cols = feature_columns_for_set(args.feature_set)
needs_forecast = feature_columns_need_forecast(feature_cols)
calibration_methods = parse_calibration_methods(args.calibration_methods)
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, start, end)
@@ -176,10 +775,27 @@ def main() -> int:
if needs_forecast:
forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model)
if ws90.empty:
message = "no ws90 observations found in requested window"
if args.allow_empty:
print(f"Rain model training skipped: {message}.")
return 0
raise RuntimeError(message)
if baro.empty:
message = "no barometer observations found in requested window"
if args.allow_empty:
print(f"Rain model training skipped: {message}.")
return 0
raise RuntimeError(message)
full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(full_df, feature_cols, require_target=True)
if len(model_df) < args.min_rows:
raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})")
message = f"not enough model-ready rows after filtering (need >= {args.min_rows})"
if args.allow_empty:
print(f"Rain model training skipped: {message}.")
return 0
raise RuntimeError(message)
train_df, val_df, test_df = split_time_ordered(
model_df,
@@ -194,6 +810,11 @@ def main() -> int:
x_test = test_df[feature_cols]
y_test = test_df["rain_next_1h"].astype(int).to_numpy()
if len(np.unique(y_train)) < 2:
raise RuntimeError("training split does not contain both classes; cannot train classifier")
if len(np.unique(y_val)) < 2:
print("warning: validation split has one class; AUC metrics may be unavailable", flush=True)
candidate_families = ["logreg", "hist_gb"] if args.model_family == "auto" else [args.model_family]
candidates = [
train_candidate(
@@ -205,18 +826,19 @@ def main() -> int:
random_state=args.random_state,
min_precision=args.min_precision,
fixed_threshold=args.threshold,
tune_hyperparameters=args.tune_hyperparameters,
max_hyperparam_trials=args.max_hyperparam_trials,
calibration_methods=calibration_methods,
)
for family in candidate_families
]
best_candidate = max(
candidates,
key=lambda c: metric_key(c["validation_metrics"]),
)
selected_model_family = str(best_candidate["model_family"])
selected_model_params = best_candidate["model_params"]
selected_calibration_method = str(best_candidate["calibration_method"])
chosen_threshold = float(best_candidate["threshold"])
threshold_info = best_candidate["threshold_info"]
val_metrics = best_candidate["validation_metrics"]
@@ -225,10 +847,32 @@ def main() -> int:
x_train_val = train_val_df[feature_cols]
y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy()
final_model, final_fit_info = fit_with_optional_calibration(
model_family=selected_model_family,
model_params=selected_model_params,
random_state=args.random_state,
x_train=x_train_val,
y_train=y_train_val,
calibration_method=selected_calibration_method,
fallback_to_none=True,
)
y_test_prob = final_model.predict_proba(x_test)[:, 1]
test_metrics = evaluate_probs(y_true=y_test, y_prob=y_test_prob, threshold=chosen_threshold)
test_calibration = {
"ece_10": expected_calibration_error(y_true=y_test, y_prob=y_test_prob, bins=10),
}
naive_baselines_test = evaluate_naive_baselines(test_df=test_df, y_test=y_test)
walk_forward = walk_forward_backtest(
model_df=model_df,
feature_cols=feature_cols,
model_family=selected_model_family,
model_params=selected_model_params,
calibration_method=selected_calibration_method,
random_state=args.random_state,
min_precision=args.min_precision,
fixed_threshold=args.threshold,
folds=args.walk_forward_folds,
)
report = {
"generated_at": datetime.now(timezone.utc).isoformat(),
@@ -236,10 +880,14 @@ def main() -> int:
"model_version": args.model_version,
"model_family_requested": args.model_family,
"model_family": selected_model_family,
"model_params": selected_model_params,
"feature_set": args.feature_set,
"target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"feature_columns": feature_cols,
"forecast_model": args.forecast_model if needs_forecast else None,
"calibration_method_requested": calibration_methods,
"calibration_method": selected_calibration_method,
"calibration_fit": final_fit_info,
"data_window": {
"requested_start": start or None,
"requested_end": end or None,
@@ -275,16 +923,25 @@ def main() -> int:
"candidate_models": [
{
"model_family": c["model_family"],
"model_params": c["model_params"],
"hyperparameter_tuning": c["hyperparameter_tuning"],
"calibration_method": c["calibration_method"],
"calibration_fit": c["calibration_fit"],
"calibration_comparison": c["calibration_comparison"],
"threshold_selection": {
**c["threshold_info"],
"min_precision_constraint": args.min_precision,
},
"calibration_quality": c["calibration_quality"],
"validation_metrics": c["validation_metrics"],
}
for c in candidates
],
"validation_metrics": val_metrics,
"test_metrics": test_metrics,
"test_calibration_quality": test_calibration,
"naive_baselines_test": naive_baselines_test,
"walk_forward_backtest": walk_forward,
}
report = to_builtin(report)
@@ -292,8 +949,16 @@ def main() -> int:
print(f" site: {args.site}")
print(f" model_version: {args.model_version}")
print(f" model_family: {selected_model_family} (requested={args.model_family})")
print(f" model_params: {selected_model_params}")
print(f" calibration_method: {report['calibration_method']}")
print(f" feature_set: {args.feature_set} ({len(feature_cols)} features)")
print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}")
print(
" rows: "
f"total={report['data_window']['model_rows']} "
f"train={report['split']['train_rows']} "
f"val={report['split']['val_rows']} "
f"test={report['split']['test_rows']}"
)
print(
" threshold: "
f"{report['threshold_selection']['threshold']:.3f} "
@@ -311,8 +976,32 @@ def main() -> int:
f"precision={report['test_metrics']['precision']:.3f} "
f"recall={report['test_metrics']['recall']:.3f} "
f"roc_auc={report['test_metrics']['roc_auc'] if report['test_metrics']['roc_auc'] is not None else 'n/a'} "
f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'}"
f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'} "
f"brier={report['test_metrics']['brier']:.4f} "
f"ece10={report['test_calibration_quality']['ece_10']:.4f}"
)
if report["walk_forward_backtest"]["summary"] is not None:
summary = report["walk_forward_backtest"]["summary"]
if "error" in summary:
print(f" walk-forward: {summary['error']}")
else:
print(
" walk-forward: "
f"folds={summary['successful_folds']}/{summary['requested_folds']} "
f"mean_precision={summary['mean_precision']:.3f} "
f"mean_recall={summary['mean_recall']:.3f} "
f"mean_pr_auc={summary['mean_pr_auc'] if summary['mean_pr_auc'] is not None else 'n/a'}"
)
if report["naive_baselines_test"]:
print(" naive baselines (test):")
for name, baseline in report["naive_baselines_test"].items():
m = baseline["metrics"]
print(
f" {name}: "
f"precision={m['precision']:.3f} recall={m['recall']:.3f} "
f"pr_auc={m['pr_auc'] if m['pr_auc'] is not None else 'n/a'} "
f"brier={m['brier']:.4f}"
)
if args.report_out:
report_dir = os.path.dirname(args.report_out)
@@ -322,6 +1011,11 @@ def main() -> int:
json.dump(report, f, indent=2)
print(f"Saved report to {args.report_out}")
if args.model_card_out:
model_card_out = args.model_card_out.format(model_version=args.model_version)
write_model_card(model_card_out, report)
print(f"Saved model card to {model_card_out}")
if args.dataset_out:
dataset_out = args.dataset_out.format(model_version=args.model_version, feature_set=args.feature_set)
dataset_dir = os.path.dirname(dataset_out)
@@ -341,6 +1035,9 @@ def main() -> int:
artifact = {
"model": final_model,
"model_family": selected_model_family,
"model_params": selected_model_params,
"calibration_method": selected_calibration_method,
"calibration_fit": final_fit_info,
"features": feature_cols,
"feature_set": args.feature_set,
"forecast_model": args.forecast_model if needs_forecast else None,

todo.md

@@ -12,7 +12,7 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
- [x] [P0] Audit `observations_ws90` and `observations_baro` for missingness, gaps, duplicates, and out-of-order rows. (completed on runtime machine)
- [x] [P0] Validate rain label construction from `rain_mm` (counter resets, negative deltas, spikes). (completed on runtime machine)
- [x] [P0] Measure class balance by week (rain-positive vs rain-negative). (completed on runtime machine)
- [x] [P1] Document known data issues and mitigation rules. (see `docs/rain_data_issues.md`)
## 3) Dataset and Feature Engineering
- [x] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow.
@@ -26,16 +26,16 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
- [x] [P0] Keep logistic regression as baseline.
- [x] [P1] Add at least one tree-based baseline (e.g. gradient boosting). (implemented via `hist_gb`; runtime evaluation pending local Python deps)
- [x] [P0] Use strict time-based train/validation/test splits (no random shuffling).
- [x] [P1] Add walk-forward backtesting across multiple temporal folds. (`train_rain_model.py --walk-forward-folds`)
- [x] [P1] Tune hyperparameters on validation data only. (`train_rain_model.py --tune-hyperparameters`)
- [x] [P1] Calibrate probabilities (Platt or isotonic) and compare calibration quality. (`--calibration-methods`)
- [x] [P0] Choose and lock the operating threshold based on use-case costs.
## 5) Evaluation and Reporting
- [x] [P0] Report ROC-AUC, PR-AUC, confusion matrix, precision, recall, and Brier score.
- [x] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules).
- [ ] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.).
- [x] [P1] Produce a short model card (data window, features, metrics, known limitations). (`--model-card-out`)
## 6) Packaging and Deployment
- [x] [P1] Version model artifacts and feature schema together.
@@ -45,10 +45,10 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
- [ ] [P2] Add scheduled retraining with rollback to last-known-good model.
## 7) Monitoring and Operations
- [x] [P1] Track feature drift and prediction drift over time. (view: `rain_feature_drift_daily`, `rain_prediction_drift_daily`)
- [x] [P1] Track calibration drift and realized performance after deployment. (view: `rain_calibration_drift_daily`)
- [x] [P1] Add alerts for training/inference/data pipeline failures. (`scripts/check_rain_pipeline_health.py`)
- [x] [P1] Document runbook for train/evaluate/deploy/rollback. (see `docs/rain_model_runbook.md`)
## 8) Immediate Next Steps (This Week)
- [x] [P0] Run first full data audit and label-quality checks. (completed on runtime machine)