Browse Source

Merge pull request #1921 from gravitl/story/GRA-895

Enabled MQ for host Updates between client + server (unused)
dcarns 2 years ago
parent
commit
0a5f57181e
5 changed files with 87 additions and 12 deletions
  1. 12 0
      mq/dynsec_helper.go
  2. 39 0
      mq/handlers.go
  3. 4 0
      mq/mq.go
  4. 20 0
      mq/publishers.go
  5. 12 12
      mq/util.go

+ 12 - 0
mq/dynsec_helper.go

@@ -178,6 +178,12 @@ func fetchHostAcls(hostID string) []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientReceive",
+			Topic:    fmt.Sprintf("host/update/%s", hostID),
+			Priority: -1,
+			Allow:    true,
+		},
 	}
 }
 
@@ -353,6 +359,12 @@ func fetchServerAcls() []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientReceive",
+			Topic:    "host/update/#",
+			Priority: -1,
+			Allow:    true,
+		},
 	}
 }
 

+ 39 - 0
mq/handlers.go

@@ -118,6 +118,45 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	}()
 }
 
+// UpdateHost  message Handler -- handles updates from client hosts
+func UpdateHost(client mqtt.Client, msg mqtt.Message) {
+	go func() {
+		id, err := getID(msg.Topic())
+		if err != nil {
+			logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
+			return
+		}
+		currentHost, err := logic.GetHost(id)
+		if err != nil {
+			logger.Log(1, "error getting node ", id, err.Error())
+			return
+		}
+		decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
+		if decryptErr != nil {
+			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			return
+		}
+		var newHost models.Host
+		if err := json.Unmarshal(decrypted, &newHost); err != nil {
+			logger.Log(1, "error unmarshaling payload ", err.Error())
+			return
+		}
+		// ifaceDelta := logic.IfaceDelta(&currentHost, newNode)
+		// if servercfg.Is_EE && ifaceDelta {
+		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
+		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
+		// 	}
+		// }
+		logic.UpdateHost(&newHost, currentHost)
+		if err := logic.UpsertHost(&newHost); err != nil {
+			logger.Log(1, "error saving host", err.Error())
+			return
+		}
+
+		logger.Log(1, "updated host", newHost.ID.String())
+	}()
+}
+
 // UpdateMetrics  message Handler -- handles updates from client nodes for metrics
 func UpdateMetrics(client mqtt.Client, msg mqtt.Message) {
 	if servercfg.Is_EE {

+ 4 - 0
mq/mq.go

@@ -83,6 +83,10 @@ func SetupMQTT() {
 			client.Disconnect(240)
 			logger.Log(0, "node update subscription failed")
 		}
+		if token := client.Subscribe("host/update/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+			client.Disconnect(240)
+			logger.Log(0, "host update subscription failed")
+		}
 		if token := client.Subscribe("signal/#", 0, mqtt.MessageHandler(ClientPeerUpdate)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)
 			logger.Log(0, "node client subscription failed")

+ 20 - 0
mq/publishers.go

@@ -113,6 +113,26 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 }
 
+// HostUpdate -- publishes a host topic update
+func HostUpdate(host *models.Host) error {
+	if !servercfg.IsMessageQueueBackend() {
+		return nil
+	}
+	logger.Log(3, "publishing host update to "+host.ID.String())
+
+	data, err := json.Marshal(host)
+	if err != nil {
+		logger.Log(2, "error marshalling node update ", err.Error())
+		return err
+	}
+	if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil {
+		logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
+		return err
+	}
+
+	return nil
+}
+
 // ProxyUpdate -- publishes updates to peers related to proxy
 func ProxyUpdate(proxyPayload *proxy_models.ProxyManagerPayload, node *models.Node) error {
 	host, err := logic.GetHost(node.HostID.String())

+ 12 - 12
mq/util.go

@@ -11,15 +11,7 @@ import (
 	"github.com/gravitl/netmaker/netclient/ncutils"
 )
 
-func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
-	if len(msg) <= 24 { // make sure message is of appropriate length
-		return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
-	}
-	host, err := logic.GetHost(node.HostID.String())
-	if err != nil {
-		return nil, err
-	}
-
+func decryptMsgWithHost(host *models.Host, msg []byte) ([]byte, error) {
 	trafficKey, trafficErr := logic.RetrievePrivateTrafficKey() // get server private key
 	if trafficErr != nil {
 		return nil, trafficErr
@@ -33,11 +25,19 @@ func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 		return nil, err
 	}
 
-	if strings.Contains(host.Version, "0.10.0") {
-		return ncutils.BoxDecrypt(msg, nodePubTKey, serverPrivTKey)
+	return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey)
+}
+
+func decryptMsg(node *models.Node, msg []byte) ([]byte, error) {
+	if len(msg) <= 24 { // make sure message is of appropriate length
+		return nil, fmt.Errorf("recieved invalid message from broker %v", msg)
+	}
+	host, err := logic.GetHost(node.HostID.String())
+	if err != nil {
+		return nil, err
 	}
 
-	return ncutils.DeChunk(msg, nodePubTKey, serverPrivTKey)
+	return decryptMsgWithHost(host, msg)
 }
 
 func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {