package db

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"github.com/jackc/pgx/v5"
)

// ObservationPoint is one sample of station observations. Every measurement
// is a pointer so that a missing (SQL NULL) value is omitted from the JSON
// encoding instead of being reported as zero.
type ObservationPoint struct {
	TS          time.Time `json:"ts"`
	TempC       *float64  `json:"temp_c,omitempty"`
	RH          *float64  `json:"rh,omitempty"`
	PressureHPA *float64  `json:"pressure_hpa,omitempty"`

	// PressureTrend1h is the change in pressure over the last hour (hPa).
	PressureTrend1h *float64 `json:"pressure_trend_1h,omitempty"`

	WindMS     *float64 `json:"wind_m_s,omitempty"`
	WindGustMS *float64 `json:"wind_gust_m_s,omitempty"`
	WindDirDeg *float64 `json:"wind_dir_deg,omitempty"`
	UVI        *float64 `json:"uvi,omitempty"`
	LightLux   *float64 `json:"light_lux,omitempty"`
	BatteryMV  *float64 `json:"battery_mv,omitempty"`
	SupercapV  *float64 `json:"supercap_v,omitempty"`
	RainMM     *float64 `json:"rain_mm,omitempty"`
	RainStart  *int64   `json:"rain_start,omitempty"`
}

// ForecastPoint is one forecast row. As with ObservationPoint, a nil field
// means the column was NULL.
type ForecastPoint struct {
	TS           time.Time `json:"ts"`
	TempC        *float64  `json:"temp_c,omitempty"`
	RH           *float64  `json:"rh,omitempty"`
	PressureMSLH *float64  `json:"pressure_msl_hpa,omitempty"`
	WindMS       *float64  `json:"wind_m_s,omitempty"`
	WindGustMS   *float64  `json:"wind_gust_m_s,omitempty"`
	WindDirDeg   *float64  `json:"wind_dir_deg,omitempty"`
	PrecipMM     *float64  `json:"precip_mm,omitempty"`
	PrecipProb   *float64  `json:"precip_prob,omitempty"`
	CloudCover   *float64  `json:"cloud_cover,omitempty"`
}

// ForecastSeries is a list of forecast points together with the retrieval
// time reported for them.
type ForecastSeries struct {
	RetrievedAt time.Time       `json:"retrieved_at"`
	Points      []ForecastPoint `json:"points"`
}

// ObservationSeries returns bucketed observations for site with
// start <= ts <= end, ordered by bucket time.
//
// bucket selects the aggregation width and must be "1m" or "5m"; any other
// value yields an "unsupported bucket" error. end must be strictly after
// start, otherwise an "invalid time range" error is returned.
//
// Rows from observations_ws90 and observations_baro are aggregated separately
// and joined on the bucket with a FULL OUTER JOIN, so a bucket is returned
// when either table has data for it. PressureTrend1h is filled in afterwards
// from the returned points only, so a point whose one-hour-earlier bucket is
// not part of the result (for example anything in the first hour of the
// range) has no trend.
func (d *DB) ObservationSeries(ctx context.Context, site, bucket string, start, end time.Time) ([]ObservationPoint, error) {
	if !end.After(start) {
		return nil, errors.New("invalid time range")
	}

	// The interval is spliced into the SQL text, so it must only ever come
	// from this fixed set of literals, never from caller input.
	var interval string
	switch bucket {
	case "1m":
		interval = "1 minute"
	case "5m":
		interval = "5 minutes"
	default:
		return nil, fmt.Errorf("unsupported bucket: %s", bucket)
	}

	// NOTE(review): wind_dir_deg is averaged arithmetically. If it is a
	// compass bearing, buckets that straddle north (e.g. 350 and 10) average
	// to the opposite direction; confirm whether a vector mean is wanted.
	query := fmt.Sprintf(`
		WITH ws AS (
			SELECT
				time_bucket(INTERVAL '%s', ts) AS bucket,
				avg(temperature_c) AS temp_c_avg,
				avg(humidity) AS rh_avg,
				avg(wind_avg_m_s) AS wind_avg_ms_avg,
				max(wind_max_m_s) AS wind_gust_ms_max,
				avg(wind_dir_deg) AS wind_dir_deg_avg,
				max(uvi) AS uvi_max,
				max(light_lux) AS light_lux_max,
				avg(battery_mv) AS battery_mv_avg,
				avg(supercap_v) AS supercap_v_avg,
				avg(rain_mm) AS rain_mm_avg,
				max(rain_start) AS rain_start_max
			FROM observations_ws90
			WHERE site = $1
				AND ts >= $2
				AND ts <= $3
			GROUP BY bucket
		),
		baro AS (
			SELECT
				time_bucket(INTERVAL '%s', ts) AS bucket,
				avg(pressure_hpa) AS pressure_hpa_avg
			FROM observations_baro
			WHERE site = $1
				AND ts >= $2
				AND ts <= $3
			GROUP BY bucket
		)
		SELECT
			COALESCE(ws.bucket, baro.bucket) AS bucket,
			ws.temp_c_avg,
			ws.rh_avg,
			baro.pressure_hpa_avg,
			ws.wind_avg_ms_avg,
			ws.wind_gust_ms_max,
			ws.wind_dir_deg_avg,
			ws.uvi_max,
			ws.light_lux_max,
			ws.battery_mv_avg,
			ws.supercap_v_avg,
			ws.rain_mm_avg,
			ws.rain_start_max
		FROM ws
		FULL OUTER JOIN baro ON ws.bucket = baro.bucket
		ORDER BY bucket ASC
	`, interval, interval)

	rows, err := d.Pool.Query(ctx, query, site, start, end)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	points := make([]ObservationPoint, 0, 512)
	for rows.Next() {
		var (
			ts                                 time.Time
			temp, rh, pressure, wind, gust     sql.NullFloat64
			dir, uvi, light, battery, supercap sql.NullFloat64
			rainMM                             sql.NullFloat64
			rainStart                          sql.NullInt64
		)
		if err := rows.Scan(
			&ts, &temp, &rh, &pressure, &wind, &gust,
			&dir, &uvi, &light, &battery, &supercap,
			&rainMM, &rainStart,
		); err != nil {
			return nil, err
		}
		points = append(points, ObservationPoint{
			TS:          ts,
			TempC:       nullFloatPtr(temp),
			RH:          nullFloatPtr(rh),
			PressureHPA: nullFloatPtr(pressure),
			WindMS:      nullFloatPtr(wind),
			WindGustMS:  nullFloatPtr(gust),
			WindDirDeg:  nullFloatPtr(dir),
			UVI:         nullFloatPtr(uvi),
			LightLux:    nullFloatPtr(light),
			BatteryMV:   nullFloatPtr(battery),
			SupercapV:   nullFloatPtr(supercap),
			RainMM:      nullFloatPtr(rainMM),
			RainStart:   nullIntPtr(rainStart),
		})
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	applyPressureTrend1h(points)
	return points, nil
}

// applyPressureTrend1h sets PressureTrend1h on every point that has a
// pressure value and for which another point in the same slice, timestamped
// exactly one hour earlier, also has a pressure value. The trend is the
// current pressure minus the earlier one. Other points are left untouched.
func applyPressureTrend1h(points []ObservationPoint) {
	// Index by instant (Unix nanoseconds) instead of by time.Time: == on
	// time.Time also compares the Location pointer and monotonic reading, so
	// two values for the same instant are not guaranteed to match as map keys.
	indexByInstant := make(map[int64]int, len(points))
	for i := range points {
		indexByInstant[points[i].TS.UnixNano()] = i
	}

	for i := range points {
		current := points[i].PressureHPA
		if current == nil {
			continue
		}
		hourAgo := points[i].TS.Add(-time.Hour).UnixNano()
		j, ok := indexByInstant[hourAgo]
		if !ok || points[j].PressureHPA == nil {
			continue
		}
		trend := *current - *points[j].PressureHPA
		points[i].PressureTrend1h = &trend
	}
}

// LatestObservation returns the most recent observations_ws90 row for site.
//
// The pressure value comes from the most recent observations_baro row for the
// same site, selected independently of the ws90 timestamp. When there is no
// ws90 row for the site the result is (nil, nil). PressureTrend1h is never
// set by this method.
func (d *DB) LatestObservation(ctx context.Context, site string) (*ObservationPoint, error) {
	query := `
		SELECT
			ts,
			temperature_c,
			humidity,
			(SELECT pressure_hpa FROM observations_baro WHERE site = $1 ORDER BY ts DESC LIMIT 1) AS pressure_hpa,
			wind_avg_m_s,
			wind_max_m_s,
			wind_dir_deg,
			uvi,
			light_lux,
			battery_mv,
			supercap_v,
			rain_mm,
			rain_start
		FROM observations_ws90
		WHERE site = $1
		ORDER BY ts DESC
		LIMIT 1
	`

	var (
		ts                                 time.Time
		temp, rh, pressure, wind, gust     sql.NullFloat64
		dir, uvi, light, battery, supercap sql.NullFloat64
		rainMM                             sql.NullFloat64
		rainStart                          sql.NullInt64
	)
	err := d.Pool.QueryRow(ctx, query, site).Scan(
		&ts, &temp, &rh, &pressure, &wind, &gust,
		&dir, &uvi, &light, &battery, &supercap,
		&rainMM, &rainStart,
	)
	if err != nil {
		// No row is not an error for callers; it is reported as a nil point.
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, nil
		}
		return nil, err
	}

	return &ObservationPoint{
		TS:          ts,
		TempC:       nullFloatPtr(temp),
		RH:          nullFloatPtr(rh),
		PressureHPA: nullFloatPtr(pressure),
		WindMS:      nullFloatPtr(wind),
		WindGustMS:  nullFloatPtr(gust),
		WindDirDeg:  nullFloatPtr(dir),
		UVI:         nullFloatPtr(uvi),
		LightLux:    nullFloatPtr(light),
		BatteryMV:   nullFloatPtr(battery),
		SupercapV:   nullFloatPtr(supercap),
		RainMM:      nullFloatPtr(rainMM),
		RainStart:   nullIntPtr(rainStart),
	}, nil
}

// ForecastSeriesLatest returns every forecast_openmeteo_hourly row for site
// and model that belongs to the newest retrieval (the maximum retrieved_at),
// ordered by ts.
//
// When no rows exist for the site/model pair, the zero ForecastSeries (nil
// Points, zero RetrievedAt) is returned with a nil error.
func (d *DB) ForecastSeriesLatest(ctx context.Context, site, model string) (ForecastSeries, error) {
	// max() over zero rows yields a single NULL row, hence sql.NullTime.
	var retrieved sql.NullTime
	err := d.Pool.QueryRow(ctx, `
		SELECT max(retrieved_at)
		FROM forecast_openmeteo_hourly
		WHERE site = $1 AND model = $2
	`, site, model).Scan(&retrieved)
	if err != nil {
		return ForecastSeries{}, err
	}
	if !retrieved.Valid {
		return ForecastSeries{}, nil
	}

	rows, err := d.Pool.Query(ctx, `
		SELECT
			ts,
			temp_c,
			rh,
			pressure_msl_hpa,
			wind_m_s,
			wind_gust_m_s,
			wind_dir_deg,
			precip_mm,
			precip_prob,
			cloud_cover
		FROM forecast_openmeteo_hourly
		WHERE site = $1 AND model = $2 AND retrieved_at = $3
		ORDER BY ts ASC
	`, site, model, retrieved.Time)
	if err != nil {
		return ForecastSeries{}, err
	}
	defer rows.Close()

	points := make([]ForecastPoint, 0, 256)
	for rows.Next() {
		var (
			ts                        time.Time
			temp, rh, msl, wind, gust sql.NullFloat64
			dir, precip, prob, cloud  sql.NullFloat64
		)
		if err := rows.Scan(&ts, &temp, &rh, &msl, &wind, &gust, &dir, &precip, &prob, &cloud); err != nil {
			return ForecastSeries{}, err
		}
		points = append(points, ForecastPoint{
			TS:           ts,
			TempC:        nullFloatPtr(temp),
			RH:           nullFloatPtr(rh),
			PressureMSLH: nullFloatPtr(msl),
			WindMS:       nullFloatPtr(wind),
			WindGustMS:   nullFloatPtr(gust),
			WindDirDeg:   nullFloatPtr(dir),
			PrecipMM:     nullFloatPtr(precip),
			PrecipProb:   nullFloatPtr(prob),
			CloudCover:   nullFloatPtr(cloud),
		})
	}
	if err := rows.Err(); err != nil {
		return ForecastSeries{}, err
	}

	return ForecastSeries{
		RetrievedAt: retrieved.Time,
		Points:      points,
	}, nil
}

// ForecastSeriesRange returns one forecast point per ts with
// start <= ts <= end for site and model, ordered by ts.
//
// When several retrievals cover the same ts, the row with the newest
// retrieved_at wins (DISTINCT ON (ts) with retrieved_at DESC). RetrievedAt in
// the result is the newest retrieved_at among the returned rows, and stays
// the zero time when no rows match.
func (d *DB) ForecastSeriesRange(ctx context.Context, site, model string, start, end time.Time) (ForecastSeries, error) {
	rows, err := d.Pool.Query(ctx, `
		SELECT DISTINCT ON (ts)
			ts,
			retrieved_at,
			temp_c,
			rh,
			pressure_msl_hpa,
			wind_m_s,
			wind_gust_m_s,
			wind_dir_deg,
			precip_mm,
			precip_prob,
			cloud_cover
		FROM forecast_openmeteo_hourly
		WHERE site = $1 AND model = $2 AND ts >= $3 AND ts <= $4
		ORDER BY ts ASC, retrieved_at DESC
	`, site, model, start, end)
	if err != nil {
		return ForecastSeries{}, err
	}
	defer rows.Close()

	points := make([]ForecastPoint, 0, 256)
	var maxRetrieved time.Time
	for rows.Next() {
		var (
			ts                        time.Time
			retrieved                 time.Time
			temp, rh, msl, wind, gust sql.NullFloat64
			dir, precip, prob, cloud  sql.NullFloat64
		)
		if err := rows.Scan(&ts, &retrieved, &temp, &rh, &msl, &wind, &gust, &dir, &precip, &prob, &cloud); err != nil {
			return ForecastSeries{}, err
		}
		if retrieved.After(maxRetrieved) {
			maxRetrieved = retrieved
		}
		points = append(points, ForecastPoint{
			TS:           ts,
			TempC:        nullFloatPtr(temp),
			RH:           nullFloatPtr(rh),
			PressureMSLH: nullFloatPtr(msl),
			WindMS:       nullFloatPtr(wind),
			WindGustMS:   nullFloatPtr(gust),
			WindDirDeg:   nullFloatPtr(dir),
			PrecipMM:     nullFloatPtr(precip),
			PrecipProb:   nullFloatPtr(prob),
			CloudCover:   nullFloatPtr(cloud),
		})
	}
	if err := rows.Err(); err != nil {
		return ForecastSeries{}, err
	}

	return ForecastSeries{
		RetrievedAt: maxRetrieved,
		Points:      points,
	}, nil
}

// nullFloatPtr converts a nullable SQL float into a pointer: nil when the
// value is NULL, otherwise a pointer to a fresh copy of the value.
func nullFloatPtr(v sql.NullFloat64) *float64 {
	if !v.Valid {
		return nil
	}
	val := v.Float64
	return &val
}

// nullIntPtr converts a nullable SQL integer into a pointer: nil when the
// value is NULL, otherwise a pointer to a fresh copy of the value.
func nullIntPtr(v sql.NullInt64) *int64 {
	if !v.Valid {
		return nil
	}
	val := v.Int64
	return &val
}