Ver Fonte

NET-1782: Fetch Node Connection Status from metrics (#3237)

* add live status of node

* handle static node status

* add public IP field to server configuration

* get public Ip from config

* improve node status logic

* improvise status check

* use only checkin status on old nodes

---------

Co-authored-by: the_aceix <[email protected]>
Abhishek K há 9 meses atrás
pai
commit
31c2311bef
12 ficheiros alterados com 293 adições e 34 exclusões
  1. 1 0
      config/config.go
  2. 3 1
      controllers/node.go
  3. 14 0
      logic/nodes.go
  4. 26 0
      logic/status.go
  5. 28 1
      logic/util.go
  6. 2 0
      models/api_node.go
  7. 14 0
      models/node.go
  8. 0 3
      mq/migrate.go
  9. 1 29
      mq/util.go
  10. 1 0
      pro/initialize.go
  11. 197 0
      pro/logic/status.go
  12. 6 0
      servercfg/serverconf.go

+ 1 - 0
config/config.go

@@ -104,6 +104,7 @@ type ServerConfig struct {
 	Stun                       bool          `yaml:"stun"`
 	StunServers                string        `yaml:"stun_servers"`
 	DefaultDomain              string        `yaml:"default_domain"`
+	PublicIp                   string        `yaml:"public_ip"`
 }
 
 // SQLConfig - Generic SQL Config

+ 3 - 1
controllers/node.go

@@ -326,8 +326,9 @@ func getNetworkNodes(w http.ResponseWriter, r *http.Request) {
 	if len(filteredNodes) > 0 {
 		nodes = filteredNodes
 	}
-	nodes = logic.AddStaticNodestoList(nodes)
 
+	nodes = logic.AddStaticNodestoList(nodes)
+	nodes = logic.AddStatusToNodes(nodes)
 	// returns all the nodes in JSON/API format
 	apiNodes := logic.GetAllNodesAPI(nodes[:])
 	logger.Log(2, r.Header.Get("user"), "fetched nodes on network", networkName)
@@ -367,6 +368,7 @@ func getAllNodes(w http.ResponseWriter, r *http.Request) {
 
 	}
 	nodes = logic.AddStaticNodestoList(nodes)
+	nodes = logic.AddStatusToNodes(nodes)
 	// return all the nodes in JSON/API format
 	apiNodes := logic.GetAllNodesAPI(nodes[:])
 	logger.Log(3, r.Header.Get("user"), "fetched all nodes they have access to")

+ 14 - 0
logic/nodes.go

@@ -445,6 +445,20 @@ func AddStaticNodestoList(nodes []models.Node) []models.Node {
 	return nodes
 }
 
+func AddStatusToNodes(nodes []models.Node) (nodesWithStatus []models.Node) {
+	aclDefaultPolicyStatusMap := make(map[string]bool)
+	for _, node := range nodes {
+		if _, ok := aclDefaultPolicyStatusMap[node.Network]; !ok {
+			// check default policy if all allowed return true
+			defaultPolicy, _ := GetDefaultPolicy(models.NetworkID(node.Network), models.DevicePolicy)
+			aclDefaultPolicyStatusMap[node.Network] = defaultPolicy.Enabled
+		}
+		GetNodeStatus(&node, aclDefaultPolicyStatusMap[node.Network])
+		nodesWithStatus = append(nodesWithStatus, node)
+	}
+	return
+}
+
 // GetNetworkByNode - gets the network model from a node
 func GetNetworkByNode(node *models.Node) (models.Network, error) {
 

+ 26 - 0
logic/status.go

@@ -0,0 +1,26 @@
+package logic
+
+import (
+	"time"
+
+	"github.com/gravitl/netmaker/models"
+)
+
+var GetNodeStatus = getNodeStatus
+
+func getNodeStatus(node *models.Node, t bool) {
+	// On CE check only last check-in time
+	if node.IsStatic {
+		if !node.StaticNode.Enabled {
+			node.Status = models.OfflineSt
+			return
+		}
+		node.Status = models.OnlineSt
+		return
+	}
+	if time.Since(node.LastCheckIn) > time.Minute*10 {
+		node.Status = models.OfflineSt
+		return
+	}
+	node.Status = models.OnlineSt
+}

+ 28 - 1
logic/util.go

@@ -6,11 +6,14 @@ import (
 	"encoding/base32"
 	"encoding/base64"
 	"encoding/json"
+	"fmt"
 	"net"
 	"os"
 	"strings"
 	"time"
+	"unicode"
 
+	"github.com/blang/semver"
 	"github.com/c-robinson/iplib"
 	"github.com/gravitl/netmaker/database"
 	"github.com/gravitl/netmaker/logger"
@@ -148,4 +151,28 @@ func IsSlicesEqual(a, b []string) bool {
 	return true
 }
 
-// == private ==
+// VersionLessThan checks if v1 < v2 semantically
+// dev is the latest version
+func VersionLessThan(v1, v2 string) (bool, error) {
+	if v1 == "dev" {
+		return false, nil
+	}
+	if v2 == "dev" {
+		return true, nil
+	}
+	semVer1 := strings.TrimFunc(v1, func(r rune) bool {
+		return !unicode.IsNumber(r)
+	})
+	semVer2 := strings.TrimFunc(v2, func(r rune) bool {
+		return !unicode.IsNumber(r)
+	})
+	sv1, err := semver.Parse(semVer1)
+	if err != nil {
+		return false, fmt.Errorf("failed to parse semver1 (%s): %w", semVer1, err)
+	}
+	sv2, err := semver.Parse(semVer2)
+	if err != nil {
+		return false, fmt.Errorf("failed to parse semver2 (%s): %w", semVer2, err)
+	}
+	return sv1.LT(sv2), nil
+}

+ 2 - 0
models/api_node.go

@@ -52,6 +52,7 @@ type ApiNode struct {
 	IsStatic          bool                `json:"is_static"`
 	IsUserNode        bool                `json:"is_user_node"`
 	StaticNode        ExtClient           `json:"static_node"`
+	Status            NodeStatus          `json:"status"`
 }
 
 // ApiNode.ConvertToServerNode - converts an api node to a server node
@@ -192,6 +193,7 @@ func (nm *Node) ConvertToAPINode() *ApiNode {
 	apiNode.IsStatic = nm.IsStatic
 	apiNode.IsUserNode = nm.IsUserNode
 	apiNode.StaticNode = nm.StaticNode
+	apiNode.Status = nm.Status
 	return &apiNode
 }
 

+ 14 - 0
models/node.go

@@ -11,6 +11,19 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
+type NodeStatus string
+
+const (
+	OnlineSt  NodeStatus = "online"
+	OfflineSt NodeStatus = "offline"
+	WarningSt NodeStatus = "warning"
+	ErrorSt   NodeStatus = "error"
+	UnKnown   NodeStatus = "unknown"
+)
+
+// LastCheckInThreshold - if node's checkin more than this threshold,then node is declared as offline
+const LastCheckInThreshold = time.Minute * 10
+
 const (
 	// NODE_SERVER_NAME - the default server name
 	NODE_SERVER_NAME = "netmaker"
@@ -103,6 +116,7 @@ type Node struct {
 	IsStatic          bool                `json:"is_static"`
 	IsUserNode        bool                `json:"is_user_node"`
 	StaticNode        ExtClient           `json:"static_node"`
+	Status            NodeStatus          `json:"node_status"`
 }
 
 // LegacyNode - legacy struct for node model

+ 0 - 3
mq/migrate.go

@@ -93,9 +93,6 @@ func SendPullSYN() error {
 			return err
 		}
 		encrypted, encryptErr := encryptAESGCM(host.TrafficKeyPublic[0:32], zipped)
-		if encryptErr != nil {
-			return encryptErr
-		}
 
 		if encryptErr != nil {
 			continue

+ 1 - 29
mq/util.go

@@ -12,9 +12,7 @@ import (
 	"math"
 	"strings"
 	"time"
-	"unicode"
 
-	"github.com/blang/semver"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
@@ -139,7 +137,7 @@ func publish(host *models.Host, dest string, msg []byte) error {
 
 	var encrypted []byte
 	var encryptErr error
-	vlt, err := versionLessThan(host.Version, "v0.30.0")
+	vlt, err := logic.VersionLessThan(host.Version, "v0.30.0")
 	if err != nil {
 		slog.Warn("error checking version less than", "error", err)
 		return err
@@ -187,29 +185,3 @@ func GetID(topic string) (string, error) {
 	//the last part of the topic will be the node.ID
 	return parts[count-1], nil
 }
-
-// versionLessThan checks if v1 < v2 semantically
-// dev is the latest version
-func versionLessThan(v1, v2 string) (bool, error) {
-	if v1 == "dev" {
-		return false, nil
-	}
-	if v2 == "dev" {
-		return true, nil
-	}
-	semVer1 := strings.TrimFunc(v1, func(r rune) bool {
-		return !unicode.IsNumber(r)
-	})
-	semVer2 := strings.TrimFunc(v2, func(r rune) bool {
-		return !unicode.IsNumber(r)
-	})
-	sv1, err := semver.Parse(semVer1)
-	if err != nil {
-		return false, fmt.Errorf("failed to parse semver1 (%s): %w", semVer1, err)
-	}
-	sv2, err := semver.Parse(semVer2)
-	if err != nil {
-		return false, fmt.Errorf("failed to parse semver2 (%s): %w", semVer2, err)
-	}
-	return sv1.LT(sv2), nil
-}

+ 1 - 0
pro/initialize.go

@@ -140,6 +140,7 @@ func InitPro() {
 	logic.IntialiseGroups = proLogic.UserGroupsInit
 	logic.AddGlobalNetRolesToAdmins = proLogic.AddGlobalNetRolesToAdmins
 	logic.GetUserGroupsInNetwork = proLogic.GetUserGroupsInNetwork
+	logic.GetNodeStatus = proLogic.GetNodeStatus
 }
 
 func retrieveProLogo() string {

+ 197 - 0
pro/logic/status.go

@@ -0,0 +1,197 @@
+package logic
+
+import (
+	"time"
+
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+)
+
+func getNodeStatusOld(node *models.Node) {
+	// On CE check only last check-in time
+	if node.IsStatic {
+		if !node.StaticNode.Enabled {
+			node.Status = models.OfflineSt
+			return
+		}
+		node.Status = models.OnlineSt
+		return
+	}
+	if time.Since(node.LastCheckIn) > time.Minute*10 {
+		node.Status = models.OfflineSt
+		return
+	}
+	node.Status = models.OnlineSt
+}
+
+func GetNodeStatus(node *models.Node, defaultEnabledPolicy bool) {
+
+	if time.Since(node.LastCheckIn) > models.LastCheckInThreshold {
+		node.Status = models.OfflineSt
+		return
+	}
+	if node.IsStatic {
+		if !node.StaticNode.Enabled {
+			node.Status = models.OfflineSt
+			return
+		}
+		// check extclient connection from metrics
+		ingressMetrics, err := GetMetrics(node.StaticNode.IngressGatewayID)
+		if err != nil || ingressMetrics == nil || ingressMetrics.Connectivity == nil {
+			node.Status = models.UnKnown
+			return
+		}
+		if metric, ok := ingressMetrics.Connectivity[node.StaticNode.ClientID]; ok {
+			if metric.Connected {
+				node.Status = models.OnlineSt
+				return
+			} else {
+				node.Status = models.OfflineSt
+				return
+			}
+		}
+		node.Status = models.UnKnown
+		return
+	}
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		node.Status = models.UnKnown
+		return
+	}
+	vlt, err := logic.VersionLessThan(host.Version, "v0.30.0")
+	if err != nil {
+		node.Status = models.UnKnown
+		return
+	}
+	if vlt {
+		getNodeStatusOld(node)
+		return
+	}
+	metrics, err := logic.GetMetrics(node.ID.String())
+	if err != nil {
+		return
+	}
+	if metrics == nil || metrics.Connectivity == nil {
+		if time.Since(node.LastCheckIn) < models.LastCheckInThreshold {
+			node.Status = models.OnlineSt
+			return
+		}
+	}
+	// if node.IsFailOver {
+	// 	if time.Since(node.LastCheckIn) < models.LastCheckInThreshold {
+	// 		node.Status = models.OnlineSt
+	// 		return
+	// 	}
+	// }
+	// If all Peers are able to reach me and and the peer is not able to reached by any peer then return online
+	/* 1. FailOver Exists
+		a. check connectivity to failover Node - if no connection return warning
+		b. if getting failedover and still no connection to any of the peers - then show error
+		c. if getting failedOver and has connections to some peers - show warning
+	2. FailOver Doesn't Exist
+		a. check connectivity to pu
+
+	*/
+
+	// failoverNode, exists := FailOverExists(node.Network)
+	// if exists && failoverNode.FailedOverBy != uuid.Nil {
+	// 	// check connectivity to failover Node
+	// 	if metric, ok := metrics.Connectivity[failoverNode.ID.String()]; ok {
+	// 		if time.Since(failoverNode.LastCheckIn) < models.LastCheckInThreshold {
+	// 			if metric.Connected {
+	// 				node.Status = models.OnlineSt
+	// 				return
+	// 			} else {
+	// 				checkPeerConnectivity(node, metrics)
+	// 				return
+	// 			}
+	// 		}
+	// 	} else {
+	// 		node.Status = models.OnlineSt
+	// 		return
+	// 	}
+
+	// }
+	checkPeerConnectivity(node, metrics, defaultEnabledPolicy)
+
+}
+
+func checkPeerStatus(node *models.Node, defaultAclPolicy bool) {
+	peerNotConnectedCnt := 0
+	metrics, err := logic.GetMetrics(node.ID.String())
+	if err != nil {
+		return
+	}
+	if metrics == nil || metrics.Connectivity == nil {
+		if time.Since(node.LastCheckIn) < models.LastCheckInThreshold {
+			node.Status = models.OnlineSt
+			return
+		}
+	}
+	for peerID, metric := range metrics.Connectivity {
+		peer, err := logic.GetNodeByID(peerID)
+		if err != nil {
+			continue
+		}
+		if !defaultAclPolicy && !logic.IsNodeAllowedToCommunicate(*node, peer, false) {
+			continue
+		}
+
+		if time.Since(peer.LastCheckIn) > models.LastCheckInThreshold {
+			continue
+		}
+		if metric.Connected {
+			continue
+		}
+		if peer.Status == models.ErrorSt {
+			continue
+		}
+		peerNotConnectedCnt++
+
+	}
+	if peerNotConnectedCnt == 0 {
+		node.Status = models.OnlineSt
+		return
+	}
+	if peerNotConnectedCnt == len(metrics.Connectivity) {
+		node.Status = models.ErrorSt
+		return
+	}
+	node.Status = models.WarningSt
+}
+
+func checkPeerConnectivity(node *models.Node, metrics *models.Metrics, defaultAclPolicy bool) {
+	peerNotConnectedCnt := 0
+	for peerID, metric := range metrics.Connectivity {
+		peer, err := logic.GetNodeByID(peerID)
+		if err != nil {
+			continue
+		}
+		if !defaultAclPolicy && !logic.IsNodeAllowedToCommunicate(*node, peer, false) {
+			continue
+		}
+
+		if time.Since(peer.LastCheckIn) > models.LastCheckInThreshold {
+			continue
+		}
+		if metric.Connected {
+			continue
+		}
+		// check if peer is in error state
+		checkPeerStatus(&peer, defaultAclPolicy)
+		if peer.Status == models.ErrorSt {
+			continue
+		}
+		peerNotConnectedCnt++
+
+	}
+	if peerNotConnectedCnt == 0 {
+		node.Status = models.OnlineSt
+		return
+	}
+	if peerNotConnectedCnt == len(metrics.Connectivity) {
+		node.Status = models.ErrorSt
+		return
+	}
+	node.Status = models.WarningSt
+}

+ 6 - 0
servercfg/serverconf.go

@@ -76,6 +76,7 @@ func GetServerConfig() config.ServerConfig {
 	cfg.Database = GetDB()
 	cfg.Platform = GetPlatform()
 	cfg.Version = GetVersion()
+	cfg.PublicIp = GetServerHostIP()
 
 	// == auth config ==
 	var authInfo = GetAuthProviderInfo()
@@ -180,6 +181,11 @@ func GetVersion() string {
 	return Version
 }
 
+// GetServerHostIP - fetches server IP
+func GetServerHostIP() string {
+	return os.Getenv("SERVER_HOST")
+}
+
 // GetDB - gets the database type
 func GetDB() string {
 	database := "sqlite"