Browse Source

update mq topics to use servername

Anish Mukherjee 2 years ago
parent
commit
852abcf4e7
2 changed files with 12 additions and 6 deletions
  1. 6 6
      mq/emqx.go
  2. 6 0
      mq/handlers.go

+ 6 - 6
mq/emqx.go

@@ -292,7 +292,7 @@ func CreateHostACL(hostID, serverName string) error {
 				Action:     "all",
 			},
 			{
-				Topic:      fmt.Sprintf("host/serverupdate/%s", hostID),
+				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
 				Permission: "allow",
 				Action:     "all",
 			},
@@ -327,7 +327,7 @@ func CreateHostACL(hostID, serverName string) error {
 var nodeAclMux sync.Mutex
 
 // AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
-func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID string) error {
+func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
 	nodeAclMux.Lock()
 	defer nodeAclMux.Unlock()
 	token, err := getEmqxAuthToken()
@@ -345,22 +345,22 @@ func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID string) error {
 			Action:     "subscribe",
 		},
 		{
-			Topic:      fmt.Sprintf("ping/%s", nodeID),
+			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
 			Permission: "allow",
 			Action:     "all",
 		},
 		{
-			Topic:      fmt.Sprintf("update/%s", nodeID),
+			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
 			Permission: "allow",
 			Action:     "all",
 		},
 		{
-			Topic:      fmt.Sprintf("signal/%s", nodeID),
+			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
 			Permission: "allow",
 			Action:     "all",
 		},
 		{
-			Topic:      fmt.Sprintf("metrics/%s", nodeID),
+			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
 			Permission: "allow",
 			Action:     "all",
 		},

+ 6 - 0
mq/handlers.go

@@ -186,6 +186,12 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				logger.Log(0, "failed to send new node to host", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 				return
 			} else {
+				if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+					if err = AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
+						logger.Log(0, "failed to add ACLs for EMQX node", err.Error())
+						return
+					}
+				}
 				if err = PublishSingleHostPeerUpdate(context.Background(), currentHost, nil, nil); err != nil {
 					logger.Log(0, "failed peers publish after join acknowledged", hostUpdate.Host.Name, currentHost.ID.String(), err.Error())
 					return