Browse Source

mq handlers for host updates

Abhishek Kondur 2 years ago
parent
commit
acceb6e370
3 changed files with 27 additions and 21 deletions
  1. 11 15
      mq/handlers.go
  2. 6 6
      mq/publishers.go
  3. 10 0
      mq/util.go

+ 11 - 15
mq/handlers.go

@@ -118,43 +118,39 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	}()
 }
 
-// UpdateHost  message Handler -- handles updates from client hosts
+// UpdateHost  message Handler -- handles host updates from clients
 func UpdateHost(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
+	go func(msg mqtt.Message) {
+		id, err := getHostID(msg.Topic())
 		if err != nil {
 			logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
 			return
 		}
 		currentHost, err := logic.GetHost(id)
 		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
+			logger.Log(1, "error getting host ", id, err.Error())
 			return
 		}
 		decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
 		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
 			return
 		}
-		var newHost models.Host
-		if err := json.Unmarshal(decrypted, &newHost); err != nil {
+		var hostUpdate models.HostUpdate
+		if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
 			logger.Log(1, "error unmarshaling payload ", err.Error())
 			return
 		}
-		// ifaceDelta := logic.IfaceDelta(&currentHost, newNode)
+		logger.Log(0, fmt.Sprintf("recieved host update: %+v\n", hostUpdate))
+		// TODO: update logic for host for the recieved update from client
+
 		// if servercfg.Is_EE && ifaceDelta {
 		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
 		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
 		// 	}
 		// }
-		logic.UpdateHost(&newHost, currentHost)
-		if err := logic.UpsertHost(&newHost); err != nil {
-			logger.Log(1, "error saving host", err.Error())
-			return
-		}
 
-		logger.Log(1, "updated host", newHost.ID.String())
-	}()
+	}(msg)
 }
 
 // UpdateMetrics  message Handler -- handles updates from client nodes for metrics

+ 6 - 6
mq/publishers.go

@@ -92,20 +92,20 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 }
 
-// HostUpdate -- publishes a host topic update
-func HostUpdate(host *models.Host) error {
+// HostUpdate -- publishes a host update to clients
+func HostUpdate(hostUpdate *models.HostUpdate) error {
 	if !servercfg.IsMessageQueueBackend() {
 		return nil
 	}
-	logger.Log(3, "publishing host update to "+host.ID.String())
+	logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
 
-	data, err := json.Marshal(host)
+	data, err := json.Marshal(hostUpdate)
 	if err != nil {
 		logger.Log(2, "error marshalling node update ", err.Error())
 		return err
 	}
-	if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil {
-		logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
+	if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
+		logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
 		return err
 	}
 

+ 10 - 0
mq/util.go

@@ -94,3 +94,13 @@ func getID(topic string) (string, error) {
 	//the last part of the topic will be the node.ID
 	return parts[count-1], nil
 }
+
+// decodes a message queue topic and returns the embedded host.ID
+func getHostID(topic string) (string, error) {
+	parts := strings.Split(topic, "/")
+	count := len(parts)
+	if count < 4 {
+		return "", fmt.Errorf("invalid topic")
+	}
+	return parts[2], nil
+}