Ver Fonte

mq connection code change

Max Ma há 1 ano atrás
pai
commit
61a129d791
3 ficheiros alterados com 5 adições e 9 exclusões
  1. 2 5
      mq/mq.go
  2. 1 1
      mq/publishers.go
  3. 2 3
      mq/util.go

+ 2 - 5
mq/mq.go

@@ -92,9 +92,9 @@ func SetupMQTT(fatal bool) {
 	})
 	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
 		slog.Warn("detected broker connection lost", "err", e.Error())
-		//c.Disconnect(250)
+		c.Disconnect(250)
 		slog.Info("re-initiating MQ connection")
-		//SetupMQTT(false)
+		SetupMQTT(false)
 
 	})
 	mqclient = mqtt.NewClient(opts)
@@ -131,9 +131,6 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
-			if mqclient == nil || !mqclient.IsConnectionOpen() {
-				SetupMQTT(false)
-			}
 			serverStatusUpdate()
 			sendPeers()
 		}

+ 1 - 1
mq/publishers.go

@@ -28,7 +28,7 @@ type ServerStatus struct {
 
 var serverStatusCache = ServerStatus{}
 
-const batchSize = 20
+const batchSize = 50
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {

+ 2 - 3
mq/util.go

@@ -4,7 +4,6 @@ import (
 	"errors"
 	"fmt"
 	"strings"
-	"time"
 
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -79,11 +78,11 @@ func publish(host *models.Host, dest string, msg []byte) error {
 	if encryptErr != nil {
 		return encryptErr
 	}
-	if mqclient == nil || !mqclient.IsConnectionOpen() {
+	if mqclient == nil || !mqclient.IsConnected() {
 		return errors.New("cannot publish ... mqclient not connected")
 	}
 
-	if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
+	if token := mqclient.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
 		var err error
 		if token.Error() == nil {
 			err = errors.New("connection timeout")