Răsfoiți Sursa

Merge branch 'develop' into NET-612

Matthew R Kasun 1 an în urmă
părinte
comite
6f556fabf3
6 a modificat fișierele cu 51 adăugiri și 46 ștergeri
  1. 0 10
      controllers/ext_client.go
  2. 6 3
      controllers/hosts.go
  3. 5 1
      controllers/migrate.go
  4. 29 20
      controllers/node.go
  5. 1 10
      mq/handlers.go
  6. 10 2
      pro/logic/relays.go

+ 0 - 10
controllers/ext_client.go

@@ -90,16 +90,6 @@ func getAllExtClients(w http.ResponseWriter, r *http.Request) {
 
 	w.Header().Set("Content-Type", "application/json")
 
-	headerNetworks := r.Header.Get("networks")
-	networksSlice := []string{}
-	marshalErr := json.Unmarshal([]byte(headerNetworks), &networksSlice)
-	if marshalErr != nil {
-		slog.Error("error unmarshalling networks", "error", marshalErr.Error())
-		logic.ReturnErrorResponse(w, r, logic.FormatError(marshalErr, "internal"))
-		return
-	}
-
-	var err error
 	clients, err := logic.GetAllExtClients()
 	if err != nil && !database.IsEmptyRecord(err) {
 		logger.Log(0, "failed to get all extclients: ", err.Error())

+ 6 - 3
controllers/hosts.go

@@ -342,9 +342,12 @@ func deleteHostFromNetwork(w http.ResponseWriter, r *http.Request) {
 	}
 	node.Action = models.NODE_DELETE
 	node.PendingDelete = true
-	// notify node change
-	mq.RunUpdates(node, false)
-	go func() { // notify of peer change
+	go func() {
+		// notify node change
+		if err := mq.NodeUpdate(node); err != nil {
+			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+		}
+		// notify of peer change
 		err = mq.PublishDeletedNodePeerUpdate(node)
 		if err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())

+ 5 - 1
controllers/migrate.go

@@ -123,7 +123,11 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 			if err != nil {
 				logger.Log(0, "error creating ingress gateway for node", node.ID, err.Error())
 			}
-			mq.RunUpdates(&ingressNode, true)
+			go func() {
+				if err := mq.NodeUpdate(&ingressNode); err != nil {
+					slog.Error("error publishing node update to node", "node", ingressNode.ID, "error", err)
+				}
+			}()
 		}
 	}
 }

+ 29 - 20
controllers/node.go

@@ -440,9 +440,11 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	go func() {
+		if err := mq.NodeUpdate(&node); err != nil {
+			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+		}
 		mq.PublishPeerUpdate()
 	}()
-	mq.RunUpdates(&node, true)
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deletegateway nodes deleteEgressGateway
@@ -481,9 +483,11 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
 	go func() {
+		if err := mq.NodeUpdate(&node); err != nil {
+			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+		}
 		mq.PublishPeerUpdate()
 	}()
-	mq.RunUpdates(&node, true)
 }
 
 // == INGRESS ==
@@ -530,8 +534,11 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-
-	mq.RunUpdates(&node, true)
+	go func() {
+		if err := mq.NodeUpdate(&node); err != nil {
+			slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+		}
+	}()
 }
 
 // swagger:route DELETE /api/nodes/{network}/{nodeid}/deleteingress nodes deleteIngressGateway
@@ -582,16 +589,16 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 			if err != nil {
 				return
 			}
-			go mq.PublishSingleHostPeerUpdate(
-				host,
-				allNodes,
-				nil,
-				removedClients[:],
-			)
+			go func() {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:]); err != nil {
+					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
+				}
+				if err := mq.NodeUpdate(&node); err != nil {
+					slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+				}
+			}()
 		}
 	}
-
-	mq.RunUpdates(&node, true)
 }
 
 // swagger:route PUT /api/nodes/{network}/{nodeid} nodes updateNode
@@ -660,9 +667,11 @@ func updateNode(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "updated node", currentNode.ID.String(), "on network", currentNode.Network)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	mq.RunUpdates(newNode, ifaceDelta)
 	go func(aclUpdate, relayupdate bool, newNode *models.Node) {
-		if aclUpdate || relayupdate {
+		if err := mq.NodeUpdate(newNode); err != nil {
+			slog.Error("error publishing node update to node", "node", newNode.ID, "error", err)
+		}
+		if aclUpdate || relayupdate || ifaceDelta {
 			if err := mq.PublishPeerUpdate(); err != nil {
 				logger.Log(0, "error during node ACL update for node", newNode.ID.String())
 			}
@@ -735,13 +744,13 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])
-	if !fromNode { // notify node change
-		mq.RunUpdates(&node, false)
-	}
 	go func() { // notify of peer change
-		var err error
-		err = mq.PublishDeletedNodePeerUpdate(&node)
-		if err != nil {
+		if !fromNode {
+			if err := mq.NodeUpdate(&node); err != nil {
+				slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+			}
+		}
+		if err := mq.PublishDeletedNodePeerUpdate(&node); err != nil {
 			logger.Log(1, "error publishing peer update ", err.Error())
 		}
 		host, err := logic.GetHost(node.HostID.String())

+ 1 - 10
mq/handlers.go

@@ -3,10 +3,10 @@ package mq
 import (
 	"encoding/json"
 	"fmt"
+
 	mqtt "github.com/eclipse/paho.mqtt.golang"
 	"github.com/google/uuid"
 	"github.com/gravitl/netmaker/database"
-	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic/hostactions"
 	"github.com/gravitl/netmaker/models"
@@ -20,15 +20,6 @@ import (
 var UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) {
 }
 
-func RunUpdates(node *models.Node, ifaceDelta bool) {
-	go func() { // don't block http response
-		// publish node update if not server
-		if err := NodeUpdate(node); err != nil {
-			logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
-		}
-	}()
-}
-
 // DefaultHandler default message queue handler  -- NOT USED
 func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 	slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload())

+ 10 - 2
pro/logic/relays.go

@@ -3,13 +3,15 @@ package logic
 import (
 	"errors"
 	"fmt"
+	"net"
+
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/logic/acls/nodeacls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/servercfg"
-	"net"
+	"golang.org/x/exp/slog"
 )
 
 // CreateRelay - creates a relay
@@ -136,7 +138,13 @@ func UpdateRelayed(currentNode, newNode *models.Node) {
 	updatenodes := updateRelayNodes(currentNode.ID.String(), currentNode.RelayedNodes, newNode.RelayedNodes)
 	if len(updatenodes) > 0 {
 		for _, relayedNode := range updatenodes {
-			mq.RunUpdates(&relayedNode, false)
+			node := relayedNode
+			go func() {
+				if err := mq.NodeUpdate(&node); err != nil {
+					slog.Error("error publishing node update to node", "node", node.ID, "error", err)
+				}
+
+			}()
 		}
 	}
 }