Selaa lähdekoodia

added syncmap for storing conns/responses

0xdcarns 2 vuotta sitten
vanhempi
commit
19637dcbd4
5 muutettua tiedostoa jossa 44 lisäystä ja 20 poistoa
  1. 4 2
      controllers/node.go
  2. 12 5
      controllers/update.go
  3. 14 8
      models/events.go
  4. 6 0
      queue/conns.go
  5. 8 5
      queue/queue.go

+ 4 - 2
controllers/node.go

@@ -19,6 +19,8 @@ import (
 	"golang.org/x/crypto/bcrypt"
 )
 
+var hostID = "host-id"
+
 func nodeHandlers(r *mux.Router) {
 
 	r.HandleFunc("/api/nodes", authorize(false, false, "user", http.HandlerFunc(getAllNodes))).Methods(http.MethodGet)
@@ -234,8 +236,8 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha
 			//check if node instead of user
 			if nodesAllowed {
 				// TODO --- should ensure that node is only operating on itself
-				if _, _, _, err := logic.VerifyToken(authToken); err == nil {
-
+				if id, _, _, err := logic.VerifyToken(authToken); err == nil {
+					r.Header.Add(hostID, id)
 					// this indicates request is from a node
 					// used for failover - if a getNode comes from node, this will trigger a metrics wipe
 					next.ServeHTTP(w, r)

+ 12 - 5
controllers/update.go

@@ -7,27 +7,30 @@ import (
 	"net/http"
 
 	"github.com/gorilla/mux"
-	"github.com/gorilla/websocket"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/queue"
 )
 
-var updateUpgrader = websocket.Upgrader{} // use default options
-
 func updateHandlers(r *mux.Router) {
 	r.HandleFunc("/api/v1/update", http.HandlerFunc(handleUpdate)).Methods(http.MethodGet)
 }
 
 func handleUpdate(w http.ResponseWriter, r *http.Request) {
-	c, err := updateUpgrader.Upgrade(w, r, nil)
+	c, err := upgrader.Upgrade(w, r, nil)
 	if err != nil {
 		logger.Log(0,
 			fmt.Sprintf("error occurred starting update ws for a client [%v]", err))
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
+	if len(r.Header.Get(hostID)) > 0 {
+		queue.ConnMap.Store(r.Header.Get(hostID), c)
+	} else {
+		queue.ConnMap.Store("test", c)
+	}
+	// load the connection address for reference later
 	defer c.Close()
 	for {
 		_, msg, err := c.ReadMessage()
@@ -41,8 +44,12 @@ func handleUpdate(w http.ResponseWriter, r *http.Request) {
 			log.Printf("error unmarshalling json! %v\n", err)
 			continue
 		}
-		event.Conn = c
 		fmt.Printf("got event: %+v \n", event)
 		queue.EventQueue.Enqueue(event)
 	}
+	if len(r.Header.Get(hostID)) > 0 {
+		queue.ConnMap.Delete(r.Header.Get(hostID))
+	} else {
+		queue.ConnMap.Delete("test")
+	}
 }

+ 14 - 8
models/events.go

@@ -1,17 +1,14 @@
 package models
 
-import "github.com/gorilla/websocket"
-
 // Event - holds info about messages to be used by different handlers
 type Event struct {
 	ID      string `json:"id"`
-	Topic   string `json:"topic"`
+	Topic   int    `json:"topic"`
 	Payload struct {
 		*Host `json:"host,omitempty"`
 		*Node `json:"odd,omitempty"`
 		*Test `json:"test,omitempty"`
 	} `json:"payload"`
-	Conn *websocket.Conn `json:"conn"`
 }
 
 // Test - used for testing the handlers
@@ -20,7 +17,16 @@ type Test struct {
 }
 
 // == TOPICS ==
-const (
-	// Event_TestTopic - the topic for a test event
-	Event_TestTopic = "test"
-)
+
+// EventTopics - hold topic IDs for each type of possible event
+var EventTopics = struct {
+	Test       int
+	NodeUpdate int
+	HostUpdate int
+	PeerUpdate int
+}{
+	Test:       0,
+	NodeUpdate: 1,
+	HostUpdate: 2,
+	PeerUpdate: 3,
+}

+ 6 - 0
queue/conns.go

@@ -0,0 +1,6 @@
+package queue
+
+import "sync"
+
+// ConnMap - map for holding http/ws connections and responses
+var ConnMap sync.Map

+ 8 - 5
queue/queue.go

@@ -6,6 +6,7 @@ import (
 	"fmt"
 
 	"github.com/enriquebris/goconcurrentqueue"
+	"github.com/gorilla/websocket"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/servercfg"
@@ -19,7 +20,6 @@ func StartQueue(ctx context.Context) {
 	initQueue()
 
 	go func(ctx context.Context) {
-		logger.Log(2, "initialized queue service!")
 		for {
 			msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
 			if err != nil { // handle dequeue error
@@ -32,12 +32,15 @@ func StartQueue(ctx context.Context) {
 			}
 			event := msg.(models.Event)
 			switch event.Topic {
-			case "test":
-				fmt.Printf("received test topic event %+v \n", event)
+			case models.EventTopics.Test:
+				conn, ok := ConnMap.Load(event.ID)
+				if ok {
+					conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, []byte("success"))
+				}
 			default:
-				fmt.Printf("topic unknown\n")
+				logger.Log(0, fmt.Sprintf("received an unknown topic %d \n", event.Topic))
 			}
-			logger.Log(0, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
+			logger.Log(3, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
 		}
 	}(ctx)
 }