Browse Source

initial logic for failover

0xdcarns 2 years ago
parent
commit
d565dbeaf8
6 changed files with 118 additions and 0 deletions
  1. 2 0
      ee/initialize.go
  2. 77 0
      ee/logic/failover.go
  3. 1 0
      logic/nodes.go
  4. 4 0
      logic/server.go
  5. 12 0
      models/node.go
  6. 22 0
      mq/handlers.go

+ 2 - 0
ee/initialize.go

@@ -6,6 +6,7 @@ package ee
 import (
 	controller "github.com/gravitl/netmaker/controllers"
 	"github.com/gravitl/netmaker/ee/ee_controllers"
+	eelogic "github.com/gravitl/netmaker/ee/logic"
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
@@ -28,6 +29,7 @@ func InitEE() {
 		// == End License Handling ==
 		AddLicenseHooks()
 	})
+	logic.EnterpriseFailoverFunc = eelogic.AutoRelay
 }
 
 func setControllerLimits() {

+ 77 - 0
ee/logic/failover.go

@@ -0,0 +1,77 @@
+package logic
+
+import (
+	"github.com/gravitl/netmaker/logger"
+	"github.com/gravitl/netmaker/logic"
+	"github.com/gravitl/netmaker/models"
+)
+
+// AutoRelay - finds a suitable relay candidate and creates a relay
+func AutoRelay(nodeToBeRelayed *models.Node) (updateNodes []models.Node, err error) {
+	newRelayer := determineFailoverCandidate(nodeToBeRelayed)
+	if newRelayer != nil {
+		return changeRelayStatus(newRelayer, nodeToBeRelayed)
+	}
+	return
+}
+
+// determineFailoverCandidate - returns a list of nodes that
+// are suitable for relaying a given node
+func determineFailoverCandidate(nodeToBeRelayed *models.Node) *models.Node {
+
+	currentNetworkNodes, err := logic.GetNetworkNodes(nodeToBeRelayed.Network)
+	if err != nil {
+		return nil
+	}
+
+	currentMetrics, err := logic.GetMetrics(nodeToBeRelayed.ID)
+	if err != nil || currentMetrics == nil || currentMetrics.Connectivity == nil {
+		return nil
+	}
+
+	minLatency := int64(9223372036854775807) // max signed int64 value
+	var fastestCandidate *models.Node
+	for i := range currentNetworkNodes {
+		if currentNetworkNodes[i].ID == nodeToBeRelayed.ID {
+			continue
+		}
+
+		if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Connected && (currentNetworkNodes[i].Failover == "yes" || currentNetworkNodes[i].IsServer == "yes") {
+			if currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency < int64(minLatency) {
+				fastestCandidate = &currentNetworkNodes[i]
+				minLatency = currentMetrics.Connectivity[currentNetworkNodes[i].ID].Latency
+			}
+		}
+	}
+
+	if fastestCandidate == nil {
+		leader, err := logic.GetNetworkServerLeader(nodeToBeRelayed.Network)
+		if err != nil {
+			return nil
+		}
+		return &leader
+	}
+
+	return fastestCandidate
+}
+
+// changeRelayStatus - changes nodes to relay
+func changeRelayStatus(relayer, nodeToBeRelayed *models.Node) ([]models.Node, error) {
+	var newRelayRequest models.RelayRequest
+
+	if relayer.IsRelay == "yes" {
+		newRelayRequest.RelayAddrs = relayer.RelayAddrs
+	}
+	newRelayRequest.NodeID = relayer.ID
+	newRelayRequest.NetID = relayer.Network
+	newRelayRequest.RelayAddrs = append(newRelayRequest.RelayAddrs, nodeToBeRelayed.PrimaryAddress())
+
+	updatenodes, _, err := logic.CreateRelay(newRelayRequest)
+	if err != nil {
+		logger.Log(0, "failed to create relay automatically for node", nodeToBeRelayed.Name, "on network", nodeToBeRelayed.Network)
+		return nil, err
+	}
+	logger.Log(0, "created relay automatically for node", nodeToBeRelayed.Name, "on network", nodeToBeRelayed.Network)
+
+	return updatenodes, nil
+}

+ 1 - 0
logic/nodes.go

@@ -480,6 +480,7 @@ func SetNodeDefaults(node *models.Node) {
 	node.SetDefaultIsHub()
 	node.SetDefaultConnected()
 	node.SetDefaultACL()
+	node.SetDefaultFailover()
 }
 
 // GetRecordKey - get record key

+ 4 - 0
logic/server.go

@@ -18,8 +18,12 @@ import (
 	"golang.zx2c4.com/wireguard/wgctrl/wgtypes"
 )
 
+// EnterpriseCheckFuncs - can be set to run functions for EE
 var EnterpriseCheckFuncs []interface{}
 
+// EnterpriseFailoverFunc - interface to control failover funcs
+var EnterpriseFailoverFunc interface{}
+
 // == Join, Checkin, and Leave for Server ==
 
 // KUBERNETES_LISTEN_PORT - starting port for Kubernetes in order to use NodePort range

+ 12 - 0
models/node.go

@@ -104,6 +104,7 @@ type Node struct {
 	// == PRO ==
 	DefaultACL string `json:"defaultacl,omitempty" bson:"defaultacl,omitempty" yaml:"defaultacl,omitempty" validate:"checkyesornoorunset"`
 	OwnerID    string `json:"ownerid,omitempty" bson:"ownerid,omitempty" yaml:"ownerid,omitempty"`
+	Failover   string `json:"failover" bson:"failover" yaml:"failover" validate:"checkyesorno"`
 }
 
 // NodesArray - used for node sorting
@@ -297,6 +298,13 @@ func (node *Node) SetDefaultName() {
 	}
 }
 
+// Node.SetDefaultFailover - sets default value of failover status to no if not set
+func (node *Node) SetDefaultFailover() {
+	if node.Failover == "" {
+		node.Failover = "yes"
+	}
+}
+
 // Node.Fill - fills other node data into calling node data if not set on calling node
 func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftables present
 	newNode.ID = currentNode.ID
@@ -452,6 +460,10 @@ func (newNode *Node) Fill(currentNode *Node) { // TODO add new field for nftable
 		newNode.DefaultACL = currentNode.DefaultACL
 	}
 
+	if newNode.Failover == "" {
+		newNode.Failover = currentNode.Failover
+	}
+
 	newNode.TrafficKeys = currentNode.TrafficKeys
 }
 

+ 22 - 0
mq/handlers.go

@@ -135,6 +135,28 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 				}
 			}
 
+			if newMetrics.Connectivity != nil {
+				hasDisconnection := false
+				for k := range newMetrics.Connectivity {
+					if !newMetrics.Connectivity[k].Connected {
+						hasDisconnection = true
+					}
+				}
+				if hasDisconnection {
+					_, err := logic.EnterpriseFailoverFunc.(func(*models.Node) ([]models.Node, error))(&currentNode)
+					if err != nil {
+						logger.Log(0, "could failed to failover for node", currentNode.Name, "on network", currentNode.Network, "-", err.Error())
+					} else {
+						if err := NodeUpdate(&currentNode); err != nil {
+							logger.Log(1, "error publishing node update to node", currentNode.Name, err.Error())
+						}
+						if err := PublishPeerUpdate(&currentNode, true); err != nil {
+							logger.Log(1, "error publishing peer update after auto relay for node", currentNode.Name, err.Error())
+						}
+					}
+				}
+			}
+
 			logger.Log(1, "updated node metrics", id, currentNode.Name)
 		}()
 	}