Browse Source

Merge pull request #2805 from gravitl/NET-908

NET-908: EMQX cloud Apis support
Abhishek K 1 year ago
parent
commit
e685e3ca45
11 changed files with 785 additions and 423 deletions
  1. 2 8
      auth/host_session.go
  2. 2 8
      controllers/enrollmentkeys.go
  3. 4 7
      controllers/hosts.go
  4. 0 3
      controllers/migrate.go
  5. 0 4
      controllers/node.go
  6. 29 373
      mq/emqx.go
  7. 264 0
      mq/emqx_cloud.go
  8. 428 0
      mq/emqx_on_prem.go
  9. 2 2
      mq/handlers.go
  10. 23 16
      mq/mq.go
  11. 31 2
      servercfg/serverconf.go

+ 2 - 8
auth/host_session.go

@@ -147,14 +147,13 @@ func SessionHandler(conn *websocket.Conn) {
 	select {
 	select {
 	case result := <-answer: // a read from req.answerCh has occurred
 	case result := <-answer: // a read from req.answerCh has occurred
 		// add the host, if not exists, handle like enrollment registration
 		// add the host, if not exists, handle like enrollment registration
-		hostPass := result.Host.HostPass
 		if !logic.HostExists(&result.Host) { // check if host already exists, add if not
 		if !logic.HostExists(&result.Host) { // check if host already exists, add if not
 			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-				if err := mq.CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass, false); err != nil {
+				if err := mq.GetEmqxHandler().CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass); err != nil {
 					logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 					logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 					return
 					return
 				}
 				}
-				if err := mq.CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
+				if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
 					logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
 					logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
 					return
 					return
 				}
 				}
@@ -203,11 +202,6 @@ func SessionHandler(conn *websocket.Conn) {
 		}
 		}
 		server := servercfg.GetServerInfo()
 		server := servercfg.GetServerInfo()
 		server.TrafficKey = key
 		server.TrafficKey = key
-		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-			// set MQ username and password for EMQX clients
-			server.MQUserName = result.Host.ID.String()
-			server.MQPassword = hostPass
-		}
 		result.Host.HostPass = ""
 		result.Host.HostPass = ""
 		response := models.RegisterResponse{
 		response := models.RegisterResponse{
 			ServerConf:    server,
 			ServerConf:    server,

+ 2 - 8
controllers/enrollmentkeys.go

@@ -305,18 +305,17 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 		)
 		)
 		return
 		return
 	}
 	}
-	hostPass := newHost.HostPass
 	if !hostExists {
 	if !hostExists {
 		newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
 		newHost.PersistentKeepalive = models.DefaultPersistentKeepAlive
 		// register host
 		// register host
 		logic.CheckHostPorts(&newHost)
 		logic.CheckHostPorts(&newHost)
 		// create EMQX credentials and ACLs for host
 		// create EMQX credentials and ACLs for host
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-			if err := mq.CreateEmqxUser(newHost.ID.String(), newHost.HostPass, false); err != nil {
+			if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {
 				logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 				logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 				return
 				return
 			}
 			}
-			if err := mq.CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
+			if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
 				logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
 				logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
 				return
 				return
 			}
 			}
@@ -361,11 +360,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 	// ready the response
 	// ready the response
 	server := servercfg.GetServerInfo()
 	server := servercfg.GetServerInfo()
 	server.TrafficKey = key
 	server.TrafficKey = key
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		// set MQ username and password for EMQX clients
-		server.MQUserName = newHost.ID.String()
-		server.MQPassword = hostPass
-	}
 	response := models.RegisterResponse{
 	response := models.RegisterResponse{
 		ServerConf:    server,
 		ServerConf:    server,
 		RequestedHost: newHost,
 		RequestedHost: newHost,

+ 4 - 7
controllers/hosts.go

@@ -124,9 +124,6 @@ func pull(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	serverConf := servercfg.GetServerInfo()
 	serverConf := servercfg.GetServerInfo()
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		serverConf.MQUserName = hostID
-	}
 	key, keyErr := logic.RetrievePublicTrafficKey()
 	key, keyErr := logic.RetrievePublicTrafficKey()
 	if keyErr != nil {
 	if keyErr != nil {
 		logger.Log(0, "error retrieving key:", keyErr.Error())
 		logger.Log(0, "error retrieving key:", keyErr.Error())
@@ -298,7 +295,7 @@ func deleteHost(w http.ResponseWriter, r *http.Request) {
 	}
 	}
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		// delete EMQX credentials for host
 		// delete EMQX credentials for host
-		if err := mq.DeleteEmqxUser(currHost.ID.String()); err != nil {
+		if err := mq.GetEmqxHandler().DeleteEmqxUser(currHost.ID.String()); err != nil {
 			slog.Error("failed to remove host credentials from EMQX", "id", currHost.ID, "error", err)
 			slog.Error("failed to remove host credentials from EMQX", "id", currHost.ID, "error", err)
 		}
 		}
 	}
 	}
@@ -555,15 +552,15 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 
 
 	// Create EMQX creds and ACLs if not found
 	// Create EMQX creds and ACLs if not found
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		if err := mq.CreateEmqxUser(host.ID.String(), authRequest.Password, false); err != nil {
+		if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
 			slog.Error("failed to create host credentials for EMQX: ", err.Error())
 			slog.Error("failed to create host credentials for EMQX: ", err.Error())
 		} else {
 		} else {
-			if err := mq.CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
+			if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
 				slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
 				slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
 			}
 			}
 			for _, nodeID := range host.Nodes {
 			for _, nodeID := range host.Nodes {
 				if node, err := logic.GetNodeByID(nodeID); err == nil {
 				if node, err := logic.GetNodeByID(nodeID); err == nil {
-					if err = mq.AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
+					if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
 						slog.Error("failed to add ACLs for EMQX node", "error", err)
 						slog.Error("failed to add ACLs for EMQX node", "error", err)
 					}
 					}
 				} else {
 				} else {

+ 0 - 3
controllers/migrate.go

@@ -71,9 +71,6 @@ func migrate(w http.ResponseWriter, r *http.Request) {
 				return
 				return
 			}
 			}
 			server = servercfg.GetServerInfo()
 			server = servercfg.GetServerInfo()
-			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-				server.MQUserName = host.ID.String()
-			}
 			key, keyErr := logic.RetrievePublicTrafficKey()
 			key, keyErr := logic.RetrievePublicTrafficKey()
 			if keyErr != nil {
 			if keyErr != nil {
 				slog.Error("retrieving traffickey", "error", err)
 				slog.Error("retrieving traffickey", "error", err)

+ 0 - 4
controllers/node.go

@@ -372,10 +372,6 @@ func getNode(w http.ResponseWriter, r *http.Request) {
 		return
 		return
 	}
 	}
 	server := servercfg.GetServerInfo()
 	server := servercfg.GetServerInfo()
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		// set MQ username for EMQX clients
-		server.MQUserName = host.ID.String()
-	}
 	response := models.NodeGet{
 	response := models.NodeGet{
 		Node:         node,
 		Node:         node,
 		Host:         *host,
 		Host:         *host,

+ 29 - 373
mq/emqx.go

@@ -1,386 +1,42 @@
 package mq
 package mq
 
 
-import (
-	"bytes"
-	"encoding/json"
-	"fmt"
-	"io"
-	"net/http"
-	"strings"
-	"sync"
+import "github.com/gravitl/netmaker/servercfg"
 
 
-	"github.com/gravitl/netmaker/servercfg"
-)
+var emqx Emqx
 
 
-const already_exists = "ALREADY_EXISTS"
-
-type (
-	emqxUser struct {
-		UserID   string `json:"user_id"`
-		Password string `json:"password"`
-		Admin    bool   `json:"is_superuser"`
-	}
-
-	emqxLogin struct {
-		Username string `json:"username"`
-		Password string `json:"password"`
-	}
-
-	emqxLoginResponse struct {
-		License struct {
-			Edition string `json:"edition"`
-		} `json:"license"`
-		Token   string `json:"token"`
-		Version string `json:"version"`
-	}
-
-	aclRule struct {
-		Topic      string `json:"topic"`
-		Permission string `json:"permission"`
-		Action     string `json:"action"`
-	}
-
-	aclObject struct {
-		Rules    []aclRule `json:"rules"`
-		Username string    `json:"username,omitempty"`
-	}
-)
-
-func getEmqxAuthToken() (string, error) {
-	payload, err := json.Marshal(&emqxLogin{
-		Username: servercfg.GetMqUserName(),
-		Password: servercfg.GetMqPassword(),
-	})
-	if err != nil {
-		return "", err
-	}
-	resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
-	if err != nil {
-		return "", err
-	}
-	msg, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return "", err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return "", fmt.Errorf("error during EMQX login %v", string(msg))
-	}
-	var loginResp emqxLoginResponse
-	if err := json.Unmarshal(msg, &loginResp); err != nil {
-		return "", err
-	}
-	return loginResp.Token, nil
-}
-
-// CreateEmqxUser - creates an EMQX user
-func CreateEmqxUser(username, password string, admin bool) error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&emqxUser{
-		UserID:   username,
-		Password: password,
-		Admin:    admin,
-	})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode >= 300 {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		if !strings.Contains(string(msg), already_exists) {
-			return fmt.Errorf("error creating EMQX user %v", string(msg))
-		}
-	}
-	return nil
-}
-
-// DeleteEmqxUser - deletes an EMQX user
-func DeleteEmqxUser(username string) error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
-	if err != nil {
-		return err
-	}
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode >= 300 {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error deleting EMQX user %v", string(msg))
-	}
-	return nil
-}
-
-// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
-func CreateEmqxDefaultAuthenticator() error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&struct {
-		Mechanism  string `json:"mechanism"`
-		Backend    string `json:"backend"`
-		UserIDType string `json:"user_id_type"`
-	}{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusOK {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
-	}
-	return nil
+type Emqx interface {
+	GetType() servercfg.Emqxdeploy
+	CreateEmqxUser(username, password string) error
+	CreateEmqxUserforServer() error
+	CreateEmqxDefaultAuthenticator() error
+	CreateEmqxDefaultAuthorizer() error
+	CreateDefaultDenyRule() error
+	CreateHostACL(hostID, serverName string) error
+	AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error
+	GetUserACL(username string) (*aclObject, error)
+	DeleteEmqxUser(username string) error
 }
 }
 
 
-// CreateEmqxDefaultAuthorizer - creates a default ACL authorization mechanism based on the built in database
-func CreateEmqxDefaultAuthorizer() error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&struct {
-		Enable bool   `json:"enable"`
-		Type   string `json:"type"`
-	}{Enable: true, Type: "built_in_database"})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources", bytes.NewReader(payload))
-	if err != nil {
-		return err
+func init() {
+	if servercfg.GetBrokerType() != servercfg.EmqxBrokerType {
+		return
 	}
 	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
+	if servercfg.GetEmqxDeployType() == servercfg.EmqxCloudDeploy {
+		emqx = &EmqxCloud{
+			URL:       servercfg.GetEmqxRestEndpoint(),
+			AppID:     servercfg.GetEmqxAppID(),
+			AppSecret: servercfg.GetEmqxAppSecret(),
 		}
 		}
-		return fmt.Errorf("error creating default EMQX ACL authorization mechanism %v", string(msg))
-	}
-	return nil
-}
-
-// GetUserACL - returns ACL rules by username
-func GetUserACL(username string) (*aclObject, error) {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return nil, err
-	}
-	req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
-	if err != nil {
-		return nil, err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return nil, err
-	}
-	defer resp.Body.Close()
-	response, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return nil, err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
-	}
-	body := new(aclObject)
-	if err := json.Unmarshal(response, body); err != nil {
-		return nil, err
-	}
-	return body, nil
-}
-
-// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
-// to allow user access to topics use the `mq.CreateUserAccessRule` function
-func CreateDefaultDenyRule() error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error creating default ACL rules %v", string(msg))
-	}
-	return nil
-}
-
-// CreateHostACL - create host ACL rules
-func CreateHostACL(hostID, serverName string) error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&aclObject{
-		Username: hostID,
-		Rules: []aclRule{
-			{
-				Topic:      fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/update/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
-				Permission: "allow",
-				Action:     "all",
-			},
-		},
-	})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
+	} else {
+		emqx = &EmqxOnPrem{
+			URL:      servercfg.GetEmqxRestEndpoint(),
+			UserName: servercfg.GetMqUserName(),
+			Password: servercfg.GetMqPassword(),
 		}
 		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
 	}
 	}
-	return nil
 }
 }
 
 
-// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
-// might occur when multiple nodes belonging to the same host are created at the same time
-var nodeAclMux sync.Mutex
-
-// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
-func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	nodeAclMux.Lock()
-	defer nodeAclMux.Unlock()
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	aclObject, err := GetUserACL(hostID)
-	if err != nil {
-		return err
-	}
-	aclObject.Rules = append(aclObject.Rules, []aclRule{
-		{
-			Topic:      fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
-			Permission: "allow",
-			Action:     "subscribe",
-		},
-		{
-			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-	}...)
-	payload, err := json.Marshal(aclObject)
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
-	}
-	return nil
+// GetEmqxHandler - gets emqx handler
+func GetEmqxHandler() Emqx {
+	return emqx
 }
 }

+ 264 - 0
mq/emqx_cloud.go

@@ -0,0 +1,264 @@
+package mq
+
+import (
+	"encoding/json"
+	"errors"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+type EmqxCloud struct {
+	URL       string
+	AppID     string
+	AppSecret string
+}
+
+type userCreateReq struct {
+	UserName string `json:"username"`
+	Password string `json:"password"`
+}
+
+type cloudAcl struct {
+	UserName string `json:"username"`
+	Topic    string `json:"topic"`
+	Action   string `json:"action"`
+	Access   string `json:"access"`
+}
+
+func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
+
+func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
+
+	payload := userCreateReq{
+		UserName: username,
+		Password: pass,
+	}
+	data, _ := json.Marshal(payload)
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	req.Header.Add("Content-Type", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	return nil
+}
+
+func (e *EmqxCloud) CreateEmqxUserforServer() error {
+	payload := userCreateReq{
+		UserName: servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+	}
+	data, _ := json.Marshal(payload)
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/auth_username", e.URL), strings.NewReader(string(data)))
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	req.Header.Add("Content-Type", "application/json")
+
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	// add acls
+	acls := []cloudAcl{
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("update/%s/#", servercfg.GetServer()),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "peers/host/#",
+			Access:   "allow",
+			Action:   "pub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "node/update/#",
+			Access:   "allow",
+			Action:   "pub",
+		},
+		{
+
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "host/update/#",
+			Access:   "allow",
+			Action:   "pub",
+		},
+	}
+
+	return e.createacls(acls)
+}
+
+func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
+
+func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
+
+func (e *EmqxCloud) CreateDefaultDenyRule() error {
+	return nil
+}
+
+func (e *EmqxCloud) createacls(acls []cloudAcl) error {
+	payload, err := json.Marshal(acls)
+	if err != nil {
+		return err
+	}
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodPost, fmt.Sprintf("%s/api/acl", e.URL), strings.NewReader(string(payload)))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Content-Type", "application/json")
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	return nil
+}
+
+func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
+	acls := []cloudAcl{
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/update/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
+			Access:   "allow",
+			Action:   "pub",
+		},
+	}
+
+	return e.createacls(acls)
+}
+
+func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
+	acls := []cloudAcl{
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("ping/%s/%s", serverName, nodeID),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("update/%s/%s", serverName, nodeID),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("signal/%s/%s", serverName, nodeID),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+	}
+
+	return e.createacls(acls)
+}
+
+func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
+
+func (e *EmqxCloud) DeleteEmqxUser(username string) error {
+
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodDelete, fmt.Sprintf("%s/api/auth_username/%s", e.URL, username), nil)
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	return nil
+}

+ 428 - 0
mq/emqx_on_prem.go

@@ -0,0 +1,428 @@
+package mq
+
+import (
+	"bytes"
+	"encoding/json"
+	"fmt"
+	"io"
+	"net/http"
+	"strings"
+	"sync"
+
+	"github.com/gravitl/netmaker/servercfg"
+)
+
+type EmqxOnPrem struct {
+	URL      string
+	UserName string
+	Password string
+}
+
+const already_exists = "ALREADY_EXISTS"
+
+type (
+	emqxUser struct {
+		UserID   string `json:"user_id"`
+		Password string `json:"password"`
+		Admin    bool   `json:"is_superuser"`
+	}
+
+	emqxLogin struct {
+		Username string `json:"username"`
+		Password string `json:"password"`
+	}
+
+	emqxLoginResponse struct {
+		License struct {
+			Edition string `json:"edition"`
+		} `json:"license"`
+		Token   string `json:"token"`
+		Version string `json:"version"`
+	}
+
+	aclRule struct {
+		Topic      string `json:"topic"`
+		Permission string `json:"permission"`
+		Action     string `json:"action"`
+	}
+
+	aclObject struct {
+		Rules    []aclRule `json:"rules"`
+		Username string    `json:"username,omitempty"`
+	}
+)
+
+func getEmqxAuthToken() (string, error) {
+	payload, err := json.Marshal(&emqxLogin{
+		Username: servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+	})
+	if err != nil {
+		return "", err
+	}
+	resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
+	if err != nil {
+		return "", err
+	}
+	msg, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return "", err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return "", fmt.Errorf("error during EMQX login %v", string(msg))
+	}
+	var loginResp emqxLoginResponse
+	if err := json.Unmarshal(msg, &loginResp); err != nil {
+		return "", err
+	}
+	return loginResp.Token, nil
+}
+
+func (e *EmqxOnPrem) GetType() servercfg.Emqxdeploy { return servercfg.EmqxOnPremDeploy }
+
+// CreateEmqxUser - creates an EMQX user
+func (e *EmqxOnPrem) CreateEmqxUser(username, password string) error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&emqxUser{
+		UserID:   username,
+		Password: password,
+	})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		if !strings.Contains(string(msg), already_exists) {
+			return fmt.Errorf("error creating EMQX user %v", string(msg))
+		}
+	}
+	return nil
+}
+func (e *EmqxOnPrem) CreateEmqxUserforServer() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&emqxUser{
+		UserID:   servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+		Admin:    true,
+	})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		if !strings.Contains(string(msg), already_exists) {
+			return fmt.Errorf("error creating EMQX user %v", string(msg))
+		}
+	}
+	return nil
+}
+
+// DeleteEmqxUser - deletes an EMQX user
+func (e *EmqxOnPrem) DeleteEmqxUser(username string) error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
+	if err != nil {
+		return err
+	}
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error deleting EMQX user %v", string(msg))
+	}
+	return nil
+}
+
+// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
+func (e *EmqxOnPrem) CreateEmqxDefaultAuthenticator() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&struct {
+		Mechanism  string `json:"mechanism"`
+		Backend    string `json:"backend"`
+		UserIDType string `json:"user_id_type"`
+	}{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusOK {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
+	}
+	return nil
+}
+
+// CreateEmqxDefaultAuthorizer - creates a default ACL authorization mechanism based on the built in database
+func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&struct {
+		Enable bool   `json:"enable"`
+		Type   string `json:"type"`
+	}{Enable: true, Type: "built_in_database"})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error creating default EMQX ACL authorization mechanism %v", string(msg))
+	}
+	return nil
+}
+
+// GetUserACL - returns ACL rules by username
+func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return nil, err
+	}
+	req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
+	if err != nil {
+		return nil, err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return nil, err
+	}
+	defer resp.Body.Close()
+	response, err := io.ReadAll(resp.Body)
+	if err != nil {
+		return nil, err
+	}
+	if resp.StatusCode != http.StatusOK {
+		return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
+	}
+	body := new(aclObject)
+	if err := json.Unmarshal(response, body); err != nil {
+		return nil, err
+	}
+	return body, nil
+}
+
+// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
+// to allow user access to topics use the `mq.CreateUserAccessRule` function
+func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error creating default ACL rules %v", string(msg))
+	}
+	return nil
+}
+
+// CreateHostACL - create host ACL rules
+func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&aclObject{
+		Username: hostID,
+		Rules: []aclRule{
+			{
+				Topic:      fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
+				Permission: "allow",
+				Action:     "all",
+			},
+			{
+				Topic:      fmt.Sprintf("host/update/%s/%s", hostID, serverName),
+				Permission: "allow",
+				Action:     "all",
+			},
+			{
+				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
+				Permission: "allow",
+				Action:     "all",
+			},
+		},
+	})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
+	}
+	return nil
+}
+
+// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
+// might occur when multiple nodes belonging to the same host are created at the same time
+var nodeAclMux sync.Mutex
+
+// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
+func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
+	nodeAclMux.Lock()
+	defer nodeAclMux.Unlock()
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	aclObject, err := emqx.GetUserACL(hostID)
+	if err != nil {
+		return err
+	}
+	aclObject.Rules = append(aclObject.Rules, []aclRule{
+		{
+			Topic:      fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
+			Permission: "allow",
+			Action:     "subscribe",
+		},
+		{
+			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
+			Permission: "allow",
+			Action:     "all",
+		},
+		{
+			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
+			Permission: "allow",
+			Action:     "all",
+		},
+		{
+			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
+			Permission: "allow",
+			Action:     "all",
+		},
+		{
+			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
+			Permission: "allow",
+			Action:     "all",
+		},
+	}...)
+	payload, err := json.Marshal(aclObject)
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode != http.StatusNoContent {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
+	}
+	return nil
+}

+ 2 - 2
mq/handlers.go

@@ -114,7 +114,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				return
 				return
 			} else {
 			} else {
 				if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 				if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-					if err = AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
+					if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
 						slog.Error("failed to add ACLs for EMQX node", "error", err)
 						slog.Error("failed to add ACLs for EMQX node", "error", err)
 						return
 						return
 					}
 					}
@@ -143,7 +143,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 	case models.DeleteHost:
 	case models.DeleteHost:
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 			// delete EMQX credentials for host
 			// delete EMQX credentials for host
-			if err := DeleteEmqxUser(currentHost.ID.String()); err != nil {
+			if err := emqx.DeleteEmqxUser(currentHost.ID.String()); err != nil {
 				slog.Error("failed to remove host credentials from EMQX", "id", currentHost.ID, "error", err)
 				slog.Error("failed to remove host credentials from EMQX", "id", currentHost.ID, "error", err)
 			}
 			}
 		}
 		}

+ 23 - 16
mq/mq.go

@@ -41,24 +41,31 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 // SetupMQTT creates a connection to broker and return client
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT() {
 func SetupMQTT() {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
-		// setup authenticator and create admin user
-		if err := CreateEmqxDefaultAuthenticator(); err != nil {
-			logger.Log(0, err.Error())
-		}
-		DeleteEmqxUser(servercfg.GetMqUserName())
-		if err := CreateEmqxUser(servercfg.GetMqUserName(), servercfg.GetMqPassword(), true); err != nil {
-			log.Fatal(err)
-		}
-		// create an ACL authorization source for the built in EMQX MNESIA database
-		if err := CreateEmqxDefaultAuthorizer(); err != nil {
-			logger.Log(0, err.Error())
-		}
-		// create a default deny ACL to all topics for all users
-		if err := CreateDefaultDenyRule(); err != nil {
-			log.Fatal(err)
+		if emqx.GetType() == servercfg.EmqxOnPremDeploy {
+			time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
+			// setup authenticator and create admin user
+			if err := emqx.CreateEmqxDefaultAuthenticator(); err != nil {
+				logger.Log(0, err.Error())
+			}
+			emqx.DeleteEmqxUser(servercfg.GetMqUserName())
+			if err := emqx.CreateEmqxUserforServer(); err != nil {
+				log.Fatal(err)
+			}
+			// create an ACL authorization source for the built in EMQX MNESIA database
+			if err := emqx.CreateEmqxDefaultAuthorizer(); err != nil {
+				logger.Log(0, err.Error())
+			}
+			// create a default deny ACL to all topics for all users
+			if err := emqx.CreateDefaultDenyRule(); err != nil {
+				log.Fatal(err)
+			}
+		} else {
+			if err := emqx.CreateEmqxUserforServer(); err != nil {
+				log.Fatal(err)
+			}
 		}
 		}
 	}
 	}
+
 	opts := mqtt.NewClientOptions()
 	opts := mqtt.NewClientOptions()
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {
 	opts.SetOnConnectHandler(func(client mqtt.Client) {

+ 31 - 2
servercfg/serverconf.go

@@ -17,10 +17,15 @@ import (
 // EmqxBrokerType denotes the broker type for EMQX MQTT
 // EmqxBrokerType denotes the broker type for EMQX MQTT
 const EmqxBrokerType = "emqx"
 const EmqxBrokerType = "emqx"
 
 
+// Emqxdeploy - emqx deploy type
+type Emqxdeploy string
+
 var (
 var (
 	Version              = "dev"
 	Version              = "dev"
 	IsPro                = false
 	IsPro                = false
 	ErrLicenseValidation error
 	ErrLicenseValidation error
+	EmqxCloudDeploy      Emqxdeploy = "cloud"
+	EmqxOnPremDeploy     Emqxdeploy = "on-prem"
 )
 )
 
 
 // SetHost - sets the host ip
 // SetHost - sets the host ip
@@ -112,8 +117,13 @@ func GetRacAutoDisable() bool {
 func GetServerInfo() models.ServerConfig {
 func GetServerInfo() models.ServerConfig {
 	var cfg models.ServerConfig
 	var cfg models.ServerConfig
 	cfg.Server = GetServer()
 	cfg.Server = GetServer()
-	cfg.MQUserName = GetMqUserName()
-	cfg.MQPassword = GetMqPassword()
+	if GetBrokerType() == EmqxBrokerType {
+		cfg.MQUserName = "HOST_ID"
+		cfg.MQPassword = "HOST_PASS"
+	} else {
+		cfg.MQUserName = GetMqUserName()
+		cfg.MQPassword = GetMqPassword()
+	}
 	cfg.API = GetAPIConnString()
 	cfg.API = GetAPIConnString()
 	cfg.CoreDNSAddr = GetCoreDNSAddr()
 	cfg.CoreDNSAddr = GetCoreDNSAddr()
 	cfg.APIPort = GetAPIPort()
 	cfg.APIPort = GetAPIPort()
@@ -674,3 +684,22 @@ func GetEnvironment() string {
 	}
 	}
 	return ""
 	return ""
 }
 }
+
+// GetEmqxDeployType - fetches emqx deploy type this server uses
+func GetEmqxDeployType() (deployType Emqxdeploy) {
+	deployType = EmqxOnPremDeploy
+	if os.Getenv("EMQX_DEPLOY_TYPE") == string(EmqxCloudDeploy) {
+		deployType = EmqxCloudDeploy
+	}
+	return
+}
+
+// GetEmqxAppID - gets the emqx cloud app id
+func GetEmqxAppID() string {
+	return os.Getenv("EMQX_APP_ID")
+}
+
+// GetEmqxAppSecret - gets the emqx cloud app secret
+func GetEmqxAppSecret() string {
+	return os.Getenv("EMQX_APP_SECRET")
+}