Files
go-weatherstation/cmd/ingestd/main.go

484 lines
12 KiB
Go

package main
import (
"context"
"flag"
"log"
"os"
"os/signal"
"strings"
"syscall"
"time"
_ "time/tzdata"
"go-weatherstation/internal/config"
"go-weatherstation/internal/db"
"go-weatherstation/internal/mqttingest"
"go-weatherstation/internal/providers"
)
func main() {
log.SetOutput(os.Stdout)
var cfgPath string
flag.StringVar(&cfgPath, "config", "config.yaml", "path to config yaml")
flag.Parse()
cfg, err := config.Load(cfgPath)
if err != nil {
log.Fatalf("config load: %v", err)
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Shutdown handling
go func() {
ch := make(chan os.Signal, 1)
signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM)
<-ch
cancel()
}()
rainDayLoc := time.Local
if cfg.Site.Timezone != "" {
loc, err := time.LoadLocation(cfg.Site.Timezone)
if err != nil {
log.Fatalf("site timezone load: %v", err)
}
rainDayLoc = loc
log.Printf("site timezone configured: %s", cfg.Site.Timezone)
} else {
log.Printf("site timezone not set; falling back to process local timezone: %s", time.Local)
}
log.Printf("rain day timezone: %s", rainDayLoc)
latest := mqttingest.NewLatest(rainDayLoc)
forecastCache := &ForecastCache{}
d, err := db.Open(ctx, cfg.DB.ConnString)
if err != nil {
log.Fatalf("db open: %v", err)
}
defer d.Close()
// Rebuild recent rain increments from DB so wunderground rain rates/totals
// remain stable across container restarts.
hydrateCtx, hydrateCancel := context.WithTimeout(ctx, 10*time.Second)
hydrateLatestFromDB(hydrateCtx, d, latest, cfg.Site.Name, rainDayLoc)
hydrateCancel()
site := providers.Site{
Name: cfg.Site.Name,
Latitude: cfg.Site.Latitude,
Longitude: cfg.Site.Longitude,
}
// Start Open-Meteo poller (optional)
if cfg.Pollers.OpenMeteo.Enabled {
go runOpenMeteoPoller(ctx, d, forecastCache, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval)
}
if cfg.Web.Enabled != nil && *cfg.Web.Enabled {
go func() {
if err := runWebServer(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Web.Listen); err != nil {
log.Printf("web server error: %v", err)
}
}()
}
if cfg.Wunderground.Enabled {
go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval)
}
bindings := make([]mqttTopicBinding, 0, len(cfg.MQTT.Topics))
subscriptions := make([]mqttingest.Subscription, 0, len(cfg.MQTT.Topics))
for _, t := range cfg.MQTT.Topics {
qos := cfg.MQTT.QoS
if t.QoS != nil {
qos = *t.QoS
}
topicType := strings.ToLower(t.Type)
bindings = append(bindings, mqttTopicBinding{
Name: t.Name,
Topic: t.Topic,
Type: topicType,
QoS: qos,
})
subscriptions = append(subscriptions, mqttingest.Subscription{
Topic: t.Topic,
QoS: qos,
})
}
// MQTT subscriber (blocks until ctx done)
err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{
Broker: cfg.MQTT.Broker,
ClientID: cfg.MQTT.ClientID,
Username: cfg.MQTT.Username,
Password: cfg.MQTT.Password,
Topics: subscriptions,
}, func(ctx context.Context, topic string, payload []byte) error {
log.Printf("mqtt message topic=%s bytes=%d payload=%s", topic, len(payload), string(payload))
binding := matchMQTTBinding(bindings, topic)
if binding == nil {
log.Printf("mqtt message ignored topic=%s reason=unmatched", topic)
return nil
}
switch binding.Type {
case "ws90":
p, raw, err := mqttingest.ParseWS90(payload)
if err != nil {
log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload))
return nil
}
// Use receive time as observation ts (WS90 payload doesn't include a timestamp).
ts := time.Now().UTC()
latest.Update(ts, p)
if snap, ok := latest.Snapshot(); ok {
logForecastDeviation(forecastCache, snap)
}
if err := d.InsertWS90(ctx, db.InsertWS90Params{
TS: ts,
Site: cfg.Site.Name,
StationID: p.ID,
Model: p.Model,
BatteryOK: p.BatteryOK,
BatteryMV: p.BatteryMV,
TempC: p.TemperatureC,
Humidity: p.Humidity,
WindDirDeg: p.WindDirDeg,
WindAvgMS: p.WindAvgMS,
WindMaxMS: p.WindMaxMS,
UVI: p.UVI,
LightLux: p.LightLux,
Flags: p.Flags,
RainMM: p.RainMM,
RainStart: p.RainStart,
SupercapV: p.SupercapV,
Firmware: p.Firmware,
RawData: p.Data,
MIC: p.MIC,
Protocol: p.Protocol,
RSSI: p.RSSI,
Duration: p.Duration,
Payload: raw,
}); err != nil {
log.Printf("db insert ws90 error: %v", err)
}
case "baro", "barometer", "pressure":
p, raw, err := mqttingest.ParseBarometer(payload)
if err != nil {
log.Printf("barometer parse error topic=%s err=%v payload=%s", topic, err, string(payload))
return nil
}
ts := time.Now().UTC()
source := binding.Name
if source == "" {
source = binding.Topic
}
latest.UpdateBarometer(ts, p.PressureHPA)
if err := d.InsertBarometer(ctx, db.InsertBarometerParams{
TS: ts,
Site: cfg.Site.Name,
Source: source,
PressureHPA: p.PressureHPA,
Payload: raw,
}); err != nil {
log.Printf("db insert barometer error: %v", err)
} else {
log.Printf("barometer stored source=%s pressure_hpa=%.2f", source, p.PressureHPA)
}
default:
log.Printf("mqtt message ignored topic=%s reason=unknown_type type=%s", topic, binding.Type)
}
return nil
})
if err != nil {
log.Fatalf("mqtt subscriber: %v", err)
}
}
type mqttTopicBinding struct {
Name string
Topic string
Type string
QoS byte
}
func matchMQTTBinding(bindings []mqttTopicBinding, topic string) *mqttTopicBinding {
for i := range bindings {
if mqttingest.TopicMatches(bindings[i].Topic, topic) {
return &bindings[i]
}
}
return nil
}
func hydrateLatestFromDB(ctx context.Context, d *db.DB, latest *mqttingest.Latest, site string, rainDayLoc *time.Location) {
if d == nil || d.Pool == nil || latest == nil {
return
}
if rainDayLoc == nil {
rainDayLoc = time.UTC
}
now := time.Now().UTC()
localNow := now.In(rainDayLoc)
localMidnight := time.Date(localNow.Year(), localNow.Month(), localNow.Day(), 0, 0, 0, 0, rainDayLoc)
start := localMidnight.Add(-1 * time.Hour).UTC()
rows, err := d.Pool.Query(ctx, `
SELECT
ts,
station_id,
coalesce(model, ''),
coalesce(battery_ok, 0),
coalesce(battery_mv, 0),
coalesce(temperature_c, 0),
coalesce(humidity, 0),
coalesce(wind_dir_deg, 0),
coalesce(wind_avg_m_s, 0),
coalesce(wind_max_m_s, 0),
coalesce(uvi, 0),
coalesce(light_lux, 0),
coalesce(flags, 0),
coalesce(rain_mm, 0),
coalesce(rain_start, 0),
coalesce(supercap_v, 0),
coalesce(firmware, 0),
coalesce(raw_data, ''),
coalesce(mic, ''),
coalesce(protocol, ''),
coalesce(rssi, 0),
coalesce(duration, 0)
FROM observations_ws90
WHERE site = $1
AND ts >= $2
ORDER BY ts ASC
`, site, start)
if err != nil {
log.Printf("startup hydrate ws90 query error: %v", err)
return
}
defer rows.Close()
hydrated := 0
for rows.Next() {
var (
ts time.Time
stationID int64
model string
batteryOK, batteryMV int32
tempC, humidity float64
windDirDeg, windAvgMS, windMaxMS float64
uvi, lightLux float64
flags int32
rainMM, supercapV float64
rainStart, duration int64
firmware, rssi int32
rawData, mic, protocol string
)
if err := rows.Scan(
&ts,
&stationID,
&model,
&batteryOK,
&batteryMV,
&tempC,
&humidity,
&windDirDeg,
&windAvgMS,
&windMaxMS,
&uvi,
&lightLux,
&flags,
&rainMM,
&rainStart,
&supercapV,
&firmware,
&rawData,
&mic,
&protocol,
&rssi,
&duration,
); err != nil {
log.Printf("startup hydrate ws90 scan error: %v", err)
return
}
latest.Update(ts, &mqttingest.WS90Payload{
Model: model,
ID: stationID,
BatteryOK: int(batteryOK),
BatteryMV: int(batteryMV),
TemperatureC: tempC,
Humidity: humidity,
WindDirDeg: windDirDeg,
WindAvgMS: windAvgMS,
WindMaxMS: windMaxMS,
UVI: uvi,
LightLux: lightLux,
Flags: int(flags),
RainMM: rainMM,
RainStart: rainStart,
SupercapV: supercapV,
Firmware: int(firmware),
Data: rawData,
MIC: mic,
Protocol: protocol,
RSSI: int(rssi),
Duration: duration,
})
hydrated++
}
if err := rows.Err(); err != nil {
log.Printf("startup hydrate ws90 rows error: %v", err)
return
}
if hydrated > 0 {
log.Printf(
"startup hydrate ws90 rows=%d window_start=%s window_end=%s",
hydrated,
start.Format(time.RFC3339),
now.Format(time.RFC3339),
)
} else {
log.Printf("startup hydrate ws90 rows=0 window_start=%s", start.Format(time.RFC3339))
}
var (
baroTS time.Time
pressureHPA float64
)
err = d.Pool.QueryRow(ctx, `
SELECT ts, pressure_hpa
FROM observations_baro
WHERE site = $1
AND pressure_hpa IS NOT NULL
ORDER BY ts DESC
LIMIT 1
`, site).Scan(&baroTS, &pressureHPA)
if err != nil {
log.Printf("startup hydrate barometer skipped: %v", err)
return
}
latest.UpdateBarometer(baroTS, pressureHPA)
log.Printf("startup hydrate barometer ts=%s pressure_hpa=%.2f", baroTS.UTC().Format(time.RFC3339), pressureHPA)
}
func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) {
p := &providers.OpenMeteo{}
t := time.NewTicker(interval)
defer t.Stop()
// poll immediately at startup
pollOnce(ctx, d, cache, p, site, model)
for {
select {
case <-ctx.Done():
return
case <-t.C:
pollOnce(ctx, d, cache, p, site, model)
}
}
}
func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, stationID, stationKey string, interval time.Duration) {
client := &providers.WundergroundClient{}
t := time.NewTicker(interval)
defer t.Stop()
for {
select {
case <-ctx.Done():
return
case <-t.C:
snap, ok := latest.Snapshot()
if !ok {
log.Printf("wunderground: no data yet")
continue
}
log.Printf("wunderground upload start station_id=%s", stationID)
up := providers.WUUpload{
StationID: stationID,
StationKey: stationKey,
TempC: snap.P.TemperatureC,
Humidity: snap.P.Humidity,
WindDirDeg: snap.P.WindDirDeg,
WindAvgMS: snap.P.WindAvgMS,
WindMaxMS: snap.P.WindMaxMS,
UVI: snap.P.UVI,
RainLastHourMM: snap.RainLastHourMM,
DailyRainMM: snap.DailyRainMM,
PressureHPA: snap.PressureHPA,
DateUTC: "now",
}
resp, err := client.Upload(ctx, up)
if err != nil {
log.Printf("wunderground upload failed: %v", err)
continue
}
log.Printf("wunderground upload ok: %s", resp)
}
}
}
func pollOnce(ctx context.Context, d *db.DB, cache *ForecastCache, p providers.Provider, site providers.Site, model string) {
log.Printf("forecast fetch start provider=%s model=%s site=%s", p.Name(), model, site.Name)
res, err := p.Fetch(ctx.Done(), site, model)
if err != nil {
log.Printf("forecast fetch error provider=%s err=%v", p.Name(), err)
return
}
if cache != nil {
cache.Set(res)
}
if summary := forecastSummary(res); summary != "" {
log.Printf("forecast summary provider=%s model=%s %s", p.Name(), res.Model, summary)
}
for _, pt := range res.Hourly {
err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{
TS: pt.TS,
RetrievedAt: res.RetrievedAt,
Site: site.Name,
Model: res.Model,
TempC: pt.TempC,
RH: pt.RH,
PressureMSLH: pt.PressureMSLH,
WindMS: pt.WindMS,
WindGustMS: pt.WindGustMS,
WindDirDeg: pt.WindDirDeg,
PrecipMM: pt.PrecipMM,
PrecipProb: pt.PrecipProb,
CloudCover: pt.CloudCover,
SourcePayload: res.Raw,
})
if err != nil {
log.Printf("forecast upsert error ts=%s err=%v", pt.TS.Format(time.RFC3339), err)
}
}
log.Printf("forecast stored provider=%s model=%s points=%d retrieved_at=%s",
p.Name(), res.Model, len(res.Hourly), res.RetrievedAt.Format(time.RFC3339))
}