Browse Source

NET-877: Replace peers on Refreshkeys peer update (#2761)

* replace peers on key refresh

* add peer conf to metrics map only when allowed
Abhishek K 1 year ago
parent
commit
5bf30b2c10

+ 1 - 1
auth/host_session.go

@@ -253,7 +253,7 @@ func CheckNetRegAndHostUpdate(networks []string, h *models.Host, relayNodeId uui
 			Action: models.RequestAck,
 			Host:   *h,
 		})
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdate(false); err != nil {
 			logger.Log(0, "failed to publish peer update during registration -", err.Error())
 		}
 	}

+ 3 - 4
controllers/ext_client.go

@@ -422,7 +422,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	slog.Info("created extclient", "user", r.Header.Get("user"), "network", node.Network, "clientid", extclient.ClientID)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdate(false); err != nil {
 			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
 		}
 		if servercfg.IsDNSMode() {
@@ -510,7 +510,6 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(0, r.Header.Get("user"), "updated ext client", update.ClientID)
-
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(newclient)
 
@@ -521,7 +520,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 		if sendPeerUpdate { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
 			ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID)
 			if err == nil {
-				if err = mq.PublishPeerUpdate(); err != nil {
+				if err = mq.PublishPeerUpdate(false); err != nil {
 					logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
 				}
 			}
@@ -536,7 +535,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					slog.Error("Failed to get nodes", "error", err)
 					return
 				}
-				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient})
+				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
 			}
 		}
 

+ 2 - 2
controllers/hosts.go

@@ -195,7 +195,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
 	}
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdate(false); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
 		}
 		if newHost.Name != currHost.Name {
@@ -356,7 +356,7 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 			Host:   *currHost,
 			Node:   *newNode,
 		})
-		mq.PublishPeerUpdate()
+		mq.PublishPeerUpdate(false)
 		if servercfg.IsDNSMode() {
 			logic.SetDNS()
 		}

+ 1 - 1
controllers/migrate.go

@@ -95,7 +95,7 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 	if err := logic.UpsertHost(&host); err != nil {
 		slog.Error("save host", "error", err)
 	}
-	go mq.PublishPeerUpdate()
+	go mq.PublishPeerUpdate(false)
 	response := models.HostPull{
 		Host:         host,
 		Nodes:        nodes,

+ 1 - 1
controllers/network.go

@@ -128,7 +128,7 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {
 
 	// send peer updates
 	go func() {
-		if err = mq.PublishPeerUpdate(); err != nil {
+		if err = mq.PublishPeerUpdate(false); err != nil {
 			logger.Log(0, "failed to publish peer update after ACL update on", netname)
 		}
 	}()

+ 4 - 4
controllers/node.go

@@ -436,7 +436,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
 		}
-		mq.PublishPeerUpdate()
+		mq.PublishPeerUpdate(false)
 	}()
 }
 
@@ -479,7 +479,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 		if err := mq.NodeUpdate(&node); err != nil {
 			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
 		}
-		mq.PublishPeerUpdate()
+		mq.PublishPeerUpdate(false)
 	}()
 }
 
@@ -590,7 +590,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				return
 			}
 			go func() {
-				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:]); err != nil {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 				}
 				if err := mq.NodeUpdate(&node); err != nil {
@@ -667,7 +667,7 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 			slog.Error("error publishing node update to node", "node", newNode.ID, "error", err)
 		}
 		if aclUpdate || relayupdate || ifaceDelta {
-			if err := mq.PublishPeerUpdate(); err != nil {
+			if err := mq.PublishPeerUpdate(false); err != nil {
 				logger.Log(0, "error during node ACL update for node", newNode.ID.String())
 			}
 		}

+ 1 - 1
controllers/user.go

@@ -138,7 +138,7 @@ func authenticateUser(response http.ResponseWriter, request *http.Request) {
 					} else {
 						// publish peer update to ingress gateway
 						if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil {
-							if err = mq.PublishPeerUpdate(); err != nil {
+							if err = mq.PublishPeerUpdate(false); err != nil {
 								slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error())
 							}
 						}

+ 1 - 1
logic/peers.go

@@ -210,7 +210,7 @@ func GetPeerUpdateForHost(network string, host *models.Host, allNodes []models.N
 				nodePeer = hostPeerUpdate.Peers[peerIndexMap[peerHost.PublicKey.String()]]
 			}
 
-			if node.Network == network && !peerConfig.Remove { // add to peers map for metrics
+			if node.Network == network && !peerConfig.Remove && len(peerConfig.AllowedIPs) > 0 { // add to peers map for metrics
 				hostPeerUpdate.PeerIDs[peerHost.PublicKey.String()] = models.IDandAddr{
 					ID:         peer.ID.String(),
 					HostID:     peerHost.ID.String(),

+ 1 - 0
models/mqtt.go

@@ -19,6 +19,7 @@ type HostPeerUpdate struct {
 	HostNetworkInfo HostInfoMap           `json:"host_network_info,omitempty" bson:"host_network_info,omitempty" yaml:"host_network_info,omitempty"`
 	EgressRoutes    []EgressNetworkRoutes `json:"egress_network_routes"`
 	FwUpdate        FwUpdate              `json:"fw_update"`
+	ReplacePeers    bool                  `json:"replace_peers"`
 }
 
 // IngressInfo - struct for ingress info

+ 7 - 26
mq/handlers.go

@@ -2,7 +2,6 @@ package mq
 
 import (
 	"encoding/json"
-	"fmt"
 
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
@@ -14,7 +13,6 @@ import (
 	"github.com/gravitl/netmaker/netclient/ncutils"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/exp/slog"
-	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
 // UpdateMetrics  message Handler -- handles updates from client nodes for metrics
@@ -65,10 +63,10 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			}
 			allNodes, err := logic.GetAllNodes()
 			if err == nil {
-				PublishSingleHostPeerUpdate(host, allNodes, nil, nil)
+				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
 			}
 		} else {
-			err = PublishPeerUpdate()
+			err = PublishPeerUpdate(false)
 		}
 		if err != nil {
 			slog.Warn("error updating peers when node informed the server of an interface change", "nodeid", currentNode.ID, "error", err)
@@ -102,6 +100,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 	slog.Info("recieved host update", "name", hostUpdate.Host.Name, "id", hostUpdate.Host.ID)
 	var sendPeerUpdate bool
+	var replacePeers bool
 	switch hostUpdate.Action {
 	case models.CheckIn:
 		sendPeerUpdate = HandleHostCheckin(&hostUpdate.Host, currentHost)
@@ -122,7 +121,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				if err != nil {
 					return
 				}
-				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil); err != nil {
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 				}
@@ -131,25 +130,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	case models.UpdateHost:
 		if hostUpdate.Host.PublicKey != currentHost.PublicKey {
 			//remove old peer entry
-			peerUpdate := models.HostPeerUpdate{
-				ServerVersion: servercfg.GetVersion(),
-				Peers: []wgtypes.PeerConfig{
-					{
-						PublicKey: currentHost.PublicKey,
-						Remove:    true,
-					},
-				},
-			}
-			data, err := json.Marshal(&peerUpdate)
-			if err != nil {
-				slog.Error("failed to marshal peer update", "error", err)
-			}
-			hosts := logic.GetRelatedHosts(hostUpdate.Host.ID.String())
-			server := servercfg.GetServer()
-			for _, host := range hosts {
-				publish(&host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), server), data)
-			}
-
+			replacePeers = true
 		}
 		sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
 		err := logic.UpsertHost(currentHost)
@@ -198,7 +179,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 
 	if sendPeerUpdate {
-		err := PublishPeerUpdate()
+		err := PublishPeerUpdate(replacePeers)
 		if err != nil {
 			slog.Error("failed to publish peer update", "error", err)
 		}
@@ -249,7 +230,7 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
 	case ncutils.ACK:
 		// do we still need this
 	case ncutils.DONE:
-		if err = PublishPeerUpdate(); err != nil {
+		if err = PublishPeerUpdate(false); err != nil {
 			slog.Error("error publishing peer update for node", "id", currentNode.ID, "error", err)
 			return
 		}

+ 7 - 6
mq/publishers.go

@@ -14,7 +14,7 @@ import (
 )
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
-func PublishPeerUpdate() error {
+func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
 		return nil
 	}
@@ -30,7 +30,7 @@ func PublishPeerUpdate() error {
 	}
 	for _, host := range hosts {
 		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 	}
@@ -55,7 +55,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 	}
 	for _, host := range hosts {
 		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 	}
@@ -81,7 +81,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 	for _, host := range hosts {
 		host := host
 		if host.OS != models.OS_Types.IoT {
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
 		}
@@ -90,12 +90,13 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient) error {
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
 
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 		return err
 	}
+	peerUpdate.ReplacePeers = replacePeers
 	data, err := json.Marshal(&peerUpdate)
 	if err != nil {
 		return err
@@ -231,7 +232,7 @@ func sendPeers() {
 		for _, host := range hosts {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 		}

+ 4 - 4
pro/controllers/failover.go

@@ -70,7 +70,7 @@ func createfailOver(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	go mq.PublishPeerUpdate()
+	go mq.PublishPeerUpdate(false)
 	w.Header().Set("Content-Type", "application/json")
 	logic.ReturnSuccessResponseWithJson(w, r, node, "created failover successfully")
 }
@@ -90,7 +90,7 @@ func resetFailOver(w http.ResponseWriter, r *http.Request) {
 			logic.UpsertNode(&node)
 		}
 	}
-	go mq.PublishPeerUpdate()
+	go mq.PublishPeerUpdate(false)
 	w.Header().Set("Content-Type", "application/json")
 	logic.ReturnSuccessResponse(w, r, "failover has been reset successfully")
 }
@@ -126,7 +126,7 @@ func deletefailOver(w http.ResponseWriter, r *http.Request) {
 	}
 	go func() {
 		proLogic.ResetFailOver(&node)
-		mq.PublishPeerUpdate()
+		mq.PublishPeerUpdate(false)
 	}()
 	w.Header().Set("Content-Type", "application/json")
 	logic.ReturnSuccessResponseWithJson(w, r, node, "deleted failover successfully")
@@ -193,7 +193,7 @@ func failOverME(w http.ResponseWriter, r *http.Request) {
 	sendPeerUpdate = true
 
 	if sendPeerUpdate {
-		go mq.PublishPeerUpdate()
+		go mq.PublishPeerUpdate(false)
 	}
 
 	w.Header().Set("Content-Type", "application/json")

+ 3 - 3
pro/controllers/relay.go

@@ -63,7 +63,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 
 		}
 	}
-	go mq.PublishPeerUpdate()
+	go mq.PublishPeerUpdate(false)
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relayRequest.NodeID, "on network", relayRequest.NetID)
 	apiNode := relayNode.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
@@ -108,13 +108,13 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 						return
 					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 				}
 			}
 		}
-		mq.PublishPeerUpdate()
+		mq.PublishPeerUpdate(false)
 	}()
 	logger.Log(1, r.Header.Get("user"), "deleted relay on node", node.ID.String(), "on network", node.Network)
 	apiNode := node.ConvertToAPINode()

+ 1 - 1
pro/logic/metrics.go

@@ -89,7 +89,7 @@ func MQUpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 			if err != nil {
 				return
 			}
-			if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil {
+			if err = mq.PublishSingleHostPeerUpdate(host, nodes, nil, nil, false); err != nil {
 				slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
 			}
 		}

+ 2 - 2
pro/remote_access_client.go

@@ -64,7 +64,7 @@ func disableExtClient(client *models.ExtClient) error {
 	} else {
 		// publish peer update to ingress gateway
 		if ingressNode, err := logic.GetNodeByID(newClient.IngressGatewayID); err == nil {
-			if err = mq.PublishPeerUpdate(); err != nil {
+			if err = mq.PublishPeerUpdate(false); err != nil {
 				slog.Error("error updating ext clients on", "ingress", ingressNode.ID.String(), "err", err.Error())
 			}
 			ingressHost, err := logic.GetHost(ingressNode.HostID.String())
@@ -75,7 +75,7 @@ func disableExtClient(client *models.ExtClient) error {
 			if err != nil {
 				return err
 			}
-			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client})
+			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
 		} else {
 			return err
 		}