From 7526a7af93bab182d152f4b0710df30f841ac6f6 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Mon, 26 Jan 2026 13:20:30 +1100 Subject: [PATCH] summarise data from openmeteo --- cmd/ingestd/forecast_logging.go | 219 ++++++++++++++++++++++++++++++++ cmd/ingestd/main.go | 23 +++- internal/providers/openmeteo.go | 10 +- 3 files changed, 246 insertions(+), 6 deletions(-) create mode 100644 cmd/ingestd/forecast_logging.go diff --git a/cmd/ingestd/forecast_logging.go b/cmd/ingestd/forecast_logging.go new file mode 100644 index 0000000..abf2b5c --- /dev/null +++ b/cmd/ingestd/forecast_logging.go @@ -0,0 +1,219 @@ +package main + +import ( + "fmt" + "log" + "math" + "strings" + "sync" + "time" + + "go-weatherstation/internal/mqttingest" + "go-weatherstation/internal/providers" +) + +type ForecastCache struct { + mu sync.RWMutex + res *providers.ForecastResult +} + +func (c *ForecastCache) Set(res *providers.ForecastResult) { + if c == nil { + return + } + c.mu.Lock() + c.res = copyForecast(res) + c.mu.Unlock() +} + +func (c *ForecastCache) Get() *providers.ForecastResult { + if c == nil { + return nil + } + c.mu.RLock() + res := c.res + c.mu.RUnlock() + return res +} + +func copyForecast(res *providers.ForecastResult) *providers.ForecastResult { + if res == nil { + return nil + } + out := &providers.ForecastResult{ + RetrievedAt: res.RetrievedAt, + Model: res.Model, + } + if len(res.Hourly) > 0 { + out.Hourly = make([]providers.HourlyForecastPoint, len(res.Hourly)) + copy(out.Hourly, res.Hourly) + } + return out +} + +func forecastSummary(res *providers.ForecastResult) string { + if res == nil || len(res.Hourly) == 0 { + return "" + } + + start := res.Hourly[0].TS + end := res.Hourly[len(res.Hourly)-1].TS + parts := []string{ + fmt.Sprintf("range=%s..%s", start.Format(time.RFC3339), end.Format(time.RFC3339)), + } + + var ( + minTemp, maxTemp float64 + hasTemp bool + maxWind float64 + hasWind bool + maxGust float64 + hasGust bool + totalPrecip float64 + hasPrecip bool + ) + + for _, pt := range res.Hourly { + if pt.TempC != nil { + if !hasTemp { + minTemp, maxTemp = *pt.TempC, *pt.TempC + hasTemp = true + } else { + if *pt.TempC < minTemp { + minTemp = *pt.TempC + } + if *pt.TempC > maxTemp { + maxTemp = *pt.TempC + } + } + } + + if pt.WindMS != nil { + if !hasWind || *pt.WindMS > maxWind { + maxWind = *pt.WindMS + hasWind = true + } + } + + if pt.WindGustMS != nil { + if !hasGust || *pt.WindGustMS > maxGust { + maxGust = *pt.WindGustMS + hasGust = true + } + } + + if pt.PrecipMM != nil { + totalPrecip += *pt.PrecipMM + hasPrecip = true + } + } + + if hasTemp { + parts = append(parts, fmt.Sprintf("temp_c(min=%.2f,max=%.2f)", minTemp, maxTemp)) + } + if hasWind { + parts = append(parts, fmt.Sprintf("wind_ms(max=%.2f)", maxWind)) + } + if hasGust { + parts = append(parts, fmt.Sprintf("gust_ms(max=%.2f)", maxGust)) + } + if hasPrecip { + parts = append(parts, fmt.Sprintf("precip_mm(total=%.2f)", totalPrecip)) + } + + return strings.Join(parts, " ") +} + +func logForecastDeviation(cache *ForecastCache, snap mqttingest.Snapshot) { + res := cache.Get() + if res == nil || len(res.Hourly) == 0 { + return + } + + fc, diff := nearestForecastPoint(res.Hourly, snap.TS) + if fc == nil || diff > 2*time.Hour { + return + } + + parts := []string{ + fmt.Sprintf("model=%s", res.Model), + fmt.Sprintf("obs_ts=%s", snap.TS.Format(time.RFC3339)), + fmt.Sprintf("fc_ts=%s", fc.TS.Format(time.RFC3339)), + fmt.Sprintf("dt=%s", diff.Round(time.Minute)), + } + + if fc.TempC != nil { + parts = append(parts, fmt.Sprintf( + "temp_c(obs=%.2f fc=%.2f d=%.2f)", + snap.P.TemperatureC, *fc.TempC, snap.P.TemperatureC-*fc.TempC, + )) + } + + if fc.WindMS != nil { + parts = append(parts, fmt.Sprintf( + "wind_ms(obs=%.2f fc=%.2f d=%.2f)", + snap.P.WindAvgMS, *fc.WindMS, snap.P.WindAvgMS-*fc.WindMS, + )) + } + + if fc.WindGustMS != nil { + parts = append(parts, fmt.Sprintf( + "gust_ms(obs=%.2f fc=%.2f d=%.2f)", + snap.P.WindMaxMS, *fc.WindGustMS, snap.P.WindMaxMS-*fc.WindGustMS, + )) + } + + if fc.WindDirDeg != nil { + dirDelta := angularDiffDeg(snap.P.WindDirDeg, *fc.WindDirDeg) + parts = append(parts, fmt.Sprintf( + "wind_dir_deg(obs=%.0f fc=%.0f d=%.0f)", + snap.P.WindDirDeg, *fc.WindDirDeg, dirDelta, + )) + } + + if fc.PrecipMM != nil { + parts = append(parts, fmt.Sprintf( + "rain_mm(obs=%.2f fc=%.2f d=%.2f)", + snap.RainLastHourMM, *fc.PrecipMM, snap.RainLastHourMM-*fc.PrecipMM, + )) + } + + if len(parts) <= 4 { + return + } + + log.Printf("forecast deviation %s", strings.Join(parts, " ")) +} + +func nearestForecastPoint(hourly []providers.HourlyForecastPoint, ts time.Time) (*providers.HourlyForecastPoint, time.Duration) { + if len(hourly) == 0 { + return nil, 0 + } + + bestIdx := 0 + bestDiff := absDuration(ts.Sub(hourly[0].TS)) + for i := 1; i < len(hourly); i++ { + diff := absDuration(ts.Sub(hourly[i].TS)) + if diff < bestDiff { + bestIdx = i + bestDiff = diff + } + } + + return &hourly[bestIdx], bestDiff +} + +func absDuration(d time.Duration) time.Duration { + if d < 0 { + return -d + } + return d +} + +func angularDiffDeg(a, b float64) float64 { + diff := math.Mod(math.Abs(a-b), 360.0) + if diff > 180.0 { + diff = 360.0 - diff + } + return diff +} diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go index ab0bfbe..054f8b2 100644 --- a/cmd/ingestd/main.go +++ b/cmd/ingestd/main.go @@ -39,6 +39,7 @@ func main() { }() latest := &mqttingest.Latest{} + forecastCache := &ForecastCache{} d, err := db.Open(ctx, cfg.DB.ConnString) if err != nil { @@ -54,7 +55,7 @@ func main() { // Start Open-Meteo poller (optional) if cfg.Pollers.OpenMeteo.Enabled { - go runOpenMeteoPoller(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval) + go runOpenMeteoPoller(ctx, d, forecastCache, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval) } if cfg.Wunderground.Enabled { @@ -83,6 +84,10 @@ func main() { latest.Update(ts, p) + if snap, ok := latest.Snapshot(); ok { + logForecastDeviation(forecastCache, snap) + } + if err := d.InsertWS90(ctx, db.InsertWS90Params{ TS: ts, Site: cfg.Site.Name, @@ -119,20 +124,20 @@ func main() { } } -func runOpenMeteoPoller(ctx context.Context, d *db.DB, site providers.Site, model string, interval time.Duration) { +func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) { p := &providers.OpenMeteo{} t := time.NewTicker(interval) defer t.Stop() // poll immediately at startup - pollOnce(ctx, d, p, site, model) + pollOnce(ctx, d, cache, p, site, model) for { select { case <-ctx.Done(): return case <-t.C: - pollOnce(ctx, d, p, site, model) + pollOnce(ctx, d, cache, p, site, model) } } } @@ -179,7 +184,7 @@ func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, sta } } -func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site providers.Site, model string) { +func pollOnce(ctx context.Context, d *db.DB, cache *ForecastCache, p providers.Provider, site providers.Site, model string) { log.Printf("forecast fetch start provider=%s model=%s site=%s", p.Name(), model, site.Name) res, err := p.Fetch(ctx.Done(), site, model) @@ -188,6 +193,14 @@ func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site provider return } + if cache != nil { + cache.Set(res) + } + + if summary := forecastSummary(res); summary != "" { + log.Printf("forecast summary provider=%s model=%s %s", p.Name(), res.Model, summary) + } + for _, pt := range res.Hourly { err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{ TS: pt.TS, diff --git a/internal/providers/openmeteo.go b/internal/providers/openmeteo.go index 195e6b9..23521c2 100644 --- a/internal/providers/openmeteo.go +++ b/internal/providers/openmeteo.go @@ -5,6 +5,7 @@ import ( "encoding/json" "fmt" "io" + "log" "net/http" "net/url" "time" @@ -38,12 +39,19 @@ func (o *OpenMeteo) Fetch(ctxDone <-chan struct{}, site Site, model string) (*Fo q.Set("latitude", fmt.Sprintf("%.6f", site.Latitude)) q.Set("longitude", fmt.Sprintf("%.6f", site.Longitude)) q.Set("hourly", join(hourly)) - q.Set("timezone", "UTC") q.Set("wind_speed_unit", "ms") q.Set("temperature_unit", "celsius") q.Set("precipitation_unit", "mm") u.RawQuery = q.Encode() + safeURL := *u + safeQuery := safeURL.Query() + if safeQuery.Has("apikey") { + safeQuery.Set("apikey", "redacted") + } + safeURL.RawQuery = safeQuery.Encode() + log.Printf("open-meteo request url=%s", safeURL.String()) + ctx, cancel := context.WithCancel(context.Background()) defer cancel() go func() { <-ctxDone; cancel() }()