diff --git a/agent.md b/agent.md new file mode 100644 index 0000000..0e018af --- /dev/null +++ b/agent.md @@ -0,0 +1,44 @@ + +## Workflow Orchestration +### 1. Plan Mode Default +- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions) +- If something goes sideways, STOP and re-plan immediately - don't keep pushing +- Use plan mode for verification steps, not just building +- Write detailed specs upfront to reduce ambiguity +### 2. Subagent Strategy +- Use subagents liberally to keep the main context window clean +- Offload research, exploration, and parallel analysis to subagents +- For complex problems, throw more compute at them via subagents +- One task per subagent for focused execution +### 3. Self-Improvement Loop +- After ANY correction from the user: update tasks/lessons.md +with the pattern +- Write rules for yourself that prevent the same mistake +- Ruthlessly iterate on these lessons until the mistake rate drops +- Review lessons at session start for the relevant project +### 4. Verification Before Done +- Never mark a task complete without proving it works +- Diff behavior between main and your changes when relevant +- Ask yourself: "Would a staff engineer approve this?" +- Run tests, check logs, demonstrate correctness +### 5. Demand Elegance (Balanced) +- For non-trivial changes: pause and ask "is there a more elegant way?" +- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution" +- Skip this for simple, obvious fixes - don't over-engineer +- Challenge your own work before presenting it +### 6. Autonomous Bug Fixing +- When given a bug report: just fix it. Don't ask for hand-holding +- Point at logs, errors, failing tests - then resolve them +- Zero context switching required from the user +- Go fix failing CI tests without being told how +## Task Management +1. **Plan First**: Write the plan to tasks/todo.md with checkable items +2. **Verify Plan**: Check in before starting implementation +3. 
**Track Progress**: Mark items complete as you go +4. **Explain Changes**: High-level summary at each step +5. **Document Results**: Add a review section to tasks/todo.md +6. **Capture Lessons**: Update tasks/lessons.md after corrections +## Core Principles +- **Simplicity First**: Make every change as simple as possible. Touch minimal code. +- **No Laziness**: Find root causes. No temporary fixes. Senior developer standards. +- **Minimal Impact**: Changes should only touch what's necessary. Avoid introducing bugs. \ No newline at end of file diff --git a/db/init/001_schema.sql b/db/init/001_schema.sql index 6bdd4c9..b8d85fd 100644 --- a/db/init/001_schema.sql +++ b/db/init/001_schema.sql @@ -84,6 +84,34 @@ SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC); +-- Rain model predictions (next 1h) +CREATE TABLE IF NOT EXISTS predictions_rain_1h ( + ts TIMESTAMPTZ NOT NULL, + generated_at TIMESTAMPTZ NOT NULL DEFAULT now(), + site TEXT NOT NULL, + model_name TEXT NOT NULL, + model_version TEXT NOT NULL, + threshold DOUBLE PRECISION NOT NULL, + probability DOUBLE PRECISION NOT NULL, + predict_rain BOOLEAN NOT NULL, + + rain_next_1h_mm_actual DOUBLE PRECISION, + rain_next_1h_actual BOOLEAN, + evaluated_at TIMESTAMPTZ, + + metadata JSONB, + + PRIMARY KEY (site, model_name, model_version, ts) +); + +SELECT create_hypertable('predictions_rain_1h', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_site_ts + ON predictions_rain_1h(site, ts DESC); + +CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_pending_eval + ON predictions_rain_1h(site, evaluated_at, ts DESC); + -- Raw retention: 90 days DO $$ BEGIN diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md index c128529..2f5beb4 100644 --- a/docs/rain_prediction.md +++ b/docs/rain_prediction.md @@ -1,21 +1,20 @@ # Rain Prediction (Next 1 Hour) -This 
project now includes a starter training script for a **binary rain prediction**: +This project includes a baseline workflow for **binary rain prediction**: > **Will we see >= 0.2 mm of rain in the next hour?** -It uses local observations (WS90 + barometric pressure) and trains a lightweight -logistic regression model. This is a baseline you can iterate on as you collect -more data. +It uses local observations (WS90 + barometer), trains a logistic regression +baseline, and writes model-driven predictions back to TimescaleDB. -## What the script does -- Pulls data from TimescaleDB. -- Resamples observations to 5-minute buckets. -- Derives **pressure trend (1h)** from barometer data. -- Computes **future 1-hour rainfall** from the cumulative `rain_mm` counter. -- Trains a model and prints evaluation metrics. - -The output is a saved model file (optional) you can use later for inference. +## P0 Decisions (Locked) +- Target: `rain_next_1h_mm >= 0.2`. +- Primary use-case: low-noise rain heads-up signal for dashboard + alert candidate. +- Frozen v1 training window (UTC): `2026-02-01T00:00:00Z` to `2026-03-03T23:55:00Z`. +- Threshold policy: choose threshold on validation set by maximizing recall under + `precision >= 0.70`; fallback to max-F1 if the precision constraint is unreachable. +- Acceptance gate (test split): report and track `precision`, `recall`, `ROC-AUC`, + `PR-AUC`, `Brier score`, and confusion matrix. ## Requirements Python 3.10+ and: @@ -36,67 +35,76 @@ source .venv/bin/activate pip install -r scripts/requirements.txt ``` +## Scripts +- `scripts/audit_rain_data.py`: data quality + label quality + class balance audit. +- `scripts/train_rain_model.py`: strict time-based split training and metrics report. +- `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into + `predictions_rain_1h`. + ## Usage +### 1) Apply schema update (existing DBs) +`001_schema.sql` now includes `predictions_rain_1h`. 
```sh -python scripts/train_rain_model.py \ - --db-url "postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable" \ - --site "home" \ - --start "2026-01-01" \ - --end "2026-02-01" \ - --out "models/rain_model.pkl" +docker compose exec -T timescaledb \ + psql -U postgres -d micrometeo \ + -f /docker-entrypoint-initdb.d/001_schema.sql ``` -You can also provide the connection string via `DATABASE_URL`: - +### 2) Run data audit ```sh export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable" -python scripts/train_rain_model.py --site home + +python scripts/audit_rain_data.py \ + --site home \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --out "models/rain_data_audit.json" +``` + +### 3) Train baseline model +```sh +python scripts/train_rain_model.py \ + --site "home" \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --train-ratio 0.7 \ + --val-ratio 0.15 \ + --min-precision 0.70 \ + --model-version "rain-logreg-v1" \ + --out "models/rain_model.pkl" \ + --report-out "models/rain_model_report.json" +``` + +### 4) Run inference and store prediction +```sh +python scripts/predict_rain_model.py \ + --site home \ + --model-path "models/rain_model.pkl" \ + --model-name "rain_next_1h" +``` + +### 5) One-command P0 workflow +```sh +export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable" +bash scripts/run_p0_rain_workflow.sh ``` ## Output -The script prints metrics including: -- accuracy -- precision / recall -- ROC AUC -- confusion matrix +- Audit report: `models/rain_data_audit.json` +- Training report: `models/rain_model_report.json` +- Model artifact: `models/rain_model.pkl` +- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized + outcome fields once available) -If `joblib` is installed, it saves a model bundle: +## Model Features (v1) +- `pressure_trend_1h` +- `humidity` +- `temperature_c` +- `wind_avg_m_s` +- 
`wind_max_m_s` -``` -models/rain_model.pkl -``` - -This bundle contains: -- The trained model pipeline -- The feature list used during training - -## Data needs / when to run -For a reliable model, you will want: -- **At least 2-4 weeks** of observations -- A mix of rainy and non-rainy periods - -Training with only a few days will produce an unstable model. - -## Features used -The baseline model uses: -- `pressure_trend_1h` (hPa) -- `humidity` (%) -- `temperature_c` (C) -- `wind_avg_m_s` (m/s) -- `wind_max_m_s` (m/s) - -These are easy to expand once you have more data (e.g. add forecast features). - -## Notes / assumptions -- Rain detection is based on **incremental rain** derived from the WS90 - `rain_mm` cumulative counter. -- Pressure comes from `observations_baro`. -- All timestamps are treated as UTC. - -## Next improvements -Ideas once more data is available: -- Add forecast precipitation and cloud cover as features -- Try gradient boosted trees (e.g. XGBoost / LightGBM) -- Train per-season models -- Calibrate probabilities (Platt scaling / isotonic regression) +## Notes +- Data is resampled into 5-minute buckets. +- The label is derived from incremental rain computed from the WS90 cumulative `rain_mm` counter. +- Timestamps are handled as UTC throughout the training/inference workflow. 
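The Notes above compress the label derivation into one bullet; a minimal sketch of how `rain_next_1h` falls out of the cumulative counter (toy 5-minute data, not the project's real series; the actual implementation lives in `rain_model_common.build_dataset`):

```python
import pandas as pd

# Toy cumulative rain counter (mm) on the 5-minute grid used by the pipeline.
idx = pd.date_range("2026-02-01", periods=16, freq="5min", tz="UTC")
rain_mm = pd.Series([0.0] * 12 + [0.5, 1.0, 1.0, 1.0], index=idx)

# Per-bucket increment; clip(lower=0) absorbs counter resets.
rain_inc = rain_mm.diff().clip(lower=0)

# Future 1-hour rainfall: sum the next 12 buckets (12 * 5 min = 1 h),
# aligned so each row sees the rain starting at its own bucket.
window = 12
rain_next_1h_mm = rain_inc.rolling(window, min_periods=1).sum().shift(-(window - 1))
rain_next_1h = rain_next_1h_mm >= 0.2  # binary target
```

The last `window - 1` rows come out NaN (their future hour is not fully observed), which is why `model_frame` drops target-incomplete rows before training.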
diff --git a/scripts/__pycache__/audit_rain_data.cpython-314.pyc b/scripts/__pycache__/audit_rain_data.cpython-314.pyc new file mode 100644 index 0000000..fa65916 Binary files /dev/null and b/scripts/__pycache__/audit_rain_data.cpython-314.pyc differ diff --git a/scripts/__pycache__/predict_rain_model.cpython-314.pyc b/scripts/__pycache__/predict_rain_model.cpython-314.pyc new file mode 100644 index 0000000..cfcf3e2 Binary files /dev/null and b/scripts/__pycache__/predict_rain_model.cpython-314.pyc differ diff --git a/scripts/__pycache__/rain_model_common.cpython-314.pyc b/scripts/__pycache__/rain_model_common.cpython-314.pyc new file mode 100644 index 0000000..81a6533 Binary files /dev/null and b/scripts/__pycache__/rain_model_common.cpython-314.pyc differ diff --git a/scripts/__pycache__/train_rain_model.cpython-314.pyc b/scripts/__pycache__/train_rain_model.cpython-314.pyc new file mode 100644 index 0000000..e8f884b Binary files /dev/null and b/scripts/__pycache__/train_rain_model.cpython-314.pyc differ diff --git a/scripts/audit_rain_data.py b/scripts/audit_rain_data.py new file mode 100644 index 0000000..0ea33fa --- /dev/null +++ b/scripts/audit_rain_data.py @@ -0,0 +1,186 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import json +import os + +import numpy as np +import psycopg2 + +from rain_model_common import ( + FEATURE_COLUMNS, + RAIN_EVENT_THRESHOLD_MM, + build_dataset, + fetch_baro, + fetch_ws90, + model_frame, + parse_time, + to_builtin, +) + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Audit weather time-series quality for rain model training.") + parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") + parser.add_argument("--site", required=True, help="Site name (e.g. 
home).") + parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).") + parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).") + parser.add_argument("--out", default="models/rain_data_audit.json", help="Path to save JSON audit report.") + return parser.parse_args() + + +def longest_zero_run(counts: np.ndarray) -> int: + best = 0 + cur = 0 + for v in counts: + if v == 0: + cur += 1 + if cur > best: + best = cur + else: + cur = 0 + return best + + +def build_weekly_balance(model_df): + weekly = model_df.copy() + iso = weekly.index.to_series().dt.isocalendar() + weekly["year_week"] = iso["year"].astype(str) + "-W" + iso["week"].astype(str).str.zfill(2) + + grouped = ( + weekly.groupby("year_week")["rain_next_1h"] + .agg(total_rows="count", positive_rows="sum") + .reset_index() + .sort_values("year_week") + ) + grouped["positive_rate"] = grouped["positive_rows"] / grouped["total_rows"] + return grouped.to_dict(orient="records") + + +def main() -> int: + args = parse_args() + if not args.db_url: + raise SystemExit("missing --db-url or DATABASE_URL") + + start = parse_time(args.start) if args.start else "" + end = parse_time(args.end) if args.end else "" + + with psycopg2.connect(args.db_url) as conn: + ws90 = fetch_ws90(conn, args.site, start, end) + baro = fetch_baro(conn, args.site, start, end) + + df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) + model_df = model_frame(df, FEATURE_COLUMNS, require_target=True) + + ws90_dupes = int(ws90.duplicated(subset=["ts", "station_id"]).sum()) if not ws90.empty else 0 + baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0 + + ws90_out_of_order = 0 + if not ws90.empty: + ws90_by_received = ws90.sort_values("received_at") + ws90_out_of_order = int((ws90_by_received["ts"].diff().dropna() < np.timedelta64(0, "ns")).sum()) + + baro_out_of_order = 0 + if not baro.empty: + baro_by_received = baro.sort_values("received_at") + 
baro_out_of_order = int((baro_by_received["ts"].diff().dropna() < np.timedelta64(0, "ns")).sum()) + + ws90_counts = ws90.set_index("ts").resample("5min").size() if not ws90.empty else np.array([]) + baro_counts = baro.set_index("ts").resample("5min").size() if not baro.empty else np.array([]) + + ws90_gap_buckets = int((ws90_counts == 0).sum()) if len(ws90_counts) else 0 + baro_gap_buckets = int((baro_counts == 0).sum()) if len(baro_counts) else 0 + ws90_max_gap_min = longest_zero_run(np.array(ws90_counts)) * 5 if len(ws90_counts) else 0 + baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0 + + missingness = {} + for col in FEATURE_COLUMNS + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]: + if col in df.columns: + missingness[col] = float(df[col].isna().mean()) + + max_rain_inc = None + if "rain_inc" in df.columns and np.isfinite(df["rain_inc"].to_numpy(dtype=float)).any(): + max_rain_inc = float(np.nanmax(df["rain_inc"].to_numpy(dtype=float))) + + report = { + "site": args.site, + "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + "requested_window": { + "start": start or None, + "end": end or None, + }, + "observed_window": { + "ws90_start": ws90["ts"].min() if not ws90.empty else None, + "ws90_end": ws90["ts"].max() if not ws90.empty else None, + "baro_start": baro["ts"].min() if not baro.empty else None, + "baro_end": baro["ts"].max() if not baro.empty else None, + "model_start": model_df.index.min() if not model_df.empty else None, + "model_end": model_df.index.max() if not model_df.empty else None, + }, + "row_counts": { + "ws90_rows": int(len(ws90)), + "baro_rows": int(len(baro)), + "model_rows": int(len(model_df)), + }, + "duplicates": { + "ws90_ts_station_duplicates": ws90_dupes, + "baro_ts_source_duplicates": baro_dupes, + }, + "out_of_order": { + "ws90_by_received_count": ws90_out_of_order, + "baro_by_received_count": baro_out_of_order, + }, + "gaps_5m": { + "ws90_empty_buckets": 
ws90_gap_buckets, + "baro_empty_buckets": baro_gap_buckets, + "ws90_max_gap_minutes": ws90_max_gap_min, + "baro_max_gap_minutes": baro_max_gap_min, + }, + "missingness_ratio": missingness, + "label_quality": { + "rain_reset_count": int(np.nansum(df["rain_reset"].fillna(False).to_numpy(dtype=int))), + "rain_spike_5m_count": int(np.nansum(df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))), + "max_rain_increment_5m_mm": max_rain_inc, + }, + "class_balance": { + "overall_positive_rate": float(model_df["rain_next_1h"].mean()) if not model_df.empty else None, + "weekly": build_weekly_balance(model_df) if not model_df.empty else [], + }, + } + report = to_builtin(report) + + print("Rain data audit summary:") + print(f" site: {report['site']}") + print( + " rows: " + f"ws90={report['row_counts']['ws90_rows']} " + f"baro={report['row_counts']['baro_rows']} " + f"model={report['row_counts']['model_rows']}" + ) + print( + " duplicates: " + f"ws90={report['duplicates']['ws90_ts_station_duplicates']} " + f"baro={report['duplicates']['baro_ts_source_duplicates']}" + ) + print( + " rain label checks: " + f"resets={report['label_quality']['rain_reset_count']} " + f"spikes_5m={report['label_quality']['rain_spike_5m_count']} " + f"max_inc_5m={report['label_quality']['max_rain_increment_5m_mm']}" + ) + print(f" overall positive rate: {report['class_balance']['overall_positive_rate']}") + + if args.out: + out_dir = os.path.dirname(args.out) + if out_dir: + os.makedirs(out_dir, exist_ok=True) + with open(args.out, "w", encoding="utf-8") as f: + json.dump(report, f, indent=2) + print(f"Saved audit report to {args.out}") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/predict_rain_model.py b/scripts/predict_rain_model.py new file mode 100644 index 0000000..ff9adbb --- /dev/null +++ b/scripts/predict_rain_model.py @@ -0,0 +1,181 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +import argparse +import os +from datetime 
import datetime, timedelta, timezone + +import psycopg2 +from psycopg2.extras import Json + +from rain_model_common import build_dataset, fetch_baro, fetch_ws90, model_frame, parse_time, to_builtin + +try: + import joblib +except ImportError: # pragma: no cover - optional dependency + joblib = None + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser(description="Run rain model inference and upsert prediction to Postgres.") + parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") + parser.add_argument("--site", required=True, help="Site name (e.g. home).") + parser.add_argument("--model-path", default="models/rain_model.pkl", help="Path to trained model artifact.") + parser.add_argument("--model-name", default="rain_next_1h", help="Logical prediction model name.") + parser.add_argument("--model-version", help="Override artifact model_version.") + parser.add_argument( + "--at", + help="Prediction timestamp (RFC3339 or YYYY-MM-DD). 
Default: current UTC time.", + ) + parser.add_argument( + "--history-hours", + type=int, + default=6, + help="History lookback window used to build features.", + ) + parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.") + return parser.parse_args() + + +def load_artifact(path: str): + if joblib is None: + raise RuntimeError("joblib not installed; cannot load model artifact") + if not os.path.exists(path): + raise RuntimeError(f"model artifact not found: {path}") + artifact = joblib.load(path) + if "model" not in artifact: + raise RuntimeError("invalid artifact: missing 'model'") + if "features" not in artifact: + raise RuntimeError("invalid artifact: missing 'features'") + return artifact + + +def parse_at(value: str | None) -> datetime: + if not value: + return datetime.now(timezone.utc) + parsed = parse_time(value) + return datetime.fromisoformat(parsed.replace("Z", "+00:00")).astimezone(timezone.utc) + + +def main() -> int: + args = parse_args() + if not args.db_url: + raise SystemExit("missing --db-url or DATABASE_URL") + + at = parse_at(args.at) + artifact = load_artifact(args.model_path) + model = artifact["model"] + features = artifact["features"] + threshold = float(artifact.get("threshold", 0.5)) + model_version = args.model_version or artifact.get("model_version") or "unknown" + + fetch_start = (at - timedelta(hours=args.history_hours)).isoformat() + fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat() + + with psycopg2.connect(args.db_url) as conn: + ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end) + baro = fetch_baro(conn, args.site, fetch_start, fetch_end) + + full_df = build_dataset(ws90, baro) + feature_df = model_frame(full_df, feature_cols=features, require_target=False) + candidates = feature_df.loc[feature_df.index <= at] + if candidates.empty: + raise RuntimeError("no feature-complete row available at or before requested timestamp") + + row = candidates.tail(1) + pred_ts = 
row.index[0].to_pydatetime() + x = row[features] + + probability = float(model.predict_proba(x)[:, 1][0]) + predict_rain = probability >= threshold + + actual_mm = None + actual_flag = None + evaluated_at = None + latest_available = full_df.index.max().to_pydatetime() + if pred_ts + timedelta(hours=1) <= latest_available: + next_mm = full_df.loc[pred_ts, "rain_next_1h_mm"] + next_flag = full_df.loc[pred_ts, "rain_next_1h"] + if next_mm == next_mm: # NaN-safe check + actual_mm = float(next_mm) + if next_flag == next_flag: + actual_flag = bool(next_flag) + evaluated_at = datetime.now(timezone.utc) + + metadata = { + "artifact_path": args.model_path, + "artifact_model_version": artifact.get("model_version"), + "feature_values": {col: float(row.iloc[0][col]) for col in features}, + "source_window_start": fetch_start, + "source_window_end": fetch_end, + "requested_at": at.isoformat(), + "pred_ts": pred_ts.isoformat(), + } + metadata = to_builtin(metadata) + + print("Rain inference summary:") + print(f" site: {args.site}") + print(f" model_name: {args.model_name}") + print(f" model_version: {model_version}") + print(f" pred_ts: {pred_ts.isoformat()}") + print(f" threshold: {threshold:.3f}") + print(f" probability: {probability:.4f}") + print(f" predict_rain: {predict_rain}") + print(f" outcome_available: {actual_flag is not None}") + + if args.dry_run: + print("dry-run enabled; skipping DB upsert.") + return 0 + + with conn.cursor() as cur: + cur.execute( + """ + INSERT INTO predictions_rain_1h ( + ts, + generated_at, + site, + model_name, + model_version, + threshold, + probability, + predict_rain, + rain_next_1h_mm_actual, + rain_next_1h_actual, + evaluated_at, + metadata + ) VALUES ( + %s, now(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s + ) + ON CONFLICT (site, model_name, model_version, ts) + DO UPDATE SET + generated_at = EXCLUDED.generated_at, + threshold = EXCLUDED.threshold, + probability = EXCLUDED.probability, + predict_rain = EXCLUDED.predict_rain, + 
rain_next_1h_mm_actual = COALESCE(EXCLUDED.rain_next_1h_mm_actual, predictions_rain_1h.rain_next_1h_mm_actual), + rain_next_1h_actual = COALESCE(EXCLUDED.rain_next_1h_actual, predictions_rain_1h.rain_next_1h_actual), + evaluated_at = COALESCE(EXCLUDED.evaluated_at, predictions_rain_1h.evaluated_at), + metadata = EXCLUDED.metadata + """, + ( + pred_ts, + args.site, + args.model_name, + model_version, + threshold, + probability, + predict_rain, + actual_mm, + actual_flag, + evaluated_at, + Json(metadata), + ), + ) + conn.commit() + print("Prediction upserted into predictions_rain_1h.") + + return 0 + + +if __name__ == "__main__": + raise SystemExit(main()) diff --git a/scripts/rain_model_common.py b/scripts/rain_model_common.py new file mode 100644 index 0000000..6126e14 --- /dev/null +++ b/scripts/rain_model_common.py @@ -0,0 +1,226 @@ +#!/usr/bin/env python3 +from __future__ import annotations + +from datetime import datetime +from typing import Any + +import numpy as np +import pandas as pd +from sklearn.metrics import ( + accuracy_score, + average_precision_score, + brier_score_loss, + confusion_matrix, + f1_score, + precision_score, + recall_score, + roc_auc_score, +) + +FEATURE_COLUMNS = [ + "pressure_trend_1h", + "humidity", + "temperature_c", + "wind_avg_m_s", + "wind_max_m_s", +] + +RAIN_EVENT_THRESHOLD_MM = 0.2 +RAIN_SPIKE_THRESHOLD_MM_5M = 5.0 +RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h + + +def parse_time(value: str) -> str: + if not value: + return "" + try: + datetime.fromisoformat(value.replace("Z", "+00:00")) + return value + except ValueError as exc: + raise ValueError(f"invalid time format: {value}") from exc + + +def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame: + sql = """ + SELECT ts, station_id, received_at, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm + FROM observations_ws90 + WHERE site = %s + AND (%s = '' OR ts >= %s::timestamptz) + AND (%s = '' OR ts <= %s::timestamptz) + ORDER BY ts ASC + 
""" + return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts", "received_at"]) + + +def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame: + sql = """ + SELECT ts, source, received_at, pressure_hpa + FROM observations_baro + WHERE site = %s + AND (%s = '' OR ts >= %s::timestamptz) + AND (%s = '' OR ts <= %s::timestamptz) + ORDER BY ts ASC + """ + return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts", "received_at"]) + + +def build_dataset( + ws90: pd.DataFrame, + baro: pd.DataFrame, + rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM, +) -> pd.DataFrame: + if ws90.empty: + raise RuntimeError("no ws90 observations found") + if baro.empty: + raise RuntimeError("no barometer observations found") + + ws90 = ws90.set_index("ts").sort_index() + baro = baro.set_index("ts").sort_index() + + ws90_5m = ws90.resample("5min").agg( + { + "temperature_c": "mean", + "humidity": "mean", + "wind_avg_m_s": "mean", + "wind_max_m_s": "max", + "wind_dir_deg": "mean", + "rain_mm": "last", + } + ) + baro_5m = baro.resample("5min").agg({"pressure_hpa": "mean"}) + + df = ws90_5m.join(baro_5m, how="outer") + df["pressure_hpa"] = df["pressure_hpa"].interpolate(limit=3) + + df["rain_inc_raw"] = df["rain_mm"].diff() + df["rain_reset"] = df["rain_inc_raw"] < 0 + df["rain_inc"] = df["rain_inc_raw"].clip(lower=0) + df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M + + window = RAIN_HORIZON_BUCKETS + df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1)) + df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm + + df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window) + + return df + + +def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require_target: bool = True) -> pd.DataFrame: + features = feature_cols or FEATURE_COLUMNS + required = list(features) + if require_target: 
+ required.append("rain_next_1h") + out = df.dropna(subset=required).copy() + return out.sort_index() + + +def split_time_ordered(df: pd.DataFrame, train_ratio: float = 0.7, val_ratio: float = 0.15) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: + if not (0 < train_ratio < 1): + raise ValueError("train_ratio must be between 0 and 1") + if not (0 <= val_ratio < 1): + raise ValueError("val_ratio must be between 0 and 1") + if train_ratio+val_ratio >= 1: + raise ValueError("train_ratio + val_ratio must be < 1") + + n = len(df) + if n < 100: + raise RuntimeError("not enough rows after filtering (need >= 100)") + + train_end = int(n * train_ratio) + val_end = int(n * (train_ratio + val_ratio)) + + train_end = min(max(train_end, 1), n - 2) + val_end = min(max(val_end, train_end + 1), n - 1) + + train_df = df.iloc[:train_end] + val_df = df.iloc[train_end:val_end] + test_df = df.iloc[val_end:] + + if train_df.empty or val_df.empty or test_df.empty: + raise RuntimeError("time split produced empty train/val/test set") + return train_df, val_df, test_df + + +def evaluate_probs(y_true: np.ndarray, y_prob: np.ndarray, threshold: float) -> dict[str, Any]: + y_pred = (y_prob >= threshold).astype(int) + + roc_auc = float("nan") + pr_auc = float("nan") + if len(np.unique(y_true)) > 1: + roc_auc = roc_auc_score(y_true, y_prob) + pr_auc = average_precision_score(y_true, y_prob) + + cm = confusion_matrix(y_true, y_pred, labels=[0, 1]) + metrics = { + "rows": int(len(y_true)), + "positive_rate": float(np.mean(y_true)), + "threshold": float(threshold), + "accuracy": accuracy_score(y_true, y_pred), + "precision": precision_score(y_true, y_pred, zero_division=0), + "recall": recall_score(y_true, y_pred, zero_division=0), + "f1": f1_score(y_true, y_pred, zero_division=0), + "roc_auc": roc_auc, + "pr_auc": pr_auc, + "brier": brier_score_loss(y_true, y_prob), + "confusion_matrix": cm.tolist(), + } + return to_builtin(metrics) + + +def select_threshold(y_true: np.ndarray, y_prob: 
np.ndarray, min_precision: float = 0.7) -> tuple[float, dict[str, Any]]: + thresholds = np.linspace(0.05, 0.95, 91) + + best: dict[str, Any] | None = None + constrained_best: dict[str, Any] | None = None + for threshold in thresholds: + y_pred = (y_prob >= threshold).astype(int) + precision = precision_score(y_true, y_pred, zero_division=0) + recall = recall_score(y_true, y_pred, zero_division=0) + f1 = f1_score(y_true, y_pred, zero_division=0) + candidate = { + "threshold": float(threshold), + "precision": float(precision), + "recall": float(recall), + "f1": float(f1), + } + + if best is None or candidate["f1"] > best["f1"]: + best = candidate + + if precision >= min_precision: + if constrained_best is None: + constrained_best = candidate + elif candidate["recall"] > constrained_best["recall"]: + constrained_best = candidate + elif candidate["recall"] == constrained_best["recall"] and candidate["f1"] > constrained_best["f1"]: + constrained_best = candidate + + if constrained_best is not None: + constrained_best["selection_rule"] = f"max_recall_where_precision>={min_precision:.2f}" + return float(constrained_best["threshold"]), constrained_best + + assert best is not None + best["selection_rule"] = "fallback_max_f1" + return float(best["threshold"]), best + + +def to_builtin(v: Any) -> Any: + if isinstance(v, dict): + return {k: to_builtin(val) for k, val in v.items()} + if isinstance(v, list): + return [to_builtin(i) for i in v] + if isinstance(v, tuple): + return [to_builtin(i) for i in v] + if isinstance(v, np.integer): + return int(v) + if isinstance(v, np.floating): + out = float(v) + if np.isnan(out): + return None + return out + if isinstance(v, pd.Timestamp): + return v.isoformat() + if isinstance(v, datetime): + return v.isoformat() + return v diff --git a/scripts/run_p0_rain_workflow.sh b/scripts/run_p0_rain_workflow.sh new file mode 100644 index 0000000..8fb9bb7 --- /dev/null +++ b/scripts/run_p0_rain_workflow.sh @@ -0,0 +1,43 @@ +#!/usr/bin/env bash 
+set -euo pipefail + +SITE="${SITE:-home}" +START="${START:-2026-02-01T00:00:00Z}" +END="${END:-2026-03-03T23:55:00Z}" +MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}" +MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}" +REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}" +AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}" + +if [[ -z "${DATABASE_URL:-}" ]]; then + echo "DATABASE_URL is required" + echo "example: export DATABASE_URL='postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable'" + exit 1 +fi + +echo "Running rain data audit..." +python scripts/audit_rain_data.py \ + --site "$SITE" \ + --start "$START" \ + --end "$END" \ + --out "$AUDIT_PATH" + +echo "Training baseline rain model..." +python scripts/train_rain_model.py \ + --site "$SITE" \ + --start "$START" \ + --end "$END" \ + --train-ratio 0.7 \ + --val-ratio 0.15 \ + --min-precision 0.70 \ + --model-version "$MODEL_VERSION" \ + --out "$MODEL_PATH" \ + --report-out "$REPORT_PATH" + +echo "Writing current prediction..." +python scripts/predict_rain_model.py \ + --site "$SITE" \ + --model-path "$MODEL_PATH" \ + --model-name "rain_next_1h" + +echo "P0 rain workflow complete." 
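The `--min-precision 0.70` flag wired through the workflow above refers to the threshold policy stated in the docs: pick the threshold that maximizes recall subject to precision >= 0.70. A simplified sketch of that rule on hypothetical validation output (stand-in numbers; the real `select_threshold` in `rain_model_common.py` additionally breaks ties by F1 and falls back to max-F1 when the constraint is unreachable):

```python
import numpy as np

# Hypothetical validation labels and predicted probabilities.
y_true = np.array([0, 0, 0, 0, 1, 1, 1, 0, 1, 0])
y_prob = np.array([0.05, 0.10, 0.20, 0.50, 0.60, 0.70, 0.80, 0.75, 0.90, 0.30])

best = None  # (threshold, recall, precision)
for t in np.linspace(0.05, 0.95, 91):  # same grid as select_threshold
    y_pred = (y_prob >= t).astype(int)
    tp = int(((y_pred == 1) & (y_true == 1)).sum())
    fp = int(((y_pred == 1) & (y_true == 0)).sum())
    fn = int(((y_pred == 0) & (y_true == 1)).sum())
    precision = tp / (tp + fp) if tp + fp else 0.0
    recall = tp / (tp + fn) if tp + fn else 0.0
    # Keep the highest-recall threshold that still satisfies the precision floor.
    if precision >= 0.70 and (best is None or recall > best[1]):
        best = (float(t), recall, precision)
```

The precision floor is what makes the dashboard signal "low-noise": false alarms are bounded first, and recall is whatever the model can deliver under that bound.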
diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py index 2dadd94..332bbda 100644 --- a/scripts/train_rain_model.py +++ b/scripts/train_rain_model.py @@ -1,16 +1,29 @@ #!/usr/bin/env python3 import argparse +import json import os -from datetime import datetime +from datetime import datetime, timezone import numpy as np -import pandas as pd import psycopg2 from sklearn.linear_model import LogisticRegression -from sklearn.metrics import accuracy_score, confusion_matrix, precision_score, recall_score, roc_auc_score from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler +from rain_model_common import ( + FEATURE_COLUMNS, + RAIN_EVENT_THRESHOLD_MM, + build_dataset, + evaluate_probs, + fetch_baro, + fetch_ws90, + model_frame, + parse_time, + select_threshold, + split_time_ordered, + to_builtin, +) + try: import joblib except ImportError: # pragma: no cover - optional dependency @@ -18,128 +31,42 @@ except ImportError: # pragma: no cover - optional dependency def parse_args() -> argparse.Namespace: - parser = argparse.ArgumentParser(description="Train a simple rain prediction model (next 1h >= 0.2mm).") + parser = argparse.ArgumentParser(description="Train a rain prediction model (next 1h >= 0.2mm).") parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.") parser.add_argument("--site", required=True, help="Site name (e.g. 
home).") parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).") parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).") + parser.add_argument("--train-ratio", type=float, default=0.7, help="Time-ordered train split ratio.") + parser.add_argument("--val-ratio", type=float, default=0.15, help="Time-ordered validation split ratio.") + parser.add_argument( + "--min-precision", + type=float, + default=0.7, + help="Minimum validation precision for threshold selection.", + ) + parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.") + parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.") parser.add_argument("--out", default="models/rain_model.pkl", help="Path to save model.") + parser.add_argument( + "--report-out", + default="models/rain_model_report.json", + help="Path to save JSON training report.", + ) + parser.add_argument( + "--model-version", + default="rain-logreg-v1", + help="Version label stored in artifact metadata.", + ) return parser.parse_args() -def parse_time(value: str) -> str: - if not value: - return "" - try: - datetime.fromisoformat(value.replace("Z", "+00:00")) - return value - except ValueError: - raise ValueError(f"invalid time format: {value}") - - -def fetch_ws90(conn, site, start, end): - sql = """ - SELECT ts, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm - FROM observations_ws90 - WHERE site = %s - AND (%s = '' OR ts >= %s::timestamptz) - AND (%s = '' OR ts <= %s::timestamptz) - ORDER BY ts ASC - """ - return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts"]) - - -def fetch_baro(conn, site, start, end): - sql = """ - SELECT ts, pressure_hpa - FROM observations_baro - WHERE site = %s - AND (%s = '' OR ts >= %s::timestamptz) - AND (%s = '' OR ts <= %s::timestamptz) - ORDER BY ts ASC - """ - return pd.read_sql_query(sql, conn, params=(site, start, start, end, 
end), parse_dates=["ts"]) - - -def build_dataset(ws90: pd.DataFrame, baro: pd.DataFrame) -> pd.DataFrame: - if ws90.empty: - raise RuntimeError("no ws90 observations found") - if baro.empty: - raise RuntimeError("no barometer observations found") - - ws90 = ws90.set_index("ts").sort_index() - baro = baro.set_index("ts").sort_index() - - ws90_5m = ws90.resample("5min").agg( - { - "temperature_c": "mean", - "humidity": "mean", - "wind_avg_m_s": "mean", - "wind_max_m_s": "max", - "wind_dir_deg": "mean", - "rain_mm": "last", - } - ) - baro_5m = baro.resample("5min").mean() - - df = ws90_5m.join(baro_5m, how="outer") - df["pressure_hpa"] = df["pressure_hpa"].interpolate(limit=3) - - # Compute incremental rain and future 1-hour sum. - df["rain_inc"] = df["rain_mm"].diff().clip(lower=0) - window = 12 # 12 * 5min = 1 hour - df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1)) - df["rain_next_1h"] = df["rain_next_1h_mm"] >= 0.2 - - # Pressure trend over the previous hour. 
- df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(12) - - return df - - -def train_model(df: pd.DataFrame): - feature_cols = [ - "pressure_trend_1h", - "humidity", - "temperature_c", - "wind_avg_m_s", - "wind_max_m_s", - ] - - df = df.dropna(subset=feature_cols + ["rain_next_1h"]) - if len(df) < 200: - raise RuntimeError("not enough data after filtering (need >= 200 rows)") - - X = df[feature_cols] - y = df["rain_next_1h"].astype(int) - - split_idx = int(len(df) * 0.8) - X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:] - y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:] - - model = Pipeline( +def make_model() -> Pipeline: + return Pipeline( [ ("scaler", StandardScaler()), ("clf", LogisticRegression(max_iter=1000, class_weight="balanced")), ] ) - model.fit(X_train, y_train) - - y_pred = model.predict(X_test) - y_prob = model.predict_proba(X_test)[:, 1] - - metrics = { - "rows": len(df), - "train_rows": len(X_train), - "test_rows": len(X_test), - "accuracy": accuracy_score(y_test, y_pred), - "precision": precision_score(y_test, y_pred, zero_division=0), - "recall": recall_score(y_test, y_pred, zero_division=0), - "roc_auc": roc_auc_score(y_test, y_prob), - "confusion_matrix": confusion_matrix(y_test, y_pred).tolist(), - } - - return model, metrics, feature_cols def main() -> int: @@ -154,12 +81,124 @@ def main() -> int: ws90 = fetch_ws90(conn, args.site, start, end) baro = fetch_baro(conn, args.site, start, end) - df = build_dataset(ws90, baro) - model, metrics, features = train_model(df) + full_df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) + model_df = model_frame(full_df, FEATURE_COLUMNS, require_target=True) + if len(model_df) < args.min_rows: + raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})") - print("Rain model metrics:") - for k, v in metrics.items(): - print(f" {k}: {v}") + train_df, val_df, test_df = split_time_ordered( + model_df, + 
train_ratio=args.train_ratio, + val_ratio=args.val_ratio, + ) + + x_train = train_df[FEATURE_COLUMNS] + y_train = train_df["rain_next_1h"].astype(int).to_numpy() + x_val = val_df[FEATURE_COLUMNS] + y_val = val_df["rain_next_1h"].astype(int).to_numpy() + x_test = test_df[FEATURE_COLUMNS] + y_test = test_df["rain_next_1h"].astype(int).to_numpy() + + base_model = make_model() + base_model.fit(x_train, y_train) + y_val_prob = base_model.predict_proba(x_val)[:, 1] + + if args.threshold is not None: + chosen_threshold = args.threshold + threshold_info = { + "selection_rule": "fixed_cli_threshold", + "threshold": float(args.threshold), + } + else: + chosen_threshold, threshold_info = select_threshold( + y_true=y_val, + y_prob=y_val_prob, + min_precision=args.min_precision, + ) + + val_metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=chosen_threshold) + + train_val_df = model_df.iloc[: len(train_df) + len(val_df)] + x_train_val = train_val_df[FEATURE_COLUMNS] + y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy() + + final_model = make_model() + final_model.fit(x_train_val, y_train_val) + y_test_prob = final_model.predict_proba(x_test)[:, 1] + test_metrics = evaluate_probs(y_true=y_test, y_prob=y_test_prob, threshold=chosen_threshold) + + report = { + "generated_at": datetime.now(timezone.utc).isoformat(), + "site": args.site, + "model_version": args.model_version, + "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", + "feature_columns": FEATURE_COLUMNS, + "data_window": { + "requested_start": start or None, + "requested_end": end or None, + "actual_start": model_df.index.min(), + "actual_end": model_df.index.max(), + "model_rows": len(model_df), + "ws90_rows": len(ws90), + "baro_rows": len(baro), + }, + "label_quality": { + "rain_reset_count": int(np.nansum(full_df["rain_reset"].fillna(False).to_numpy(dtype=int))), + "rain_spike_5m_count": int(np.nansum(full_df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))), + 
}, + "split": { + "train_ratio": args.train_ratio, + "val_ratio": args.val_ratio, + "train_rows": len(train_df), + "val_rows": len(val_df), + "test_rows": len(test_df), + "train_start": train_df.index.min(), + "train_end": train_df.index.max(), + "val_start": val_df.index.min(), + "val_end": val_df.index.max(), + "test_start": test_df.index.min(), + "test_end": test_df.index.max(), + }, + "threshold_selection": { + **threshold_info, + "min_precision_constraint": args.min_precision, + }, + "validation_metrics": val_metrics, + "test_metrics": test_metrics, + } + report = to_builtin(report) + + print("Rain model training summary:") + print(f" site: {args.site}") + print(f" model_version: {args.model_version}") + print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}") + print( + " threshold: " + f"{report['threshold_selection']['threshold']:.3f} " + f"({report['threshold_selection']['selection_rule']})" + ) + print( + " val metrics: " + f"precision={report['validation_metrics']['precision']:.3f} " + f"recall={report['validation_metrics']['recall']:.3f} " + f"roc_auc={report['validation_metrics']['roc_auc'] if report['validation_metrics']['roc_auc'] is not None else 'n/a'} " + f"pr_auc={report['validation_metrics']['pr_auc'] if report['validation_metrics']['pr_auc'] is not None else 'n/a'}" + ) + print( + " test metrics: " + f"precision={report['test_metrics']['precision']:.3f} " + f"recall={report['test_metrics']['recall']:.3f} " + f"roc_auc={report['test_metrics']['roc_auc'] if report['test_metrics']['roc_auc'] is not None else 'n/a'} " + f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'}" + ) + + if args.report_out: + report_dir = os.path.dirname(args.report_out) + if report_dir: + os.makedirs(report_dir, exist_ok=True) + with open(args.report_out, "w", encoding="utf-8") as f: + json.dump(report, f, 
indent=2) + print(f"Saved report to {args.report_out}") if args.out: out_dir = os.path.dirname(args.out) @@ -168,7 +207,17 @@ def main() -> int: if joblib is None: print("joblib not installed; skipping model save.") else: - joblib.dump({"model": model, "features": features}, args.out) + artifact = { + "model": final_model, + "features": FEATURE_COLUMNS, + "threshold": float(chosen_threshold), + "target_mm": float(RAIN_EVENT_THRESHOLD_MM), + "model_version": args.model_version, + "trained_at": datetime.now(timezone.utc).isoformat(), + "split": report["split"], + "threshold_selection": report["threshold_selection"], + } + joblib.dump(artifact, args.out) print(f"Saved model to {args.out}") return 0 diff --git a/todo.md b/todo.md new file mode 100644 index 0000000..4dc9782 --- /dev/null +++ b/todo.md @@ -0,0 +1,57 @@ +# Predictive Model TODO + +Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimization. + +## 1) Scope and Success Criteria +- [x] [P0] Lock v1 target: predict `rain_next_1h >= 0.2mm`. +- [x] [P0] Define the decision use-case (alerts vs dashboard signal). +- [x] [P0] Set acceptance metrics and thresholds (precision, recall, ROC-AUC). +- [x] [P0] Freeze training window with explicit UTC start/end timestamps. + +## 2) Data Quality and Label Validation +- [ ] [P0] Audit `observations_ws90` and `observations_baro` for missingness, gaps, duplicates, and out-of-order rows. (script ready: `scripts/audit_rain_data.py`; run on runtime machine) +- [ ] [P0] Validate rain label construction from `rain_mm` (counter resets, negative deltas, spikes). (script ready: `scripts/audit_rain_data.py`; run on runtime machine) +- [ ] [P0] Measure class balance by week (rain-positive vs rain-negative). (script ready: `scripts/audit_rain_data.py`; run on runtime machine) +- [ ] [P1] Document known data issues and mitigation rules. 
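The label-validation items in section 2 amount to deriving per-interval rain from the cumulative `rain_mm` counter while flagging resets (negative deltas) and implausible spikes — the same `rain_inc` / `rain_reset` / `rain_spike_5m` quantities the training report counts. A minimal sketch, with the spike cap chosen as an illustrative assumption:

```python
import pandas as pd

def rain_increments(rain_mm: pd.Series, spike_cap_mm: float = 20.0) -> pd.DataFrame:
    """Derive per-interval rain from a cumulative counter, flagging anomalies.

    Sketch for the label-quality audit above; spike_cap_mm is an assumed
    threshold for an implausible 5-minute accumulation.
    """
    delta = rain_mm.diff()
    reset = delta < 0            # counter rolled over or was reset
    inc = delta.clip(lower=0)    # treat negative deltas as zero rain
    spike = inc > spike_cap_mm   # physically implausible jump
    return pd.DataFrame({"rain_inc": inc, "rain_reset": reset, "rain_spike_5m": spike})
```

Rows flagged as resets or spikes are exactly the ones that would silently corrupt the `rain_next_1h` label if summed naively.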
+ +## 3) Dataset and Feature Engineering +- [ ] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow. +- [ ] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs. +- [ ] [P1] Encode wind direction properly (cyclical encoding). +- [ ] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies). +- [ ] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure). +- [ ] [P1] Persist versioned dataset snapshots for reproducibility. + +## 4) Modeling and Validation +- [x] [P0] Keep logistic regression as baseline. +- [ ] [P1] Add at least one tree-based baseline (e.g. gradient boosting). +- [x] [P0] Use strict time-based train/validation/test splits (no random shuffling). +- [ ] [P1] Add walk-forward backtesting across multiple temporal folds. +- [ ] [P1] Tune hyperparameters on validation data only. +- [ ] [P1] Calibrate probabilities (Platt or isotonic) and compare calibration quality. +- [x] [P0] Choose and lock the operating threshold based on use-case costs. + +## 5) Evaluation and Reporting +- [x] [P0] Report ROC-AUC, PR-AUC, confusion matrix, precision, recall, and Brier score. +- [ ] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules). +- [ ] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.). +- [ ] [P1] Produce a short model card (data window, features, metrics, known limitations). + +## 6) Packaging and Deployment +- [ ] [P1] Version model artifacts and feature schema together. +- [x] [P0] Implement inference path with feature parity between training and serving. +- [x] [P0] Add prediction storage table for predicted probabilities and realized outcomes. +- [ ] [P1] Expose predictions via API and optionally surface in web dashboard. +- [ ] [P2] Add scheduled retraining with rollback to last-known-good model. 
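For the cyclical-encoding item in section 3: raw `wind_dir_deg` treats 359° and 1° as maximally distant, while sin/cos projection onto the unit circle keeps them adjacent. A small illustrative sketch (column names are assumptions matching the `observations_ws90` schema):

```python
import numpy as np
import pandas as pd

def encode_wind_direction(df: pd.DataFrame, col: str = "wind_dir_deg") -> pd.DataFrame:
    """Cyclical encoding: map degrees onto the unit circle so 359 and 1 are close."""
    radians = np.deg2rad(df[col])
    out = df.copy()
    out[f"{col}_sin"] = np.sin(radians)
    out[f"{col}_cos"] = np.cos(radians)
    return out
```

Note this also fixes the `"wind_dir_deg": "mean"` resampling in the old dataset builder, where averaging 350° and 10° yields a spurious 180°; averaging the sin/cos components instead gives the correct circular mean direction.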
+ +## 7) Monitoring and Operations +- [ ] [P1] Track feature drift and prediction drift over time. +- [ ] [P1] Track calibration drift and realized performance after deployment. +- [ ] [P1] Add alerts for training/inference/data pipeline failures. +- [ ] [P1] Document runbook for train/evaluate/deploy/rollback. + +## 8) Immediate Next Steps (This Week) +- [ ] [P0] Run first full data audit and label-quality checks. (blocked here; run on runtime machine) +- [ ] [P0] Train baseline model on full available history and capture metrics. (blocked here; run on runtime machine) +- [ ] [P1] Add one expanded feature set and rerun evaluation. +- [x] [P0] Decide v1 threshold and define deployment interface.
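The training diff above imports `select_threshold` from `rain_model_common`, which is not shown here. Its observed contract is: given validation labels and probabilities plus a `--min-precision` constraint, return `(threshold, info_dict)` where the dict carries at least `selection_rule` and `threshold`. A minimal sketch of one plausible precision-constrained rule (assumed behavior — the real implementation may differ):

```python
from sklearn.metrics import precision_recall_curve

def select_threshold(y_true, y_prob, min_precision: float = 0.7):
    """Pick the threshold with the highest recall among those meeting min_precision.

    Sketch of the assumed rain_model_common.select_threshold contract.
    Falls back to 0.5 when no threshold satisfies the constraint.
    """
    precision, recall, thresholds = precision_recall_curve(y_true, y_prob)
    # precision/recall have len(thresholds) + 1 points; the final entry has
    # no corresponding threshold, so drop it before pairing.
    candidates = [
        (t, p, r)
        for t, p, r in zip(thresholds, precision[:-1], recall[:-1])
        if p >= min_precision
    ]
    if not candidates:
        return 0.5, {"selection_rule": "fallback_default", "threshold": 0.5}
    t, p, r = max(candidates, key=lambda c: c[2])  # maximize recall
    return float(t), {
        "selection_rule": "min_precision_max_recall",
        "threshold": float(t),
        "precision": float(p),
        "recall": float(r),
    }
```

This matches the use-case framing in the todo list: precision is the hard constraint (avoid false rain alerts), and recall is what gets traded away when the model is weak.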