Files
go-weatherstation/scripts/check_rain_pipeline_health.py
2026-03-12 19:55:51 +11:00

215 lines
7.3 KiB
Python

#!/usr/bin/env python3
from __future__ import annotations
import argparse
import json
import os
from datetime import datetime, timedelta, timezone
from typing import Any
import psycopg2
def parse_duration(value: str) -> timedelta:
raw = value.strip().lower()
if not raw:
raise ValueError("duration is empty")
units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
suffix = raw[-1]
if suffix not in units:
raise ValueError(f"invalid duration suffix: {value} (expected one of s,m,h,d)")
amount = float(raw[:-1])
return timedelta(seconds=amount * units[suffix])
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.")
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
parser.add_argument("--site", required=True, help="Site name.")
parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.")
parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.")
parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.")
parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.")
parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.")
parser.add_argument(
"--max-pending-eval-age",
default="3h",
help="Pending evaluations older than this count toward alert.",
)
parser.add_argument(
"--max-pending-eval-rows",
type=int,
default=200,
help="Alert if pending evaluation rows older than max-pending-eval-age exceed this count.",
)
parser.add_argument(
"--json-out",
help="Optional path to save JSON health report.",
)
return parser.parse_args()
def fetch_latest_ts(conn, sql: str, params: tuple[Any, ...]):
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
return row[0] if row else None
def fetch_count(conn, sql: str, params: tuple[Any, ...]) -> int:
with conn.cursor() as cur:
cur.execute(sql, params)
row = cur.fetchone()
if not row:
return 0
return int(row[0] or 0)
def age_seconds(now: datetime, ts: datetime | None) -> float | None:
if ts is None:
return None
return float((now - ts.astimezone(timezone.utc)).total_seconds())
def status_for_age(age_s: float | None, max_age: timedelta) -> tuple[str, str]:
if age_s is None:
return "error", "missing"
if age_s <= max_age.total_seconds():
return "ok", "fresh"
return "error", "stale"
def main() -> int:
args = parse_args()
if not args.db_url:
raise SystemExit("missing --db-url or DATABASE_URL")
now = datetime.now(timezone.utc)
max_ws90_age = parse_duration(args.max_ws90_age)
max_baro_age = parse_duration(args.max_baro_age)
max_forecast_age = parse_duration(args.max_forecast_age)
max_prediction_age = parse_duration(args.max_prediction_age)
max_pending_eval_age = parse_duration(args.max_pending_eval_age)
with psycopg2.connect(args.db_url) as conn:
ws90_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM observations_ws90 WHERE site = %s",
(args.site,),
)
baro_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM observations_baro WHERE site = %s",
(args.site,),
)
forecast_latest = fetch_latest_ts(
conn,
"SELECT max(ts) FROM forecast_openmeteo_hourly WHERE site = %s",
(args.site,),
)
prediction_latest = fetch_latest_ts(
conn,
"""
SELECT max(generated_at)
FROM predictions_rain_1h
WHERE site = %s
AND model_name = %s
""",
(args.site, args.model_name),
)
pending_eval_rows = fetch_count(
conn,
"""
SELECT count(*)
FROM predictions_rain_1h
WHERE site = %s
AND model_name = %s
AND evaluated_at IS NULL
AND ts < (now() - %s::interval)
""",
(args.site, args.model_name, f"{int(max_pending_eval_age.total_seconds())} seconds"),
)
ws90_age_s = age_seconds(now, ws90_latest)
baro_age_s = age_seconds(now, baro_latest)
forecast_age_s = age_seconds(now, forecast_latest)
prediction_age_s = age_seconds(now, prediction_latest)
ws90_status, ws90_reason = status_for_age(ws90_age_s, max_ws90_age)
baro_status, baro_reason = status_for_age(baro_age_s, max_baro_age)
forecast_status, forecast_reason = status_for_age(forecast_age_s, max_forecast_age)
prediction_status, prediction_reason = status_for_age(prediction_age_s, max_prediction_age)
pending_status = "ok" if pending_eval_rows <= args.max_pending_eval_rows else "error"
checks = {
"ws90_freshness": {
"status": ws90_status,
"reason": ws90_reason,
"latest_ts": ws90_latest,
"age_seconds": ws90_age_s,
"max_age_seconds": max_ws90_age.total_seconds(),
},
"baro_freshness": {
"status": baro_status,
"reason": baro_reason,
"latest_ts": baro_latest,
"age_seconds": baro_age_s,
"max_age_seconds": max_baro_age.total_seconds(),
},
"forecast_freshness": {
"status": forecast_status,
"reason": forecast_reason,
"latest_ts": forecast_latest,
"age_seconds": forecast_age_s,
"max_age_seconds": max_forecast_age.total_seconds(),
},
"prediction_freshness": {
"status": prediction_status,
"reason": prediction_reason,
"latest_ts": prediction_latest,
"age_seconds": prediction_age_s,
"max_age_seconds": max_prediction_age.total_seconds(),
},
"pending_evaluations": {
"status": pending_status,
"rows": pending_eval_rows,
"max_rows": args.max_pending_eval_rows,
"pending_older_than_seconds": max_pending_eval_age.total_seconds(),
},
}
overall_status = "ok"
failing = [name for name, item in checks.items() if item["status"] != "ok"]
if failing:
overall_status = "error"
report = {
"generated_at": now.isoformat(),
"site": args.site,
"model_name": args.model_name,
"status": overall_status,
"failing_checks": failing,
"checks": checks,
}
print(f"rain pipeline health: {overall_status}")
for name, item in checks.items():
print(f" {name}: {item['status']}")
if failing:
print(f" failing: {', '.join(failing)}")
if args.json_out:
out_dir = os.path.dirname(args.json_out)
if out_dir:
os.makedirs(out_dir, exist_ok=True)
with open(args.json_out, "w", encoding="utf-8") as f:
json.dump(report, f, indent=2, default=str)
print(f"saved health report to {args.json_out}")
return 0 if overall_status == "ok" else 1
if __name__ == "__main__":
raise SystemExit(main())