Browse Source

Merge pull request #707 from gravitl/feature_v0.10.0_mq_settings

Feature v0.10.0 mq settings
dcarns 3 years ago
parent
commit
03f37b14f1
6 changed files with 134 additions and 191 deletions
  1. 2 27
      main.go
  2. 53 38
      mq/mq.go
  3. 1 1
      mq/util.go
  4. 78 123
      netclient/functions/daemon.go
  5. BIN
      netclient/main
  6. 0 2
      netclient/wireguard/mac.go

+ 2 - 27
main.go

@@ -11,7 +11,6 @@ import (
 	"sync"
 	"syscall"
 
-	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/auth"
 	controller "github.com/gravitl/netmaker/controllers"
 	"github.com/gravitl/netmaker/database"
@@ -185,32 +184,8 @@ func runGRPC(wg *sync.WaitGroup) {
 // Should we be using a context vice a waitgroup????????????
 func runMessageQueue(wg *sync.WaitGroup) {
 	defer wg.Done()
-	//refactor netclient.functions.SetupMQTT so can be called from here
-	//setupMQTT
-	opts := mqtt.NewClientOptions()
-	opts.AddBroker(servercfg.GetMessageQueueEndpoint())
-	logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint())
-	client := mqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		logger.Log(0, "unable to connect to message queue broker, closing down")
-		return
-	}
-	//Set up Subscriptions
-	if servercfg.GetDebug() {
-		if token := client.Subscribe("#", 2, mq.DefaultHandler); token.Wait() && token.Error() != nil {
-			client.Disconnect(240)
-			logger.Log(0, "default subscription failed")
-		}
-	}
-	if token := client.Subscribe("ping/#", 2, mq.Ping); token.Wait() && token.Error() != nil {
-		client.Disconnect(240)
-		logger.Log(0, "ping subscription failed")
-	}
-	if token := client.Subscribe("update/#", 0, mq.UpdateNode); token.Wait() && token.Error() != nil {
-		client.Disconnect(240)
-		logger.Log(0, "node update subscription failed")
-	}
-	//Set Up Keepalive message
+	logger.Log(0, fmt.Sprintf("connecting to mq broker at %s", servercfg.GetMessageQueueEndpoint()))
+	var client = mq.SetupMQTT(false) // Set up the subscription listener
 	ctx, cancel := context.WithCancel(context.Background())
 	go mq.Keepalive(ctx)
 	quit := make(chan os.Signal, 1)

+ 53 - 38
mq/mq.go

@@ -16,19 +16,20 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
-	"github.com/gravitl/netmaker/serverctl"
 )
 
+// KEEPALIVE_TIMEOUT - time in seconds for timeout
 const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
+// MQ_DISCONNECT - disconnects MQ
 const MQ_DISCONNECT = 250
 
 // DefaultHandler default message queue handler - only called when GetDebug == true
-var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
 }
 
 // Ping message Handler -- handles ping topic from client nodes
-var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func Ping(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "Ping Handler: ", msg.Topic())
 	go func() {
 		id, err := GetID(msg.Topic())
@@ -64,7 +65,7 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 }
 
 // UpdateNode  message Handler -- handles updates from client nodes
-var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	go func() {
 		id, err := GetID(msg.Topic())
 		if err != nil {
@@ -189,12 +190,37 @@ func NodeUpdate(node *models.Node) error {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT() mqtt.Client {
+func SetupMQTT(publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
-	broker := servercfg.GetMessageQueueEndpoint()
-	opts.AddBroker(broker)
+	opts.AddBroker(servercfg.GetMessageQueueEndpoint())
 	id := ncutils.MakeRandomString(23)
 	opts.ClientID = id
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if servercfg.GetDebug() {
+				if token := client.Subscribe("#", 2, mqtt.MessageHandler(DefaultHandler)); token.Wait() && token.Error() != nil {
+					client.Disconnect(240)
+					logger.Log(0, "default subscription failed")
+				}
+			}
+			if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "ping subscription failed")
+			}
+			if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "node update subscription failed")
+			}
+
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
@@ -208,7 +234,6 @@ func SetupMQTT() mqtt.Client {
 		}
 		time.Sleep(2 * time.Second)
 	}
-	logger.Log(2, "connected to message queue", broker)
 	return client
 }
 
@@ -219,7 +244,6 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
-			client := SetupMQTT()
 			networks, err := logic.GetNetworks()
 			if err != nil {
 				logger.Log(1, "error retrieving networks for keepalive", err.Error())
@@ -230,43 +254,34 @@ func Keepalive(ctx context.Context) {
 					serverNode.SetLastCheckIn()
 					logic.UpdateNode(&serverNode, &serverNode)
 					if network.DefaultUDPHolePunch == "yes" {
-						logic.ShouldPublishPeerPorts(&serverNode)
-					}
-					err = PublishPeerUpdate(&serverNode)
-					if err != nil {
-						logger.Log(1, "error publishing udp port updates for network", network.NetID)
-						logger.Log(1, errN.Error())
+						if logic.ShouldPublishPeerPorts(&serverNode) {
+							err = PublishPeerUpdate(&serverNode)
+							if err != nil {
+								logger.Log(1, "error publishing udp port updates for network", network.NetID)
+								logger.Log(1, errN.Error())
+							}
+						}
 					}
 				} else {
 					logger.Log(1, "unable to retrieve leader for network ", network.NetID)
 					logger.Log(1, errN.Error())
 					continue
 				}
-				if serverNode.Address == "" {
-					logger.Log(1, "leader not defined for network ", network.NetID)
-					continue
-				}
-				publishServerKeepalive(client, &network)
-				err = serverctl.SyncServerNetwork(network.NetID)
-				if err != nil {
-					logger.Log(1, "error syncing server network", err.Error())
-				}
 			}
-			client.Disconnect(MQ_DISCONNECT)
 		}
 	}
 }
 
-func publishServerKeepalive(client mqtt.Client, network *models.Network) {
-	nodes, err := logic.GetNetworkNodes(network.NetID)
-	if err != nil {
-		return
-	}
-	for _, node := range nodes {
-		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
-		} else {
-			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
-		}
-	}
-}
+// func publishServerKeepalive(client mqtt.Client, network *models.Network) {
+// 	nodes, err := logic.GetNetworkNodes(network.NetID)
+// 	if err != nil {
+// 		return
+// 	}
+// 	for _, node := range nodes {
+// 		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
+// 			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
+// 		} else {
+// 			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
+// 		}
+// 	}
+// }

+ 1 - 1
mq/util.go

@@ -50,7 +50,7 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 }
 
 func publish(node *models.Node, dest string, msg []byte) error {
-	client := SetupMQTT()
+	client := SetupMQTT(true)
 	defer client.Disconnect(250)
 	encrypted, encryptErr := encryptMsg(node, msg)
 	if encryptErr != nil {

+ 78 - 123
netclient/functions/daemon.go

@@ -26,7 +26,7 @@ import (
 )
 
 // == Message Caches ==
-var keepalive = new(sync.Map)
+// var keepalive = new(sync.Map)
 var messageCache = new(sync.Map)
 var networkcontext = new(sync.Map)
 
@@ -89,13 +89,53 @@ func Daemon() error {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT(cfg *config.ClientConfig) mqtt.Client {
+func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
 	server := getServerAddress(cfg)
 	opts.AddBroker(server + ":1883")
 	id := ncutils.MakeRandomString(23)
 	opts.ClientID = id
 	opts.SetDefaultPublishHandler(All)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute >> 1)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if cfg.DebugOn {
+				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
+					ncutils.Log(token.Error().Error())
+					return
+				}
+				ncutils.Log("subscribed to all topics for debugging purposes")
+			}
+			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
+	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
+		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
+		_, err := Pull(cfg.Node.Network, true)
+		if err != nil {
+			ncutils.Log("could not run pull, please restart daemon or examine network connectivity --- " + err.Error())
+		}
+	})
+
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(12 * time.Second)
 	for {
@@ -161,56 +201,14 @@ func MessageQueue(ctx context.Context, network string) {
 	time.Sleep(time.Second << 1)
 	cfg.ReadConfig()
 	ncutils.Log("daemon started for network: " + network)
-	client := SetupMQTT(&cfg)
-	if cfg.DebugOn {
-		if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
-			ncutils.Log(token.Error().Error())
-			return
-		}
-		ncutils.Log("subscribed to all topics for debugging purposes")
-	}
-	if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-		ncutils.Log(token.Error().Error())
-		return
-	}
-	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-	}
-	if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
-		ncutils.Log(token.Error().Error())
-		return
-	}
-	if cfg.DebugOn {
-		ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-	}
-	var found bool
-	for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
-		if !server.IsLeader {
-			continue
-		}
-		if server.Address != "" {
-			if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			found = true
-			if cfg.DebugOn {
-				ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
-			}
-		}
-	}
-	if !found {
-		ncutils.Log("leader not defined for network " + cfg.Node.Network)
-	}
+	client := SetupMQTT(&cfg, false)
+
 	defer client.Disconnect(250)
 	wg := &sync.WaitGroup{}
 	wg.Add(2)
-	keepalivectx, keepalivecancel := context.WithCancel(context.Background())
-	go MonitorKeepalive(keepalivectx, wg, client, &cfg)
 	checkinctx, checkincancel := context.WithCancel(context.Background())
 	go Checkin(checkinctx, wg, &cfg, network)
 	<-ctx.Done()
-	keepalivecancel()
 	checkincancel()
 	ncutils.Log("shutting down message queue for network " + network)
 	wg.Wait()
@@ -313,10 +311,10 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 			return
 		}
 		time.Sleep(time.Second >> 1)
-		if err = Resubscribe(client, &cfg); err != nil {
-			ncutils.Log("error resubscribing after interface change " + err.Error())
-			return
-		}
+		// if err = Resubscribe(client, &cfg); err != nil {
+		// 	ncutils.Log("error resubscribing after interface change " + err.Error())
+		// 	return
+		// }
 		if newNode.DNSOn == "yes" {
 			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
 				if server.IsLeader {
@@ -376,80 +374,37 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 }
 
 // MonitorKeepalive - checks time last server keepalive received.  If more than 3+ minutes, notify and resubscribe
-func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
-	defer wg.Done()
-	for {
-		select {
-		case <-ctx.Done():
-			ncutils.Log("cancel recieved, monitor keepalive exiting")
-			return
-		case <-time.After(time.Second * 150):
-			var keepalivetime time.Time
-			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
-			if ok {
-				keepalivetime = keepaliveval.(time.Time)
-				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
-					ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
-					err := Resubscribe(client, cfg)
-					if err != nil {
-						ncutils.Log("closing " + err.Error())
-					}
-				}
-			}
-		}
-	}
-}
+// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
+// 	defer wg.Done()
+// 	for {
+// 		select {
+// 		case <-ctx.Done():
+// 			ncutils.Log("cancel recieved, monitor keepalive exiting")
+// 			return
+// 		case <-time.After(time.Second * 150):
+// 			var keepalivetime time.Time
+// 			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
+// 			if ok {
+// 				keepalivetime = keepaliveval.(time.Time)
+// 				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
+// 					// ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
+// 					// err := Resubscribe(client, cfg)
+// 					// if err != nil {
+// 					// 	ncutils.Log("closing " + err.Error())
+// 					// }
+// 					ncutils.Log("maybe wanna call something")
+// 				}
+// 			}
+// 		}
+// 	}
+// }
 
 // ServerKeepAlive -- handler to react to keepalive messages published by server
-func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
-	var currentTime = time.Now()
-	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
-	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
-}
-
-// Resubscribe --- handles resubscribing if needed
-func Resubscribe(client mqtt.Client, cfg *config.ClientConfig) error {
-	if err := config.ModConfig(&cfg.Node); err == nil {
-		ncutils.Log("resubbing on network " + cfg.Node.Network)
-		client.Disconnect(250)
-		client = SetupMQTT(cfg)
-		if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, NodeUpdate); token.Wait() && token.Error() != nil {
-			ncutils.Log("error resubscribing to updates for " + cfg.Node.Network)
-			return token.Error()
-		}
-		if cfg.DebugOn {
-			ncutils.Log("subscribed to node updates for node " + cfg.Node.Name + " update/" + cfg.Node.ID)
-		}
-		if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, UpdatePeers); token.Wait() && token.Error() != nil {
-			ncutils.Log("error resubscribing to peers for " + cfg.Node.Network)
-			return token.Error()
-		}
-		var found bool
-		for _, server := range cfg.NetworkSettings.DefaultServerAddrs {
-			if !server.IsLeader {
-				continue
-			}
-			if server.Address != "" {
-				if token := client.Subscribe(fmt.Sprintf("serverkeepalive/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(ServerKeepAlive)); token.Wait() && token.Error() != nil {
-					ncutils.Log("error resubscribing to serverkeepalive for " + cfg.Node.Network)
-					return token.Error()
-				}
-				found = true
-				if cfg.DebugOn {
-					ncutils.Log("subscribed to server keepalives for server " + cfg.Node.Network)
-				}
-			}
-		}
-		if !found {
-			ncutils.Log("leader not defined for network " + cfg.Node.Network)
-		}
-		ncutils.Log("finished re subbing")
-		return nil
-	} else {
-		ncutils.Log("could not mod config when re-subbing")
-		return err
-	}
-}
+// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
+// 	var currentTime = time.Now()
+// 	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
+// 	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
+// }
 
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
@@ -577,7 +532,7 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
 		return err
 	}
 
-	client := SetupMQTT(cfg)
+	client := SetupMQTT(cfg, true)
 	defer client.Disconnect(250)
 	encrypted, err := ncutils.BoxEncrypt(msg, serverPubKey, trafficPrivKey)
 	if err != nil {

BIN
netclient/main


+ 0 - 2
netclient/wireguard/mac.go

@@ -3,7 +3,6 @@ package wireguard
 import (
 	"bufio"
 	"errors"
-	"log"
 	"os"
 	"strconv"
 	"strings"
@@ -28,7 +27,6 @@ func WgQuickDownMac(node *models.Node, iface string) error {
 // RemoveConfMac - bring down mac interface and remove routes
 func RemoveConfMac(iface string) error {
 	realIface, err := getRealIface(iface)
-	log.Println("DELETE ME: attempting to remove " + realIface)
 	if realIface != "" {
 		err = deleteInterface(iface, realIface)
 	}