Browse Source

Merge pull request #2132 from gravitl/GRA-1304-host-checkin

GRA-1304: adjusted check-in to be host-based
dcarns 2 years ago
parent
commit
a9e299fcda
4 changed files with 56 additions and 92 deletions
  1. 10 0
      logic/nodes.go
  2. 2 0
      models/host.go
  3. 44 88
      mq/handlers.go
  4. 0 4
      mq/mq.go

+ 10 - 0
logic/nodes.go

@@ -52,6 +52,16 @@ func GetNetworkNodesMemory(allNodes []models.Node, network string) []models.Node
 	return nodes
 }
 
+// UpdateNodeCheckin - updates the checkin time of a node
+func UpdateNodeCheckin(node *models.Node) error {
+	node.SetLastCheckIn()
+	data, err := json.Marshal(node)
+	if err != nil {
+		return err
+	}
+	return database.Insert(node.ID.String(), string(data), database.NODES_TABLE_NAME)
+}
+
 // UpdateNode - takes a node and updates another node with it's values
 func UpdateNode(currentNode *models.Node, newNode *models.Node) error {
 	if newNode.Address.IP.String() != currentNode.Address.IP.String() {

+ 2 - 0
models/host.go

@@ -94,6 +94,8 @@ const (
 	Acknowledgement = "ACK"
 	// RequestAck - request an ACK
 	RequestAck = "REQ_ACK"
+	// CheckIn - update last check in times and public address and interfaces
+	CheckIn = "CHECK_IN"
 )
 
 // HostUpdate - struct for host update

+ 44 - 88
mq/handlers.go

@@ -23,94 +23,6 @@ func DefaultHandler(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(0, "MQTT Message: Topic: ", string(msg.Topic()), " Message: ", string(msg.Payload()))
 }
 
-// Ping message Handler -- handles ping topic from client nodes
-func Ping(client mqtt.Client, msg mqtt.Message) {
-	id, err := getID(msg.Topic())
-	if err != nil {
-		logger.Log(0, "error getting node.ID sent on ping topic ")
-		return
-	}
-	node, err := logic.GetNodeByID(id)
-	if err != nil {
-		logger.Log(3, "mq-ping error getting node: ", err.Error())
-		node, err := logic.GetNodeByID(id)
-		if err != nil {
-			logger.Log(3, "mq-ping error getting node: ", err.Error())
-			if database.IsEmptyRecord(err) {
-				h := logic.GetHostByNodeID(id) // check if a host is still associated
-				if h != nil {                  // inform host that node should be removed
-					fakeNode := models.Node{}
-					fakeNode.ID, _ = uuid.Parse(id)
-					fakeNode.Action = models.NODE_DELETE
-					fakeNode.PendingDelete = true
-					if err := NodeUpdate(&fakeNode); err != nil {
-						logger.Log(0, "failed to inform host", h.Name, h.ID.String(), "to remove node", id, err.Error())
-					}
-				}
-			}
-			return
-		}
-		decrypted, decryptErr := decryptMsg(&node, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
-			return
-		}
-		var checkin models.NodeCheckin
-		if err := json.Unmarshal(decrypted, &checkin); err != nil {
-			logger.Log(1, "error unmarshaling payload ", err.Error())
-			return
-		}
-		host, err := logic.GetHost(node.HostID.String())
-		if err != nil {
-			logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
-			return
-		}
-		node.SetLastCheckIn()
-		host.Version = checkin.Version
-		node.Connected = checkin.Connected
-		host.Interfaces = checkin.Ifaces
-		for i := range host.Interfaces {
-			host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
-		}
-		if err := logic.UpdateNode(&node, &node); err != nil {
-			logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
-			return
-		}
-
-		return
-	}
-	decrypted, decryptErr := decryptMsg(&node, msg.Payload())
-	if decryptErr != nil {
-		logger.Log(0, "error decrypting when updating node ", node.ID.String(), decryptErr.Error())
-		return
-	}
-	var checkin models.NodeCheckin
-	if err := json.Unmarshal(decrypted, &checkin); err != nil {
-		logger.Log(1, "error unmarshaling payload ", err.Error())
-		return
-	}
-	host, err := logic.GetHost(node.HostID.String())
-	if err != nil {
-		logger.Log(0, "error retrieving host for node ", node.ID.String(), err.Error())
-		return
-	}
-	node.SetLastCheckIn()
-	host.Version = checkin.Version
-	node.Connected = checkin.Connected
-	host.Interfaces = checkin.Ifaces
-	for i := range host.Interfaces {
-		host.Interfaces[i].AddressString = host.Interfaces[i].Address.String()
-	}
-	if err := logic.UpdateNode(&node, &node); err != nil {
-		logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
-		return
-	}
-
-	logger.Log(3, "ping processed for node", node.ID.String())
-	// --TODO --set client version once feature is implemented.
-	//node.SetClientVersion(msg.Payload())
-}
-
 // UpdateNode  message Handler -- handles updates from client nodes
 func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	id, err := getID(msg.Topic())
@@ -179,6 +91,8 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	logger.Log(3, fmt.Sprintf("recieved host update: %s\n", hostUpdate.Host.ID.String()))
 	var sendPeerUpdate bool
 	switch hostUpdate.Action {
+	case models.CheckIn:
+		sendPeerUpdate = handleHostCheckin(&hostUpdate.Host, currentHost)
 	case models.Acknowledgement:
 		hu := hostactions.GetAction(currentHost.ID.String())
 		if hu != nil {
@@ -447,3 +361,45 @@ func handleNewNodeDNS(host *models.Host, node *models.Node) error {
 	}
 	return nil
 }
+
+func handleHostCheckin(h, currentHost *models.Host) bool {
+	if h == nil {
+		return false
+	}
+
+	for i := range currentHost.Nodes {
+		currNodeID := currentHost.Nodes[i]
+		node, err := logic.GetNodeByID(currNodeID)
+		if err != nil {
+			if database.IsEmptyRecord(err) {
+				fakeNode := models.Node{}
+				fakeNode.ID, _ = uuid.Parse(currNodeID)
+				fakeNode.Action = models.NODE_DELETE
+				fakeNode.PendingDelete = true
+				if err := NodeUpdate(&fakeNode); err != nil {
+					logger.Log(0, "failed to inform host", currentHost.Name, currentHost.ID.String(), "to remove node", currNodeID, err.Error())
+				}
+			}
+			continue
+		}
+		if err := logic.UpdateNodeCheckin(&node); err != nil {
+			logger.Log(0, "error updating node", node.ID.String(), " on checkin", err.Error())
+		}
+	}
+
+	for i := range h.Interfaces {
+		h.Interfaces[i].AddressString = h.Interfaces[i].Address.String()
+	}
+	ifaceDelta := len(h.Interfaces) != len(currentHost.Interfaces) || !h.EndpointIP.Equal(currentHost.EndpointIP)
+	currentHost.EndpointIP = h.EndpointIP
+	currentHost.Interfaces = h.Interfaces
+	currentHost.DefaultInterface = h.DefaultInterface
+	if err := logic.UpsertHost(currentHost); err != nil {
+		logger.Log(0, "failed to update host after check-in", h.Name, h.ID.String(), err.Error())
+		return false
+	}
+
+	logger.Log(0, "ping processed for host", h.Name, h.ID.String())
+	return ifaceDelta
+
+}

+ 0 - 4
mq/mq.go

@@ -55,10 +55,6 @@ func SetupMQTT() {
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 		serverName := servercfg.GetServer()
-		if token := client.Subscribe(fmt.Sprintf("ping/%s/#", serverName), 2, mqtt.MessageHandler(Ping)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-			client.Disconnect(240)
-			logger.Log(0, "ping subscription failed")
-		}
 		if token := client.Subscribe(fmt.Sprintf("update/%s/#", serverName), 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)
 			logger.Log(0, "node update subscription failed")