Parcourir la source

adapted mq peer publish calls

0xdcarns il y a 2 ans
Parent
commit
378b32e009

+ 0 - 3
controllers/dns.go

@@ -177,9 +177,6 @@ func createDNS(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, "new DNS record added:", entry.Name)
 	if servercfg.IsMessageQueueBackend() {
 		go func() {
-			if err = mq.PublishPeerUpdate(); err != nil {
-				logger.Log(0, "failed to publish peer update after ACL update on", entry.Network)
-			}
 			if err := mq.PublishCustomDNS(&entry); err != nil {
 				logger.Log(0, "error publishing custom dns", err.Error())
 			}

+ 1 - 1
controllers/enrollmentkeys.go

@@ -240,7 +240,7 @@ func checkNetRegAndHostUpdate(networks []string, h *models.Host) {
 			Action: models.RequestAck,
 			Host:   *h,
 		})
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", h, nil, nil); err != nil {
 			logger.Log(0, "failed to publish peer update during registration -", err.Error())
 		}
 	}

+ 3 - 4
controllers/ext_client.go

@@ -319,7 +319,6 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 
 	var extclient models.ExtClient
 	var customExtClient models.CustomExtClient
-
 	err := json.NewDecoder(r.Body).Decode(&customExtClient)
 	if err == nil {
 		if customExtClient.ClientID != "" && !validName(customExtClient.ClientID) {
@@ -400,7 +399,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdateForClient("", &extclient, false); err != nil {
 			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
 		}
 		if err := mq.PublishExtCLientDNS(&extclient); err != nil {
@@ -499,7 +498,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID)
 	if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
-			if err = mq.PublishPeerUpdate(); err != nil {
+			if err = mq.PublishPeerUpdateForNode("", &ingressNode, false); err != nil {
 				logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
 			}
 		}
@@ -578,7 +577,7 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 	}
 
 	go func() {
-		if err := mq.PublishDeletedClientPeerUpdate(&extclient); err != nil {
+		if err := mq.PublishPeerUpdateForClient("", &extclient, true); err != nil {
 			logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
 		}
 		if err = mq.PublishDeleteExtClientDNS(&extclient); err != nil {

+ 3 - 8
controllers/hosts.go

@@ -107,7 +107,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
 	}
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", newHost, nil, nil); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
 		}
 		if newHost.Name != currHost.Name {
@@ -289,13 +289,8 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(fmt.Errorf("failed to delete node"), "internal"))
 		return
 	}
-	// notify node change
-
-	runUpdates(node, false)
-	go func() { // notify of peer change
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-		}
+	runUpdates(node, true, false) // notify node change
+	go func() {                   // notify of peer change
 		if err := mq.PublishDNSDelete(node, currHost); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 		}

+ 7 - 4
controllers/network.go

@@ -230,7 +230,8 @@ func updateNetwork(w http.ResponseWriter, r *http.Request) {
 			return
 		}
 		for _, node := range nodes {
-			runUpdates(&node, true)
+			node := node
+			runUpdates(&node, false, false)
 		}
 	}
 
@@ -278,10 +279,12 @@ func updateNetworkACL(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "updated ACLs for network", netname)
 
-	// send peer updates
 	if servercfg.IsMessageQueueBackend() {
-		if err = mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "failed to publish peer update after ACL update on", netname)
+		netNodes, err := logic.GetNetworkNodes(netname)
+		if err == nil && len(netNodes) > 0 {
+			if err = mq.PublishPeerUpdateForNode("", &netNodes[0], false); err != nil { // send peer updates
+				logger.Log(0, "failed to publish peer update after ACL update on", netname)
+			}
 		}
 	}
 	w.WriteHeader(http.StatusOK)

+ 18 - 37
controllers/node.go

@@ -452,10 +452,7 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created egress gateway on node", gateway.NodeID, "on network", gateway.NetID)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	go func() {
-		mq.PublishPeerUpdate()
-	}()
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deletegateway nodes deleteEgressGateway
@@ -488,10 +485,7 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "deleted egress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	go func() {
-		mq.PublishPeerUpdate()
-	}()
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // == INGRESS ==
@@ -537,8 +531,7 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deleteingress nodes deleteIngressGateway
@@ -576,8 +569,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "deleted ingress gateway", nodeid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // swagger:route PUT /api/nodes/{network}/{nodeid} nodes updateNode
@@ -667,7 +659,8 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 		updatenodes := logic.UpdateRelay(currentNode.Network, currentNode.RelayAddrs, newNode.RelayAddrs)
 		if len(updatenodes) > 0 {
 			for _, relayedNode := range updatenodes {
-				runUpdates(&relayedNode, false)
+				relayedNode := relayedNode
+				runUpdates(&relayedNode, false, false)
 			}
 		}
 	}
@@ -682,17 +675,12 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "updated node", currentNode.ID.String(), "on network", currentNode.Network)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(newNode, ifaceDelta)
-	go func(aclUpdate bool, newNode *models.Node) {
-		if aclUpdate {
-			if err := mq.PublishPeerUpdate(); err != nil {
-				logger.Log(0, "error during node ACL update for node", newNode.ID.String())
-			}
-		}
+	runUpdates(newNode, aclUpdate || ifaceDelta, false)
+	go func(newNode *models.Node) {
 		if err := mq.PublishReplaceDNS(&currentNode, newNode, host); err != nil {
 			logger.Log(1, "failed to publish dns update", err.Error())
 		}
-	}(aclUpdate, newNode)
+	}(newNode)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid} nodes deleteNode
@@ -738,20 +726,9 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	}
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
-	if !fromNode { // notify node change
-		runUpdates(&node, false)
-	}
-	go func(deletedNode *models.Node, fromNode bool) { // notify of peer change
-		var err error
-		if fromNode {
-			err = mq.PublishDeletedNodePeerUpdate(deletedNode)
-		} else {
-			err = mq.PublishPeerUpdate()
-		}
-		if err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-		}
 
+	runUpdates(&node, true, fromNode)
+	go func(deletedNode *models.Node) { // notify of peer change
 		host, err := logic.GetHost(node.HostID.String())
 		if err != nil {
 			logger.Log(1, "failed to retrieve host for node", node.ID.String(), err.Error())
@@ -759,12 +736,16 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 		if err := mq.PublishDNSDelete(&node, host); err != nil {
 			logger.Log(1, "error publishing dns update", err.Error())
 		}
-	}(&node, fromNode)
+	}(&node)
 }
 
-func runUpdates(node *models.Node, ifaceDelta bool) {
+func runUpdates(node *models.Node, peerUpdate, delete bool) {
 	go func() { // don't block http response
-		// publish node update if not server
+		if peerUpdate {
+			if err := mq.PublishPeerUpdateForNode("", node, delete); err != nil {
+				logger.Log(0, "failed to update peers of node", node.ID.String(), err.Error())
+			}
+		}
 		if err := mq.NodeUpdate(node); err != nil {
 			logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
 		}

+ 9 - 10
controllers/relay.go

@@ -45,7 +45,6 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
 	for _, relayedNode := range updatenodes {
-
 		err = mq.NodeUpdate(&relayedNode)
 		if err != nil {
 			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
@@ -55,7 +54,7 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deleterelay nodes deleteRelay
@@ -90,7 +89,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	apiNode := node.ConvertToAPINode()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	runUpdates(&node, true)
+	runUpdates(&node, true, false)
 }
 
 // swagger:route POST /api/hosts/{hostid}/relay hosts createHostRelay
@@ -130,9 +129,10 @@ func createHostRelay(w http.ResponseWriter, r *http.Request) {
 		logger.Log(0, "failed to send host update: ", relayHost.ID.String(), err.Error())
 	}
 	logger.Log(1, r.Header.Get("user"), "created relay on host", relay.HostID)
-	go func(relayHostID string) {
-		relatedhosts := logic.GetRelatedHosts(relayHostID)
+	go func(relayHost *models.Host) {
+		relatedhosts := logic.GetRelatedHosts(relayHost.ID.String())
 		for _, relatedHost := range relatedhosts {
+			relatedHost := relatedHost
 			relatedHost.ProxyEnabled = true
 			logic.UpsertHost(&relatedHost)
 			if err := mq.HostUpdate(&models.HostUpdate{
@@ -142,11 +142,10 @@ func createHostRelay(w http.ResponseWriter, r *http.Request) {
 				logger.Log(0, "failed to send host update: ", relatedHost.ID.String(), err.Error())
 			}
 		}
-		if err := mq.PublishPeerUpdate(); err != nil {
+		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
 		}
-
-	}(relay.HostID)
+	}(relayHost)
 
 	apiHostData := relayHost.ConvertNMHostToAPI()
 	w.WriteHeader(http.StatusOK)
@@ -176,8 +175,8 @@ func deleteHostRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay host", hostid)
 	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "fail to publish peer update: ", err.Error())
+		if err := mq.PublishPeerUpdateForHost("", relayHost, nil, nil); err != nil {
+			logger.Log(0, "failed to update peers after relay delete:", err.Error())
 		}
 	}()
 	apiHostData := relayHost.ConvertNMHostToAPI()

+ 6 - 10
mq/handlers.go

@@ -1,7 +1,6 @@
 package mq
 
 import (
-	"context"
 	"encoding/json"
 	"fmt"
 	"math"
@@ -146,7 +145,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 		return
 	}
 	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
-		if err = PublishPeerUpdate(); err != nil {
+		if err = PublishPeerUpdateForNode("", &newNode, false); err != nil {
 			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
 		}
 	}
@@ -186,7 +185,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 				return
 			} else {
-				if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
+				if err = PublishPeerUpdateForHost("", currentHost, nil, nil); err != nil {
 					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 					return
 				}
@@ -223,7 +222,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	}
 
 	if sendPeerUpdate {
-		err := PublishPeerUpdate()
+		err := PublishPeerUpdateForHost("", &hostUpdate.Host, nil, nil)
 		if err != nil {
 			logger.Log(0, "failed to pulish peer update: ", err.Error())
 		}
@@ -282,11 +281,8 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 
 		if shouldUpdate {
 			logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
-			host, err := logic.GetHost(currentNode.HostID.String())
-			if err == nil {
-				if err = PublishSingleHostPeerUpdate(context.Background(), host, nil, nil); err != nil {
-					logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
-				}
+			if err = PublishPeerUpdateForNode("", &currentNode, false); err != nil {
+				logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
 			}
 		}
 
@@ -315,7 +311,7 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
 	case ncutils.ACK:
 		// do we still need this
 	case ncutils.DONE:
-		if err = PublishPeerUpdate(); err != nil {
+		if err = PublishPeerUpdateForNode("", &currentNode, false); err != nil {
 			logger.Log(1, "error publishing peer update for node", currentNode.ID.String(), err.Error())
 			return
 		}

+ 26 - 95
mq/publishers.go

@@ -1,7 +1,6 @@
 package mq
 
 import (
-	"context"
 	"encoding/json"
 	"errors"
 	"fmt"
@@ -13,74 +12,6 @@ import (
 	"github.com/gravitl/netmaker/servercfg"
 )
 
-// PublishPeerUpdate --- determines and publishes a peer update to all the hosts
-func PublishPeerUpdate() error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	// Get nodes of host that was updated
-	// Go through all affected hosts
-	// publish peer update with single peer
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedNodePeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted node to account for
-func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, delNode, nil); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
-// PublishDeletedClientPeerUpdate --- determines and publishes a peer update
-// to all the hosts with a deleted ext client to account for
-func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
-	if !servercfg.IsMessageQueueBackend() {
-		return nil
-	}
-
-	hosts, err := logic.GetAllHosts()
-	if err != nil {
-		logger.Log(1, "err getting all hosts", err.Error())
-		return err
-	}
-	logic.ResetPeerUpdateContext()
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, delClient); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-		}
-	}
-	return err
-}
-
 // PublishPeerUpdateForClient - publishes a peer update to affected host on behalf of a client's Host
 func PublishPeerUpdateForClient(network string, c *models.ExtClient, deleted bool) error {
 	h := logic.GetHostByNodeID(c.IngressGatewayID)
@@ -152,31 +83,31 @@ func PublishPeerUpdateForHost(network string, updatedHost *models.Host, deletedN
 	return nil
 }
 
-// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) error {
-
-	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClient)
-	if err != nil {
-		return err
-	}
-	if len(peerUpdate.Peers) == 0 { // no peers to send
-		return nil
-	}
-	if host.ProxyEnabled {
-		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
-		if err != nil {
-			return err
-		}
-		proxyUpdate.Action = models.ProxyUpdate
-		peerUpdate.ProxyUpdate = proxyUpdate
-	}
-
-	data, err := json.Marshal(&peerUpdate)
-	if err != nil {
-		return err
-	}
-	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
-}
+// // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
+// func PublishSingleHostPeerUpdate(ctx context.Context, host *models.Host, deletedNode *models.Node, deletedClient *models.ExtClient) error {
+
+// 	peerUpdate, err := logic.GetPeerUpdateForHost(ctx, "", host, deletedNode, deletedClient)
+// 	if err != nil {
+// 		return err
+// 	}
+// 	if len(peerUpdate.Peers) == 0 { // no peers to send
+// 		return nil
+// 	}
+// 	if host.ProxyEnabled {
+// 		proxyUpdate, err := logic.GetProxyUpdateForHost(ctx, host)
+// 		if err != nil {
+// 			return err
+// 		}
+// 		proxyUpdate.Action = models.ProxyUpdate
+// 		peerUpdate.ProxyUpdate = proxyUpdate
+// 	}
+
+// 	data, err := json.Marshal(&peerUpdate)
+// 	if err != nil {
+// 		return err
+// 	}
+// 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
+// }
 
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
@@ -525,7 +456,7 @@ func sendPeers() {
 		for _, host := range hosts {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(logic.PeerUpdateCtx, &host, nil, nil); err != nil {
+			if err = PublishPeerUpdateForHost("", &host, nil, nil); err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}
 		}