Go vendor update
This commit is contained in:
6
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
6
vendor/github.com/eclipse/paho.mqtt.golang/README.md
generated
vendored
@@ -113,7 +113,9 @@ identifier; this is as per the [spec](https://docs.oasis-open.org/mqtt/mqtt/v3.1
|
||||
not received, disconnecting` errors).
|
||||
* When QOS1+ subscriptions have been created previously and you connect with `CleanSession` set to false it is possible
|
||||
that the broker will deliver retained messages before `Subscribe` can be called. To process these messages either
|
||||
configure a handler with `AddRoute` or set a `DefaultPublishHandler`.
|
||||
configure a handler with `AddRoute` or set a `DefaultPublishHandler`. If there is no handler (or `DefaultPublishHandler`)
|
||||
then inbound messages will not be acknowledged. Adding a handler (even if it's `opts.SetDefaultPublishHandler(func(mqtt.Client, mqtt.Message) {})`)
|
||||
is highly recommended to avoid inadvertently hitting inflight message limits.
|
||||
* Loss of network connectivity may not be detected immediately. If this is an issue then consider setting
|
||||
`ClientOptions.KeepAlive` (sends regular messages to check the link is active).
|
||||
* Reusing a `Client` is not completely safe. After calling `Disconnect` please create a new Client (`NewClient()`) rather
|
||||
@@ -193,4 +195,4 @@ Discussion of the Paho clients takes place on the [Eclipse paho-dev mailing list
|
||||
|
||||
General questions about the MQTT protocol are discussed in the [MQTT Google Group](https://groups.google.com/forum/?hl=en-US&fromgroups#!forum/mqtt).
|
||||
|
||||
There is much more information available via the [MQTT community site](http://mqtt.org).
|
||||
There is much more information available via the [MQTT community site](http://mqtt.org).
|
||||
|
||||
104
vendor/github.com/eclipse/paho.mqtt.golang/backoff.go
generated
vendored
Normal file
104
vendor/github.com/eclipse/paho.mqtt.golang/backoff.go
generated
vendored
Normal file
@@ -0,0 +1,104 @@
|
||||
/*
|
||||
* Copyright (c) 2021 IBM Corp and others.
|
||||
*
|
||||
* All rights reserved. This program and the accompanying materials
|
||||
* are made available under the terms of the Eclipse Public License v2.0
|
||||
* and Eclipse Distribution License v1.0 which accompany this distribution.
|
||||
*
|
||||
* The Eclipse Public License is available at
|
||||
* https://www.eclipse.org/legal/epl-2.0/
|
||||
* and the Eclipse Distribution License is available at
|
||||
* http://www.eclipse.org/org/documents/edl-v10.php.
|
||||
*
|
||||
* Contributors:
|
||||
* Matt Brittan
|
||||
* Daichi Tomaru
|
||||
*/
|
||||
|
||||
package mqtt
|
||||
|
||||
import (
|
||||
"sync"
|
||||
"time"
|
||||
)
|
||||
|
||||
// Controller for sleep with backoff when the client attempts reconnection
|
||||
// It has statuses for each situations cause reconnection.
|
||||
type backoffController struct {
|
||||
sync.RWMutex
|
||||
statusMap map[string]*backoffStatus
|
||||
}
|
||||
|
||||
type backoffStatus struct {
|
||||
lastSleepPeriod time.Duration
|
||||
lastErrorTime time.Time
|
||||
}
|
||||
|
||||
func newBackoffController() *backoffController {
|
||||
return &backoffController{
|
||||
statusMap: map[string]*backoffStatus{},
|
||||
}
|
||||
}
|
||||
|
||||
// Calculate next sleep period from the specified parameters.
|
||||
// Returned values are next sleep period and whether the error situation is continual.
|
||||
// If connection errors continuouslly occurs, its sleep period is exponentially increased.
|
||||
// Also if there is a lot of time between last and this error, sleep period is initialized.
|
||||
func (b *backoffController) getBackoffSleepTime(
|
||||
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
|
||||
) (time.Duration, bool) {
|
||||
// Decide first sleep time if the situation is not continual.
|
||||
var firstProcess = func(status *backoffStatus, init time.Duration, skip bool) (time.Duration, bool) {
|
||||
if skip {
|
||||
status.lastSleepPeriod = 0
|
||||
return 0, false
|
||||
}
|
||||
status.lastSleepPeriod = init
|
||||
return init, false
|
||||
}
|
||||
|
||||
// Prioritize maxSleep.
|
||||
if initSleepPeriod > maxSleepPeriod {
|
||||
initSleepPeriod = maxSleepPeriod
|
||||
}
|
||||
b.Lock()
|
||||
defer b.Unlock()
|
||||
|
||||
status, exist := b.statusMap[situation]
|
||||
if !exist {
|
||||
b.statusMap[situation] = &backoffStatus{initSleepPeriod, time.Now()}
|
||||
return firstProcess(b.statusMap[situation], initSleepPeriod, skipFirst)
|
||||
}
|
||||
|
||||
oldTime := status.lastErrorTime
|
||||
status.lastErrorTime = time.Now()
|
||||
|
||||
// When there is a lot of time between last and this error, sleep period is initialized.
|
||||
if status.lastErrorTime.Sub(oldTime) > (processTime * 2 + status.lastSleepPeriod) {
|
||||
return firstProcess(status, initSleepPeriod, skipFirst)
|
||||
}
|
||||
|
||||
if status.lastSleepPeriod == 0 {
|
||||
status.lastSleepPeriod = initSleepPeriod
|
||||
return initSleepPeriod, true
|
||||
}
|
||||
|
||||
if nextSleepPeriod := status.lastSleepPeriod * 2; nextSleepPeriod <= maxSleepPeriod {
|
||||
status.lastSleepPeriod = nextSleepPeriod
|
||||
} else {
|
||||
status.lastSleepPeriod = maxSleepPeriod
|
||||
}
|
||||
|
||||
return status.lastSleepPeriod, true
|
||||
}
|
||||
|
||||
// Execute sleep the time returned from getBackoffSleepTime.
|
||||
func (b *backoffController) sleepWithBackoff(
|
||||
situation string, initSleepPeriod time.Duration, maxSleepPeriod time.Duration, processTime time.Duration, skipFirst bool,
|
||||
) (time.Duration, bool) {
|
||||
sleep, isFirst := b.getBackoffSleepTime(situation, initSleepPeriod, maxSleepPeriod, processTime, skipFirst)
|
||||
if sleep != 0 {
|
||||
time.Sleep(sleep)
|
||||
}
|
||||
return sleep, isFirst
|
||||
}
|
||||
22
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
22
vendor/github.com/eclipse/paho.mqtt.golang/client.go
generated
vendored
@@ -141,6 +141,8 @@ type client struct {
|
||||
stop chan struct{} // Closed to request that workers stop
|
||||
workers sync.WaitGroup // used to wait for workers to complete (ping, keepalive, errwatch, resume)
|
||||
commsStopped chan struct{} // closed when the comms routines have stopped (kept running until after workers have closed to avoid deadlocks)
|
||||
|
||||
backoff *backoffController
|
||||
}
|
||||
|
||||
// NewClient will create an MQTT v3.1.1 client with all of the options specified
|
||||
@@ -169,6 +171,7 @@ func NewClient(o *ClientOptions) Client {
|
||||
c.msgRouter.setDefaultHandler(c.options.DefaultPublishHandler)
|
||||
c.obound = make(chan *PacketAndToken)
|
||||
c.oboundP = make(chan *PacketAndToken)
|
||||
c.backoff = newBackoffController()
|
||||
return c
|
||||
}
|
||||
|
||||
@@ -302,10 +305,16 @@ func (c *client) Connect() Token {
|
||||
func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
DEBUG.Println(CLI, "enter reconnect")
|
||||
var (
|
||||
sleep = 1 * time.Second
|
||||
initSleep = 1 * time.Second
|
||||
conn net.Conn
|
||||
)
|
||||
|
||||
// If the reason of connection lost is same as the before one, sleep timer is set before attempting connection is started.
|
||||
// Sleep time is exponentially increased as the same situation continues
|
||||
if slp, isContinual := c.backoff.sleepWithBackoff("connectionLost", initSleep, c.options.MaxReconnectInterval, 3 * time.Second, true); isContinual {
|
||||
DEBUG.Println(CLI, "Detect continual connection lost after reconnect, slept for", int(slp.Seconds()), "seconds")
|
||||
}
|
||||
|
||||
for {
|
||||
if nil != c.options.OnReconnecting {
|
||||
c.options.OnReconnecting(c, &c.options)
|
||||
@@ -315,15 +324,8 @@ func (c *client) reconnect(connectionUp connCompletedFn) {
|
||||
if err == nil {
|
||||
break
|
||||
}
|
||||
DEBUG.Println(CLI, "Reconnect failed, sleeping for", int(sleep.Seconds()), "seconds:", err)
|
||||
time.Sleep(sleep)
|
||||
if sleep < c.options.MaxReconnectInterval {
|
||||
sleep *= 2
|
||||
}
|
||||
|
||||
if sleep > c.options.MaxReconnectInterval {
|
||||
sleep = c.options.MaxReconnectInterval
|
||||
}
|
||||
sleep, _ := c.backoff.sleepWithBackoff("attemptReconnection", initSleep, c.options.MaxReconnectInterval, c.options.ConnectTimeout, false)
|
||||
DEBUG.Println(CLI, "Reconnect failed, slept for", int(sleep.Seconds()), "seconds:", err)
|
||||
|
||||
if c.status.ConnectionStatus() != reconnecting { // Disconnect may have been called
|
||||
if err := connectionUp(false); err != nil { // Should always return an error
|
||||
|
||||
8
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
8
vendor/github.com/eclipse/paho.mqtt.golang/netconn.go
generated
vendored
@@ -40,10 +40,14 @@ import (
|
||||
func openConnection(uri *url.URL, tlsc *tls.Config, timeout time.Duration, headers http.Header, websocketOptions *WebsocketOptions, dialer *net.Dialer) (net.Conn, error) {
|
||||
switch uri.Scheme {
|
||||
case "ws":
|
||||
conn, err := NewWebsocket(uri.String(), nil, timeout, headers, websocketOptions)
|
||||
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
|
||||
dialURI.User = nil
|
||||
conn, err := NewWebsocket(dialURI.String(), nil, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "wss":
|
||||
conn, err := NewWebsocket(uri.String(), tlsc, timeout, headers, websocketOptions)
|
||||
dialURI := *uri // #623 - Gorilla Websockets does not accept URL's where uri.User != nil
|
||||
dialURI.User = nil
|
||||
conn, err := NewWebsocket(dialURI.String(), tlsc, timeout, headers, websocketOptions)
|
||||
return conn, err
|
||||
case "mqtt", "tcp":
|
||||
allProxy := os.Getenv("all_proxy")
|
||||
|
||||
8
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
8
vendor/github.com/eclipse/paho.mqtt.golang/ping.go
generated
vendored
@@ -32,16 +32,16 @@ import (
|
||||
func keepalive(c *client, conn io.Writer) {
|
||||
defer c.workers.Done()
|
||||
DEBUG.Println(PNG, "keepalive starting")
|
||||
var checkInterval int64
|
||||
var checkInterval time.Duration
|
||||
var pingSent time.Time
|
||||
|
||||
if c.options.KeepAlive > 10 {
|
||||
checkInterval = 5
|
||||
checkInterval = 5 * time.Second
|
||||
} else {
|
||||
checkInterval = c.options.KeepAlive / 2
|
||||
checkInterval = time.Duration(c.options.KeepAlive) * time.Second / 2
|
||||
}
|
||||
|
||||
intervalTicker := time.NewTicker(time.Duration(checkInterval * int64(time.Second)))
|
||||
intervalTicker := time.NewTicker(checkInterval)
|
||||
defer intervalTicker.Stop()
|
||||
|
||||
for {
|
||||
|
||||
Reference in New Issue
Block a user