diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go index 88f8ab0..494d997 100644 --- a/cmd/ingestd/main.go +++ b/cmd/ingestd/main.go @@ -62,6 +62,12 @@ func main() { } defer d.Close() + // Rebuild recent rain increments from DB so wunderground rain rates/totals + // remain stable across container restarts. + hydrateCtx, hydrateCancel := context.WithTimeout(ctx, 10*time.Second) + hydrateLatestFromDB(hydrateCtx, d, latest, cfg.Site.Name, rainDayLoc) + hydrateCancel() + site := providers.Site{ Name: cfg.Site.Name, Latitude: cfg.Site.Latitude, @@ -219,6 +225,159 @@ func matchMQTTBinding(bindings []mqttTopicBinding, topic string) *mqttTopicBindi return nil } +func hydrateLatestFromDB(ctx context.Context, d *db.DB, latest *mqttingest.Latest, site string, rainDayLoc *time.Location) { + if d == nil || d.Pool == nil || latest == nil { + return + } + if rainDayLoc == nil { + rainDayLoc = time.UTC + } + + now := time.Now().UTC() + localNow := now.In(rainDayLoc) + localMidnight := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), 0, 0, 0, 0, rainDayLoc) + start := localMidnight.Add(-1 * time.Hour).UTC() + + rows, err := d.Pool.Query(ctx, ` + SELECT + ts, + station_id, + coalesce(model, ''), + coalesce(battery_ok, 0), + coalesce(battery_mv, 0), + coalesce(temperature_c, 0), + coalesce(humidity, 0), + coalesce(wind_dir_deg, 0), + coalesce(wind_avg_m_s, 0), + coalesce(wind_max_m_s, 0), + coalesce(uvi, 0), + coalesce(light_lux, 0), + coalesce(flags, 0), + coalesce(rain_mm, 0), + coalesce(rain_start, 0), + coalesce(supercap_v, 0), + coalesce(firmware, 0), + coalesce(raw_data, ''), + coalesce(mic, ''), + coalesce(protocol, ''), + coalesce(rssi, 0), + coalesce(duration, 0) + FROM observations_ws90 + WHERE site = $1 + AND ts >= $2 + ORDER BY ts ASC + `, site, start) + if err != nil { + log.Printf("startup hydrate ws90 query error: %v", err) + return + } + defer rows.Close() + + hydrated := 0 + for rows.Next() { + var ( + ts time.Time + stationID int64 + model string + batteryOK, batteryMV int32 + tempC, humidity float64 + windDirDeg, windAvgMS, windMaxMS float64 + uvi, lightLux float64 + flags int32 + rainMM, supercapV float64 + rainStart, duration int64 + firmware, rssi int32 + rawData, mic, protocol string + ) + if err := rows.Scan( + &ts, + &stationID, + &model, + &batteryOK, + &batteryMV, + &tempC, + &humidity, + &windDirDeg, + &windAvgMS, + &windMaxMS, + &uvi, + &lightLux, + &flags, + &rainMM, + &rainStart, + &supercapV, + &firmware, + &rawData, + &mic, + &protocol, + &rssi, + &duration, + ); err != nil { + log.Printf("startup hydrate ws90 scan error: %v", err) + return + } + + latest.Update(ts, &mqttingest.WS90Payload{ + Model: model, + ID: stationID, + BatteryOK: int(batteryOK), + BatteryMV: int(batteryMV), + TemperatureC: tempC, + Humidity: humidity, + WindDirDeg: windDirDeg, + WindAvgMS: windAvgMS, + WindMaxMS: windMaxMS, + UVI: uvi, + LightLux: lightLux, + Flags: int(flags), + RainMM: rainMM, + RainStart: rainStart, + SupercapV: supercapV, + Firmware: int(firmware), + Data: rawData, + MIC: mic, + Protocol: protocol, + RSSI: int(rssi), + Duration: duration, + }) + hydrated++ + } + if err := rows.Err(); err != nil { + log.Printf("startup hydrate ws90 rows error: %v", err) + return + } + + if hydrated > 0 { + log.Printf( + "startup hydrate ws90 rows=%d window_start=%s window_end=%s", + hydrated, + start.Format(time.RFC3339), + now.Format(time.RFC3339), + ) + } else { + log.Printf("startup hydrate ws90 rows=0 window_start=%s", start.Format(time.RFC3339)) + } + + var ( + baroTS time.Time + pressureHPA float64 + ) + err = d.Pool.QueryRow(ctx, ` + SELECT ts, pressure_hpa + FROM observations_baro + WHERE site = $1 + AND pressure_hpa IS NOT NULL + ORDER BY ts DESC + LIMIT 1 + `, site).Scan(&baroTS, &pressureHPA) + if err != nil { + log.Printf("startup hydrate barometer skipped: %v", err) + return + } + latest.UpdateBarometer(baroTS, pressureHPA) + log.Printf("startup hydrate barometer ts=%s pressure_hpa=%.2f", baroTS.UTC().Format(time.RFC3339), pressureHPA) +} + func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) { p := &providers.OpenMeteo{} t := time.NewTicker(interval)