Update vendor

This commit is contained in:
Jedri Visser
2023-03-03 21:26:04 +02:00
parent bf4716e8df
commit c4a095a094
558 changed files with 50116 additions and 35399 deletions

View File

@@ -1,15 +1,20 @@
/*
* Copyright (c) 2013 IBM Corp.
* Copyright (c) 2021 IBM Corp and others.
*
* All rights reserved. This program and the accompanying materials
* are made available under the terms of the Eclipse Public License v1.0
* which accompanies this distribution, and is available at
* http://www.eclipse.org/legal/epl-v10.html
* are made available under the terms of the Eclipse Public License v2.0
* and Eclipse Distribution License v1.0 which accompany this distribution.
*
* The Eclipse Public License is available at
* https://www.eclipse.org/legal/epl-2.0/
* and the Eclipse Distribution License is available at
* http://www.eclipse.org/org/documents/edl-v10.php.
*
* Contributors:
* Seth Hoenig
* Allan Stockdill-Mander
* Mike Robertson
* Matt Brittan
*/
// Portions copyright © 2018 TIBCO Software Inc.
@@ -19,6 +24,7 @@ package mqtt
import (
"bytes"
"context"
"errors"
"fmt"
"net"
@@ -27,14 +33,9 @@ import (
"sync/atomic"
"time"
"github.com/eclipse/paho.mqtt.golang/packets"
)
"golang.org/x/sync/semaphore"
const (
disconnected uint32 = iota
connecting
reconnecting
connected
"github.com/eclipse/paho.mqtt.golang/packets"
)
// Client is the interface definition for a Client as used by this
@@ -44,9 +45,12 @@ const (
// with an MQTT server using non-blocking methods that allow work
// to be done in the background.
// An application may connect to an MQTT server using:
// A plain TCP socket
// A secure SSL/TLS socket
// A websocket
//
// A plain TCP socket (e.g. mqtt://test.mosquitto.org:1833)
// A secure SSL/TLS socket (e.g. tls://test.mosquitto.org:8883)
// A websocket (e.g ws://test.mosquitto.org:8080 or wss://test.mosquitto.org:8081)
// Something else (using `options.CustomOpenConnectionFn`)
//
// To enable ensured message delivery at Quality of Service (QoS) levels
// described in the MQTT spec, a message persistence mechanism must be
// used. This is done by providing a type which implements the Store
@@ -120,8 +124,7 @@ type client struct {
lastReceived atomic.Value // time.Time - the last time a packet was successfully received from network
pingOutstanding int32 // set to 1 if a ping has been sent but response not ret received
status uint32 // see const definitions at top of file for possible values
sync.RWMutex // Protects the above two variables (note: atomic writes are also used somewhat inconsistently)
status connectionStatus // see constants in status.go for values
messageIds // effectively a map from message id to token completor
@@ -161,7 +164,6 @@ func NewClient(o *ClientOptions) Client {
c.options.protocolVersionExplicit = false
}
c.persist = c.options.Store
c.status = disconnected
c.messageIds = messageIds{index: make(map[uint16]tokenCompletor)}
c.msgRouter = newRouter()
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
@@ -188,47 +190,27 @@ func (c *client) AddRoute(topic string, callback MessageHandler) {
// the client is connected or not.
// connected means that the connection is up now OR it will
// be established/reestablished automatically when possible
// Warning: The connection status may change at any time so use this with care!
func (c *client) IsConnected() bool {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
// This will need to change if additional statuses are added
s, r := c.status.ConnectionStatusRetry()
switch {
case status == connected:
case s == connected:
return true
case c.options.AutoReconnect && status > connecting:
return true
case c.options.ConnectRetry && status == connecting:
case c.options.ConnectRetry && s == connecting:
return true
case c.options.AutoReconnect:
return s == reconnecting || (s == disconnecting && r) // r indicates we will reconnect
default:
return false
}
}
// IsConnectionOpen return a bool signifying whether the client has an active
// connection to mqtt broker, i.e not in disconnected or reconnect mode
// connection to mqtt broker, i.e. not in disconnected or reconnect mode
// Warning: The connection status may change at any time so use this with care!
func (c *client) IsConnectionOpen() bool {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
switch {
case status == connected:
return true
default:
return false
}
}
func (c *client) connectionStatus() uint32 {
c.RLock()
defer c.RUnlock()
status := atomic.LoadUint32(&c.status)
return status
}
func (c *client) setConnected(status uint32) {
c.Lock()
defer c.Unlock()
atomic.StoreUint32(&c.status, status)
return c.status.ConnectionStatus() == connected
}
// ErrNotConnected is the error returned from function calls that are
@@ -245,25 +227,31 @@ func (c *client) Connect() Token {
t := newToken(packets.Connect).(*ConnectToken)
DEBUG.Println(CLI, "Connect()")
if c.options.ConnectRetry && atomic.LoadUint32(&c.status) != disconnected {
// if in any state other than disconnected and ConnectRetry is
// enabled then the connection will come up automatically
// client can assume connection is up
WARN.Println(CLI, "Connect() called but not disconnected")
t.returnCode = packets.Accepted
t.flowComplete()
connectionUp, err := c.status.Connecting()
if err != nil {
if err == errAlreadyConnectedOrReconnecting && c.options.AutoReconnect {
// When reconnection is active we don't consider calls tro Connect to ba an error (mainly for compatability)
WARN.Println(CLI, "Connect() called but not disconnected")
t.returnCode = packets.Accepted
t.flowComplete()
return t
}
ERROR.Println(CLI, err) // CONNECT should never be called unless we are disconnected
t.setError(err)
return t
}
c.persist.Open()
if c.options.ConnectRetry {
c.reserveStoredPublishIDs() // Reserve IDs to allow publish before connect complete
c.reserveStoredPublishIDs() // Reserve IDs to allow publishing before connect complete
}
c.setConnected(connecting)
go func() {
if len(c.options.Servers) == 0 {
t.setError(fmt.Errorf("no servers defined to connect to"))
if err := connectionUp(false); err != nil {
ERROR.Println(CLI, err.Error())
}
return
}
@@ -274,29 +262,31 @@ func (c *client) Connect() Token {
conn, rc, t.sessionPresent, err = c.attemptConnection()
if err != nil {
if c.options.ConnectRetry {
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry")
DEBUG.Println(CLI, "Connect failed, sleeping for", int(c.options.ConnectRetryInterval.Seconds()), "seconds and will then retry, error:", err.Error())
time.Sleep(c.options.ConnectRetryInterval)
if atomic.LoadUint32(&c.status) == connecting {
if c.status.ConnectionStatus() == connecting { // Possible connection aborted elsewhere
goto RETRYCONN
}
}
ERROR.Println(CLI, "Failed to connect to a broker")
c.setConnected(disconnected)
c.persist.Close()
t.returnCode = rc
t.setError(err)
if err := connectionUp(false); err != nil {
ERROR.Println(CLI, err.Error())
}
return
}
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
// Take care of any messages in the store
if !c.options.CleanSession {
c.resume(c.options.ResumeSubs, inboundFromStore)
} else {
c.persist.Reset()
}
} else {
} else { // Note: With the new status subsystem this should only happen if Disconnect called simultaneously with the above
WARN.Println(CLI, "Connect() called but connection established in another goroutine")
}
@@ -308,7 +298,8 @@ func (c *client) Connect() Token {
}
// internal function used to reconnect the client when it loses its connection
func (c *client) reconnect() {
// The connection status MUST be reconnecting prior to calling this function (via call to status.connectionLost)
func (c *client) reconnect(connectionUp connCompletedFn) {
DEBUG.Println(CLI, "enter reconnect")
var (
sleep = 1 * time.Second
@@ -333,23 +324,18 @@ func (c *client) reconnect() {
if sleep > c.options.MaxReconnectInterval {
sleep = c.options.MaxReconnectInterval
}
// Disconnect may have been called
if atomic.LoadUint32(&c.status) == disconnected {
break
if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
if err := connectionUp(false); err != nil { // Should always return an error
ERROR.Println(CLI, err.Error())
}
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
return
}
}
// Disconnect() must have been called while we were trying to reconnect.
if c.connectionStatus() == disconnected {
if conn != nil {
conn.Close()
}
DEBUG.Println(CLI, "Client moved to disconnected state while reconnecting, abandoning reconnect")
return
}
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, inboundFromStore) {
inboundFromStore := make(chan packets.ControlPacket) // there may be some inbound comms packets in the store that are awaiting processing
if c.startCommsWorkers(conn, connectionUp, inboundFromStore) { // note that this takes care of updating the status (to connected or disconnected)
c.resume(c.options.ResumeSubs, inboundFromStore)
}
close(inboundFromStore)
@@ -379,8 +365,23 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
cm := newConnectMsgFromOptions(&c.options, broker)
DEBUG.Println(CLI, "about to write new connect msg")
CONN:
tlsCfg := c.options.TLSConfig
if c.options.OnConnectAttempt != nil {
DEBUG.Println(CLI, "using custom onConnectAttempt handler...")
tlsCfg = c.options.OnConnectAttempt(broker, c.options.TLSConfig)
}
connDeadline := time.Now().Add(c.options.ConnectTimeout) // Time by which connection must be established
dialer := c.options.Dialer
if dialer == nil { //
WARN.Println(CLI, "dialer was nil, using default")
dialer = &net.Dialer{Timeout: 30 * time.Second}
}
// Start by opening the network connection (tcp, tls, ws) etc
conn, err = openConnection(broker, c.options.TLSConfig, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions)
if c.options.CustomOpenConnectionFn != nil {
conn, err = c.options.CustomOpenConnectionFn(broker, c.options)
} else {
conn, err = openConnection(broker, tlsCfg, c.options.ConnectTimeout, c.options.HTTPHeaders, c.options.WebsocketOptions, dialer)
}
if err != nil {
ERROR.Println(CLI, err.Error())
WARN.Println(CLI, "failed to connect to broker, trying next")
@@ -389,16 +390,23 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
}
DEBUG.Println(CLI, "socket connected to broker")
// Now we send the perform the MQTT connection handshake
// Now we perform the MQTT connection handshake ensuring that it does not exceed the timeout
if err := conn.SetDeadline(connDeadline); err != nil {
ERROR.Println(CLI, "set deadline for handshake ", err)
}
// Now we perform the MQTT connection handshake
rc, sessionPresent, err = connectMQTT(conn, cm, protocolVersion)
if rc == packets.Accepted {
if err := conn.SetDeadline(time.Time{}); err != nil {
ERROR.Println(CLI, "reset deadline following handshake ", err)
}
break // successfully connected
}
// We may be have to attempt the connection with MQTT 3.1
if conn != nil {
conn.Close()
}
// We may have to attempt the connection with MQTT 3.1
_ = conn.Close()
if !c.options.protocolVersionExplicit && protocolVersion == 4 { // try falling back to 3.1?
DEBUG.Println(CLI, "Trying reconnect using MQTT 3.1 protocol")
protocolVersion = 3
@@ -426,37 +434,63 @@ func (c *client) attemptConnection() (net.Conn, byte, bool, error) {
// Disconnect will end the connection with the server, but not before waiting
// the specified number of milliseconds to wait for existing work to be
// completed.
// WARNING: `Disconnect` may return before all activities (goroutines) have completed. This means that
// reusing the `client` may lead to panics. If you want to reconnect when the connection drops then use
// `SetAutoReconnect` and/or `SetConnectRetry`options instead of implementing this yourself.
func (c *client) Disconnect(quiesce uint) {
status := atomic.LoadUint32(&c.status)
if status == connected {
done := make(chan struct{}) // Simplest way to ensure quiesce is always honoured
go func() {
defer close(done)
disDone, err := c.status.Disconnecting()
if err != nil {
// Status has been set to disconnecting, but we had to wait for something else to complete
WARN.Println(CLI, err.Error())
return
}
defer func() {
c.disconnect() // Force disconnection
disDone() // Update status
}()
DEBUG.Println(CLI, "disconnecting")
c.setConnected(disconnected)
dm := packets.NewControlPacket(packets.Disconnect).(*packets.DisconnectPacket)
dt := newToken(packets.Disconnect)
c.oboundP <- &PacketAndToken{p: dm, t: dt}
select {
case c.oboundP <- &PacketAndToken{p: dm, t: dt}:
// wait for work to finish, or quiesce time consumed
DEBUG.Println(CLI, "calling WaitTimeout")
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
DEBUG.Println(CLI, "WaitTimeout done")
// Below code causes a potential data race. Following status refactor it should no longer be required
// but leaving in as need to check code further.
// case <-c.commsStopped:
// WARN.Println("Disconnect packet could not be sent because comms stopped")
case <-time.After(time.Duration(quiesce) * time.Millisecond):
WARN.Println("Disconnect packet not sent due to timeout")
}
}()
// wait for work to finish, or quiesce time consumed
DEBUG.Println(CLI, "calling WaitTimeout")
dt.WaitTimeout(time.Duration(quiesce) * time.Millisecond)
DEBUG.Println(CLI, "WaitTimeout done")
} else {
WARN.Println(CLI, "Disconnect() called but not connected (disconnected/reconnecting)")
c.setConnected(disconnected)
// Return when done or after timeout expires (would like to change but this maintains compatibility)
delay := time.NewTimer(time.Duration(quiesce) * time.Millisecond)
select {
case <-done:
if !delay.Stop() {
<-delay.C
}
case <-delay.C:
}
c.disconnect()
}
// forceDisconnect will end the connection with the mqtt broker immediately (used for tests only)
func (c *client) forceDisconnect() {
if !c.IsConnected() {
WARN.Println(CLI, "already disconnected")
disDone, err := c.status.Disconnecting()
if err != nil {
// Possible that we are not actually connected
WARN.Println(CLI, err.Error())
return
}
c.setConnected(disconnected)
DEBUG.Println(CLI, "forcefully disconnecting")
c.disconnect()
disDone()
}
// disconnect cleans up after a final disconnection (user requested so no auto reconnection)
@@ -473,47 +507,79 @@ func (c *client) disconnect() {
// internalConnLost cleanup when connection is lost or an error occurs
// Note: This function will not block
func (c *client) internalConnLost(err error) {
func (c *client) internalConnLost(whyConnLost error) {
// It is possible that internalConnLost will be called multiple times simultaneously
// (including after sending a DisconnectPacket) as such we only do cleanup etc if the
// routines were actually running and are not being disconnected at users request
DEBUG.Println(CLI, "internalConnLost called")
stopDone := c.stopCommsWorkers()
if stopDone != nil { // stopDone will be nil if workers already in the process of stopping or stopped
go func() {
DEBUG.Println(CLI, "internalConnLost waiting on workers")
<-stopDone
DEBUG.Println(CLI, "internalConnLost workers stopped")
// It is possible that Disconnect was called which led to this error so reconnection depends upon status
reconnect := c.options.AutoReconnect && c.connectionStatus() > connecting
if c.options.CleanSession && !reconnect {
c.messageIds.cleanUp()
}
if reconnect {
c.setConnected(reconnecting)
go c.reconnect()
} else {
c.setConnected(disconnected)
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, err)
}
DEBUG.Println(CLI, "internalConnLost complete")
}()
disDone, err := c.status.ConnectionLost(c.options.AutoReconnect && c.status.ConnectionStatus() > connecting)
if err != nil {
if err == errConnLossWhileDisconnecting || err == errAlreadyHandlingConnectionLoss {
return // Loss of connection is expected or already being handled
}
ERROR.Println(CLI, fmt.Sprintf("internalConnLost unexpected status: %s", err.Error()))
return
}
// c.stopCommsWorker returns a channel that is closed when the operation completes. This was required prior
// to the implementation of proper status management but has been left in place, for now, to minimise change
stopDone := c.stopCommsWorkers()
// stopDone was required in previous versions because there was no connectionLost status (and there were
// issues with status handling). This code has been left in place for the time being just in case the new
// status handling contains bugs (refactoring required at some point).
if stopDone == nil { // stopDone will be nil if workers already in the process of stopping or stopped
ERROR.Println(CLI, "internalConnLost stopDone unexpectedly nil - BUG BUG")
// Cannot really do anything other than leave things disconnected
if _, err = disDone(false); err != nil { // Safest option - cannot leave status as connectionLost
ERROR.Println(CLI, fmt.Sprintf("internalConnLost failed to set status to disconnected (stopDone): %s", err.Error()))
}
return
}
// It may take a while for the disconnection to complete whatever called us needs to exit cleanly so finnish in goRoutine
go func() {
DEBUG.Println(CLI, "internalConnLost waiting on workers")
<-stopDone
DEBUG.Println(CLI, "internalConnLost workers stopped")
reConnDone, err := disDone(true)
if err != nil {
ERROR.Println(CLI, "failure whilst reporting completion of disconnect", err)
} else if reConnDone == nil { // Should never happen
ERROR.Println(CLI, "BUG BUG BUG reconnection function is nil", err)
}
reconnect := err == nil && reConnDone != nil
if c.options.CleanSession && !reconnect {
c.messageIds.cleanUp() // completes PUB/SUB/UNSUB tokens
} else if !c.options.ResumeSubs {
c.messageIds.cleanUpSubscribe() // completes SUB/UNSUB tokens
}
if reconnect {
go c.reconnect(reConnDone) // Will set connection status to reconnecting
}
if c.options.OnConnectionLost != nil {
go c.options.OnConnectionLost(c, whyConnLost)
}
DEBUG.Println(CLI, "internalConnLost complete")
}()
}
// startCommsWorkers is called when the connection is up. It starts off all of the routines needed to process incoming and
// outgoing messages.
// Returns true if the comms workers were started (i.e. they were not already running)
func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packets.ControlPacket) bool {
// startCommsWorkers is called when the connection is up.
// It starts off the routines needed to process incoming and outgoing messages.
// Returns true if the comms workers were started (i.e. successful connection)
// connectionUp(true) will be called once everything is up; connectionUp(false) will be called on failure
func (c *client) startCommsWorkers(conn net.Conn, connectionUp connCompletedFn, inboundFromStore <-chan packets.ControlPacket) bool {
DEBUG.Println(CLI, "startCommsWorkers called")
c.connMu.Lock()
defer c.connMu.Unlock()
if c.conn != nil {
WARN.Println(CLI, "startCommsWorkers called when commsworkers already running")
conn.Close() // No use for the new network connection
if c.conn != nil { // Should never happen due to new status handling; leaving in for safety for the time being
WARN.Println(CLI, "startCommsWorkers called when commsworkers already running BUG BUG")
_ = conn.Close() // No use for the new network connection
if err := connectionUp(false); err != nil {
ERROR.Println(CLI, err.Error())
}
return false
}
c.conn = conn // Store the connection
@@ -533,7 +599,17 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
c.workers.Add(1) // Done will be called when ackOut is closed
ackOut := c.msgRouter.matchAndDispatch(incomingPubChan, c.options.Order, c)
c.setConnected(connected)
// The connection is now ready for use (we spin up a few go routines below). It is possible that
// Disconnect has been called in the interim...
if err := connectionUp(true); err != nil {
DEBUG.Println(CLI, err)
close(c.stop) // Tidy up anything we have already started
close(incomingPubChan)
c.workers.Wait()
c.conn.Close()
c.conn = nil
return false
}
DEBUG.Println(CLI, "client is connected/reconnected")
if c.options.OnConnect != nil {
go c.options.OnConnect(c)
@@ -592,7 +668,22 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
commsIncomingPub = nil
continue
}
incomingPubChan <- pub
// Care is needed here because an error elsewhere could trigger a deadlock
sendPubLoop:
for {
select {
case incomingPubChan <- pub:
break sendPubLoop
case err, ok := <-commsErrors:
if !ok { // commsErrors has been closed so we can ignore it
commsErrors = nil
continue
}
ERROR.Println(CLI, "Connect comms goroutine - error triggered during send Pub", err)
c.internalConnLost(err) // no harm in calling this if the connection is already down (or shutdown is in progress)
continue
}
}
case err, ok := <-commsErrors:
if !ok {
commsErrors = nil
@@ -611,8 +702,9 @@ func (c *client) startCommsWorkers(conn net.Conn, inboundFromStore <-chan packet
}
// stopWorkersAndComms - Cleanly shuts down worker go routines (including the comms routines) and waits until everything has stopped
// Returns nil it workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
// Returns nil if workers did not need to be stopped; otherwise returns a channel which will be closed when the stop is complete
// Note: This may block so run as a go routine if calling from any of the comms routines
// Note2: It should be possible to simplify this now that the new status management code is in place.
func (c *client) stopCommsWorkers() chan struct{} {
DEBUG.Println(CLI, "stopCommsWorkers called")
// It is possible that this function will be called multiple times simultaneously due to the way things get shutdown
@@ -661,7 +753,8 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
case !c.IsConnected():
token.setError(ErrNotConnected)
return token
case c.connectionStatus() == reconnecting && qos == 0:
case c.status.ConnectionStatus() == reconnecting && qos == 0:
// message written to store and will be sent when connection comes up
token.flowComplete()
return token
}
@@ -691,11 +784,13 @@ func (c *client) Publish(topic string, qos byte, retained bool, payload interfac
token.messageID = mID
}
persistOutbound(c.persist, pub)
switch c.connectionStatus() {
switch c.status.ConnectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing publish message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing publish message (reconnecting), topic:", topic)
case disconnecting:
DEBUG.Println(CLI, "storing publish message (disconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending publish message, topic:", topic)
publishWaitTimeout := c.options.WriteTimeout
@@ -728,11 +823,11 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this sub will be thrown away
// if not connected and resumeSubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
// if reconnecting and cleanSession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
@@ -770,12 +865,16 @@ func (c *client) Subscribe(topic string, qos byte, callback MessageHandler) Toke
}
DEBUG.Println(CLI, sub.String())
persistOutbound(c.persist, sub)
switch c.connectionStatus() {
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.status.ConnectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topic:", topic)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topic:", topic)
case disconnecting:
DEBUG.Println(CLI, "storing subscribe message (disconnecting), topic:", topic)
default:
DEBUG.Println(CLI, "sending subscribe message, topic:", topic)
subscribeWaitTimeout := c.options.WriteTimeout
@@ -813,8 +912,8 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
// if not connected and resumesubs not set this sub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this sub will be thrown away
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
// if reconnecting and cleanSession is true this sub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
@@ -842,12 +941,16 @@ func (c *client) SubscribeMultiple(filters map[string]byte, callback MessageHand
sub.MessageID = mID
token.messageID = mID
}
persistOutbound(c.persist, sub)
switch c.connectionStatus() {
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, sub)
}
switch c.status.ConnectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing subscribe message (connecting), topics:", sub.Topics)
case reconnecting:
DEBUG.Println(CLI, "storing subscribe message (reconnecting), topics:", sub.Topics)
case disconnecting:
DEBUG.Println(CLI, "storing subscribe message (disconnecting), topics:", sub.Topics)
default:
DEBUG.Println(CLI, "sending subscribe message, topics:", sub.Topics)
subscribeWaitTimeout := c.options.WriteTimeout
@@ -889,10 +992,42 @@ func (c *client) reserveStoredPublishIDs() {
// Load all stored messages and resend them
// Call this to ensure QOS > 1,2 even after an application crash
// Note: This function will exit if c.stop is closed (this allows the shutdown to proceed avoiding a potential deadlock)
//
// other than that it does not return until all messages in the store have been sent (connect() does not complete its
// token before this completes)
func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
DEBUG.Println(STR, "enter Resume")
// Prior to sending a message getSemaphore will be called and once sent releaseSemaphore will be called
// with the token (so semaphore can be released when ACK received if applicable).
// Using a weighted semaphore rather than channels because this retains ordering
getSemaphore := func() {} // Default = do nothing
releaseSemaphore := func(_ *PublishToken) {} // Default = do nothing
var sem *semaphore.Weighted
if c.options.MaxResumePubInFlight > 0 {
sem = semaphore.NewWeighted(int64(c.options.MaxResumePubInFlight))
ctx, cancel := context.WithCancel(context.Background()) // Context needed for semaphore
defer cancel() // ensure context gets cancelled
go func() {
select {
case <-c.stop: // Request to stop (due to comm error etc)
cancel()
case <-ctx.Done(): // resume completed normally
}
}()
getSemaphore = func() { sem.Acquire(ctx, 1) }
releaseSemaphore = func(token *PublishToken) { // Note: If token never completes then resume() may stall (will still exit on ctx.Done())
go func() {
select {
case <-token.Done():
case <-ctx.Done():
}
sem.Release(1)
}()
}
}
storedKeys := c.persist.All()
for _, key := range storedKeys {
packet := c.persist.Get(key)
@@ -956,14 +1091,16 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
c.claimID(token, details.MessageID)
DEBUG.Println(STR, fmt.Sprintf("loaded pending publish (%d)", details.MessageID))
DEBUG.Println(STR, details)
getSemaphore()
select {
case c.obound <- &PacketAndToken{p: p, t: token}:
case <-c.stop:
DEBUG.Println(STR, "resume exiting due to stop")
return
}
releaseSemaphore(token) // If limiting simultaneous messages then we need to know when message is acknowledged
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
ERROR.Println(STR, fmt.Sprintf("invalid message type (inbound - %T) in store (discarded)", packet))
c.persist.Del(key)
}
} else {
@@ -977,7 +1114,7 @@ func (c *client) resume(subscription bool, ibound chan packets.ControlPacket) {
return
}
default:
ERROR.Println(STR, "invalid message type in store (discarded)")
ERROR.Println(STR, fmt.Sprintf("invalid message type (%T) in store (discarded)", packet))
c.persist.Del(key)
}
}
@@ -998,11 +1135,11 @@ func (c *client) Unsubscribe(topics ...string) Token {
if !c.IsConnectionOpen() {
switch {
case !c.options.ResumeSubs:
// if not connected and resumesubs not set this unsub will be thrown away
// if not connected and resumeSubs not set this unsub will be thrown away
token.setError(fmt.Errorf("not currently connected and ResumeSubs not set"))
return token
case c.options.CleanSession && c.connectionStatus() == reconnecting:
// if reconnecting and cleansession is true this unsub will be thrown away
case c.options.CleanSession && c.status.ConnectionStatus() == reconnecting:
// if reconnecting and cleanSession is true this unsub will be thrown away
token.setError(fmt.Errorf("reconnecting state and cleansession is true"))
return token
}
@@ -1021,13 +1158,17 @@ func (c *client) Unsubscribe(topics ...string) Token {
token.messageID = mID
}
persistOutbound(c.persist, unsub)
if c.options.ResumeSubs { // Only persist if we need this to resume subs after a disconnection
persistOutbound(c.persist, unsub)
}
switch c.connectionStatus() {
switch c.status.ConnectionStatus() {
case connecting:
DEBUG.Println(CLI, "storing unsubscribe message (connecting), topics:", topics)
case reconnecting:
DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
case disconnecting:
DEBUG.Println(CLI, "storing unsubscribe message (reconnecting), topics:", topics)
default:
DEBUG.Println(CLI, "sending unsubscribe message, topics:", topics)
subscribeWaitTimeout := c.options.WriteTimeout