From 5d07c5d54b5856f4b27cdd40bd79eb58b4b91d18 Mon Sep 17 00:00:00 2001 From: Nathan Coad Date: Thu, 29 Jan 2026 14:04:18 +1100 Subject: [PATCH] add support for barometric pressure --- README.md | 15 ++- cmd/ingestd/main.go | 147 +++++++++++++++++++++-------- config.yaml | 8 +- db/init/001_schema.sql | 46 +++++++++ internal/config/config.go | 48 ++++++++-- internal/db/queries.go | 24 +++++ internal/mqttingest/barometer.go | 105 +++++++++++++++++++++ internal/mqttingest/mqtt.go | 23 ++++- internal/mqttingest/topic_match.go | 36 +++++++ 9 files changed, 401 insertions(+), 51 deletions(-) create mode 100644 internal/mqttingest/barometer.go create mode 100644 internal/mqttingest/topic_match.go diff --git a/README.md b/README.md index 0bfe40d..e2dc3c6 100644 --- a/README.md +++ b/README.md @@ -10,7 +10,7 @@ Starter weather-station data pipeline: `docker compose up -d` 2) Configure: - edit `config.yaml` (or `test.yaml`) with your MQTT broker, topic, and site coordinates. + edit `config.yaml` (or `test.yaml`) with your MQTT broker, topics, and site coordinates. 3) Run the ingest service locally: `go run ./cmd/ingestd -config config.yaml` @@ -27,8 +27,14 @@ mqtt: client_id: "go-weatherstation-ingestd" # Client ID username: "" # Optional username password: "" # Optional password - topic: "ecowitt/ws90" # Topic to subscribe to - qos: 1 # MQTT QoS (0, 1, or 2) + qos: 1 # Default MQTT QoS (0, 1, or 2) + topics: + - name: "ws90" + topic: "ecowitt/ws90" # WS90 payload topic + type: "ws90" + - name: "baro" + topic: "sensors/barometer" # Barometric pressure topic + type: "baro" db: conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable" @@ -59,11 +65,13 @@ wunderground: ### Notes - The Open-Meteo ECMWF endpoint is queried by the poller only. The UI reads forecasts from TimescaleDB. - Web UI supports Local/UTC toggle and date-aligned ranges (6h, 24h, 72h, 7d). +- `mqtt.topic` is still supported for single-topic configs, but `mqtt.topics` is preferred. ## Schema & tables TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes: - `observations_ws90` (hypertable): raw WS90 observations with payload metadata, plus the full JSON payload (`payload_json`). +- `observations_baro` (hypertable): barometric pressure observations from other MQTT topics. - `forecast_openmeteo_hourly` (hypertable): hourly forecast points keyed by `(site, model, retrieved_at, ts)`. - Continuous aggregates: - `cagg_ws90_1m`: 1‑minute rollups (avg/min/max for temp, humidity, wind, uvi, light, rain). @@ -71,6 +79,7 @@ TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes: Retention/compression: - `observations_ws90` has a 90‑day retention policy and compression after 7 days. +- `observations_baro` has a 90‑day retention policy and compression after 7 days. ## Publish a test WS90 payload ```sh diff --git a/cmd/ingestd/main.go b/cmd/ingestd/main.go index 5e2308e..535dcef 100644 --- a/cmd/ingestd/main.go +++ b/cmd/ingestd/main.go @@ -6,6 +6,7 @@ import ( "log" "os" "os/signal" + "strings" "syscall" "time" @@ -70,59 +71,113 @@ func main() { go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval) } + bindings := make([]mqttTopicBinding, 0, len(cfg.MQTT.Topics)) + subscriptions := make([]mqttingest.Subscription, 0, len(cfg.MQTT.Topics)) + for _, t := range cfg.MQTT.Topics { + qos := cfg.MQTT.QoS + if t.QoS != nil { + qos = *t.QoS + } + topicType := strings.ToLower(t.Type) + bindings = append(bindings, mqttTopicBinding{ + Name: t.Name, + Topic: t.Topic, + Type: topicType, + QoS: qos, + }) + subscriptions = append(subscriptions, mqttingest.Subscription{ + Topic: t.Topic, + QoS: qos, + }) + } + // MQTT subscriber (blocks until ctx done) err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{ Broker: cfg.MQTT.Broker, ClientID: cfg.MQTT.ClientID, Username: cfg.MQTT.Username, Password: cfg.MQTT.Password, - Topic: cfg.MQTT.Topic, - QoS: cfg.MQTT.QoS, + Topics: subscriptions, }, func(ctx context.Context, topic string, payload []byte) error { log.Printf("mqtt message topic=%s bytes=%d payload=%s", topic, len(payload), string(payload)) - p, raw, err := mqttingest.ParseWS90(payload) - if err != nil { - log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload)) + binding := matchMQTTBinding(bindings, topic) + if binding == nil { + log.Printf("mqtt message ignored topic=%s reason=unmatched", topic) return nil } - // Use receive time as observation ts (WS90 payload doesn't include a timestamp). - ts := time.Now().UTC() + switch binding.Type { + case "ws90": + p, raw, err := mqttingest.ParseWS90(payload) + if err != nil { + log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload)) + return nil + } - latest.Update(ts, p) + // Use receive time as observation ts (WS90 payload doesn't include a timestamp). + ts := time.Now().UTC() - if snap, ok := latest.Snapshot(); ok { - logForecastDeviation(forecastCache, snap) - } + latest.Update(ts, p) - if err := d.InsertWS90(ctx, db.InsertWS90Params{ - TS: ts, - Site: cfg.Site.Name, - StationID: p.ID, - Model: p.Model, - BatteryOK: p.BatteryOK, - BatteryMV: p.BatteryMV, - TempC: p.TemperatureC, - Humidity: p.Humidity, - WindDirDeg: p.WindDirDeg, - WindAvgMS: p.WindAvgMS, - WindMaxMS: p.WindMaxMS, - UVI: p.UVI, - LightLux: p.LightLux, - Flags: p.Flags, - RainMM: p.RainMM, - RainStart: p.RainStart, - SupercapV: p.SupercapV, - Firmware: p.Firmware, - RawData: p.Data, - MIC: p.MIC, - Protocol: p.Protocol, - RSSI: p.RSSI, - Duration: p.Duration, - Payload: raw, - }); err != nil { - log.Printf("db insert ws90 error: %v", err) + if snap, ok := latest.Snapshot(); ok { + logForecastDeviation(forecastCache, snap) + } + + if err := d.InsertWS90(ctx, db.InsertWS90Params{ + TS: ts, + Site: cfg.Site.Name, + StationID: p.ID, + Model: p.Model, + BatteryOK: p.BatteryOK, + BatteryMV: p.BatteryMV, + TempC: p.TemperatureC, + Humidity: p.Humidity, + WindDirDeg: p.WindDirDeg, + WindAvgMS: p.WindAvgMS, + WindMaxMS: p.WindMaxMS, + UVI: p.UVI, + LightLux: p.LightLux, + Flags: p.Flags, + RainMM: p.RainMM, + RainStart: p.RainStart, + SupercapV: p.SupercapV, + Firmware: p.Firmware, + RawData: p.Data, + MIC: p.MIC, + Protocol: p.Protocol, + RSSI: p.RSSI, + Duration: p.Duration, + Payload: raw, + }); err != nil { + log.Printf("db insert ws90 error: %v", err) + } + case "baro", "barometer", "pressure": + p, raw, err := mqttingest.ParseBarometer(payload) + if err != nil { + log.Printf("barometer parse error topic=%s err=%v payload=%s", topic, err, string(payload)) + return nil + } + + ts := time.Now().UTC() + source := binding.Name + if source == "" { + source = binding.Topic + } + + if err := d.InsertBarometer(ctx, db.InsertBarometerParams{ + TS: ts, + Site: cfg.Site.Name, + Source: source, + PressureHPA: p.PressureHPA, + Payload: raw, + }); err != nil { + log.Printf("db insert barometer error: %v", err) + } else { + log.Printf("barometer stored source=%s pressure_hpa=%.2f", source, p.PressureHPA) + } + default: + log.Printf("mqtt message ignored topic=%s reason=unknown_type type=%s", topic, binding.Type) } return nil @@ -132,6 +187,22 @@ func main() { } } +type mqttTopicBinding struct { + Name string + Topic string + Type string + QoS byte +} + +func matchMQTTBinding(bindings []mqttTopicBinding, topic string) *mqttTopicBinding { + for i := range bindings { + if mqttingest.TopicMatches(bindings[i].Topic, topic) { + return &bindings[i] + } + } + return nil +} + func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) { p := &providers.OpenMeteo{} t := time.NewTicker(interval) diff --git a/config.yaml b/config.yaml index 20e9fb8..6d14c81 100644 --- a/config.yaml +++ b/config.yaml @@ -1,8 +1,14 @@ mqtt: broker: "tcp://mosquitto:1883" client_id: "go-weatherstation-ingestd" - topic: "ecowitt/ws90" qos: 1 + topics: + - name: "ws90" + topic: "ecowitt/ws90" + type: "ws90" + # - name: "baro" + # topic: "sensors/barometer" + # type: "baro" db: conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable" diff --git a/db/init/001_schema.sql b/db/init/001_schema.sql index 3e8a29b..6bdd4c9 100644 --- a/db/init/001_schema.sql +++ b/db/init/001_schema.sql @@ -41,6 +41,21 @@ SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE); CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC); CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC); +-- Barometric pressure observations (from other MQTT sources) +CREATE TABLE IF NOT EXISTS observations_baro ( + ts TIMESTAMPTZ NOT NULL, + received_at TIMESTAMPTZ NOT NULL DEFAULT now(), + site TEXT NOT NULL, + source TEXT NOT NULL, + pressure_hpa DOUBLE PRECISION, + payload_json JSONB +); + +SELECT create_hypertable('observations_baro', 'ts', if_not_exists => TRUE); + +CREATE INDEX IF NOT EXISTS idx_observations_baro_site_ts ON observations_baro(site, ts DESC); +CREATE INDEX IF NOT EXISTS idx_observations_baro_source_ts ON observations_baro(source, ts DESC); + -- Open-Meteo hourly baseline forecasts CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly ( ts TIMESTAMPTZ NOT NULL, @@ -100,6 +115,37 @@ BEGIN END IF; END$$; +-- Raw retention: 90 days (barometric pressure) +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs j + WHERE j.proc_name = 'policy_retention' + AND j.hypertable_name = 'observations_baro' + ) THEN + PERFORM add_retention_policy('observations_baro', INTERVAL '90 days'); + END IF; +END$$; + +-- Compression after 7 days +ALTER TABLE observations_baro SET ( + timescaledb.compress, + timescaledb.compress_segmentby = 'site,source' +); + +DO $$ +BEGIN + IF NOT EXISTS ( + SELECT 1 + FROM timescaledb_information.jobs + WHERE proc_name = 'policy_compression' + AND hypertable_name = 'observations_baro' + ) THEN + PERFORM add_compression_policy('observations_baro', INTERVAL '7 days'); + END IF; +END$$; + -- 1-minute continuous aggregate CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m WITH (timescaledb.continuous) AS diff --git a/internal/config/config.go b/internal/config/config.go index 8054c10..7c686e2 100644 --- a/internal/config/config.go +++ b/internal/config/config.go @@ -3,6 +3,7 @@ package config import ( "errors" "os" + "strings" "time" "gopkg.in/yaml.v3" @@ -12,12 +13,13 @@ type Config struct { LogLevel string `yaml:"log_level"` MQTT struct { - Broker string `yaml:"broker"` - ClientID string `yaml:"client_id"` - Username string `yaml:"username"` - Password string `yaml:"password"` - Topic string `yaml:"topic"` - QoS byte `yaml:"qos"` + Broker string `yaml:"broker"` + ClientID string `yaml:"client_id"` + Username string `yaml:"username"` + Password string `yaml:"password"` + Topic string `yaml:"topic"` + QoS byte `yaml:"qos"` + Topics []MQTTTopic `yaml:"topics"` } `yaml:"mqtt"` DB struct { @@ -52,6 +54,13 @@ type Config struct { } `yaml:"wunderground"` } +type MQTTTopic struct { + Name string `yaml:"name"` + Topic string `yaml:"topic"` + Type string `yaml:"type"` + QoS *byte `yaml:"qos"` +} + func Load(path string) (*Config, error) { b, err := os.ReadFile(path) if err != nil { @@ -64,8 +73,31 @@ func Load(path string) (*Config, error) { } // Minimal validation - if c.MQTT.Broker == "" || c.MQTT.Topic == "" { - return nil, errors.New("mqtt broker and topic are required") + if c.MQTT.Broker == "" { + return nil, errors.New("mqtt broker is required") + } + if len(c.MQTT.Topics) == 0 && c.MQTT.Topic != "" { + qos := c.MQTT.QoS + c.MQTT.Topics = []MQTTTopic{{ + Name: "ws90", + Topic: c.MQTT.Topic, + Type: "ws90", + QoS: &qos, + }} + } + if len(c.MQTT.Topics) == 0 { + return nil, errors.New("mqtt topic(s) are required") + } + for i := range c.MQTT.Topics { + t := c.MQTT.Topics[i] + if t.Topic == "" { + return nil, errors.New("mqtt topics must include topic") + } + if t.Type == "" { + t.Type = "ws90" + } + t.Type = strings.ToLower(t.Type) + c.MQTT.Topics[i] = t } if c.DB.ConnString == "" { return nil, errors.New("db conn_string is required") diff --git a/internal/db/queries.go b/internal/db/queries.go index 7a32cf5..05e0262 100644 --- a/internal/db/queries.go +++ b/internal/db/queries.go @@ -122,3 +122,27 @@ func (d *DB) UpsertOpenMeteoHourly(ctx context.Context, p InsertOpenMeteoHourlyP return err } + +type InsertBarometerParams struct { + TS time.Time + Site string + Source string + PressureHPA float64 + Payload map[string]any +} + +func (d *DB) InsertBarometer(ctx context.Context, p InsertBarometerParams) error { + b, _ := json.Marshal(p.Payload) + + payloadJSON := json.RawMessage(b) + + _, err := d.Pool.Exec(ctx, ` + INSERT INTO observations_baro ( + ts, site, source, pressure_hpa, payload_json + ) VALUES ( + $1,$2,$3,$4,$5 + ) + `, p.TS, p.Site, p.Source, p.PressureHPA, payloadJSON) + + return err +} diff --git a/internal/mqttingest/barometer.go b/internal/mqttingest/barometer.go new file mode 100644 index 0000000..71b4529 --- /dev/null +++ b/internal/mqttingest/barometer.go @@ -0,0 +1,105 @@ +package mqttingest + +import ( + "encoding/json" + "fmt" + "strconv" + "strings" +) + +type BarometerPayload struct { + PressureHPA float64 +} + +func ParseBarometer(b []byte) (*BarometerPayload, map[string]any, error) { + var raw map[string]any + if err := json.Unmarshal(b, &raw); err != nil { + return nil, nil, err + } + + pressure, ok := pressureHPAFromPayload(raw) + if !ok { + return nil, raw, fmt.Errorf("barometer payload missing pressure field") + } + + return &BarometerPayload{ + PressureHPA: pressure, + }, raw, nil +} + +func pressureHPAFromPayload(raw map[string]any) (float64, bool) { + if v, ok := findFloat(raw, + "pressure_hpa", + "pressure_mb", + "pressure_mbar", + "barometer_hpa", + "baro_hpa", + "pressure", + ); ok { + return v, true + } + if v, ok := findFloat(raw, "pressure_pa"); ok { + return v / 100.0, true + } + if v, ok := findFloat(raw, "pressure_kpa"); ok { + return v * 10.0, true + } + if v, ok := findFloat(raw, "pressure_inhg", "barometer_inhg"); ok { + return v * 33.8638866667, true + } + return 0, false +} + +func findFloat(raw map[string]any, keys ...string) (float64, bool) { + for _, key := range keys { + v, ok := raw[key] + if !ok { + continue + } + if f, ok := asFloat(v); ok { + return f, true + } + } + return 0, false +} + +func asFloat(v any) (float64, bool) { + switch t := v.(type) { + case float64: + return t, true + case float32: + return float64(t), true + case int: + return float64(t), true + case int8: + return float64(t), true + case int16: + return float64(t), true + case int32: + return float64(t), true + case int64: + return float64(t), true + case uint: + return float64(t), true + case uint8: + return float64(t), true + case uint16: + return float64(t), true + case uint32: + return float64(t), true + case uint64: + return float64(t), true + case json.Number: + f, err := t.Float64() + return f, err == nil + case string: + s := strings.TrimSpace(t) + if s == "" { + return 0, false + } + f, err := strconv.ParseFloat(s, 64) + return f, err == nil + default: + return 0, false + } +} diff --git a/internal/mqttingest/mqtt.go b/internal/mqttingest/mqtt.go index bdcd961..340999c 100644 --- a/internal/mqttingest/mqtt.go +++ b/internal/mqttingest/mqtt.go @@ -15,6 +15,12 @@ type MQTTConfig struct { Password string Topic string QoS byte + Topics []Subscription +} + +type Subscription struct { + Topic string + QoS byte } type Handler func(ctx context.Context, topic string, payload []byte) error @@ -38,7 +44,22 @@ func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error { } // Subscribe - if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) { + subs := map[string]byte{} + if len(cfg.Topics) > 0 { + for _, sub := range cfg.Topics { + if sub.Topic == "" { + continue + } + subs[sub.Topic] = sub.QoS + } + } else if cfg.Topic != "" { + subs[cfg.Topic] = cfg.QoS + } + if len(subs) == 0 { + return fmt.Errorf("mqtt subscribe: no topics configured") + } + + if tok := client.SubscribeMultiple(subs, func(_ mqtt.Client, msg mqtt.Message) { // Keep callback short; do work with context _ = h(ctx, msg.Topic(), msg.Payload()) }); tok.Wait() && tok.Error() != nil { diff --git a/internal/mqttingest/topic_match.go b/internal/mqttingest/topic_match.go new file mode 100644 index 0000000..cf6174a --- /dev/null +++ b/internal/mqttingest/topic_match.go @@ -0,0 +1,36 @@ +package mqttingest + +import "strings" + +// TopicMatches reports whether a topic filter (with + or # wildcards) matches a topic name. +// It follows the MQTT v3.1.1 wildcard rules and supports shared subscriptions ($share). +func TopicMatches(filter, topic string) bool { + return matchTopic(routeSplit(filter), strings.Split(topic, "/")) +} + +func matchTopic(route []string, topic []string) bool { + if len(route) == 0 { + return len(topic) == 0 + } + if len(topic) == 0 { + return route[0] == "#" + } + if route[0] == "#" { + return true + } + if route[0] == "+" || route[0] == topic[0] { + return matchTopic(route[1:], topic[1:]) + } + return false +} + +// routeSplit removes $share/group/ when matching shared subscription filters. +func routeSplit(route string) []string { + if strings.HasPrefix(route, "$share/") { + parts := strings.Split(route, "/") + if len(parts) > 2 { + return parts[2:] + } + } + return strings.Split(route, "/") +}