more work on model training

This commit is contained in:
2026-03-05 20:49:19 +11:00
parent 76270e5650
commit 5b8cad905f
9 changed files with 380 additions and 48 deletions

View File

@@ -36,6 +36,8 @@ services:
RAIN_SITE: "home"
RAIN_MODEL_NAME: "rain_next_1h"
RAIN_MODEL_VERSION_BASE: "rain-logreg-v1"
RAIN_FEATURE_SET: "baseline"
RAIN_FORECAST_MODEL: "ecmwf"
RAIN_LOOKBACK_DAYS: "30"
RAIN_TRAIN_INTERVAL_HOURS: "24"
RAIN_PREDICT_INTERVAL_MINUTES: "10"
@@ -43,6 +45,7 @@ services:
RAIN_MODEL_PATH: "/app/models/rain_model.pkl"
RAIN_REPORT_PATH: "/app/models/rain_model_report.json"
RAIN_AUDIT_PATH: "/app/models/rain_data_audit.json"
RAIN_DATASET_PATH: "/app/models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
volumes:
- ./models:/app/models

View File

@@ -42,6 +42,11 @@ pip install -r scripts/requirements.txt
`predictions_rain_1h`.
- `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction.
Feature-set options:
- `baseline`: original 5 local observation features.
- `extended`: adds wind-direction encoding, lag/rolling stats, recent rain accumulation,
and aligned forecast features from `forecast_openmeteo_hourly`.
## Usage
### 1) Apply schema update (existing DBs)
`001_schema.sql` now includes `predictions_rain_1h`.
@@ -60,6 +65,7 @@ python scripts/audit_rain_data.py \
--site home \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--feature-set "baseline" \
--out "models/rain_data_audit.json"
```
@@ -72,9 +78,25 @@ python scripts/train_rain_model.py \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
--feature-set "baseline" \
--model-version "rain-logreg-v1" \
--out "models/rain_model.pkl" \
--report-out "models/rain_model_report.json"
--report-out "models/rain_model_report.json" \
--dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
```
### 3b) Train expanded (P1) feature-set model
```sh
python scripts/train_rain_model.py \
--site "home" \
--start "2026-02-01T00:00:00Z" \
--end "2026-03-03T23:55:00Z" \
--feature-set "extended" \
--forecast-model "ecmwf" \
--model-version "rain-logreg-v1-extended" \
--out "models/rain_model_extended.pkl" \
--report-out "models/rain_model_report_extended.json" \
--dataset-out "models/datasets/rain_dataset_{model_version}_{feature_set}.csv"
```
### 4) Run inference and store prediction
@@ -107,16 +129,28 @@ docker compose logs -f rainml
- Audit report: `models/rain_data_audit.json`
- Training report: `models/rain_model_report.json`
- Model artifact: `models/rain_model.pkl`
- Dataset snapshot: `models/datasets/rain_dataset_<model_version>_<feature_set>.csv`
- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized
outcome fields once available)
## Model Features (v1)
## Model Features (v1 baseline)
- `pressure_trend_1h`
- `humidity`
- `temperature_c`
- `wind_avg_m_s`
- `wind_max_m_s`
## Model Features (extended set)
- baseline features, plus:
- `wind_dir_sin`, `wind_dir_cos`
- `temp_lag_5m`, `temp_roll_1h_mean`, `temp_roll_1h_std`
- `humidity_lag_5m`, `humidity_roll_1h_mean`, `humidity_roll_1h_std`
- `wind_avg_lag_5m`, `wind_avg_roll_1h_mean`, `wind_gust_roll_1h_max`
- `pressure_lag_5m`, `pressure_roll_1h_mean`, `pressure_roll_1h_std`
- `rain_last_1h_mm`
- `fc_temp_c`, `fc_rh`, `fc_pressure_msl_hpa`, `fc_wind_m_s`, `fc_wind_gust_m_s`,
`fc_precip_mm`, `fc_precip_prob`, `fc_cloud_cover`
## Notes
- Data is resampled into 5-minute buckets.
- Label is derived from incremental rain from WS90 cumulative `rain_mm`.

View File

@@ -9,10 +9,13 @@ import numpy as np
import psycopg2
from rain_model_common import (
FEATURE_COLUMNS,
AVAILABLE_FEATURE_SETS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
feature_columns_for_set,
feature_columns_need_forecast,
fetch_baro,
fetch_forecast,
fetch_ws90,
model_frame,
parse_time,
@@ -26,6 +29,17 @@ def parse_args() -> argparse.Namespace:
parser.add_argument("--site", required=True, help="Site name (e.g. home).")
parser.add_argument("--start", help="Start time (RFC3339 or YYYY-MM-DD).")
parser.add_argument("--end", help="End time (RFC3339 or YYYY-MM-DD).")
parser.add_argument(
"--feature-set",
default="baseline",
choices=AVAILABLE_FEATURE_SETS,
help="Named feature set used for model-readiness auditing.",
)
parser.add_argument(
"--forecast-model",
default="ecmwf",
help="Forecast model name when feature set requires forecast columns.",
)
parser.add_argument("--out", default="models/rain_data_audit.json", help="Path to save JSON audit report.")
return parser.parse_args()
@@ -65,13 +79,18 @@ def main() -> int:
start = parse_time(args.start) if args.start else ""
end = parse_time(args.end) if args.end else ""
feature_cols = feature_columns_for_set(args.feature_set)
needs_forecast = feature_columns_need_forecast(feature_cols)
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, start, end)
baro = fetch_baro(conn, args.site, start, end)
forecast = None
if needs_forecast:
forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model)
df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(df, FEATURE_COLUMNS, require_target=True)
df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(df, feature_cols, require_target=True)
ws90_dupes = int(ws90.duplicated(subset=["ts", "station_id"]).sum()) if not ws90.empty else 0
baro_dupes = int(baro.duplicated(subset=["ts", "source"]).sum()) if not baro.empty else 0
@@ -95,7 +114,7 @@ def main() -> int:
baro_max_gap_min = longest_zero_run(np.array(baro_counts)) * 5 if len(baro_counts) else 0
missingness = {}
for col in FEATURE_COLUMNS + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]:
for col in feature_cols + ["pressure_hpa", "rain_mm", "rain_inc", "rain_next_1h_mm"]:
if col in df.columns:
missingness[col] = float(df[col].isna().mean())
@@ -105,6 +124,9 @@ def main() -> int:
report = {
"site": args.site,
"feature_set": args.feature_set,
"feature_columns": feature_cols,
"forecast_model": args.forecast_model if needs_forecast else None,
"target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"requested_window": {
"start": start or None,
@@ -121,6 +143,7 @@ def main() -> int:
"row_counts": {
"ws90_rows": int(len(ws90)),
"baro_rows": int(len(baro)),
"forecast_rows": int(len(forecast)) if forecast is not None else 0,
"model_rows": int(len(model_df)),
},
"duplicates": {
@@ -152,10 +175,12 @@ def main() -> int:
print("Rain data audit summary:")
print(f" site: {report['site']}")
print(f" feature_set: {report['feature_set']}")
print(
" rows: "
f"ws90={report['row_counts']['ws90_rows']} "
f"baro={report['row_counts']['baro_rows']} "
f"forecast={report['row_counts']['forecast_rows']} "
f"model={report['row_counts']['model_rows']}"
)
print(

View File

@@ -8,7 +8,16 @@ from datetime import datetime, timedelta, timezone
import psycopg2
from psycopg2.extras import Json
from rain_model_common import build_dataset, fetch_baro, fetch_ws90, model_frame, parse_time, to_builtin
from rain_model_common import (
build_dataset,
feature_columns_need_forecast,
fetch_baro,
fetch_forecast,
fetch_ws90,
model_frame,
parse_time,
to_builtin,
)
try:
import joblib
@@ -33,6 +42,11 @@ def parse_args() -> argparse.Namespace:
default=6,
help="History lookback window used to build features.",
)
parser.add_argument(
"--forecast-model",
default="ecmwf",
help="Forecast model name when inference features require forecast columns.",
)
parser.add_argument("--dry-run", action="store_true", help="Do not write prediction to DB.")
return parser.parse_args()
@@ -65,9 +79,12 @@ def main() -> int:
at = parse_at(args.at)
artifact = load_artifact(args.model_path)
model = artifact["model"]
features = artifact["features"]
features = list(artifact["features"])
feature_set = artifact.get("feature_set")
needs_forecast = feature_columns_need_forecast(features)
threshold = float(artifact.get("threshold", 0.5))
model_version = args.model_version or artifact.get("model_version") or "unknown"
forecast_model = str(artifact.get("forecast_model") or args.forecast_model)
fetch_start = (at - timedelta(hours=args.history_hours)).isoformat()
fetch_end = (at + timedelta(hours=1, minutes=5)).isoformat()
@@ -75,8 +92,11 @@ def main() -> int:
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, fetch_start, fetch_end)
baro = fetch_baro(conn, args.site, fetch_start, fetch_end)
forecast = None
if needs_forecast:
forecast = fetch_forecast(conn, args.site, fetch_start, fetch_end, model=forecast_model)
full_df = build_dataset(ws90, baro)
full_df = build_dataset(ws90, baro, forecast=forecast)
feature_df = model_frame(full_df, feature_cols=features, require_target=False)
candidates = feature_df.loc[feature_df.index <= at]
if candidates.empty:
@@ -105,6 +125,9 @@ def main() -> int:
metadata = {
"artifact_path": args.model_path,
"artifact_model_version": artifact.get("model_version"),
"artifact_feature_set": feature_set,
"forecast_model": forecast_model if needs_forecast else None,
"needs_forecast_features": needs_forecast,
"feature_values": {col: float(row.iloc[0][col]) for col in features},
"source_window_start": fetch_start,
"source_window_end": fetch_end,
@@ -117,6 +140,8 @@ def main() -> int:
print(f" site: {args.site}")
print(f" model_name: {args.model_name}")
print(f" model_version: {model_version}")
if feature_set:
print(f" feature_set: {feature_set}")
print(f" pred_ts: {pred_ts.isoformat()}")
print(f" threshold: {threshold:.3f}")
print(f" probability: {probability:.4f}")

View File

@@ -17,7 +17,7 @@ from sklearn.metrics import (
roc_auc_score,
)
FEATURE_COLUMNS = [
BASELINE_FEATURE_COLUMNS = [
"pressure_trend_1h",
"humidity",
"temperature_c",
@@ -25,6 +25,49 @@ FEATURE_COLUMNS = [
"wind_max_m_s",
]
FORECAST_FEATURE_COLUMNS = [
"fc_temp_c",
"fc_rh",
"fc_pressure_msl_hpa",
"fc_wind_m_s",
"fc_wind_gust_m_s",
"fc_precip_mm",
"fc_precip_prob",
"fc_cloud_cover",
]
EXTENDED_FEATURE_COLUMNS = [
"pressure_trend_1h",
"temperature_c",
"humidity",
"wind_avg_m_s",
"wind_max_m_s",
"wind_dir_sin",
"wind_dir_cos",
"temp_lag_5m",
"temp_roll_1h_mean",
"temp_roll_1h_std",
"humidity_lag_5m",
"humidity_roll_1h_mean",
"humidity_roll_1h_std",
"wind_avg_lag_5m",
"wind_avg_roll_1h_mean",
"wind_gust_roll_1h_max",
"pressure_lag_5m",
"pressure_roll_1h_mean",
"pressure_roll_1h_std",
"rain_last_1h_mm",
*FORECAST_FEATURE_COLUMNS,
]
FEATURE_SETS: dict[str, list[str]] = {
"baseline": BASELINE_FEATURE_COLUMNS,
"extended": EXTENDED_FEATURE_COLUMNS,
}
AVAILABLE_FEATURE_SETS = tuple(sorted(FEATURE_SETS.keys()))
FEATURE_COLUMNS = BASELINE_FEATURE_COLUMNS
RAIN_EVENT_THRESHOLD_MM = 0.2
RAIN_SPIKE_THRESHOLD_MM_5M = 5.0
RAIN_HORIZON_BUCKETS = 12 # 12 * 5m = 1h
@@ -40,6 +83,34 @@ def parse_time(value: str) -> str:
raise ValueError(f"invalid time format: {value}") from exc
def feature_columns_for_set(feature_set: str) -> list[str]:
out = FEATURE_SETS.get(feature_set.lower())
if out is None:
raise ValueError(f"unknown feature set: {feature_set}")
return list(out)
def feature_columns_need_forecast(feature_cols: list[str]) -> bool:
return any(col in FORECAST_FEATURE_COLUMNS for col in feature_cols)
def feature_set_needs_forecast(feature_set: str) -> bool:
return feature_columns_need_forecast(feature_columns_for_set(feature_set))
def _fetch_df(conn, sql: str, params: tuple[Any, ...], parse_dt_cols: list[str]) -> pd.DataFrame:
with conn.cursor() as cur:
cur.execute(sql, params)
rows = cur.fetchall()
cols = [d.name for d in cur.description]
df = pd.DataFrame.from_records(rows, columns=cols)
if not df.empty:
for col in parse_dt_cols:
df[col] = pd.to_datetime(df[col], utc=True)
return df
def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame:
sql = """
SELECT ts, station_id, received_at, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, rain_mm
@@ -49,16 +120,7 @@ def fetch_ws90(conn, site: str, start: str, end: str) -> pd.DataFrame:
AND (%s = '' OR ts <= %s::timestamptz)
ORDER BY ts ASC
"""
with conn.cursor() as cur:
cur.execute(sql, (site, start, start, end, end))
rows = cur.fetchall()
cols = [d.name for d in cur.description]
df = pd.DataFrame.from_records(rows, columns=cols)
if not df.empty:
df["ts"] = pd.to_datetime(df["ts"], utc=True)
df["received_at"] = pd.to_datetime(df["received_at"], utc=True)
return df
return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"])
def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame:
@@ -70,21 +132,80 @@ def fetch_baro(conn, site: str, start: str, end: str) -> pd.DataFrame:
AND (%s = '' OR ts <= %s::timestamptz)
ORDER BY ts ASC
"""
with conn.cursor() as cur:
cur.execute(sql, (site, start, start, end, end))
rows = cur.fetchall()
cols = [d.name for d in cur.description]
return _fetch_df(conn, sql, (site, start, start, end, end), ["ts", "received_at"])
df = pd.DataFrame.from_records(rows, columns=cols)
if not df.empty:
df["ts"] = pd.to_datetime(df["ts"], utc=True)
df["received_at"] = pd.to_datetime(df["received_at"], utc=True)
return df
def fetch_forecast(conn, site: str, start: str, end: str, model: str = "ecmwf") -> pd.DataFrame:
sql = """
SELECT DISTINCT ON (ts)
ts,
retrieved_at,
temp_c,
rh,
pressure_msl_hpa,
wind_m_s,
wind_gust_m_s,
precip_mm,
precip_prob,
cloud_cover
FROM forecast_openmeteo_hourly
WHERE site = %s
AND model = %s
AND (%s = '' OR ts >= %s::timestamptz - INTERVAL '2 hours')
AND (%s = '' OR ts <= %s::timestamptz + INTERVAL '2 hours')
ORDER BY ts ASC, retrieved_at DESC
"""
return _fetch_df(conn, sql, (site, model, start, start, end, end), ["ts", "retrieved_at"])
def _apply_forecast_features(df: pd.DataFrame, forecast: pd.DataFrame | None) -> pd.DataFrame:
out = df.copy()
for col in FORECAST_FEATURE_COLUMNS:
out[col] = np.nan
if forecast is None or forecast.empty:
return out
fc = forecast.set_index("ts").sort_index().rename(
columns={
"temp_c": "fc_temp_c",
"rh": "fc_rh",
"pressure_msl_hpa": "fc_pressure_msl_hpa",
"wind_m_s": "fc_wind_m_s",
"wind_gust_m_s": "fc_wind_gust_m_s",
"precip_mm": "fc_precip_mm",
"precip_prob": "fc_precip_prob",
"cloud_cover": "fc_cloud_cover",
}
)
keep = [c for c in FORECAST_FEATURE_COLUMNS if c in fc.columns]
fc = fc[keep]
# Bring hourly forecast onto 5m observation grid.
fc_5m = fc.resample("5min").ffill(limit=12)
out = out.join(fc_5m, how="left", rsuffix="_dup")
# Prefer joined forecast values and softly fill small gaps.
for col in keep:
dup_col = f"{col}_dup"
if dup_col in out.columns:
out[col] = out[dup_col]
out.drop(columns=[dup_col], inplace=True)
out[col] = out[col].ffill(limit=12).bfill(limit=2)
# Normalize precip probability to 0..1 if source is 0..100.
if "fc_precip_prob" in out.columns:
mask = out["fc_precip_prob"] > 1.0
out.loc[mask, "fc_precip_prob"] = out.loc[mask, "fc_precip_prob"] / 100.0
out["fc_precip_prob"] = out["fc_precip_prob"].clip(lower=0.0, upper=1.0)
return out
def build_dataset(
ws90: pd.DataFrame,
baro: pd.DataFrame,
forecast: pd.DataFrame | None = None,
rain_event_threshold_mm: float = RAIN_EVENT_THRESHOLD_MM,
) -> pd.DataFrame:
if ws90.empty:
@@ -116,11 +237,33 @@ def build_dataset(
df["rain_spike_5m"] = df["rain_inc"] >= RAIN_SPIKE_THRESHOLD_MM_5M
window = RAIN_HORIZON_BUCKETS
df["rain_last_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum()
df["rain_next_1h_mm"] = df["rain_inc"].rolling(window=window, min_periods=1).sum().shift(-(window - 1))
df["rain_next_1h"] = df["rain_next_1h_mm"] >= rain_event_threshold_mm
df["pressure_trend_1h"] = df["pressure_hpa"] - df["pressure_hpa"].shift(window)
# Wind direction cyclical encoding.
radians = np.deg2rad(df["wind_dir_deg"] % 360.0)
df["wind_dir_sin"] = np.sin(radians)
df["wind_dir_cos"] = np.cos(radians)
# Lags and rolling features (core sensors).
df["temp_lag_5m"] = df["temperature_c"].shift(1)
df["humidity_lag_5m"] = df["humidity"].shift(1)
df["wind_avg_lag_5m"] = df["wind_avg_m_s"].shift(1)
df["pressure_lag_5m"] = df["pressure_hpa"].shift(1)
df["temp_roll_1h_mean"] = df["temperature_c"].rolling(window=window, min_periods=3).mean()
df["temp_roll_1h_std"] = df["temperature_c"].rolling(window=window, min_periods=3).std()
df["humidity_roll_1h_mean"] = df["humidity"].rolling(window=window, min_periods=3).mean()
df["humidity_roll_1h_std"] = df["humidity"].rolling(window=window, min_periods=3).std()
df["wind_avg_roll_1h_mean"] = df["wind_avg_m_s"].rolling(window=window, min_periods=3).mean()
df["wind_gust_roll_1h_max"] = df["wind_max_m_s"].rolling(window=window, min_periods=3).max()
df["pressure_roll_1h_mean"] = df["pressure_hpa"].rolling(window=window, min_periods=3).mean()
df["pressure_roll_1h_std"] = df["pressure_hpa"].rolling(window=window, min_periods=3).std()
df = _apply_forecast_features(df, forecast)
return df
@@ -133,7 +276,11 @@ def model_frame(df: pd.DataFrame, feature_cols: list[str] | None = None, require
return out.sort_index()
def split_time_ordered(df: pd.DataFrame, train_ratio: float = 0.7, val_ratio: float = 0.15) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
def split_time_ordered(
df: pd.DataFrame,
train_ratio: float = 0.7,
val_ratio: float = 0.15,
) -> tuple[pd.DataFrame, pd.DataFrame, pd.DataFrame]:
if not (0 < train_ratio < 1):
raise ValueError("train_ratio must be between 0 and 1")
if not (0 <= val_ratio < 1):
@@ -223,6 +370,26 @@ def select_threshold(y_true: np.ndarray, y_prob: np.ndarray, min_precision: floa
return float(best["threshold"]), best
def safe_pr_auc(v: dict[str, Any]) -> float:
value = v.get("pr_auc")
if value is None:
return float("-inf")
out = float(value)
if np.isnan(out):
return float("-inf")
return out
def safe_roc_auc(v: dict[str, Any]) -> float:
value = v.get("roc_auc")
if value is None:
return float("-inf")
out = float(value)
if np.isnan(out):
return float("-inf")
return out
def to_builtin(v: Any) -> Any:
if isinstance(v, dict):
return {k: to_builtin(val) for k, val in v.items()}

View File

@@ -8,6 +8,9 @@ MODEL_VERSION="${MODEL_VERSION:-rain-logreg-v1}"
MODEL_PATH="${MODEL_PATH:-models/rain_model.pkl}"
REPORT_PATH="${REPORT_PATH:-models/rain_model_report.json}"
AUDIT_PATH="${AUDIT_PATH:-models/rain_data_audit.json}"
FEATURE_SET="${FEATURE_SET:-baseline}"
FORECAST_MODEL="${FORECAST_MODEL:-ecmwf}"
DATASET_PATH="${DATASET_PATH:-models/datasets/rain_dataset_${MODEL_VERSION}_${FEATURE_SET}.csv}"
if [[ -z "${DATABASE_URL:-}" ]]; then
echo "DATABASE_URL is required"
@@ -20,6 +23,8 @@ python scripts/audit_rain_data.py \
--site "$SITE" \
--start "$START" \
--end "$END" \
--feature-set "$FEATURE_SET" \
--forecast-model "$FORECAST_MODEL" \
--out "$AUDIT_PATH"
echo "Training baseline rain model..."
@@ -30,14 +35,18 @@ python scripts/train_rain_model.py \
--train-ratio 0.7 \
--val-ratio 0.15 \
--min-precision 0.70 \
--feature-set "$FEATURE_SET" \
--forecast-model "$FORECAST_MODEL" \
--model-version "$MODEL_VERSION" \
--out "$MODEL_PATH" \
--report-out "$REPORT_PATH"
--report-out "$REPORT_PATH" \
--dataset-out "$DATASET_PATH"
echo "Writing current prediction..."
python scripts/predict_rain_model.py \
--site "$SITE" \
--model-path "$MODEL_PATH" \
--model-name "rain_next_1h"
--model-name "rain_next_1h" \
--forecast-model "$FORECAST_MODEL"
echo "P0 rain workflow complete."

View File

@@ -41,12 +41,15 @@ class WorkerConfig:
site: str
model_name: str
model_version_base: str
feature_set: str
forecast_model: str
train_interval_hours: float
predict_interval_minutes: float
lookback_days: int
train_ratio: float
val_ratio: float
min_precision: float
dataset_path_template: str
model_path: Path
report_path: Path
audit_path: Path
@@ -81,10 +84,13 @@ def training_window(lookback_days: int) -> tuple[str, str]:
def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
start, end = training_window(cfg.lookback_days)
model_version = f"{cfg.model_version_base}-{now_utc().strftime('%Y%m%d%H%M')}"
dataset_out = cfg.dataset_path_template.format(model_version=model_version, feature_set=cfg.feature_set)
ensure_parent(cfg.audit_path)
ensure_parent(cfg.report_path)
ensure_parent(cfg.model_path)
if dataset_out:
ensure_parent(Path(dataset_out))
run_cmd(
[
@@ -96,6 +102,10 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
start,
"--end",
end,
"--feature-set",
cfg.feature_set,
"--forecast-model",
cfg.forecast_model,
"--out",
str(cfg.audit_path),
],
@@ -118,12 +128,18 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
str(cfg.val_ratio),
"--min-precision",
str(cfg.min_precision),
"--feature-set",
cfg.feature_set,
"--forecast-model",
cfg.forecast_model,
"--model-version",
model_version,
"--out",
str(cfg.model_path),
"--report-out",
str(cfg.report_path),
"--dataset-out",
dataset_out,
],
env,
)
@@ -143,6 +159,8 @@ def run_predict_once(cfg: WorkerConfig, env: dict[str, str]) -> None:
str(cfg.model_path),
"--model-name",
cfg.model_name,
"--forecast-model",
cfg.forecast_model,
],
env,
)
@@ -158,12 +176,18 @@ def load_config() -> WorkerConfig:
site=read_env("RAIN_SITE", "home"),
model_name=read_env("RAIN_MODEL_NAME", "rain_next_1h"),
model_version_base=read_env("RAIN_MODEL_VERSION_BASE", "rain-logreg-v1"),
feature_set=read_env("RAIN_FEATURE_SET", "baseline"),
forecast_model=read_env("RAIN_FORECAST_MODEL", "ecmwf"),
train_interval_hours=read_env_float("RAIN_TRAIN_INTERVAL_HOURS", 24.0),
predict_interval_minutes=read_env_float("RAIN_PREDICT_INTERVAL_MINUTES", 10.0),
lookback_days=read_env_int("RAIN_LOOKBACK_DAYS", 30),
train_ratio=read_env_float("RAIN_TRAIN_RATIO", 0.7),
val_ratio=read_env_float("RAIN_VAL_RATIO", 0.15),
min_precision=read_env_float("RAIN_MIN_PRECISION", 0.70),
dataset_path_template=read_env(
"RAIN_DATASET_PATH",
"models/datasets/rain_dataset_{model_version}_{feature_set}.csv",
),
model_path=Path(read_env("RAIN_MODEL_PATH", "models/rain_model.pkl")),
report_path=Path(read_env("RAIN_REPORT_PATH", "models/rain_model_report.json")),
audit_path=Path(read_env("RAIN_AUDIT_PATH", "models/rain_data_audit.json")),
@@ -188,6 +212,8 @@ def main() -> int:
"[rain-ml] worker start "
f"site={cfg.site} "
f"model_name={cfg.model_name} "
f"feature_set={cfg.feature_set} "
f"forecast_model={cfg.forecast_model} "
f"train_interval_hours={cfg.train_interval_hours} "
f"predict_interval_minutes={cfg.predict_interval_minutes}",
flush=True,

View File

@@ -11,12 +11,15 @@ from sklearn.pipeline import Pipeline
from sklearn.preprocessing import StandardScaler
from rain_model_common import (
FEATURE_COLUMNS,
AVAILABLE_FEATURE_SETS,
RAIN_EVENT_THRESHOLD_MM,
build_dataset,
evaluate_probs,
fetch_baro,
fetch_forecast,
fetch_ws90,
feature_columns_for_set,
feature_columns_need_forecast,
model_frame,
parse_time,
select_threshold,
@@ -46,12 +49,31 @@ def parse_args() -> argparse.Namespace:
)
parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.")
parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.")
parser.add_argument(
"--feature-set",
default="baseline",
choices=AVAILABLE_FEATURE_SETS,
help="Named feature set to train with.",
)
parser.add_argument(
"--forecast-model",
default="ecmwf",
help="Forecast model name when feature set requires forecast columns.",
)
parser.add_argument("--out", default="models/rain_model.pkl", help="Path to save model.")
parser.add_argument(
"--report-out",
default="models/rain_model_report.json",
help="Path to save JSON training report.",
)
parser.add_argument(
"--dataset-out",
default="models/datasets/rain_dataset_{model_version}_{feature_set}.csv",
help=(
"Path (or template) for model-ready dataset snapshot. "
"Supports {model_version} and {feature_set}. Use empty string to disable."
),
)
parser.add_argument(
"--model-version",
default="rain-logreg-v1",
@@ -76,13 +98,18 @@ def main() -> int:
start = parse_time(args.start) if args.start else ""
end = parse_time(args.end) if args.end else ""
feature_cols = feature_columns_for_set(args.feature_set)
needs_forecast = feature_columns_need_forecast(feature_cols)
with psycopg2.connect(args.db_url) as conn:
ws90 = fetch_ws90(conn, args.site, start, end)
baro = fetch_baro(conn, args.site, start, end)
forecast = None
if needs_forecast:
forecast = fetch_forecast(conn, args.site, start, end, model=args.forecast_model)
full_df = build_dataset(ws90, baro, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(full_df, FEATURE_COLUMNS, require_target=True)
full_df = build_dataset(ws90, baro, forecast=forecast, rain_event_threshold_mm=RAIN_EVENT_THRESHOLD_MM)
model_df = model_frame(full_df, feature_cols, require_target=True)
if len(model_df) < args.min_rows:
raise RuntimeError(f"not enough model-ready rows after filtering (need >= {args.min_rows})")
@@ -92,11 +119,11 @@ def main() -> int:
val_ratio=args.val_ratio,
)
x_train = train_df[FEATURE_COLUMNS]
x_train = train_df[feature_cols]
y_train = train_df["rain_next_1h"].astype(int).to_numpy()
x_val = val_df[FEATURE_COLUMNS]
x_val = val_df[feature_cols]
y_val = val_df["rain_next_1h"].astype(int).to_numpy()
x_test = test_df[FEATURE_COLUMNS]
x_test = test_df[feature_cols]
y_test = test_df["rain_next_1h"].astype(int).to_numpy()
base_model = make_model()
@@ -119,7 +146,7 @@ def main() -> int:
val_metrics = evaluate_probs(y_true=y_val, y_prob=y_val_prob, threshold=chosen_threshold)
train_val_df = model_df.iloc[: len(train_df) + len(val_df)]
x_train_val = train_val_df[FEATURE_COLUMNS]
x_train_val = train_val_df[feature_cols]
y_train_val = train_val_df["rain_next_1h"].astype(int).to_numpy()
final_model = make_model()
@@ -131,8 +158,10 @@ def main() -> int:
"generated_at": datetime.now(timezone.utc).isoformat(),
"site": args.site,
"model_version": args.model_version,
"feature_set": args.feature_set,
"target_definition": f"rain_next_1h_mm >= {RAIN_EVENT_THRESHOLD_MM:.2f}",
"feature_columns": FEATURE_COLUMNS,
"feature_columns": feature_cols,
"forecast_model": args.forecast_model if needs_forecast else None,
"data_window": {
"requested_start": start or None,
"requested_end": end or None,
@@ -141,11 +170,13 @@ def main() -> int:
"model_rows": len(model_df),
"ws90_rows": len(ws90),
"baro_rows": len(baro),
"forecast_rows": int(len(forecast)) if forecast is not None else 0,
},
"label_quality": {
"rain_reset_count": int(np.nansum(full_df["rain_reset"].fillna(False).to_numpy(dtype=int))),
"rain_spike_5m_count": int(np.nansum(full_df["rain_spike_5m"].fillna(False).to_numpy(dtype=int))),
},
"feature_missingness_ratio": {col: float(full_df[col].isna().mean()) for col in feature_cols if col in full_df.columns},
"split": {
"train_ratio": args.train_ratio,
"val_ratio": args.val_ratio,
@@ -171,6 +202,7 @@ def main() -> int:
print("Rain model training summary:")
print(f" site: {args.site}")
print(f" model_version: {args.model_version}")
print(f" feature_set: {args.feature_set} ({len(feature_cols)} features)")
print(f" rows: total={report['data_window']['model_rows']} train={report['split']['train_rows']} val={report['split']['val_rows']} test={report['split']['test_rows']}")
print(
" threshold: "
@@ -200,6 +232,15 @@ def main() -> int:
json.dump(report, f, indent=2)
print(f"Saved report to {args.report_out}")
if args.dataset_out:
dataset_out = args.dataset_out.format(model_version=args.model_version, feature_set=args.feature_set)
dataset_dir = os.path.dirname(dataset_out)
if dataset_dir:
os.makedirs(dataset_dir, exist_ok=True)
snapshot_cols = list(dict.fromkeys(feature_cols + ["rain_next_1h", "rain_next_1h_mm"]))
model_df[snapshot_cols].to_csv(dataset_out, index=True, index_label="ts")
print(f"Saved dataset snapshot to {dataset_out}")
if args.out:
out_dir = os.path.dirname(args.out)
if out_dir:
@@ -209,7 +250,9 @@ def main() -> int:
else:
artifact = {
"model": final_model,
"features": FEATURE_COLUMNS,
"features": feature_cols,
"feature_set": args.feature_set,
"forecast_model": args.forecast_model if needs_forecast else None,
"threshold": float(chosen_threshold),
"target_mm": float(RAIN_EVENT_THRESHOLD_MM),
"model_version": args.model_version,

12
todo.md
View File

@@ -15,12 +15,12 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
- [ ] [P1] Document known data issues and mitigation rules.
## 3) Dataset and Feature Engineering
- [ ] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow.
- [ ] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs.
- [ ] [P1] Encode wind direction properly (cyclical encoding).
- [x] [P1] Extract reusable dataset-builder logic from training script into a maintainable module/workflow.
- [x] [P1] Add lag/rolling features (means, stddev, deltas) for core sensor inputs.
- [x] [P1] Encode wind direction properly (cyclical encoding).
- [ ] [P2] Add calendar features (hour-of-day, day-of-week, seasonality proxies).
- [ ] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure).
- [ ] [P1] Persist versioned dataset snapshots for reproducibility.
- [x] [P1] Join aligned forecast features from `forecast_openmeteo_hourly` (precip prob, cloud cover, wind, pressure).
- [x] [P1] Persist versioned dataset snapshots for reproducibility.
## 4) Modeling and Validation
- [x] [P0] Keep logistic regression as baseline.
@@ -53,5 +53,5 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
## 8) Immediate Next Steps (This Week)
- [x] [P0] Run first full data audit and label-quality checks. (completed on runtime machine)
- [x] [P0] Train baseline model on full available history and capture metrics. (completed on runtime machine)
- [ ] [P1] Add one expanded feature set and rerun evaluation.
- [ ] [P1] Add one expanded feature set and rerun evaluation. (feature-set plumbing implemented; rerun pending on runtime machine)
- [x] [P0] Decide v1 threshold and define deployment interface.