|
@@ -3,12 +3,10 @@ package mq
|
|
import (
|
|
import (
|
|
"encoding/json"
|
|
"encoding/json"
|
|
"fmt"
|
|
"fmt"
|
|
- "math"
|
|
|
|
- "time"
|
|
|
|
-
|
|
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
mqtt "github.com/eclipse/paho.mqtt.golang"
|
|
"github.com/google/uuid"
|
|
"github.com/google/uuid"
|
|
"github.com/gravitl/netmaker/database"
|
|
"github.com/gravitl/netmaker/database"
|
|
|
|
+ "github.com/gravitl/netmaker/logger"
|
|
"github.com/gravitl/netmaker/logic"
|
|
"github.com/gravitl/netmaker/logic"
|
|
"github.com/gravitl/netmaker/logic/hostactions"
|
|
"github.com/gravitl/netmaker/logic/hostactions"
|
|
"github.com/gravitl/netmaker/models"
|
|
"github.com/gravitl/netmaker/models"
|
|
@@ -18,6 +16,19 @@ import (
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
|
|
)
|
|
)
|
|
|
|
|
|
|
|
+// UpdateMetrics message Handler -- handles updates from client nodes for metrics
|
|
|
|
+var UpdateMetrics = func(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+func RunUpdates(node *models.Node, ifaceDelta bool) {
|
|
|
|
+ go func() { // don't block http response
|
|
|
|
+ // publish node update if not server
|
|
|
|
+ if err := NodeUpdate(node); err != nil {
|
|
|
|
+ logger.Log(1, "error publishing node update to node", node.ID.String(), err.Error())
|
|
|
|
+ }
|
|
|
|
+ }()
|
|
|
|
+}
|
|
|
|
+
|
|
// DefaultHandler default message queue handler -- NOT USED
|
|
// DefaultHandler default message queue handler -- NOT USED
|
|
func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
|
|
func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload())
|
|
slog.Info("mqtt default handler", "topic", msg.Topic(), "message", msg.Payload())
|
|
@@ -25,7 +36,7 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
|
|
// UpdateNode message Handler -- handles updates from client nodes
|
|
// UpdateNode message Handler -- handles updates from client nodes
|
|
func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|
func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|
- id, err := getID(msg.Topic())
|
|
|
|
|
|
+ id, err := GetID(msg.Topic())
|
|
if err != nil {
|
|
if err != nil {
|
|
slog.Error("error getting node.ID ", "topic", msg.Topic(), "error", err)
|
|
slog.Error("error getting node.ID ", "topic", msg.Topic(), "error", err)
|
|
return
|
|
return
|
|
@@ -35,7 +46,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Error("error getting node", "id", id, "error", err)
|
|
slog.Error("error getting node", "id", id, "error", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
|
|
|
|
|
+ decrypted, decryptErr := DecryptMsg(¤tNode, msg.Payload())
|
|
if decryptErr != nil {
|
|
if decryptErr != nil {
|
|
slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
return
|
|
return
|
|
@@ -47,7 +58,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|
}
|
|
}
|
|
|
|
|
|
ifaceDelta := logic.IfaceDelta(¤tNode, &newNode)
|
|
ifaceDelta := logic.IfaceDelta(¤tNode, &newNode)
|
|
- if servercfg.Is_EE && ifaceDelta {
|
|
|
|
|
|
+ if servercfg.IsPro && ifaceDelta {
|
|
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
|
|
if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
|
|
slog.Warn("failed to reset failover list during node update", "nodeid", currentNode.ID, "network", currentNode.Network)
|
|
slog.Warn("failed to reset failover list during node update", "nodeid", currentNode.ID, "network", currentNode.Network)
|
|
}
|
|
}
|
|
@@ -68,7 +79,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
|
|
// UpdateHost message Handler -- handles host updates from clients
|
|
// UpdateHost message Handler -- handles host updates from clients
|
|
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
|
func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
|
- id, err := getID(msg.Topic())
|
|
|
|
|
|
+ id, err := GetID(msg.Topic())
|
|
if err != nil {
|
|
if err != nil {
|
|
slog.Error("error getting host.ID sent on ", "topic", msg.Topic(), "error", err)
|
|
slog.Error("error getting host.ID sent on ", "topic", msg.Topic(), "error", err)
|
|
return
|
|
return
|
|
@@ -183,77 +194,11 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Error("failed to publish peer update", "error", err)
|
|
slog.Error("failed to publish peer update", "error", err)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
- // if servercfg.Is_EE && ifaceDelta {
|
|
|
|
- // if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
|
|
|
|
- // logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
|
|
|
|
- // }
|
|
|
|
- // }
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
-// UpdateMetrics message Handler -- handles updates from client nodes for metrics
|
|
|
|
-func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
|
|
|
|
- if servercfg.Is_EE {
|
|
|
|
- id, err := getID(msg.Topic())
|
|
|
|
- if err != nil {
|
|
|
|
- slog.Error("error getting ID sent on ", "topic", msg.Topic(), "error", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- currentNode, err := logic.GetNodeByID(id)
|
|
|
|
- if err != nil {
|
|
|
|
- slog.Error("error getting node", "id", id, "error", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
|
|
|
- if decryptErr != nil {
|
|
|
|
- slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var newMetrics models.Metrics
|
|
|
|
- if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
|
|
|
|
- slog.Error("error unmarshaling payload", "error", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- shouldUpdate := updateNodeMetrics(¤tNode, &newMetrics)
|
|
|
|
-
|
|
|
|
- if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
|
|
|
|
- slog.Error("failed to update node metrics", "id", id, "error", err)
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if servercfg.IsMetricsExporter() {
|
|
|
|
- if err := pushMetricsToExporter(newMetrics); err != nil {
|
|
|
|
- slog.Error("failed to push node metrics to exporter", "id", currentNode.ID, "error", err)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if newMetrics.Connectivity != nil {
|
|
|
|
- err := logic.EnterpriseFailoverFunc(¤tNode)
|
|
|
|
- if err != nil {
|
|
|
|
- slog.Error("failed to failover for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if shouldUpdate {
|
|
|
|
- slog.Info("updating peers after node detected connectivity issues", "id", currentNode.ID, "network", currentNode.Network)
|
|
|
|
- host, err := logic.GetHost(currentNode.HostID.String())
|
|
|
|
- if err == nil {
|
|
|
|
- nodes, err := logic.GetAllNodes()
|
|
|
|
- if err != nil {
|
|
|
|
- return
|
|
|
|
- }
|
|
|
|
- if err = PublishSingleHostPeerUpdate(host, nodes, nil, nil); err != nil {
|
|
|
|
- slog.Warn("failed to publish update after failover peer change for node", "id", currentNode.ID, "network", currentNode.Network, "error", err)
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- slog.Debug("updated node metrics", "id", id)
|
|
|
|
- }
|
|
|
|
}
|
|
}
|
|
|
|
|
|
// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes
|
|
// ClientPeerUpdate message handler -- handles updating peers after signal from client nodes
|
|
func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
- id, err := getID(msg.Topic())
|
|
|
|
|
|
+ id, err := GetID(msg.Topic())
|
|
if err != nil {
|
|
if err != nil {
|
|
slog.Error("error getting node.ID sent on ", "topic", msg.Topic(), "error", err)
|
|
slog.Error("error getting node.ID sent on ", "topic", msg.Topic(), "error", err)
|
|
return
|
|
return
|
|
@@ -263,7 +208,7 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Error("error getting node", "id", id, "error", err)
|
|
slog.Error("error getting node", "id", id, "error", err)
|
|
return
|
|
return
|
|
}
|
|
}
|
|
- decrypted, decryptErr := decryptMsg(¤tNode, msg.Payload())
|
|
|
|
|
|
+ decrypted, decryptErr := DecryptMsg(¤tNode, msg.Payload())
|
|
if decryptErr != nil {
|
|
if decryptErr != nil {
|
|
slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
slog.Error("failed to decrypt message for node", "id", id, "error", decryptErr)
|
|
return
|
|
return
|
|
@@ -281,105 +226,6 @@ func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
|
|
slog.Info("sent peer updates after signal received from", "id", id)
|
|
slog.Info("sent peer updates after signal received from", "id", id)
|
|
}
|
|
}
|
|
|
|
|
|
-func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {
|
|
|
|
- if newMetrics.FailoverPeers == nil {
|
|
|
|
- newMetrics.FailoverPeers = make(map[string]string)
|
|
|
|
- }
|
|
|
|
- oldMetrics, err := logic.GetMetrics(currentNode.ID.String())
|
|
|
|
- if err != nil {
|
|
|
|
- slog.Error("error finding old metrics for node", "id", currentNode.ID, "error", err)
|
|
|
|
- return false
|
|
|
|
- }
|
|
|
|
- if oldMetrics.FailoverPeers == nil {
|
|
|
|
- oldMetrics.FailoverPeers = make(map[string]string)
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- var attachedClients []models.ExtClient
|
|
|
|
- if currentNode.IsIngressGateway {
|
|
|
|
- clients, err := logic.GetExtClientsByID(currentNode.ID.String(), currentNode.Network)
|
|
|
|
- if err == nil {
|
|
|
|
- attachedClients = clients
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- if len(attachedClients) > 0 {
|
|
|
|
- // associate ext clients with IDs
|
|
|
|
- for i := range attachedClients {
|
|
|
|
- extMetric := newMetrics.Connectivity[attachedClients[i].PublicKey]
|
|
|
|
- if len(extMetric.NodeName) == 0 &&
|
|
|
|
- len(newMetrics.Connectivity[attachedClients[i].ClientID].NodeName) > 0 { // cover server clients
|
|
|
|
- extMetric = newMetrics.Connectivity[attachedClients[i].ClientID]
|
|
|
|
- if extMetric.TotalReceived > 0 && extMetric.TotalSent > 0 {
|
|
|
|
- extMetric.Connected = true
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- extMetric.NodeName = attachedClients[i].ClientID
|
|
|
|
- delete(newMetrics.Connectivity, attachedClients[i].PublicKey)
|
|
|
|
- newMetrics.Connectivity[attachedClients[i].ClientID] = extMetric
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // run through metrics for each peer
|
|
|
|
- for k := range newMetrics.Connectivity {
|
|
|
|
- currMetric := newMetrics.Connectivity[k]
|
|
|
|
- oldMetric := oldMetrics.Connectivity[k]
|
|
|
|
- currMetric.TotalTime += oldMetric.TotalTime
|
|
|
|
- currMetric.Uptime += oldMetric.Uptime // get the total uptime for this connection
|
|
|
|
-
|
|
|
|
- if currMetric.TotalReceived < oldMetric.TotalReceived {
|
|
|
|
- currMetric.TotalReceived += oldMetric.TotalReceived
|
|
|
|
- } else {
|
|
|
|
- currMetric.TotalReceived += int64(math.Abs(float64(currMetric.TotalReceived) - float64(oldMetric.TotalReceived)))
|
|
|
|
- }
|
|
|
|
- if currMetric.TotalSent < oldMetric.TotalSent {
|
|
|
|
- currMetric.TotalSent += oldMetric.TotalSent
|
|
|
|
- } else {
|
|
|
|
- currMetric.TotalSent += int64(math.Abs(float64(currMetric.TotalSent) - float64(oldMetric.TotalSent)))
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if currMetric.Uptime == 0 || currMetric.TotalTime == 0 {
|
|
|
|
- currMetric.PercentUp = 0
|
|
|
|
- } else {
|
|
|
|
- currMetric.PercentUp = 100.0 * (float64(currMetric.Uptime) / float64(currMetric.TotalTime))
|
|
|
|
- }
|
|
|
|
- totalUpMinutes := currMetric.Uptime * ncutils.CheckInInterval
|
|
|
|
- currMetric.ActualUptime = time.Duration(totalUpMinutes) * time.Minute
|
|
|
|
- delete(oldMetrics.Connectivity, k) // remove from old data
|
|
|
|
- newMetrics.Connectivity[k] = currMetric
|
|
|
|
-
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- // add nodes that need failover
|
|
|
|
- nodes, err := logic.GetNetworkNodes(currentNode.Network)
|
|
|
|
- if err != nil {
|
|
|
|
- slog.Error("failed to retrieve nodes while updating metrics", "error", err)
|
|
|
|
- return false
|
|
|
|
- }
|
|
|
|
- for _, node := range nodes {
|
|
|
|
- if !newMetrics.Connectivity[node.ID.String()].Connected &&
|
|
|
|
- len(newMetrics.Connectivity[node.ID.String()].NodeName) > 0 &&
|
|
|
|
- node.Connected &&
|
|
|
|
- len(node.FailoverNode) > 0 &&
|
|
|
|
- !node.Failover {
|
|
|
|
- newMetrics.FailoverPeers[node.ID.String()] = node.FailoverNode.String()
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
- shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
|
|
|
|
- for k, v := range oldMetrics.FailoverPeers {
|
|
|
|
- if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
|
|
|
|
- shouldUpdate = true
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- if len(v) > 0 && len(newMetrics.FailoverPeers[k]) == 0 {
|
|
|
|
- newMetrics.FailoverPeers[k] = v
|
|
|
|
- }
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- for k := range oldMetrics.Connectivity { // cleanup any left over data, self healing
|
|
|
|
- delete(newMetrics.Connectivity, k)
|
|
|
|
- }
|
|
|
|
- return shouldUpdate
|
|
|
|
-}
|
|
|
|
-
|
|
|
|
func handleNewNodeDNS(host *models.Host, node *models.Node) error {
|
|
func handleNewNodeDNS(host *models.Host, node *models.Node) error {
|
|
dns := models.DNSUpdate{
|
|
dns := models.DNSUpdate{
|
|
Action: models.DNSInsert,
|
|
Action: models.DNSInsert,
|