Implemented the next 4h-plan phase: dual-run support + explicit cutover gate.

This commit is contained in:
2026-04-06 19:09:20 +10:00
parent 1ef300d25e
commit 1e750e35d1
7 changed files with 238 additions and 20 deletions
+134
View File
@@ -0,0 +1,134 @@
#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
from pathlib import Path
from typing import Any
def parse_args(argv: list[str] | None = None) -> argparse.Namespace:
    """Parse the command-line options of the cutover gate.

    Args:
        argv: Argument strings to parse, without the program name. When
            ``None`` (the default) argparse reads ``sys.argv[1:]``, which is
            what the script entry point relies on. Passing an explicit list
            lets other code and tests drive the parser directly.

    Returns:
        Namespace with ``baseline`` and ``candidate`` (report paths) and the
        float thresholds ``min_candidate_precision``, ``max_precision_drop``,
        ``max_pr_auc_drop``, ``max_roc_auc_drop`` and ``max_brier_increase``.
    """
    parser = argparse.ArgumentParser(
        description="Evaluate go/no-go cutover gate for rain model reports.",
    )
    parser.add_argument(
        "--baseline",
        required=True,
        help="Baseline report JSON path (for example 1h production).",
    )
    parser.add_argument(
        "--candidate",
        required=True,
        help="Candidate report JSON path (for example 4h shadow).",
    )
    parser.add_argument(
        "--min-candidate-precision",
        type=float,
        default=0.60,
        help="Minimum allowed candidate test precision.",
    )
    parser.add_argument(
        "--max-precision-drop",
        type=float,
        default=0.05,
        help="Maximum allowed drop: candidate_precision >= baseline_precision - value.",
    )
    parser.add_argument(
        "--max-pr-auc-drop",
        type=float,
        default=0.05,
        help="Maximum allowed drop: candidate_pr_auc >= baseline_pr_auc - value.",
    )
    parser.add_argument(
        "--max-roc-auc-drop",
        type=float,
        default=0.05,
        help="Maximum allowed drop: candidate_roc_auc >= baseline_roc_auc - value.",
    )
    parser.add_argument(
        "--max-brier-increase",
        type=float,
        default=0.03,
        help="Maximum allowed increase: candidate_brier <= baseline_brier + value.",
    )
    # argparse treats ``None`` as "use sys.argv[1:]", so the zero-argument
    # call made by the script entry point behaves exactly as before.
    return parser.parse_args(argv)
def load_report(path: str) -> dict[str, Any]:
    """Load a model report from a JSON file.

    Args:
        path: Filesystem path of the report, read as UTF-8 JSON.

    Returns:
        The decoded top-level JSON object.

    Raises:
        FileNotFoundError: If ``path`` does not exist.
        ValueError: If the file is not valid JSON (``json.JSONDecodeError``
            is a ``ValueError`` subclass) or its top-level value is not a
            JSON object.
    """
    # Open directly rather than testing for existence first: open() already
    # raises FileNotFoundError, and skipping the pre-check avoids a
    # check-then-use race.
    with Path(path).open("r", encoding="utf-8") as f:
        report = json.load(f)
    # Callers use ``.get`` on the result; reject lists, scalars and null here
    # with a clear message instead of an AttributeError later on.
    if not isinstance(report, dict):
        raise ValueError(
            f"report must be a JSON object, got {type(report).__name__}: {path}"
        )
    return report
def metric(report: dict[str, Any], key: str) -> float:
    """Return one test metric from a report as a float.

    Args:
        report: Parsed report; the metric is read from its
            ``"test_metrics"`` mapping.
        key: Name of the metric inside ``"test_metrics"``.

    Returns:
        The metric converted with ``float()``.

    Raises:
        ValueError: If ``"test_metrics"`` is absent or not a mapping, if the
            metric is absent or null, or if its value is a boolean or cannot
            be converted to a float.
    """
    section = report.get("test_metrics")
    if not isinstance(section, dict):
        # Covers a missing section as well as ``null`` or a list, which would
        # otherwise surface as an AttributeError on ``.get``.
        raise ValueError(f"missing test metric: {key}")
    value = section.get(key)
    if value is None:
        raise ValueError(f"missing test metric: {key}")
    # bool is an int subclass: float(True) would silently become 1.0 and
    # could let a malformed report pass the gate.
    if isinstance(value, bool):
        raise ValueError(f"non-numeric test metric: {key}={value!r}")
    try:
        return float(value)
    except (TypeError, ValueError) as err:
        # Re-raise with the metric name so the faulty field is identifiable.
        raise ValueError(f"non-numeric test metric: {key}={value!r}") from err
def main() -> int:
    """Run the cutover gate and print a go/no-go decision.

    Loads the baseline and candidate reports named on the command line,
    compares their test metrics against the configured thresholds, prints
    one line per check and then the overall decision.

    Returns:
        0 when every check passes, 1 when at least one check fails.

    Raises:
        FileNotFoundError: Propagated from ``load_report`` when a report
            path does not exist.
        ValueError: Propagated from ``metric`` when a required test metric
            is missing.
    """
    opts = parse_args()
    baseline_report = load_report(opts.baseline)
    candidate_report = load_report(opts.candidate)

    # Read every metric up front (baseline first, then candidate, per key)
    # so a malformed report aborts before anything is printed.
    base: dict[str, float] = {}
    cand: dict[str, float] = {}
    for key in ("precision", "pr_auc", "roc_auc", "brier"):
        base[key] = metric(baseline_report, key)
        cand[key] = metric(candidate_report, key)

    # Thresholds relative to the baseline, computed once and reused for both
    # the comparison and the printed detail.
    precision_limit = base["precision"] - opts.max_precision_drop
    pr_auc_limit = base["pr_auc"] - opts.max_pr_auc_drop
    roc_auc_limit = base["roc_auc"] - opts.max_roc_auc_drop
    brier_limit = base["brier"] + opts.max_brier_increase

    # (check name, passed?, human-readable comparison)
    results: list[tuple[str, bool, str]] = [
        (
            "candidate_precision_floor",
            cand["precision"] >= opts.min_candidate_precision,
            f"{cand['precision']:.4f} >= {opts.min_candidate_precision:.4f}",
        ),
        (
            "precision_drop",
            cand["precision"] >= precision_limit,
            f"{cand['precision']:.4f} >= {precision_limit:.4f}",
        ),
        (
            "pr_auc_drop",
            cand["pr_auc"] >= pr_auc_limit,
            f"{cand['pr_auc']:.4f} >= {pr_auc_limit:.4f}",
        ),
        (
            "roc_auc_drop",
            cand["roc_auc"] >= roc_auc_limit,
            f"{cand['roc_auc']:.4f} >= {roc_auc_limit:.4f}",
        ),
        (
            "brier_increase",
            cand["brier"] <= brier_limit,
            f"{cand['brier']:.4f} <= {brier_limit:.4f}",
        ),
    ]

    print("Rain cutover gate:")
    print(f"  baseline:  {opts.baseline}")
    print(f"  candidate: {opts.candidate}")
    print(
        f"  baseline_version={baseline_report.get('model_version')} "
        f"candidate_version={candidate_report.get('model_version')}"
    )

    for check_name, passed, detail in results:
        print(f"  {check_name}: {'ok' if passed else 'fail'} ({detail})")

    failed = [check_name for check_name, passed, _ in results if not passed]
    if not failed:
        print("cutover_decision: PASS")
        return 0

    print(f"cutover_decision: FAIL ({', '.join(failed)})")
    return 1
if __name__ == "__main__":
    # Hand the gate decision to the shell as the process exit status.
    exit_status = main()
    raise SystemExit(exit_status)