Browse Source

cleaned up daemon and server mq

0xdcarns 3 years ago
parent
commit
732665c3d4
6 changed files with 63 additions and 74 deletions
  1. 3 27
      main.go
  2. 53 38
      mq/mq.go
  3. 1 1
      mq/util.go
  4. 6 6
      netclient/functions/daemon.go
  5. BIN
      netclient/main
  6. 0 2
      netclient/wireguard/mac.go

+ 3 - 27
main.go

@@ -11,7 +11,6 @@ import (
 	"sync"
 	"syscall"
 
-	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/gravitl/netmaker/auth"
 	controller "github.com/gravitl/netmaker/controllers"
 	"github.com/gravitl/netmaker/database"
@@ -185,32 +184,9 @@ func runGRPC(wg *sync.WaitGroup) {
 // Should we be using a context vice a waitgroup????????????
 func runMessageQueue(wg *sync.WaitGroup) {
 	defer wg.Done()
-	//refactor netclient.functions.SetupMQTT so can be called from here
-	//setupMQTT
-	opts := mqtt.NewClientOptions()
-	opts.AddBroker(servercfg.GetMessageQueueEndpoint())
-	logger.Log(0, "setting broker "+servercfg.GetMessageQueueEndpoint())
-	client := mqtt.NewClient(opts)
-	if token := client.Connect(); token.Wait() && token.Error() != nil {
-		logger.Log(0, "unable to connect to message queue broker, closing down")
-		return
-	}
-	//Set up Subscriptions
-	if servercfg.GetDebug() {
-		if token := client.Subscribe("#", 2, mq.DefaultHandler); token.Wait() && token.Error() != nil {
-			client.Disconnect(240)
-			logger.Log(0, "default subscription failed")
-		}
-	}
-	if token := client.Subscribe("ping/#", 2, mq.Ping); token.Wait() && token.Error() != nil {
-		client.Disconnect(240)
-		logger.Log(0, "ping subscription failed")
-	}
-	if token := client.Subscribe("update/#", 0, mq.UpdateNode); token.Wait() && token.Error() != nil {
-		client.Disconnect(240)
-		logger.Log(0, "node update subscription failed")
-	}
-	//Set Up Keepalive message
+	logger.Log(0, fmt.Sprintf("connecting to mq broker at %s", servercfg.GetMessageQueueEndpoint()))
+	var client = mq.SetupMQTT(false)
+	// Set Up Keepalive message
 	ctx, cancel := context.WithCancel(context.Background())
 	go mq.Keepalive(ctx)
 	quit := make(chan os.Signal, 1)

+ 53 - 38
mq/mq.go

@@ -16,19 +16,20 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
-	"github.com/gravitl/netmaker/serverctl"
 )
 
+// KEEPALIVE_TIMEOUT - time in seconds for timeout
 const KEEPALIVE_TIMEOUT = 60 //timeout in seconds
+// MQ_DISCONNECT - disconnects MQ
 const MQ_DISCONNECT = 250
 
 // DefaultHandler default message queue handler - only called when GetDebug == true
-var DefaultHandler mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
 }
 
 // Ping message Handler -- handles ping topic from client nodes
-var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func Ping(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "Ping Handler: ", msg.Topic())
 	go func() {
 		id, err := GetID(msg.Topic())
@@ -64,7 +65,7 @@ var Ping mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
 }
 
 // UpdateNode  message Handler -- handles updates from client nodes
-var UpdateNode mqtt.MessageHandler = func(client mqtt.Client, msg mqtt.Message) {
+func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	go func() {
 		id, err := GetID(msg.Topic())
 		if err != nil {
@@ -189,12 +190,37 @@ func NodeUpdate(node *models.Node) error {
 }
 
 // SetupMQTT creates a connection to broker and return client
-func SetupMQTT() mqtt.Client {
+func SetupMQTT(publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
-	broker := servercfg.GetMessageQueueEndpoint()
-	opts.AddBroker(broker)
+	opts.AddBroker(servercfg.GetMessageQueueEndpoint())
 	id := ncutils.MakeRandomString(23)
 	opts.ClientID = id
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if servercfg.GetDebug() {
+				if token := client.Subscribe("#", 2, mqtt.MessageHandler(DefaultHandler)); token.Wait() && token.Error() != nil {
+					client.Disconnect(240)
+					logger.Log(0, "default subscription failed")
+				}
+			}
+			if token := client.Subscribe("ping/#", 2, mqtt.MessageHandler(Ping)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "ping subscription failed")
+			}
+			if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "node update subscription failed")
+			}
+
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
 	client := mqtt.NewClient(opts)
 	tperiod := time.Now().Add(10 * time.Second)
 	for {
@@ -208,7 +234,6 @@ func SetupMQTT() mqtt.Client {
 		}
 		time.Sleep(2 * time.Second)
 	}
-	logger.Log(2, "connected to message queue", broker)
 	return client
 }
 
@@ -219,7 +244,6 @@ func Keepalive(ctx context.Context) {
 		case <-ctx.Done():
 			return
 		case <-time.After(time.Second * KEEPALIVE_TIMEOUT):
-			client := SetupMQTT()
 			networks, err := logic.GetNetworks()
 			if err != nil {
 				logger.Log(1, "error retrieving networks for keepalive", err.Error())
@@ -230,43 +254,34 @@ func Keepalive(ctx context.Context) {
 					serverNode.SetLastCheckIn()
 					logic.UpdateNode(&serverNode, &serverNode)
 					if network.DefaultUDPHolePunch == "yes" {
-						logic.ShouldPublishPeerPorts(&serverNode)
-					}
-					err = PublishPeerUpdate(&serverNode)
-					if err != nil {
-						logger.Log(1, "error publishing udp port updates for network", network.NetID)
-						logger.Log(1, errN.Error())
+						if logic.ShouldPublishPeerPorts(&serverNode) {
+							err = PublishPeerUpdate(&serverNode)
+							if err != nil {
+								logger.Log(1, "error publishing udp port updates for network", network.NetID)
+								logger.Log(1, errN.Error())
+							}
+						}
 					}
 				} else {
 					logger.Log(1, "unable to retrieve leader for network ", network.NetID)
 					logger.Log(1, errN.Error())
 					continue
 				}
-				if serverNode.Address == "" {
-					logger.Log(1, "leader not defined for network ", network.NetID)
-					continue
-				}
-				publishServerKeepalive(client, &network)
-				err = serverctl.SyncServerNetwork(network.NetID)
-				if err != nil {
-					logger.Log(1, "error syncing server network", err.Error())
-				}
 			}
-			client.Disconnect(MQ_DISCONNECT)
 		}
 	}
 }
 
-func publishServerKeepalive(client mqtt.Client, network *models.Network) {
-	nodes, err := logic.GetNetworkNodes(network.NetID)
-	if err != nil {
-		return
-	}
-	for _, node := range nodes {
-		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
-		} else {
-			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
-		}
-	}
-}
+// func publishServerKeepalive(client mqtt.Client, network *models.Network) {
+// 	nodes, err := logic.GetNetworkNodes(network.NetID)
+// 	if err != nil {
+// 		return
+// 	}
+// 	for _, node := range nodes {
+// 		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
+// 			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
+// 		} else {
+// 			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
+// 		}
+// 	}
+// }

+ 1 - 1
mq/util.go

@@ -50,7 +50,7 @@ func encryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 }
 
 func publish(node *models.Node, dest string, msg []byte) error {
-	client := SetupMQTT()
+	client := SetupMQTT(true)
 	defer client.Disconnect(250)
 	encrypted, encryptErr := encryptMsg(node, msg)
 	if encryptErr != nil {

+ 6 - 6
netclient/functions/daemon.go

@@ -26,7 +26,7 @@ import (
 )
 
 // == Message Caches ==
-var keepalive = new(sync.Map)
+// var keepalive = new(sync.Map)
 var messageCache = new(sync.Map)
 var networkcontext = new(sync.Map)
 
@@ -400,11 +400,11 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 // }
 
 // ServerKeepAlive -- handler to react to keepalive messages published by server
-func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
-	var currentTime = time.Now()
-	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
-	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
-}
+// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
+// 	var currentTime = time.Now()
+// 	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
+// 	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
+// }
 
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {

BIN
netclient/main


+ 0 - 2
netclient/wireguard/mac.go

@@ -3,7 +3,6 @@ package wireguard
 import (
 	"bufio"
 	"errors"
-	"log"
 	"os"
 	"strconv"
 	"strings"
@@ -28,7 +27,6 @@ func WgQuickDownMac(node *models.Node, iface string) error {
 // RemoveConfMac - bring down mac interface and remove routes
 func RemoveConfMac(iface string) error {
 	realIface, err := getRealIface(iface)
-	log.Println("DELETE ME: attempting to remove " + realIface)
 	if realIface != "" {
 		err = deleteInterface(iface, realIface)
 	}