Browse Source

Merge pull request #788 from gravitl/feature_v0.10.1_client_signals

Feature v0.10.1 client signals
Alex Feiszli 3 years ago
parent
commit
a0f74946b7
10 changed files with 408 additions and 438 deletions
  1. 3 13
      controllers/network.go
  2. 50 28
      controllers/node.go
  3. 2 7
      controllers/node_grpc.go
  4. 2 2
      controllers/relay.go
  5. 0 34
      controllers/server_util.go
  6. 105 0
      mq/handlers.go
  7. 4 223
      mq/mq.go
  8. 125 0
      mq/publishers.go
  9. 11 0
      mq/util.go
  10. 106 131
      netclient/functions/daemon.go

+ 3 - 13
controllers/network.go

@@ -3,7 +3,6 @@ package controller
 import (
 	"encoding/json"
 	"errors"
-	"fmt"
 	"net/http"
 	"strings"
 
@@ -12,7 +11,6 @@ import (
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
-	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -109,17 +107,9 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	for _, node := range nodes {
-		fmt.Println("updating node ", node.Name, " for a key update")
-		if err := mq.NodeUpdate(&node); err != nil {
-			logger.Log(2, "failed key update ", node.Name)
-		}
-	}
-	node, err := logic.GetNetworkServerLeader(netname)
-	if err != nil {
-		logger.Log(2, "failed to get server node")
-		return
+		logger.Log(3, "updating node ", node.Name, " for a key update")
+		runUpdates(&node, true)
 	}
-	runUpdates(&node, false, false)
 }
 
 // Update a network
@@ -184,7 +174,7 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 		for _, node := range nodes {
-			runUpdates(&node, true, false)
+			runUpdates(&node, true)
 		}
 	}
 

+ 50 - 28
controllers/node.go

@@ -5,7 +5,7 @@ import (
 	"fmt"
 	"net/http"
 	"strings"
-	"time"
+	"sync"
 
 	"github.com/gorilla/mux"
 	"github.com/gravitl/netmaker/database"
@@ -396,8 +396,8 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 	validKey := logic.IsKeyValid(networkName, node.AccessKey)
 
 	if !validKey {
-		//Check to see if network will allow manual sign up
-		//may want to switch this up with the valid key check and avoid a DB call that way.
+		// Check to see if network will allow manual sign up
+		// may want to switch this up with the valid key check and avoid a DB call that way.
 		if network.AllowManualSignUp == "yes" {
 			node.IsPending = "yes"
 		} else {
@@ -418,12 +418,10 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created new node", node.Name, "on network", node.Network)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
-
-	runUpdates(&node, false, false)
 }
 
-//Takes node out of pending state
-//TODO: May want to use cordon/uncordon terminology instead of "ispending".
+// Takes node out of pending state
+// TODO: May want to use cordon/uncordon terminology instead of "ispending".
 func uncordonNode(w http.ResponseWriter, r *http.Request) {
 	var params = mux.Vars(r)
 	w.Header().Set("Content-Type", "application/json")
@@ -437,9 +435,11 @@ func uncordonNode(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode("SUCCESS")
 
-	runUpdates(&node, true, false)
+	runUpdates(&node, false)
 }
 
+// == EGRESS ==
+
 func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	var gateway models.EgressGatewayRequest
 	var params = mux.Vars(r)
@@ -461,7 +461,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
 
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }
 
 func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
@@ -479,7 +479,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
 
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }
 
 // == INGRESS ==
@@ -499,7 +499,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
 
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }
 
 func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
@@ -516,7 +516,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
 
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }
 
 func updateNode(w http.ResponseWriter, r *http.Request) {
@@ -572,10 +572,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 		}
 		if len(updatenodes) > 0 {
 			for _, relayedNode := range updatenodes {
-				err = mq.NodeUpdate(&relayedNode)
-				if err != nil {
-					logger.Log(1, "error sending update to relayed node ", relayedNode.Address, "on network", node.Network, ": ", err.Error())
-				}
+				runUpdates(&relayedNode, false)
 			}
 		}
 	}
@@ -588,7 +585,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newNode)
 
-	runUpdates(&newNode, true, ifaceDelta)
+	runUpdates(&newNode, ifaceDelta)
 }
 
 func deleteNode(w http.ResponseWriter, r *http.Request) {
@@ -623,27 +620,52 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	returnSuccessResponse(w, r, nodeid+" deleted.")
 
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
-	runUpdates(&node, false, true)
+	runUpdates(&node, false)
 }
 
-func runUpdates(node *models.Node, nodeUpdate bool, requiresPause bool) error {
-	//don't publish to server node
-
-	if nodeUpdate && !isServer(node) {
+func runUpdates(node *models.Node, ifaceDelta bool) {
+	go func() { // don't block http response
+		err := logic.TimerCheckpoint()
+		if err != nil {
+			logger.Log(3, "error occurred on timer,", err.Error())
+		}
+		if err := runServerUpdate(node, ifaceDelta); err != nil {
+			logger.Log(1, "error running server update", err.Error())
+		}
+		// publish node update if not server
 		if err := mq.NodeUpdate(node); err != nil {
-			logger.Log(1, "error publishing node update", err.Error())
-			return err
+			logger.Log(1, "error publishing node update to node", node.Name, node.ID, err.Error())
 		}
+	}()
+}
+
+// updates local peers for a server on a given node's network
+func runServerUpdate(node *models.Node, ifaceDelta bool) error {
+	var mutex sync.Mutex
+	mutex.Lock()
+	defer mutex.Unlock()
+	if servercfg.IsClientMode() != "on" {
+		return nil
 	}
 
-	if requiresPause { // TODO in future, detect when a node has finished iface update
-		time.Sleep(time.Second * 10)
+	if !isServer(node) && ifaceDelta {
+		ifaceDelta = false
 	}
 
-	if err := runServerPeerUpdate(node, isServer(node)); err != nil {
-		logger.Log(1, "internal error when running peer node:", err.Error())
+	currentServerNode, err := logic.GetNetworkServerLocal(node.Network)
+	if err != nil {
 		return err
 	}
 
+	if ifaceDelta && logic.IsLeader(&currentServerNode) {
+		if err := mq.PublishPeerUpdate(&currentServerNode); err != nil {
+			logger.Log(1, "failed to publish peer update "+err.Error())
+		}
+	}
+
+	if err := logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil {
+		logger.Log(1, "server node:", currentServerNode.ID, "failed update")
+		return err
+	}
 	return nil
 }

+ 2 - 7
controllers/node_grpc.go

@@ -107,8 +107,6 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 		Type: nodepb.NODE_TYPE,
 	}
 
-	runUpdates(&node, false, false)
-
 	go func(node *models.Node) {
 		if node.UDPHolePunch == "yes" {
 			var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network)
@@ -134,6 +132,7 @@ func (s *NodeServiceServer) CreateNode(ctx context.Context, req *nodepb.Object)
 }
 
 // NodeServiceServer.UpdateNode updates a node and responds over gRPC
+// DELETE ONE DAY - DEPRECATED
 func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object) (*nodepb.Object, error) {
 
 	var newnode models.Node
@@ -146,8 +145,6 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object)
 		return nil, err
 	}
 
-	ifaceDelta := logic.IfaceDelta(&node, &newnode)
-
 	if !servercfg.GetRce() {
 		newnode.PostDown = node.PostDown
 		newnode.PostUp = node.PostUp
@@ -168,8 +165,6 @@ func (s *NodeServiceServer) UpdateNode(ctx context.Context, req *nodepb.Object)
 		return nil, err
 	}
 
-	runUpdates(&newnode, false, ifaceDelta)
-
 	return &nodepb.Object{
 		Data: string(nodeData),
 		Type: nodepb.NODE_TYPE,
@@ -219,7 +214,7 @@ func (s *NodeServiceServer) DeleteNode(ctx context.Context, req *nodepb.Object)
 		return nil, err
 	}
 
-	runServerPeerUpdate(&node, false)
+	runUpdates(&node, false)
 
 	return &nodepb.Object{
 		Data: "success",

+ 2 - 2
controllers/relay.go

@@ -36,7 +36,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }
 
 func deleteRelay(w http.ResponseWriter, r *http.Request) {
@@ -58,5 +58,5 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
-	runUpdates(&node, true, false)
+	runUpdates(&node, true)
 }

+ 0 - 34
controllers/server_util.go

@@ -1,34 +0,0 @@
-package controller
-
-import (
-	"github.com/gravitl/netmaker/logger"
-	"github.com/gravitl/netmaker/logic"
-	"github.com/gravitl/netmaker/models"
-	"github.com/gravitl/netmaker/mq"
-	"github.com/gravitl/netmaker/servercfg"
-)
-
-func runServerPeerUpdate(node *models.Node, ifaceDelta bool) error {
-
-	err := logic.TimerCheckpoint()
-	if err != nil {
-		logger.Log(3, "error occurred on timer,", err.Error())
-	}
-
-	if err := mq.PublishPeerUpdate(node); err != nil {
-		logger.Log(0, "failed to inform peers of new node ", err.Error())
-	}
-
-	if servercfg.IsClientMode() != "on" {
-		return nil
-	}
-	var currentServerNode, getErr = logic.GetNetworkServerLeader(node.Network)
-	if err != nil {
-		return getErr
-	}
-	if err = logic.ServerUpdate(&currentServerNode, ifaceDelta); err != nil {
-		logger.Log(1, "server node:", currentServerNode.ID, "failed update")
-		return err
-	}
-	return nil
-}

+ 105 - 0
mq/handlers.go

@@ -0,0 +1,105 @@
+package mq
+
+import (
+	"encoding/json"
+
+	mqtt "github.com/eclipse/paho.mqtt.golang"
+	"github.com/gravitl/netmaker/database"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+)
+
+// DefaultHandler default message queue handler - only called when GetDebug == true
+func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
+	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
+}
+
+// Ping message Handler -- handles ping topic from client nodes
+func Ping(client mqtt.Client, msg mqtt.Message) {
+	logger.Log(0, "Ping Handler: ", msg.Topic())
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(0, "error getting node.ID sent on ping topic ")
+			return
+		}
+		node, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(0, "mq-ping error getting node: ", err.Error())
+			record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
+			if err != nil {
+				logger.Log(0, "error reading database ", err.Error())
+				return
+			}
+			logger.Log(0, "record from database")
+			logger.Log(0, record)
+			return
+		}
+		_, decryptErr := decryptMsg(&node, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error())
+			return
+		}
+		node.SetLastCheckIn()
+		if err := logic.UpdateNode(&node, &node); err != nil {
+			logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error())
+			return
+		}
+		logger.Log(3, "ping processed for node", node.ID)
+		// --TODO --set client version once feature is implemented.
+		//node.SetClientVersion(msg.Payload())
+	}()
+}
+
+// UpdateNode  message Handler -- handles updates from client nodes
+func UpdateNode(client mqtt.Client, msg mqtt.Message) {
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+			return
+		}
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			return
+		}
+		var newNode models.Node
+		if err := json.Unmarshal(decrypted, &newNode); err != nil {
+			logger.Log(1, "error unmarshaling payload ", err.Error())
+			return
+		}
+		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
+			logger.Log(1, "error saving node", err.Error())
+			return
+		}
+		logger.Log(1, "updated node", id, newNode.Name)
+	}()
+}
+
+// ClientPeerUpdate  message handler -- handles updating peers after signal from client nodes
+func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+			return
+		}
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		if err := PublishPeerUpdate(&currentNode); err != nil {
+			logger.Log(1, "error publishing peer update ", err.Error())
+			return
+		}
+		logger.Log(1, "sent peer updates after signal received from", id, currentNode.Name)
+	}()
+}

+ 4 - 223
mq/mq.go

@@ -2,18 +2,11 @@ package mq
 
 import (
 	"context"
-	"encoding/json"
-	"errors"
-	"fmt"
 	"log"
-	"strings"
 	"time"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
-	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
-	"github.com/gravitl/netmaker/logic"
-	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
 )
@@ -25,171 +18,6 @@ const MQ_DISCONNECT = 250
 
 var peer_force_send = 0
 
-// DefaultHandler default message queue handler - only called when GetDebug == true
-func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
-	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
-}
-
-// Ping message Handler -- handles ping topic from client nodes
-func Ping(client mqtt.Client, msg mqtt.Message) {
-	logger.Log(0, "Ping Handler: ", msg.Topic())
-	go func() {
-		id, err := GetID(msg.Topic())
-		if err != nil {
-			logger.Log(0, "error getting node.ID sent on ping topic ")
-			return
-		}
-		node, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(0, "mq-ping error getting node: ", err.Error())
-			record, err := database.FetchRecord(database.NODES_TABLE_NAME, id)
-			if err != nil {
-				logger.Log(0, "error reading database ", err.Error())
-				return
-			}
-			logger.Log(0, "record from database")
-			logger.Log(0, record)
-			return
-		}
-		_, decryptErr := decryptMsg(&node, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(0, "error decrypting when updating node ", node.ID, decryptErr.Error())
-			return
-		}
-		node.SetLastCheckIn()
-		if err := logic.UpdateNode(&node, &node); err != nil {
-			logger.Log(0, "error updating node", node.Name, node.ID, " on checkin", err.Error())
-			return
-		}
-		logger.Log(3, "ping processed for node", node.ID)
-		// --TODO --set client version once feature is implemented.
-		//node.SetClientVersion(msg.Payload())
-	}()
-}
-
-// UpdateNode  message Handler -- handles updates from client nodes
-func UpdateNode(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := GetID(msg.Topic())
-		if err != nil {
-			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
-			return
-		}
-		currentNode, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
-			return
-		}
-		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-			return
-		}
-		var newNode models.Node
-		if err := json.Unmarshal(decrypted, &newNode); err != nil {
-			logger.Log(1, "error unmarshaling payload ", err.Error())
-			return
-		}
-		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
-			logger.Log(1, "error saving node", err.Error())
-			return
-		}
-		if err := PublishPeerUpdate(&newNode); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-			return
-		}
-		logger.Log(1, "Updated node", id, newNode.Name)
-	}()
-}
-
-// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
-func PublishPeerUpdate(newNode *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	networkNodes, err := logic.GetNetworkNodes(newNode.Network)
-	if err != nil {
-		logger.Log(1, "err getting Network Nodes", err.Error())
-		return err
-	}
-	for _, node := range networkNodes {
-
-		if node.IsServer == "yes" || node.ID == newNode.ID {
-			continue
-		}
-		peerUpdate, err := logic.GetPeerUpdate(&node)
-		if err != nil {
-			logger.Log(1, "error getting peer update for node", node.ID, err.Error())
-			continue
-		}
-		data, err := json.Marshal(&peerUpdate)
-		if err != nil {
-			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
-			continue
-		}
-		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
-			logger.Log(1, "failed to publish peer update for node", node.ID)
-		} else {
-			logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
-		}
-	}
-	return nil
-}
-
-// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
-func PublishExtPeerUpdate(node *models.Node) error {
-	var err error
-	if logic.IsLocalServer(node) {
-		if err = logic.ServerUpdate(node, false); err != nil {
-			logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients")
-			return err
-		} else {
-			return nil
-		}
-	}
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	peerUpdate, err := logic.GetPeerUpdate(node)
-	if err != nil {
-		return err
-	}
-	data, err := json.Marshal(&peerUpdate)
-	if err != nil {
-		return err
-	}
-	return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
-}
-
-// GetID -- decodes a message queue topic and returns the embedded node.ID
-func GetID(topic string) (string, error) {
-	parts := strings.Split(topic, "/")
-	count := len(parts)
-	if count == 1 {
-		return "", errors.New("invalid topic")
-	}
-	//the last part of the topic will be the node.ID
-	return parts[count-1], nil
-}
-
-// NodeUpdate -- publishes a node update
-func NodeUpdate(node *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-	logger.Log(3, "publishing node update to "+node.Name)
-	data, err := json.Marshal(node)
-	if err != nil {
-		logger.Log(2, "error marshalling node update ", err.Error())
-		return err
-	}
-	if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
-		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
-		return err
-	}
-	return nil
-}
-
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT(publish bool) mqtt.Client {
 	opts := mqtt.NewClientOptions()
@@ -217,6 +45,10 @@ func SetupMQTT(publish bool) mqtt.Client {
 				client.Disconnect(240)
 				logger.Log(0, "node update subscription failed")
 			}
+			if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.Wait() && token.Error() != nil {
+				client.Disconnect(240)
+				logger.Log(0, "node client subscription failed")
+			}
 
 			opts.SetOrderMatters(true)
 			opts.SetResumeSubs(true)
@@ -249,54 +81,3 @@ func Keepalive(ctx context.Context) {
 		}
 	}
 }
-
-// sendPeers - retrieve networks, send peer ports to all peers
-func sendPeers() {
-	var force bool
-	peer_force_send++
-	if peer_force_send == 5 {
-		force = true
-		peer_force_send = 0
-	}
-	networks, err := logic.GetNetworks()
-	if err != nil {
-		logger.Log(1, "error retrieving networks for keepalive", err.Error())
-	}
-	for _, network := range networks {
-		serverNode, errN := logic.GetNetworkServerLeader(network.NetID)
-		if errN == nil {
-			serverNode.SetLastCheckIn()
-			logic.UpdateNode(&serverNode, &serverNode)
-			if network.DefaultUDPHolePunch == "yes" {
-				if logic.ShouldPublishPeerPorts(&serverNode) || force {
-					if force {
-						logger.Log(2, "sending scheduled peer update (5 min)")
-					}
-					err = PublishPeerUpdate(&serverNode)
-					if err != nil {
-						logger.Log(1, "error publishing udp port updates for network", network.NetID)
-						logger.Log(1, errN.Error())
-					}
-				}
-			}
-		} else {
-			logger.Log(1, "unable to retrieve leader for network ", network.NetID)
-			logger.Log(1, errN.Error())
-			continue
-		}
-	}
-}
-
-// func publishServerKeepalive(client mqtt.Client, network *models.Network) {
-// 	nodes, err := logic.GetNetworkNodes(network.NetID)
-// 	if err != nil {
-// 		return
-// 	}
-// 	for _, node := range nodes {
-// 		if token := client.Publish(fmt.Sprintf("serverkeepalive/%s/%s", network.NetID, node.ID), 0, false, servercfg.GetVersion()); token.Wait() && token.Error() != nil {
-// 			logger.Log(1, "error publishing server keepalive for network", network.NetID, token.Error().Error())
-// 		} else {
-// 			logger.Log(2, "keepalive sent for network/node", network.NetID, node.ID)
-// 		}
-// 	}
-// }

+ 125 - 0
mq/publishers.go

@@ -0,0 +1,125 @@
+package mq
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+// PublishPeerUpdate --- deterines and publishes a peer update to all the peers of a node
+func PublishPeerUpdate(newNode *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	networkNodes, err := logic.GetNetworkNodes(newNode.Network)
+	if err != nil {
+		logger.Log(1, "err getting Network Nodes", err.Error())
+		return err
+	}
+	for _, node := range networkNodes {
+
+		if node.IsServer == "yes" || node.ID == newNode.ID {
+			continue
+		}
+		peerUpdate, err := logic.GetPeerUpdate(&node)
+		if err != nil {
+			logger.Log(1, "error getting peer update for node", node.ID, err.Error())
+			continue
+		}
+		data, err := json.Marshal(&peerUpdate)
+		if err != nil {
+			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
+			continue
+		}
+		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
+			logger.Log(1, "failed to publish peer update for node", node.ID)
+		} else {
+			logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
+		}
+	}
+	return nil
+}
+
+// PublishPeerUpdate --- publishes a peer update to all the peers of a node
+func PublishExtPeerUpdate(node *models.Node) error {
+	var err error
+	if logic.IsLocalServer(node) {
+		if err = logic.ServerUpdate(node, false); err != nil {
+			logger.Log(1, "server node:", node.ID, "failed to update peers with ext clients")
+			return err
+		} else {
+			return nil
+		}
+	}
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	peerUpdate, err := logic.GetPeerUpdate(node)
+	if err != nil {
+		return err
+	}
+	data, err := json.Marshal(&peerUpdate)
+	if err != nil {
+		return err
+	}
+	return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
+}
+
+// NodeUpdate -- publishes a node update
+func NodeUpdate(node *models.Node) error {
+	if !servercfg.IsMessageQueueBackend() || node.IsServer == "yes" {
+		return nil
+	}
+	logger.Log(3, "publishing node update to "+node.Name)
+	data, err := json.Marshal(node)
+	if err != nil {
+		logger.Log(2, "error marshalling node update ", err.Error())
+		return err
+	}
+	if err = publish(node, fmt.Sprintf("update/%s/%s", node.Network, node.ID), data); err != nil {
+		logger.Log(2, "error publishing node update to peer ", node.ID, err.Error())
+		return err
+	}
+	return nil
+}
+
+// sendPeers - retrieve networks, send peer ports to all peers
+func sendPeers() {
+	var force bool
+	peer_force_send++
+	if peer_force_send == 5 {
+		force = true
+		peer_force_send = 0
+	}
+	networks, err := logic.GetNetworks()
+	if err != nil {
+		logger.Log(1, "error retrieving networks for keepalive", err.Error())
+	}
+	for _, network := range networks {
+		serverNode, errN := logic.GetNetworkServerLeader(network.NetID)
+		if errN == nil {
+			serverNode.SetLastCheckIn()
+			logic.UpdateNode(&serverNode, &serverNode)
+			if network.DefaultUDPHolePunch == "yes" {
+				if logic.ShouldPublishPeerPorts(&serverNode) || force {
+					if force {
+						logger.Log(2, "sending scheduled peer update (5 min)")
+					}
+					err = PublishPeerUpdate(&serverNode)
+					if err != nil {
+						logger.Log(1, "error publishing udp port updates for network", network.NetID)
+						logger.Log(1, errN.Error())
+					}
+				}
+			}
+		} else {
+			logger.Log(1, "unable to retrieve leader for network ", network.NetID)
+			logger.Log(1, errN.Error())
+			continue
+		}
+	}
+}

+ 11 - 0
mq/util.go

@@ -70,3 +70,14 @@ func publish(node *models.Node, dest string, msg []byte) error {
 	}
 	return nil
 }
+
+//  decodes a message queue topic and returns the embedded node.ID
+func getID(topic string) (string, error) {
+	parts := strings.Split(topic, "/")
+	count := len(parts)
+	if count == 1 {
+		return "", fmt.Errorf("invalid topic")
+	}
+	//the last part of the topic will be the node.ID
+	return parts[count-1], nil
+}

+ 106 - 131
netclient/functions/daemon.go

@@ -26,7 +26,6 @@ import (
 )
 
 // == Message Caches ==
-// var keepalive = new(sync.Map)
 var messageCache = new(sync.Map)
 var networkcontext = new(sync.Map)
 
@@ -88,94 +87,6 @@ func Daemon() error {
 
 }
 
-// SetupMQTT creates a connection to broker and return client
-func SetupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
-	opts := mqtt.NewClientOptions()
-	server := getServerAddress(cfg)
-	opts.AddBroker(server + ":1883")
-	id := ncutils.MakeRandomString(23)
-	opts.ClientID = id
-	opts.SetDefaultPublishHandler(All)
-	opts.SetAutoReconnect(true)
-	opts.SetConnectRetry(true)
-	opts.SetConnectRetryInterval(time.Second << 2)
-	opts.SetKeepAlive(time.Minute >> 1)
-	opts.SetWriteTimeout(time.Minute)
-	opts.SetOnConnectHandler(func(client mqtt.Client) {
-		if !publish {
-			if cfg.DebugOn {
-				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
-					ncutils.Log(token.Error().Error())
-					return
-				}
-				ncutils.Log("subscribed to all topics for debugging purposes")
-			}
-			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
-				ncutils.Log(token.Error().Error())
-				return
-			}
-			if cfg.DebugOn {
-				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
-			}
-			opts.SetOrderMatters(true)
-			opts.SetResumeSubs(true)
-		}
-	})
-	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
-		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
-		_, err := Pull(cfg.Node.Network, true)
-		if err != nil {
-			ncutils.Log("could not run pull, server unreachable: " + err.Error())
-			ncutils.Log("waiting to retry...")
-			/*
-				//Consider putting in logic to restart - daemon may take long time to refresh
-				time.Sleep(time.Minute * 5)
-					ncutils.Log("restarting netclient")
-					daemon.Restart()
-			*/
-		}
-		ncutils.Log("connection re-established with mqtt server")
-	})
-
-	client := mqtt.NewClient(opts)
-	tperiod := time.Now().Add(12 * time.Second)
-	for {
-		//if after 12 seconds, try a gRPC pull on the last try
-		if time.Now().After(tperiod) {
-			ncutils.Log("running pull for " + cfg.Node.Network)
-			_, err := Pull(cfg.Node.Network, true)
-			if err != nil {
-				ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
-				return client
-			}
-			time.Sleep(time.Second)
-		}
-		if token := client.Connect(); token.Wait() && token.Error() != nil {
-			ncutils.Log("unable to connect to broker, retrying ...")
-			if time.Now().After(tperiod) {
-				ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
-				if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") {
-					ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0)
-					Pull(cfg.Node.Network, true)
-					daemon.Restart()
-				}
-				return client
-			}
-		} else {
-			break
-		}
-		time.Sleep(2 * time.Second)
-	}
-	return client
-}
-
 // MessageQueue sets up Message Queue and subsribes/publishes updates to/from server
 func MessageQueue(ctx context.Context, network string) {
 	ncutils.Log("netclient go routine started for " + network)
@@ -185,7 +96,7 @@ func MessageQueue(ctx context.Context, network string) {
 
 	cfg.ReadConfig()
 	ncutils.Log("daemon started for network: " + network)
-	client := SetupMQTT(&cfg, false)
+	client := setupMQTT(&cfg, false)
 
 	defer client.Disconnect(250)
 	wg := &sync.WaitGroup{}
@@ -287,15 +198,15 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 		ncutils.Log("error updating wireguard config " + err.Error())
 		return
 	}
-	if ifaceDelta {
+	if ifaceDelta { // if a change caused an ifacedelta we need to notify the server to update the peers
 		ncutils.Log("applying WG conf to " + file)
 		err = wireguard.ApplyConf(&cfg.Node, cfg.Node.Interface, file)
 		if err != nil {
 			ncutils.Log("error restarting wg after node update " + err.Error())
 			return
 		}
-		time.Sleep(time.Second >> 1)
 
+		time.Sleep(time.Second >> 1)
 		if newNode.DNSOn == "yes" {
 			for _, server := range newNode.NetworkSettings.DefaultServerAddrs {
 				if server.IsLeader {
@@ -304,6 +215,7 @@ func NodeUpdate(client mqtt.Client, msg mqtt.Message) {
 				}
 			}
 		}
+		publishClientPeers(&cfg)
 	}
 	//deal with DNS
 	if newNode.DNSOn != "yes" && shouldDNSChange && cfg.Node.Interface != "" {
@@ -361,39 +273,6 @@ func UpdatePeers(client mqtt.Client, msg mqtt.Message) {
 	}
 }
 
-// MonitorKeepalive - checks time last server keepalive received.  If more than 3+ minutes, notify and resubscribe
-// func MonitorKeepalive(ctx context.Context, wg *sync.WaitGroup, client mqtt.Client, cfg *config.ClientConfig) {
-// 	defer wg.Done()
-// 	for {
-// 		select {
-// 		case <-ctx.Done():
-// 			ncutils.Log("cancel recieved, monitor keepalive exiting")
-// 			return
-// 		case <-time.After(time.Second * 150):
-// 			var keepalivetime time.Time
-// 			keepaliveval, ok := keepalive.Load(cfg.Node.Network)
-// 			if ok {
-// 				keepalivetime = keepaliveval.(time.Time)
-// 				if !keepalivetime.IsZero() && time.Since(keepalivetime) > time.Second*120 { // more than 2+ minutes
-// 					// ncutils.Log("server keepalive not recieved recently, resubscribe to message queue")
-// 					// err := Resubscribe(client, cfg)
-// 					// if err != nil {
-// 					// 	ncutils.Log("closing " + err.Error())
-// 					// }
-// 					ncutils.Log("maybe wanna call something")
-// 				}
-// 			}
-// 		}
-// 	}
-// }
-
-// ServerKeepAlive -- handler to react to keepalive messages published by server
-// func ServerKeepAlive(client mqtt.Client, msg mqtt.Message) {
-// 	var currentTime = time.Now()
-// 	keepalive.Store(parseNetworkFromTopic(msg.Topic()), currentTime)
-// 	ncutils.PrintLog("received server keepalive at "+currentTime.String(), 2)
-// }
-
 // UpdateKeys -- updates private key and returns new publickey
 func UpdateKeys(cfg *config.ClientConfig, client mqtt.Client) error {
 	ncutils.Log("received message to update keys")
@@ -487,7 +366,7 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error {
 	if err != nil {
 		return err
 	}
-	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data); err != nil {
+	if err = publish(cfg, fmt.Sprintf("update/%s", cfg.Node.ID), data, 1); err != nil {
 		return err
 	}
 	return nil
@@ -495,19 +374,115 @@ func PublishNodeUpdate(cfg *config.ClientConfig) error {
 
 // Hello -- ping the broker to let server know node is alive and doing fine
 func Hello(cfg *config.ClientConfig, network string) {
-	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version)); err != nil {
+	if err := publish(cfg, fmt.Sprintf("ping/%s", cfg.Node.ID), []byte(ncutils.Version), 0); err != nil {
 		ncutils.Log(fmt.Sprintf("error publishing ping, %v", err))
 		ncutils.Log("running pull on " + cfg.Node.Network + " to reconnect")
 		_, err := Pull(cfg.Node.Network, true)
 		if err != nil {
 			ncutils.Log("could not run pull on " + cfg.Node.Network + ", error: " + err.Error())
 		}
-
 	}
 }
 
 // == Private ==
 
+// setupMQTT creates a connection to broker and return client
+func setupMQTT(cfg *config.ClientConfig, publish bool) mqtt.Client {
+	opts := mqtt.NewClientOptions()
+	server := getServerAddress(cfg)
+	opts.AddBroker(server + ":1883")
+	id := ncutils.MakeRandomString(23)
+	opts.ClientID = id
+	opts.SetDefaultPublishHandler(All)
+	opts.SetAutoReconnect(true)
+	opts.SetConnectRetry(true)
+	opts.SetConnectRetryInterval(time.Second << 2)
+	opts.SetKeepAlive(time.Minute >> 1)
+	opts.SetWriteTimeout(time.Minute)
+	opts.SetOnConnectHandler(func(client mqtt.Client) {
+		if !publish {
+			if cfg.DebugOn {
+				if token := client.Subscribe("#", 0, nil); token.Wait() && token.Error() != nil {
+					ncutils.Log(token.Error().Error())
+					return
+				}
+				ncutils.Log("subscribed to all topics for debugging purposes")
+			}
+			if token := client.Subscribe(fmt.Sprintf("update/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(NodeUpdate)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to node updates for node %s update/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			if token := client.Subscribe(fmt.Sprintf("peers/%s/%s", cfg.Node.Network, cfg.Node.ID), 0, mqtt.MessageHandler(UpdatePeers)); token.Wait() && token.Error() != nil {
+				ncutils.Log(token.Error().Error())
+				return
+			}
+			if cfg.DebugOn {
+				ncutils.Log(fmt.Sprintf("subscribed to peer updates for node %s peers/%s/%s", cfg.Node.Name, cfg.Node.Network, cfg.Node.ID))
+			}
+			opts.SetOrderMatters(true)
+			opts.SetResumeSubs(true)
+		}
+	})
+	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
+		ncutils.Log("detected broker connection lost, running pull for " + cfg.Node.Network)
+		_, err := Pull(cfg.Node.Network, true)
+		if err != nil {
+			ncutils.Log("could not run pull, server unreachable: " + err.Error())
+			ncutils.Log("waiting to retry...")
+			/*
+				//Consider putting in logic to restart - daemon may take long time to refresh
+				time.Sleep(time.Minute * 5)
+					ncutils.Log("restarting netclient")
+					daemon.Restart()
+			*/
+		}
+		ncutils.Log("connection re-established with mqtt server")
+	})
+
+	client := mqtt.NewClient(opts)
+	tperiod := time.Now().Add(12 * time.Second)
+	for {
+		//if after 12 seconds, try a gRPC pull on the last try
+		if time.Now().After(tperiod) {
+			ncutils.Log("running pull for " + cfg.Node.Network)
+			_, err := Pull(cfg.Node.Network, true)
+			if err != nil {
+				ncutils.Log("could not run pull, exiting " + cfg.Node.Network + " setup: " + err.Error())
+				return client
+			}
+			time.Sleep(time.Second)
+		}
+		if token := client.Connect(); token.Wait() && token.Error() != nil {
+			ncutils.Log("unable to connect to broker, retrying ...")
+			if time.Now().After(tperiod) {
+				ncutils.Log("could not connect to broker, exiting " + cfg.Node.Network + " setup: " + token.Error().Error())
+				if strings.Contains(token.Error().Error(), "connectex") || strings.Contains(token.Error().Error(), "i/o timeout") {
+					ncutils.PrintLog("connection issue detected.. pulling and restarting daemon", 0)
+					Pull(cfg.Node.Network, true)
+					daemon.Restart()
+				}
+				return client
+			}
+		} else {
+			break
+		}
+		time.Sleep(2 * time.Second)
+	}
+	return client
+}
+
+// publishes a message to server to update peers on this peer's behalf
+func publishClientPeers(cfg *config.ClientConfig) error {
+	payload := []byte(ncutils.MakeRandomString(16)) // just random string for now to keep the bytes different
+	if err := publish(cfg, fmt.Sprintf("signal/%s", cfg.Node.ID), payload, 1); err != nil {
+		return err
+	}
+	return nil
+}
+
 func initialPull(network string) {
 	ncutils.Log("pulling latest config for " + network)
 	var configPath = fmt.Sprintf("%snetconfig-%s", ncutils.GetNetclientPathSpecific(), network)
@@ -536,7 +511,7 @@ func initialPull(network string) {
 	}
 }
 
-func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
+func publish(cfg *config.ClientConfig, dest string, msg []byte, qos byte) error {
 	// setup the keys
 	trafficPrivKey, err := auth.RetrieveTrafficKey(cfg.Node.Network)
 	if err != nil {
@@ -548,14 +523,14 @@ func publish(cfg *config.ClientConfig, dest string, msg []byte) error {
 		return err
 	}
 
-	client := SetupMQTT(cfg, true)
+	client := setupMQTT(cfg, true)
 	defer client.Disconnect(250)
 	encrypted, err := ncutils.Chunk(msg, serverPubKey, trafficPrivKey)
 	if err != nil {
 		return err
 	}
 
-	if token := client.Publish(dest, 0, false, encrypted); token.Wait() && token.Error() != nil {
+	if token := client.Publish(dest, qos, false, encrypted); token.Wait() && token.Error() != nil {
 		return token.Error()
 	}
 	return nil