ソースを参照

added publishers

0xdcarns 2 年 前
コミット
2b6d735b81
6 ファイル変更128 行追加55 行削除
  1. 3 3
      controllers/ext_client.go
  2. 23 20
      models/events.go
  3. 1 1
      mq/handlers.go
  4. 4 11
      mq/publishers.go
  5. 18 20
      queue/handlers.go
  6. 79 0
      queue/publishers.go

+ 3 - 3
controllers/ext_client.go

@@ -388,7 +388,7 @@ func createExtClient(w http.ResponseWriter, r *http.Request) {
 
 	logger.Log(0, r.Header.Get("user"), "created new ext client on network", networkName)
 	w.WriteHeader(http.StatusOK)
-	err = mq.PublishExtPeerUpdate(&node)
+	err = mq.PublishPeerUpdate()
 	if err != nil {
 		logger.Log(1, "error setting ext peers on "+nodeid+": "+err.Error())
 	}
@@ -483,7 +483,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 	logger.Log(0, r.Header.Get("user"), "updated ext client", newExtClient.ClientID)
 	if changedEnabled { // need to send a peer update to the ingress node as enablement of one of it's clients has changed
 		if ingressNode, err := logic.GetNodeByID(newclient.IngressGatewayID); err == nil {
-			if err = mq.PublishExtPeerUpdate(&ingressNode); err != nil {
+			if err = mq.PublishPeerUpdate(); err != nil {
 				logger.Log(1, "error setting ext peers on", ingressNode.ID.String(), ":", err.Error())
 			}
 		}
@@ -554,7 +554,7 @@ func deleteExtClient(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	err = mq.PublishExtPeerUpdate(&ingressnode)
+	err = mq.PublishPeerUpdate()
 	if err != nil {
 		logger.Log(1, "error setting ext peers on "+ingressnode.ID.String()+": "+err.Error())
 	}

+ 23 - 20
models/events.go

@@ -5,12 +5,13 @@ type Event struct {
 	ID      string `json:"id"`
 	Topic   int    `json:"topic"`
 	Payload struct {
-		*HostUpdate  `json:"host,omitempty"`
-		*Node        `json:"node,omitempty"`
-		*Test        `json:"test,omitempty"`
-		*NodeCheckin `json:"node_checkin,omitempty"`
-		*Metrics     `json:"metrics,omitempty"`
-		Action       byte `json:"action"`
+		*HostUpdate     `json:"host,omitempty"`
+		*Node           `json:"node,omitempty"`
+		*Test           `json:"test,omitempty"`
+		*NodeCheckin    `json:"node_checkin,omitempty"`
+		*Metrics        `json:"metrics,omitempty"`
+		*HostPeerUpdate `json:"host_peer_update,omitempty"`
+		Action          byte `json:"action"`
 	} `json:"payload"`
 }
 
@@ -23,19 +24,21 @@ type Test struct {
 
 // EventTopics - hold topic IDs for each type of possible event
 var EventTopics = struct {
-	Test         int
-	NodeUpdate   int
-	HostUpdate   int
-	PeerUpdate   int
-	Ping         int
-	Metrics      int
-	ClientUpdate int
+	Test                  int
+	NodeUpdate            int
+	HostUpdate            int
+	Ping                  int
+	Metrics               int
+	ClientUpdate          int
+	SendAllHostPeerUpdate int
+	SendHostPeerUpdate    int
 }{
-	Test:         0,
-	NodeUpdate:   1,
-	HostUpdate:   2,
-	PeerUpdate:   3,
-	Ping:         4,
-	Metrics:      5,
-	ClientUpdate: 6,
+	Test:                  0,
+	NodeUpdate:            1,
+	HostUpdate:            2,
+	Ping:                  3,
+	Metrics:               4,
+	ClientUpdate:          5,
+	SendAllHostPeerUpdate: 6,
+	SendHostPeerUpdate:    7,
 }

+ 1 - 1
mq/handlers.go

@@ -227,7 +227,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
 				host, err := logic.GetHost(currentNode.HostID.String())
 				if err == nil {
-					if err = PublishSingleHostUpdate(host); err != nil {
+					if err = PublishSingleHostPeerUpdate(host); err != nil {
 						logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
 					}
 				}

+ 4 - 11
mq/publishers.go

@@ -25,7 +25,7 @@ func PublishPeerUpdate() error {
 	}
 	for _, host := range hosts {
 		host := host
-		err = PublishSingleHostUpdate(&host)
+		err = PublishSingleHostPeerUpdate(&host)
 		if err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
@@ -33,8 +33,8 @@ func PublishPeerUpdate() error {
 	return err
 }
 
-// PublishSingleHostUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostUpdate(host *models.Host) error {
+// PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
+func PublishSingleHostPeerUpdate(host *models.Host) error {
 
 	peerUpdate, err := logic.GetPeerUpdateForHost(host)
 	if err != nil {
@@ -56,13 +56,6 @@ func PublishSingleHostUpdate(host *models.Host) error {
 	return publish(host, fmt.Sprintf("peers/host/%s/%s", host.ID.String(), servercfg.GetServer()), data)
 }
 
-// PublishPeerUpdate --- publishes a peer update to all the peers of a node
-func PublishExtPeerUpdate(node *models.Node) error {
-
-	go PublishPeerUpdate()
-	return nil
-}
-
 // NodeUpdate -- publishes a node update
 func NodeUpdate(node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())
@@ -137,7 +130,7 @@ func sendPeers() {
 		if force {
 			host := host
 			logger.Log(2, "sending scheduled peer update (5 min)")
-			err = PublishSingleHostUpdate(&host)
+			err = PublishSingleHostPeerUpdate(&host)
 			if err != nil {
 				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
 			}

+ 18 - 20
queue/handlers.go

@@ -19,12 +19,13 @@ var handlerFuncs map[int]func(*models.Event)
 // initializes the map of functions
 func initializeHandlers() {
 	handlerFuncs = make(map[int]func(*models.Event))
-	handlerFuncs[models.EventTopics.NodeUpdate] = nodeUpdate
 	handlerFuncs[models.EventTopics.Test] = test
+	handlerFuncs[models.EventTopics.NodeUpdate] = nodeUpdate
 	handlerFuncs[models.EventTopics.HostUpdate] = hostUpdate
 	handlerFuncs[models.EventTopics.Ping] = ping
 	handlerFuncs[models.EventTopics.Metrics] = updateMetrics
 	handlerFuncs[models.EventTopics.ClientUpdate] = clientPeerUpdate
+	handlerFuncs[models.EventTopics.SendAllHostPeerUpdate] = publishPeerUpdates
 }
 
 func test(e *models.Event) {
@@ -99,10 +100,7 @@ func nodeUpdate(e *models.Event) {
 		return
 	}
 	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
-		// TODO handle publishing udpates
-		// if err = PublishPeerUpdate(); err != nil {
-		// 	logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
-		// }
+		PublishAllPeerUpdate()
 	}
 
 	logger.Log(1, "updated node", newNode.ID.String())
@@ -141,12 +139,8 @@ func hostUpdate(e *models.Event) {
 		}
 		sendPeerUpdate = true
 	}
-	// TODO handle publishing a peer update
 	if sendPeerUpdate {
-		// 	err := PublishPeerUpdate()
-		// 	if err != nil {
-		// 		logger.Log(0, "failed to pulish peer update: ", err.Error())
-		// 	}
+		PublishAllPeerUpdate()
 	}
 }
 
@@ -187,13 +181,12 @@ func updateMetrics(e *models.Event) {
 
 		if shouldUpdate {
 			logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
-			// host, err := logic.GetHost(currentNode.HostID.String())
-			// if err == nil {
-			// 	if err = PublishSingleHostUpdate(host); err != nil {
-			// 		logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
-			// 	}
-			// }
-			// TODO publish a single host update
+			host, err := logic.GetHost(currentNode.HostID.String())
+			if err == nil {
+				if err = publishHostPeerUpdate(host); err != nil {
+					logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
+				}
+			}
 		}
 
 		logger.Log(1, "updated node metrics", id)
@@ -202,9 +195,14 @@ func updateMetrics(e *models.Event) {
 
 func clientPeerUpdate(e *models.Event) {
 	id := e.ID
-	_, err := logic.GetNodeByID(id)
+	node, err := logic.GetNodeByID(id)
+	if err != nil {
+		logger.Log(1, "error getting node", id, err.Error())
+		return
+	}
+	host, err := logic.GetHost(node.HostID.String())
 	if err != nil {
-		logger.Log(1, "error getting node ", id, err.Error())
+		logger.Log(1, "error getting node's host for peer update", id, err.Error())
 		return
 	}
 	action := e.Payload.Action
@@ -212,7 +210,7 @@ func clientPeerUpdate(e *models.Event) {
 	case ncutils.ACK:
 		//do we still need this
 	case ncutils.DONE:
-		// TODO publish a peer update to the calling node
+		publishHostPeerUpdate(host)
 	}
 
 	logger.Log(1, "sent peer updates after signal received from", id)

+ 79 - 0
queue/publishers.go

@@ -0,0 +1,79 @@
+package queue
+
+import (
+	"encoding/json"
+	"fmt"
+
+	"github.com/gorilla/websocket"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+)
+
+// PublishAllPeerUpdate - publishes a peer update to
+// all hosts with current connections
+func PublishAllPeerUpdate() {
+	const publishAllID = "pub-all"
+	event := models.Event{
+		ID:    publishAllID,
+		Topic: models.EventTopics.SendAllHostPeerUpdate,
+	}
+	EventQueue.Enqueue(event)
+}
+
+func publishPeerUpdates(e *models.Event) {
+	hostMap, err := logic.GetHostsMap()
+	if err != nil {
+		return
+	}
+
+	ConnMap.Range(func(k, v interface{}) bool {
+		host, ok := hostMap[k.(string)] // in future can also handle http responses
+		if ok {                         // ensure ID is a legitimate host
+			conn := v.(*websocket.Conn)
+			if conn == nil {
+				return false
+			}
+			_ = publishHostPeerUpdate(host)
+		}
+		return true
+	})
+}
+
+func publishHostPeerUpdate(host *models.Host) error {
+
+	peerUpdate, err := logic.GetPeerUpdateForHost(host)
+	if err != nil {
+		return err
+	}
+	if host.ProxyEnabled {
+		proxyUpdate, err := logic.GetProxyUpdateForHost(host)
+		if err != nil {
+			return err
+		}
+		proxyUpdate.Action = models.ProxyUpdate
+		peerUpdate.ProxyUpdate = proxyUpdate
+	}
+
+	event := models.Event{
+		ID:    host.ID.String(),
+		Topic: models.EventTopics.SendHostPeerUpdate,
+	}
+	event.Payload.HostPeerUpdate = &peerUpdate
+
+	data, err := json.Marshal(&event)
+	if err != nil {
+		return err
+	}
+	return publish(data, host.ID.String())
+}
+
+func publish(data []byte, hostID string) error {
+	val, ok := ConnMap.Load(hostID)
+	if ok {
+		conn := val.(*websocket.Conn)
+		if conn != nil {
+			return conn.WriteMessage(websocket.TextMessage, data)
+		}
+	}
+	return fmt.Errorf("message send failure for host connection %s", hostID)
+}