// Package mqttingest subscribes to MQTT topics and hands each received
// message to a Handler.
package mqttingest
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// MQTTConfig holds the connection and subscription settings consumed by
// RunSubscriber.
type MQTTConfig struct {
	// Broker is the broker address passed to the client's AddBroker option.
	Broker string
	// ClientID is the client identifier passed to the client's SetClientID
	// option.
	ClientID string
	// Username, when non-empty, makes RunSubscriber configure both Username
	// and Password on the client. When it is empty, Password is ignored.
	Username string
	// Password is only used together with a non-empty Username.
	Password string
	// Topic is a single topic filter to subscribe to. It is consulted only
	// when Topics is empty.
	Topic string
	// QoS is the quality-of-service level requested for Topic.
	QoS byte
	// Topics lists the subscriptions to establish. When it has at least one
	// element it takes precedence and Topic/QoS are not consulted.
	Topics []Subscription
}

// Subscription pairs a topic filter with the quality-of-service level to
// request for it.
type Subscription struct {
	// Topic is the topic filter. RunSubscriber skips entries whose Topic is
	// empty.
	Topic string
	// QoS is the quality-of-service level requested for Topic.
	QoS byte
}
|
|
|
|
// Handler processes a single received MQTT message.
//
// RunSubscriber calls it from the subscription callback with its own ctx,
// the message's topic (msg.Topic()) and the message's payload
// (msg.Payload()). Errors returned by a Handler are not returned to the
// caller of RunSubscriber.
type Handler func(ctx context.Context, topic string, payload []byte) error
|
|
|
|
func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(cfg.Broker).
|
|
SetClientID(cfg.ClientID).
|
|
SetAutoReconnect(true).
|
|
SetConnectRetry(true).
|
|
SetConnectRetryInterval(5 * time.Second)
|
|
|
|
if cfg.Username != "" {
|
|
opts.SetUsername(cfg.Username)
|
|
opts.SetPassword(cfg.Password)
|
|
}
|
|
|
|
client := mqtt.NewClient(opts)
|
|
if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
|
|
return fmt.Errorf("mqtt connect: %w", tok.Error())
|
|
}
|
|
|
|
// Subscribe
|
|
subs := map[string]byte{}
|
|
if len(cfg.Topics) > 0 {
|
|
for _, sub := range cfg.Topics {
|
|
if sub.Topic == "" {
|
|
continue
|
|
}
|
|
subs[sub.Topic] = sub.QoS
|
|
}
|
|
} else if cfg.Topic != "" {
|
|
subs[cfg.Topic] = cfg.QoS
|
|
}
|
|
if len(subs) == 0 {
|
|
return fmt.Errorf("mqtt subscribe: no topics configured")
|
|
}
|
|
|
|
if tok := client.SubscribeMultiple(subs, func(_ mqtt.Client, msg mqtt.Message) {
|
|
// Keep callback short; do work with context
|
|
_ = h(ctx, msg.Topic(), msg.Payload())
|
|
}); tok.Wait() && tok.Error() != nil {
|
|
client.Disconnect(250)
|
|
return fmt.Errorf("mqtt subscribe: %w", tok.Error())
|
|
}
|
|
|
|
// Block until ctx cancelled
|
|
<-ctx.Done()
|
|
client.Disconnect(250)
|
|
return nil
|
|
}
|