Browse Source

create emqx for server, get app creds from env

abhishek9686 1 year ago
parent
commit
fb0fead2f0
8 changed files with 176 additions and 43 deletions
  1. 1 1
      auth/host_session.go
  2. 1 1
      controllers/enrollmentkeys.go
  3. 1 1
      controllers/hosts.go
  4. 4 3
      mq/emqx.go
  5. 99 17
      mq/emqx_cloud.go
  6. 36 2
      mq/emqx_on_prem.go
  7. 24 18
      mq/mq.go
  8. 10 0
      servercfg/serverconf.go

+ 1 - 1
auth/host_session.go

@@ -132,7 +132,7 @@ func SessionHandler(conn *websocket.Conn) {
 		hostPass := result.Host.HostPass
 		if !logic.HostExists(&result.Host) { // check if host already exists, add if not
 			if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-				if err := mq.GetEmqxHandler().CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass, false); err != nil {
+				if err := mq.GetEmqxHandler().CreateEmqxUser(result.Host.ID.String(), result.Host.HostPass); err != nil {
 					logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 					return
 				}

+ 1 - 1
controllers/enrollmentkeys.go

@@ -312,7 +312,7 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 		logic.CheckHostPorts(&newHost)
 		// create EMQX credentials and ACLs for host
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-			if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass, false); err != nil {
+			if err := mq.GetEmqxHandler().CreateEmqxUser(newHost.ID.String(), newHost.HostPass); err != nil {
 				logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 				return
 			}

+ 1 - 1
controllers/hosts.go

@@ -549,7 +549,7 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 
 	// Create EMQX creds and ACLs if not found
 	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-		if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password, false); err != nil {
+		if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
 			slog.Error("failed to create host credentials for EMQX: ", err.Error())
 		} else {
 			if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {

+ 4 - 3
mq/emqx.go

@@ -6,7 +6,8 @@ var emqx Emqx
 
 type Emqx interface {
 	GetType() servercfg.Emqxdeploy
-	CreateEmqxUser(username, password string, admin bool) error
+	CreateEmqxUser(username, password string) error
+	CreateEmqxUserforServer() error
 	CreateEmqxDefaultAuthenticator() error
 	CreateEmqxDefaultAuthorizer() error
 	CreateDefaultDenyRule() error
@@ -23,8 +24,8 @@ func init() {
 	if servercfg.GetEmqxDeployType() == servercfg.EmqxCloudDeploy {
 		emqx = &EmqxCloud{
 			URL:       servercfg.GetEmqxRestEndpoint(),
-			AppID:     servercfg.GetMqUserName(),
-			AppSecret: servercfg.GetMqPassword(),
+			AppID:     servercfg.GetEmqxAppID(),
+			AppSecret: servercfg.GetEmqxAppSecret(),
 		}
 	} else {
 		emqx = &EmqxOnPrem{

+ 99 - 17
mq/emqx_cloud.go

@@ -31,7 +31,7 @@ type cloudAcl struct {
 
 func (e *EmqxCloud) GetType() servercfg.Emqxdeploy { return servercfg.EmqxCloudDeploy }
 
-func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
+func (e *EmqxCloud) CreateEmqxUser(username, pass string) error {
 
 	payload := userCreateReq{
 		UserName: username,
@@ -62,35 +62,92 @@ func (e *EmqxCloud) CreateEmqxUser(username, pass string, admin bool) error {
 	return nil
 }
 
-func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
-
-func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
+func (e *EmqxCloud) CreateEmqxUserforServer() error {
+	payload := userCreateReq{
+		UserName: servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+	}
+	data, _ := json.Marshal(payload)
+	client := &http.Client{}
+	req, err := http.NewRequest(http.MethodPost, e.URL, strings.NewReader(string(data)))
+	if err != nil {
+		return err
+	}
+	req.SetBasicAuth(e.AppID, e.AppSecret)
+	req.Header.Add("Content-Type", "application/json")
 
-func (e *EmqxCloud) CreateDefaultDenyRule() error {
-	return nil
-}
+	res, err := client.Do(req)
+	if err != nil {
+		return err
+	}
+	defer res.Body.Close()
 
-func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
+	body, err := io.ReadAll(res.Body)
+	if err != nil {
+		return err
+	}
+	if res.StatusCode != http.StatusOK {
+		return errors.New("request failed " + string(body))
+	}
+	// add acls
 	acls := []cloudAcl{
 		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("update/%s/#", servercfg.GetServer()),
 			Access:   "allow",
-			Action:   "pubsub",
+			Action:   "sub",
 		},
 		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("host/update/%s/%s", hostID, serverName),
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("host/serverupdate/%s/#", servercfg.GetServer()),
 			Access:   "allow",
-			Action:   "pubsub",
+			Action:   "sub",
 		},
 		{
-			UserName: hostID,
-			Topic:    fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("signal/%s/#", servercfg.GetServer()),
 			Access:   "allow",
-			Action:   "pubsub",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    fmt.Sprintf("metrics/%s/#", servercfg.GetServer()),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "peers/host/#",
+			Access:   "allow",
+			Action:   "pub",
+		},
+		{
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "node/update/#",
+			Access:   "allow",
+			Action:   "pub",
+		},
+		{
+
+			UserName: servercfg.GetMqUserName(),
+			Topic:    "host/update/#",
+			Access:   "allow",
+			Action:   "pub",
 		},
 	}
+
+	return e.createacls(acls)
+}
+
+func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ignore
+
+func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
+
+func (e *EmqxCloud) CreateDefaultDenyRule() error {
+	return nil
+}
+
+func (e *EmqxCloud) createacls(acls []cloudAcl) error {
 	payload, err := json.Marshal(acls)
 	if err != nil {
 		return err
@@ -118,6 +175,31 @@ func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
 	return nil
 }
 
+func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
+	acls := []cloudAcl{
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/update/%s/%s", hostID, serverName),
+			Access:   "allow",
+			Action:   "sub",
+		},
+		{
+			UserName: hostID,
+			Topic:    fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
+			Access:   "allow",
+			Action:   "pub",
+		},
+	}
+
+	return e.createacls(acls)
+}
+
 func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
 	acls := []cloudAcl{
 		{

+ 36 - 2
mq/emqx_on_prem.go

@@ -81,7 +81,7 @@ func getEmqxAuthToken() (string, error) {
 func (e *EmqxOnPrem) GetType() servercfg.Emqxdeploy { return servercfg.EmqxOnPremDeploy }
 
 // CreateEmqxUser - creates an EMQX user
-func (e *EmqxOnPrem) CreateEmqxUser(username, password string, admin bool) error {
+func (e *EmqxOnPrem) CreateEmqxUser(username, password string) error {
 	token, err := getEmqxAuthToken()
 	if err != nil {
 		return err
@@ -89,7 +89,41 @@ func (e *EmqxOnPrem) CreateEmqxUser(username, password string, admin bool) error
 	payload, err := json.Marshal(&emqxUser{
 		UserID:   username,
 		Password: password,
-		Admin:    admin,
+	})
+	if err != nil {
+		return err
+	}
+	req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
+	if err != nil {
+		return err
+	}
+	req.Header.Add("content-type", "application/json")
+	req.Header.Add("authorization", "Bearer "+token)
+	resp, err := (&http.Client{}).Do(req)
+	if err != nil {
+		return err
+	}
+	defer resp.Body.Close()
+	if resp.StatusCode >= 300 {
+		msg, err := io.ReadAll(resp.Body)
+		if err != nil {
+			return err
+		}
+		if !strings.Contains(string(msg), already_exists) {
+			return fmt.Errorf("error creating EMQX user %v", string(msg))
+		}
+	}
+	return nil
+}
+func (e *EmqxOnPrem) CreateEmqxUserforServer() error {
+	token, err := getEmqxAuthToken()
+	if err != nil {
+		return err
+	}
+	payload, err := json.Marshal(&emqxUser{
+		UserID:   servercfg.GetMqUserName(),
+		Password: servercfg.GetMqPassword(),
+		Admin:    true,
 	})
 	if err != nil {
 		return err

+ 24 - 18
mq/mq.go

@@ -40,26 +40,32 @@ func setMqOptions(user, password string, opts *mqtt.ClientOptions) {
 
 // SetupMQTT creates a connection to broker and return client
 func SetupMQTT() {
-	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType && emqx.GetType() == servercfg.EmqxOnPremDeploy {
-		time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
-		// setup authenticator and create admin user
-		if err := emqx.CreateEmqxDefaultAuthenticator(); err != nil {
-			logger.Log(0, err.Error())
-		}
-		emqx.DeleteEmqxUser(servercfg.GetMqUserName())
-		if err := emqx.CreateEmqxUser(servercfg.GetMqUserName(), servercfg.GetMqPassword(), true); err != nil {
-			log.Fatal(err)
-		}
-		// create an ACL authorization source for the built in EMQX MNESIA database
-		if err := emqx.CreateEmqxDefaultAuthorizer(); err != nil {
-			logger.Log(0, err.Error())
+	if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
+		if emqx.GetType() == servercfg.EmqxOnPremDeploy {
+			time.Sleep(10 * time.Second) // wait for the REST endpoint to be ready
+			// setup authenticator and create admin user
+			if err := emqx.CreateEmqxDefaultAuthenticator(); err != nil {
+				logger.Log(0, err.Error())
+			}
+			emqx.DeleteEmqxUser(servercfg.GetMqUserName())
+			if err := emqx.CreateEmqxUserforServer(); err != nil {
+				log.Fatal(err)
+			}
+			// create an ACL authorization source for the built in EMQX MNESIA database
+			if err := emqx.CreateEmqxDefaultAuthorizer(); err != nil {
+				logger.Log(0, err.Error())
+			}
+			// create a default deny ACL to all topics for all users
+			if err := emqx.CreateDefaultDenyRule(); err != nil {
+				log.Fatal(err)
+			}
+		} else {
+			if err := emqx.CreateEmqxUserforServer(); err != nil {
+				log.Fatal(err)
+			}
 		}
-
-	}
-	// create a default deny ACL to all topics for all users
-	if err := emqx.CreateDefaultDenyRule(); err != nil {
-		log.Fatal(err)
 	}
+
 	opts := mqtt.NewClientOptions()
 	setMqOptions(servercfg.GetMqUserName(), servercfg.GetMqPassword(), opts)
 	opts.SetOnConnectHandler(func(client mqtt.Client) {

+ 10 - 0
servercfg/serverconf.go

@@ -688,3 +688,13 @@ func GetEmqxDeployType() (deployType Emqxdeploy) {
 	}
 	return
 }
+
+// GetEmqxAppID - gets the emqx cloud app id
+func GetEmqxAppID() string {
+	return os.Getenv("EMQX_APP_ID")
+}
+
+// GetEmqxAppSecret - gets the emqx cloud app secret
+func GetEmqxAppSecret() string {
+	return os.Getenv("EMQX_APP_SECRET")
+}