Browse Source

added all current handler logic

0xdcarns 2 years ago
parent
commit
8ed5915c8f
4 changed files with 306 additions and 18 deletions
  1. 21 12
      models/events.go
  2. 0 6
      mq/handlers.go
  3. 188 0
      queue/handlers.go
  4. 97 0
      queue/logic.go

+ 21 - 12
models/events.go

@@ -5,13 +5,16 @@ type Event struct {
 	ID      string `json:"id"`
 	ID      string `json:"id"`
 	Topic   int    `json:"topic"`
 	Topic   int    `json:"topic"`
 	Payload struct {
 	Payload struct {
-		*Host `json:"host,omitempty"`
-		*Node `json:"odd,omitempty"`
-		*Test `json:"test,omitempty"`
+		*HostUpdate  `json:"host,omitempty"`
+		*Node        `json:"node,omitempty"`
+		*Test        `json:"test,omitempty"`
+		*NodeCheckin `json:"node_checkin,omitempty"`
+		*Metrics     `json:"metrics,omitempty"`
+		Action       byte `json:"action"`
 	} `json:"payload"`
 	} `json:"payload"`
 }
 }
 
 
-// Test - used for testing the handlers
+// Test - just used for testing the handlers
 type Test struct {
 type Test struct {
 	Data string `json:"data"`
 	Data string `json:"data"`
 }
 }
@@ -20,13 +23,19 @@ type Test struct {
 
 
 // EventTopics - hold topic IDs for each type of possible event
 // EventTopics - hold topic IDs for each type of possible event
 var EventTopics = struct {
 var EventTopics = struct {
-	Test       int
-	NodeUpdate int
-	HostUpdate int
-	PeerUpdate int
+	Test         int
+	NodeUpdate   int
+	HostUpdate   int
+	PeerUpdate   int
+	Ping         int
+	Metrics      int
+	ClientUpdate int
 }{
 }{
-	Test:       0,
-	NodeUpdate: 1,
-	HostUpdate: 2,
-	PeerUpdate: 3,
+	Test:         0,
+	NodeUpdate:   1,
+	HostUpdate:   2,
+	PeerUpdate:   3,
+	Ping:         4,
+	Metrics:      5,
+	ClientUpdate: 6,
 }
 }

+ 0 - 6
mq/handlers.go

@@ -168,12 +168,6 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(0, "failed to pulish peer update: ", err.Error())
 				logger.Log(0, "failed to pulish peer update: ", err.Error())
 			}
 			}
 		}
 		}
-		if sendPeerUpdate {
-			err := PublishPeerUpdate()
-			if err != nil {
-				logger.Log(0, "failed to pulish peer update: ", err.Error())
-			}
-		}
 		// if servercfg.Is_EE && ifaceDelta {
 		// if servercfg.Is_EE && ifaceDelta {
 		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
 		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
 		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
 		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)

+ 188 - 0
queue/handlers.go

@@ -1,8 +1,15 @@
 package queue
 package queue
 
 
 import (
 import (
+	"fmt"
+
 	"github.com/gorilla/websocket"
 	"github.com/gorilla/websocket"
+	"github.com/gravitl/netmaker/database"
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/ncutils"
+	"github.com/gravitl/netmaker/servercfg"
 )
 )
 
 
 // holds a map of funcs
 // holds a map of funcs
@@ -14,6 +21,10 @@ func initializeHandlers() {
 	handlerFuncs = make(map[int]func(*models.Event))
 	handlerFuncs = make(map[int]func(*models.Event))
 	handlerFuncs[models.EventTopics.NodeUpdate] = nodeUpdate
 	handlerFuncs[models.EventTopics.NodeUpdate] = nodeUpdate
 	handlerFuncs[models.EventTopics.Test] = test
 	handlerFuncs[models.EventTopics.Test] = test
+	handlerFuncs[models.EventTopics.HostUpdate] = hostUpdate
+	handlerFuncs[models.EventTopics.Ping] = ping
+	handlerFuncs[models.EventTopics.Metrics] = updateMetrics
+	handlerFuncs[models.EventTopics.ClientUpdate] = clientPeerUpdate
 }
 }
 
 
 func test(e *models.Event) {
 func test(e *models.Event) {
@@ -26,6 +37,183 @@ func test(e *models.Event) {
 	}
 	}
 }
 }
 
 
+func ping(e *models.Event) {
+	node, err := logic.GetNodeByID(e.ID)
+	if err != nil {
+		logger.Log(3, "mq-ping error getting node: ", err.Error())
+		record, err := database.FetchRecord(database.NODES_TABLE_NAME, e.ID)
+		if err != nil {
+			logger.Log(3, "error reading database ", err.Error())
+			return
+		}
+		logger.Log(3, "record from database")
+		logger.Log(3, record)
+		return
+	}
+
+	checkin := e.Payload.NodeCheckin
+	if checkin == nil {
+		logger.Log(0, "failed to complete checkin for node", node.ID.String())
+	}
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
+		return
+	}
+	node.SetLastCheckIn()
+	host.Version = checkin.Version
+	node.Connected = checkin.Connected
+	host.Interfaces = checkin.Ifaces
+	for i := range host.Interfaces {
+		host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
+	}
+	if err := logic.UpdateNode(&node, &node); err != nil {
+		logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
+		return
+	}
+
+	logger.Log(3, "ping processed for node", node.ID.String())
+}
+
 func nodeUpdate(e *models.Event) {
 func nodeUpdate(e *models.Event) {
+	currentNode, err := logic.GetNodeByID(e.ID)
+	if err != nil {
+		logger.Log(1, "error getting node ", e.ID, err.Error())
+		return
+	}
+
+	newNode := e.Payload.Node
+	if newNode == nil {
+		logger.Log(0, "failed to update node", currentNode.ID.String())
+	}
+
+	ifaceDelta := logic.IfaceDelta(&currentNode, newNode)
+	if servercfg.Is_EE && ifaceDelta {
+		if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
+			logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
+		}
+	}
+	newNode.SetLastCheckIn()
+	if err := logic.UpdateNode(&currentNode, newNode); err != nil {
+		logger.Log(1, "error saving node", err.Error())
+		return
+	}
+	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
+		// TODO handle publishing udpates
+		// if err = PublishPeerUpdate(); err != nil {
+		// 	logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
+		// }
+	}
+
+	logger.Log(1, "updated node", newNode.ID.String())
+}
+
+func hostUpdate(e *models.Event) {
+
+	currentHost, err := logic.GetHost(e.ID)
+	if err != nil {
+		logger.Log(1, "error getting host ", e.ID, err.Error())
+		return
+	}
+
+	hostUpdate := e.Payload.HostUpdate
+	if hostUpdate == nil {
+		logger.Log(0, "failed to update host", currentHost.Name, currentHost.ID.String())
+	}
+	logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
+	var sendPeerUpdate bool
+	switch hostUpdate.Action {
+	case models.UpdateHost:
+		sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
+		err := logic.UpsertHost(currentHost)
+		if err != nil {
+			logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
+			return
+		}
+	case models.DeleteHost:
+		if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
+			logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
+			return
+		}
+		if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
+			logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
+			return
+		}
+		sendPeerUpdate = true
+	}
+	// TODO handle publishing a peer update
+	if sendPeerUpdate {
+		// 	err := PublishPeerUpdate()
+		// 	if err != nil {
+		// 		logger.Log(0, "failed to pulish peer update: ", err.Error())
+		// 	}
+	}
+}
+
+func updateMetrics(e *models.Event) {
+	if servercfg.Is_EE {
+		id := e.ID
+		currentNode, err := logic.GetNodeByID(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+
+		var newMetrics = e.Payload.Metrics
+		if newMetrics == nil {
+			logger.Log(1, "provided metrics were nil for node", id)
+			return
+		}
+		shouldUpdate := updateNodeMetrics(&currentNode, newMetrics)
+
+		if err = logic.UpdateMetrics(id, newMetrics); err != nil {
+			logger.Log(1, "faield to update node metrics", id, err.Error())
+			return
+		}
+		// TODO adapt metrics exporter..
+		// if servercfg.IsMetricsExporter() {
+		// 	if err := pushMetricsToExporter(newMetrics); err != nil {
+		// 		logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
+		// 			currentNode.ID, err))
+		// 	}
+		// }
+
+		if newMetrics.Connectivity != nil {
+			err := logic.EnterpriseFailoverFunc(&currentNode)
+			if err != nil {
+				logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
+			}
+		}
+
+		if shouldUpdate {
+			logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
+			// host, err := logic.GetHost(currentNode.HostID.String())
+			// if err == nil {
+			// 	if err = PublishSingleHostUpdate(host); err != nil {
+			// 		logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
+			// 	}
+			// }
+			// TODO publish a single host update
+		}
+
+		logger.Log(1, "updated node metrics", id)
+	}
+}
+
+func clientPeerUpdate(e *models.Event) {
+	id := e.ID
+	_, err := logic.GetNodeByID(id)
+	if err != nil {
+		logger.Log(1, "error getting node ", id, err.Error())
+		return
+	}
+	action := e.Payload.Action
+	switch action {
+	case ncutils.ACK:
+		//do we still need this
+	case ncutils.DONE:
+		// TODO publish a peer update to the calling node
+	}
 
 
+	logger.Log(1, "sent peer updates after signal received from", id)
 }
 }

+ 97 - 0
queue/logic.go

@@ -0,0 +1,97 @@
+package queue
+
+import (
+	"time"
+
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+	"github.com/gravitl/netmaker/netclient/ncutils"
+)
+
// updateNodeMetrics merges newMetrics (freshly reported by the client)
// with the metrics previously stored for currentNode: it re-keys ext-client
// entries, accumulates uptime/total-time counters, computes failover peers
// for the node's network, and prunes stale connectivity entries.
// It returns true when the failover peer set changed in a way that
// warrants pushing an update to peers.
// NOTE(review): newMetrics is mutated in place — callers are expected to
// persist it afterwards.
func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
	if newMetrics.FailoverPeers == nil {
		newMetrics.FailoverPeers = make(map[string]string)
	}
	oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
	if err != nil {
		logger.Log(1, "error finding old metrics for node", currentNode.ID.String())
		return false
	}
	if oldMetrics.FailoverPeers == nil {
		oldMetrics.FailoverPeers = make(map[string]string)
	}

	// ingress gateways also report connectivity for their attached ext clients
	var attachedClients []models.ExtClient
	if currentNode.IsIngressGateway {
		clients, err := logic.GetExtClientsByID(currentNode.ID.String(), currentNode.Network)
		if err == nil {
			attachedClients = clients
		}
	}
	if len(attachedClients) > 0 {
		// associate ext clients with IDs
		// clients report keyed by public key; re-key the entries by ClientID
		for i := range attachedClients {
			extMetric := newMetrics.Connectivity[attachedClients[i].PublicKey]
			if len(extMetric.NodeName) == 0 &&
				len(newMetrics.Connectivity[attachedClients[i].ClientID].NodeName) > 0 { // cover server clients
				extMetric = newMetrics.Connectivity[attachedClients[i].ClientID]
				if extMetric.TotalReceived > 0 && extMetric.TotalSent > 0 {
					extMetric.Connected = true
				}
			}
			extMetric.NodeName = attachedClients[i].ClientID
			delete(newMetrics.Connectivity, attachedClients[i].PublicKey)
			newMetrics.Connectivity[attachedClients[i].ClientID] = extMetric
		}
	}

	// run through metrics for each peer
	for k := range newMetrics.Connectivity {
		currMetric := newMetrics.Connectivity[k]
		oldMetric := oldMetrics.Connectivity[k]
		// counters are cumulative: fold the stored totals into the new sample
		currMetric.TotalTime += oldMetric.TotalTime
		currMetric.Uptime += oldMetric.Uptime // get the total uptime for this connection
		if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
			currMetric.PercentUp = 0
		} else {
			currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
		}
		// Uptime counts check-in intervals; convert to wall-clock minutes
		totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
		currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
		delete(oldMetrics.Connectivity, k) // remove from old data
		newMetrics.Connectivity[k] = currMetric

	}

	// add nodes that need failover
	nodes, err := logic.GetNetworkNodes(currentNode.Network)
	if err != nil {
		logger.Log(0, "failed to retrieve nodes while updating metrics")
		return false
	}
	// a peer needs failover when it claims to be connected but this node
	// cannot reach it, it has a failover node assigned, and it is not
	// itself a failover node
	for _, node := range nodes {
		if !newMetrics.Connectivity[node.ID.String()].Connected &&
			len(newMetrics.Connectivity[node.ID.String()].NodeName) > 0 &&
			node.Connected &&
			len(node.FailoverNode) > 0 &&
			!node.Failover {
			newMetrics.FailoverPeers[node.ID.String()] = node.FailoverNode.String()
		}
	}
	// update peers when failovers appear for the first time...
	shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
	for k, v := range oldMetrics.FailoverPeers {
		// ...or when a previously empty assignment gains a failover node
		if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
			shouldUpdate = true
		}

		// carry forward existing assignments the new report omitted
		if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
			newMetrics.FailoverPeers[k] = v
		}
	}

	for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
		delete(newMetrics.Connectivity, k)
	}
	return shouldUpdate
}