Prechádzať zdrojové kódy

Merge branch 'NET-1440' into NET-1440-batchpeerupdate

Max Ma 1 rok pred
rodič
commit
b0a2dee098
4 zmenil súbory, kde vykonal 7 pridanie a 9 odobranie
  1. 1 0
      mq/emqx_on_prem.go
  2. 3 1
      mq/mq.go
  3. 0 6
      mq/publishers.go
  4. 3 2
      mq/util.go

+ 1 - 0
mq/emqx_on_prem.go

@@ -63,6 +63,7 @@ func getEmqxAuthToken() (string, error) {
 	if err != nil {
 		return "", err
 	}
+	defer resp.Body.Close()
 	msg, err := io.ReadAll(resp.Body)
 	if err != nil {
 		return "", err

+ 3 - 1
mq/mq.go

@@ -36,7 +36,6 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 	opts.SetCleanSession(true)
 	opts.SetConnectRetryInterval(time.Second * 1)
 	opts.SetKeepAlive(time.Second * 10)
-	opts.SetCleanSession(true)
 	opts.SetWriteTimeout(time.Minute)
 }
 
@@ -131,6 +130,9 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
+			if mqclient == nil || !mqclient.IsConnectionOpen() {
+				SetupMQTT(false)
+			}
 			sendPeers()
 		}
 	}

+ 0 - 6
mq/publishers.go

@@ -261,12 +261,6 @@ func PushMetricsToExporter(metrics models.Metrics) error {
 
 // sendPeers - retrieve networks, send peer ports to all peers
 func sendPeers() {
-
-	// hosts, err := logic.GetAllHosts()
-	// if err != nil && len(hosts) > 0 {
-	// 	logger.Log(1, "error retrieving networks for keepalive", err.Error())
-	// }
-
 	peer_force_send++
 	if peer_force_send == 5 {
 		servercfg.SetHost()

+ 3 - 2
mq/util.go

@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -78,11 +79,11 @@ func publish(host *models.Host, dest string, msg []byte) error {
 	if encryptErr != nil {
 		return encryptErr
 	}
-	if mqclient == nil || !mqclient.IsConnected() {
+	if mqclient == nil || !mqclient.IsConnectionOpen() {
 		return errors.New("cannot publish ... mqclient not connected")
 	}
 
-	if token := mqclient.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
+	if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
 		var err error
 		if token.Error() == nil {
 			err = errors.New("connection timeout")