2
0
Эх сурвалжийг харах

added host and node publishers

0xdcarns 2 жил өмнө
parent
commit
7c1aee8936

+ 12 - 6
models/events.go

@@ -24,14 +24,18 @@ type Test struct {
 
 // EventTopics - hold topic IDs for each type of possible event
 var EventTopics = struct {
-	Test                  int
-	NodeUpdate            int
-	HostUpdate            int
-	Ping                  int
-	Metrics               int
-	ClientUpdate          int
+	// == to server topics ==
+	Test         int
+	NodeUpdate   int
+	HostUpdate   int
+	Ping         int
+	Metrics      int
+	ClientUpdate int
+	// == to client topics ==
 	SendAllHostPeerUpdate int
 	SendHostPeerUpdate    int
+	SendHostUpdate        int
+	SendNodeUpdate        int
 }{
 	Test:                  0,
 	NodeUpdate:            1,
@@ -41,4 +45,6 @@ var EventTopics = struct {
 	ClientUpdate:          5,
 	SendAllHostPeerUpdate: 6,
 	SendHostPeerUpdate:    7,
+	SendHostUpdate:        8,
+	SendNodeUpdate:        9,
 }

+ 2 - 0
queue/handlers.go

@@ -26,6 +26,8 @@ func initializeHandlers() {
 	handlerFuncs[models.EventTopics.Metrics] = updateMetrics
 	handlerFuncs[models.EventTopics.ClientUpdate] = clientPeerUpdate
 	handlerFuncs[models.EventTopics.SendAllHostPeerUpdate] = publishPeerUpdates
+	handlerFuncs[models.EventTopics.SendHostUpdate] = sendHostUpdate
+	handlerFuncs[models.EventTopics.SendNodeUpdate] = sendNodeUpdate
 }
 
 func test(e *models.Event) {

+ 48 - 0
queue/publishers.go

@@ -5,6 +5,7 @@ import (
 	"fmt"
 
 	"github.com/gorilla/websocket"
+	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 )
@@ -20,6 +21,53 @@ func PublishAllPeerUpdate() {
 	EventQueue.Enqueue(event)
 }
 
+// NodeUpdate -- publishes a node update
+func NodeUpdate(node *models.Node) error {
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		return nil
+	}
+	event := models.Event{
+		ID:    host.ID.String(),
+		Topic: models.EventTopics.SendNodeUpdate,
+	}
+	event.Payload.Node = node
+	logger.Log(1, "publishing node update to", host.Name, node.ID.String())
+	return EventQueue.Enqueue(event)
+}
+
+// HostUpdate -- publishes a host update to clients
+func HostUpdate(hostUpdate *models.HostUpdate) error {
+	event := models.Event{
+		ID:    hostUpdate.Host.ID.String(),
+		Topic: models.EventTopics.SendHostUpdate,
+	}
+	event.Payload.HostUpdate = hostUpdate
+	return EventQueue.Enqueue(event)
+}
+
+func sendNodeUpdate(e *models.Event) {
+	data, err := json.Marshal(e)
+	if err != nil {
+		logger.Log(0, "failed to encode node update", err.Error())
+	}
+	if err = publish(data, e.ID); err != nil {
+		logger.Log(0, "failed to send node update", err.Error())
+	}
+}
+
+func sendHostUpdate(e *models.Event) {
+	logger.Log(1, "publishing host update to "+e.ID)
+	data, err := json.Marshal(e)
+	if err != nil {
+		logger.Log(0, "failed to encode host update", err.Error())
+		return
+	}
+	if err = publish(data, e.ID); err != nil {
+		logger.Log(0, "failed to send host update", err.Error())
+	}
+}
+
 func publishPeerUpdates(e *models.Event) {
 	hostMap, err := logic.GetHostsMap()
 	if err != nil {