Преглед изворни кода

updated host update and node update to share same topic

0xdcarns пре 2 година
родитељ
комит
78281edf85
4 измењених фајлова са 82 додато и 84 уклоњено
  1. 0 18
      mq/dynsec_helper.go
  2. 79 59
      mq/handlers.go
  3. 2 6
      mq/mq.go
  4. 1 1
      mq/publishers.go

+ 0 - 18
mq/dynsec_helper.go

@@ -178,12 +178,6 @@ func fetchHostAcls(hostID string) []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
-		{
-			AclType:  "publishClientReceive",
-			Topic:    fmt.Sprintf("host/update/%s/#", hostID),
-			Priority: -1,
-			Allow:    true,
-		},
 	}
 }
 
@@ -359,12 +353,6 @@ func fetchServerAcls() []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
-		{
-			AclType:  "publishClientReceive",
-			Topic:    "host/update/#",
-			Priority: -1,
-			Allow:    true,
-		},
 	}
 }
 
@@ -390,12 +378,6 @@ func fetchNodeAcls() []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
-		{
-			AclType:  "publishClientSend",
-			Topic:    "host/update/#",
-			Priority: -1,
-			Allow:    true,
-		},
 		{
 			AclType:  "publishClientSend",
 			Topic:    "metrics/#",

+ 79 - 59
mq/handlers.go

@@ -72,74 +72,29 @@ func Ping(client mqtt.Client, msg mqtt.Message) {
 	}()
 }
 
-// UpdateNode  message Handler -- handles updates from client nodes
-func UpdateNode(client mqtt.Client, msg mqtt.Message) {
+// Update  message Handler -- handles updates from client nodes
+func Update(client mqtt.Client, msg mqtt.Message) {
 	go func() {
 		id, err := getID(msg.Topic())
 		if err != nil {
 			logger.Log(1, "error getting node.ID sent on ", msg.Topic(), err.Error())
 			return
 		}
-		currentNode, err := logic.GetNodeByID(id)
+		var currentNode *models.Node
+		var currentHost *models.Host
+		currentHost, err = logic.GetHost(id)
 		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
-			return
-		}
-		decrypted, decryptErr := decryptMsg(&currentNode, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-			return
-		}
-		var newNode models.Node
-		if err := json.Unmarshal(decrypted, &newNode); err != nil {
-			logger.Log(1, "error unmarshaling payload ", err.Error())
-			return
-		}
-
-		ifaceDelta := logic.IfaceDelta(&currentNode, &newNode)
-		if servercfg.Is_EE && ifaceDelta {
-			if err = logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
-				logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
-			}
-		}
-		newNode.SetLastCheckIn()
-		if err := logic.UpdateNode(&currentNode, &newNode); err != nil {
-			logger.Log(1, "error saving node", err.Error())
-			return
-		}
-		if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
-			if err = PublishPeerUpdate(); err != nil {
-				logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
+			node, err := logic.GetNodeByID(id)
+			if err != nil {
+				logger.Log(1, "error getting completing update  ", msg.Topic(), err.Error())
+				return
 			}
+			currentNode = &node
 		}
-
-		logger.Log(1, "updated node", id, newNode.ID.String())
-
-	}()
-}
-
-// UpdateHost  message Handler -- handles updates from client hosts
-func UpdateHost(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
-		if err != nil {
-			logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
-			return
-		}
-		currentHost, err := logic.GetHost(id)
-		if err != nil {
-			logger.Log(1, "error getting host ", id, err.Error())
-			return
-		}
-		decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
-		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
-			return
-		}
-
-		if decrypted[0] == byte(ncutils.ACK) {
-			logger.Log(1, "sending peer updates after completed host update", currentHost.ID.String(), currentHost.Name)
-			sendPeers(true)
+		if currentHost == nil && currentNode != nil {
+			handleNodeUpdate(currentNode, msg.Payload())
+		} else {
+			handleHostUpdate(currentHost, msg.Payload())
 		}
 	}()
 }
@@ -328,3 +283,68 @@ func updateNodeMetrics(currentNode *models.Node, newMetrics *models.Metrics) boo
 	}
 	return shouldUpdate
 }
+
+func handleHostUpdate(h *models.Host, payload []byte) {
+	go func() {
+		decrypted, decryptErr := decryptMsgWithHost(h, payload)
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for host", h.Name, decryptErr.Error())
+			return
+		}
+		if len(decrypted) == 1 {
+			if decrypted[0] == byte(ncutils.ACK) {
+				logger.Log(1, "sending peer updates after completed host update", h.ID.String(), h.Name)
+			}
+		} else {
+			var newHost models.Host
+			if err := json.Unmarshal(payload, &newHost); err != nil {
+				logger.Log(0, "failed to unmarshal host update for host", h.Name, h.ID.String(), err.Error())
+				return
+			}
+			logic.UpdateHost(&newHost, h)
+			if err := logic.UpsertHost(&newHost); err != nil {
+				logger.Log(0, "failed to update host via signal", h.Name, h.ID.String(), err.Error())
+				return
+			}
+			logger.Log(1, "updated host", h.Name, h.ID.String())
+		}
+		sendPeers(true)
+	}()
+}
+
+func handleNodeUpdate(currentNode *models.Node, payload []byte) {
+	go func() {
+		decrypted, decryptErr := decryptMsg(currentNode, payload)
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node", currentNode.ID.String(), decryptErr.Error())
+			return
+		}
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node", currentNode.ID.String(), decryptErr.Error())
+			return
+		}
+		var newNode models.Node
+		if err := json.Unmarshal(decrypted, &newNode); err != nil {
+			logger.Log(1, "error unmarshaling payload ", err.Error())
+			return
+		}
+
+		ifaceDelta := logic.IfaceDelta(currentNode, &newNode)
+		if servercfg.Is_EE && ifaceDelta {
+			if err := logic.EnterpriseResetAllPeersFailovers(currentNode.ID, currentNode.Network); err != nil {
+				logger.Log(1, "failed to reset failover list during node update", currentNode.ID.String(), currentNode.Network)
+			}
+		}
+		newNode.SetLastCheckIn()
+		if err := logic.UpdateNode(currentNode, &newNode); err != nil {
+			logger.Log(1, "error saving node", err.Error())
+			return
+		}
+		if ifaceDelta { // reduce number of unneeded updates, by only sending on iface changes
+			if err := PublishPeerUpdate(); err != nil {
+				logger.Log(0, "error updating peers when node", currentNode.ID.String(), "informed the server of an interface change", err.Error())
+			}
+		}
+		logger.Log(1, "updated node", newNode.ID.String())
+	}()
+}

+ 2 - 6
mq/mq.go

@@ -79,13 +79,9 @@ func SetupMQTT() {
 			client.Disconnect(240)
 			logger.Log(0, "ping subscription failed")
 		}
-		if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(UpdateNode)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		if token := client.Subscribe("update/#", 0, mqtt.MessageHandler(Update)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)
-			logger.Log(0, "node update subscription failed")
-		}
-		if token := client.Subscribe("host/update/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
-			client.Disconnect(240)
-			logger.Log(0, "host update subscription failed")
+			logger.Log(0, "update subscription failed")
 		}
 		if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)

+ 1 - 1
mq/publishers.go

@@ -104,7 +104,7 @@ func HostUpdate(host *models.Host) error {
 		logger.Log(2, "error marshalling node update ", err.Error())
 		return err
 	}
-	if err = publish(host, fmt.Sprintf("host/update/%s/%s", host.ID.String(), servercfg.GetServer()), data); err != nil {
+	if err = publish(host, fmt.Sprintf("update/%s/%s", host.ID.String(), servercfg.GetServer()), data); err != nil {
 		logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
 		return err
 	}