package main import ( "context" "flag" "log" "os" "os/signal" "syscall" "time" "go-weatherstation/internal/config" "go-weatherstation/internal/db" "go-weatherstation/internal/mqttingest" "go-weatherstation/internal/providers" ) func main() { var cfgPath string flag.StringVar(&cfgPath, "config", "config.yaml", "path to config yaml") flag.Parse() cfg, err := config.Load(cfgPath) if err != nil { log.Fatalf("config load: %v", err) } ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Shutdown handling go func() { ch := make(chan os.Signal, 1) signal.Notify(ch, syscall.SIGINT, syscall.SIGTERM) <-ch cancel() }() latest := &mqttingest.Latest{} d, err := db.Open(ctx, cfg.DB.ConnString) if err != nil { log.Fatalf("db open: %v", err) } defer d.Close() site := providers.Site{ Name: cfg.Site.Name, Latitude: cfg.Site.Latitude, Longitude: cfg.Site.Longitude, } // Start Open-Meteo poller (optional) if cfg.Pollers.OpenMeteo.Enabled { go runOpenMeteoPoller(ctx, d, site, cfg.Pollers.OpenMeteo.Model, cfg.Pollers.OpenMeteo.Interval) } if cfg.Wunderground.Enabled { go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval) } // MQTT subscriber (blocks until ctx done) err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{ Broker: cfg.MQTT.Broker, ClientID: cfg.MQTT.ClientID, Username: cfg.MQTT.Username, Password: cfg.MQTT.Password, Topic: cfg.MQTT.Topic, QoS: cfg.MQTT.QoS, }, func(ctx context.Context, topic string, payload []byte) error { p, raw, err := mqttingest.ParseWS90(payload) if err != nil { log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload)) return nil } // Use receive time as observation ts (WS90 payload doesn't include a timestamp). ts := time.Now().UTC() latest.Update(ts, p) if err := d.InsertWS90(ctx, db.InsertWS90Params{ TS: ts, Site: cfg.Site.Name, StationID: p.ID, Model: p.Model, BatteryOK: p.BatteryOK, BatteryMV: p.BatteryMV, TempC: p.TemperatureC, Humidity: p.Humidity, WindDirDeg: p.WindDirDeg, WindAvgMS: p.WindAvgMS, WindMaxMS: p.WindMaxMS, UVI: p.UVI, LightLux: p.LightLux, Flags: p.Flags, RainMM: p.RainMM, RainStart: p.RainStart, SupercapV: p.SupercapV, Firmware: p.Firmware, RawData: p.Data, MIC: p.MIC, Protocol: p.Protocol, RSSI: p.RSSI, Duration: p.Duration, Payload: raw, }); err != nil { log.Printf("db insert ws90 error: %v", err) } return nil }) if err != nil { log.Fatalf("mqtt subscriber: %v", err) } } func runOpenMeteoPoller(ctx context.Context, d *db.DB, site providers.Site, model string, interval time.Duration) { p := &providers.OpenMeteo{} t := time.NewTicker(interval) defer t.Stop() // poll immediately at startup pollOnce(ctx, d, p, site, model) for { select { case <-ctx.Done(): return case <-t.C: pollOnce(ctx, d, p, site, model) } } } func runWundergroundUploader(ctx context.Context, latest *mqttingest.Latest, stationID, stationKey string, interval time.Duration) { client := &providers.WundergroundClient{} t := time.NewTicker(interval) defer t.Stop() for { select { case <-ctx.Done(): return case <-t.C: snap, ok := latest.Snapshot() if !ok { log.Printf("wunderground: no data yet") continue } up := providers.WUUpload{ StationID: stationID, StationKey: stationKey, TempC: snap.P.TemperatureC, Humidity: snap.P.Humidity, WindDirDeg: snap.P.WindDirDeg, WindAvgMS: snap.P.WindAvgMS, WindMaxMS: snap.P.WindMaxMS, UVI: snap.P.UVI, RainLastHourMM: snap.RainLastHourMM, DailyRainMM: snap.DailyRainMM, DateUTC: "now", } resp, err := client.Upload(ctx, up) if err != nil { log.Printf("wunderground upload failed: %v", err) continue } log.Printf("wunderground upload ok: %s", resp) } } } func pollOnce(ctx context.Context, d *db.DB, p providers.Provider, site providers.Site, model string) { res, err := p.Fetch(ctx.Done(), site, model) if err != nil { log.Printf("forecast fetch error provider=%s err=%v", p.Name(), err) return } for _, pt := range res.Hourly { err := d.UpsertOpenMeteoHourly(ctx, db.InsertOpenMeteoHourlyParams{ TS: pt.TS, RetrievedAt: res.RetrievedAt, Site: site.Name, Model: res.Model, TempC: pt.TempC, RH: pt.RH, PressureMSLH: pt.PressureMSLH, WindMS: pt.WindMS, WindGustMS: pt.WindGustMS, WindDirDeg: pt.WindDirDeg, PrecipMM: pt.PrecipMM, PrecipProb: pt.PrecipProb, CloudCover: pt.CloudCover, SourcePayload: res.Raw, }) if err != nil { log.Printf("forecast upsert error ts=%s err=%v", pt.TS.Format(time.RFC3339), err) } } log.Printf("forecast stored provider=%s model=%s points=%d retrieved_at=%s", p.Name(), res.Model, len(res.Hourly), res.RetrievedAt.Format(time.RFC3339)) }