浏览代码

join/delete host from networks,host updates to mq

Abhishek Kondur 2 年之前
父节点
当前提交
63e1bc230c
共有 5 个文件被更改,包括 80 次插入11 次删除
  1. 30 1
      controllers/hosts.go
  2. 27 5
      logic/hosts.go
  3. 12 0
      models/host.go
  4. 6 0
      mq/dynsec_helper.go
  5. 5 5
      mq/publishers.go

+ 30 - 1
controllers/hosts.go

@@ -189,7 +189,8 @@ func updateHostNetworks(w http.ResponseWriter, r *http.Request) {
 		return
 	}
 
-	if err = logic.UpdateHostNetworks(currHost, servercfg.GetServer(), payload.Networks[:]); err != nil {
+	newNets, delNets, err := logic.UpdateHostNetworks(currHost, servercfg.GetServer(), payload.Networks[:])
+	if err != nil {
 		logger.Log(0, r.Header.Get("user"), "failed to update host networks:", err.Error())
 		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "internal"))
 		return
@@ -202,6 +203,34 @@ func updateHostNetworks(w http.ResponseWriter, r *http.Request) {
 	}); err != nil {
 		logger.Log(0, r.Header.Get("user"), "failed to update host networks roles in DynSec:", err.Error())
 	}
+	go func() {
+		for _, newNet := range newNets {
+			node, err := logic.GetNodeByNetwork(currHost.ID.String(), newNet)
+			if err != nil {
+				logger.Log(0, "failed to get node for network: ", newNet, hostid)
+				continue
+			}
+			err = mq.HostUpdate(&models.HostUpdate{
+				Action:  models.JoinHostToNetwork,
+				Host:    *currHost,
+				Network: node.Network,
+				Node:    node,
+			})
+			if err != nil {
+				logger.Log(0, "failed to send mq msg to add host to network: ", node.Network, err.Error())
+			}
+		}
+		for _, delNet := range delNets {
+			err = mq.HostUpdate(&models.HostUpdate{
+				Action:  models.DeleteHostFromNetwork,
+				Host:    *currHost,
+				Network: delNet,
+			})
+			if err != nil {
+				logger.Log(0, "failed to send mq msg to delete host from network: ", delNet, err.Error())
+			}
+		}
+	}()
 
 	logger.Log(2, r.Header.Get("user"), "updated host networks", currHost.Name)
 	w.WriteHeader(http.StatusOK)

+ 27 - 5
logic/hosts.go

@@ -178,12 +178,14 @@ func RemoveHost(h *models.Host) error {
 }
 
 // UpdateHostNetworks - updates a given host's networks
-func UpdateHostNetworks(h *models.Host, server string, nets []string) error {
+func UpdateHostNetworks(h *models.Host, server string, nets []string) ([]string, []string, error) {
+	newNets := []string{}
+	delNets := []string{}
 	if len(h.Nodes) > 0 {
 		for i := range h.Nodes {
 			n, err := GetNodeByID(h.Nodes[i])
 			if err != nil {
-				return err
+				return newNets, delNets, err
 			}
 			// loop through networks and remove need for updating existing networks
 			found := false
@@ -195,8 +197,9 @@ func UpdateHostNetworks(h *models.Host, server string, nets []string) error {
 			}
 			if !found { // remove the node/host from that network
 				if err = DissasociateNodeFromHost(&n, h); err != nil {
-					return err
+					return newNets, delNets, err
 				}
+				delNets = append(delNets, n.Network)
 			}
 		}
 	} else {
@@ -210,13 +213,14 @@ func UpdateHostNetworks(h *models.Host, server string, nets []string) error {
 			newNode.Server = server
 			newNode.Network = nets[i]
 			if err := AssociateNodeToHost(&newNode, h); err != nil {
-				return err
+				return newNets, delNets, err
 			}
+			newNets = append(newNets, newNode.Network)
 			logger.Log(1, "added new node", newNode.ID.String(), "to host", h.Name)
 		}
 	}
 
-	return nil
+	return newNets, delNets, nil
 }
 
 // AssociateNodeToHost - associates and creates a node with a given host
@@ -339,3 +343,21 @@ func GetRelatedHosts(hostID string) []models.Host {
 	}
 	return relatedHosts
 }
+
+func GetNodeByNetwork(hostID, network string) (models.Node, error) {
+
+	currHost, err := GetHost(hostID)
+	if err != nil {
+		return models.Node{}, err
+	}
+	for i := range currHost.Nodes {
+		n, err := GetNodeByID(currHost.Nodes[i])
+		if err != nil {
+			continue
+		}
+		if n.Network == network {
+			return n, nil
+		}
+	}
+	return models.Node{}, errors.New("node not found")
+}

+ 12 - 0
models/host.go

@@ -63,3 +63,15 @@ func ParseBool(s string) bool {
 	}
 	return b
 }
+
+const (
+	JoinHostToNetwork     = "JOIN_HOST_TO_NETWORK"
+	DeleteHostFromNetwork = "DELETE_HOST_FROM_NETWORK"
+)
+
+type HostUpdate struct {
+	Action  string
+	Host    Host
+	Network string
+	Node    Node
+}

+ 6 - 0
mq/dynsec_helper.go

@@ -178,6 +178,12 @@ func fetchHostAcls(hostID string) []Acl {
 			Priority: -1,
 			Allow:    true,
 		},
+		{
+			AclType:  "publishClientReceive",
+			Topic:    fmt.Sprintf("host/update/%s/#", hostID),
+			Priority: -1,
+			Allow:    true,
+		},
 		{
 			AclType:  "publishClientSend",
 			Topic:    fmt.Sprintf("host/update/%s", hostID),

+ 5 - 5
mq/publishers.go

@@ -93,19 +93,19 @@ func NodeUpdate(node *models.Node) error {
 }
 
 // HostUpdate -- publishes a host topic update
-func HostUpdate(host *models.Host) error {
+func HostUpdate(hostUpdate *models.HostUpdate) error {
 	if !servercfg.IsMessageQueueBackend() {
 		return nil
 	}
-	logger.Log(3, "publishing host update to "+host.ID.String())
+	logger.Log(3, "publishing host update to "+hostUpdate.Host.ID.String())
 
-	data, err := json.Marshal(host)
+	data, err := json.Marshal(hostUpdate)
 	if err != nil {
 		logger.Log(2, "error marshalling node update ", err.Error())
 		return err
 	}
-	if err = publish(host, fmt.Sprintf("host/update/%s", host.ID.String()), data); err != nil {
-		logger.Log(2, "error publishing host update to", host.ID.String(), err.Error())
+	if err = publish(&hostUpdate.Host, fmt.Sprintf("host/update/%s/%s", hostUpdate.Host.ID.String(), servercfg.GetServer()), data); err != nil {
+		logger.Log(2, "error publishing host update to", hostUpdate.Host.ID.String(), err.Error())
 		return err
 	}