215 lines
7.3 KiB
Python
215 lines
7.3 KiB
Python
#!/usr/bin/env python3
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
from datetime import datetime, timedelta, timezone
|
|
from typing import Any
|
|
|
|
import psycopg2
|
|
|
|
|
|
def parse_duration(value: str) -> timedelta:
|
|
raw = value.strip().lower()
|
|
if not raw:
|
|
raise ValueError("duration is empty")
|
|
|
|
units = {"s": 1, "m": 60, "h": 3600, "d": 86400}
|
|
suffix = raw[-1]
|
|
if suffix not in units:
|
|
raise ValueError(f"invalid duration suffix: {value} (expected one of s,m,h,d)")
|
|
amount = float(raw[:-1])
|
|
return timedelta(seconds=amount * units[suffix])
|
|
|
|
|
|
def parse_args() -> argparse.Namespace:
|
|
parser = argparse.ArgumentParser(description="Check freshness/health of rain-model data and predictions.")
|
|
parser.add_argument("--db-url", default=os.getenv("DATABASE_URL"), help="Postgres connection string.")
|
|
parser.add_argument("--site", required=True, help="Site name.")
|
|
parser.add_argument("--model-name", default="rain_next_1h", help="Prediction model_name to check.")
|
|
parser.add_argument("--max-ws90-age", default="20m", help="Max allowed age for ws90 latest row.")
|
|
parser.add_argument("--max-baro-age", default="30m", help="Max allowed age for barometer latest row.")
|
|
parser.add_argument("--max-forecast-age", default="3h", help="Max allowed age for forecast latest row.")
|
|
parser.add_argument("--max-prediction-age", default="30m", help="Max allowed age for latest prediction write.")
|
|
parser.add_argument(
|
|
"--max-pending-eval-age",
|
|
default="3h",
|
|
help="Pending evaluations older than this count toward alert.",
|
|
)
|
|
parser.add_argument(
|
|
"--max-pending-eval-rows",
|
|
type=int,
|
|
default=200,
|
|
help="Alert if pending evaluation rows older than max-pending-eval-age exceed this count.",
|
|
)
|
|
parser.add_argument(
|
|
"--json-out",
|
|
help="Optional path to save JSON health report.",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def fetch_latest_ts(conn, sql: str, params: tuple[Any, ...]):
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, params)
|
|
row = cur.fetchone()
|
|
return row[0] if row else None
|
|
|
|
|
|
def fetch_count(conn, sql: str, params: tuple[Any, ...]) -> int:
|
|
with conn.cursor() as cur:
|
|
cur.execute(sql, params)
|
|
row = cur.fetchone()
|
|
if not row:
|
|
return 0
|
|
return int(row[0] or 0)
|
|
|
|
|
|
def age_seconds(now: datetime, ts: datetime | None) -> float | None:
|
|
if ts is None:
|
|
return None
|
|
return float((now - ts.astimezone(timezone.utc)).total_seconds())
|
|
|
|
|
|
def status_for_age(age_s: float | None, max_age: timedelta) -> tuple[str, str]:
|
|
if age_s is None:
|
|
return "error", "missing"
|
|
if age_s <= max_age.total_seconds():
|
|
return "ok", "fresh"
|
|
return "error", "stale"
|
|
|
|
|
|
def main() -> int:
|
|
args = parse_args()
|
|
if not args.db_url:
|
|
raise SystemExit("missing --db-url or DATABASE_URL")
|
|
|
|
now = datetime.now(timezone.utc)
|
|
max_ws90_age = parse_duration(args.max_ws90_age)
|
|
max_baro_age = parse_duration(args.max_baro_age)
|
|
max_forecast_age = parse_duration(args.max_forecast_age)
|
|
max_prediction_age = parse_duration(args.max_prediction_age)
|
|
max_pending_eval_age = parse_duration(args.max_pending_eval_age)
|
|
|
|
with psycopg2.connect(args.db_url) as conn:
|
|
ws90_latest = fetch_latest_ts(
|
|
conn,
|
|
"SELECT max(ts) FROM observations_ws90 WHERE site = %s",
|
|
(args.site,),
|
|
)
|
|
baro_latest = fetch_latest_ts(
|
|
conn,
|
|
"SELECT max(ts) FROM observations_baro WHERE site = %s",
|
|
(args.site,),
|
|
)
|
|
forecast_latest = fetch_latest_ts(
|
|
conn,
|
|
"SELECT max(ts) FROM forecast_openmeteo_hourly WHERE site = %s",
|
|
(args.site,),
|
|
)
|
|
prediction_latest = fetch_latest_ts(
|
|
conn,
|
|
"""
|
|
SELECT max(generated_at)
|
|
FROM predictions_rain_1h
|
|
WHERE site = %s
|
|
AND model_name = %s
|
|
""",
|
|
(args.site, args.model_name),
|
|
)
|
|
pending_eval_rows = fetch_count(
|
|
conn,
|
|
"""
|
|
SELECT count(*)
|
|
FROM predictions_rain_1h
|
|
WHERE site = %s
|
|
AND model_name = %s
|
|
AND evaluated_at IS NULL
|
|
AND ts < (now() - %s::interval)
|
|
""",
|
|
(args.site, args.model_name, f"{int(max_pending_eval_age.total_seconds())} seconds"),
|
|
)
|
|
|
|
ws90_age_s = age_seconds(now, ws90_latest)
|
|
baro_age_s = age_seconds(now, baro_latest)
|
|
forecast_age_s = age_seconds(now, forecast_latest)
|
|
prediction_age_s = age_seconds(now, prediction_latest)
|
|
|
|
ws90_status, ws90_reason = status_for_age(ws90_age_s, max_ws90_age)
|
|
baro_status, baro_reason = status_for_age(baro_age_s, max_baro_age)
|
|
forecast_status, forecast_reason = status_for_age(forecast_age_s, max_forecast_age)
|
|
prediction_status, prediction_reason = status_for_age(prediction_age_s, max_prediction_age)
|
|
pending_status = "ok" if pending_eval_rows <= args.max_pending_eval_rows else "error"
|
|
|
|
checks = {
|
|
"ws90_freshness": {
|
|
"status": ws90_status,
|
|
"reason": ws90_reason,
|
|
"latest_ts": ws90_latest,
|
|
"age_seconds": ws90_age_s,
|
|
"max_age_seconds": max_ws90_age.total_seconds(),
|
|
},
|
|
"baro_freshness": {
|
|
"status": baro_status,
|
|
"reason": baro_reason,
|
|
"latest_ts": baro_latest,
|
|
"age_seconds": baro_age_s,
|
|
"max_age_seconds": max_baro_age.total_seconds(),
|
|
},
|
|
"forecast_freshness": {
|
|
"status": forecast_status,
|
|
"reason": forecast_reason,
|
|
"latest_ts": forecast_latest,
|
|
"age_seconds": forecast_age_s,
|
|
"max_age_seconds": max_forecast_age.total_seconds(),
|
|
},
|
|
"prediction_freshness": {
|
|
"status": prediction_status,
|
|
"reason": prediction_reason,
|
|
"latest_ts": prediction_latest,
|
|
"age_seconds": prediction_age_s,
|
|
"max_age_seconds": max_prediction_age.total_seconds(),
|
|
},
|
|
"pending_evaluations": {
|
|
"status": pending_status,
|
|
"rows": pending_eval_rows,
|
|
"max_rows": args.max_pending_eval_rows,
|
|
"pending_older_than_seconds": max_pending_eval_age.total_seconds(),
|
|
},
|
|
}
|
|
|
|
overall_status = "ok"
|
|
failing = [name for name, item in checks.items() if item["status"] != "ok"]
|
|
if failing:
|
|
overall_status = "error"
|
|
|
|
report = {
|
|
"generated_at": now.isoformat(),
|
|
"site": args.site,
|
|
"model_name": args.model_name,
|
|
"status": overall_status,
|
|
"failing_checks": failing,
|
|
"checks": checks,
|
|
}
|
|
|
|
print(f"rain pipeline health: {overall_status}")
|
|
for name, item in checks.items():
|
|
print(f" {name}: {item['status']}")
|
|
if failing:
|
|
print(f" failing: {', '.join(failing)}")
|
|
|
|
if args.json_out:
|
|
out_dir = os.path.dirname(args.json_out)
|
|
if out_dir:
|
|
os.makedirs(out_dir, exist_ok=True)
|
|
with open(args.json_out, "w", encoding="utf-8") as f:
|
|
json.dump(report, f, indent=2, default=str)
|
|
print(f"saved health report to {args.json_out}")
|
|
|
|
return 0 if overall_status == "ok" else 1
|
|
|
|
|
|
if __name__ == "__main__":
|
|
raise SystemExit(main())
|