Browse Source

add host acls and create user cloud emqx funcs

abhishek9686 1 year ago
parent
commit
c556e9d77d
3 changed files with 93 additions and 14 deletions
  1. 1 0
      mq/emqx.go
  2. 86 9
      mq/emqx_cloud.go
  3. 6 5
      mq/mq.go

+ 1 - 0
mq/emqx.go

@@ -35,6 +35,7 @@ func init() {
 	}
 }
 
+// GetEmqxHandler - gets emqx handler
 func GetEmqxHandler() Emqx {
 	return emqx
 }

+ 86 - 9
mq/emqx_cloud.go

@@ -2,6 +2,7 @@ package mq
 
 import (
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
 	"net/http"
@@ -21,6 +22,13 @@ type userCreateReq struct {
 	Password string `json:"password"`
 }
 
+type cloudAcl struct {
+	UserName string `json:"username"`
+	Topic    string `json:"topic"`
+	Action   string `json:"action"`
+	Access   string `json:"access"`
+}
+
 func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
 
 func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
@@ -33,7 +41,6 @@ func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
 	client := &http.Client{}
 	req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(data)))
 	if err != nil {
-		fmt.Println(err)
 		return err
 	}
 	req.SetBasicAuth(e.AppID, e.AppSecret)
@@ -41,27 +48,75 @@ func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
 
 	res, err := client.Do(req)
 	if err != nil {
-		fmt.Println(err)
 		return err
 	}
 	defer res.Body.Close()
 
 	body, err := io.ReadAll(res.Body)
 	if err != nil {
-		fmt.Println(err)
 		return err
 	}
-	fmt.Println(string(body))
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
 	return nil
 }
 
-func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil }
+func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
+
+func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
 
-func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil }
+func (e *EmqxCloud) CreateDefaultDenyRule() error {
+	return nil
+}
 
-func (e *EmqxCloud) CreateDefaultDenyRule() error { return nil }
+func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
+	acls := []cloudAcl{
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/update/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
+			Access:   "allow",
+			Action:   "pubsub",
+		},
+	}
+	payload, err := json.Marshal(acls)
+	if err != nil {
+		return err
+	}
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(payload)))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("Content-Type", "application/json")
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
 
-func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error { return nil }
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	return nil
+}
 
 func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
 	return nil
@@ -69,4 +124,26 @@ func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName
 
 func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil }
 
-func (e *EmqxCloud) DeleteEmqxUser(username string) error { return nil }
+func (e *EmqxCloud) DeleteEmqxUser(username string) error {
+
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodDelete, e.URL, nil)
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
+
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	return nil
+}

+ 6 - 5
mq/mq.go

@@ -40,7 +40,7 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT() {
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType && emqx.GetType() == servercfg.EmqxOnPremDeploy {
 		time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
 		// setup authenticator and create admin user
 		if err := emqx.CreateEmqxDefaultAuthenticator(); err != nil {
@@ -54,10 +54,11 @@ func SetupMQTT() {
 		if err := emqx.CreateEmqxDefaultAuthorizer(); err != nil {
 			logger.Log(0, err.Error())
 		}
-		// create a default deny ACL to all topics for all users
-		if err := emqx.CreateDefaultDenyRule(); err != nil {
-			log.Fatal(err)
-		}
+
+	}
+	// create a default deny ACL to all topics for all users
+	if err := emqx.CreateDefaultDenyRule(); err != nil {
+		log.Fatal(err)
 	}
 	opts := mqtt.NewClientOptions()
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)