54 lines
1.2 KiB
Go
54 lines
1.2 KiB
Go
package mqttingest
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"time"
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
)
|
|
|
|
// MQTTConfig holds the connection and subscription settings consumed by
// RunSubscriber.
type MQTTConfig struct {
	// Broker is the broker address handed to the client options via AddBroker.
	Broker string
	// ClientID is the identifier the client presents to the broker.
	ClientID string
	// Username enables credential-based login when non-empty; when it is
	// empty neither Username nor Password is applied.
	Username string
	// Password is sent together with Username and is ignored when Username
	// is empty.
	Password string
	// Topic is the topic (or topic filter) to subscribe to.
	Topic string
	// QoS is the quality-of-service level requested for the subscription.
	QoS byte
}
|
|
|
|
// Handler processes a single message received by RunSubscriber. It is called
// with the context given to RunSubscriber, the topic the message arrived on,
// and the raw message payload. RunSubscriber discards the returned error.
type Handler func(ctx context.Context, topic string, payload []byte) error
|
|
|
|
func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
|
|
opts := mqtt.NewClientOptions().
|
|
AddBroker(cfg.Broker).
|
|
SetClientID(cfg.ClientID).
|
|
SetAutoReconnect(true).
|
|
SetConnectRetry(true).
|
|
SetConnectRetryInterval(5 * time.Second)
|
|
|
|
if cfg.Username != "" {
|
|
opts.SetUsername(cfg.Username)
|
|
opts.SetPassword(cfg.Password)
|
|
}
|
|
|
|
client := mqtt.NewClient(opts)
|
|
if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
|
|
return fmt.Errorf("mqtt connect: %w", tok.Error())
|
|
}
|
|
|
|
// Subscribe
|
|
if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) {
|
|
// Keep callback short; do work with context
|
|
_ = h(ctx, msg.Topic(), msg.Payload())
|
|
}); tok.Wait() && tok.Error() != nil {
|
|
client.Disconnect(250)
|
|
return fmt.Errorf("mqtt subscribe: %w", tok.Error())
|
|
}
|
|
|
|
// Block until ctx cancelled
|
|
<-ctx.Done()
|
|
client.Disconnect(250)
|
|
return nil
|
|
}
|