292 lines
7.3 KiB
Go
292 lines
7.3 KiB
Go
package db
|
|
|
|
import (
|
|
"context"
|
|
"database/sql"
|
|
"errors"
|
|
"fmt"
|
|
"time"
|
|
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
type ObservationPoint struct {
|
|
TS time.Time `json:"ts"`
|
|
TempC *float64 `json:"temp_c,omitempty"`
|
|
RH *float64 `json:"rh,omitempty"`
|
|
WindMS *float64 `json:"wind_m_s,omitempty"`
|
|
WindGustMS *float64 `json:"wind_gust_m_s,omitempty"`
|
|
WindDirDeg *float64 `json:"wind_dir_deg,omitempty"`
|
|
UVI *float64 `json:"uvi,omitempty"`
|
|
LightLux *float64 `json:"light_lux,omitempty"`
|
|
BatteryMV *float64 `json:"battery_mv,omitempty"`
|
|
SupercapV *float64 `json:"supercap_v,omitempty"`
|
|
}
|
|
|
|
type ForecastPoint struct {
|
|
TS time.Time `json:"ts"`
|
|
TempC *float64 `json:"temp_c,omitempty"`
|
|
RH *float64 `json:"rh,omitempty"`
|
|
PressureMSLH *float64 `json:"pressure_msl_hpa,omitempty"`
|
|
WindMS *float64 `json:"wind_m_s,omitempty"`
|
|
WindGustMS *float64 `json:"wind_gust_m_s,omitempty"`
|
|
WindDirDeg *float64 `json:"wind_dir_deg,omitempty"`
|
|
PrecipMM *float64 `json:"precip_mm,omitempty"`
|
|
CloudCover *float64 `json:"cloud_cover,omitempty"`
|
|
}
|
|
|
|
type ForecastSeries struct {
|
|
RetrievedAt time.Time `json:"retrieved_at"`
|
|
Points []ForecastPoint `json:"points"`
|
|
}
|
|
|
|
func (d *DB) ObservationSeries(ctx context.Context, site, bucket string, start, end time.Time) ([]ObservationPoint, error) {
|
|
if end.Before(start) || end.Equal(start) {
|
|
return nil, errors.New("invalid time range")
|
|
}
|
|
|
|
interval := "5 minutes"
|
|
switch bucket {
|
|
case "1m":
|
|
interval = "1 minute"
|
|
case "5m":
|
|
interval = "5 minutes"
|
|
default:
|
|
return nil, fmt.Errorf("unsupported bucket: %s", bucket)
|
|
}
|
|
|
|
query := fmt.Sprintf(`
|
|
SELECT
|
|
time_bucket(INTERVAL '%s', ts) AS bucket,
|
|
avg(temperature_c) AS temp_c_avg,
|
|
avg(humidity) AS rh_avg,
|
|
avg(wind_avg_m_s) AS wind_avg_ms_avg,
|
|
max(wind_max_m_s) AS wind_gust_ms_max,
|
|
avg(wind_dir_deg) AS wind_dir_deg_avg,
|
|
max(uvi) AS uvi_max,
|
|
max(light_lux) AS light_lux_max,
|
|
avg(battery_mv) AS battery_mv_avg,
|
|
avg(supercap_v) AS supercap_v_avg
|
|
FROM observations_ws90
|
|
WHERE site = $1
|
|
AND ts >= $2
|
|
AND ts <= $3
|
|
GROUP BY bucket
|
|
ORDER BY bucket ASC
|
|
`, interval)
|
|
|
|
rows, err := d.Pool.Query(ctx, query, site, start, end)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
points := make([]ObservationPoint, 0, 512)
|
|
for rows.Next() {
|
|
var (
|
|
ts time.Time
|
|
temp, rh, wind, gust sql.NullFloat64
|
|
dir, uvi, light, battery, supercap sql.NullFloat64
|
|
)
|
|
if err := rows.Scan(&ts, &temp, &rh, &wind, &gust, &dir, &uvi, &light, &battery, &supercap); err != nil {
|
|
return nil, err
|
|
}
|
|
points = append(points, ObservationPoint{
|
|
TS: ts,
|
|
TempC: nullFloatPtr(temp),
|
|
RH: nullFloatPtr(rh),
|
|
WindMS: nullFloatPtr(wind),
|
|
WindGustMS: nullFloatPtr(gust),
|
|
WindDirDeg: nullFloatPtr(dir),
|
|
UVI: nullFloatPtr(uvi),
|
|
LightLux: nullFloatPtr(light),
|
|
BatteryMV: nullFloatPtr(battery),
|
|
SupercapV: nullFloatPtr(supercap),
|
|
})
|
|
}
|
|
if rows.Err() != nil {
|
|
return nil, rows.Err()
|
|
}
|
|
|
|
return points, nil
|
|
}
|
|
|
|
func (d *DB) LatestObservation(ctx context.Context, site string) (*ObservationPoint, error) {
|
|
query := `
|
|
SELECT
|
|
ts,
|
|
temperature_c,
|
|
humidity,
|
|
wind_avg_m_s,
|
|
wind_max_m_s,
|
|
wind_dir_deg,
|
|
uvi,
|
|
light_lux,
|
|
battery_mv,
|
|
supercap_v
|
|
FROM observations_ws90
|
|
WHERE site = $1
|
|
ORDER BY ts DESC
|
|
LIMIT 1
|
|
`
|
|
|
|
var (
|
|
ts time.Time
|
|
temp, rh, wind, gust sql.NullFloat64
|
|
dir, uvi, light, battery, supercap sql.NullFloat64
|
|
)
|
|
err := d.Pool.QueryRow(ctx, query, site).Scan(&ts, &temp, &rh, &wind, &gust, &dir, &uvi, &light, &battery, &supercap)
|
|
if err != nil {
|
|
if errors.Is(err, pgx.ErrNoRows) {
|
|
return nil, nil
|
|
}
|
|
return nil, err
|
|
}
|
|
|
|
return &ObservationPoint{
|
|
TS: ts,
|
|
TempC: nullFloatPtr(temp),
|
|
RH: nullFloatPtr(rh),
|
|
WindMS: nullFloatPtr(wind),
|
|
WindGustMS: nullFloatPtr(gust),
|
|
WindDirDeg: nullFloatPtr(dir),
|
|
UVI: nullFloatPtr(uvi),
|
|
LightLux: nullFloatPtr(light),
|
|
BatteryMV: nullFloatPtr(battery),
|
|
SupercapV: nullFloatPtr(supercap),
|
|
}, nil
|
|
}
|
|
|
|
func (d *DB) ForecastSeriesLatest(ctx context.Context, site, model string) (ForecastSeries, error) {
|
|
var retrieved sql.NullTime
|
|
err := d.Pool.QueryRow(ctx, `
|
|
SELECT max(retrieved_at)
|
|
FROM forecast_openmeteo_hourly
|
|
WHERE site = $1 AND model = $2
|
|
`, site, model).Scan(&retrieved)
|
|
if err != nil {
|
|
return ForecastSeries{}, err
|
|
}
|
|
if !retrieved.Valid {
|
|
return ForecastSeries{}, nil
|
|
}
|
|
|
|
rows, err := d.Pool.Query(ctx, `
|
|
SELECT
|
|
ts,
|
|
temp_c,
|
|
rh,
|
|
pressure_msl_hpa,
|
|
wind_m_s,
|
|
wind_gust_m_s,
|
|
wind_dir_deg,
|
|
precip_mm,
|
|
cloud_cover
|
|
FROM forecast_openmeteo_hourly
|
|
WHERE site = $1 AND model = $2 AND retrieved_at = $3
|
|
ORDER BY ts ASC
|
|
`, site, model, retrieved.Time)
|
|
if err != nil {
|
|
return ForecastSeries{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
points := make([]ForecastPoint, 0, 256)
|
|
for rows.Next() {
|
|
var (
|
|
ts time.Time
|
|
temp, rh, msl, wind, gust sql.NullFloat64
|
|
dir, precip, cloud sql.NullFloat64
|
|
)
|
|
if err := rows.Scan(&ts, &temp, &rh, &msl, &wind, &gust, &dir, &precip, &cloud); err != nil {
|
|
return ForecastSeries{}, err
|
|
}
|
|
points = append(points, ForecastPoint{
|
|
TS: ts,
|
|
TempC: nullFloatPtr(temp),
|
|
RH: nullFloatPtr(rh),
|
|
PressureMSLH: nullFloatPtr(msl),
|
|
WindMS: nullFloatPtr(wind),
|
|
WindGustMS: nullFloatPtr(gust),
|
|
WindDirDeg: nullFloatPtr(dir),
|
|
PrecipMM: nullFloatPtr(precip),
|
|
CloudCover: nullFloatPtr(cloud),
|
|
})
|
|
}
|
|
if rows.Err() != nil {
|
|
return ForecastSeries{}, rows.Err()
|
|
}
|
|
|
|
return ForecastSeries{
|
|
RetrievedAt: retrieved.Time,
|
|
Points: points,
|
|
}, nil
|
|
}
|
|
|
|
func (d *DB) ForecastSeriesRange(ctx context.Context, site, model string, start, end time.Time) (ForecastSeries, error) {
|
|
rows, err := d.Pool.Query(ctx, `
|
|
SELECT DISTINCT ON (ts)
|
|
ts,
|
|
retrieved_at,
|
|
temp_c,
|
|
rh,
|
|
pressure_msl_hpa,
|
|
wind_m_s,
|
|
wind_gust_m_s,
|
|
wind_dir_deg,
|
|
precip_mm,
|
|
cloud_cover
|
|
FROM forecast_openmeteo_hourly
|
|
WHERE site = $1 AND model = $2 AND ts >= $3 AND ts <= $4
|
|
ORDER BY ts ASC, retrieved_at DESC
|
|
`, site, model, start, end)
|
|
if err != nil {
|
|
return ForecastSeries{}, err
|
|
}
|
|
defer rows.Close()
|
|
|
|
points := make([]ForecastPoint, 0, 256)
|
|
var maxRetrieved time.Time
|
|
for rows.Next() {
|
|
var (
|
|
ts time.Time
|
|
retrieved time.Time
|
|
temp, rh, msl, wind, gust sql.NullFloat64
|
|
dir, precip, cloud sql.NullFloat64
|
|
)
|
|
if err := rows.Scan(&ts, &retrieved, &temp, &rh, &msl, &wind, &gust, &dir, &precip, &cloud); err != nil {
|
|
return ForecastSeries{}, err
|
|
}
|
|
if retrieved.After(maxRetrieved) {
|
|
maxRetrieved = retrieved
|
|
}
|
|
points = append(points, ForecastPoint{
|
|
TS: ts,
|
|
TempC: nullFloatPtr(temp),
|
|
RH: nullFloatPtr(rh),
|
|
PressureMSLH: nullFloatPtr(msl),
|
|
WindMS: nullFloatPtr(wind),
|
|
WindGustMS: nullFloatPtr(gust),
|
|
WindDirDeg: nullFloatPtr(dir),
|
|
PrecipMM: nullFloatPtr(precip),
|
|
CloudCover: nullFloatPtr(cloud),
|
|
})
|
|
}
|
|
if rows.Err() != nil {
|
|
return ForecastSeries{}, rows.Err()
|
|
}
|
|
|
|
return ForecastSeries{
|
|
RetrievedAt: maxRetrieved,
|
|
Points: points,
|
|
}, nil
|
|
}
|
|
|
|
func nullFloatPtr(v sql.NullFloat64) *float64 {
|
|
if !v.Valid {
|
|
return nil
|
|
}
|
|
val := v.Float64
|
|
return &val
|
|
}
|