Browse Source

use token.WaitTimeout()

Matthew R. Kasun 3 years ago
parent
commit
976ab07de1
3 changed files with 41 additions and 13 deletions
  1. 12 5
      mq/mq.go
  2. 10 2
      mq/util.go
  3. 19 6
      netclient/functions/daemon.go

+ 12 - 5
mq/mq.go

@@ -16,6 +16,9 @@ const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
 // MQ_DISCONNECT - disconnects MQ
 const MQ_DISCONNECT = 250
 
+// MQ_TIMEOUT - timeout for MQ
+const MQ_TIMEOUT = 30
+
 var peer_force_send = 0
 
 // SetupMQTT creates a connection to broker and return client
@@ -31,15 +34,15 @@ func SetupMQTT(publish bool) mqtt.Client {
 	opts.SetWriteTimeout(time.Minute)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 		if !publish {
-			if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.Wait() && token.Error() != nil {
+			if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 				client.Disconnect(240)
 				logger.Log(0, "ping subscription failed")
 			}
-			if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.Wait() && token.Error() != nil {
+			if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 				client.Disconnect(240)
 				logger.Log(0, "node update subscription failed")
 			}
-			if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil {
+			if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 				client.Disconnect(240)
 				logger.Log(0, "node client subscription failed")
 			}
@@ -51,10 +54,14 @@ func SetupMQTT(publish bool) mqtt.Client {
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
-		if token := client.Connect(); token.Wait() && token.Error() != nil {
+		if token := client.Connect(); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			logger.Log(2, "unable to connect to broker, retrying ...")
 			if time.Now().After(tperiod) {
-				log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
+				if token.Error() == nil {
+					log.Fatal(0, "could not connect to broker, token timeout, exiting ...")
+				} else {
+					log.Fatal(0, "could not connect to broker, exiting ...", token.Error())
+				}
 			}
 		} else {
 			break

+ 10 - 2
mq/util.go

@@ -1,8 +1,10 @@
 package mq
 
 import (
+	"errors"
 	"fmt"
 	"strings"
+	"time"
 
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -65,8 +67,14 @@ func publish(node *models.Node, dest string, msg []byte) error {
 	if encryptErr != nil {
 		return encryptErr
 	}
-	if token := client.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
-		return token.Error()
+	if token := client.Publish(dest, 0, true, encrypted); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		var err error
+		if token.Error() == nil {
+			err = errors.New("connection timeout")
+		} else {
+			err = token.Error()
+		}
+		return err
 	}
 	return nil
 }

+ 19 - 6
netclient/functions/daemon.go

@@ -18,6 +18,7 @@ import (
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/netclient/auth"
 	"github.com/gravitl/netmaker/netclient/config"
 	"github.com/gravitl/netmaker/netclient/daemon"
@@ -120,8 +121,12 @@ func UpdateKeys(nodeCfg *config.ClientConfig, client mqtt.Client) error {
 // sets MQ client subscriptions for a specific node config
 // should be called for each node belonging to a given server
 func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) {
-	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-		logger.Log(0, token.Error().Error())
+	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		if token.Error() == nil {
+			logger.Log(0, "connection timeout")
+		} else {
+			logger.Log(0, token.Error().Error())
+		}
 		return
 	}
 	logger.Log(3, fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", nodeCfg.Node.Name, nodeCfg.Node.Network, nodeCfg.Node.ID))
@@ -137,12 +142,20 @@ func setSubscriptions(client mqtt.Client, nodeCfg *config.ClientConfig) {
 func unsubscribeNode(client mqtt.Client, nodeCfg *config.ClientConfig) {
 	client.Unsubscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID))
 	var ok = true
-	if token := client.Unsubscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.Wait() && token.Error() != nil {
-		logger.Log(1, "unable to unsubscribe from updates for node ", nodeCfg.Node.Name, "\n", token.Error().Error())
+	if token := client.Unsubscribe(fmt.Sprintf("update/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		if token.Error() == nil {
+			logger.Log(1, "unable to unsubscribe from updates for node ", nodeCfg.Node.Name, "\n", "connection timeout")
+		} else {
+			logger.Log(1, "unable to unsubscribe from updates for node ", nodeCfg.Node.Name, "\n", token.Error().Error())
+		}
 		ok = false
 	}
-	if token := client.Unsubscribe(fmt.Sprintf("peers/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.Wait() && token.Error() != nil {
-		logger.Log(1, "unable to unsubscribe from peer updates for node ", nodeCfg.Node.Name, "\n", token.Error().Error())
+	if token := client.Unsubscribe(fmt.Sprintf("peers/%s/%s", nodeCfg.Node.Network, nodeCfg.Node.ID)); token.WaitTimeout(mq.MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		if token.Error() == nil {
+			logger.Log(1, "unable to unsubscribe from peer updates for node ", nodeCfg.Node.Name, "\n", "connection timeout")
+		} else {
+			logger.Log(1, "unable to unsubscribe from peer updates for node ", nodeCfg.Node.Name, "\n", token.Error().Error())
+		}
 		ok = false
 	}
 	if ok {