Browse Source

adding resets and single peer update on metrics disconnect detections

afeiszli 2 years ago
parent
commit
c8672818fe
5 changed files with 64 additions and 30 deletions
  1. 31 1
      controllers/node.go
  2. 8 13
      logic/gateway.go
  3. 1 0
      logic/nodes.go
  4. 8 2
      mq/handlers.go
  5. 16 14
      mq/publishers.go

+ 31 - 1
controllers/node.go

@@ -226,6 +226,10 @@ func authorize(nodesAllowed, networkCheck bool, authNetwork string, next http.Ha
 			if nodesAllowed {
 				// TODO --- should ensure that node is only operating on itself
 				if _, _, _, err := logic.VerifyToken(authToken); err == nil {
+
+					// this indicates request is from a node
+					// used for failover - if a getNode comes from node, this will trigger a metrics wipe
+					r.Header.Set("requestfrom", "node")
 					next.ServeHTTP(w, r)
 					return
 				}
@@ -411,6 +415,8 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 	// set header.
 	w.Header().Set("Content-Type", "application/json")
 
+	nodeRequest := r.Header.Get("requestfrom") == "node"
+
 	var params = mux.Vars(r)
 	nodeid := params["nodeid"]
 	node, err := logic.GetNodeByID(nodeid)
@@ -440,6 +446,12 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		PeerIDs:      peerUpdate.PeerIDs,
 	}
 
+	if servercfg.Is_EE && nodeRequest {
+		if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
+			logger.Log(1, "failed to reset failover list during node config pull", node.Name, node.Network)
+		}
+	}
+
 	logger.Log(2, r.Header.Get("user"), "fetched node", params["nodeid"])
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(response)
@@ -771,6 +783,12 @@ func createIngressGateway(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	if servercfg.Is_EE && failoverReqBody.Failover {
+		if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
+			logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network)
+		}
+	}
+
 	logger.Log(1, r.Header.Get("user"), "created ingress gateway on node", nodeid, "on network", netid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
@@ -794,7 +812,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 	var params = mux.Vars(r)
 	nodeid := params["nodeid"]
 	netid := params["network"]
-	node, err := logic.DeleteIngressGateway(netid, nodeid)
+	node, wasFailover, err := logic.DeleteIngressGateway(netid, nodeid)
 	if err != nil {
 		logger.Log(0, r.Header.Get("user"),
 			fmt.Sprintf("failed to delete ingress gateway on node [%s] on network [%s]: %v",
@@ -803,6 +821,12 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	if servercfg.Is_EE && wasFailover {
+		if err = logic.EnterpriseResetFailoverFunc(node.Network); err != nil {
+			logger.Log(1, "failed to reset failover list during failover create", node.Name, node.Network)
+		}
+	}
+
 	logger.Log(1, r.Header.Get("user"), "deleted ingress gateway", nodeid)
 	w.WriteHeader(http.StatusOK)
 	json.NewEncoder(w).Encode(node)
@@ -969,6 +993,12 @@ func deleteNode(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
+	if servercfg.Is_EE {
+		if err = logic.EnterpriseResetAllPeersFailovers(node.ID, node.Network); err != nil {
+			logger.Log(0, "failed to reset failover lists during node delete for node", node.Name, node.Network)
+		}
+	}
+
 	logic.ReturnSuccessResponse(w, r, nodeid+" deleted.")
 
 	logger.Log(1, r.Header.Get("user"), "Deleted node", nodeid, "from network", params["network"])

+ 8 - 13
logic/gateway.go

@@ -241,22 +241,22 @@ func CreateIngressGateway(netid string, nodeid string, failover bool) (models.No
 }
 
 // DeleteIngressGateway - deletes an ingress gateway
-func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error) {
+func DeleteIngressGateway(networkName string, nodeid string) (models.Node, bool, error) {
 
 	node, err := GetNodeByID(nodeid)
 	if err != nil {
-		return models.Node{}, err
+		return models.Node{}, false, err
 	}
 	network, err := GetParentNetwork(networkName)
 	if err != nil {
-		return models.Node{}, err
+		return models.Node{}, false, err
 	}
 	// delete ext clients belonging to ingress gateway
 	if err = DeleteGatewayExtClients(node.ID, networkName); err != nil {
-		return models.Node{}, err
+		return models.Node{}, false, err
 	}
 	logger.Log(3, "deleting ingress gateway")
-
+	wasFailover := node.Failover == "yes"
 	node.UDPHolePunch = network.DefaultUDPHolePunch
 	node.LastModified = time.Now().Unix()
 	node.IsIngressGateway = "no"
@@ -276,21 +276,16 @@ func DeleteIngressGateway(networkName string, nodeid string) (models.Node, error
 		}
 	}
 
-	err = EnterpriseResetFailoverFunc(node.Network)
-	if err != nil {
-		logger.Log(0, "failed to reset failover on network", node.Network, ":", err.Error())
-	}
-
 	data, err := json.Marshal(&node)
 	if err != nil {
-		return models.Node{}, err
+		return models.Node{}, false, err
 	}
 	err = database.Insert(node.ID, string(data), database.NODES_TABLE_NAME)
 	if err != nil {
-		return models.Node{}, err
+		return models.Node{}, wasFailover, err
 	}
 	err = SetNetworkNodesLastModified(networkName)
-	return node, err
+	return node, wasFailover, err
 }
 
 // DeleteGatewayExtClients - deletes ext clients based on gateway (mac) of ingress node and network

+ 1 - 0
logic/nodes.go

@@ -212,6 +212,7 @@ func DeleteNodeByID(node *models.Node, exterminate bool) error {
 	if node.IsServer == "yes" {
 		return removeLocalServer(node)
 	}
+
 	return nil
 }
 

+ 8 - 2
mq/handlers.go

@@ -86,6 +86,12 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			logger.Log(1, "error unmarshaling payload ", err.Error())
 			return
 		}
+		ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
+		if servercfg.Is_EE && ifaceDelta {
+			if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
+				logger.Log(1, "failed to reset failover list during node update", currentNode.Name, currentNode.Network)
+			}
+		}
 		newNode.SetLastCheckIn()
 		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
 			logger.Log(1, "error saving node", err.Error())
@@ -144,7 +150,7 @@ func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 
 			if shouldUpdate {
 				logger.Log(2, "updating peers after node", currentNode.Name, currentNode.Network, "detected connectivity issues")
-				if err = PublishPeerUpdate(&currentNode, true); err != nil {
+				if err = PublishSinglePeerUpdate(&currentNode); err != nil {
 					logger.Log(0, "failed to publish update after failover peer change for node", currentNode.Name, currentNode.Network)
 				}
 			}
@@ -267,7 +273,7 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo
 			newMetrics.FailoverPeers[node.ID] = node.FailoverNode
 		}
 	}
-	shouldUpdate := false
+	shouldUpdate := len(oldMetrics.FailoverPeers) == 0 && len(newMetrics.FailoverPeers) > 0
 	for k, v := range oldMetrics.FailoverPeers {
 		if len(newMetrics.FailoverPeers[k]) > 0 && len(v) == 0 {
 			shouldUpdate = true

+ 16 - 14
mq/publishers.go

@@ -33,23 +33,25 @@ func PublishPeerUpdate(newNode *models.Node, publishToSelf bool) error {
 			//skip self
 			continue
 		}
-		peerUpdate, err := logic.GetPeerUpdate(&node)
+		err = PublishSinglePeerUpdate(&node)
 		if err != nil {
-			logger.Log(1, "error getting peer update for node", node.ID, err.Error())
-			continue
-		}
-		data, err := json.Marshal(&peerUpdate)
-		if err != nil {
-			logger.Log(2, "error marshaling peer update for node", node.ID, err.Error())
-			continue
-		}
-		if err = publish(&node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data); err != nil {
-			logger.Log(1, "failed to publish peer update for node", node.ID)
-		} else {
-			logger.Log(1, "sent peer update for node", node.Name, "on network:", node.Network)
+			logger.Log(1, "failed to publish peer update to node", node.Name, "on network", node.Network, ":", err.Error())
 		}
 	}
-	return nil
+	return err
+}
+
+// PublishSinglePeerUpdate --- determines and publishes a peer update to one node
+func PublishSinglePeerUpdate(node *models.Node) error {
+	peerUpdate, err := logic.GetPeerUpdate(node)
+	if err != nil {
+		return err
+	}
+	data, err := json.Marshal(&peerUpdate)
+	if err != nil {
+		return err
+	}
+	return publish(node, fmt.Sprintf("peers/%s/%s", node.Network, node.ID), data)
 }
 
 // PublishPeerUpdate --- publishes a peer update to all the peers of a node