// Package mqttingest runs an MQTT subscriber that forwards every received
// message to a caller-supplied Handler.
package mqttingest

import (
	"context"
	"fmt"
	"time"

	mqtt "github.com/eclipse/paho.mqtt.golang"
)

// disconnectQuiesceMs is the quiesce period, in milliseconds, passed to the
// client's Disconnect when the subscriber shuts down.
const disconnectQuiesceMs = 250

// MQTTConfig describes how RunSubscriber connects to a broker and which topic
// filters it subscribes to.
type MQTTConfig struct {
	// Broker is the broker URI handed to the client options via AddBroker.
	Broker string
	// ClientID is the MQTT client identifier.
	ClientID string
	// Username enables credentials when non-empty; Password is only sent
	// together with a non-empty Username.
	Username string
	Password string
	// Topic and QoS describe a single subscription. They are used only when
	// Topics is empty.
	Topic string
	QoS   byte
	// Topics lists the subscriptions to establish. When it has at least one
	// element it takes precedence over Topic/QoS.
	Topics []Subscription
}

// Subscription pairs a topic filter with the QoS level requested for it.
type Subscription struct {
	Topic string
	QoS   byte
}

// Handler processes one received message. It is called from the MQTT client's
// message callback with the ctx that was passed to RunSubscriber.
type Handler func(ctx context.Context, topic string, payload []byte) error

// RunSubscriber connects to the broker described by cfg, subscribes to the
// configured topics, and passes every received message to h until ctx is
// cancelled.
//
// Subscriptions are established from the client's on-connect hook, so they are
// re-created after every automatic reconnect instead of only once.
//
// It returns nil when ctx is cancelled after a successful connect. It returns
// an error when h is nil, when no topic is configured (checked before any
// network activity), when connecting fails or ctx is cancelled while still
// connecting (the error then wraps ctx.Err()), or when a subscribe request
// fails. The client is always disconnected before RunSubscriber returns.
func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
	if h == nil {
		return fmt.Errorf("mqtt subscribe: nil handler")
	}
	// Validate before connecting so a bad config never opens a connection.
	subs, err := topicFilters(cfg)
	if err != nil {
		return err
	}

	onMessage := func(_ mqtt.Client, msg mqtt.Message) {
		// The client callback has no way to return an error, so the handler's
		// result is deliberately dropped; h must do its own error reporting.
		_ = h(ctx, msg.Topic(), msg.Payload())
	}

	// Buffered so the on-connect hook never blocks on reporting a failure.
	subErr := make(chan error, 1)

	opts := mqtt.NewClientOptions().
		AddBroker(cfg.Broker).
		SetClientID(cfg.ClientID).
		SetAutoReconnect(true).
		SetConnectRetry(true).
		SetConnectRetryInterval(5 * time.Second).
		SetOnConnectHandler(func(c mqtt.Client) {
			err := waitToken(ctx, c.SubscribeMultiple(subs, onMessage))
			if err == nil || ctx.Err() != nil {
				// Subscribed, or we are shutting down anyway.
				return
			}
			select {
			case subErr <- err:
			default: // an earlier failure is already pending
			}
		})
	if cfg.Username != "" {
		opts.SetUsername(cfg.Username)
		opts.SetPassword(cfg.Password)
	}

	client := mqtt.NewClient(opts)
	// Also runs on the error paths so that a connect attempt that is still
	// retrying is stopped and no connection outlives this call.
	defer client.Disconnect(disconnectQuiesceMs)

	if err := waitToken(ctx, client.Connect()); err != nil {
		return fmt.Errorf("mqtt connect: %w", err)
	}

	select {
	case err := <-subErr:
		return fmt.Errorf("mqtt subscribe: %w", err)
	case <-ctx.Done():
		return nil
	}
}

// topicFilters builds the topic-filter -> QoS map for cfg. Entries in
// cfg.Topics with an empty Topic are skipped, and a filter listed twice keeps
// the QoS of its last occurrence. cfg.Topic/cfg.QoS are used only when
// cfg.Topics is empty. It returns an error when no filter remains.
func topicFilters(cfg MQTTConfig) (map[string]byte, error) {
	subs := make(map[string]byte, len(cfg.Topics))
	for _, sub := range cfg.Topics {
		if sub.Topic != "" {
			subs[sub.Topic] = sub.QoS
		}
	}
	if len(cfg.Topics) == 0 && cfg.Topic != "" {
		subs[cfg.Topic] = cfg.QoS
	}
	if len(subs) == 0 {
		return nil, fmt.Errorf("mqtt subscribe: no topics configured")
	}
	return subs, nil
}

// waitToken blocks until tok completes or ctx is cancelled, whichever happens
// first. It returns tok.Error() in the former case and ctx.Err() in the latter.
func waitToken(ctx context.Context, tok mqtt.Token) error {
	select {
	case <-tok.Done():
		return tok.Error()
	case <-ctx.Done():
		return ctx.Err()
	}
}