Browse Source

Merge pull request #1945 from gravitl/GRA-985/host_updates

GRA-985/host updates
dcarns 2 years ago
parent
commit
a8f943ac78
7 changed files with 68 additions and 25 deletions
  1. 2 1
      controllers/hosts.go
  2. 19 0
      models/host.go
  3. 14 2
      mq/dynsec_helper.go
  4. 16 15
      mq/handlers.go
  5. 1 1
      mq/mq.go
  6. 6 6
      mq/publishers.go
  7. 10 0
      mq/util.go

+ 2 - 1
controllers/hosts.go

@@ -108,7 +108,7 @@ func updateHost(w http.ResponseWriter, r *http.Request) {
 			logger.Log(0, r.Header.Get("user"), "failed to update host networks roles in DynSec:", err.Error())
 		}
 	}
-
+	// TODO: publish host update through MQ
 	go func() {
 		if err := mq.PublishPeerUpdate(); err != nil {
 			logger.Log(0, "fail to publish peer update: ", err.Error())
@@ -148,6 +148,7 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
 	}
+	// TODO: publish host update with delete action using MQ
 
 	if err = mq.DeleteMqClient(currHost.ID.String()); err != nil {
 		logger.Log(0, "error removing DynSec credentials for host:", currHost.Name, err.Error())

+ 19 - 0
models/host.go

@@ -64,3 +64,22 @@ func ParseBool(s string) bool {
 	}
 	return b
 }
+
+// HostMqAction - type for host update action
+type HostMqAction string
+
+const (
+	// UpdateHost - constant for host update action
+	UpdateHost = "UPDATE_HOST"
+	// DeleteHost - constant for host delete action
+	DeleteHost = "DELETE_HOST"
+	// JoinHostToNetwork - constant for host network join action
+	JoinHostToNetwork = "JOIN_HOST_TO_NETWORK"
+)
+
+// HostUpdate - struct for host update
+type HostUpdate struct {
+	Action HostMqAction
+	Host   Host
+	Node   Node
+}

+ 14 - 2
mq/dynsec_helper.go

@@ -180,7 +180,13 @@ func fetchHostAcls(hostID string) []Acl {
 		},
 		{
 			AclType:  "publishClientReceive",
-			Topic:    fmt.Sprintf("host/update/%s", hostID),
+			Topic:    fmt.Sprintf("host/update/%s/#", hostID),
+			Priority: -1,
+			Allow:    true,
+		},
+		{
+			AclType:  "publishClientSend",
+			Topic:    fmt.Sprintf("host/serverupdate/%s", hostID),
 			Priority: -1,
 			Allow:    true,
 		},
@@ -323,6 +329,12 @@ func fetchServerAcls() []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientSend",
+			Topic:    "host/update/#",
+			Priority: -1,
+			Allow:    true,
+		},
 		{
 			AclType:  "publishClientReceive",
 			Topic:    "ping/#",
@@ -361,7 +373,7 @@ func fetchServerAcls() []Acl {
 		},
 		{
 			AclType:  "publishClientReceive",
-			Topic:    "host/update/#",
+			Topic:    "host/serverupdate/#",
 			Priority: -1,
 			Allow:    true,
 		},

+ 16 - 15
mq/handlers.go

@@ -118,43 +118,44 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 	}()
 }
 
-// UpdateHost  message Handler -- handles updates from client hosts
+// UpdateHost  message Handler -- handles host updates from clients
 func UpdateHost(client mqtt.Client, msg mqtt.Message) {
-	go func() {
-		id, err := getID(msg.Topic())
+	go func(msg mqtt.Message) {
+		id, err := getHostID(msg.Topic())
 		if err != nil {
 			logger.Log(1, "error getting host.ID sent on ", msg.Topic(), err.Error())
 			return
 		}
 		currentHost, err := logic.GetHost(id)
 		if err != nil {
-			logger.Log(1, "error getting node ", id, err.Error())
+			logger.Log(1, "error getting host ", id, err.Error())
 			return
 		}
 		decrypted, decryptErr := decryptMsgWithHost(currentHost, msg.Payload())
 		if decryptErr != nil {
-			logger.Log(1, "failed to decrypt message for node ", id, decryptErr.Error())
+			logger.Log(1, "failed to decrypt message for host ", id, decryptErr.Error())
 			return
 		}
-		var newHost models.Host
-		if err := json.Unmarshal(decrypted, &newHost); err != nil {
+		var hostUpdate models.HostUpdate
+		if err := json.Unmarshal(decrypted, &hostUpdate); err != nil {
 			logger.Log(1, "error unmarshaling payload ", err.Error())
 			return
 		}
-		// ifaceDelta := logic.IfaceDelta(&currentHost, newNode)
+		logger.Log(0, "recieved host update for host: ", id)
+		switch hostUpdate.Action {
+		case models.UpdateHost:
+			// TODO: logic to update host recieved from client
+		case models.DeleteHost:
+			// TODO: logic to delete host on the server
+
+		}
 		// if servercfg.Is_EE && ifaceDelta {
 		// 	if err = logic.EnterpriseResetAllPeersFailovers(currentHost.ID.String(), currentHost.Network); err != nil {
 		// 		logger.Log(1, "failed to reset failover list during node update", currentHost.ID.String(), currentHost.Network)
 		// 	}
 		// }
-		logic.UpdateHost(&newHost, currentHost)
-		if err := logic.UpsertHost(&newHost); err != nil {
-			logger.Log(1, "error saving host", err.Error())
-			return
-		}
 
-		logger.Log(1, "updated host", newHost.ID.String())
-	}()
+	}(msg)
 }
 
 // UpdateMetrics  message Handler -- handles updates from client nodes for metrics

+ 1 - 1
mq/mq.go

@@ -83,7 +83,7 @@ func SetupMQTT() {
 			client.Disconnect(240)
 			logger.Log(0, "node update subscription failed")
 		}
-		if token := client.Subscribe("host/update/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
+		if token := client.Subscribe("host/serverupdate/#", 0, mqtt.MessageHandler(UpdateHost)); token.WaitTimeout(MQ_TIMEOUT*time.Second) && token.Error() != nil {
 			client.Disconnect(240)
 			logger.Log(0, "host update subscription failed")
 		}

+ 6 - 6
mq/publishers.go

@@ -92,20 +92,20 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 }
 
-// HostUpdate -- publishes a host topic update
-func HostUpdate(host *models.Host) error {
+// HostUpdate -- publishes a host update to clients
+func HostUpdate(hostUpdate *models.HostUpdate) error {
 	if !servercfg.IsMessageQueueBackend() {
 		return nil
 	}
-	logger.Log(3, "publishing host update to "+host.ID.String())
+	logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
 
-	data, err := json.Marshal(host)
+	data, err := json.Marshal(hostUpdate)
 	if err != nil {
 		logger.Log(2, "error marshalling node update ", err.Error())
 		return err
 	}
-	if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil {
-		logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
+	if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
+		logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
 		return err
 	}
 

+ 10 - 0
mq/util.go

@@ -94,3 +94,13 @@ func getID(topic string) (string, error) {
 	//the last part of the topic will be the node.ID
 	return parts[count-1], nil
 }
+
+// decodes a message queue topic and returns the embedded host.ID
+func getHostID(topic string) (string, error) {
+	parts := strings.Split(topic, "/")
+	count := len(parts)
+	if count < 4 {
+		return "", fmt.Errorf("invalid topic")
+	}
+	return parts[2], nil
+}