diff --git a/docker-compose.yml b/docker-compose.yml index 3b41ae4..ddafb2f 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -36,6 +36,8 @@ services: RAIN_SITE: "home" RAIN_MODEL_NAME: "rain_next_1h" RAIN_MODEL_VERSION_BASE: "rain-logreg-v1" + RAIN_FEATURE_SET: "baseline" + RAIN_FORECAST_MODEL: "ecmwf" RAIN_LOOKBACK_DAYS: "30" RAIN_TRAIN_INTERVAL_HOURS: "24" RAIN_PREDICT_INTERVAL_MINUTES: "10" @@ -43,6 +45,7 @@ services: RAIN_MODEL_PATH: "/app/models/rain_model.pkl" RAIN_REPORT_PATH: "/app/models/rain_model_report.json" RAIN_AUDIT_PATH: "/app/models/rain_data_audit.json" + RAIN_DATASET_PATH: "/app/models/datasets/rain_dataset_{model_version}_{feature_set}.csv" volumes: - ./models:/app/models diff --git a/docs/rain_prediction.md b/docs/rain_prediction.md index 8d3d72a..23749d0 100644 --- a/docs/rain_prediction.md +++ b/docs/rain_prediction.md @@ -42,6 +42,11 @@ pip install -r scripts/requirements.txt `predictions_rain_1h`. - `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction. +Feature-set options: +- `baseline`: the original 5 local observation features. +- `extended`: adds wind-direction encoding, lag/rolling stats, recent rain accumulation, + and aligned forecast features from `forecast_openmeteo_hourly`. + ## Usage ### 1) Apply schema update (existing DBs) `001_schema.sql` now includes `predictions_rain_1h`. @@ -60,6 +65,7 @@ python scripts/audit_rain_data.py \ --site home \ --start "2026-02-01T00:00:00Z" \ --end "2026-03-03T23:55:00Z" \ + --feature-set "baseline" \ --out "models/rain_data_audit.json" ``` @@ -72,9 +78,25 @@ python scripts/train_rain_model.py \ --train-ratio 0.7 \ --val-ratio 0.15 \ --min-precision 0.70 \ + --feature-set "baseline" \ --model-version "rain-logreg-v1" \ --out "models/rain_model.pkl" \ - --report-out "models/rain_model_report.json" + --report-out "models/rain_model_report.json" \ + --dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv" ``` + +### 3b) Train expanded (P1) feature-set model +```sh +python scripts/train_rain_model.py \ + --site "home" \ + --start "2026-02-01T00:00:00Z" \ + --end "2026-03-03T23:55:00Z" \ + --feature-set "extended" \ + --forecast-model "ecmwf" \ + --model-version "rain-logreg-v1-extended" \ + --out "models/rain_model_extended.pkl" \ + --report-out "models/rain_model_report_extended.json" \ + --dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv" ``` ### 4) Run inference and store prediction @@ -107,16 +129,28 @@ docker compose logs -f rainml - Audit report: `models/rain_data_audit.json` - Training report: `models/rain_model_report.json` - Model artifact: `models/rain_model.pkl` +- Dataset snapshot: `models/datasets/rain_dataset_{model_version}_{feature_set}.csv` - Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized outcome fields once available) -## Model Features (v1) +## Model Features (v1 baseline) - `pressure_trend_1h` - `humidity` - `temperature_c` - `wind_avg_m_s` - `wind_max_m_s` +## Model Features (extended set) +- All baseline features, plus: +- `wind_dir_sin`, `wind_dir_cos` +- `temp_lag_5m`, `temp_roll_1h_mean`, `temp_roll_1h_std` +- `humidity_lag_5m`, `humidity_roll_1h_mean`, `humidity_roll_1h_std` +- `wind_avg_lag_5m`, `wind_avg_roll_1h_mean`, `wind_gust_roll_1h_max` +- `pressure_lag_5m`, `pressure_roll_1h_mean`, `pressure_roll_1h_std` +- `rain_last_1h_mm` +- `fc_temp_c`, `fc_rh`, `fc_pressure_msl_hpa`, `fc_wind_m_s`, `fc_wind_gust_m_s`, + `fc_precip_mm`, `fc_precip_prob`, `fc_cloud_cover` + ## Notes - Data is resampled into
5-minute buckets. - Label is derived from incremental rain from WS90 cumulative `rain_mm`. diff --git a/scripts/audit_rain_data.py b/scripts/audit_rain_data.py index 0ea33fa..6dd60b6 100644 --- a/scripts/audit_rain_data.py +++ b/scripts/audit_rain_data.py @@ -9,10 +9,13 @@ import numpy as np import psycopg2 from rain_model_common import ( - FEATURE_COLUMNS, + AVAILABLE_FEATURE_SETS, RAIN_EVENT_THRESHOLD_MM, build_dataset, + feature_columns_for_set, + feature_columns_need_forecast, fetch_baro, + fetch_forecast, fetch_ws90, model_frame, parse_time, @@ -26,6 +29,17 @@ def parse_args() -> argparse.Namespace: parser.add_argument("--site", required=True, help="Site name (e.g. home).") parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).") parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).") + parser.add_argument( + "--feature-set", + default="baseline", + choices=AVAILABLE_FEATURE_SETS, + help="Named feature set used for model-readiness auditing.", + ) + parser.add_argument( + "--forecast-model", + default="ecmwf", + help="Forecast model name when feature set requires forecast columns.", + ) parser.add_argument("--out", default="models/rain_data_audit.json", help="Path to save JSON audit report.") return parser.parse_args() @@ -65,13 +79,18 @@ def main() -> int: start = parse_time(args.start) if args.start else "" end = parse_time(args.end) if args.end else "" + feature_cols = feature_columns_for_set(args.feature_set) + needs_forecast = feature_columns_need_forecast(feature_cols) with psycopg2.connect(args.db_url) as conn: ws90 = fetch_ws90(conn, args.site, start, end) baro = fetch_baro(conn, args.site, start, end) + forecast = None + if needs_forecast: + forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model) - df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) - model_df = model_frame(df, FEATURE_COLUMNS, require_target=True) + df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) + model_df = model_frame(df, feature_cols, require_target=True) ws90_dupes = int(ws90.duplicated(subset=["ts", "station_id"]).sum()) if not ws90.empty else 0 baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0 @@ -95,7 +114,7 @@ def main() -> int: baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0 missingness = {} - for col in FEATURE_COLUMNS + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]: + for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]: if col in df.columns: missingness[col] = float(df[col].isna().mean()) @@ -105,6 +124,9 @@ def main() -> int: report = { "site": args.site, + "feature_set": args.feature_set, + "feature_columns": feature_cols, + "forecast_model": args.forecast_model if needs_forecast else None, "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", "requested_window": { "start": start or None, @@ -121,6 +143,7 @@ def main() -> int: "row_counts": { "ws90_rows": int(len(ws90)), "baro_rows": int(len(baro)), + "forecast_rows": int(len(forecast)) if forecast is not None else 0, "model_rows": int(len(model_df)), }, "duplicates": { @@ -152,10 +175,12 @@ def main() -> int: print("Rain data audit summary:") print(f" site: {report['site']}") + print(f" feature_set: {report['feature_set']}") print( " rows: " f"ws90={report['row_counts']['ws90_rows']} " f"baro={report['row_counts']['baro_rows']} " + 
f"forecast={report['row_counts']['forecast_rows']} " f"model={report['row_counts']['model_rows']}" ) print( diff --git a/scripts/predict_rain_model.py b/scripts/predict_rain_model.py index ff9adbb..63cb9ab 100644 --- a/scripts/predict_rain_model.py +++ b/scripts/predict_rain_model.py @@ -8,7 +8,16 @@ from datetime import datetime, timedelta, timezone import psycopg2 from psycopg2.extras import Json -from rain_model_common import build_dataset, fetch_baro, fetch_ws90, model_frame, parse_time, to_builtin +from rain_model_common import ( + build_dataset, + feature_columns_need_forecast, + fetch_baro, + fetch_forecast, + fetch_ws90, + model_frame, + parse_time, + to_builtin, +) try: import joblib @@ -33,6 +42,11 @@ def parse_args() -> argparse.Namespace: default=6, help="History lookback window used to build features.", ) + parser.add_argument( + "--forecast-model", + default="ecmwf", + help="Forecast model name when inference features require forecast columns.", + ) parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.") return parser.parse_args() @@ -65,9 +79,12 @@ def main() -> int: at = parse_at(args.at) artifact = load_artifact(args.model_path) model = artifact["model"] - features = artifact["features"] + features = list(artifact["features"]) + feature_set = artifact.get("feature_set") + needs_forecast = feature_columns_need_forecast(features) threshold = float(artifact.get("threshold", 0.5)) model_version = args.model_version or artifact.get("model_version") or "unknown" + forecast_model = str(artifact.get("forecast_model") or args.forecast_model) fetch_start = (at - timedelta(hours=args.history_hours)).isoformat() fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat() @@ -75,8 +92,11 @@ def main() -> int: with psycopg2.connect(args.db_url) as conn: ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end) baro = fetch_baro(conn, args.site, fetch_start, fetch_end) + forecast = None + if needs_forecast: + forecast = fetch_forecast(conn, args.site, fetch_start, fetch_end, model=forecast_model) - full_df = build_dataset(ws90, baro) + full_df = build_dataset(ws90, baro, forecast=forecast) feature_df = model_frame(full_df, feature_cols=features, require_target=False) candidates = feature_df.loc[feature_df.index <= at] if candidates.empty: @@ -105,6 +125,9 @@ def main() -> int: metadata = { "artifact_path": args.model_path, "artifact_model_version": artifact.get("model_version"), + "artifact_feature_set": feature_set, + "forecast_model": forecast_model if needs_forecast else None, + "needs_forecast_features": needs_forecast, "feature_values": {col: float(row.iloc[0][col]) for col in features}, "source_window_start": fetch_start, "source_window_end": fetch_end, @@ -117,6 +140,8 @@ def main() -> int: print(f" site: {args.site}") print(f" model_name: {args.model_name}") print(f" model_version: {model_version}") + if feature_set: + print(f" feature_set: {feature_set}") print(f" pred_ts: {pred_ts.isoformat()}") print(f" threshold: {threshold:.3f}") print(f" probability: {probability:.4f}") diff --git a/scripts/rain_model_common.py b/scripts/rain_model_common.py index 87fe00b..645d979 100644 --- a/scripts/rain_model_common.py +++ b/scripts/rain_model_common.py @@ -17,7 +17,7 @@ from sklearn.metrics import ( roc_auc_score, ) -FEATURE_COLUMNS = [ +BASELINE_FEATURE_COLUMNS = [ "pressure_trend_1h", "humidity", "temperature_c", @@ -25,6 +25,49 @@ FEATURE_COLUMNS = [ "wind_max_m_s", ] +FORECAST_FEATURE_COLUMNS = [ + "fc_temp_c", + "fc_rh", + 
"fc_pressure_msl_hpa", + "fc_wind_m_s", + "fc_wind_gust_m_s", + "fc_precip_mm", + "fc_precip_prob", + "fc_cloud_cover", +] + +EXTENDED_FEATURE_COLUMNS = [ + "pressure_trend_1h", + "temperature_c", + "humidity", + "wind_avg_m_s", + "wind_max_m_s", + "wind_dir_sin", + "wind_dir_cos", + "temp_lag_5m", + "temp_roll_1h_mean", + "temp_roll_1h_std", + "humidity_lag_5m", + "humidity_roll_1h_mean", + "humidity_roll_1h_std", + "wind_avg_lag_5m", + "wind_avg_roll_1h_mean", + "wind_gust_roll_1h_max", + "pressure_lag_5m", + "pressure_roll_1h_mean", + "pressure_roll_1h_std", + "rain_last_1h_mm", + *FORECAST_FEATURE_COLUMNS, +] + +FEATURE_SETS: dict[str, list[str]] = { + "baseline": BASELINE_FEATURE_COLUMNS, + "extended": EXTENDED_FEATURE_COLUMNS, +} + +AVAILABLE_FEATURE_SETS = tuple(sorted(FEATURE_SETS.keys())) +FEATURE_COLUMNS = BASELINE_FEATURE_COLUMNS + RAIN_EVENT_THRESHOLD_MM = 0.2 RAIN_SPIKE_THRESHOLD_MM_5M = 5.0 RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h @@ -40,6 +83,34 @@ def parse_time(value: str) -> str: raise ValueError(f"invalid time format: {value}") from exc +def feature_columns_for_set(feature_set: str) -> list[str]: + out = FEATURE_SETS.get(feature_set.lower()) + if out is None: + raise ValueError(f"unknown feature set: {feature_set}") + return list(out) + + +def feature_columns_need_forecast(feature_cols: list[str]) -> bool: + return any(col in FORECAST_FEATURE_COLUMNS for col in feature_cols) + + +def feature_set_needs_forecast(feature_set: str) -> bool: + return feature_columns_need_forecast(feature_columns_for_set(feature_set)) + + +def _fetch_df(conn, sql: str, params: tuple[Any, ...], parse_dt_cols: list[str]) -> pd.DataFrame: + with conn.cursor() as cur: + cur.execute(sql, params) + rows = cur.fetchall() + cols = [d.name for d in cur.description] + + df = pd.DataFrame.from_records(rows, columns=cols) + if not df.empty: + for col in parse_dt_cols: + df[col] = pd.to_datetime(df[col], utc=True) + return df + + def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame: sql = """ SELECT ts, station_id, received_at, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm @@ -49,16 +120,7 @@ def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame: AND (%s = '' OR ts <= %s::timestamptz) ORDER BY ts ASC """ - with conn.cursor() as cur: - cur.execute(sql, (site, start, start, end, end)) - rows = cur.fetchall() - cols = [d.name for d in cur.description] - - df = pd.DataFrame.from_records(rows, columns=cols) - if not df.empty: - df["ts"] = pd.to_datetime(df["ts"], utc=True) - df["received_at"] = pd.to_datetime(df["received_at"], utc=True) - return df + return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"]) def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame: @@ -70,21 +132,80 @@ def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame: AND (%s = '' OR ts <= %s::timestamptz) ORDER BY ts ASC """ - with conn.cursor() as cur: - cur.execute(sql, (site, start, start, end, end)) - rows = cur.fetchall() - cols = [d.name for d in cur.description] + return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"]) - df = pd.DataFrame.from_records(rows, columns=cols) - if not df.empty: - df["ts"] = pd.to_datetime(df["ts"], utc=True) - df["received_at"] = pd.to_datetime(df["received_at"], utc=True) - return df + +def fetch_forecast(conn, site: str, start: str, end: str, model: str = "ecmwf") -> pd.DataFrame: + sql = """ + SELECT DISTINCT ON (ts) + ts, + retrieved_at, + temp_c, + rh, + 
pressure_msl_hpa, + wind_m_s, + wind_gust_m_s, + precip_mm, + precip_prob, + cloud_cover + FROM forecast_openmeteo_hourly + WHERE site = %s + AND model = %s + AND (%s = '' OR ts >= %s::timestamptz - INTERVAL '2 hours') + AND (%s = '' OR ts <= %s::timestamptz + INTERVAL '2 hours') + ORDER BY ts ASC, retrieved_at DESC + """ + return _fetch_df(conn, sql, (site, model, start, start, end, end), ["ts", "retrieved_at"]) + + +def _apply_forecast_features(df: pd.DataFrame, forecast: pd.DataFrame | None) -> pd.DataFrame: + out = df.copy() + for col in FORECAST_FEATURE_COLUMNS: + out[col] = np.nan + + if forecast is None or forecast.empty: + return out + + fc = forecast.set_index("ts").sort_index().rename( + columns={ + "temp_c": "fc_temp_c", + "rh": "fc_rh", + "pressure_msl_hpa": "fc_pressure_msl_hpa", + "wind_m_s": "fc_wind_m_s", + "wind_gust_m_s": "fc_wind_gust_m_s", + "precip_mm": "fc_precip_mm", + "precip_prob": "fc_precip_prob", + "cloud_cover": "fc_cloud_cover", + } + ) + + keep = [c for c in FORECAST_FEATURE_COLUMNS if c in fc.columns] + fc = fc[keep] + # Bring hourly forecast onto 5m observation grid. + fc_5m = fc.resample("5min").ffill(limit=12) + out = out.join(fc_5m, how="left", rsuffix="_dup") + + # Prefer joined forecast values and softly fill small gaps. + for col in keep: + dup_col = f"{col}_dup" + if dup_col in out.columns: + out[col] = out[dup_col] + out.drop(columns=[dup_col], inplace=True) + out[col] = out[col].ffill(limit=12).bfill(limit=2) + + # Normalize precip probability to 0..1 if source is 0..100. + if "fc_precip_prob" in out.columns: + mask = out["fc_precip_prob"] > 1.0 + out.loc[mask, "fc_precip_prob"] = out.loc[mask, "fc_precip_prob"] / 100.0 + out["fc_precip_prob"] = out["fc_precip_prob"].clip(lower=0.0, upper=1.0) + + return out def build_dataset( ws90: pd.DataFrame, baro: pd.DataFrame, + forecast: pd.DataFrame | None = None, rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM, ) -> pd.DataFrame: if ws90.empty: @@ -116,11 +237,33 @@ def build_dataset( df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M window = RAIN_HORIZON_BUCKETS + df["rain_last_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum() df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1)) df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window) + # Wind direction cyclical encoding. + radians = np.deg2rad(df["wind_dir_deg"] % 360.0) + df["wind_dir_sin"] = np.sin(radians) + df["wind_dir_cos"] = np.cos(radians) + + # Lags and rolling features (core sensors). 
+ df["temp_lag_5m"] = df["temperature_c"].shift(1) + df["humidity_lag_5m"] = df["humidity"].shift(1) + df["wind_avg_lag_5m"] = df["wind_avg_m_s"].shift(1) + df["pressure_lag_5m"] = df["pressure_hpa"].shift(1) + + df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window, min_periods=3).mean() + df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window, min_periods=3).std() + df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window, min_periods=3).mean() + df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window, min_periods=3).std() + df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window, min_periods=3).mean() + df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window, min_periods=3).max() + df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window, min_periods=3).mean() + df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window, min_periods=3).std() + + df = _apply_forecast_features(df, forecast) return df @@ -133,12 +276,16 @@ def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require return out.sort_index() -def split_time_ordered(df: pd.DataFrame, train_ratio: float = 0.7, val_ratio: float = 0.15) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: +def split_time_ordered( + df: pd.DataFrame, + train_ratio: float = 0.7, + val_ratio: float = 0.15, +) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]: if not (0 < train_ratio < 1): raise ValueError("train_ratio must be between 0 and 1") if not (0 <= val_ratio < 1): raise ValueError("val_ratio must be between 0 and 1") - if train_ratio+val_ratio >= 1: + if train_ratio + val_ratio >= 1: raise ValueError("train_ratio + val_ratio must be < 1") n = len(df) @@ -223,6 +370,26 @@ def select_threshold(y_true: np.ndarray, y_prob: np.ndarray, min_precision: floa return float(best["threshold"]), best +def safe_pr_auc(v: dict[str, Any]) -> float: + value = v.get("pr_auc") + if value is None: + return float("-inf") + out = float(value) + if np.isnan(out): + return float("-inf") + return out + + +def safe_roc_auc(v: dict[str, Any]) -> float: + value = v.get("roc_auc") + if value is None: + return float("-inf") + out = float(value) + if np.isnan(out): + return float("-inf") + return out + + def to_builtin(v: Any) -> Any: if isinstance(v, dict): return {k: to_builtin(val) for k, val in v.items()} diff --git a/scripts/run_p0_rain_workflow.sh b/scripts/run_p0_rain_workflow.sh index 8fb9bb7..46e8482 100644 --- a/scripts/run_p0_rain_workflow.sh +++ b/scripts/run_p0_rain_workflow.sh @@ -8,6 +8,9 @@ MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}" MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}" REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}" AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}" +FEATURE_SET="${FEATURE_SET:-baseline}" +FORECAST_MODEL="${FORECAST_MODEL:-ecmwf}" +DATASET_PATH="${DATASET_PATH:-models/datasets/rain_dataset_${MODEL_VERSION}_${FEATURE_SET}.csv}" if [[ -z "${DATABASE_URL:-}" ]]; then echo "DATABASE_URL is required" @@ -20,6 +23,8 @@ python scripts/audit_rain_data.py \ --site "$SITE" \ --start "$START" \ --end "$END" \ + --feature-set "$FEATURE_SET" \ + --forecast-model "$FORECAST_MODEL" \ --out "$AUDIT_PATH" echo "Training baseline rain model..." 
@@ -30,14 +35,18 @@ python scripts/train_rain_model.py \ --train-ratio 0.7 \ --val-ratio 0.15 \ --min-precision 0.70 \ + --feature-set "$FEATURE_SET" \ + --forecast-model "$FORECAST_MODEL" \ --model-version "$MODEL_VERSION" \ --out "$MODEL_PATH" \ - --report-out "$REPORT_PATH" + --report-out "$REPORT_PATH" \ + --dataset-out "$DATASET_PATH" echo "Writing current prediction..." python scripts/predict_rain_model.py \ --site "$SITE" \ --model-path "$MODEL_PATH" \ - --model-name "rain_next_1h" + --model-name "rain_next_1h" \ + --forecast-model "$FORECAST_MODEL" echo "P0 rain workflow complete." diff --git a/scripts/run_rain_ml_worker.py b/scripts/run_rain_ml_worker.py index 4612bc0..18a19fc 100644 --- a/scripts/run_rain_ml_worker.py +++ b/scripts/run_rain_ml_worker.py @@ -41,12 +41,15 @@ class WorkerConfig: site: str model_name: str model_version_base: str + feature_set: str + forecast_model: str train_interval_hours: float predict_interval_minutes: float lookback_days: int train_ratio: float val_ratio: float min_precision: float + dataset_path_template: str model_path: Path report_path: Path audit_path: Path @@ -81,10 +84,13 @@ def training_window(lookback_days: int) -> tuple[str, str]: def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: start, end = training_window(cfg.lookback_days) model_version = f"{cfg.model_version_base}-{now_utc().strftime('%Y%m%d%H%M')}" + dataset_out = cfg.dataset_path_template.format(model_version=model_version, feature_set=cfg.feature_set) ensure_parent(cfg.audit_path) ensure_parent(cfg.report_path) ensure_parent(cfg.model_path) + if dataset_out: + ensure_parent(Path(dataset_out)) run_cmd( [ @@ -96,6 +102,10 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: start, "--end", end, + "--feature-set", + cfg.feature_set, + "--forecast-model", + cfg.forecast_model, "--out", str(cfg.audit_path), ], @@ -118,12 +128,18 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None: str(cfg.val_ratio), "--min-precision", str(cfg.min_precision), + "--feature-set", + cfg.feature_set, + "--forecast-model", + cfg.forecast_model, "--model-version", model_version, "--out", str(cfg.model_path), "--report-out", str(cfg.report_path), + "--dataset-out", + dataset_out, ], env, ) @@ -143,6 +159,8 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None: str(cfg.model_path), "--model-name", cfg.model_name, + "--forecast-model", + cfg.forecast_model, ], env, ) @@ -158,12 +176,18 @@ def load_config() -> WorkerConfig: site=read_env("RAIN_SITE", "home"), model_name=read_env("RAIN_MODEL_NAME", "rain_next_1h"), model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-logreg-v1"), + feature_set=read_env("RAIN_FEATURE_SET", "baseline"), + forecast_model=read_env("RAIN_FORECAST_MODEL", "ecmwf"), train_interval_hours=read_env_float("RAIN_TRAIN_INTERVAL_HOURS", 24.0), predict_interval_minutes=read_env_float("RAIN_PREDICT_INTERVAL_MINUTES", 10.0), lookback_days=read_env_int("RAIN_LOOKBACK_DAYS", 30), train_ratio=read_env_float("RAIN_TRAIN_RATIO", 0.7), val_ratio=read_env_float("RAIN_VAL_RATIO", 0.15), min_precision=read_env_float("RAIN_MIN_PRECISION", 0.70), + dataset_path_template=read_env( + "RAIN_DATASET_PATH", + "models/datasets/rain_dataset_{model_version}_{feature_set}.csv", + ), model_path=Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl")), report_path=Path(read_env("RAIN_REPORT_PATH", "models/rain_model_report.json")), audit_path=Path(read_env("RAIN_AUDIT_PATH", "models/rain_data_audit.json")), @@ -188,6 +212,8 
@@ def main() -> int: "[rain-ml] worker start " f"site={cfg.site} " f"model_name={cfg.model_name} " + f"feature_set={cfg.feature_set} " + f"forecast_model={cfg.forecast_model} " f"train_interval_hours={cfg.train_interval_hours} " f"predict_interval_minutes={cfg.predict_interval_minutes}", flush=True, diff --git a/scripts/train_rain_model.py b/scripts/train_rain_model.py index 332bbda..7a1bd67 100644 --- a/scripts/train_rain_model.py +++ b/scripts/train_rain_model.py @@ -11,12 +11,15 @@ from sklearn.pipeline import Pipeline from sklearn.preprocessing import StandardScaler from rain_model_common import ( - FEATURE_COLUMNS, + AVAILABLE_FEATURE_SETS, RAIN_EVENT_THRESHOLD_MM, build_dataset, evaluate_probs, fetch_baro, + fetch_forecast, fetch_ws90, + feature_columns_for_set, + feature_columns_need_forecast, model_frame, parse_time, select_threshold, @@ -46,12 +49,31 @@ def parse_args() -> argparse.Namespace: ) parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.") parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.") + parser.add_argument( + "--feature-set", + default="baseline", + choices=AVAILABLE_FEATURE_SETS, + help="Named feature set to train with.", + ) + parser.add_argument( + "--forecast-model", + default="ecmwf", + help="Forecast model name when feature set requires forecast columns.", + ) parser.add_argument("--out", default="models/rain_model.pkl", help="Path to save model.") parser.add_argument( "--report-out", default="models/rain_model_report.json", help="Path to save JSON training report.", ) + parser.add_argument( + "--dataset-out", + default="models/datasets/rain_dataset_{model_version}_{feature_set}.csv", + help=( + "Path (or template) for model-ready dataset snapshot. " + "Supports {model_version} and {feature_set}. Use empty string to disable." 
+ ), + ) parser.add_argument( "--model-version", default="rain-logreg-v1", @@ -76,13 +98,18 @@ def main() -> int: start = parse_time(args.start) if args.start else "" end = parse_time(args.end) if args.end else "" + feature_cols = feature_columns_for_set(args.feature_set) + needs_forecast = feature_columns_need_forecast(feature_cols) with psycopg2.connect(args.db_url) as conn: ws90 = fetch_ws90(conn, args.site, start, end) baro = fetch_baro(conn, args.site, start, end) + forecast = None + if needs_forecast: + forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model) - full_df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) - model_df = model_frame(full_df, FEATURE_COLUMNS, require_target=True) + full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM) + model_df = model_frame(full_df, feature_cols, require_target=True) if len(model_df) < args.min_rows: raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})") @@ -92,11 +119,11 @@ def main() -> int: val_ratio=args.val_ratio, ) - x_train = train_df[FEATURE_COLUMNS] + x_train = train_df[feature_cols] y_train = train_df["rain_next_1h"].astype(int).to_numpy() - x_val = val_df[FEATURE_COLUMNS] + x_val = val_df[feature_cols] y_val = val_df["rain_next_1h"].astype(int).to_numpy() - x_test = test_df[FEATURE_COLUMNS] + x_test = test_df[feature_cols] y_test = test_df["rain_next_1h"].astype(int).to_numpy() base_model = make_model() @@ -119,7 +146,7 @@ def main() -> int: val_metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=chosen_threshold) train_val_df = model_df.iloc[: len(train_df) + len(val_df)] - x_train_val = train_val_df[FEATURE_COLUMNS] + x_train_val = train_val_df[feature_cols] y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy() final_model = make_model() @@ -131,8 +158,10 @@ def main() -> int: "generated_at": datetime.now(timezone.utc).isoformat(), "site": args.site, "model_version": args.model_version, + "feature_set": args.feature_set, "target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}", - "feature_columns": FEATURE_COLUMNS, + "feature_columns": feature_cols, + "forecast_model": args.forecast_model if needs_forecast else None, "data_window": { "requested_start": start or None, "requested_end": end or None, @@ -141,11 +170,13 @@ def main() -> int: "model_rows": len(model_df), "ws90_rows": len(ws90), "baro_rows": len(baro), + "forecast_rows": int(len(forecast)) if forecast is not None else 0, }, "label_quality": { "rain_reset_count": int(np.nansum(full_df["rain_reset"].fillna(False).to_numpy(dtype=int))), "rain_spike_5m_count": int(np.nansum(full_df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))), }, + "feature_missingness_ratio": {col: float(full_df[col].isna().mean()) for col in feature_cols if col in full_df.columns}, "split": { "train_ratio": args.train_ratio, "val_ratio": args.val_ratio, @@ -171,6 +202,7 @@ def main() -> int: print("Rain model training summary:") print(f" site: {args.site}") print(f" model_version: {args.model_version}") + print(f" feature_set: {args.feature_set} ({len(feature_cols)} features)") print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}") print( " threshold: " @@ -200,6 +232,15 @@ def main() -> int: json.dump(report, f, indent=2) print(f"Saved report to {args.report_out}") + if args.dataset_out: + dataset_out 
= args.dataset_out.format(model_version=args.model_version, feature_set=args.feature_set) + dataset_dir = os.path.dirname(dataset_out) + if dataset_dir: + os.makedirs(dataset_dir, exist_ok=True) + snapshot_cols = list(dict.fromkeys(feature_cols + ["rain_next_1h", "rain_next_1h_mm"])) + model_df[snapshot_cols].to_csv(dataset_out, index=True, index_label="ts") + print(f"Saved dataset snapshot to {dataset_out}") + if args.out: out_dir = os.path.dirname(args.out) if out_dir: @@ -209,7 +250,9 @@ def main() -> int: else: artifact = { "model": final_model, - "features": FEATURE_COLUMNS, + "features": feature_cols, + "feature_set": args.feature_set, + "forecast_model": args.forecast_model if needs_forecast else None, "threshold": float(chosen_threshold), "target_mm": float(RAIN_EVENT_THRESHOLD_MM), "model_version": args.model_version, diff --git a/todo.md b/todo.md index 5e6fa06..9ccdcd1 100644 --- a/todo.md +++ b/todo.md @@ -15,12 +15,12 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat - [ ] [P1] Document known data issues and mitigation rules. ## 3) Dataset and Feature Engineering -- [ ] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow. -- [ ] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs. -- [ ] [P1] Encode wind direction properly (cyclical encoding). +- [x] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow. +- [x] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs. +- [x] [P1] Encode wind direction properly (cyclical encoding). - [ ] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies). -- [ ] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure). -- [ ] [P1] Persist versioned dataset snapshots for reproducibility. +- [x] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure). +- [x] [P1] Persist versioned dataset snapshots for reproducibility. ## 4) Modeling and Validation - [x] [P0] Keep logistic regression as baseline. @@ -53,5 +53,5 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat ## 8) Immediate Next Steps (This Week) - [x] [P0] Run first full data audit and label-quality checks. (completed on runtime machine) - [x] [P0] Train baseline model on full available history and capture metrics. (completed on runtime machine) -- [ ] [P1] Add one expanded feature set and rerun evaluation. +- [ ] [P1] Add one expanded feature set and rerun evaluation. (feature-set plumbing implemented; rerun pending on runtime machine) - [x] [P0] Decide v1 threshold and define deployment interface.
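
Reviewer note on checking the new plumbing end to end: the short sketch below is illustrative only and not part of the diff. It assumes the example output paths from docs/rain_prediction.md above, the artifact fields written by scripts/train_rain_model.py in this change (`features`, `feature_set`, `forecast_model`, `threshold`, `model_version`), and that the artifact is a joblib pickle, as the `joblib` import in scripts/predict_rain_model.py suggests.

```python
# Ad-hoc post-training sanity check (illustrative paths; not part of this diff).
import joblib
import pandas as pd

# The metadata fields added in this change should round-trip through the artifact.
artifact = joblib.load("models/rain_model_extended.pkl")
print(artifact.get("model_version"))    # e.g. "rain-logreg-v1-extended"
print(artifact.get("feature_set"))      # "extended"; pre-change artifacts return None
print(artifact.get("forecast_model"))   # "ecmwf" when forecast features are used, else None
print(len(artifact.get("features", [])), "feature columns")
print("decision threshold:", artifact.get("threshold"))

# The --dataset-out snapshot keeps the 5-minute "ts" index, the feature columns,
# and both label columns (rain_next_1h, rain_next_1h_mm).
snapshot = pd.read_csv(
    "models/datasets/rain_dataset_rain-logreg-v1-extended_extended.csv",
    parse_dates=["ts"],
    index_col="ts",
)
print(snapshot.shape)
print(snapshot["rain_next_1h"].value_counts(dropna=False))
```

If `artifact["features"]` disagrees with `feature_columns_for_set(artifact["feature_set"])` from scripts/rain_model_common.py, or the snapshot is missing columns, the feature-set plumbing and the saved artifacts are out of sync.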