package db import ( "context" "database/sql" "errors" "fmt" "time" "github.com/jackc/pgx/v5" ) type ObservationPoint struct { TS time.Time `json:"ts"` TempC *float64 `json:"temp_c,omitempty"` RH *float64 `json:"rh,omitempty"` WindMS *float64 `json:"wind_m_s,omitempty"` WindGustMS *float64 `json:"wind_gust_m_s,omitempty"` WindDirDeg *float64 `json:"wind_dir_deg,omitempty"` UVI *float64 `json:"uvi,omitempty"` LightLux *float64 `json:"light_lux,omitempty"` } type ForecastPoint struct { TS time.Time `json:"ts"` TempC *float64 `json:"temp_c,omitempty"` RH *float64 `json:"rh,omitempty"` PressureMSLH *float64 `json:"pressure_msl_hpa,omitempty"` WindMS *float64 `json:"wind_m_s,omitempty"` WindGustMS *float64 `json:"wind_gust_m_s,omitempty"` WindDirDeg *float64 `json:"wind_dir_deg,omitempty"` PrecipMM *float64 `json:"precip_mm,omitempty"` CloudCover *float64 `json:"cloud_cover,omitempty"` } type ForecastSeries struct { RetrievedAt time.Time `json:"retrieved_at"` Points []ForecastPoint `json:"points"` } func (d *DB) ObservationSeries(ctx context.Context, site, bucket string, rangeSeconds int64) ([]ObservationPoint, error) { if rangeSeconds <= 0 { return nil, errors.New("range must be > 0") } table := "cagg_ws90_5m" switch bucket { case "1m": table = "cagg_ws90_1m" case "5m": table = "cagg_ws90_5m" default: return nil, fmt.Errorf("unsupported bucket: %s", bucket) } query := fmt.Sprintf(` SELECT bucket, temp_c_avg, rh_avg, wind_avg_ms_avg, wind_gust_ms_max, wind_dir_deg_avg, uvi_max, light_lux_max FROM %s WHERE site = $1 AND bucket >= now() - make_interval(secs => $2) ORDER BY bucket ASC `, table) rows, err := d.Pool.Query(ctx, query, site, rangeSeconds) if err != nil { return nil, err } defer rows.Close() points := make([]ObservationPoint, 0, 512) for rows.Next() { var ( ts time.Time temp, rh, wind, gust sql.NullFloat64 dir, uvi, light sql.NullFloat64 ) if err := rows.Scan(&ts, &temp, &rh, &wind, &gust, &dir, &uvi, &light); err != nil { return nil, err } points = append(points, ObservationPoint{ TS: ts, TempC: nullFloatPtr(temp), RH: nullFloatPtr(rh), WindMS: nullFloatPtr(wind), WindGustMS: nullFloatPtr(gust), WindDirDeg: nullFloatPtr(dir), UVI: nullFloatPtr(uvi), LightLux: nullFloatPtr(light), }) } if rows.Err() != nil { return nil, rows.Err() } return points, nil } func (d *DB) LatestObservation(ctx context.Context, site string) (*ObservationPoint, error) { query := ` SELECT ts, temperature_c, humidity, wind_avg_m_s, wind_max_m_s, wind_dir_deg, uvi, light_lux FROM observations_ws90 WHERE site = $1 ORDER BY ts DESC LIMIT 1 ` var ( ts time.Time temp, rh, wind, gust sql.NullFloat64 dir, uvi, light sql.NullFloat64 ) err := d.Pool.QueryRow(ctx, query, site).Scan(&ts, &temp, &rh, &wind, &gust, &dir, &uvi, &light) if err != nil { if errors.Is(err, pgx.ErrNoRows) { return nil, nil } return nil, err } return &ObservationPoint{ TS: ts, TempC: nullFloatPtr(temp), RH: nullFloatPtr(rh), WindMS: nullFloatPtr(wind), WindGustMS: nullFloatPtr(gust), WindDirDeg: nullFloatPtr(dir), UVI: nullFloatPtr(uvi), LightLux: nullFloatPtr(light), }, nil } func (d *DB) ForecastSeriesLatest(ctx context.Context, site, model string) (ForecastSeries, error) { var retrieved sql.NullTime err := d.Pool.QueryRow(ctx, ` SELECT max(retrieved_at) FROM forecast_openmeteo_hourly WHERE site = $1 AND model = $2 `, site, model).Scan(&retrieved) if err != nil { return ForecastSeries{}, err } if !retrieved.Valid { return ForecastSeries{}, nil } rows, err := d.Pool.Query(ctx, ` SELECT ts, temp_c, rh, pressure_msl_hpa, wind_m_s, wind_gust_m_s, wind_dir_deg, precip_mm, cloud_cover FROM forecast_openmeteo_hourly WHERE site = $1 AND model = $2 AND retrieved_at = $3 ORDER BY ts ASC `, site, model, retrieved.Time) if err != nil { return ForecastSeries{}, err } defer rows.Close() points := make([]ForecastPoint, 0, 256) for rows.Next() { var ( ts time.Time temp, rh, msl, wind, gust sql.NullFloat64 dir, precip, cloud sql.NullFloat64 ) if err := rows.Scan(&ts, &temp, &rh, &msl, &wind, &gust, &dir, &precip, &cloud); err != nil { return ForecastSeries{}, err } points = append(points, ForecastPoint{ TS: ts, TempC: nullFloatPtr(temp), RH: nullFloatPtr(rh), PressureMSLH: nullFloatPtr(msl), WindMS: nullFloatPtr(wind), WindGustMS: nullFloatPtr(gust), WindDirDeg: nullFloatPtr(dir), PrecipMM: nullFloatPtr(precip), CloudCover: nullFloatPtr(cloud), }) } if rows.Err() != nil { return ForecastSeries{}, rows.Err() } return ForecastSeries{ RetrievedAt: retrieved.Time, Points: points, }, nil } func nullFloatPtr(v sql.NullFloat64) *float64 { if !v.Valid { return nil } val := v.Float64 return &val }