improve model training
@@ -104,6 +104,7 @@ Runbook/docs:
- `docs/rain_prediction.md`
- `docs/rain_data_issues.md`
- `docs/rain_model_runbook.md`
- `scripts/recommend_rain_model.py` (rank reports and recommend deploy candidate)

## Publish a test WS90 payload
```sh

@@ -46,6 +46,7 @@ services:
      RAIN_TUNE_HYPERPARAMETERS: "true"
      RAIN_MAX_HYPERPARAM_TRIALS: "12"
      RAIN_CALIBRATION_METHODS: "none,sigmoid,isotonic"
      RAIN_THRESHOLD_POLICY: "validation"
      RAIN_WALK_FORWARD_FOLDS: "0"
      RAIN_ALLOW_EMPTY_DATA: "true"
      RAIN_MODEL_PATH: "/app/models/rain_model.pkl"

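For reference, the worker turns these environment variables into typed config values with small helpers. A minimal sketch of that parsing, assuming the helper names the worker uses (`read_env`, `read_env_bool`, `read_env_int`) with illustrative bodies:

```python
import os


def read_env(name: str, default: str) -> str:
    # Plain string lookup with a default.
    return os.environ.get(name, default)


def read_env_bool(name: str, default: bool) -> bool:
    # Accept common truthy spellings; anything else is false.
    raw = os.environ.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in ("1", "true", "yes", "on")


def read_env_int(name: str, default: int) -> int:
    # Fall back to the default on missing or unparsable values.
    raw = os.environ.get(name)
    if raw is None:
        return default
    try:
        return int(raw)
    except ValueError:
        return default


os.environ["RAIN_WALK_FORWARD_FOLDS"] = "4"
print(read_env_int("RAIN_WALK_FORWARD_FOLDS", 0))  # -> 4
```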
@@ -27,6 +27,7 @@ python scripts/train_rain_model.py \
  --tune-hyperparameters \
  --max-hyperparam-trials 12 \
  --calibration-methods "none,sigmoid,isotonic" \
  --threshold-policy "walk_forward" \
  --walk-forward-folds 4 \
  --model-version "rain-auto-v1-extended" \
  --out "models/rain_model.pkl" \
@@ -40,6 +41,7 @@ Review in report:
- `candidate_models[*].calibration_comparison`
- `naive_baselines_test`
- `sliced_performance_test`
- `threshold_tuning_walk_forward`
- `walk_forward_backtest`

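Before reviewing, it can help to confirm a saved report actually contains these sections. A minimal sketch where the sample dict stands in for `json.load` of the report file (structure is illustrative, not real training output):

```python
# Stand-in for json.load(open(<the --report-out path>)).
report = {
    "candidate_models": [{"calibration_comparison": {}}],
    "naive_baselines_test": {},
    "sliced_performance_test": {},
    "threshold_tuning_walk_forward": {"status": "ok"},
    "walk_forward_backtest": {"summary": {}},
}

review_keys = (
    "naive_baselines_test",
    "sliced_performance_test",
    "threshold_tuning_walk_forward",
    "walk_forward_backtest",
)
missing = [key for key in review_keys if key not in report]
print("missing sections:", missing)  # -> missing sections: []
```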
## 3) Deploy
@@ -133,6 +135,7 @@ The script exits non-zero on failure, so it can directly drive alerting.
- `RAIN_TUNE_HYPERPARAMETERS`
- `RAIN_MAX_HYPERPARAM_TRIALS`
- `RAIN_CALIBRATION_METHODS`
- `RAIN_THRESHOLD_POLICY`
- `RAIN_WALK_FORWARD_FOLDS`
- `RAIN_ALLOW_EMPTY_DATA`
- `RAIN_MODEL_BACKUP_PATH`
@@ -141,3 +144,15 @@ The script exits non-zero on failure, so it can directly drive alerting.
Recommended production defaults:
- Enable tuning daily or weekly (`RAIN_TUNE_HYPERPARAMETERS=true`)
- Keep walk-forward folds at `0` in continuous mode; run fold backtests in scheduled evaluation jobs

## 8) Auto-Recommend Candidate

To compare saved training reports and pick a deployment candidate automatically:

```sh
python scripts/recommend_rain_model.py \
  --reports-glob "models/rain_model_report*.json" \
  --require-walk-forward \
  --top-k 5 \
  --json-out "models/rain_model_recommendation.json"
```

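Downstream automation can read the `--json-out` payload instead of parsing stdout. A minimal sketch against the payload shape the script writes (sample values are illustrative):

```python
import json

# Stand-in for the file written by --json-out; in practice:
# payload = json.load(open("models/rain_model_recommendation.json"))
payload = json.loads("""
{
  "recommendation": {
    "model_version": "rain-auto-v1-extended",
    "report_path": "models/rain_model_report_auto.json",
    "score": 7.412,
    "eligible": true,
    "ineligible_reasons": []
  }
}
""")

rec = payload["recommendation"]
if rec["eligible"]:
    print(f"deploy candidate: {rec['model_version']} (score={rec['score']:.3f})")
else:
    print(f"hold deploy: {rec['ineligible_reasons']}")
```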
@@ -43,6 +43,7 @@ pip install -r scripts/requirements.txt
  `predictions_rain_1h`.
- `scripts/run_rain_ml_worker.py`: long-running worker for periodic training + prediction.
- `scripts/check_rain_pipeline_health.py`: freshness/failure check for alerting.
- `scripts/recommend_rain_model.py`: rank saved training reports and recommend a deployment candidate.

Feature-set options:
- `baseline`: original 5 local observation features.
@@ -181,6 +182,22 @@ python scripts/train_rain_model.py \
  --model-card-out "models/model_card_{model_version}.md"
```

### 3f) Walk-forward threshold policy (more temporally robust alert threshold)
```sh
python scripts/train_rain_model.py \
  --site "home" \
  --start "2026-02-01T00:00:00Z" \
  --end "2026-03-03T23:55:00Z" \
  --feature-set "extended" \
  --model-family "auto" \
  --forecast-model "ecmwf" \
  --threshold-policy "walk_forward" \
  --walk-forward-folds 4 \
  --model-version "rain-auto-v1-extended-wf-threshold" \
  --out "models/rain_model_auto.pkl" \
  --report-out "models/rain_model_report_auto.json"
```

### 4) Run inference and store prediction
```sh
python scripts/predict_rain_model.py \
@@ -200,7 +217,7 @@ The `rainml` service in `docker-compose.yml` now runs:
- periodic retraining (default every 24 hours)
- periodic prediction writes (default every 10 minutes)
- configurable tuning/calibration behavior (`RAIN_TUNE_HYPERPARAMETERS`,
  `RAIN_MAX_HYPERPARAM_TRIALS`, `RAIN_CALIBRATION_METHODS`)
  `RAIN_MAX_HYPERPARAM_TRIALS`, `RAIN_CALIBRATION_METHODS`, `RAIN_THRESHOLD_POLICY`)
- graceful gap handling for temporary source outages (`RAIN_ALLOW_EMPTY_DATA=true`)
- automatic rollback path for last-known-good model (`RAIN_MODEL_BACKUP_PATH`)
- optional model-card output (`RAIN_MODEL_CARD_PATH`)
@@ -222,6 +239,15 @@ docker compose logs -f rainml
- Prediction rows: `predictions_rain_1h` (probability + threshold decision + realized
  outcome fields once available)

### 7) Recommend deploy candidate from saved reports
```sh
python scripts/recommend_rain_model.py \
  --reports-glob "models/rain_model_report*.json" \
  --require-walk-forward \
  --top-k 5 \
  --json-out "models/rain_model_recommendation.json"
```

## Model Features (v1 baseline)
- `pressure_trend_1h`
- `humidity`

324
scripts/recommend_rain_model.py
Normal file
@@ -0,0 +1,324 @@
#!/usr/bin/env python3
from __future__ import annotations

import argparse
import glob
import json
import math
import os
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


@dataclass
class Candidate:
    path: Path
    model_version: str
    feature_set: str
    model_family: str
    generated_at: str | None
    test_precision: float | None
    test_recall: float | None
    test_pr_auc: float | None
    test_roc_auc: float | None
    test_brier: float | None
    wf_precision: float | None
    wf_recall: float | None
    wf_pr_auc: float | None
    wf_brier: float | None
    score: float
    eligible: bool
    ineligible_reasons: list[str]
    report: dict[str, Any]


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="Rank rain-model training reports and recommend a deploy candidate.")
    parser.add_argument(
        "--reports-glob",
        default="models/rain_model_report*.json",
        help="Glob for report JSON files.",
    )
    parser.add_argument("--min-test-precision", type=float, default=0.65)
    parser.add_argument("--min-test-recall", type=float, default=0.50)
    parser.add_argument("--min-test-pr-auc", type=float, default=0.40)
    parser.add_argument("--min-walk-forward-precision", type=float, default=0.30)
    parser.add_argument("--min-walk-forward-recall", type=float, default=0.25)
    parser.add_argument(
        "--require-walk-forward",
        action="store_true",
        help="Require walk-forward summary metrics to be present and pass minimums.",
    )
    parser.add_argument("--top-k", type=int, default=5)
    parser.add_argument("--json-out", help="Optional output JSON path.")
    return parser.parse_args()


def as_float(v: Any) -> float | None:
    if v is None:
        return None
    try:
        out = float(v)
    except (TypeError, ValueError):
        return None
    if math.isnan(out):
        return None
    return out


def load_report(path: Path) -> dict[str, Any]:
    with open(path, "r", encoding="utf-8") as f:
        return json.load(f)


def naive_precision_baseline(report: dict[str, Any]) -> float | None:
    baselines = report.get("naive_baselines_test") or {}
    out: float | None = None
    for baseline in baselines.values():
        metrics = baseline.get("metrics", {})
        precision = as_float(metrics.get("precision"))
        if precision is None:
            continue
        if out is None or precision > out:
            out = precision
    return out


def score_candidate(
    report: dict[str, Any],
    min_test_precision: float,
    min_test_recall: float,
    min_test_pr_auc: float,
    min_wf_precision: float,
    min_wf_recall: float,
    require_walk_forward: bool,
) -> tuple[float, bool, list[str], dict[str, float | None]]:
    test = report.get("test_metrics") or {}
    wf_summary = (report.get("walk_forward_backtest") or {}).get("summary") or {}

    test_precision = as_float(test.get("precision"))
    test_recall = as_float(test.get("recall"))
    test_pr_auc = as_float(test.get("pr_auc"))
    test_roc_auc = as_float(test.get("roc_auc"))
    test_brier = as_float(test.get("brier"))

    wf_precision = as_float(wf_summary.get("mean_precision"))
    wf_recall = as_float(wf_summary.get("mean_recall"))
    wf_pr_auc = as_float(wf_summary.get("mean_pr_auc"))
    wf_brier = as_float(wf_summary.get("mean_brier"))

    metrics = {
        "test_precision": test_precision,
        "test_recall": test_recall,
        "test_pr_auc": test_pr_auc,
        "test_roc_auc": test_roc_auc,
        "test_brier": test_brier,
        "wf_precision": wf_precision,
        "wf_recall": wf_recall,
        "wf_pr_auc": wf_pr_auc,
        "wf_brier": wf_brier,
    }

    reasons: list[str] = []
    if test_precision is None or test_precision < min_test_precision:
        reasons.append(f"test_precision<{min_test_precision:.2f}")
    if test_recall is None or test_recall < min_test_recall:
        reasons.append(f"test_recall<{min_test_recall:.2f}")
    if test_pr_auc is None or test_pr_auc < min_test_pr_auc:
        reasons.append(f"test_pr_auc<{min_test_pr_auc:.2f}")

    if require_walk_forward and (wf_precision is None or wf_recall is None):
        reasons.append("walk_forward_missing")
    if wf_precision is not None and wf_precision < min_wf_precision:
        reasons.append(f"wf_precision<{min_wf_precision:.2f}")
    if wf_recall is not None and wf_recall < min_wf_recall:
        reasons.append(f"wf_recall<{min_wf_recall:.2f}")

    eligible = len(reasons) == 0

    # Weighted utility score with stability penalty.
    score = 0.0
    if test_precision is not None:
        score += 3.0 * test_precision
    if test_recall is not None:
        score += 2.5 * test_recall
    if test_pr_auc is not None:
        score += 2.5 * test_pr_auc
    if test_roc_auc is not None:
        score += 1.0 * test_roc_auc
    if test_brier is not None:
        score += 1.5 * (1.0 - min(max(test_brier, 0.0), 1.0))

    if wf_precision is not None:
        score += 2.0 * wf_precision
    else:
        score -= 0.25
    if wf_recall is not None:
        score += 1.5 * wf_recall
    if wf_pr_auc is not None:
        score += 1.0 * wf_pr_auc
    if wf_brier is not None:
        score += 1.0 * (1.0 - min(max(wf_brier, 0.0), 1.0))

    if test_precision is not None and wf_precision is not None:
        score -= 1.5 * abs(test_precision - wf_precision)
    if test_recall is not None and wf_recall is not None:
        score -= 1.0 * abs(test_recall - wf_recall)

    best_naive_precision = naive_precision_baseline(report)
    if best_naive_precision is not None and test_precision is not None:
        gap = test_precision - best_naive_precision
        score += 0.5 * gap

    return score, eligible, reasons, metrics


def parse_generated_at(value: str | None) -> datetime:
    # Always return an aware UTC datetime so sort keys never compare
    # naive and aware values (which would raise TypeError).
    fallback = datetime.min.replace(tzinfo=timezone.utc)
    if not value:
        return fallback
    try:
        parsed = datetime.fromisoformat(value.replace("Z", "+00:00"))
    except ValueError:
        return fallback
    if parsed.tzinfo is None:
        parsed = parsed.replace(tzinfo=timezone.utc)
    return parsed


def build_candidate(path: Path, report: dict[str, Any], args: argparse.Namespace) -> Candidate:
    score, eligible, reasons, metrics = score_candidate(
        report=report,
        min_test_precision=args.min_test_precision,
        min_test_recall=args.min_test_recall,
        min_test_pr_auc=args.min_test_pr_auc,
        min_wf_precision=args.min_walk_forward_precision,
        min_wf_recall=args.min_walk_forward_recall,
        require_walk_forward=args.require_walk_forward,
    )
    return Candidate(
        path=path,
        model_version=str(report.get("model_version") or "unknown"),
        feature_set=str(report.get("feature_set") or "unknown"),
        model_family=str(report.get("model_family") or "unknown"),
        generated_at=report.get("generated_at"),
        test_precision=metrics["test_precision"],
        test_recall=metrics["test_recall"],
        test_pr_auc=metrics["test_pr_auc"],
        test_roc_auc=metrics["test_roc_auc"],
        test_brier=metrics["test_brier"],
        wf_precision=metrics["wf_precision"],
        wf_recall=metrics["wf_recall"],
        wf_pr_auc=metrics["wf_pr_auc"],
        wf_brier=metrics["wf_brier"],
        score=score,
        eligible=eligible,
        ineligible_reasons=reasons,
        report=report,
    )


def main() -> int:
    args = parse_args()
    paths = sorted(Path(p) for p in glob.glob(args.reports_glob))
    if not paths:
        print(f"No report files matched: {args.reports_glob}")
        return 1

    candidates: list[Candidate] = []
    for path in paths:
        try:
            report = load_report(path)
        except Exception as exc:
            print(f"skip {path}: {exc}")
            continue
        candidates.append(build_candidate(path=path, report=report, args=args))

    if not candidates:
        print("No valid reports loaded.")
        return 1

    candidates.sort(
        key=lambda c: (
            1 if c.eligible else 0,
            c.score,
            parse_generated_at(c.generated_at),
        ),
        reverse=True,
    )

    print(f"Scanned {len(candidates)} report(s). Top {min(args.top_k, len(candidates))}:")
    for idx, c in enumerate(candidates[: args.top_k], start=1):
        wf_part = (
            f"wf_prec={c.wf_precision:.3f} wf_rec={c.wf_recall:.3f}"
            if c.wf_precision is not None and c.wf_recall is not None
            else "wf=n/a"
        )
        gate_part = "eligible" if c.eligible else f"ineligible({','.join(c.ineligible_reasons)})"
        print(
            f"{idx}. {gate_part} score={c.score:.3f} "
            f"version={c.model_version} feature_set={c.feature_set} family={c.model_family} "
            f"test_prec={c.test_precision if c.test_precision is not None else 'n/a'} "
            f"test_rec={c.test_recall if c.test_recall is not None else 'n/a'} "
            f"test_pr_auc={c.test_pr_auc if c.test_pr_auc is not None else 'n/a'} "
            f"{wf_part} "
            f"path={c.path}"
        )

    recommendation = next((c for c in candidates if c.eligible), candidates[0])
    print("")
    print("Recommended candidate:")
    print(f" model_version={recommendation.model_version}")
    print(f" feature_set={recommendation.feature_set}")
    print(f" model_family={recommendation.model_family}")
    print(f" report_path={recommendation.path}")
    print(f" score={recommendation.score:.3f}")
    if not recommendation.eligible:
        print(f" note=no fully eligible report; selected highest score with reasons={recommendation.ineligible_reasons}")

    if args.json_out:
        payload = {
            "generated_at": datetime.utcnow().isoformat() + "Z",
            "reports_glob": args.reports_glob,
            "recommendation": {
                "model_version": recommendation.model_version,
                "feature_set": recommendation.feature_set,
                "model_family": recommendation.model_family,
                "report_path": str(recommendation.path),
                "score": recommendation.score,
                "eligible": recommendation.eligible,
                "ineligible_reasons": recommendation.ineligible_reasons,
            },
            "ranked": [
                {
                    "model_version": c.model_version,
                    "feature_set": c.feature_set,
                    "model_family": c.model_family,
                    "report_path": str(c.path),
                    "generated_at": c.generated_at,
                    "score": c.score,
                    "eligible": c.eligible,
                    "ineligible_reasons": c.ineligible_reasons,
                    "test_precision": c.test_precision,
                    "test_recall": c.test_recall,
                    "test_pr_auc": c.test_pr_auc,
                    "test_roc_auc": c.test_roc_auc,
                    "test_brier": c.test_brier,
                    "wf_precision": c.wf_precision,
                    "wf_recall": c.wf_recall,
                    "wf_pr_auc": c.wf_pr_auc,
                    "wf_brier": c.wf_brier,
                }
                for c in candidates
            ],
        }
        out_dir = os.path.dirname(args.json_out)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        with open(args.json_out, "w", encoding="utf-8") as f:
            json.dump(payload, f, indent=2)
        print(f"Saved recommendation JSON to {args.json_out}")

    return 0


if __name__ == "__main__":
    raise SystemExit(main())
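To make the ranking transparent, here is the weighted sum worked by hand for a hypothetical report with test metrics only (no walk-forward summary), using the weights from `score_candidate`; the metric values are illustrative, not from a real report:

```python
# Hypothetical test metrics (illustrative only).
test_precision, test_recall, test_pr_auc = 0.70, 0.55, 0.45
test_roc_auc, test_brier = 0.80, 0.10

score = (
    3.0 * test_precision        # precision carries the largest weight
    + 2.5 * test_recall
    + 2.5 * test_pr_auc
    + 1.0 * test_roc_auc
    + 1.5 * (1.0 - test_brier)  # lower Brier -> higher score
    - 0.25                      # penalty: walk-forward precision missing
)
print(round(score, 3))  # -> 6.5
```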
@@ -54,6 +54,7 @@ class WorkerConfig:
    tune_hyperparameters: bool
    max_hyperparam_trials: int
    calibration_methods: str
    threshold_policy: str
    walk_forward_folds: int
    allow_empty_data: bool
    dataset_path_template: str
@@ -194,6 +195,8 @@ def run_training_cycle(cfg: WorkerConfig, env: dict[str, str]) -> None:
        str(cfg.max_hyperparam_trials),
        "--calibration-methods",
        cfg.calibration_methods,
        "--threshold-policy",
        cfg.threshold_policy,
        "--walk-forward-folds",
        str(cfg.walk_forward_folds),
        "--feature-set",
@@ -300,6 +303,7 @@ def load_config() -> WorkerConfig:
        tune_hyperparameters=read_env_bool("RAIN_TUNE_HYPERPARAMETERS", False),
        max_hyperparam_trials=read_env_int("RAIN_MAX_HYPERPARAM_TRIALS", 12),
        calibration_methods=read_env("RAIN_CALIBRATION_METHODS", "none,sigmoid,isotonic"),
        threshold_policy=read_env("RAIN_THRESHOLD_POLICY", "validation"),
        walk_forward_folds=read_env_int("RAIN_WALK_FORWARD_FOLDS", 0),
        allow_empty_data=read_env_bool("RAIN_ALLOW_EMPTY_DATA", True),
        dataset_path_template=read_env(
@@ -341,6 +345,7 @@ def main() -> int:
        f"train_interval_hours={cfg.train_interval_hours} "
        f"predict_interval_minutes={cfg.predict_interval_minutes} "
        f"tune_hyperparameters={cfg.tune_hyperparameters} "
        f"threshold_policy={cfg.threshold_policy} "
        f"walk_forward_folds={cfg.walk_forward_folds} "
        f"allow_empty_data={cfg.allow_empty_data} "
        f"model_backup_path={cfg.model_backup_path}",

@@ -45,6 +45,7 @@ except ImportError: # pragma: no cover - optional dependency

MODEL_FAMILIES = ("logreg", "hist_gb", "auto")
CALIBRATION_METHODS = ("none", "sigmoid", "isotonic")
THRESHOLD_POLICIES = ("validation", "walk_forward")


def parse_args() -> argparse.Namespace:
@@ -62,6 +63,12 @@ def parse_args() -> argparse.Namespace:
        help="Minimum validation precision for threshold selection.",
    )
    parser.add_argument("--threshold", type=float, help="Optional fixed classification threshold.")
    parser.add_argument(
        "--threshold-policy",
        default="validation",
        choices=THRESHOLD_POLICIES,
        help="How to choose operating threshold when --threshold is not set.",
    )
    parser.add_argument("--min-rows", type=int, default=200, help="Minimum model-ready rows required.")
    parser.set_defaults(allow_empty=True)
    parser.add_argument(
@@ -575,6 +582,127 @@ def evaluate_sliced_performance(
    return out


def tune_threshold_walk_forward(
    model_df,
    feature_cols: list[str],
    model_family: str,
    model_params: dict[str, Any],
    calibration_method: str,
    random_state: int,
    min_precision: float,
    folds: int,
) -> dict[str, Any]:
    if folds <= 0:
        return {
            "enabled": False,
            "status": "disabled",
            "reason": "walk_forward_folds <= 0",
        }

    n = len(model_df)
    min_train_rows = max(200, int(0.4 * n))
    remaining = n - min_train_rows
    if remaining < 50:
        return {
            "enabled": True,
            "status": "insufficient_data",
            "reason": "not enough rows for walk-forward threshold tuning",
            "requested_folds": folds,
            "min_train_rows": min_train_rows,
        }

    fold_size = max(25, remaining // folds)
    fold_details: list[dict[str, Any]] = []
    y_true_chunks: list[np.ndarray] = []
    y_prob_chunks: list[np.ndarray] = []

    for idx in range(folds):
        train_end = min_train_rows + idx * fold_size
        test_end = n if idx == folds - 1 else min(min_train_rows + (idx + 1) * fold_size, n)
        if train_end >= test_end:
            continue

        fold_train = model_df.iloc[:train_end]
        fold_test = model_df.iloc[train_end:test_end]
        if len(fold_train) < 160 or len(fold_test) < 25:
            continue

        y_fold_train = fold_train["rain_next_1h"].astype(int).to_numpy()
        y_fold_test = fold_test["rain_next_1h"].astype(int).to_numpy()
        if len(np.unique(y_fold_train)) < 2:
            continue

        try:
            fold_model, fold_fit = fit_with_optional_calibration(
                model_family=model_family,
                model_params=model_params,
                random_state=random_state,
                x_train=fold_train[feature_cols],
                y_train=y_fold_train,
                calibration_method=calibration_method,
                fallback_to_none=True,
            )
            fold_test_prob = fold_model.predict_proba(fold_test[feature_cols])[:, 1]

            y_true_chunks.append(y_fold_test)
            y_prob_chunks.append(fold_test_prob)
            fold_details.append(
                {
                    "fold_index": idx + 1,
                    "train_rows": len(fold_train),
                    "test_rows": len(fold_test),
                    "train_start": fold_train.index.min(),
                    "train_end": fold_train.index.max(),
                    "test_start": fold_test.index.min(),
                    "test_end": fold_test.index.max(),
                    "fit": fold_fit,
                    "test_positive_rate": float(np.mean(y_fold_test)),
                }
            )
        except Exception as exc:
            fold_details.append(
                {
                    "fold_index": idx + 1,
                    "error": str(exc),
                }
            )

    if not y_true_chunks:
        return {
            "enabled": True,
            "status": "failed",
            "reason": "no successful folds produced out-of-fold predictions",
            "requested_folds": folds,
            "folds": fold_details,
        }

    y_oof_true = np.concatenate(y_true_chunks)
    y_oof_prob = np.concatenate(y_prob_chunks)
    tuned_threshold, tuned_info = select_threshold(
        y_true=y_oof_true,
        y_prob=y_oof_prob,
        min_precision=min_precision,
    )
    tuned_info = dict(tuned_info)
    tuned_info["selection_rule"] = f"walk_forward_{tuned_info['selection_rule']}"

    return {
        "enabled": True,
        "status": "ok",
        "requested_folds": folds,
        "successful_folds": int(len(y_true_chunks)),
        "rows_used": int(len(y_oof_true)),
        "threshold": float(tuned_threshold),
        "threshold_selection": tuned_info,
        "oof_metrics_at_threshold": evaluate_probs(
            y_true=y_oof_true,
            y_prob=y_oof_prob,
            threshold=tuned_threshold,
        ),
        "folds": fold_details,
    }


def walk_forward_backtest(
    model_df,
    feature_cols: list[str],
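The expanding-window fold boundaries in `tune_threshold_walk_forward` can be traced on a concrete row count. A sketch of the same arithmetic for an illustrative `n=1000` rows and `folds=4`:

```python
# Mirrors the boundary math above: each fold trains on rows
# [0, train_end) and scores rows [train_end, test_end).
n, folds = 1000, 4
min_train_rows = max(200, int(0.4 * n))       # 400
remaining = n - min_train_rows                # 600
fold_size = max(25, remaining // folds)       # 150

bounds = []
for idx in range(folds):
    train_end = min_train_rows + idx * fold_size
    test_end = n if idx == folds - 1 else min(min_train_rows + (idx + 1) * fold_size, n)
    bounds.append((train_end, test_end))

print(bounds)  # -> [(400, 550), (550, 700), (700, 850), (850, 1000)]
```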
@@ -935,7 +1063,32 @@ def main() -> int:
    selected_model_params = best_candidate["model_params"]
    selected_calibration_method = str(best_candidate["calibration_method"])
    chosen_threshold = float(best_candidate["threshold"])
    threshold_info = best_candidate["threshold_info"]
    threshold_info = dict(best_candidate["threshold_info"])
    threshold_policy_applied = "fixed" if args.threshold is not None else "validation"
    threshold_tuning_walk_forward = {
        "enabled": args.threshold_policy == "walk_forward",
        "status": "not_run",
    }
    if args.threshold is None and args.threshold_policy == "walk_forward":
        threshold_tuning_walk_forward = tune_threshold_walk_forward(
            model_df=model_df.iloc[: len(train_df) + len(val_df)],
            feature_cols=feature_cols,
            model_family=selected_model_family,
            model_params=selected_model_params,
            calibration_method=selected_calibration_method,
            random_state=args.random_state,
            min_precision=args.min_precision,
            folds=args.walk_forward_folds,
        )
        if threshold_tuning_walk_forward.get("status") == "ok":
            chosen_threshold = float(threshold_tuning_walk_forward["threshold"])
            threshold_info = dict(threshold_tuning_walk_forward["threshold_selection"])
            threshold_policy_applied = "walk_forward"
        else:
            threshold_info["warning"] = (
                "walk-forward threshold tuning unavailable; fell back to validation-selected threshold"
            )
            threshold_policy_applied = "validation_fallback"
    val_metrics = best_candidate["validation_metrics"]

    train_val_df = model_df.iloc[: len(train_df) + len(val_df)]
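The fallback chain in this hunk reduces to a small decision function; a minimal re-sketch (the function name is illustrative, not part of the script):

```python
def resolve_threshold_policy(fixed_threshold, requested_policy, wf_status):
    # Mirrors main(): an explicit --threshold always wins; walk-forward
    # applies only when tuning succeeded; otherwise fall back to validation.
    if fixed_threshold is not None:
        return "fixed"
    if requested_policy == "walk_forward":
        return "walk_forward" if wf_status == "ok" else "validation_fallback"
    return "validation"


print(resolve_threshold_policy(None, "walk_forward", "ok"))                 # -> walk_forward
print(resolve_threshold_policy(None, "walk_forward", "insufficient_data"))  # -> validation_fallback
print(resolve_threshold_policy(0.6, "walk_forward", "ok"))                  # -> fixed
```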
@@ -971,7 +1124,7 @@ def main() -> int:
        calibration_method=selected_calibration_method,
        random_state=args.random_state,
        min_precision=args.min_precision,
        fixed_threshold=args.threshold,
        fixed_threshold=chosen_threshold if threshold_policy_applied == "walk_forward" else args.threshold,
        folds=args.walk_forward_folds,
    )

@@ -989,6 +1142,8 @@ def main() -> int:
        "calibration_method_requested": calibration_methods,
        "calibration_method": selected_calibration_method,
        "calibration_fit": final_fit_info,
        "threshold_policy_requested": args.threshold_policy,
        "threshold_policy_applied": threshold_policy_applied,
        "data_window": {
            "requested_start": start or None,
            "requested_end": end or None,
@@ -1043,6 +1198,7 @@ def main() -> int:
        "test_calibration_quality": test_calibration,
        "naive_baselines_test": naive_baselines_test,
        "sliced_performance_test": sliced_performance,
        "threshold_tuning_walk_forward": threshold_tuning_walk_forward,
        "walk_forward_backtest": walk_forward,
    }
    report = to_builtin(report)
@@ -1053,6 +1209,10 @@ def main() -> int:
    print(f" model_family: {selected_model_family} (requested={args.model_family})")
    print(f" model_params: {selected_model_params}")
    print(f" calibration_method: {report['calibration_method']}")
    print(
        f" threshold_policy: requested={report['threshold_policy_requested']} "
        f"applied={report['threshold_policy_applied']}"
    )
    print(f" feature_set: {args.feature_set} ({len(feature_cols)} features)")
    print(
        " rows: "

2
todo.md
@@ -53,5 +53,5 @@ Priority key: `P0` = critical/blocking, `P1` = important, `P2` = later optimizat
## 8) Immediate Next Steps (This Week)
- [x] [P0] Run first full data audit and label-quality checks. (completed on runtime machine)
- [x] [P0] Train baseline model on full available history and capture metrics. (completed on runtime machine)
- [ ] [P1] Add one expanded feature set and rerun evaluation. (feature-set plumbing implemented; rerun pending on runtime machine)
- [x] [P1] Add one expanded feature set and rerun evaluation. (completed on runtime machine 2026-03-12 with `feature_set=extended`, `model_version=rain-auto-v1-extended-202603120932`)
- [x] [P0] Decide v1 threshold and define deployment interface.
