Преглед изворни кода

initial commit, init queue + handler

0xdcarns пре 2 година
родитељ
комит
d41df17039
6 измењених фајлова са 110 додато и 0 уклоњено
  1. 1 0
      controllers/controller.go
  2. 48 0
      controllers/update.go
  3. 1 0
      go.mod
  4. 2 0
      go.sum
  5. 20 0
      models/events.go
  6. 38 0
      queue/queue.go

+ 1 - 0
controllers/controller.go

@@ -29,6 +29,7 @@ var HttpHandlers = []interface{}{
 	ipHandlers,
 	loggerHandlers,
 	hostHandlers,
+	updateHandlers,
 }
 
 // HandleRESTRequests - handles the rest requests

+ 48 - 0
controllers/update.go

@@ -0,0 +1,48 @@
+package controller
+
+import (
+	"encoding/json"
+	"fmt"
+	"log"
+	"net/http"
+
+	"github.com/gorilla/mux"
+	"github.com/gorilla/websocket"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/queue"
+)
+
+var updateUpgrader = websocket.Upgrader{} // use default options
+
+func updateHandlers(r *mux.Router) {
+	r.HandleFunc("/api/v1/update", http.HandlerFunc(handleUpdate)).Methods(http.MethodGet)
+}
+
+func handleUpdate(w http.ResponseWriter, r *http.Request) {
+	c, err := updateUpgrader.Upgrade(w, r, nil)
+	if err != nil {
+		logger.Log(0,
+			fmt.Sprintf("error occurred starting update ws for a client [%v]", err))
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
+		return
+	}
+	defer c.Close()
+	for {
+		_, msg, err := c.ReadMessage()
+		if err != nil {
+			log.Println("read:", err)
+			break
+		}
+		var event models.Event
+		err = json.Unmarshal(msg, &event)
+		if err != nil {
+			log.Printf("error unmarshalling json! %v\n", err)
+			continue
+		}
+		event.Conn = c
+		fmt.Printf("got event: %+v \n", event)
+		queue.EventQueue.Enqueue(event)
+	}
+}

+ 1 - 0
go.mod

@@ -50,6 +50,7 @@ require (
 
 require (
 	cloud.google.com/go/compute/metadata v0.2.1 // indirect
+	github.com/enriquebris/goconcurrentqueue v0.7.0 // indirect
 	github.com/go-jose/go-jose/v3 v3.0.0 // indirect
 	github.com/inconshreveable/mousetrap v1.0.1 // indirect
 	github.com/rivo/uniseg v0.2.0 // indirect

+ 2 - 0
go.sum

@@ -23,6 +23,8 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
 github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
 github.com/eclipse/paho.mqtt.golang v1.4.2 h1:66wOzfUHSSI1zamx7jR6yMEI5EuHnT1G6rNA5PM12m4=
 github.com/eclipse/paho.mqtt.golang v1.4.2/go.mod h1:JGt0RsEwEX+Xa/agj90YJ9d9DH2b7upDZMK9HRbFvCA=
+github.com/enriquebris/goconcurrentqueue v0.7.0 h1:JYrDa45N3xo3Sr9mjvlRaWiBHvBEJIhAdLXO3VGVghA=
+github.com/enriquebris/goconcurrentqueue v0.7.0/go.mod h1:OZ+KC2BcRYzjg0vgoUs1GFqdAjkD9mz2Ots7Jbm1yS4=
 github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=
 github.com/felixge/httpsnoop v1.0.3 h1:s/nj+GCswXYzN5v2DpNMuMQYe+0DDwt5WVCU6CWBdXk=
 github.com/felixge/httpsnoop v1.0.3/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U=

+ 20 - 0
models/events.go

@@ -0,0 +1,20 @@
+package models
+
+import "github.com/gorilla/websocket"
+
+// Event - holds info about messages to be used by different handlers
+type Event struct {
+	ID      string `json:"id"`
+	Topic   string `json:"topic"`
+	Payload struct {
+		*Host `json:"host,omitempty"`
+		*Node `json:"odd,omitempty"`
+		*Test `json:"test,omitempty"`
+	} `json:"payload"`
+	Conn *websocket.Conn `json:"conn"`
+}
+
+// Test - used for testing the handlers
+type Test struct {
+	Data string `json:"data"`
+}

+ 38 - 0
queue/queue.go

@@ -0,0 +1,38 @@
+package queue
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/enriquebris/goconcurrentqueue"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/models"
+)
+
+// EventQueue - responsible for queueing and handling events sent to the server
+var EventQueue goconcurrentqueue.Queue
+
+// StartQueue - starts the queue and listens for messages
+func StartQueue(ctx context.Context) {
+
+	EventQueue = goconcurrentqueue.NewFIFO()
+	go func(ctx context.Context) {
+		logger.Log(2, "initialized queue service!")
+		for {
+			msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
+			if err != nil { // handle dequeue error
+				logger.Log(0, "error when dequeuing event -", err.Error())
+				continue
+			} else { // handle event
+				event := msg.(models.Event)
+				switch event.Topic {
+				case "test":
+					fmt.Printf("received test topic event %+v \n", event)
+				default:
+					fmt.Printf("topic unknown\n")
+				}
+			}
+			logger.Log(0, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
+		}
+	}(ctx)
+}