feat: add rain data audit and prediction scripts

This commit is contained in:
2026-03-05 08:01:54 +11:00
parent 5bfa910495
commit 96e72d7c43
13 changed files with 1004 additions and 182 deletions

agent.md Normal file
View File

@@ -0,0 +1,44 @@
## Workflow Orchestration
### 1. Plan Mode Default
- Enter plan mode for ANY non-trivial task (3+ steps or architectural decisions)
- If something goes sideways, STOP and re-plan immediately - don't keep pushing
- Use plan mode for verification steps, not just building
- Write detailed specs upfront to reduce ambiguity
### 2. Subagent Strategy
- Use subagents liberally to keep main context window clean
- Offload research, exploration, and parallel analysis to subagents
- For complex problems, throw more compute at it via subagents
- One task per subagent for focused execution
### 3. Self-Improvement Loop
- After ANY correction from the user: update tasks/lessons.md with the pattern
- Write rules for yourself that prevent the same mistake
- Ruthlessly iterate on these lessons until mistake rate drops
- Review lessons at session start for the relevant project
### 4. Verification Before Done
- Never mark a task complete without proving it works
- Diff behavior between main and your changes when relevant
- Ask yourself: "Would a staff engineer approve this?"
- Run tests, check logs, demonstrate correctness
### 5. Demand Elegance (Balanced)
- For non-trivial changes: pause and ask "is there a more elegant way?"
- If a fix feels hacky: "Knowing everything I know now, implement the elegant solution"
- Skip this for simple, obvious fixes - don't over-engineer
- Challenge your own work before presenting it
### 6. Autonomous Bug Fixing
- When given a bug report: just fix it. Don't ask for hand-holding
- Point at logs, errors, failing tests - then resolve them
- Zero context switching required from the user
- Go fix failing CI tests without being told how
## Task Management
1. **Plan First**: Write plan to tasks/todo.md with checkable items
2. **Verify Plan**: Check in before starting implementation
3. **Track Progress**: Mark items complete as you go
4. **Explain Changes**: High-level summary at each step
5. **Document Results**: Add review section to tasks/todo.md
6. **Capture Lessons**: Update tasks/lessons.md after corrections
## Core Principles
- **Simplicity First**: Make every change as simple as possible, impacting the minimum amount of code.
- **No Laziness**: Find root causes. No temporary fixes. Senior-developer standards.
- **Minimal Impact**: Changes should touch only what is necessary. Avoid introducing bugs.

View File

@@ -84,6 +84,34 @@ SELECT create_hypertable('forecast_openmeteo_hourly', 'ts', if_not_exists => TRU
CREATE INDEX IF NOT EXISTS idx_forecast_openmeteo_site_ts ON forecast_openmeteo_hourly(site, ts DESC);
-- Rain model predictions (next 1h)
CREATE TABLE IF NOT EXISTS predictions_rain_1h (
ts TIMESTAMPTZ NOT NULL,
generated_at TIMESTAMPTZ NOT NULL DEFAULT now(),
site TEXT NOT NULL,
model_name TEXT NOT NULL,
model_version TEXT NOT NULL,
threshold DOUBLE PRECISION NOT NULL,
probability DOUBLE PRECISION NOT NULL,
predict_rain BOOLEAN NOT NULL,
rain_next_1h_mm_actual DOUBLE PRECISION,
rain_next_1h_actual BOOLEAN,
evaluated_at TIMESTAMPTZ,
metadata JSONB,
PRIMARY KEY (site, model_name, model_version, ts)
);
SELECT create_hypertable('predictions_rain_1h', 'ts', if_not_exists => TRUE);
CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_site_ts
ON predictions_rain_1h(site, ts DESC);
CREATE INDEX IF NOT EXISTS idx_predictions_rain_1h_pending_eval
ON predictions_rain_1h(site, evaluated_at, ts DESC);
-- Raw retention: 90 days
DO $$
BEGIN

View File

@@ -1,21 +1,20 @@
# Rain Prediction (Next 1 Hour)
This project includes a baseline workflow for **binary rain prediction**:
> **Will we see >= 0.2 mm of rain in the next hour?**
It uses local observations (WS90 + barometer), trains a logistic regression
baseline, and writes model-driven predictions back to TimescaleDB.
## P0 Decisions (Locked)
- Target: `rain_next_1h_mm >= 0.2`.
- Primary use-case: low-noise rain heads-up signal for dashboard + alert candidate.
- Frozen v1 training window (UTC): `2026-02-01T00:00:00Z` to `2026-03-03T23:55:00Z`.
- Threshold policy: choose the threshold on the validation set by maximizing recall under `precision >= 0.70`; fall back to max-F1 if the precision constraint is unreachable.
- Acceptance gate (test split): report and track `precision`, `recall`, `ROC-AUC`, `PR-AUC`, `Brier score`, and confusion matrix.
## Requirements
Python 3.10+ and:
@@ -36,67 +35,76 @@ source .venv/bin/activate
pip install -r scripts/requirements.txt
```
## Scripts
- `scripts/audit_rain_data.py`: data quality + label quality + class balance audit.
- `scripts/train_rain_model.py`: strict time-based split training and metrics report.
- `scripts/predict_rain_model.py`: inference using saved model artifact; upserts into
`predictions_rain_1h`.
## Usage
### 1) Apply schema update (existing DBs)
`001_schema.sql` now includes `predictions_rain_1h`.
```sh
docker compose exec -T timescaledb \
psql -U postgres -d micrometeo \
-f /docker-entrypoint-initdb.d/001_schema.sql
```
### 2) Run data audit
```sh
export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable"
python scripts/audit_rain_data.py \
--site home \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--out "models/rain_data_audit.json"
```
### 3) Train baseline model
```sh
python scripts/train_rain_model.py \
--site "home" \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
--model-version "rain-logreg-v1" \
--out "models/rain_model.pkl" \
--report-out "models/rain_model_report.json"
```
### 4) Run inference and store prediction
```sh
python scripts/predict_rain_model.py \
--site home \
--model-path "models/rain_model.pkl" \
--model-name "rain_next_1h"
```
### 5) One-command P0 workflow
```sh
export DATABASE_URL="postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable"
bash scripts/run_p0_rain_workflow.sh
```
## Output
- Audit report: `models/rain_data_audit.json`
- Training report: `models/rain_model_report.json`
- Model artifact: `models/rain_model.pkl`
- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized outcome fields once available)
## Model Features (v1)
- `pressure_trend_1h`
- `humidity`
- `temperature_c`
- `wind_avg_m_s`
- `wind_max_m_s`
## Notes
- Data is resampled into 5-minute buckets.
- The label is derived from incremental rain computed from the WS90 cumulative `rain_mm` counter.
- Pressure comes from `observations_baro`.
- All timestamps are handled as UTC throughout training and inference.
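The label derivation in the notes above can be sketched in a few lines of pandas. This is a standalone illustration with invented counter values and a shortened 3-bucket (15-minute) horizon for brevity; the pipeline itself uses a 12-bucket (1-hour) window:

```python
import pandas as pd

# Hypothetical cumulative rain counter sampled every 5 minutes (values invented).
idx = pd.date_range("2026-02-01T00:00:00Z", periods=6, freq="5min")
cum = pd.Series([10.0, 10.0, 10.3, 10.6, 0.0, 0.5], index=idx, name="rain_mm")

# Incremental rain per bucket; a counter reset yields a negative diff, clipped to 0.
inc = cum.diff().clip(lower=0)

# Future rain over the next `window` buckets (here 3 x 5min = 15min for brevity).
window = 3
future = inc.rolling(window=window, min_periods=1).sum().shift(-(window - 1))
label = future >= 0.2  # binary target
```

With `window = 12` this mirrors how `rain_next_1h_mm` and `rain_next_1h` are built from the counter.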
## Data needs / when to run
For a reliable model, you will want:
- **At least 2-4 weeks** of observations
- A mix of rainy and non-rainy periods
Training with only a few days will produce an unstable model.
## Next improvements
Ideas once more data is available:
- Add forecast precipitation and cloud cover as features
- Try gradient boosted trees (e.g. XGBoost / LightGBM)
- Train per-season models
- Calibrate probabilities (Platt scaling / isotonic regression)

Binary file not shown.

Binary file not shown.

Binary file not shown.

Binary file not shown.

scripts/audit_rain_data.py Normal file
View File

@@ -0,0 +1,186 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
import numpy as np
import psycopg2
from rain_model_common import (
FEATURE_COLUMNS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
fetch_baro,
fetch_ws90,
model_frame,
parse_time,
to_builtin,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Audit weather time-series quality for rain model training.")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--out", default="models/rain_data_audit.json", help="Path to save JSON audit report.")
return parser.parse_args()
def longest_zero_run(counts: np.ndarray) -> int:
best = 0
cur = 0
for v in counts:
if v == 0:
cur += 1
if cur > best:
best = cur
else:
cur = 0
return best
def build_weekly_balance(model_df):
weekly = model_df.copy()
iso = weekly.index.to_series().dt.isocalendar()
weekly["year_week"] = iso["year"].astype(str) + "-W" + iso["week"].astype(str).str.zfill(2)
grouped = (
weekly.groupby("year_week")["rain_next_1h"]
.agg(total_rows="count", positive_rows="sum")
.reset_index()
.sort_values("year_week")
)
grouped["positive_rate"] = grouped["positive_rows"] / grouped["total_rows"]
return grouped.to_dict(orient="records")
def main() -> int:
args = parse_args()
if not args.db_url:
raise SystemExit("missing --db-url or DATABASE_URL")
start = parse_time(args.start) if args.start else ""
end = parse_time(args.end) if args.end else ""
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, start, end)
baro = fetch_baro(conn, args.site, start, end)
df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(df, FEATURE_COLUMNS, require_target=True)
ws90_dupes = int(ws90.duplicated(subset=["ts", "station_id"]).sum()) if not ws90.empty else 0
baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0
ws90_out_of_order = 0
if not ws90.empty:
ws90_by_received = ws90.sort_values("received_at")
ws90_out_of_order = int((ws90_by_received["ts"].diff().dropna() < np.timedelta64(0, "ns")).sum())
baro_out_of_order = 0
if not baro.empty:
baro_by_received = baro.sort_values("received_at")
baro_out_of_order = int((baro_by_received["ts"].diff().dropna() < np.timedelta64(0, "ns")).sum())
ws90_counts = ws90.set_index("ts").resample("5min").size() if not ws90.empty else np.array([])
baro_counts = baro.set_index("ts").resample("5min").size() if not baro.empty else np.array([])
ws90_gap_buckets = int((ws90_counts == 0).sum()) if len(ws90_counts) else 0
baro_gap_buckets = int((baro_counts == 0).sum()) if len(baro_counts) else 0
ws90_max_gap_min = longest_zero_run(np.array(ws90_counts)) * 5 if len(ws90_counts) else 0
baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0
missingness = {}
for col in FEATURE_COLUMNS + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]:
if col in df.columns:
missingness[col] = float(df[col].isna().mean())
max_rain_inc = None
if "rain_inc" in df.columns and np.isfinite(df["rain_inc"].to_numpy(dtype=float)).any():
max_rain_inc = float(np.nanmax(df["rain_inc"].to_numpy(dtype=float)))
report = {
"site": args.site,
"target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"requested_window": {
"start": start or None,
"end": end or None,
},
"observed_window": {
"ws90_start": ws90["ts"].min() if not ws90.empty else None,
"ws90_end": ws90["ts"].max() if not ws90.empty else None,
"baro_start": baro["ts"].min() if not baro.empty else None,
"baro_end": baro["ts"].max() if not baro.empty else None,
"model_start": model_df.index.min() if not model_df.empty else None,
"model_end": model_df.index.max() if not model_df.empty else None,
},
"row_counts": {
"ws90_rows": int(len(ws90)),
"baro_rows": int(len(baro)),
"model_rows": int(len(model_df)),
},
"duplicates": {
"ws90_ts_station_duplicates": ws90_dupes,
"baro_ts_source_duplicates": baro_dupes,
},
"out_of_order": {
"ws90_by_received_count": ws90_out_of_order,
"baro_by_received_count": baro_out_of_order,
},
"gaps_5m": {
"ws90_empty_buckets": ws90_gap_buckets,
"baro_empty_buckets": baro_gap_buckets,
"ws90_max_gap_minutes": ws90_max_gap_min,
"baro_max_gap_minutes": baro_max_gap_min,
},
"missingness_ratio": missingness,
"label_quality": {
"rain_reset_count": int(np.nansum(df["rain_reset"].fillna(False).to_numpy(dtype=int))),
"rain_spike_5m_count": int(np.nansum(df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))),
"max_rain_increment_5m_mm": max_rain_inc,
},
"class_balance": {
"overall_positive_rate": float(model_df["rain_next_1h"].mean()) if not model_df.empty else None,
"weekly": build_weekly_balance(model_df) if not model_df.empty else [],
},
}
report = to_builtin(report)
print("Rain data audit summary:")
print(f" site: {report['site']}")
print(
" rows: "
f"ws90={report['row_counts']['ws90_rows']} "
f"baro={report['row_counts']['baro_rows']} "
f"model={report['row_counts']['model_rows']}"
)
print(
" duplicates: "
f"ws90={report['duplicates']['ws90_ts_station_duplicates']} "
f"baro={report['duplicates']['baro_ts_source_duplicates']}"
)
print(
" rain label checks: "
f"resets={report['label_quality']['rain_reset_count']} "
f"spikes_5m={report['label_quality']['rain_spike_5m_count']} "
f"max_inc_5m={report['label_quality']['max_rain_increment_5m_mm']}"
)
print(f" overall positive rate: {report['class_balance']['overall_positive_rate']}")
if args.out:
out_dir = os.path.dirname(args.out)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
with open(args.out, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2)
print(f"Saved audit report to {args.out}")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,181 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import os
from datetime import datetime, timedelta, timezone
import psycopg2
from psycopg2.extras import Json
from rain_model_common import build_dataset, fetch_baro, fetch_ws90, model_frame, parse_time, to_builtin
try:
import joblib
except ImportError: # pragma: no cover - optional dependency
joblib = None
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Run rain model inference and upsert prediction to Postgres.")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--model-path", default="models/rain_model.pkl", help="Path to trained model artifact.")
parser.add_argument("--model-name", default="rain_next_1h", help="Logical prediction model name.")
parser.add_argument("--model-version", help="Override artifact model_version.")
parser.add_argument(
"--at",
help="Prediction timestamp (RFC3339 or YYYY-MM-DD). Default: current UTC time.",
)
parser.add_argument(
"--history-hours",
type=int,
default=6,
help="History lookback window used to build features.",
)
parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.")
return parser.parse_args()
def load_artifact(path: str):
if joblib is None:
raise RuntimeError("joblib not installed; cannot load model artifact")
if not os.path.exists(path):
raise RuntimeError(f"model artifact not found: {path}")
artifact = joblib.load(path)
if "model" not in artifact:
raise RuntimeError("invalid artifact: missing 'model'")
if "features" not in artifact:
raise RuntimeError("invalid artifact: missing 'features'")
return artifact
def parse_at(value: str | None) -> datetime:
if not value:
return datetime.now(timezone.utc)
parsed = parse_time(value)
return datetime.fromisoformat(parsed.replace("Z", "+00:00")).astimezone(timezone.utc)
def main() -> int:
args = parse_args()
if not args.db_url:
raise SystemExit("missing --db-url or DATABASE_URL")
at = parse_at(args.at)
artifact = load_artifact(args.model_path)
model = artifact["model"]
features = artifact["features"]
threshold = float(artifact.get("threshold", 0.5))
model_version = args.model_version or artifact.get("model_version") or "unknown"
fetch_start = (at - timedelta(hours=args.history_hours)).isoformat()
fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat()
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end)
baro = fetch_baro(conn, args.site, fetch_start, fetch_end)
full_df = build_dataset(ws90, baro)
feature_df = model_frame(full_df, feature_cols=features, require_target=False)
candidates = feature_df.loc[feature_df.index <= at]
if candidates.empty:
raise RuntimeError("no feature-complete row available at or before requested timestamp")
row = candidates.tail(1)
pred_ts = row.index[0].to_pydatetime()
x = row[features]
probability = float(model.predict_proba(x)[:, 1][0])
predict_rain = probability >= threshold
actual_mm = None
actual_flag = None
evaluated_at = None
latest_available = full_df.index.max().to_pydatetime()
if pred_ts + timedelta(hours=1) <= latest_available:
next_mm = full_df.loc[pred_ts, "rain_next_1h_mm"]
next_flag = full_df.loc[pred_ts, "rain_next_1h"]
if next_mm == next_mm: # NaN-safe check
actual_mm = float(next_mm)
if next_flag == next_flag:
actual_flag = bool(next_flag)
evaluated_at = datetime.now(timezone.utc)
metadata = {
"artifact_path": args.model_path,
"artifact_model_version": artifact.get("model_version"),
"feature_values": {col: float(row.iloc[0][col]) for col in features},
"source_window_start": fetch_start,
"source_window_end": fetch_end,
"requested_at": at.isoformat(),
"pred_ts": pred_ts.isoformat(),
}
metadata = to_builtin(metadata)
print("Rain inference summary:")
print(f" site: {args.site}")
print(f" model_name: {args.model_name}")
print(f" model_version: {model_version}")
print(f" pred_ts: {pred_ts.isoformat()}")
print(f" threshold: {threshold:.3f}")
print(f" probability: {probability:.4f}")
print(f" predict_rain: {predict_rain}")
print(f" outcome_available: {actual_flag is not None}")
if args.dry_run:
print("dry-run enabled; skipping DB upsert.")
return 0
with conn.cursor() as cur:
cur.execute(
"""
INSERT INTO predictions_rain_1h (
ts,
generated_at,
site,
model_name,
model_version,
threshold,
probability,
predict_rain,
rain_next_1h_mm_actual,
rain_next_1h_actual,
evaluated_at,
metadata
) VALUES (
%s, now(), %s, %s, %s, %s, %s, %s, %s, %s, %s, %s
)
ON CONFLICT (site, model_name, model_version, ts)
DO UPDATE SET
generated_at = EXCLUDED.generated_at,
threshold = EXCLUDED.threshold,
probability = EXCLUDED.probability,
predict_rain = EXCLUDED.predict_rain,
rain_next_1h_mm_actual = COALESCE(EXCLUDED.rain_next_1h_mm_actual, predictions_rain_1h.rain_next_1h_mm_actual),
rain_next_1h_actual = COALESCE(EXCLUDED.rain_next_1h_actual, predictions_rain_1h.rain_next_1h_actual),
evaluated_at = COALESCE(EXCLUDED.evaluated_at, predictions_rain_1h.evaluated_at),
metadata = EXCLUDED.metadata
""",
(
pred_ts,
args.site,
args.model_name,
model_version,
threshold,
probability,
predict_rain,
actual_mm,
actual_flag,
evaluated_at,
Json(metadata),
),
)
conn.commit()
print("Prediction upserted into predictions_rain_1h.")
return 0
if __name__ == "__main__":
raise SystemExit(main())

View File

@@ -0,0 +1,226 @@
#!/usr/bin/env python3
from __future__ import annotations
from datetime import datetime
from typing import Any
import numpy as np
import pandas as pd
from sklearn.metrics import (
accuracy_score,
average_precision_score,
brier_score_loss,
confusion_matrix,
f1_score,
precision_score,
recall_score,
roc_auc_score,
)
FEATURE_COLUMNS = [
"pressure_trend_1h",
"humidity",
"temperature_c",
"wind_avg_m_s",
"wind_max_m_s",
]
RAIN_EVENT_THRESHOLD_MM = 0.2
RAIN_SPIKE_THRESHOLD_MM_5M = 5.0
RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h
def parse_time(value: str) -> str:
if not value:
return ""
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return value
except ValueError as exc:
raise ValueError(f"invalid time format: {value}") from exc
def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame:
sql = """
SELECT ts, station_id, received_at, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm
FROM observations_ws90
WHERE site = %s
AND (%s = '' OR ts >= %s::timestamptz)
AND (%s = '' OR ts <= %s::timestamptz)
ORDER BY ts ASC
"""
return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts", "received_at"])
def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame:
sql = """
SELECT ts, source, received_at, pressure_hpa
FROM observations_baro
WHERE site = %s
AND (%s = '' OR ts >= %s::timestamptz)
AND (%s = '' OR ts <= %s::timestamptz)
ORDER BY ts ASC
"""
return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts", "received_at"])
def build_dataset(
ws90: pd.DataFrame,
baro: pd.DataFrame,
rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM,
) -> pd.DataFrame:
if ws90.empty:
raise RuntimeError("no ws90 observations found")
if baro.empty:
raise RuntimeError("no barometer observations found")
ws90 = ws90.set_index("ts").sort_index()
baro = baro.set_index("ts").sort_index()
ws90_5m = ws90.resample("5min").agg(
{
"temperature_c": "mean",
"humidity": "mean",
"wind_avg_m_s": "mean",
"wind_max_m_s": "max",
"wind_dir_deg": "mean",
"rain_mm": "last",
}
)
baro_5m = baro.resample("5min").agg({"pressure_hpa": "mean"})
df = ws90_5m.join(baro_5m, how="outer")
df["pressure_hpa"] = df["pressure_hpa"].interpolate(limit=3)
df["rain_inc_raw"] = df["rain_mm"].diff()
df["rain_reset"] = df["rain_inc_raw"] < 0
df["rain_inc"] = df["rain_inc_raw"].clip(lower=0)
df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M
window = RAIN_HORIZON_BUCKETS
df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm
df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window)
return df
def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require_target: bool = True) -> pd.DataFrame:
features = feature_cols or FEATURE_COLUMNS
required = list(features)
if require_target:
required.append("rain_next_1h")
out = df.dropna(subset=required).copy()
return out.sort_index()
def split_time_ordered(df: pd.DataFrame, train_ratio: float = 0.7, val_ratio: float = 0.15) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
if not (0 < train_ratio < 1):
raise ValueError("train_ratio must be between 0 and 1")
if not (0 <= val_ratio < 1):
raise ValueError("val_ratio must be between 0 and 1")
if train_ratio + val_ratio >= 1:
raise ValueError("train_ratio + val_ratio must be < 1")
n = len(df)
if n < 100:
raise RuntimeError("not enough rows after filtering (need >= 100)")
train_end = int(n * train_ratio)
val_end = int(n * (train_ratio + val_ratio))
train_end = min(max(train_end, 1), n - 2)
val_end = min(max(val_end, train_end + 1), n - 1)
train_df = df.iloc[:train_end]
val_df = df.iloc[train_end:val_end]
test_df = df.iloc[val_end:]
if train_df.empty or val_df.empty or test_df.empty:
raise RuntimeError("time split produced empty train/val/test set")
return train_df, val_df, test_df
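For intuition, the split boundaries behave like this on a synthetic frame (a standalone sketch; the helper above additionally clamps the boundaries and rejects frames with fewer than 100 rows):

```python
import pandas as pd

n = 200
df = pd.DataFrame(
    {"x": range(n)},
    index=pd.date_range("2026-02-01", periods=n, freq="5min"),
)

train_end = int(n * 0.7)
val_end = int(n * (0.7 + 0.15))
train = df.iloc[:train_end]
val = df.iloc[train_end:val_end]
test = df.iloc[val_end:]

# Strictly time-ordered: every training row precedes every validation row,
# and every validation row precedes every test row. No shuffling, so the
# model is always evaluated on data from after the period it was fit on.
```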
def evaluate_probs(y_true: np.ndarray, y_prob: np.ndarray, threshold: float) -> dict[str, Any]:
y_pred = (y_prob >= threshold).astype(int)
roc_auc = float("nan")
pr_auc = float("nan")
if len(np.unique(y_true)) > 1:
roc_auc = roc_auc_score(y_true, y_prob)
pr_auc = average_precision_score(y_true, y_prob)
cm = confusion_matrix(y_true, y_pred, labels=[0, 1])
metrics = {
"rows": int(len(y_true)),
"positive_rate": float(np.mean(y_true)),
"threshold": float(threshold),
"accuracy": accuracy_score(y_true, y_pred),
"precision": precision_score(y_true, y_pred, zero_division=0),
"recall": recall_score(y_true, y_pred, zero_division=0),
"f1": f1_score(y_true, y_pred, zero_division=0),
"roc_auc": roc_auc,
"pr_auc": pr_auc,
"brier": brier_score_loss(y_true, y_prob),
"confusion_matrix": cm.tolist(),
}
return to_builtin(metrics)
def select_threshold(y_true: np.ndarray, y_prob: np.ndarray, min_precision: float = 0.7) -> tuple[float, dict[str, Any]]:
thresholds = np.linspace(0.05, 0.95, 91)
best: dict[str, Any] | None = None
constrained_best: dict[str, Any] | None = None
for threshold in thresholds:
y_pred = (y_prob >= threshold).astype(int)
precision = precision_score(y_true, y_pred, zero_division=0)
recall = recall_score(y_true, y_pred, zero_division=0)
f1 = f1_score(y_true, y_pred, zero_division=0)
candidate = {
"threshold": float(threshold),
"precision": float(precision),
"recall": float(recall),
"f1": float(f1),
}
if best is None or candidate["f1"] > best["f1"]:
best = candidate
if precision >= min_precision:
if constrained_best is None:
constrained_best = candidate
elif candidate["recall"] > constrained_best["recall"]:
constrained_best = candidate
elif candidate["recall"] == constrained_best["recall"] and candidate["f1"] > constrained_best["f1"]:
constrained_best = candidate
if constrained_best is not None:
constrained_best["selection_rule"] = f"max_recall_where_precision>={min_precision:.2f}"
return float(constrained_best["threshold"]), constrained_best
assert best is not None
best["selection_rule"] = "fallback_max_f1"
return float(best["threshold"]), best
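On synthetic validation scores, the selection rule above behaves as follows. This is a self-contained sketch that mirrors `select_threshold` with numpy-only metrics and simplified tie-breaking (no F1 tie-break); the numbers are invented:

```python
import numpy as np

y_true = np.array([0, 0, 0, 1, 0, 1, 1, 0, 1, 1])
y_prob = np.array([0.1, 0.2, 0.25, 0.45, 0.5, 0.6, 0.7, 0.75, 0.8, 0.9])

chosen = None  # (threshold, precision, recall)
for t in np.linspace(0.05, 0.95, 91):
    pred = y_prob >= t
    tp = int(np.sum(pred & (y_true == 1)))
    fp = int(np.sum(pred & (y_true == 0)))
    fn = int(np.sum(~pred & (y_true == 1)))
    precision = tp / (tp + fp) if (tp + fp) else 0.0
    recall = tp / (tp + fn) if (tp + fn) else 0.0
    # Keep the highest-recall threshold that clears the precision floor.
    if precision >= 0.70 and (chosen is None or recall > chosen[2]):
        chosen = (float(t), precision, recall)
```

On this toy data the rule picks a low threshold that keeps full recall while staying just above the 0.70 precision floor; the fallback branch never triggers because the constraint is satisfiable.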
def to_builtin(v: Any) -> Any:
if isinstance(v, dict):
return {k: to_builtin(val) for k, val in v.items()}
if isinstance(v, list):
return [to_builtin(i) for i in v]
if isinstance(v, tuple):
return [to_builtin(i) for i in v]
if isinstance(v, np.integer):
return int(v)
if isinstance(v, np.floating):
out = float(v)
if np.isnan(out):
return None
return out
if isinstance(v, pd.Timestamp):
return v.isoformat()
if isinstance(v, datetime):
return v.isoformat()
return v
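The reason this helper exists: `json.dumps` rejects numpy scalar types, so reports must be converted to builtins first (with NaN mapped to `None`, i.e. JSON `null`). A minimal standalone illustration, not the module's own test:

```python
import json
import numpy as np

raw = {"rows": np.int64(42), "rate": np.float64(0.25), "missing": np.float64("nan")}

# Without conversion, serialization fails on the numpy integer.
try:
    json.dumps(raw)
    serializable = True
except TypeError:
    serializable = False

# A conversion in the spirit of to_builtin(): numpy ints/floats become Python
# builtins, and NaN becomes None so it serializes as JSON null.
clean = {
    k: (None if isinstance(v, np.floating) and np.isnan(v) else
        int(v) if isinstance(v, np.integer) else float(v))
    for k, v in raw.items()
}
encoded = json.dumps(clean)
```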

View File

@@ -0,0 +1,43 @@
#!/usr/bin/env bash
set -euo pipefail
SITE="${SITE:-home}"
START="${START:-2026-02-01T00:00:00Z}"
END="${END:-2026-03-03T23:55:00Z}"
MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}"
MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}"
REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}"
AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}"
if [[ -z "${DATABASE_URL:-}" ]]; then
echo "DATABASE_URL is required"
echo "example: export DATABASE_URL='postgres://postgres:postgres@localhost:5432/micrometeo?sslmode=disable'"
exit 1
fi
echo "Running rain data audit..."
python scripts/audit_rain_data.py \
--site "$SITE" \
--start "$START" \
--end "$END" \
--out "$AUDIT_PATH"
echo "Training baseline rain model..."
python scripts/train_rain_model.py \
--site "$SITE" \
--start "$START" \
--end "$END" \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
--model-version "$MODEL_VERSION" \
--out "$MODEL_PATH" \
--report-out "$REPORT_PATH"
echo "Writing current prediction..."
python scripts/predict_rain_model.py \
--site "$SITE" \
--model-path "$MODEL_PATH" \
--model-name "rain_next_1h"
echo "P0 rain workflow complete."

View File

@@ -1,16 +1,29 @@
#!/usr/bin/env python3
import argparse
import json
import os
from datetime import datetime, timezone
import numpy as np
import psycopg2
from sklearn.linear_model import LogisticRegression
from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from rain_model_common import (
FEATURE_COLUMNS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
evaluate_probs,
fetch_baro,
fetch_ws90,
model_frame,
parse_time,
select_threshold,
split_time_ordered,
to_builtin,
)
try:
import joblib
except ImportError: # pragma: no cover - optional dependency
@@ -18,128 +31,42 @@ except ImportError: # pragma: no cover - optional dependency
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Train a rain prediction model (next 1h >= 0.2mm).")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--train-ratio", type=float, default=0.7, help="Time-ordered train split ratio.")
parser.add_argument("--val-ratio", type=float, default=0.15, help="Time-ordered validation split ratio.")
parser.add_argument(
"--min-precision",
type=float,
default=0.7,
help="Minimum validation precision for threshold selection.",
)
parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.")
parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.")
parser.add_argument("--out", default="models/rain_model.pkl", help="Path to save model.")
parser.add_argument(
"--report-out",
default="models/rain_model_report.json",
help="Path to save JSON training report.",
)
parser.add_argument(
"--model-version",
default="rain-logreg-v1",
help="Version label stored in artifact metadata.",
)
return parser.parse_args()
def parse_time(value: str) -> str: def make_model() -> Pipeline:
if not value: return Pipeline(
return ""
try:
datetime.fromisoformat(value.replace("Z", "+00:00"))
return value
except ValueError:
raise ValueError(f"invalid time format: {value}")
def fetch_ws90(conn, site, start, end):
    sql = """
        SELECT ts, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm
        FROM observations_ws90
        WHERE site = %s
          AND (%s = '' OR ts >= %s::timestamptz)
          AND (%s = '' OR ts <= %s::timestamptz)
        ORDER BY ts ASC
    """
    return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts"])
def fetch_baro(conn, site, start, end):
    sql = """
        SELECT ts, pressure_hpa
        FROM observations_baro
        WHERE site = %s
          AND (%s = '' OR ts >= %s::timestamptz)
          AND (%s = '' OR ts <= %s::timestamptz)
        ORDER BY ts ASC
    """
    return pd.read_sql_query(sql, conn, params=(site, start, start, end, end), parse_dates=["ts"])
def build_dataset(ws90: pd.DataFrame, baro: pd.DataFrame) -> pd.DataFrame:
    if ws90.empty:
        raise RuntimeError("no ws90 observations found")
    if baro.empty:
        raise RuntimeError("no barometer observations found")
    ws90 = ws90.set_index("ts").sort_index()
    baro = baro.set_index("ts").sort_index()
    ws90_5m = ws90.resample("5min").agg(
        {
            "temperature_c": "mean",
            "humidity": "mean",
            "wind_avg_m_s": "mean",
            "wind_max_m_s": "max",
            "wind_dir_deg": "mean",
            "rain_mm": "last",
        }
    )
    baro_5m = baro.resample("5min").mean()
    df = ws90_5m.join(baro_5m, how="outer")
    df["pressure_hpa"] = df["pressure_hpa"].interpolate(limit=3)
    # Compute incremental rain and the rain total over the following hour.
    df["rain_inc"] = df["rain_mm"].diff().clip(lower=0)
    window = 12  # 12 * 5 min = 1 hour
    df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
    df["rain_next_1h"] = df["rain_next_1h_mm"] >= 0.2
    # Pressure trend over the previous hour.
    df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(12)
    return df
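The diff/clip/rolling/shift sequence in `build_dataset` is easy to get off by one. A small toy run (shortened to a 3-bin lookahead purely for illustration; the script itself uses 12 bins) shows the alignment of the forward-looking label:

```python
import pandas as pd

# Toy cumulative rain counter sampled every 5 minutes (mm).
idx = pd.date_range("2024-01-01", periods=6, freq="5min")
df = pd.DataFrame({"rain_mm": [0.0, 0.0, 0.2, 0.2, 0.5, 0.5]}, index=idx)

# Per-bin increments; clip(lower=0) zeroes the negative delta a counter
# reset would otherwise produce.
df["rain_inc"] = df["rain_mm"].diff().clip(lower=0)

# Forward-looking sum: the rolling sum that ends at t + (window - 1) bins
# is shifted back so the value stored at t covers rain from t onward.
window = 3  # 15 minutes here; the real script uses 12 bins = 1 hour
df["rain_next_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
```

At the 10:10 bin (where the counter first ticks to 0.2) the stored lookahead is 0.5 mm: the 0.2 mm increment at that bin plus the 0.3 mm increment two bins later, confirming the label includes the current bin and the `window - 1` bins after it.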
def train_model(df: pd.DataFrame):
    feature_cols = [
        "pressure_trend_1h",
        "humidity",
        "temperature_c",
        "wind_avg_m_s",
        "wind_max_m_s",
    ]
    df = df.dropna(subset=feature_cols + ["rain_next_1h"])
    if len(df) < 200:
        raise RuntimeError("not enough data after filtering (need >= 200 rows)")
    X = df[feature_cols]
    y = df["rain_next_1h"].astype(int)
    split_idx = int(len(df) * 0.8)
    X_train, X_test = X.iloc[:split_idx], X.iloc[split_idx:]
    y_train, y_test = y.iloc[:split_idx], y.iloc[split_idx:]
    model = Pipeline(
        [
            ("scaler", StandardScaler()),
            ("clf", LogisticRegression(max_iter=1000, class_weight="balanced")),
        ]
    )
    model.fit(X_train, y_train)
    y_pred = model.predict(X_test)
    y_prob = model.predict_proba(X_test)[:, 1]
    metrics = {
        "rows": len(df),
        "train_rows": len(X_train),
        "test_rows": len(X_test),
        "accuracy": accuracy_score(y_test, y_pred),
        "precision": precision_score(y_test, y_pred, zero_division=0),
        "recall": recall_score(y_test, y_pred, zero_division=0),
        "roc_auc": roc_auc_score(y_test, y_prob),
        "confusion_matrix": confusion_matrix(y_test, y_pred).tolist(),
    }
    return model, metrics, feature_cols
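`split_time_ordered` is called in `main()` below but its definition is outside this hunk. A minimal sketch of the assumed behavior (contiguous, chronology-preserving splits; the real helper may differ):

```python
import pandas as pd


def split_time_ordered(df: pd.DataFrame, train_ratio: float = 0.7, val_ratio: float = 0.15):
    """Split a time-sorted frame into contiguous train/val/test blocks.

    No shuffling: earlier rows train, later rows validate and test, so no
    future weather leaks into the training set.
    """
    n = len(df)
    train_end = int(n * train_ratio)
    val_end = train_end + int(n * val_ratio)
    return df.iloc[:train_end], df.iloc[train_end:val_end], df.iloc[val_end:]
```

With the default ratios a 100-row frame yields 70/15/15 contiguous rows, matching the `--train-ratio` and `--val-ratio` CLI defaults.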
def main() -> int:
@@ -154,12 +81,124 @@ def main() -> int:
    ws90 = fetch_ws90(conn, args.site, start, end)
    baro = fetch_baro(conn, args.site, start, end)
    full_df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
    model_df = model_frame(full_df, FEATURE_COLUMNS, require_target=True)
    if len(model_df) < args.min_rows:
        raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})")

    train_df, val_df, test_df = split_time_ordered(
        model_df,
        train_ratio=args.train_ratio,
        val_ratio=args.val_ratio,
    )
    x_train = train_df[FEATURE_COLUMNS]
    y_train = train_df["rain_next_1h"].astype(int).to_numpy()
    x_val = val_df[FEATURE_COLUMNS]
    y_val = val_df["rain_next_1h"].astype(int).to_numpy()
    x_test = test_df[FEATURE_COLUMNS]
    y_test = test_df["rain_next_1h"].astype(int).to_numpy()

    base_model = make_model()
    base_model.fit(x_train, y_train)
    y_val_prob = base_model.predict_proba(x_val)[:, 1]

    if args.threshold is not None:
        chosen_threshold = args.threshold
        threshold_info = {
            "selection_rule": "fixed_cli_threshold",
            "threshold": float(args.threshold),
        }
    else:
        chosen_threshold, threshold_info = select_threshold(
            y_true=y_val,
            y_prob=y_val_prob,
            min_precision=args.min_precision,
        )
    val_metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=chosen_threshold)

    train_val_df = model_df.iloc[: len(train_df) + len(val_df)]
    x_train_val = train_val_df[FEATURE_COLUMNS]
    y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy()
    final_model = make_model()
    final_model.fit(x_train_val, y_train_val)
    y_test_prob = final_model.predict_proba(x_test)[:, 1]
    test_metrics = evaluate_probs(y_true=y_test, y_prob=y_test_prob, threshold=chosen_threshold)
    report = {
        "generated_at": datetime.now(timezone.utc).isoformat(),
        "site": args.site,
        "model_version": args.model_version,
        "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
        "feature_columns": FEATURE_COLUMNS,
        "data_window": {
            "requested_start": start or None,
            "requested_end": end or None,
            "actual_start": model_df.index.min(),
            "actual_end": model_df.index.max(),
            "model_rows": len(model_df),
            "ws90_rows": len(ws90),
            "baro_rows": len(baro),
        },
        "label_quality": {
            "rain_reset_count": int(np.nansum(full_df["rain_reset"].fillna(False).to_numpy(dtype=int))),
            "rain_spike_5m_count": int(np.nansum(full_df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))),
        },
        "split": {
            "train_ratio": args.train_ratio,
            "val_ratio": args.val_ratio,
            "train_rows": len(train_df),
            "val_rows": len(val_df),
            "test_rows": len(test_df),
            "train_start": train_df.index.min(),
            "train_end": train_df.index.max(),
            "val_start": val_df.index.min(),
            "val_end": val_df.index.max(),
            "test_start": test_df.index.min(),
            "test_end": test_df.index.max(),
        },
        "threshold_selection": {
            **threshold_info,
            "min_precision_constraint": args.min_precision,
        },
        "validation_metrics": val_metrics,
        "test_metrics": test_metrics,
    }
    report = to_builtin(report)
    print("Rain model training summary:")
    print(f" site: {args.site}")
    print(f" model_version: {args.model_version}")
    print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}")
    print(
        " threshold: "
        f"{report['threshold_selection']['threshold']:.3f} "
        f"({report['threshold_selection']['selection_rule']})"
    )
    print(
        " val metrics: "
        f"precision={report['validation_metrics']['precision']:.3f} "
        f"recall={report['validation_metrics']['recall']:.3f} "
        f"roc_auc={report['validation_metrics']['roc_auc'] if report['validation_metrics']['roc_auc'] is not None else 'n/a'} "
        f"pr_auc={report['validation_metrics']['pr_auc'] if report['validation_metrics']['pr_auc'] is not None else 'n/a'}"
    )
    print(
        " test metrics: "
        f"precision={report['test_metrics']['precision']:.3f} "
        f"recall={report['test_metrics']['recall']:.3f} "
        f"roc_auc={report['test_metrics']['roc_auc'] if report['test_metrics']['roc_auc'] is not None else 'n/a'} "
        f"pr_auc={report['test_metrics']['pr_auc'] if report['test_metrics']['pr_auc'] is not None else 'n/a'}"
    )

    if args.report_out:
        report_dir = os.path.dirname(args.report_out)
        if report_dir:
            os.makedirs(report_dir, exist_ok=True)
        with open(args.report_out, "w", encoding="utf-8") as f:
            json.dump(report, f, indent=2)
        print(f"Saved report to {args.report_out}")
    if args.out:
        out_dir = os.path.dirname(args.out)
@@ -168,7 +207,17 @@ def main() -> int:
        if joblib is None:
            print("joblib not installed; skipping model save.")
        else:
            artifact = {
                "model": final_model,
                "features": FEATURE_COLUMNS,
                "threshold": float(chosen_threshold),
                "target_mm": float(RAIN_EVENT_THRESHOLD_MM),
                "model_version": args.model_version,
                "trained_at": datetime.now(timezone.utc).isoformat(),
                "split": report["split"],
                "threshold_selection": report["threshold_selection"],
            }
            joblib.dump(artifact, args.out)
            print(f"Saved model to {args.out}")
    return 0
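`main()` above also calls `select_threshold`, `evaluate_probs`, and `to_builtin`, which live elsewhere in the file and are not part of this diff. Minimal sketches consistent with the call sites and the report fields printed above (assumed behavior, not the repo's actual implementations):

```python
import numpy as np
import pandas as pd

try:
    from sklearn.metrics import average_precision_score, roc_auc_score
except ImportError:  # keep the sketch usable without scikit-learn
    average_precision_score = roc_auc_score = None


def evaluate_probs(y_true, y_prob, threshold):
    """Threshold probabilities and report the metric fields main() prints."""
    y_true = np.asarray(y_true)
    y_prob = np.asarray(y_prob, dtype=float)
    y_pred = (y_prob >= threshold).astype(int)
    tp = int(np.sum((y_pred == 1) & (y_true == 1)))
    fp = int(np.sum((y_pred == 1) & (y_true == 0)))
    fn = int(np.sum((y_pred == 0) & (y_true == 1)))
    both_classes = len(set(y_true.tolist())) > 1
    return {
        "threshold": float(threshold),
        "precision": tp / (tp + fp) if tp + fp else 0.0,
        "recall": tp / (tp + fn) if tp + fn else 0.0,
        # AUCs are undefined when only one class is present, hence None
        # (which main() renders as 'n/a').
        "roc_auc": float(roc_auc_score(y_true, y_prob)) if roc_auc_score and both_classes else None,
        "pr_auc": float(average_precision_score(y_true, y_prob)) if average_precision_score and both_classes else None,
    }


def select_threshold(y_true, y_prob, min_precision=0.7):
    """Maximize validation recall subject to a precision floor."""
    best = None
    for t in np.unique(np.asarray(y_prob, dtype=float)):
        m = evaluate_probs(y_true, y_prob, t)
        if m["precision"] >= min_precision and (best is None or m["recall"] > best["recall"]):
            best = m
    if best is None:  # constraint unsatisfiable on this validation set
        return 0.5, {"selection_rule": "fallback_default", "threshold": 0.5}
    return best["threshold"], {"selection_rule": "max_recall_at_min_precision", "threshold": best["threshold"]}


def to_builtin(obj):
    """Recursively convert numpy/pandas scalars so json.dump accepts the report."""
    if isinstance(obj, dict):
        return {k: to_builtin(v) for k, v in obj.items()}
    if isinstance(obj, (list, tuple)):
        return [to_builtin(v) for v in obj]
    if obj is pd.NaT:
        return None
    if isinstance(obj, pd.Timestamp):
        return obj.isoformat()
    if isinstance(obj, np.integer):
        return int(obj)
    if isinstance(obj, np.floating):
        return float(obj)
    if isinstance(obj, np.bool_):
        return bool(obj)
    return obj
```

The `selection_rule` strings are placeholders; only `fixed_cli_threshold` is confirmed by the diff itself.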

57
todo.md Normal file

@@ -0,0 +1,57 @@
# Predictive Model TODO
Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimization.
## 1) Scope and Success Criteria
- [x] [P0] Lock v1 target: predict `rain_next_1h_mm >= 0.2 mm`.
- [x] [P0] Define the decision use-case (alerts vs dashboard signal).
- [x] [P0] Set acceptance metrics and thresholds (precision, recall, ROC-AUC).
- [x] [P0] Freeze training window with explicit UTC start/end timestamps.
## 2) Data Quality and Label Validation
- [ ] [P0] Audit `observations_ws90` and `observations_baro` for missingness, gaps, duplicates, and out-of-order rows. (script ready: `scripts/audit_rain_data.py`; run on runtime machine)
- [ ] [P0] Validate rain label construction from `rain_mm` (counter resets, negative deltas, spikes). (script ready: `scripts/audit_rain_data.py`; run on runtime machine)
- [ ] [P0] Measure class balance by week (rain-positive vs rain-negative). (script ready: `scripts/audit_rain_data.py`; run on runtime machine)
- [ ] [P1] Document known data issues and mitigation rules.
## 3) Dataset and Feature Engineering
- [ ] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow.
- [ ] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs.
- [ ] [P1] Encode wind direction properly (cyclical encoding).
- [ ] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies).
- [ ] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure).
- [ ] [P1] Persist versioned dataset snapshots for reproducibility.
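The cyclical wind-direction item above maps degrees onto the unit circle so that 359° and 1° come out adjacent instead of maximally distant. A sketch (`encode_wind_dir` is an illustrative name, not an existing function in the repo):

```python
import numpy as np


def encode_wind_dir(deg):
    """Map compass degrees to (sin, cos) components on the unit circle."""
    rad = np.deg2rad(np.asarray(deg, dtype=float))
    return np.sin(rad), np.cos(rad)
```

The raw `wind_dir_deg: "mean"` aggregation in `build_dataset` suffers the same wrap-around problem (averaging 350° and 10° gives 180°), which is one reason this item is flagged.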
## 4) Modeling and Validation
- [x] [P0] Keep logistic regression as baseline.
- [ ] [P1] Add at least one tree-based baseline (e.g. gradient boosting).
- [x] [P0] Use strict time-based train/validation/test splits (no random shuffling).
- [ ] [P1] Add walk-forward backtesting across multiple temporal folds.
- [ ] [P1] Tune hyperparameters on validation data only.
- [ ] [P1] Calibrate probabilities (Platt or isotonic) and compare calibration quality.
- [x] [P0] Choose and lock the operating threshold based on use-case costs.
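For the calibration item above, scikit-learn's `CalibratedClassifierCV` supports both Platt scaling (`method="sigmoid"`) and isotonic regression, and calibration quality can be compared via Brier score. A sketch on synthetic data (shapes and feature count are illustrative, not from the repo):

```python
import numpy as np
from sklearn.calibration import CalibratedClassifierCV
from sklearn.linear_model import LogisticRegression
from sklearn.metrics import brier_score_loss

rng = np.random.default_rng(0)
X = rng.normal(size=(400, 3))
y = (X[:, 0] + 0.5 * rng.normal(size=400) > 0).astype(int)

# Time-ordered split as elsewhere in the pipeline: fit on the first 70%.
split = int(len(X) * 0.7)
base = LogisticRegression(max_iter=1000).fit(X[:split], y[:split])
calibrated = CalibratedClassifierCV(LogisticRegression(max_iter=1000), method="sigmoid", cv=3)
calibrated.fit(X[:split], y[:split])

p_base = base.predict_proba(X[split:])[:, 1]
p_cal = calibrated.predict_proba(X[split:])[:, 1]

# Lower Brier score means better-calibrated probabilities.
print("brier base:", brier_score_loss(y[split:], p_base))
print("brier calibrated:", brier_score_loss(y[split:], p_cal))
```

Swapping `method="isotonic"` gives the other option from the checklist; isotonic generally needs more validation data to avoid overfitting the calibration map.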
## 5) Evaluation and Reporting
- [x] [P0] Report ROC-AUC, PR-AUC, confusion matrix, precision, recall, and Brier score.
- [ ] [P1] Compare against naive baselines (persistence and simple forecast-threshold rules).
- [ ] [P2] Slice performance by periods/weather regimes (day/night, rainy weeks, etc.).
- [ ] [P1] Produce a short model card (data window, features, metrics, known limitations).
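The persistence baseline from the comparison item above can be as simple as "predict rain next hour iff it rained in the previous hour"; a sketch (`persistence_baseline` and its input column are illustrative names):

```python
import numpy as np


def persistence_baseline(rain_last_1h_mm, threshold_mm=0.2):
    """Predict rain for the next hour iff at least threshold_mm fell in the previous hour."""
    return (np.asarray(rain_last_1h_mm, dtype=float) >= threshold_mm).astype(int)
```

Any trained model should beat this (and a simple forecast-threshold rule) on the same test window before it earns a deployment slot.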
## 6) Packaging and Deployment
- [ ] [P1] Version model artifacts and feature schema together.
- [x] [P0] Implement inference path with feature parity between training and serving.
- [x] [P0] Add prediction storage table for predicted probabilities and realized outcomes.
- [ ] [P1] Expose predictions via API and optionally surface in web dashboard.
- [ ] [P2] Add scheduled retraining with rollback to last-known-good model.
## 7) Monitoring and Operations
- [ ] [P1] Track feature drift and prediction drift over time.
- [ ] [P1] Track calibration drift and realized performance after deployment.
- [ ] [P1] Add alerts for training/inference/data pipeline failures.
- [ ] [P1] Document runbook for train/evaluate/deploy/rollback.
## 8) Immediate Next Steps (This Week)
- [ ] [P0] Run first full data audit and label-quality checks. (blocked here; run on runtime machine)
- [ ] [P0] Train baseline model on full available history and capture metrics. (blocked here; run on runtime machine)
- [ ] [P1] Add one expanded feature set and rerun evaluation.
- [x] [P0] Decide v1 threshold and define deployment interface.