From 20316cee91fd7c8c9218c25d5b7065cc34924e8d Mon Sep 17 00:00:00 2001
From: Nathan Coad
Date: Thu, 12 Mar 2026 20:29:29 +1100
Subject: [PATCH] another bugfix

---
 docker-compose.yml            |   1 +
 docs/rain_data_issues.md      |   2 +-
 docs/rain_model_runbook.md    |  10 +--
 docs/rain_prediction.md       |  24 +++++++
 scripts/rain_model_common.py  |  49 ++++++++++++--
 scripts/run_rain_ml_worker.py | 107 ++++++++++++++++++++++++++++---
 scripts/train_rain_model.py   | 117 ++++++++++++++++++++++++++++++++++
 todo.md                       |   6 +-
 8 files changed, 293 insertions(+), 23 deletions(-)

diff --git a/docker-compose.yml b/docker-compose.yml
index 6b14767..17f8295 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -49,6 +49,7 @@ services:
       RAIN_WALK_FORWARD_FOLDS: "0"
       RAIN_ALLOW_EMPTY_DATA: "true"
       RAIN_MODEL_PATH: "/app/models/rain_model.pkl"
+      RAIN_MODEL_BACKUP_PATH: "/app/models/rain_model.pkl.last_good"
       RAIN_REPORT_PATH: "/app/models/rain_model_report.json"
       RAIN_AUDIT_PATH: "/app/models/rain_data_audit.json"
       RAIN_DATASET_PATH: "/app/models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
diff --git a/docs/rain_data_issues.md b/docs/rain_data_issues.md
index 4a12ed5..eb01c13 100644
--- a/docs/rain_data_issues.md
+++ b/docs/rain_data_issues.md
@@ -11,7 +11,7 @@ This document captures known data-quality issues observed in the rain-model pipe
 | Sensor gaps | Missing 5-minute buckets from WS90/barometer ingestion. | Resample to 5-minute grid; barometer interpolated with short limit (`limit=3`); gap lengths tracked by audit. |
 | Out-of-order arrivals | Late MQTT events can arrive with older `ts`. | Audit reports out-of-order count by sorting on `received_at` and checking `ts` monotonicity. |
 | Duplicate rows | Replays/reconnects can duplicate sensor rows. | Audit reports duplicate counts by `(ts, station_id)` for WS90 and `(ts, source)` for barometer. |
-| Forecast sparsity/jitter | Hourly forecast retrieval cadence does not always align with 5-minute features. | Select latest forecast per `ts` (`DISTINCT ON` + `retrieved_at DESC`), resample to 5 minutes, short forward/backfill windows, and clip `fc_precip_prob` to `[0,1]`. |
+| Forecast sparsity/jitter | Hourly forecast retrieval cadence does not always align with 5-minute features. | Select latest forecast per `ts` (`DISTINCT ON` + `retrieved_at DESC`), resample to 5 minutes, short forward/backfill windows, and clip `fc_precip_prob` to `[0,1]`. If `precip_prob` is unavailable upstream, backfill from `precip_mm` (`>0 => 1`, else `0`). |
 | Local vs UTC day boundary | Daily rainfall resets can look wrong when local timezone is not respected. | Station timezone is configured via `site.timezone` and used by Wunderground uploader; model training/inference stays UTC-based for split consistency. |

 ## Audit Command
diff --git a/docs/rain_model_runbook.md b/docs/rain_model_runbook.md
index b649c5a..9d08593 100644
--- a/docs/rain_model_runbook.md
+++ b/docs/rain_model_runbook.md
@@ -39,6 +39,7 @@ Review in report:
 - `candidate_models[*].hyperparameter_tuning`
 - `candidate_models[*].calibration_comparison`
 - `naive_baselines_test`
+- `sliced_performance_test`
 - `walk_forward_backtest`

 ## 3) Deploy
@@ -65,10 +66,10 @@ python scripts/predict_rain_model.py \

 ## 4) Rollback

-1. Identify the last known-good model artifact in `models/`.
-2. Point deployment to that artifact (worker env `RAIN_MODEL_PATH` or manual inference path).
-3. Re-run inference command and verify writes in `predictions_rain_1h`.
-4. Keep the failed artifact/report for postmortem.
+1. The worker now keeps a backup model at `RAIN_MODEL_BACKUP_PATH` and promotes new models only after candidate training succeeds.
+2. If promotion fails or no candidate model is produced, the worker keeps the active model unchanged.
+3. If inference starts without `RAIN_MODEL_PATH` but backup exists, the worker restores from backup automatically.
+4. Keep failed candidate artifacts for postmortem.

 ## 5) Monitoring

@@ -134,6 +135,7 @@ The script exits non-zero on failure, so it can directly drive alerting.
 - `RAIN_CALIBRATION_METHODS`
 - `RAIN_WALK_FORWARD_FOLDS`
 - `RAIN_ALLOW_EMPTY_DATA`
+- `RAIN_MODEL_BACKUP_PATH`
 - `RAIN_MODEL_CARD_PATH`

 Recommended production defaults:
diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md
index 64fb746..f814246 100644
--- a/docs/rain_prediction.md
+++ b/docs/rain_prediction.md
@@ -48,6 +48,8 @@ Feature-set options:
 - `baseline`: original 5 local observation features.
 - `extended`: adds wind-direction encoding, lag/rolling stats, recent rain accumulation,
   and aligned forecast features from `forecast_openmeteo_hourly`.
+- `extended_calendar`: `extended` plus UTC calendar seasonality features
+  (`hour_*`, `dow_*`, `month_*`, `is_weekend`).

 Model-family options (`train_rain_model.py`):
 - `logreg`: logistic regression baseline.
@@ -117,6 +119,20 @@ python scripts/train_rain_model.py \
   --dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
 ```

+### 3b.1) Train expanded + calendar (P2) feature-set model
+```sh
+python scripts/train_rain_model.py \
+  --site "home" \
+  --start "2026-02-01T00:00:00Z" \
+  --end "2026-03-03T23:55:00Z" \
+  --feature-set "extended_calendar" \
+  --model-family "auto" \
+  --forecast-model "ecmwf" \
+  --model-version "rain-auto-v1-extended-calendar" \
+  --out "models/rain_model_extended_calendar.pkl" \
+  --report-out "models/rain_model_report_extended_calendar.json"
+```
+
 ### 3c) Train tree-based baseline (P1)
 ```sh
 python scripts/train_rain_model.py \
@@ -186,6 +202,7 @@ The `rainml` service in `docker-compose.yml` now runs:
 - configurable tuning/calibration behavior (`RAIN_TUNE_HYPERPARAMETERS`,
   `RAIN_MAX_HYPERPARAM_TRIALS`, `RAIN_CALIBRATION_METHODS`)
 - graceful gap handling for temporary source outages (`RAIN_ALLOW_EMPTY_DATA=true`)
+- automatic rollback path for last-known-good model (`RAIN_MODEL_BACKUP_PATH`)
 - optional model-card output (`RAIN_MODEL_CARD_PATH`)

 Artifacts are persisted to `./models` on the host.
@@ -198,6 +215,7 @@ docker compose logs -f rainml
 ## Output
 - Audit report: `models/rain_data_audit.json`
 - Training report: `models/rain_model_report.json`
+- Regime slices in training report: `sliced_performance_test`
 - Model card: `models/model_card_.md`
 - Model artifact: `models/rain_model.pkl`
 - Dataset snapshot: `models/datasets/rain_dataset__.csv`
@@ -222,6 +240,12 @@ docker compose logs -f rainml
 - `fc_temp_c`, `fc_rh`, `fc_pressure_msl_hpa`, `fc_wind_m_s`, `fc_wind_gust_m_s`,
   `fc_precip_mm`, `fc_precip_prob`, `fc_cloud_cover`

+## Model Features (extended_calendar extras)
+- `hour_sin`, `hour_cos`
+- `dow_sin`, `dow_cos`
+- `month_sin`, `month_cos`
+- `is_weekend`
+
 ## Notes
 - Data is resampled into 5-minute buckets.
 - Label is derived from incremental rain from WS90 cumulative `rain_mm`.
diff --git a/scripts/rain_model_common.py b/scripts/rain_model_common.py
index 645d979..86b1b12 100644
--- a/scripts/rain_model_common.py
+++ b/scripts/rain_model_common.py
@@ -36,6 +36,16 @@ FORECAST_FEATURE_COLUMNS = [
     "fc_cloud_cover",
 ]

+CALENDAR_FEATURE_COLUMNS = [
+    "hour_sin",
+    "hour_cos",
+    "dow_sin",
+    "dow_cos",
+    "month_sin",
+    "month_cos",
+    "is_weekend",
+]
+
 EXTENDED_FEATURE_COLUMNS = [
     "pressure_trend_1h",
     "temperature_c",
@@ -60,9 +70,15 @@ EXTENDED_FEATURE_COLUMNS = [
     *FORECAST_FEATURE_COLUMNS,
 ]

+EXTENDED_CALENDAR_FEATURE_COLUMNS = [
+    *EXTENDED_FEATURE_COLUMNS,
+    *CALENDAR_FEATURE_COLUMNS,
+]
+
 FEATURE_SETS: dict[str, list[str]] = {
     "baseline": BASELINE_FEATURE_COLUMNS,
     "extended": EXTENDED_FEATURE_COLUMNS,
+    "extended_calendar": EXTENDED_CALENDAR_FEATURE_COLUMNS,
 }

 AVAILABLE_FEATURE_SETS = tuple(sorted(FEATURE_SETS.keys()))
@@ -116,8 +132,8 @@ def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame:
         SELECT ts, station_id, received_at, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm
         FROM observations_ws90
         WHERE site = %s
-          AND (%s = '' OR ts >= %s::timestamptz)
-          AND (%s = '' OR ts <= %s::timestamptz)
+          AND (%s = '' OR ts >= NULLIF(%s, '')::timestamptz)
+          AND (%s = '' OR ts <= NULLIF(%s, '')::timestamptz)
         ORDER BY ts ASC
     """
     return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"])
@@ -128,8 +144,8 @@ def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame:
         SELECT ts, source, received_at, pressure_hpa
         FROM observations_baro
         WHERE site = %s
-          AND (%s = '' OR ts >= %s::timestamptz)
-          AND (%s = '' OR ts <= %s::timestamptz)
+          AND (%s = '' OR ts >= NULLIF(%s, '')::timestamptz)
+          AND (%s = '' OR ts <= NULLIF(%s, '')::timestamptz)
         ORDER BY ts ASC
     """
     return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"])
@@ -151,8 +167,8 @@ def fetch_forecast(conn, site: str, start: str, end: str, model: str = "ecmwf")
         FROM forecast_openmeteo_hourly
         WHERE site = %s
           AND model = %s
-          AND (%s = '' OR ts >= %s::timestamptz - INTERVAL '2 hours')
-          AND (%s = '' OR ts <= %s::timestamptz + INTERVAL '2 hours')
+          AND (%s = '' OR ts >= NULLIF(%s, '')::timestamptz - INTERVAL '2 hours')
+          AND (%s = '' OR ts <= NULLIF(%s, '')::timestamptz + INTERVAL '2 hours')
         ORDER BY ts ASC, retrieved_at DESC
     """
     return _fetch_df(conn, sql, (site, model, start, start, end, end), ["ts", "retrieved_at"])
@@ -199,6 +215,15 @@ def _apply_forecast_features(df: pd.DataFrame, forecast: pd.DataFrame | None) ->
         out.loc[mask, "fc_precip_prob"] = out.loc[mask, "fc_precip_prob"] / 100.0
     out["fc_precip_prob"] = out["fc_precip_prob"].clip(lower=0.0, upper=1.0)

+    # Some forecast sources (or model configs) provide precip amount but no precip probability.
+    # Backfill missing probability to keep feature rows usable for training/inference.
+    if "fc_precip_mm" in out.columns:
+        fallback_prob = (out["fc_precip_mm"].fillna(0.0) > 0.0).astype(float)
+    else:
+        fallback_prob = 0.0
+    out["fc_precip_prob"] = out["fc_precip_prob"].fillna(fallback_prob)
+    out["fc_precip_prob"] = out["fc_precip_prob"].clip(lower=0.0, upper=1.0)
+
     return out


@@ -263,6 +288,18 @@ def build_dataset(
     df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window, min_periods=3).mean()
     df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window, min_periods=3).std()

+    # Calendar/seasonality features (UTC based).
+    hour_of_day = df.index.hour + (df.index.minute / 60.0)
+    day_of_week = df.index.dayofweek
+    month_of_year = df.index.month
+    df["hour_sin"] = np.sin(2.0 * np.pi * hour_of_day / 24.0)
+    df["hour_cos"] = np.cos(2.0 * np.pi * hour_of_day / 24.0)
+    df["dow_sin"] = np.sin(2.0 * np.pi * day_of_week / 7.0)
+    df["dow_cos"] = np.cos(2.0 * np.pi * day_of_week / 7.0)
+    df["month_sin"] = np.sin(2.0 * np.pi * (month_of_year - 1.0) / 12.0)
+    df["month_cos"] = np.cos(2.0 * np.pi * (month_of_year - 1.0) / 12.0)
+    df["is_weekend"] = (day_of_week >= 5).astype(float)
+
     df = _apply_forecast_features(df, forecast)
     return df

diff --git a/scripts/run_rain_ml_worker.py b/scripts/run_rain_ml_worker.py
index 5548beb..84b5051 100644
--- a/scripts/run_rain_ml_worker.py
+++ b/scripts/run_rain_ml_worker.py
@@ -2,6 +2,7 @@
 from __future__ import annotations

 import os
+import shutil
 import subprocess
 import sys
 import time
@@ -58,6 +59,7 @@ class WorkerConfig:
     dataset_path_template: str
     model_card_path_template: str
     model_path: Path
+    model_backup_path: Path
     report_path: Path
     audit_path: Path
     run_once: bool
@@ -82,6 +84,41 @@ def ensure_parent(path: Path) -> None:
     path.parent.mkdir(parents=True, exist_ok=True)


+def with_suffix(path: Path, suffix: str) -> Path:
+    return path.with_name(path.name + suffix)
+
+
+def promote_file(candidate: Path, target: Path) -> bool:
+    if not candidate.exists():
+        return False
+    ensure_parent(target)
+    candidate.replace(target)
+    return True
+
+
+def promote_model_candidate(candidate: Path, target: Path, backup: Path) -> bool:
+    if not candidate.exists():
+        return False
+
+    ensure_parent(target)
+    ensure_parent(backup)
+    if target.exists():
+        shutil.copy2(target, backup)
+
+    try:
+        candidate.replace(target)
+        return True
+    except Exception:
+        if backup.exists():
+            shutil.copy2(backup, target)
+        raise
+
+
+def remove_if_exists(path: Path) -> None:
+    if path.exists():
+        path.unlink()
+
+
 def training_window(lookback_days: int) -> tuple[str, str]:
     end = now_utc()
     start = end - timedelta(days=lookback_days)
@@ -93,12 +130,28 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
     model_version = f"{cfg.model_version_base}-{now_utc().strftime('%Y%m%d%H%M')}"
     dataset_out = cfg.dataset_path_template.format(model_version=model_version, feature_set=cfg.feature_set)
     model_card_out = cfg.model_card_path_template.format(model_version=model_version)
+    model_candidate_path = with_suffix(cfg.model_path, ".candidate")
+    report_candidate_path = with_suffix(cfg.report_path, ".candidate")
+    audit_candidate_path = with_suffix(cfg.audit_path, ".candidate")
+    model_card_candidate_out = f"{model_card_out}.candidate" if model_card_out else ""
+    model_card_candidate_path = Path(model_card_candidate_out) if model_card_candidate_out else None

-    ensure_parent(cfg.audit_path)
-    ensure_parent(cfg.report_path)
+    # Ensure promotions only use artifacts from the current training cycle.
+    remove_if_exists(model_candidate_path)
+    remove_if_exists(report_candidate_path)
+    remove_if_exists(audit_candidate_path)
+    if model_card_candidate_path is not None:
+        remove_if_exists(model_card_candidate_path)
+
+    ensure_parent(audit_candidate_path)
+    ensure_parent(report_candidate_path)
     ensure_parent(cfg.model_path)
+    ensure_parent(model_candidate_path)
+    ensure_parent(cfg.model_backup_path)
     if dataset_out:
         ensure_parent(Path(dataset_out))
+    if model_card_candidate_path is not None:
+        ensure_parent(model_card_candidate_path)
     if model_card_out:
         ensure_parent(Path(model_card_out))

@@ -117,7 +170,7 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
             "--forecast-model",
             cfg.forecast_model,
             "--out",
-            str(cfg.audit_path),
+            str(audit_candidate_path),
         ],
         env,
     )
@@ -152,11 +205,11 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
         "--model-version",
         model_version,
         "--out",
-        str(cfg.model_path),
+        str(model_candidate_path),
         "--report-out",
-        str(cfg.report_path),
+        str(report_candidate_path),
         "--model-card-out",
-        model_card_out,
+        model_card_candidate_out,
         "--dataset-out",
         dataset_out,
     ]
@@ -168,10 +221,40 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
         train_cmd.append("--strict-source-data")
     run_cmd(train_cmd, env)

+    promoted_model = promote_model_candidate(
+        candidate=model_candidate_path,
+        target=cfg.model_path,
+        backup=cfg.model_backup_path,
+    )
+    if not promoted_model:
+        print(
+            "[rain-ml] training completed without new model artifact; keeping last-known-good model",
+            flush=True,
+        )
+        return
+
+    promote_file(report_candidate_path, cfg.report_path)
+    promote_file(audit_candidate_path, cfg.audit_path)
+    if model_card_candidate_path is not None:
+        promote_file(model_card_candidate_path, Path(model_card_out))
+    print(
+        f"[rain-ml] promoted new model artifact to {cfg.model_path} (backup={cfg.model_backup_path})",
+        flush=True,
+    )
+

 def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None:
     if not cfg.model_path.exists():
-        raise RuntimeError(f"model artifact not found: {cfg.model_path}")
+        if cfg.model_backup_path.exists():
+            ensure_parent(cfg.model_path)
+            shutil.copy2(cfg.model_backup_path, cfg.model_path)
+            print(f"[rain-ml] restored model from backup {cfg.model_backup_path}", flush=True)
+        else:
+            print(
+                f"[rain-ml] prediction skipped: model artifact not found ({cfg.model_path})",
+                flush=True,
+            )
+            return

     run_cmd(
         [
@@ -196,6 +279,10 @@ def load_config() -> WorkerConfig:
     if not database_url:
         raise SystemExit("DATABASE_URL is required")

+    model_path = Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl"))
+    backup_path_raw = read_env("RAIN_MODEL_BACKUP_PATH", "")
+    model_backup_path = Path(backup_path_raw) if backup_path_raw else with_suffix(model_path, ".last_good")
+
     return WorkerConfig(
         database_url=database_url,
         site=read_env("RAIN_SITE", "home"),
@@ -223,7 +310,8 @@ def load_config() -> WorkerConfig:
             "RAIN_MODEL_CARD_PATH",
             "models/model_card_{model_version}.md",
         ),
-        model_path=Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl")),
+        model_path=model_path,
+        model_backup_path=model_backup_path,
         report_path=Path(read_env("RAIN_REPORT_PATH", "models/rain_model_report.json")),
         audit_path=Path(read_env("RAIN_AUDIT_PATH", "models/rain_data_audit.json")),
         run_once=read_env_bool("RAIN_RUN_ONCE", False),
@@ -254,7 +342,8 @@ def main() -> int:
         f"predict_interval_minutes={cfg.predict_interval_minutes} "
         f"tune_hyperparameters={cfg.tune_hyperparameters} "
         f"walk_forward_folds={cfg.walk_forward_folds} "
-        f"allow_empty_data={cfg.allow_empty_data}",
+        f"allow_empty_data={cfg.allow_empty_data} "
+        f"model_backup_path={cfg.model_backup_path}",
         flush=True,
     )

diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py
index b690172..66482d3 100644
--- a/scripts/train_rain_model.py
+++ b/scripts/train_rain_model.py
@@ -9,6 +9,7 @@ from datetime import datetime, timezone
 from typing import Any

 import numpy as np
+import pandas as pd
 import psycopg2
 from sklearn.calibration import CalibratedClassifierCV
 from sklearn.ensemble import HistGradientBoostingClassifier
@@ -499,6 +500,81 @@ def evaluate_naive_baselines(test_df, y_test: np.ndarray) -> dict[str, Any]:
     return out


+def evaluate_sliced_performance(
+    test_df,
+    y_true: np.ndarray,
+    y_prob: np.ndarray,
+    threshold: float,
+    min_rows_per_slice: int = 30,
+) -> dict[str, Any]:
+    frame = pd.DataFrame(
+        {
+            "y_true": y_true.astype(int),
+            "y_prob": y_prob.astype(float),
+        },
+        index=test_df.index,
+    )
+    overall_rate = float(np.mean(y_true))
+    hour = frame.index.hour
+    is_day = (hour >= 6) & (hour < 18)
+
+    weekly_key = frame.index.to_series().dt.isocalendar()
+    week_label = weekly_key["year"].astype(str) + "-W" + weekly_key["week"].astype(str).str.zfill(2)
+    weekly_positive_rate = frame.groupby(week_label)["y_true"].transform("mean")
+    rainy_week = weekly_positive_rate >= overall_rate
+
+    rain_context = test_df["rain_last_1h_mm"].to_numpy(dtype=float) if "rain_last_1h_mm" in test_df.columns else np.zeros(len(test_df))
+    wet_context = rain_context >= RAIN_EVENT_THRESHOLD_MM
+
+    wind_values = test_df["wind_max_m_s"].to_numpy(dtype=float) if "wind_max_m_s" in test_df.columns else np.full(len(test_df), np.nan)
+    if np.isfinite(wind_values).any():
+        wind_q75 = float(np.nanquantile(wind_values, 0.75))
+        windy = np.nan_to_num(wind_values, nan=wind_q75) >= wind_q75
+    else:
+        windy = np.zeros(len(test_df), dtype=bool)
+
+    definitions: list[tuple[str, np.ndarray, str]] = [
+        ("daytime_utc", np.asarray(is_day, dtype=bool), "06:00-17:59 UTC"),
+        ("nighttime_utc", np.asarray(~is_day, dtype=bool), "18:00-05:59 UTC"),
+        ("rainy_weeks", np.asarray(rainy_week, dtype=bool), "weeks with positive-rate >= test positive-rate"),
+        ("non_rainy_weeks", np.asarray(~rainy_week, dtype=bool), "weeks with positive-rate < test positive-rate"),
+        ("wet_context_last_1h", np.asarray(wet_context, dtype=bool), f"rain_last_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}"),
+        ("dry_context_last_1h", np.asarray(~wet_context, dtype=bool), f"rain_last_1h_mm < {RAIN_EVENT_THRESHOLD_MM:.2f}"),
+        ("windy_q75", np.asarray(windy, dtype=bool), "wind_max_m_s >= test 75th percentile"),
+        ("calm_below_q75", np.asarray(~windy, dtype=bool), "wind_max_m_s < test 75th percentile"),
+    ]
+
+    out: dict[str, Any] = {}
+    for name, mask, description in definitions:
+        rows = int(np.sum(mask))
+        if rows == 0:
+            out[name] = {
+                "rows": rows,
+                "description": description,
+                "status": "empty",
+            }
+            continue
+        y_slice = y_true[mask]
+        p_slice = y_prob[mask]
+        if rows < min_rows_per_slice:
+            out[name] = {
+                "rows": rows,
+                "description": description,
+                "status": "insufficient_rows",
+                "min_rows_required": min_rows_per_slice,
+            }
+            continue
+        metrics = evaluate_probs(y_true=y_slice, y_prob=p_slice, threshold=threshold)
+        out[name] = {
+            "rows": rows,
+            "description": description,
+            "status": "ok",
+            "metrics": metrics,
+            "ece_10": expected_calibration_error(y_true=y_slice, y_prob=p_slice, bins=10),
+        }
+    return out
+
+
 def walk_forward_backtest(
     model_df,
     feature_cols: list[str],
@@ -683,6 +759,25 @@ def write_model_card(path: str, report: dict[str, Any]) -> None:
             f"PR-AUC `{report['test_metrics']['pr_auc']}`, "
             f"ROC-AUC `{report['test_metrics']['roc_auc']}`, "
             f"Brier `{report['test_metrics']['brier']:.4f}`",
+            "",
+            "## Sliced Performance (Test)",
+            "",
+        ]
+    )
+    for slice_name, info in report.get("sliced_performance_test", {}).items():
+        if info.get("status") != "ok":
+            continue
+        metrics = info["metrics"]
+        lines.append(
+            f"- `{slice_name}` ({info['rows']} rows): "
+            f"precision `{metrics['precision']:.3f}`, "
+            f"recall `{metrics['recall']:.3f}`, "
+            f"PR-AUC `{metrics['pr_auc']}`, "
+            f"Brier `{metrics['brier']:.4f}`"
+        )
+
+    lines.extend(
+        [
             "",
             "## Known Limitations",
             "",
@@ -862,6 +957,12 @@ def main() -> int:
         "ece_10": expected_calibration_error(y_true=y_test, y_prob=y_test_prob, bins=10),
     }
     naive_baselines_test = evaluate_naive_baselines(test_df=test_df, y_test=y_test)
+    sliced_performance = evaluate_sliced_performance(
+        test_df=test_df,
+        y_true=y_test,
+        y_prob=y_test_prob,
+        threshold=chosen_threshold,
+    )
     walk_forward = walk_forward_backtest(
         model_df=model_df,
         feature_cols=feature_cols,
@@ -941,6 +1042,7 @@ def main() -> int:
         "test_metrics": test_metrics,
         "test_calibration_quality": test_calibration,
         "naive_baselines_test": naive_baselines_test,
+        "sliced_performance_test": sliced_performance,
         "walk_forward_backtest": walk_forward,
     }
     report = to_builtin(report)
@@ -1002,6 +1104,21 @@ def main() -> int:
                 f"pr_auc={m['pr_auc'] if m['pr_auc'] is not None else 'n/a'} "
                 f"brier={m['brier']:.4f}"
             )
+    sliced_ok = [
+        (name, item)
+        for name, item in report["sliced_performance_test"].items()
+        if item.get("status") == "ok"
+    ]
+    if sliced_ok:
+        print(" sliced performance (test):")
+        for name, item in sliced_ok:
+            m = item["metrics"]
+            print(
+                f" {name}: rows={item['rows']} "
+                f"precision={m['precision']:.3f} recall={m['recall']:.3f} "
+                f"pr_auc={m['pr_auc'] if m['pr_auc'] is not None else 'n/a'} "
+                f"brier={m['brier']:.4f}"
+            )

     if args.report_out:
         report_dir = os.path.dirname(args.report_out)
diff --git a/todo.md b/todo.md
index 84d4677..ee4d371 100644
--- a/todo.md
+++ b/todo.md
@@ -18,7 +18,7 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
 - [x] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow.
 - [x] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs.
 - [x] [P1] Encode wind direction properly (cyclical encoding).
-- [ ] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies).
+- [x] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies). (`feature-set=extended_calendar`)
 - [x] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure).
 - [x] [P1] Persist versioned dataset snapshots for reproducibility.

@@ -34,7 +34,7 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
 ## 5) Evaluation and Reporting
 - [x] [P0] Report ROC-AUC, PR-AUC, confusion matrix, precision, recall, and Brier score.
 - [x] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules).
-- [ ] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.).
+- [x] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.). (`sliced_performance_test`)
 - [x] [P1] Produce a short model card (data window, features, metrics, known limitations). (`--model-card-out`)

 ## 6) Packaging and Deployment
@@ -42,7 +42,7 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
 - [x] [P0] Implement inference path with feature parity between training and serving.
 - [x] [P0] Add prediction storage table for predicted probabilities and realized outcomes.
 - [x] [P1] Expose predictions via API and optionally surface in web dashboard.
-- [ ] [P2] Add scheduled retraining with rollback to last-known-good model.
+- [x] [P2] Add scheduled retraining with rollback to last-known-good model. (`run_rain_ml_worker.py` candidate promote + `RAIN_MODEL_BACKUP_PATH`)

 ## 7) Monitoring and Operations
 - [x] [P1] Track feature drift and prediction drift over time. (view: `rain_feature_drift_daily`, `rain_prediction_drift_daily`)
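
Reviewer note: the candidate-promotion and backup-restore flow this patch adds to `scripts/run_rain_ml_worker.py` can be exercised in isolation. The following is a minimal sketch only, assuming plain files in a temporary directory; the helper names `promote_with_backup`, `restore_if_missing`, and `demo` are hypothetical and are not the functions in the patch, and the error-path restore inside `promote_model_candidate` is not reproduced here.

```python
import shutil
import tempfile
from pathlib import Path


def promote_with_backup(candidate: Path, target: Path, backup: Path) -> bool:
    """Move `candidate` over `target`, first copying the old target to `backup`.

    Hypothetical helper for illustration; returns False when no candidate exists,
    in which case the active file is left untouched.
    """
    if not candidate.exists():
        return False
    target.parent.mkdir(parents=True, exist_ok=True)
    backup.parent.mkdir(parents=True, exist_ok=True)
    if target.exists():
        shutil.copy2(target, backup)  # keep the last-known-good copy
    candidate.replace(target)  # rename over the active file
    return True


def restore_if_missing(target: Path, backup: Path) -> bool:
    """Return True if `target` is usable, restoring it from `backup` when needed."""
    if target.exists():
        return True
    if backup.exists():
        target.parent.mkdir(parents=True, exist_ok=True)
        shutil.copy2(backup, target)
        return True
    return False


def demo() -> dict:
    """Run one promote / no-op / restore cycle in a throwaway directory."""
    with tempfile.TemporaryDirectory() as tmp:
        root = Path(tmp)
        target = root / "rain_model.pkl"
        backup = root / "rain_model.pkl.last_good"
        candidate = root / "rain_model.pkl.candidate"

        target.write_text("v1")
        candidate.write_text("v2")

        promoted = promote_with_backup(candidate, target, backup)
        active_after_promote = target.read_text()
        backup_after_promote = backup.read_text()

        # The candidate was consumed by the rename, so a second call is a no-op.
        promoted_again = promote_with_backup(candidate, target, backup)

        # Simulate a lost active model and recover it from the backup.
        target.unlink()
        restored = restore_if_missing(target, backup)
        active_after_restore = target.read_text()

        return {
            "promoted": promoted,
            "active_after_promote": active_after_promote,
            "backup_after_promote": backup_after_promote,
            "promoted_again": promoted_again,
            "restored": restored,
            "active_after_restore": active_after_restore,
        }


if __name__ == "__main__":
    print(demo())
```

Note that in this sketch a restore brings back the previous model, not the most recently promoted one, because the backup is taken from the old target before the swap. That mirrors the ordering of `shutil.copy2(target, backup)` followed by `candidate.replace(target)` in the patch, and may be worth confirming as the intended behavior.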