summarise data from openmeteo
This commit is contained in:
219
cmd/ingestd/forecast_logging.go
Normal file
219
cmd/ingestd/forecast_logging.go
Normal file
@@ -0,0 +1,219 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
"log"
|
||||||
|
"math"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"go-weatherstation/internal/mqttingest"
|
||||||
|
"go-weatherstation/internal/providers"
|
||||||
|
)
|
||||||
|
|
||||||
|
type ForecastCache struct {
|
||||||
|
mu sync.RWMutex
|
||||||
|
res *providers.ForecastResult
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ForecastCache) Set(res *providers.ForecastResult) {
|
||||||
|
if c == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
c.mu.Lock()
|
||||||
|
c.res = copyForecast(res)
|
||||||
|
c.mu.Unlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
func (c *ForecastCache) Get() *providers.ForecastResult {
|
||||||
|
if c == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
c.mu.RLock()
|
||||||
|
res := c.res
|
||||||
|
c.mu.RUnlock()
|
||||||
|
return res
|
||||||
|
}
|
||||||
|
|
||||||
|
func copyForecast(res *providers.ForecastResult) *providers.ForecastResult {
|
||||||
|
if res == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
out := &providers.ForecastResult{
|
||||||
|
RetrievedAt: res.RetrievedAt,
|
||||||
|
Model: res.Model,
|
||||||
|
}
|
||||||
|
if len(res.Hourly) > 0 {
|
||||||
|
out.Hourly = make([]providers.HourlyForecastPoint, len(res.Hourly))
|
||||||
|
copy(out.Hourly, res.Hourly)
|
||||||
|
}
|
||||||
|
return out
|
||||||
|
}
|
||||||
|
|
||||||
|
func forecastSummary(res *providers.ForecastResult) string {
|
||||||
|
if res == nil || len(res.Hourly) == 0 {
|
||||||
|
return ""
|
||||||
|
}
|
||||||
|
|
||||||
|
start := res.Hourly[0].TS
|
||||||
|
end := res.Hourly[len(res.Hourly)-1].TS
|
||||||
|
parts := []string{
|
||||||
|
fmt.Sprintf("range=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)),
|
||||||
|
}
|
||||||
|
|
||||||
|
var (
|
||||||
|
minTemp, maxTemp float64
|
||||||
|
hasTemp bool
|
||||||
|
maxWind float64
|
||||||
|
hasWind bool
|
||||||
|
maxGust float64
|
||||||
|
hasGust bool
|
||||||
|
totalPrecip float64
|
||||||
|
hasPrecip bool
|
||||||
|
)
|
||||||
|
|
||||||
|
for _, pt := range res.Hourly {
|
||||||
|
if pt.TempC != nil {
|
||||||
|
if !hasTemp {
|
||||||
|
minTemp, maxTemp = *pt.TempC, *pt.TempC
|
||||||
|
hasTemp = true
|
||||||
|
} else {
|
||||||
|
if *pt.TempC < minTemp {
|
||||||
|
minTemp = *pt.TempC
|
||||||
|
}
|
||||||
|
if *pt.TempC > maxTemp {
|
||||||
|
maxTemp = *pt.TempC
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pt.WindMS != nil {
|
||||||
|
if !hasWind || *pt.WindMS > maxWind {
|
||||||
|
maxWind = *pt.WindMS
|
||||||
|
hasWind = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pt.WindGustMS != nil {
|
||||||
|
if !hasGust || *pt.WindGustMS > maxGust {
|
||||||
|
maxGust = *pt.WindGustMS
|
||||||
|
hasGust = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if pt.PrecipMM != nil {
|
||||||
|
totalPrecip += *pt.PrecipMM
|
||||||
|
hasPrecip = true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if hasTemp {
|
||||||
|
parts = append(parts, fmt.Sprintf("temp_c(min=%.2f,max=%.2f)", minTemp, maxTemp))
|
||||||
|
}
|
||||||
|
if hasWind {
|
||||||
|
parts = append(parts, fmt.Sprintf("wind_ms(max=%.2f)", maxWind))
|
||||||
|
}
|
||||||
|
if hasGust {
|
||||||
|
parts = append(parts, fmt.Sprintf("gust_ms(max=%.2f)", maxGust))
|
||||||
|
}
|
||||||
|
if hasPrecip {
|
||||||
|
parts = append(parts, fmt.Sprintf("precip_mm(total=%.2f)", totalPrecip))
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(parts, " ")
|
||||||
|
}
|
||||||
|
|
||||||
|
func logForecastDeviation(cache *ForecastCache, snap mqttingest.Snapshot) {
|
||||||
|
res := cache.Get()
|
||||||
|
if res == nil || len(res.Hourly) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fc, diff := nearestForecastPoint(res.Hourly, snap.TS)
|
||||||
|
if fc == nil || diff > 2*time.Hour {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
parts := []string{
|
||||||
|
fmt.Sprintf("model=%s", res.Model),
|
||||||
|
fmt.Sprintf("obs_ts=%s", snap.TS.Format(time.RFC3339)),
|
||||||
|
fmt.Sprintf("fc_ts=%s", fc.TS.Format(time.RFC3339)),
|
||||||
|
fmt.Sprintf("dt=%s", diff.Round(time.Minute)),
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc.TempC != nil {
|
||||||
|
parts = append(parts, fmt.Sprintf(
|
||||||
|
"temp_c(obs=%.2f fc=%.2f d=%.2f)",
|
||||||
|
snap.P.TemperatureC, *fc.TempC, snap.P.TemperatureC-*fc.TempC,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc.WindMS != nil {
|
||||||
|
parts = append(parts, fmt.Sprintf(
|
||||||
|
"wind_ms(obs=%.2f fc=%.2f d=%.2f)",
|
||||||
|
snap.P.WindAvgMS, *fc.WindMS, snap.P.WindAvgMS-*fc.WindMS,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc.WindGustMS != nil {
|
||||||
|
parts = append(parts, fmt.Sprintf(
|
||||||
|
"gust_ms(obs=%.2f fc=%.2f d=%.2f)",
|
||||||
|
snap.P.WindMaxMS, *fc.WindGustMS, snap.P.WindMaxMS-*fc.WindGustMS,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc.WindDirDeg != nil {
|
||||||
|
dirDelta := angularDiffDeg(snap.P.WindDirDeg, *fc.WindDirDeg)
|
||||||
|
parts = append(parts, fmt.Sprintf(
|
||||||
|
"wind_dir_deg(obs=%.0f fc=%.0f d=%.0f)",
|
||||||
|
snap.P.WindDirDeg, *fc.WindDirDeg, dirDelta,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if fc.PrecipMM != nil {
|
||||||
|
parts = append(parts, fmt.Sprintf(
|
||||||
|
"rain_mm(obs=%.2f fc=%.2f d=%.2f)",
|
||||||
|
snap.RainLastHourMM, *fc.PrecipMM, snap.RainLastHourMM-*fc.PrecipMM,
|
||||||
|
))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(parts) <= 4 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
log.Printf("forecast deviation %s", strings.Join(parts, " "))
|
||||||
|
}
|
||||||
|
|
||||||
|
func nearestForecastPoint(hourly []providers.HourlyForecastPoint, ts time.Time) (*providers.HourlyForecastPoint, time.Duration) {
|
||||||
|
if len(hourly) == 0 {
|
||||||
|
return nil, 0
|
||||||
|
}
|
||||||
|
|
||||||
|
bestIdx := 0
|
||||||
|
bestDiff := absDuration(ts.Sub(hourly[0].TS))
|
||||||
|
for i := 1; i < len(hourly); i++ {
|
||||||
|
diff := absDuration(ts.Sub(hourly[i].TS))
|
||||||
|
if diff < bestDiff {
|
||||||
|
bestIdx = i
|
||||||
|
bestDiff = diff
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return &hourly[bestIdx], bestDiff
|
||||||
|
}
|
||||||
|
|
||||||
|
func absDuration(d time.Duration) time.Duration {
|
||||||
|
if d < 0 {
|
||||||
|
return -d
|
||||||
|
}
|
||||||
|
return d
|
||||||
|
}
|
||||||
|
|
||||||
|
func angularDiffDeg(a, b float64) float64 {
|
||||||
|
diff := math.Mod(math.Abs(a-b), 360.0)
|
||||||
|
if diff > 180.0 {
|
||||||
|
diff = 360.0 - diff
|
||||||
|
}
|
||||||
|
return diff
|
||||||
|
}
|
||||||
@@ -39,6 +39,7 @@ func main() {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
latest := &mqttingest.Latest{}
|
latest := &mqttingest.Latest{}
|
||||||
|
forecastCache := &ForecastCache{}
|
||||||
|
|
||||||
d, err := db.Open(ctx, cfg.DB.ConnString)
|
d, err := db.Open(ctx, cfg.DB.ConnString)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -54,7 +55,7 @@ func main() {
|
|||||||
|
|
||||||
// Start Open-Meteo poller (optional)
|
// Start Open-Meteo poller (optional)
|
||||||
if cfg.Pollers.OpenMeteo.Enabled {
|
if cfg.Pollers.OpenMeteo.Enabled {
|
||||||
go runOpenMeteoPoller(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval)
|
go runOpenMeteoPoller(ctx, d, forecastCache, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
if cfg.Wunderground.Enabled {
|
if cfg.Wunderground.Enabled {
|
||||||
@@ -83,6 +84,10 @@ func main() {
|
|||||||
|
|
||||||
latest.Update(ts, p)
|
latest.Update(ts, p)
|
||||||
|
|
||||||
|
if snap, ok := latest.Snapshot(); ok {
|
||||||
|
logForecastDeviation(forecastCache, snap)
|
||||||
|
}
|
||||||
|
|
||||||
if err := d.InsertWS90(ctx, db.InsertWS90Params{
|
if err := d.InsertWS90(ctx, db.InsertWS90Params{
|
||||||
TS: ts,
|
TS: ts,
|
||||||
Site: cfg.Site.Name,
|
Site: cfg.Site.Name,
|
||||||
@@ -119,20 +124,20 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func runOpenMeteoPoller(ctx context.Context, d *db.DB, site providers.Site, model string, interval time.Duration) {
|
func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) {
|
||||||
p := &providers.OpenMeteo{}
|
p := &providers.OpenMeteo{}
|
||||||
t := time.NewTicker(interval)
|
t := time.NewTicker(interval)
|
||||||
defer t.Stop()
|
defer t.Stop()
|
||||||
|
|
||||||
// poll immediately at startup
|
// poll immediately at startup
|
||||||
pollOnce(ctx, d, p, site, model)
|
pollOnce(ctx, d, cache, p, site, model)
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
case <-t.C:
|
case <-t.C:
|
||||||
pollOnce(ctx, d, p, site, model)
|
pollOnce(ctx, d, cache, p, site, model)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -179,7 +184,7 @@ func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, sta
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site providers.Site, model string) {
|
func pollOnce(ctx context.Context, d *db.DB, cache *ForecastCache, p providers.Provider, site providers.Site, model string) {
|
||||||
log.Printf("forecast fetch start provider=%s model=%s site=%s", p.Name(), model, site.Name)
|
log.Printf("forecast fetch start provider=%s model=%s site=%s", p.Name(), model, site.Name)
|
||||||
|
|
||||||
res, err := p.Fetch(ctx.Done(), site, model)
|
res, err := p.Fetch(ctx.Done(), site, model)
|
||||||
@@ -188,6 +193,14 @@ func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site provider
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if cache != nil {
|
||||||
|
cache.Set(res)
|
||||||
|
}
|
||||||
|
|
||||||
|
if summary := forecastSummary(res); summary != "" {
|
||||||
|
log.Printf("forecast summary provider=%s model=%s %s", p.Name(), res.Model, summary)
|
||||||
|
}
|
||||||
|
|
||||||
for _, pt := range res.Hourly {
|
for _, pt := range res.Hourly {
|
||||||
err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{
|
err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{
|
||||||
TS: pt.TS,
|
TS: pt.TS,
|
||||||
|
|||||||
@@ -5,6 +5,7 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"net/http"
|
"net/http"
|
||||||
"net/url"
|
"net/url"
|
||||||
"time"
|
"time"
|
||||||
@@ -38,12 +39,19 @@ func (o *OpenMeteo) Fetch(ctxDone <-chan struct{}, site Site, model string) (*Fo
|
|||||||
q.Set("latitude", fmt.Sprintf("%.6f", site.Latitude))
|
q.Set("latitude", fmt.Sprintf("%.6f", site.Latitude))
|
||||||
q.Set("longitude", fmt.Sprintf("%.6f", site.Longitude))
|
q.Set("longitude", fmt.Sprintf("%.6f", site.Longitude))
|
||||||
q.Set("hourly", join(hourly))
|
q.Set("hourly", join(hourly))
|
||||||
q.Set("timezone", "UTC")
|
|
||||||
q.Set("wind_speed_unit", "ms")
|
q.Set("wind_speed_unit", "ms")
|
||||||
q.Set("temperature_unit", "celsius")
|
q.Set("temperature_unit", "celsius")
|
||||||
q.Set("precipitation_unit", "mm")
|
q.Set("precipitation_unit", "mm")
|
||||||
u.RawQuery = q.Encode()
|
u.RawQuery = q.Encode()
|
||||||
|
|
||||||
|
safeURL := *u
|
||||||
|
safeQuery := safeURL.Query()
|
||||||
|
if safeQuery.Has("apikey") {
|
||||||
|
safeQuery.Set("apikey", "redacted")
|
||||||
|
}
|
||||||
|
safeURL.RawQuery = safeQuery.Encode()
|
||||||
|
log.Printf("open-meteo request url=%s", safeURL.String())
|
||||||
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
ctx, cancel := context.WithCancel(context.Background())
|
||||||
defer cancel()
|
defer cancel()
|
||||||
go func() { <-ctxDone; cancel() }()
|
go func() { <-ctxDone; cancel() }()
|
||||||
|
|||||||
Reference in New Issue
Block a user