Jelajahi Sumber

began switching controller funcs to call queue

0xdcarns 2 tahun lalu
induk
melakukan
91f8d4ccd9

+ 3 - 0
controllers/dns.go

@@ -11,6 +11,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -179,6 +180,8 @@ func createDNS(w http.ResponseWriter, r *http.Request) {
 		if err = mq.PublishPeerUpdate(); err != nil {
 			logger.Log(0, "failed to publish peer update after ACL update on", entry.Network)
 		}
+	} else {
+		queue.PublishAllPeerUpdate()
 	}
 	logger.Log(2, r.Header.Get("user"),
 		fmt.Sprintf("DNS entry is set: %+v", entry))

+ 21 - 8
controllers/ext_client.go

@@ -16,6 +16,8 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
+	"github.com/gravitl/netmaker/servercfg"
 	"github.com/skip2/go-qrcode"
 )
 
@@ -388,9 +390,13 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
-	err = mq.PublishPeerUpdate()
-	if err != nil {
-		logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
+	if servercfg.IsMessageQueueBackend() {
+		err = mq.PublishPeerUpdate()
+		if err != nil {
+			logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
+		}
+	} else {
+		queue.PublishAllPeerUpdate()
 	}
 }
 
@@ -483,8 +489,12 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID)
 	if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
-			if err = mq.PublishPeerUpdate(); err != nil {
-				logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
+			if servercfg.IsMessageQueueBackend() {
+				if err = mq.PublishPeerUpdate(); err != nil {
+					logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
+				}
+			} else {
+				queue.PublishAllPeerUpdate()
 			}
 		}
 	}
@@ -554,9 +564,12 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	err = mq.PublishPeerUpdate()
-	if err != nil {
-		logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
+	if servercfg.IsMessageQueueBackend() {
+		if err = mq.PublishPeerUpdate(); err != nil {
+			logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
+		}
+	} else {
+		queue.PublishAllPeerUpdate()
 	}
 
 	logger.Log(0, r.Header.Get("user"),

+ 41 - 15
controllers/hosts.go

@@ -12,6 +12,8 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
+	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/crypto/bcrypt"
 )
 
@@ -98,18 +100,27 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 		logic.UpdateHostRelay(currHost.ID.String(), currHost.RelayedHosts, newHost.RelayedHosts)
 	}
 
-	// publish host update through MQ
-	if err := mq.HostUpdate(&models.HostUpdate{
+	hostUpdate := &models.HostUpdate{
 		Action: models.UpdateHost,
 		Host:   *newHost,
-	}); err != nil {
-		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
 	}
-	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "fail to publish peer update: ", err.Error())
+	// publish host update through MQ
+	if servercfg.IsMessageQueueBackend() {
+		err = mq.HostUpdate(hostUpdate)
+		go func() {
+			if err := mq.PublishPeerUpdate(); err != nil {
+				logger.Log(0, "fail to publish peer update: ", err.Error())
+			}
+		}()
+	} else {
+		if err = queue.HostUpdate(hostUpdate); err != nil {
+			logger.Log(0, "failed to host update:", err.Error())
 		}
-	}()
+		queue.PublishAllPeerUpdate()
+	}
+	if err != nil {
+		logger.Log(0, r.Header.Get("user"), "failed to send host update: ", currHost.ID.String(), err.Error())
+	}
 
 	apiHostData := newHost.ConvertNMHostToAPI()
 	logger.Log(2, r.Header.Get("user"), "updated host", newHost.ID.String())
@@ -170,11 +181,18 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	if err = mq.HostUpdate(&models.HostUpdate{
+	hostUpdate := &models.HostUpdate{
 		Action: models.DeleteHost,
 		Host:   *currHost,
-	}); err != nil {
-		logger.Log(0, r.Header.Get("user"), "failed to send delete host update: ", currHost.ID.String(), err.Error())
+	}
+	if servercfg.IsMessageQueueBackend() {
+		if err = mq.HostUpdate(hostUpdate); err != nil {
+			logger.Log(0, r.Header.Get("user"), "failed to send delete host update: ", currHost.ID.String(), err.Error())
+		}
+	} else {
+		if err = queue.HostUpdate(hostUpdate); err != nil {
+			logger.Log(0, r.Header.Get("user"), "failed to send delete host update: ", currHost.ID.String(), err.Error())
+		}
 	}
 
 	if err = mq.DeleteMqClient(currHost.ID.String()); err != nil {
@@ -221,13 +239,21 @@ func addHostToNetwork(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
-	logger.Log(1, "added new node", newNode.ID.String(), "to host", currHost.Name)
-	if err = mq.HostUpdate(&models.HostUpdate{
+
+	hostUpdate := &models.HostUpdate{
 		Action: models.JoinHostToNetwork,
 		Host:   *currHost,
 		Node:   *newNode,
-	}); err != nil {
-		logger.Log(0, r.Header.Get("user"), "failed to update host to join network:", hostid, network, err.Error())
+	}
+	logger.Log(1, "added new node", newNode.ID.String(), "to host", currHost.Name)
+	if servercfg.IsMessageQueueBackend() {
+		if err = mq.HostUpdate(hostUpdate); err != nil {
+			logger.Log(0, r.Header.Get("user"), "failed to update host to join network:", hostid, network, err.Error())
+		}
+	} else {
+		if err = queue.HostUpdate(hostUpdate); err != nil {
+			logger.Log(0, r.Header.Get("user"), "failed to update host to join network:", hostid, network, err.Error())
+		}
 	}
 
 	logger.Log(2, r.Header.Get("user"), fmt.Sprintf("added host %s to network %s", currHost.Name, network))

+ 9 - 2
controllers/network.go

@@ -15,6 +15,7 @@ import (
 	"github.com/gravitl/netmaker/logic/acls"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
 	"github.com/gravitl/netmaker/servercfg"
 )
 
@@ -147,8 +148,14 @@ func keyUpdate(w http.ResponseWriter, r *http.Request) {
 	}
 	for _, node := range nodes {
 		logger.Log(2, "updating node ", node.ID.String(), " for a key update")
-		if err = mq.NodeUpdate(&node); err != nil {
-			logger.Log(1, "failed to send update to node during a network wide key update", node.ID.String(), err.Error())
+		if servercfg.IsMessageQueueBackend() {
+			if err = mq.NodeUpdate(&node); err != nil {
+				logger.Log(1, "failed to send update to node during a network wide key update", node.ID.String(), err.Error())
+			}
+		} else {
+			if err = queue.NodeUpdate(&node); err != nil {
+				logger.Log(1, "failed to send update to node during a network wide key update", node.ID.String(), err.Error())
+			}
 		}
 	}
 }

+ 9 - 3
controllers/node.go

@@ -15,6 +15,7 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models/promodels"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
 	"github.com/gravitl/netmaker/servercfg"
 	"golang.org/x/crypto/bcrypt"
 )
@@ -997,9 +998,14 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 
 func runUpdates(node *models.Node, ifaceDelta bool) {
 	go func() { // don't block http response
-		// publish node update if not server
-		if err := mq.NodeUpdate(node); err != nil {
-			logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
+		if servercfg.IsMessageQueueBackend() {
+			if err := mq.NodeUpdate(node); err != nil {
+				logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
+			}
+		} else {
+			if err := queue.NodeUpdate(node); err != nil {
+				logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
+			}
 		}
 	}()
 }

+ 18 - 7
controllers/relay.go

@@ -10,6 +10,8 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
+	"github.com/gravitl/netmaker/queue"
+	"github.com/gravitl/netmaker/servercfg"
 )
 
 // swagger:route POST /api/nodes/{network}/{nodeid}/createrelay nodes createRelay
@@ -45,10 +47,14 @@ func createRelay(w http.ResponseWriter, r *http.Request) {
 
 	logger.Log(1, r.Header.Get("user"), "created relay on node", relay.NodeID, "on network", relay.NetID)
 	for _, relayedNode := range updatenodes {
-
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
+		if servercfg.IsMessageQueueBackend() {
+			if err = mq.NodeUpdate(&relayedNode); err != nil {
+				logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
+			}
+		} else {
+			if err = queue.NodeUpdate(&relayedNode); err != nil {
+				logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", relay.NetID, ": ", err.Error())
+			}
 		}
 	}
 
@@ -82,9 +88,14 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay server", nodeid, "on network", netid)
 	for _, relayedNode := range updatenodes {
-		err = mq.NodeUpdate(&relayedNode)
-		if err != nil {
-			logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
+		if servercfg.IsMessageQueueBackend() {
+			if err = mq.NodeUpdate(&relayedNode); err != nil {
+				logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
+			}
+		} else {
+			if err = queue.NodeUpdate(&relayedNode); err != nil {
+				logger.Log(1, "error sending update to relayed node ", relayedNode.ID.String(), "on network", netid, ": ", err.Error())
+			}
 		}
 	}
 	apiNode := node.ConvertToAPINode()

+ 3 - 0
queue/handlers.go

@@ -17,6 +17,7 @@ import (
 var handlerFuncs map[int]func(*models.Event)
 
 // initializes the map of functions
+// "Send" functions are sent to clients, others affect server
 func initializeHandlers() {
 	handlerFuncs = make(map[int]func(*models.Event))
 	handlerFuncs[models.EventTopics.Test] = test
@@ -30,6 +31,8 @@ func initializeHandlers() {
 	handlerFuncs[models.EventTopics.SendNodeUpdate] = sendNodeUpdate
 }
 
+// == handler funcs ==
+
 func test(e *models.Event) {
 	val, ok := ConnMap.Load(e.ID)
 	if ok {

+ 1 - 1
queue/publishers.go

@@ -25,7 +25,7 @@ func PublishAllPeerUpdate() {
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
-		return nil
+		return err
 	}
 	event := models.Event{
 		ID:    host.ID.String(),