Modernize invertergui: MQTT write support, HA integration, UI updates
Some checks failed
build / inverter_gui_pipeline (push) Has been cancelled
Some checks failed
build / inverter_gui_pipeline (push) Has been cancelled
This commit is contained in:
59
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
59
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
@@ -142,7 +142,7 @@ type client struct {
|
||||
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
|
||||
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
|
||||
|
||||
backoff *backoffController
|
||||
backoff *backoffController
|
||||
}
|
||||
|
||||
// NewClient will create an MQTT v3.1.1 client with all of the options specified
|
||||
@@ -258,12 +258,15 @@ func (c *client) Connect() Token {
|
||||
return
|
||||
}
|
||||
|
||||
var attemptCount int
|
||||
|
||||
RETRYCONN:
|
||||
var conn net.Conn
|
||||
var rc byte
|
||||
var err error
|
||||
conn, rc, t.sessionPresent, err = c.attemptConnection()
|
||||
conn, rc, t.sessionPresent, err = c.attemptConnection(false, attemptCount)
|
||||
if err != nil {
|
||||
attemptCount++
|
||||
if c.options.ConnectRetry {
|
||||
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
|
||||
time.Sleep(c.options.ConnectRetryInterval)
|
||||
@@ -306,24 +309,26 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
DEBUG.Println(CLI, "enter reconnect")
|
||||
var (
|
||||
initSleep = 1 * time.Second
|
||||
conn net.Conn
|
||||
conn net.Conn
|
||||
)
|
||||
|
||||
// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
|
||||
// Sleep time is exponentially increased as the same situation continues
|
||||
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
|
||||
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3*time.Second, true); isContinual {
|
||||
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
|
||||
}
|
||||
|
||||
var attemptCount int
|
||||
for {
|
||||
if nil != c.options.OnReconnecting {
|
||||
c.options.OnReconnecting(c, &c.options)
|
||||
}
|
||||
var err error
|
||||
conn, _, _, err = c.attemptConnection()
|
||||
conn, _, _, err = c.attemptConnection(true, attemptCount)
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
attemptCount++
|
||||
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
|
||||
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
|
||||
|
||||
@@ -351,7 +356,7 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
// byte - Return code (packets.Accepted indicates a successful connection).
|
||||
// bool - SessionPresent flag from the connect ack (only valid if packets.Accepted)
|
||||
// err - Error (err != nil guarantees that conn has been set to active connection).
|
||||
func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
func (c *client) attemptConnection(isReconnect bool, attempt int) (net.Conn, byte, bool, error) {
|
||||
protocolVersion := c.options.ProtocolVersion
|
||||
var (
|
||||
sessionPresent bool
|
||||
@@ -360,6 +365,10 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
rc byte
|
||||
)
|
||||
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationConnecting{isReconnect, attempt})
|
||||
}
|
||||
|
||||
c.optionsMu.Lock() // Protect c.options.Servers so that servers can be added in test cases
|
||||
brokers := c.options.Servers
|
||||
c.optionsMu.Unlock()
|
||||
@@ -372,6 +381,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
|
||||
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationBroker{broker})
|
||||
}
|
||||
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
|
||||
dialer := c.options.Dialer
|
||||
if dialer == nil { //
|
||||
@@ -388,6 +400,9 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
ERROR.Println(CLI, err.Error())
|
||||
WARN.Println(CLI, "failed to connect to broker, trying next")
|
||||
rc = packets.ErrNetworkError
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationBrokerFailed{broker, err})
|
||||
}
|
||||
continue
|
||||
}
|
||||
DEBUG.Println(CLI, "socket connected to broker")
|
||||
@@ -427,9 +442,12 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
|
||||
if rc != packets.ErrNetworkError { // mqtt error
|
||||
err = packets.ConnErrors[rc]
|
||||
} else { // network error (if this occurred in ConnectMQTT then err will be nil)
|
||||
err = fmt.Errorf("%s : %s", packets.ConnErrors[rc], err)
|
||||
err = fmt.Errorf("%w : %w", packets.ConnErrors[rc], err)
|
||||
}
|
||||
}
|
||||
if err != nil && c.options.OnConnectionNotification != nil {
|
||||
c.options.OnConnectionNotification(c, ConnectionNotificationFailed{err})
|
||||
}
|
||||
return conn, rc, sessionPresent, err
|
||||
}
|
||||
|
||||
@@ -564,6 +582,9 @@ func (c *client) internalConnLost(whyConnLost error) {
|
||||
if c.options.OnConnectionLost != nil {
|
||||
go c.options.OnConnectionLost(c, whyConnLost)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
go c.options.OnConnectionNotification(c, ConnectionNotificationLost{whyConnLost})
|
||||
}
|
||||
DEBUG.Println(CLI, "internalConnLost complete")
|
||||
}()
|
||||
}
|
||||
@@ -601,21 +622,21 @@ func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn,
|
||||
c.workers.Add(1) // Done will be called when ackOut is closed
|
||||
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
|
||||
|
||||
// The connection is now ready for use (we spin up a few go routines below). It is possible that
|
||||
// Disconnect has been called in the interim...
|
||||
// The connection is now ready for use (we spin up a few go routines below).
|
||||
// It is possible that Disconnect has been called in the interim...
|
||||
// issue 675:we will allow the connection to complete before the Disconnect is allowed to proceed
|
||||
// as if a Disconnect event occurred immediately after connectionUp(true) completed.
|
||||
if err := connectionUp(true); err != nil {
|
||||
DEBUG.Println(CLI, err)
|
||||
close(c.stop) // Tidy up anything we have already started
|
||||
close(incomingPubChan)
|
||||
c.workers.Wait()
|
||||
c.conn.Close()
|
||||
c.conn = nil
|
||||
return false
|
||||
ERROR.Println(CLI, err)
|
||||
}
|
||||
|
||||
DEBUG.Println(CLI, "client is connected/reconnected")
|
||||
if c.options.OnConnect != nil {
|
||||
go c.options.OnConnect(c)
|
||||
}
|
||||
if c.options.OnConnectionNotification != nil {
|
||||
go c.options.OnConnectionNotification(c, ConnectionNotificationConnected{})
|
||||
}
|
||||
|
||||
// c.oboundP and c.obound need to stay active for the life of the client because, depending upon the options,
|
||||
// messages may be published while the client is disconnected (they will block unless in a goroutine). However
|
||||
@@ -799,9 +820,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
|
||||
if publishWaitTimeout == 0 {
|
||||
publishWaitTimeout = time.Second * 30
|
||||
}
|
||||
|
||||
t := time.NewTimer(publishWaitTimeout)
|
||||
defer t.Stop()
|
||||
|
||||
select {
|
||||
case c.obound <- &PacketAndToken{p: pub, t: token}:
|
||||
case <-time.After(publishWaitTimeout):
|
||||
case <-t.C:
|
||||
token.setError(errors.New("publish was broken by timeout"))
|
||||
}
|
||||
}
|
||||
|
||||
13
vendor/github.com/eclipse/paho.mqtt.golang/filestore.go
generated
vendored
13
vendor/github.com/eclipse/paho.mqtt.golang/filestore.go
generated
vendored
@@ -19,7 +19,7 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"io/ioutil"
|
||||
"io/fs"
|
||||
"os"
|
||||
"path"
|
||||
"sort"
|
||||
@@ -159,15 +159,20 @@ func (store *FileStore) Reset() {
|
||||
func (store *FileStore) all() []string {
|
||||
var err error
|
||||
var keys []string
|
||||
var files fileInfos
|
||||
|
||||
if !store.opened {
|
||||
ERROR.Println(STR, "trying to use file store, but not open")
|
||||
return nil
|
||||
}
|
||||
|
||||
files, err = ioutil.ReadDir(store.directory)
|
||||
entries, err := os.ReadDir(store.directory)
|
||||
chkerr(err)
|
||||
files := make(fileInfos, 0, len(entries))
|
||||
for _, entry := range entries {
|
||||
info, err := entry.Info()
|
||||
chkerr(err)
|
||||
files = append(files, info)
|
||||
}
|
||||
sort.Sort(files)
|
||||
for _, f := range files {
|
||||
DEBUG.Println(STR, "file in All():", f.Name())
|
||||
@@ -246,7 +251,7 @@ func exists(file string) bool {
|
||||
return true
|
||||
}
|
||||
|
||||
type fileInfos []os.FileInfo
|
||||
type fileInfos []fs.FileInfo
|
||||
|
||||
func (f fileInfos) Len() int {
|
||||
return len(f)
|
||||
|
||||
13
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
13
vendor/github.com/eclipse/paho.mqtt.golang/net.go
generated
vendored
@@ -444,24 +444,23 @@ func startComms(conn net.Conn, // Network connection (must be active)
|
||||
}
|
||||
|
||||
// ackFunc acknowledges a packet
|
||||
// WARNING the function returned must not be called if the comms routine is shutting down or not running
|
||||
// (it needs outgoing comms in order to send the acknowledgement). Currently this is only called from
|
||||
// matchAndDispatch which will be shutdown before the comms are
|
||||
func ackFunc(oboundP chan *PacketAndToken, persist Store, packet *packets.PublishPacket) func() {
|
||||
// WARNING sendAck may be called at any time (even after the connection is dead). At the time of writing ACK sent after
|
||||
// connection loss will be dropped (this is not ideal)
|
||||
func ackFunc(sendAck func(*PacketAndToken), persist Store, packet *packets.PublishPacket) func() {
|
||||
return func() {
|
||||
switch packet.Qos {
|
||||
case 2:
|
||||
pr := packets.NewControlPacket(packets.Pubrec).(*packets.PubrecPacket)
|
||||
pr.MessageID = packet.MessageID
|
||||
DEBUG.Println(NET, "putting pubrec msg on obound")
|
||||
oboundP <- &PacketAndToken{p: pr, t: nil}
|
||||
sendAck(&PacketAndToken{p: pr, t: nil})
|
||||
DEBUG.Println(NET, "done putting pubrec msg on obound")
|
||||
case 1:
|
||||
pa := packets.NewControlPacket(packets.Puback).(*packets.PubackPacket)
|
||||
pa.MessageID = packet.MessageID
|
||||
DEBUG.Println(NET, "putting puback msg on obound")
|
||||
persistOutbound(persist, pa)
|
||||
oboundP <- &PacketAndToken{p: pa, t: nil}
|
||||
persistOutbound(persist, pa) // May fail if store has been closed
|
||||
sendAck(&PacketAndToken{p: pa, t: nil})
|
||||
DEBUG.Println(NET, "done putting puback msg on obound")
|
||||
case 0:
|
||||
// do nothing, since there is no need to send an ack packet back
|
||||
|
||||
11
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
11
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
@@ -50,16 +50,7 @@ func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, heade
|
||||
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "mqtt", "tcp":
|
||||
allProxy := os.Getenv("all_proxy")
|
||||
if len(allProxy) == 0 {
|
||||
conn, err := dialer.Dial("tcp", uri.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return conn, nil
|
||||
}
|
||||
proxyDialer := proxy.FromEnvironment()
|
||||
|
||||
proxyDialer := proxy.FromEnvironmentUsing(dialer)
|
||||
conn, err := proxyDialer.Dial("tcp", uri.Host)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
|
||||
164
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
164
vendor/github.com/eclipse/paho.mqtt.golang/options.go
generated
vendored
@@ -62,93 +62,99 @@ type ConnectionAttemptHandler func(broker *url.URL, tlsCfg *tls.Config) *tls.Con
|
||||
// Does not carry out any MQTT specific handshakes.
|
||||
type OpenConnectionFunc func(uri *url.URL, options ClientOptions) (net.Conn, error)
|
||||
|
||||
// ConnectionNotificationHandler is invoked for any type of connection event.
|
||||
type ConnectionNotificationHandler func(Client, ConnectionNotification)
|
||||
|
||||
// ClientOptions contains configurable options for an Client. Note that these should be set using the
|
||||
// relevant methods (e.g. AddBroker) rather than directly. See those functions for information on usage.
|
||||
// WARNING: Create the below using NewClientOptions unless you have a compelling reason not to. It is easy
|
||||
// to create a configuration with difficult to trace issues (e.g. Mosquitto 2.0.12+ will reject connections
|
||||
// with KeepAlive=0 by default).
|
||||
type ClientOptions struct {
|
||||
Servers []*url.URL
|
||||
ClientID string
|
||||
Username string
|
||||
Password string
|
||||
CredentialsProvider CredentialsProvider
|
||||
CleanSession bool
|
||||
Order bool
|
||||
WillEnabled bool
|
||||
WillTopic string
|
||||
WillPayload []byte
|
||||
WillQos byte
|
||||
WillRetained bool
|
||||
ProtocolVersion uint
|
||||
protocolVersionExplicit bool
|
||||
TLSConfig *tls.Config
|
||||
KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0.
|
||||
PingTimeout time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
MaxReconnectInterval time.Duration
|
||||
AutoReconnect bool
|
||||
ConnectRetryInterval time.Duration
|
||||
ConnectRetry bool
|
||||
Store Store
|
||||
DefaultPublishHandler MessageHandler
|
||||
OnConnect OnConnectHandler
|
||||
OnConnectionLost ConnectionLostHandler
|
||||
OnReconnecting ReconnectHandler
|
||||
OnConnectAttempt ConnectionAttemptHandler
|
||||
WriteTimeout time.Duration
|
||||
MessageChannelDepth uint
|
||||
ResumeSubs bool
|
||||
HTTPHeaders http.Header
|
||||
WebsocketOptions *WebsocketOptions
|
||||
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
|
||||
Dialer *net.Dialer
|
||||
CustomOpenConnectionFn OpenConnectionFunc
|
||||
AutoAckDisabled bool
|
||||
Servers []*url.URL
|
||||
ClientID string
|
||||
Username string
|
||||
Password string
|
||||
CredentialsProvider CredentialsProvider
|
||||
CleanSession bool
|
||||
Order bool
|
||||
WillEnabled bool
|
||||
WillTopic string
|
||||
WillPayload []byte
|
||||
WillQos byte
|
||||
WillRetained bool
|
||||
ProtocolVersion uint
|
||||
protocolVersionExplicit bool
|
||||
TLSConfig *tls.Config
|
||||
KeepAlive int64 // Warning: Some brokers may reject connections with Keepalive = 0.
|
||||
PingTimeout time.Duration
|
||||
ConnectTimeout time.Duration
|
||||
MaxReconnectInterval time.Duration
|
||||
AutoReconnect bool
|
||||
ConnectRetryInterval time.Duration
|
||||
ConnectRetry bool
|
||||
Store Store
|
||||
DefaultPublishHandler MessageHandler
|
||||
OnConnect OnConnectHandler
|
||||
OnConnectionLost ConnectionLostHandler
|
||||
OnReconnecting ReconnectHandler
|
||||
OnConnectAttempt ConnectionAttemptHandler
|
||||
OnConnectionNotification ConnectionNotificationHandler
|
||||
WriteTimeout time.Duration
|
||||
MessageChannelDepth uint
|
||||
ResumeSubs bool
|
||||
HTTPHeaders http.Header
|
||||
WebsocketOptions *WebsocketOptions
|
||||
MaxResumePubInFlight int // // 0 = no limit; otherwise this is the maximum simultaneous messages sent while resuming
|
||||
Dialer *net.Dialer
|
||||
CustomOpenConnectionFn OpenConnectionFunc
|
||||
AutoAckDisabled bool
|
||||
}
|
||||
|
||||
// NewClientOptions will create a new ClientClientOptions type with some
|
||||
// default values.
|
||||
// Port: 1883
|
||||
// CleanSession: True
|
||||
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
|
||||
// KeepAlive: 30 (seconds)
|
||||
// ConnectTimeout: 30 (seconds)
|
||||
// MaxReconnectInterval 10 (minutes)
|
||||
// AutoReconnect: True
|
||||
//
|
||||
// Port: 1883
|
||||
// CleanSession: True
|
||||
// Order: True (note: it is recommended that this be set to FALSE unless order is important)
|
||||
// KeepAlive: 30 (seconds)
|
||||
// ConnectTimeout: 30 (seconds)
|
||||
// MaxReconnectInterval 10 (minutes)
|
||||
// AutoReconnect: True
|
||||
func NewClientOptions() *ClientOptions {
|
||||
o := &ClientOptions{
|
||||
Servers: nil,
|
||||
ClientID: "",
|
||||
Username: "",
|
||||
Password: "",
|
||||
CleanSession: true,
|
||||
Order: true,
|
||||
WillEnabled: false,
|
||||
WillTopic: "",
|
||||
WillPayload: nil,
|
||||
WillQos: 0,
|
||||
WillRetained: false,
|
||||
ProtocolVersion: 0,
|
||||
protocolVersionExplicit: false,
|
||||
KeepAlive: 30,
|
||||
PingTimeout: 10 * time.Second,
|
||||
ConnectTimeout: 30 * time.Second,
|
||||
MaxReconnectInterval: 10 * time.Minute,
|
||||
AutoReconnect: true,
|
||||
ConnectRetryInterval: 30 * time.Second,
|
||||
ConnectRetry: false,
|
||||
Store: nil,
|
||||
OnConnect: nil,
|
||||
OnConnectionLost: DefaultConnectionLostHandler,
|
||||
OnConnectAttempt: nil,
|
||||
WriteTimeout: 0, // 0 represents timeout disabled
|
||||
ResumeSubs: false,
|
||||
HTTPHeaders: make(map[string][]string),
|
||||
WebsocketOptions: &WebsocketOptions{},
|
||||
Dialer: &net.Dialer{Timeout: 30 * time.Second},
|
||||
CustomOpenConnectionFn: nil,
|
||||
AutoAckDisabled: false,
|
||||
Servers: nil,
|
||||
ClientID: "",
|
||||
Username: "",
|
||||
Password: "",
|
||||
CleanSession: true,
|
||||
Order: true,
|
||||
WillEnabled: false,
|
||||
WillTopic: "",
|
||||
WillPayload: nil,
|
||||
WillQos: 0,
|
||||
WillRetained: false,
|
||||
ProtocolVersion: 0,
|
||||
protocolVersionExplicit: false,
|
||||
KeepAlive: 30,
|
||||
PingTimeout: 10 * time.Second,
|
||||
ConnectTimeout: 30 * time.Second,
|
||||
MaxReconnectInterval: 10 * time.Minute,
|
||||
AutoReconnect: true,
|
||||
ConnectRetryInterval: 30 * time.Second,
|
||||
ConnectRetry: false,
|
||||
Store: nil,
|
||||
OnConnect: nil,
|
||||
OnConnectionLost: DefaultConnectionLostHandler,
|
||||
OnConnectAttempt: nil,
|
||||
OnConnectionNotification: nil,
|
||||
WriteTimeout: 0, // 0 represents timeout disabled
|
||||
ResumeSubs: false,
|
||||
HTTPHeaders: make(map[string][]string),
|
||||
WebsocketOptions: &WebsocketOptions{},
|
||||
Dialer: &net.Dialer{Timeout: 30 * time.Second},
|
||||
CustomOpenConnectionFn: nil,
|
||||
AutoAckDisabled: false,
|
||||
}
|
||||
return o
|
||||
}
|
||||
@@ -355,6 +361,13 @@ func (o *ClientOptions) SetConnectionAttemptHandler(onConnectAttempt ConnectionA
|
||||
return o
|
||||
}
|
||||
|
||||
// SetConnectionNotificationHandler sets the ConnectionNotificationHandler callback to receive all types of connection
|
||||
// events.
|
||||
func (o *ClientOptions) SetConnectionNotificationHandler(onConnectionNotification ConnectionNotificationHandler) *ClientOptions {
|
||||
o.OnConnectionNotification = onConnectionNotification
|
||||
return o
|
||||
}
|
||||
|
||||
// SetWriteTimeout puts a limit on how long a mqtt publish should block until it unblocks with a
|
||||
// timeout error. A duration of 0 never times out. Default never times out
|
||||
func (o *ClientOptions) SetWriteTimeout(t time.Duration) *ClientOptions {
|
||||
@@ -450,6 +463,7 @@ func (o *ClientOptions) SetCustomOpenConnectionFn(customOpenConnectionFn OpenCon
|
||||
}
|
||||
|
||||
// SetAutoAckDisabled enables or disables the Automated Acking of Messages received by the handler.
|
||||
//
|
||||
// By default it is set to false. Setting it to true will disable the auto-ack globally.
|
||||
func (o *ClientOptions) SetAutoAckDisabled(autoAckDisabled bool) *ClientOptions {
|
||||
o.AutoAckDisabled = autoAckDisabled
|
||||
|
||||
15
vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go
generated
vendored
15
vendor/github.com/eclipse/paho.mqtt.golang/options_reader.go
generated
vendored
@@ -30,6 +30,21 @@ type ClientOptionsReader struct {
|
||||
options *ClientOptions
|
||||
}
|
||||
|
||||
// NewOptionsReader creates a ClientOptionsReader, this should only be used for mocking purposes.
|
||||
//
|
||||
// An example implementation:
|
||||
//
|
||||
// func (c *mqttClientMock) OptionsReader() mqtt.ClientOptionsReader {
|
||||
// opts := mqtt.NewClientOptions()
|
||||
// opts.UserName = "TestUserName"
|
||||
// return mqtt.NewOptionsReader(opts)
|
||||
// }
|
||||
func NewOptionsReader(o *ClientOptions) ClientOptionsReader {
|
||||
return ClientOptionsReader{
|
||||
options: o,
|
||||
}
|
||||
}
|
||||
|
||||
// Servers returns a slice of the servers defined in the clientoptions
|
||||
func (r *ClientOptionsReader) Servers() []*url.URL {
|
||||
s := make([]*url.URL, len(r.options.Servers))
|
||||
|
||||
5
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
5
vendor/github.com/eclipse/paho.mqtt.golang/packets/packets.go
generated
vendored
@@ -330,6 +330,11 @@ func decodeBytes(b io.Reader) ([]byte, error) {
|
||||
}
|
||||
|
||||
func encodeBytes(field []byte) []byte {
|
||||
// Attempting to encode more than 65,535 bytes would lead to an unexpected 16-bit length and extra data written
|
||||
// (which would be parsed as later parts of the message). The safest option is to truncate.
|
||||
if len(field) > 65535 {
|
||||
field = field[0:65535]
|
||||
}
|
||||
fieldLength := make([]byte, 2)
|
||||
binary.BigEndian.PutUint16(fieldLength, uint16(len(field)))
|
||||
return append(fieldLength, field...)
|
||||
|
||||
2
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
2
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
@@ -38,7 +38,7 @@ func keepalive(c *client, conn io.Writer) {
|
||||
if c.options.KeepAlive > 10 {
|
||||
checkInterval = 5 * time.Second
|
||||
} else {
|
||||
checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 2
|
||||
checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 4
|
||||
}
|
||||
|
||||
intervalTicker := time.NewTicker(checkInterval)
|
||||
|
||||
81
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
81
vendor/github.com/eclipse/paho.mqtt.golang/router.go
generated
vendored
@@ -136,60 +136,41 @@ func (r *router) setDefaultHandler(handler MessageHandler) {
|
||||
// associated callback (or the defaultHandler, if one exists and no other route matched). If
|
||||
// anything is sent down the stop channel the function will end.
|
||||
func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order bool, client *client) <-chan *PacketAndToken {
|
||||
var wg sync.WaitGroup
|
||||
ackOutChan := make(chan *PacketAndToken) // Channel returned to caller; closed when messages channel closed
|
||||
var ackInChan chan *PacketAndToken // ACKs generated by ackFunc get put onto this channel
|
||||
ackChan := make(chan *PacketAndToken) // Channel returned to caller; closed when goroutine terminates
|
||||
|
||||
stopAckCopy := make(chan struct{}) // Closure requests stop of go routine copying ackInChan to ackOutChan
|
||||
ackCopyStopped := make(chan struct{}) // Closure indicates that it is safe to close ackOutChan
|
||||
goRoutinesDone := make(chan struct{}) // closed on wg.Done()
|
||||
if order {
|
||||
ackInChan = ackOutChan // When order = true no go routines are used so safe to use one channel and close when done
|
||||
} else {
|
||||
// When order = false ACK messages are sent in go routines so ackInChan cannot be closed until all goroutines done
|
||||
ackInChan = make(chan *PacketAndToken)
|
||||
go func() { // go routine to copy from ackInChan to ackOutChan until stopped
|
||||
for {
|
||||
select {
|
||||
case a := <-ackInChan:
|
||||
ackOutChan <- a
|
||||
case <-stopAckCopy:
|
||||
close(ackCopyStopped) // Signal main go routine that it is safe to close ackOutChan
|
||||
for {
|
||||
select {
|
||||
case <-ackInChan: // drain ackInChan to ensure all goRoutines can complete cleanly (ACK dropped)
|
||||
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
|
||||
case <-goRoutinesDone:
|
||||
close(ackInChan) // Nothing further should be sent (a panic is probably better than silent failure)
|
||||
DEBUG.Println(ROU, "matchAndDispatch order=false copy goroutine exiting.")
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
// In some cases message acknowledgments may come through after shutdown (connection is down etc). Where this is the
|
||||
// case we need to accept any such requests and then ignore them. Note that this is not a perfect solution, if we
|
||||
// have reconnected, and the session is still live, then the Ack really should be sent (see Issus #726)
|
||||
var ackMutex sync.RWMutex
|
||||
sendAckChan := ackChan // This will be set to nil before ackChan is closed
|
||||
sendAck := func(ack *PacketAndToken) {
|
||||
ackMutex.RLock()
|
||||
defer ackMutex.RUnlock()
|
||||
if sendAckChan != nil {
|
||||
sendAckChan <- ack
|
||||
} else {
|
||||
DEBUG.Println(ROU, "matchAndDispatch received acknowledgment after processing stopped (ACK dropped).")
|
||||
}
|
||||
}
|
||||
|
||||
go func() { // Main go routine handling inbound messages
|
||||
var handlers []MessageHandler
|
||||
for message := range messages {
|
||||
// DEBUG.Println(ROU, "matchAndDispatch received message")
|
||||
sent := false
|
||||
r.RLock()
|
||||
m := messageFromPublish(message, ackFunc(ackInChan, client.persist, message))
|
||||
var handlers []MessageHandler
|
||||
m := messageFromPublish(message, ackFunc(sendAck, client.persist, message))
|
||||
for e := r.routes.Front(); e != nil; e = e.Next() {
|
||||
if e.Value.(*route).match(message.TopicName) {
|
||||
if order {
|
||||
handlers = append(handlers, e.Value.(*route).callback)
|
||||
} else {
|
||||
hd := e.Value.(*route).callback
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
hd(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
sent = true
|
||||
@@ -200,13 +181,11 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
if order {
|
||||
handlers = append(handlers, r.defaultHandler)
|
||||
} else {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
r.defaultHandler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
wg.Done()
|
||||
}()
|
||||
}
|
||||
} else {
|
||||
@@ -214,26 +193,22 @@ func (r *router) matchAndDispatch(messages <-chan *packets.PublishPacket, order
|
||||
}
|
||||
}
|
||||
r.RUnlock()
|
||||
for _, handler := range handlers {
|
||||
handler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
if order {
|
||||
for _, handler := range handlers {
|
||||
handler(client, m)
|
||||
if !client.options.AutoAckDisabled {
|
||||
m.Ack()
|
||||
}
|
||||
}
|
||||
handlers = handlers[:0]
|
||||
}
|
||||
// DEBUG.Println(ROU, "matchAndDispatch handled message")
|
||||
}
|
||||
if order {
|
||||
close(ackOutChan)
|
||||
} else { // Ensure that nothing further will be written to ackOutChan before closing it
|
||||
close(stopAckCopy)
|
||||
<-ackCopyStopped
|
||||
close(ackOutChan)
|
||||
go func() {
|
||||
wg.Wait() // Note: If this remains running then the user has handlers that are not returning
|
||||
close(goRoutinesDone)
|
||||
}()
|
||||
}
|
||||
ackMutex.Lock()
|
||||
sendAckChan = nil
|
||||
ackMutex.Unlock()
|
||||
close(ackChan) // as sendAckChan is now nil nothing further will be sent on this
|
||||
DEBUG.Println(ROU, "matchAndDispatch exiting")
|
||||
}()
|
||||
return ackOutChan
|
||||
return ackChan
|
||||
}
|
||||
|
||||
18
vendor/github.com/eclipse/paho.mqtt.golang/token.go
generated
vendored
18
vendor/github.com/eclipse/paho.mqtt.golang/token.go
generated
vendored
@@ -17,6 +17,7 @@
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"errors"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@@ -202,3 +203,20 @@ type UnsubscribeToken struct {
|
||||
type DisconnectToken struct {
|
||||
baseToken
|
||||
}
|
||||
|
||||
// TimedOut is the error returned by WaitTimeout when the timeout expires
|
||||
var TimedOut = errors.New("context canceled")
|
||||
|
||||
// WaitTokenTimeout is a utility function used to simplify the use of token.WaitTimeout
|
||||
// token.WaitTimeout may return `false` due to time out but t.Error() still results
|
||||
// in nil.
|
||||
// `if t := client.X(); t.WaitTimeout(time.Second) && t.Error() != nil {` may evaluate
|
||||
// to false even if the operation fails.
|
||||
// It is important to note that if TimedOut is returned, then the operation may still be running
|
||||
// and could eventually complete successfully.
|
||||
func WaitTokenTimeout(t Token, d time.Duration) error {
|
||||
if !t.WaitTimeout(d) {
|
||||
return TimedOut
|
||||
}
|
||||
return t.Error()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user