Browse Source

Merge pull request #2085 from gravitl/GRA-1275-mq-cleanup

GRA 1275 small mq cleanup
dcarns 2 years ago
parent
commit
db8a25607c
1 changed files with 201 additions and 186 deletions
  1. 201 186
      mq/handlers.go

+ 201 - 186
mq/handlers.go

@@ -23,12 +23,14 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 
 // Ping message Handler -- handles ping topic from client nodes
 func Ping(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
-		if err != nil {
-			logger.Log(0, "error getting node.ID sent on ping topic ")
-			return
-		}
+	id, err := getID(msg.Topic())
+	if err != nil {
+		logger.Log(0, "error getting node.ID sent on ping topic ")
+		return
+	}
+	node, err := logic.GetNodeByID(id)
+	if err != nil {
+		logger.Log(3, "mq-ping error getting node: ", err.Error())
 		node, err := logic.GetNodeByID(id)
 		if err != nil {
 			logger.Log(3, "mq-ping error getting node: ", err.Error())
@@ -44,7 +46,6 @@ func Ping(client mqtt.Client, msg mqtt.Message) {
 					}
 				}
 			}
-
 			return
 		}
 		decrypted, decryptErr := decryptMsg(&node, msg.Payload())
@@ -74,233 +75,247 @@ func Ping(client mqtt.Client, msg mqtt.Message) {
 			return
 		}
 
-		logger.Log(3, "ping processed for node", node.ID.String())
-		// --TODO --set client version once feature is implemented.
-		//node.SetClientVersion(msg.Payload())
-	}()
+		return
+	}
+	decrypted, decryptErr := decryptMsg(&node, msg.Payload())
+	if decryptErr != nil {
+		logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
+		return
+	}
+	var checkin models.NodeCheckin
+	if err := json.Unmarshal(decrypted, &checkin); err != nil {
+		logger.Log(1, "error unmarshaling payload ", err.Error())
+		return
+	}
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
+		return
+	}
+	node.SetLastCheckIn()
+	host.Version = checkin.Version
+	node.Connected = checkin.Connected
+	host.Interfaces = checkin.Ifaces
+	for i := range host.Interfaces {
+		host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
+	}
+	if err := logic.UpdateNode(&node, &node); err != nil {
+		logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
+		return
+	}
+
+	logger.Log(3, "ping processed for node", node.ID.String())
+	// --TODO --set client version once feature is implemented.
+	//node.SetClientVersion(msg.Payload())
 }
 
 // UpdateNode  message Handler -- handles updates from client nodes
 func UpdateNode(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
-		if err != nil {
-			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
-			return
+	id, err := getID(msg.Topic())
+	if err != nil {
+		logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+		return
+	}
+	currentNode, err := logic.GetNodeByID(id)
+	if err != nil {
+		logger.Log(1, "error getting node ", id, err.Error())
+		return
+	}
+	decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
+	if decryptErr != nil {
+		logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+		return
+	}
+	var newNode models.Node
+	if err := json.Unmarshal(decrypted, &newNode); err != nil {
+		logger.Log(1, "error unmarshaling payload ", err.Error())
+		return
+	}
+
+	ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
+	if servercfg.Is_EE && ifaceDelta {
+		if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
+			logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
 		}
-		currentNode, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
-			return
+	}
+	newNode.SetLastCheckIn()
+	if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
+		logger.Log(1, "error saving node", err.Error())
+		return
+	}
+	if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
+		if err = PublishPeerUpdate(); err != nil {
+			logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
 		}
-		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-			return
+	}
+
+	logger.Log(1, "updated node", id, newNode.ID.String())
+}
+
+// UpdateHost  message Handler -- handles host updates from clients
+func UpdateHost(client mqtt.Client, msg mqtt.Message) {
+	id, err := getID(msg.Topic())
+	if err != nil {
+		logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
+		return
+	}
+	currentHost, err := logic.GetHost(id)
+	if err != nil {
+		logger.Log(1, "error getting host ", id, err.Error())
+		return
+	}
+	decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
+	if decryptErr != nil {
+		logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
+		return
+	}
+	var hostUpdate models.HostUpdate
+	if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
+		logger.Log(1, "error unmarshaling payload ", err.Error())
+		return
+	}
+	logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
+	var sendPeerUpdate bool
+	switch hostUpdate.Action {
+	case models.Acknowledgement:
+		hu := hostactions.GetAction(currentHost.ID.String())
+		if hu != nil {
+			if err = HostUpdate(hu); err != nil {
+				logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
+				return
+			} else {
+				if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil {
+					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
+					return
+				}
+			}
 		}
-		var newNode models.Node
-		if err := json.Unmarshal(decrypted, &newNode); err != nil {
-			logger.Log(1, "error unmarshaling payload ", err.Error())
+	case models.UpdateHost:
+		sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
+		err := logic.UpsertHost(currentHost)
+		if err != nil {
+			logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
 			return
 		}
-
-		ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
-		if servercfg.Is_EE && ifaceDelta {
-			if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
-				logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
+	case models.DeleteHost:
+		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+			// delete EMQX credentials for host
+			if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
+				logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
+				return
 			}
 		}
-		newNode.SetLastCheckIn()
-		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
-			logger.Log(1, "error saving node", err.Error())
+		if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
+			logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
 			return
 		}
-		if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
-			if err = PublishPeerUpdate(); err != nil {
-				logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
-			}
+		if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
+			logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
+			return
 		}
+		sendPeerUpdate = true
+	}
 
-		logger.Log(1, "updated node", id, newNode.ID.String())
-
-	}()
+	if sendPeerUpdate {
+		err := PublishPeerUpdate()
+		if err != nil {
+			logger.Log(0, "failed to pulish peer update: ", err.Error())
+		}
+	}
+	// if servercfg.Is_EE && ifaceDelta {
+	// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
+	// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
+	// 	}
+	// }
 }
 
-// UpdateHost  message Handler -- handles host updates from clients
-func UpdateHost(client mqtt.Client, msg mqtt.Message) {
-	go func(msg mqtt.Message) {
+// UpdateMetrics  message Handler -- handles updates from client nodes for metrics
+func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
+	if servercfg.Is_EE {
 		id, err := getID(msg.Topic())
 		if err != nil {
-			logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
+			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
 			return
 		}
-		currentHost, err := logic.GetHost(id)
+		currentNode, err := logic.GetNodeByID(id)
 		if err != nil {
-			logger.Log(1, "error getting host ", id, err.Error())
+			logger.Log(1, "error getting node ", id, err.Error())
 			return
 		}
-		decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
+		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
 		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
+			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
 			return
 		}
-		var hostUpdate models.HostUpdate
-		if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
+
+		var newMetrics models.Metrics
+		if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
 			logger.Log(1, "error unmarshaling payload ", err.Error())
 			return
 		}
-		logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
-		var sendPeerUpdate bool
-		switch hostUpdate.Action {
-		case models.Acknowledgement:
-			hu := hostactions.GetAction(currentHost.ID.String())
-			if hu != nil {
-				if err = HostUpdate(hu); err != nil {
-					logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
-					return
-				} else {
-					if err = PublishSingleHostPeerUpdate(currentHost, nil); err != nil {
-						logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
-						return
-					}
-				}
-			}
-		case models.UpdateHost:
-			sendPeerUpdate = logic.UpdateHostFromClient(&hostUpdate.Host, currentHost)
-			err := logic.UpsertHost(currentHost)
-			if err != nil {
-				logger.Log(0, "failed to update host: ", currentHost.ID.String(), err.Error())
-				return
-			}
-		case models.DeleteHost:
-			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-				// delete EMQX credentials for host
-				if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
-					logger.Log(0, "failed to remove host credentials from EMQX: ", currentHost.ID.String(), err.Error())
-					return
-				}
-			}
-			if err := logic.DisassociateAllNodesFromHost(currentHost.ID.String()); err != nil {
-				logger.Log(0, "failed to delete all nodes of host: ", currentHost.ID.String(), err.Error())
-				return
-			}
-			if err := logic.RemoveHostByID(currentHost.ID.String()); err != nil {
-				logger.Log(0, "failed to delete host: ", currentHost.ID.String(), err.Error())
-				return
-			}
-			sendPeerUpdate = true
-		}
 
-		if sendPeerUpdate {
-			err := PublishPeerUpdate()
-			if err != nil {
-				logger.Log(0, "failed to pulish peer update: ", err.Error())
+		shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
+
+		if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
+			logger.Log(1, "faield to update node metrics", id, err.Error())
+			return
+		}
+		if servercfg.IsMetricsExporter() {
+			if err := pushMetricsToExporter(newMetrics); err != nil {
+				logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
+					currentNode.ID, err))
 			}
 		}
-		// if servercfg.Is_EE && ifaceDelta {
-		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
-		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
-		// 	}
-		// }
-
-	}(msg)
-}
 
-// UpdateMetrics  message Handler -- handles updates from client nodes for metrics
-func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
-	if servercfg.Is_EE {
-		go func() {
-			id, err := getID(msg.Topic())
-			if err != nil {
-				logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
-				return
-			}
-			currentNode, err := logic.GetNodeByID(id)
+		if newMetrics.Connectivity != nil {
+			err := logic.EnterpriseFailoverFunc(&currentNode)
 			if err != nil {
-				logger.Log(1, "error getting node ", id, err.Error())
-				return
-			}
-			decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-			if decryptErr != nil {
-				logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-				return
-			}
-
-			var newMetrics models.Metrics
-			if err := json.Unmarshal(decrypted, &newMetrics); err != nil {
-				logger.Log(1, "error unmarshaling payload ", err.Error())
-				return
-			}
-
-			shouldUpdate := updateNodeMetrics(&currentNode, &newMetrics)
-
-			if err = logic.UpdateMetrics(id, &newMetrics); err != nil {
-				logger.Log(1, "faield to update node metrics", id, err.Error())
-				return
-			}
-			if servercfg.IsMetricsExporter() {
-				if err := pushMetricsToExporter(newMetrics); err != nil {
-					logger.Log(2, fmt.Sprintf("failed to push node: [%s] metrics to exporter, err: %v",
-						currentNode.ID, err))
-				}
-			}
-
-			if newMetrics.Connectivity != nil {
-				err := logic.EnterpriseFailoverFunc(&currentNode)
-				if err != nil {
-					logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
-				}
+				logger.Log(0, "failed to failover for node", currentNode.ID.String(), "on network", currentNode.Network, "-", err.Error())
 			}
+		}
 
-			if shouldUpdate {
-				logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
-				host, err := logic.GetHost(currentNode.HostID.String())
-				if err == nil {
-					if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
-						logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
-					}
+		if shouldUpdate {
+			logger.Log(2, "updating peers after node", currentNode.ID.String(), currentNode.Network, "detected connectivity issues")
+			host, err := logic.GetHost(currentNode.HostID.String())
+			if err == nil {
+				if err = PublishSingleHostPeerUpdate(host, nil); err != nil {
+					logger.Log(0, "failed to publish update after failover peer change for node", currentNode.ID.String(), currentNode.Network)
 				}
 			}
+		}
 
-			logger.Log(1, "updated node metrics", id)
-		}()
+		logger.Log(1, "updated node metrics", id)
 	}
 }
 
 // ClientPeerUpdate  message handler -- handles updating peers after signal from client nodes
 func ClientPeerUpdate(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
-		if err != nil {
-			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
-			return
-		}
-		currentNode, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
-			return
-		}
-		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
+	id, err := getID(msg.Topic())
+	if err != nil {
+		logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
+		return
+	}
+	currentNode, err := logic.GetNodeByID(id)
+	if err != nil {
+		logger.Log(1, "error getting node ", id, err.Error())
+		return
+	}
+	decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
+	if decryptErr != nil {
+		logger.Log(1, "failed to decrypt message during client peer update for node ", id, decryptErr.Error())
+		return
+	}
+	switch decrypted[0] {
+	case ncutils.ACK:
+		// do we still need this
+	case ncutils.DONE:
+		if err = PublishPeerUpdate(); err != nil {
+			logger.Log(1, "error publishing peer update for node", currentNode.ID.String(), err.Error())
 			return
 		}
-		switch decrypted[0] {
-		case ncutils.ACK:
-			//do we still need this
-		case ncutils.DONE:
-			updateNodePeers(&currentNode)
-		}
-
-		logger.Log(1, "sent peer updates after signal received from", id)
-	}()
-}
-
-func updateNodePeers(currentNode *models.Node) {
-	if err := PublishPeerUpdate(); err != nil {
-		logger.Log(1, "error publishing peer update ", err.Error())
-		return
 	}
+
+	logger.Log(1, "sent peer updates after signal received from", id)
 }
 
 func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) bool {