Files
go-weatherstation/internal/mqttingest/mqtt.go
2026-01-26 12:40:47 +11:00

54 lines
1.2 KiB
Go

package mqttingest
import (
"context"
"fmt"
"time"
mqtt "github.com/eclipse/paho.mqtt.golang"
)
type MQTTConfig struct {
Broker string
ClientID string
Username string
Password string
Topic string
QoS byte
}
type Handler func(ctx context.Context, topic string, payload []byte) error
func RunSubscriber(ctx context.Context, cfg MQTTConfig, h Handler) error {
opts := mqtt.NewClientOptions().
AddBroker(cfg.Broker).
SetClientID(cfg.ClientID).
SetAutoReconnect(true).
SetConnectRetry(true).
SetConnectRetryInterval(5 * time.Second)
if cfg.Username != "" {
opts.SetUsername(cfg.Username)
opts.SetPassword(cfg.Password)
}
client := mqtt.NewClient(opts)
if tok := client.Connect(); tok.Wait() && tok.Error() != nil {
return fmt.Errorf("mqtt connect: %w", tok.Error())
}
// Subscribe
if tok := client.Subscribe(cfg.Topic, cfg.QoS, func(_ mqtt.Client, msg mqtt.Message) {
// Keep callback short; do work with context
_ = h(ctx, msg.Topic(), msg.Payload())
}); tok.Wait() && tok.Error() != nil {
client.Disconnect(250)
return fmt.Errorf("mqtt subscribe: %w", tok.Error())
}
// Block until ctx cancelled
<-ctx.Done()
client.Disconnect(250)
return nil
}