Ver Fonte

reset mq connection on timeout

Abhishek Kondur há 1 ano atrás
pai
commit
338c2ce36d
3 ficheiros alterados com 52 adições e 36 exclusões
  1. 37 26
      main.go
  2. 14 10
      mq/mq.go
  3. 1 0
      mq/util.go

+ 37 - 26
main.go

@@ -11,6 +11,7 @@ import (
 	"runtime/debug"
 	"sync"
 	"syscall"
+	"time"
 
 	"github.com/gravitl/netmaker/auth"
 	"github.com/gravitl/netmaker/config"
@@ -28,7 +29,6 @@ import (
 	"golang.org/x/exp/slog"
 )
 
-
 var version = "v0.21.2"
 
 // Start DB Connection and start API Request Handler
@@ -138,11 +138,23 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) {
 		go controller.HandleRESTRequests(wg, ctx)
 	}
 	//Run MessageQueue
-	if servercfg.IsMessageQueueBackend() {
-		wg.Add(1)
-		go runMessageQueue(wg, ctx)
-	}
-
+	wg.Add(1)
+	go runMessageQueue(wg, ctx)
+	go func() {
+		peerUpdate := make(chan *models.Node)
+		go logic.ManageZombies(ctx, peerUpdate)
+		go logic.DeleteExpiredNodes(ctx, peerUpdate)
+		for {
+			select {
+			case nodeUpdate := <-peerUpdate:
+				if err := mq.NodeUpdate(nodeUpdate); err != nil {
+					logger.Log(0, "failed to send peer update for deleted node: ", nodeUpdate.ID.String(), err.Error())
+				}
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
 	if !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() {
 		logger.Log(0, "No Server Mode selected, so nothing is being served! Set Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.")
 	}
@@ -154,28 +166,27 @@ func startControllers(wg *sync.WaitGroup, ctx context.Context) {
 // Should we be using a context vice a waitgroup????????????
 func runMessageQueue(wg *sync.WaitGroup, ctx context.Context) {
 	defer wg.Done()
-	brokerHost, _ := servercfg.GetMessageQueueEndpoint()
-	logger.Log(0, "connecting to mq broker at", brokerHost)
-	mq.SetupMQTT()
-	if mq.IsConnected() {
-		logger.Log(0, "connected to MQ Broker")
-	} else {
-		logger.FatalLog("error connecting to MQ Broker")
-	}
-	defer mq.CloseClient()
 	go mq.Keepalive(ctx)
-	go func() {
-		peerUpdate := make(chan *models.Node)
-		go logic.ManageZombies(ctx, peerUpdate)
-		go logic.DeleteExpiredNodes(ctx, peerUpdate)
-		for nodeUpdate := range peerUpdate {
-			if err := mq.NodeUpdate(nodeUpdate); err != nil {
-				logger.Log(0, "failed to send peer update for deleted node: ", nodeUpdate.ID.String(), err.Error())
-			}
+	defer mq.CloseClient()
+	for {
+		brokerHost, _ := servercfg.GetMessageQueueEndpoint()
+		logger.Log(0, "connecting to mq broker at", brokerHost)
+		mq.SetupMQTT()
+		if mq.IsConnected() {
+			logger.Log(0, "connected to MQ Broker")
+		} else {
+			logger.FatalLog("error connecting to MQ Broker")
 		}
-	}()
-	<-ctx.Done()
-	logger.Log(0, "Message Queue shutting down")
+		select {
+		case <-mq.ResetCh:
+			mq.CloseClient()
+			time.Sleep(time.Second * 2)
+			continue
+		case <-ctx.Done():
+			return
+		}
+	}
+
 }
 
 func setVerbosity() {

+ 14 - 10
mq/mq.go

@@ -12,17 +12,21 @@ import (
 	"github.com/gravitl/netmaker/servercfg"
 )
 
-// KEEPALIVE_TIMEOUT - time in seconds for timeout
-const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
-// MQ_DISCONNECT - disconnects MQ
-const MQ_DISCONNECT = 250
-
-// MQ_TIMEOUT - timeout for MQ
-const MQ_TIMEOUT = 30
-
-var peer_force_send = 0
+const (
+	// KEEPALIVE_TIMEOUT - time in seconds for timeout
+	KEEPALIVE_TIMEOUT = 60 //timeout in seconds
+	// MQ_DISCONNECT - disconnects MQ
+	MQ_DISCONNECT = 250
+	// MQ_TIMEOUT - timeout for MQ
+	MQ_TIMEOUT = 30
+)
 
-var mqclient mqtt.Client
+var (
+	peer_force_send = 0
+	mqclient        mqtt.Client
+	// mq channel to reset mq connection
+	ResetCh = make(chan struct{}, 2)
+)
 
 func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 	broker, _ := servercfg.GetMessageQueueEndpoint()

+ 1 - 0
mq/util.go

@@ -83,6 +83,7 @@ func publish(host *models.Host, dest string, msg []byte) error {
 	if token := mqclient.Publish(dest, 0, true, encrypted); !token.WaitTimeout(MQ_TIMEOUT*time.Second) || token.Error() != nil {
 		var err error
 		if token.Error() == nil {
+			ResetCh <- struct{}{}
 			err = errors.New("connection timeout")
 		} else {
 			err = token.Error()