ソースを参照

added default use internal queue

0xdcarns 2 年 前
コミット
783801d58f
4 ファイル変更59 行追加33 行削除
  1. 35 16
      controllers/node.go
  2. 16 8
      controllers/relay.go
  3. 0 1
      main.go
  4. 8 8
      servercfg/serverconf.go

+ 35 - 16
controllers/node.go

@@ -654,11 +654,16 @@ func createNode(w http.ResponseWriter, r *http.Request) {
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(response)
 
-	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "failed a peer update after creation of node", data.Host.Name)
-		}
-	}()
+	if servercfg.IsMessageQueueBackend() {
+		go func() {
+			if err := mq.PublishPeerUpdate(); err != nil {
+				logger.Log(1, "failed a peer update after creation of node", data.Host.Name)
+			}
+		}()
+	} else {
+		queue.PublishAllPeerUpdate()
+	}
+
 	//runForceServerUpdate(&data.Node, true)
 }
 
@@ -700,9 +705,14 @@ func createEgressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "created egress gateway on node", gateway.NodeID, "on network", gateway.NetID)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	go func() {
-		mq.PublishPeerUpdate()
-	}()
+	if servercfg.IsMessageQueueBackend() {
+		go func() {
+			mq.PublishPeerUpdate()
+		}()
+	} else {
+		queue.PublishAllPeerUpdate()
+	}
+
 	runUpdates(&node, true)
 }
 
@@ -736,9 +746,14 @@ func deleteEgressGateway(w http.ResponseWriter, r *http.Request) {
 	logger.Log(1, r.Header.Get("user"), "deleted egress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiNode)
-	go func() {
-		mq.PublishPeerUpdate()
-	}()
+	if servercfg.IsMessageQueueBackend() {
+		go func() {
+			mq.PublishPeerUpdate()
+		}()
+	} else {
+		queue.PublishAllPeerUpdate()
+	}
+
 	runUpdates(&node, true)
 }
 
@@ -975,11 +990,15 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 	if !fromNode { // notify node change
 		runUpdates(&node, false)
 	}
-	go func() { // notify of peer change
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(1, "error publishing peer update ", err.Error())
-		}
-	}()
+	if servercfg.IsMessageQueueBackend() {
+		go func() { // notify of peer change
+			if err := mq.PublishPeerUpdate(); err != nil {
+				logger.Log(1, "error publishing peer update ", err.Error())
+			}
+		}()
+	} else {
+		queue.PublishAllPeerUpdate()
+	}
 }
 
 func runUpdates(node *models.Node, ifaceDelta bool) {

+ 16 - 8
controllers/relay.go

@@ -141,10 +141,13 @@ func createHostRelay(w http.ResponseWriter, r *http.Request) {
 			relatedHost.ProxyEnabled = true
 			logic.UpsertHost(&relatedHost)
 		}
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "fail to publish peer update: ", err.Error())
+		if servercfg.IsMessageQueueBackend() {
+			if err := mq.PublishPeerUpdate(); err != nil {
+				logger.Log(0, "fail to publish peer update: ", err.Error())
+			}
+		} else {
+			queue.PublishAllPeerUpdate()
 		}
-
 	}(relay.HostID)
 
 	apiHostData := relayHost.ConvertNMHostToAPI()
@@ -174,11 +177,16 @@ func deleteHostRelay(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 	logger.Log(1, r.Header.Get("user"), "deleted relay host", hostid)
-	go func() {
-		if err := mq.PublishPeerUpdate(); err != nil {
-			logger.Log(0, "fail to publish peer update: ", err.Error())
-		}
-	}()
+	if servercfg.IsMessageQueueBackend() {
+		go func() {
+			if err := mq.PublishPeerUpdate(); err != nil {
+				logger.Log(0, "fail to publish peer update: ", err.Error())
+			}
+		}()
+	} else {
+		queue.PublishAllPeerUpdate()
+	}
+
 	apiHostData := relayHost.ConvertNMHostToAPI()
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(apiHostData)

+ 0 - 1
main.go

@@ -172,7 +172,6 @@ func runMessageQueue(wg *sync.WaitGroup) {
 	if servercfg.IsMessageQueueBackend() { // connect to external broker
 		brokerHost, secure := servercfg.GetMessageQueueEndpoint()
 		logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
-		mq.SetUpAdminClient()
 		mq.SetupMQTT()
 		go mq.Keepalive(ctx)
 	} else { // use internal queue system

+ 8 - 8
servercfg/serverconf.go

@@ -332,14 +332,14 @@ func IsAgentBackend() bool {
 // IsMessageQueueBackend - checks if message queue is on or off
 func IsMessageQueueBackend() bool {
 	ismessagequeue := true
-	if os.Getenv("MESSAGEQUEUE_BACKEND") != "" {
-		if os.Getenv("MESSAGEQUEUE_BACKEND") == "off" {
-			ismessagequeue = false
-		}
-	} else if config.Config.Server.MessageQueueBackend != "" {
-		if config.Config.Server.MessageQueueBackend == "off" {
-			ismessagequeue = false
-		}
+	if len(os.Getenv("MESSAGEQUEUE_BACKEND")) == 0 ||
+		os.Getenv("MESSAGEQUEUE_BACKEND") == "off" ||
+		os.Getenv("MESSAGEQUEUE_BACKEND") == "internal" {
+		ismessagequeue = false
+	} else if len(config.Config.Server.MessageQueueBackend) == 0 ||
+		config.Config.Server.MessageQueueBackend == "off" ||
+		os.Getenv("MESSAGEQUEUE_BACKEND") == "internal" {
+		ismessagequeue = false
 	}
 	return ismessagequeue
 }