add support for barometric pressure
This commit is contained in:
15
README.md
15
README.md
@@ -10,7 +10,7 @@ Starter weather-station data pipeline:
|
|||||||
`docker compose up -d`
|
`docker compose up -d`
|
||||||
|
|
||||||
2) Configure:
|
2) Configure:
|
||||||
edit `config.yaml` (or `test.yaml`) with your MQTT broker, topic, and site coordinates.
|
edit `config.yaml` (or `test.yaml`) with your MQTT broker, topics, and site coordinates.
|
||||||
|
|
||||||
3) Run the ingest service locally:
|
3) Run the ingest service locally:
|
||||||
`go run ./cmd/ingestd -config config.yaml`
|
`go run ./cmd/ingestd -config config.yaml`
|
||||||
@@ -27,8 +27,14 @@ mqtt:
|
|||||||
client_id: "go-weatherstation-ingestd" # Client ID
|
client_id: "go-weatherstation-ingestd" # Client ID
|
||||||
username: "" # Optional username
|
username: "" # Optional username
|
||||||
password: "" # Optional password
|
password: "" # Optional password
|
||||||
topic: "ecowitt/ws90" # Topic to subscribe to
|
qos: 1 # Default MQTT QoS (0, 1, or 2)
|
||||||
qos: 1 # MQTT QoS (0, 1, or 2)
|
topics:
|
||||||
|
- name: "ws90"
|
||||||
|
topic: "ecowitt/ws90" # WS90 payload topic
|
||||||
|
type: "ws90"
|
||||||
|
- name: "baro"
|
||||||
|
topic: "sensors/barometer" # Barometric pressure topic
|
||||||
|
type: "baro"
|
||||||
|
|
||||||
db:
|
db:
|
||||||
conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
|
conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
|
||||||
@@ -59,11 +65,13 @@ wunderground:
|
|||||||
### Notes
|
### Notes
|
||||||
- The Open-Meteo ECMWF endpoint is queried by the poller only. The UI reads forecasts from TimescaleDB.
|
- The Open-Meteo ECMWF endpoint is queried by the poller only. The UI reads forecasts from TimescaleDB.
|
||||||
- Web UI supports Local/UTC toggle and date-aligned ranges (6h, 24h, 72h, 7d).
|
- Web UI supports Local/UTC toggle and date-aligned ranges (6h, 24h, 72h, 7d).
|
||||||
|
- `mqtt.topic` is still supported for single-topic configs, but `mqtt.topics` is preferred.
|
||||||
|
|
||||||
## Schema & tables
|
## Schema & tables
|
||||||
TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes:
|
TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes:
|
||||||
|
|
||||||
- `observations_ws90` (hypertable): raw WS90 observations with payload metadata, plus the full JSON payload (`payload_json`).
|
- `observations_ws90` (hypertable): raw WS90 observations with payload metadata, plus the full JSON payload (`payload_json`).
|
||||||
|
- `observations_baro` (hypertable): barometric pressure observations from other MQTT topics.
|
||||||
- `forecast_openmeteo_hourly` (hypertable): hourly forecast points keyed by `(site, model, retrieved_at, ts)`.
|
- `forecast_openmeteo_hourly` (hypertable): hourly forecast points keyed by `(site, model, retrieved_at, ts)`.
|
||||||
- Continuous aggregates:
|
- Continuous aggregates:
|
||||||
- `cagg_ws90_1m`: 1‑minute rollups (avg/min/max for temp, humidity, wind, uvi, light, rain).
|
- `cagg_ws90_1m`: 1‑minute rollups (avg/min/max for temp, humidity, wind, uvi, light, rain).
|
||||||
@@ -71,6 +79,7 @@ TimescaleDB schema is initialized from `db/init/001_schema.sql` and includes:
|
|||||||
|
|
||||||
Retention/compression:
|
Retention/compression:
|
||||||
- `observations_ws90` has a 90‑day retention policy and compression after 7 days.
|
- `observations_ws90` has a 90‑day retention policy and compression after 7 days.
|
||||||
|
- `observations_baro` has a 90‑day retention policy and compression after 7 days.
|
||||||
|
|
||||||
## Publish a test WS90 payload
|
## Publish a test WS90 payload
|
||||||
```sh
|
```sh
|
||||||
|
|||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"log"
|
"log"
|
||||||
"os"
|
"os"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
|
"strings"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -70,17 +71,44 @@ func main() {
|
|||||||
go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval)
|
go runWundergroundUploader(ctx, latest, cfg.Wunderground.StationID, cfg.Wunderground.StationKey, cfg.Wunderground.Interval)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
bindings := make([]mqttTopicBinding, 0, len(cfg.MQTT.Topics))
|
||||||
|
subscriptions := make([]mqttingest.Subscription, 0, len(cfg.MQTT.Topics))
|
||||||
|
for _, t := range cfg.MQTT.Topics {
|
||||||
|
qos := cfg.MQTT.QoS
|
||||||
|
if t.QoS != nil {
|
||||||
|
qos = *t.QoS
|
||||||
|
}
|
||||||
|
topicType := strings.ToLower(t.Type)
|
||||||
|
bindings = append(bindings, mqttTopicBinding{
|
||||||
|
Name: t.Name,
|
||||||
|
Topic: t.Topic,
|
||||||
|
Type: topicType,
|
||||||
|
QoS: qos,
|
||||||
|
})
|
||||||
|
subscriptions = append(subscriptions, mqttingest.Subscription{
|
||||||
|
Topic: t.Topic,
|
||||||
|
QoS: qos,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
// MQTT subscriber (blocks until ctx done)
|
// MQTT subscriber (blocks until ctx done)
|
||||||
err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{
|
err = mqttingest.RunSubscriber(ctx, mqttingest.MQTTConfig{
|
||||||
Broker: cfg.MQTT.Broker,
|
Broker: cfg.MQTT.Broker,
|
||||||
ClientID: cfg.MQTT.ClientID,
|
ClientID: cfg.MQTT.ClientID,
|
||||||
Username: cfg.MQTT.Username,
|
Username: cfg.MQTT.Username,
|
||||||
Password: cfg.MQTT.Password,
|
Password: cfg.MQTT.Password,
|
||||||
Topic: cfg.MQTT.Topic,
|
Topics: subscriptions,
|
||||||
QoS: cfg.MQTT.QoS,
|
|
||||||
}, func(ctx context.Context, topic string, payload []byte) error {
|
}, func(ctx context.Context, topic string, payload []byte) error {
|
||||||
log.Printf("mqtt message topic=%s bytes=%d payload=%s", topic, len(payload), string(payload))
|
log.Printf("mqtt message topic=%s bytes=%d payload=%s", topic, len(payload), string(payload))
|
||||||
|
|
||||||
|
binding := matchMQTTBinding(bindings, topic)
|
||||||
|
if binding == nil {
|
||||||
|
log.Printf("mqtt message ignored topic=%s reason=unmatched", topic)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
switch binding.Type {
|
||||||
|
case "ws90":
|
||||||
p, raw, err := mqttingest.ParseWS90(payload)
|
p, raw, err := mqttingest.ParseWS90(payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload))
|
log.Printf("ws90 parse error topic=%s err=%v payload=%s", topic, err, string(payload))
|
||||||
@@ -124,6 +152,33 @@ func main() {
|
|||||||
}); err != nil {
|
}); err != nil {
|
||||||
log.Printf("db insert ws90 error: %v", err)
|
log.Printf("db insert ws90 error: %v", err)
|
||||||
}
|
}
|
||||||
|
case "baro", "barometer", "pressure":
|
||||||
|
p, raw, err := mqttingest.ParseBarometer(payload)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("barometer parse error topic=%s err=%v payload=%s", topic, err, string(payload))
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
ts := time.Now().UTC()
|
||||||
|
source := binding.Name
|
||||||
|
if source == "" {
|
||||||
|
source = binding.Topic
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := d.InsertBarometer(ctx, db.InsertBarometerParams{
|
||||||
|
TS: ts,
|
||||||
|
Site: cfg.Site.Name,
|
||||||
|
Source: source,
|
||||||
|
PressureHPA: p.PressureHPA,
|
||||||
|
Payload: raw,
|
||||||
|
}); err != nil {
|
||||||
|
log.Printf("db insert barometer error: %v", err)
|
||||||
|
} else {
|
||||||
|
log.Printf("barometer stored source=%s pressure_hpa=%.2f", source, p.PressureHPA)
|
||||||
|
}
|
||||||
|
default:
|
||||||
|
log.Printf("mqtt message ignored topic=%s reason=unknown_type type=%s", topic, binding.Type)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
})
|
})
|
||||||
@@ -132,6 +187,22 @@ func main() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type mqttTopicBinding struct {
|
||||||
|
Name string
|
||||||
|
Topic string
|
||||||
|
Type string
|
||||||
|
QoS byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func matchMQTTBinding(bindings []mqttTopicBinding, topic string) *mqttTopicBinding {
|
||||||
|
for i := range bindings {
|
||||||
|
if mqttingest.TopicMatches(bindings[i].Topic, topic) {
|
||||||
|
return &bindings[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) {
|
func runOpenMeteoPoller(ctx context.Context, d *db.DB, cache *ForecastCache, site providers.Site, model string, interval time.Duration) {
|
||||||
p := &providers.OpenMeteo{}
|
p := &providers.OpenMeteo{}
|
||||||
t := time.NewTicker(interval)
|
t := time.NewTicker(interval)
|
||||||
|
|||||||
@@ -1,8 +1,14 @@
|
|||||||
mqtt:
|
mqtt:
|
||||||
broker: "tcp://mosquitto:1883"
|
broker: "tcp://mosquitto:1883"
|
||||||
client_id: "go-weatherstation-ingestd"
|
client_id: "go-weatherstation-ingestd"
|
||||||
topic: "ecowitt/ws90"
|
|
||||||
qos: 1
|
qos: 1
|
||||||
|
topics:
|
||||||
|
- name: "ws90"
|
||||||
|
topic: "ecowitt/ws90"
|
||||||
|
type: "ws90"
|
||||||
|
# - name: "baro"
|
||||||
|
# topic: "sensors/barometer"
|
||||||
|
# type: "baro"
|
||||||
|
|
||||||
db:
|
db:
|
||||||
conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
|
conn_string: "postgres://postgres:postgres@timescaledb:5432/micrometeo?sslmode=disable"
|
||||||
|
|||||||
@@ -41,6 +41,21 @@ SELECT create_hypertable('observations_ws90', 'ts', if_not_exists => TRUE);
|
|||||||
CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC);
|
CREATE INDEX IF NOT EXISTS idx_observations_ws90_site_ts ON observations_ws90(site, ts DESC);
|
||||||
CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC);
|
CREATE INDEX IF NOT EXISTS idx_observations_ws90_station_ts ON observations_ws90(station_id, ts DESC);
|
||||||
|
|
||||||
|
-- Barometric pressure observations (from other MQTT sources)
|
||||||
|
CREATE TABLE IF NOT EXISTS observations_baro (
|
||||||
|
ts TIMESTAMPTZ NOT NULL,
|
||||||
|
received_at TIMESTAMPTZ NOT NULL DEFAULT now(),
|
||||||
|
site TEXT NOT NULL,
|
||||||
|
source TEXT NOT NULL,
|
||||||
|
pressure_hpa DOUBLE PRECISION,
|
||||||
|
payload_json JSONB
|
||||||
|
);
|
||||||
|
|
||||||
|
SELECT create_hypertable('observations_baro', 'ts', if_not_exists => TRUE);
|
||||||
|
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_observations_baro_site_ts ON observations_baro(site, ts DESC);
|
||||||
|
CREATE INDEX IF NOT EXISTS idx_observations_baro_source_ts ON observations_baro(source, ts DESC);
|
||||||
|
|
||||||
-- Open-Meteo hourly baseline forecasts
|
-- Open-Meteo hourly baseline forecasts
|
||||||
CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly (
|
CREATE TABLE IF NOT EXISTS forecast_openmeteo_hourly (
|
||||||
ts TIMESTAMPTZ NOT NULL,
|
ts TIMESTAMPTZ NOT NULL,
|
||||||
@@ -100,6 +115,37 @@ BEGIN
|
|||||||
END IF;
|
END IF;
|
||||||
END$$;
|
END$$;
|
||||||
|
|
||||||
|
-- Raw retention: 90 days (barometric pressure)
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (
|
||||||
|
SELECT 1
|
||||||
|
FROM timescaledb_information.jobs j
|
||||||
|
WHERE j.proc_name = 'policy_retention'
|
||||||
|
AND j.hypertable_name = 'observations_baro'
|
||||||
|
) THEN
|
||||||
|
PERFORM add_retention_policy('observations_baro', INTERVAL '90 days');
|
||||||
|
END IF;
|
||||||
|
END$$;
|
||||||
|
|
||||||
|
-- Compression after 7 days
|
||||||
|
ALTER TABLE observations_baro SET (
|
||||||
|
timescaledb.compress,
|
||||||
|
timescaledb.compress_segmentby = 'site,source'
|
||||||
|
);
|
||||||
|
|
||||||
|
DO $$
|
||||||
|
BEGIN
|
||||||
|
IF NOT EXISTS (
|
||||||
|
SELECT 1
|
||||||
|
FROM timescaledb_information.jobs
|
||||||
|
WHERE proc_name = 'policy_compression'
|
||||||
|
AND hypertable_name = 'observations_baro'
|
||||||
|
) THEN
|
||||||
|
PERFORM add_compression_policy('observations_baro', INTERVAL '7 days');
|
||||||
|
END IF;
|
||||||
|
END$$;
|
||||||
|
|
||||||
-- 1-minute continuous aggregate
|
-- 1-minute continuous aggregate
|
||||||
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m
|
CREATE MATERIALIZED VIEW IF NOT EXISTS cagg_ws90_1m
|
||||||
WITH (timescaledb.continuous) AS
|
WITH (timescaledb.continuous) AS
|
||||||
|
|||||||
@@ -3,6 +3,7 @@ package config
|
|||||||
import (
|
import (
|
||||||
"errors"
|
"errors"
|
||||||
"os"
|
"os"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"gopkg.in/yaml.v3"
|
"gopkg.in/yaml.v3"
|
||||||
@@ -18,6 +19,7 @@ type Config struct {
|
|||||||
Password string `yaml:"password"`
|
Password string `yaml:"password"`
|
||||||
Topic string `yaml:"topic"`
|
Topic string `yaml:"topic"`
|
||||||
QoS byte `yaml:"qos"`
|
QoS byte `yaml:"qos"`
|
||||||
|
Topics []MQTTTopic `yaml:"topics"`
|
||||||
} `yaml:"mqtt"`
|
} `yaml:"mqtt"`
|
||||||
|
|
||||||
DB struct {
|
DB struct {
|
||||||
@@ -52,6 +54,13 @@ type Config struct {
|
|||||||
} `yaml:"wunderground"`
|
} `yaml:"wunderground"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type MQTTTopic struct {
|
||||||
|
Name string `yaml:"name"`
|
||||||
|
Topic string `yaml:"topic"`
|
||||||
|
Type string `yaml:"type"`
|
||||||
|
QoS *byte `yaml:"qos"`
|
||||||
|
}
|
||||||
|
|
||||||
func Load(path string) (*Config, error) {
|
func Load(path string) (*Config, error) {
|
||||||
b, err := os.ReadFile(path)
|
b, err := os.ReadFile(path)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -64,8 +73,31 @@ func Load(path string) (*Config, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Minimal validation
|
// Minimal validation
|
||||||
if c.MQTT.Broker == "" || c.MQTT.Topic == "" {
|
if c.MQTT.Broker == "" {
|
||||||
return nil, errors.New("mqtt broker and topic are required")
|
return nil, errors.New("mqtt broker is required")
|
||||||
|
}
|
||||||
|
if len(c.MQTT.Topics) == 0 && c.MQTT.Topic != "" {
|
||||||
|
qos := c.MQTT.QoS
|
||||||
|
c.MQTT.Topics = []MQTTTopic{{
|
||||||
|
Name: "ws90",
|
||||||
|
Topic: c.MQTT.Topic,
|
||||||
|
Type: "ws90",
|
||||||
|
QoS: &qos,
|
||||||
|
}}
|
||||||
|
}
|
||||||
|
if len(c.MQTT.Topics) == 0 {
|
||||||
|
return nil, errors.New("mqtt topic(s) are required")
|
||||||
|
}
|
||||||
|
for i := range c.MQTT.Topics {
|
||||||
|
t := c.MQTT.Topics[i]
|
||||||
|
if t.Topic == "" {
|
||||||
|
return nil, errors.New("mqtt topics must include topic")
|
||||||
|
}
|
||||||
|
if t.Type == "" {
|
||||||
|
t.Type = "ws90"
|
||||||
|
}
|
||||||
|
t.Type = strings.ToLower(t.Type)
|
||||||
|
c.MQTT.Topics[i] = t
|
||||||
}
|
}
|
||||||
if c.DB.ConnString == "" {
|
if c.DB.ConnString == "" {
|
||||||
return nil, errors.New("db conn_string is required")
|
return nil, errors.New("db conn_string is required")
|
||||||
|
|||||||
@@ -122,3 +122,27 @@ func (d *DB) UpsertOpenMeteoHourly(ctx context.Context, p InsertOpenMeteoHourlyP
|
|||||||
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type InsertBarometerParams struct {
|
||||||
|
TS time.Time
|
||||||
|
Site string
|
||||||
|
Source string
|
||||||
|
PressureHPA float64
|
||||||
|
Payload map[string]any
|
||||||
|
}
|
||||||
|
|
||||||
|
func (d *DB) InsertBarometer(ctx context.Context, p InsertBarometerParams) error {
|
||||||
|
b, _ := json.Marshal(p.Payload)
|
||||||
|
|
||||||
|
payloadJSON := json.RawMessage(b)
|
||||||
|
|
||||||
|
_, err := d.Pool.Exec(ctx, `
|
||||||
|
INSERT INTO observations_baro (
|
||||||
|
ts, site, source, pressure_hpa, payload_json
|
||||||
|
) VALUES (
|
||||||
|
$1,$2,$3,$4,$5
|
||||||
|
)
|
||||||
|
`, p.TS, p.Site, p.Source, p.PressureHPA, payloadJSON)
|
||||||
|
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|||||||
105
internal/mqttingest/barometer.go
Normal file
105
internal/mqttingest/barometer.go
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
package mqttingest
|
||||||
|
|
||||||
|
import (
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"strconv"
|
||||||
|
"strings"
|
||||||
|
)
|
||||||
|
|
||||||
|
type BarometerPayload struct {
|
||||||
|
PressureHPA float64
|
||||||
|
}
|
||||||
|
|
||||||
|
func ParseBarometer(b []byte) (*BarometerPayload, map[string]any, error) {
|
||||||
|
var raw map[string]any
|
||||||
|
if err := json.Unmarshal(b, &raw); err != nil {
|
||||||
|
return nil, nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
pressure, ok := pressureHPAFromPayload(raw)
|
||||||
|
if !ok {
|
||||||
|
return nil, raw, fmt.Errorf("barometer payload missing pressure field")
|
||||||
|
}
|
||||||
|
|
||||||
|
return &BarometerPayload{
|
||||||
|
PressureHPA: pressure,
|
||||||
|
}, raw, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func pressureHPAFromPayload(raw map[string]any) (float64, bool) {
|
||||||
|
if v, ok := findFloat(raw,
|
||||||
|
"pressure_hpa",
|
||||||
|
"pressure_mb",
|
||||||
|
"pressure_mbar",
|
||||||
|
"barometer_hpa",
|
||||||
|
"baro_hpa",
|
||||||
|
"pressure",
|
||||||
|
); ok {
|
||||||
|
return v, true
|
||||||
|
}
|
||||||
|
if v, ok := findFloat(raw, "pressure_pa"); ok {
|
||||||
|
return v / 100.0, true
|
||||||
|
}
|
||||||
|
if v, ok := findFloat(raw, "pressure_kpa"); ok {
|
||||||
|
return v * 10.0, true
|
||||||
|
}
|
||||||
|
if v, ok := findFloat(raw, "pressure_inhg", "barometer_inhg"); ok {
|
||||||
|
return v * 33.8638866667, true
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func findFloat(raw map[string]any, keys ...string) (float64, bool) {
|
||||||
|
for _, key := range keys {
|
||||||
|
v, ok := raw[key]
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if f, ok := asFloat(v); ok {
|
||||||
|
return f, true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
|
||||||
|
func asFloat(v any) (float64, bool) {
|
||||||
|
switch t := v.(type) {
|
||||||
|
case float64:
|
||||||
|
return t, true
|
||||||
|
case float32:
|
||||||
|
return float64(t), true
|
||||||
|
case int:
|
||||||
|
return float64(t), true
|
||||||
|
case int8:
|
||||||
|
return float64(t), true
|
||||||
|
case int16:
|
||||||
|
return float64(t), true
|
||||||
|
case int32:
|
||||||
|
return float64(t), true
|
||||||
|
case int64:
|
||||||
|
return float64(t), true
|
||||||
|
case uint:
|
||||||
|
return float64(t), true
|
||||||
|
case uint8:
|
||||||
|
return float64(t), true
|
||||||
|
case uint16:
|
||||||
|
return float64(t), true
|
||||||
|
case uint32:
|
||||||
|
return float64(t), true
|
||||||
|
case uint64:
|
||||||
|
return float64(t), true
|
||||||
|
case json.Number:
|
||||||
|
f, err := t.Float64()
|
||||||
|
return f, err == nil
|
||||||
|
case string:
|
||||||
|
s := strings.TrimSpace(t)
|
||||||
|
if s == "" {
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
f, err := strconv.ParseFloat(s, 64)
|
||||||
|
return f, err == nil
|
||||||
|
default:
|
||||||
|
return 0, false
|
||||||
|
}
|
||||||
|
}
|
||||||
@@ -15,6 +15,12 @@ type MQTTConfig struct {
|
|||||||
Password string
|
Password string
|
||||||
Topic string
|
Topic string
|
||||||
QoS byte
|
QoS byte
|
||||||
|
Topics []Subscription
|
||||||
|
}
|
||||||
|
|
||||||
|
type Subscription struct {
|
||||||
|
Topic string
|
||||||
|
QoS byte
|
||||||
}
|
}
|
||||||
|
|
||||||
type Handler func(ctx context.Context, topic string, payload []byte) error
|
type Handler func(ctx context.Context, topic string, payload []byte) error
|
||||||
@@ -38,7 +44,22 @@ func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Subscribe
|
// Subscribe
|
||||||
if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) {
|
subs := map[string]byte{}
|
||||||
|
if len(cfg.Topics) > 0 {
|
||||||
|
for _, sub := range cfg.Topics {
|
||||||
|
if sub.Topic == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
subs[sub.Topic] = sub.QoS
|
||||||
|
}
|
||||||
|
} else if cfg.Topic != "" {
|
||||||
|
subs[cfg.Topic] = cfg.QoS
|
||||||
|
}
|
||||||
|
if len(subs) == 0 {
|
||||||
|
return fmt.Errorf("mqtt subscribe: no topics configured")
|
||||||
|
}
|
||||||
|
|
||||||
|
if tok := client.SubscribeMultiple(subs, func(_ mqtt.Client, msg mqtt.Message) {
|
||||||
// Keep callback short; do work with context
|
// Keep callback short; do work with context
|
||||||
_ = h(ctx, msg.Topic(), msg.Payload())
|
_ = h(ctx, msg.Topic(), msg.Payload())
|
||||||
}); tok.Wait() && tok.Error() != nil {
|
}); tok.Wait() && tok.Error() != nil {
|
||||||
|
|||||||
36
internal/mqttingest/topic_match.go
Normal file
36
internal/mqttingest/topic_match.go
Normal file
@@ -0,0 +1,36 @@
|
|||||||
|
package mqttingest
|
||||||
|
|
||||||
|
import "strings"
|
||||||
|
|
||||||
|
// TopicMatches reports whether a topic filter (with + or # wildcards) matches a topic name.
|
||||||
|
// It follows the MQTT v3.1.1 wildcard rules and supports shared subscriptions ($share).
|
||||||
|
func TopicMatches(filter, topic string) bool {
|
||||||
|
return matchTopic(routeSplit(filter), strings.Split(topic, "/"))
|
||||||
|
}
|
||||||
|
|
||||||
|
func matchTopic(route []string, topic []string) bool {
|
||||||
|
if len(route) == 0 {
|
||||||
|
return len(topic) == 0
|
||||||
|
}
|
||||||
|
if len(topic) == 0 {
|
||||||
|
return route[0] == "#"
|
||||||
|
}
|
||||||
|
if route[0] == "#" {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
if route[0] == "+" || route[0] == topic[0] {
|
||||||
|
return matchTopic(route[1:], topic[1:])
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// routeSplit removes $share/group/ when matching shared subscription filters.
|
||||||
|
func routeSplit(route string) []string {
|
||||||
|
if strings.HasPrefix(route, "$share/") {
|
||||||
|
parts := strings.Split(route, "/")
|
||||||
|
if len(parts) > 2 {
|
||||||
|
return parts[2:]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return strings.Split(route, "/")
|
||||||
|
}
|
||||||
Reference in New Issue
Block a user