2
0
Эх сурвалжийг харах

added intialization and termination of queue

0xdcarns 2 жил өмнө
parent
commit
c7057cdaec
3 өөрчлөгдсөн 35 нэмэгдсэн , 19 устгасан
  1. 14 10
      main.go
  2. 6 0
      models/events.go
  3. 15 9
      queue/queue.go

+ 14 - 10
main.go

@@ -22,6 +22,7 @@ import (
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/mq"
 	"github.com/gravitl/netmaker/netclient/ncutils"
+	"github.com/gravitl/netmaker/queue"
 	"github.com/gravitl/netmaker/servercfg"
 	"github.com/gravitl/netmaker/serverctl"
 	stunserver "github.com/gravitl/netmaker/stun-server"
@@ -135,11 +136,9 @@ func startControllers() {
 		waitnetwork.Add(1)
 		go controller.HandleRESTRequests(&waitnetwork)
 	}
-	//Run MessageQueue
-	if servercfg.IsMessageQueueBackend() {
-		waitnetwork.Add(1)
-		go runMessageQueue(&waitnetwork)
-	}
+	// Run External or Internal MessageQueue
+	waitnetwork.Add(1)
+	go runMessageQueue(&waitnetwork)
 
 	if !servercfg.IsAgentBackend() && !servercfg.IsRestBackend() && !servercfg.IsMessageQueueBackend() {
 		logger.Log(0, "No Server Mode selected, so nothing is being served! Set Agent mode (AGENT_BACKEND) or Rest mode (REST_BACKEND) or MessageQueue (MESSAGEQUEUE_BACKEND) to 'true'.")
@@ -170,12 +169,17 @@ func startControllers() {
 // Should we be using a context vice a waitgroup????????????
 func runMessageQueue(wg *sync.WaitGroup) {
 	defer wg.Done()
-	brokerHost, secure := servercfg.GetMessageQueueEndpoint()
-	logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
-	mq.SetUpAdminClient()
-	mq.SetupMQTT()
 	ctx, cancel := context.WithCancel(context.Background())
-	go mq.Keepalive(ctx)
+
+	if servercfg.IsMessageQueueBackend() { // connect to external broker
+		brokerHost, secure := servercfg.GetMessageQueueEndpoint()
+		logger.Log(0, "connecting to mq broker at", brokerHost, "with TLS?", fmt.Sprintf("%v", secure))
+		mq.SetUpAdminClient()
+		mq.SetupMQTT()
+		go mq.Keepalive(ctx)
+	} else { // use internal queue system
+		queue.StartQueue(ctx)
+	}
 	go func() {
 		peerUpdate := make(chan *models.Node)
 		go logic.ManageZombies(ctx, peerUpdate)

+ 6 - 0
models/events.go

@@ -18,3 +18,9 @@ type Event struct {
 type Test struct {
 	Data string `json:"data"`
 }
+
+// == TOPICS ==
+const (
+	// Event_TestTopic - the topic for a test event
+	Event_TestTopic = "test"
+)

+ 15 - 9
queue/queue.go

@@ -2,6 +2,7 @@ package queue
 
 import (
 	"context"
+	"errors"
 	"fmt"
 
 	"github.com/enriquebris/goconcurrentqueue"
@@ -22,16 +23,19 @@ func StartQueue(ctx context.Context) {
 		for {
 			msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
 			if err != nil { // handle dequeue error
-				logger.Log(0, "error when dequeuing event -", err.Error())
-				continue
-			} else { // handle event
-				event := msg.(models.Event)
-				switch event.Topic {
-				case "test":
-					fmt.Printf("received test topic event %+v \n", event)
-				default:
-					fmt.Printf("topic unknown\n")
+				if errors.Is(err, context.Canceled) {
+					logger.Log(0, "queue shutdown successfully")
+					break
 				}
+				logger.Log(0, "error dequeuing event -", err.Error())
+				continue
+			}
+			event := msg.(models.Event)
+			switch event.Topic {
+			case "test":
+				fmt.Printf("received test topic event %+v \n", event)
+			default:
+				fmt.Printf("topic unknown\n")
 			}
 			logger.Log(0, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
 		}
@@ -42,8 +46,10 @@ func StartQueue(ctx context.Context) {
 func initQueue() {
 	size := servercfg.GetQueueSize()
 	if size > 0 {
+		logger.Log(0, "started queue with fixed allocation -", fmt.Sprintf("%d", size))
 		EventQueue = goconcurrentqueue.NewFixedFIFO(size)
 	} else {
+		logger.Log(0, "started queue with dynamic allocation")
 		EventQueue = goconcurrentqueue.NewFIFO()
 	}
 }