|
@@ -1,386 +1,42 @@
|
|
|
package mq
|
|
|
|
|
|
-import (
|
|
|
- "bytes"
|
|
|
- "encoding/json"
|
|
|
- "fmt"
|
|
|
- "io"
|
|
|
- "net/http"
|
|
|
- "strings"
|
|
|
- "sync"
|
|
|
+import "github.com/gravitl/netmaker/servercfg"
|
|
|
|
|
|
- "github.com/gravitl/netmaker/servercfg"
|
|
|
-)
|
|
|
+var emqx Emqx
|
|
|
|
|
|
-const already_exists = "ALREADY_EXISTS"
|
|
|
-
|
|
|
-type (
|
|
|
- emqxUser struct {
|
|
|
- UserID string `json:"user_id"`
|
|
|
- Password string `json:"password"`
|
|
|
- Admin bool `json:"is_superuser"`
|
|
|
- }
|
|
|
-
|
|
|
- emqxLogin struct {
|
|
|
- Username string `json:"username"`
|
|
|
- Password string `json:"password"`
|
|
|
- }
|
|
|
-
|
|
|
- emqxLoginResponse struct {
|
|
|
- License struct {
|
|
|
- Edition string `json:"edition"`
|
|
|
- } `json:"license"`
|
|
|
- Token string `json:"token"`
|
|
|
- Version string `json:"version"`
|
|
|
- }
|
|
|
-
|
|
|
- aclRule struct {
|
|
|
- Topic string `json:"topic"`
|
|
|
- Permission string `json:"permission"`
|
|
|
- Action string `json:"action"`
|
|
|
- }
|
|
|
-
|
|
|
- aclObject struct {
|
|
|
- Rules []aclRule `json:"rules"`
|
|
|
- Username string `json:"username,omitempty"`
|
|
|
- }
|
|
|
-)
|
|
|
-
|
|
|
-func getEmqxAuthToken() (string, error) {
|
|
|
- payload, err := json.Marshal(&emqxLogin{
|
|
|
- Username: servercfg.GetMqUserName(),
|
|
|
- Password: servercfg.GetMqPassword(),
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- resp, err := http.Post(servercfg.GetEmqxRestEndpoint()+"/api/v5/login", "application/json", bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- if resp.StatusCode != http.StatusOK {
|
|
|
- return "", fmt.Errorf("error during EMQX login %v", string(msg))
|
|
|
- }
|
|
|
- var loginResp emqxLoginResponse
|
|
|
- if err := json.Unmarshal(msg, &loginResp); err != nil {
|
|
|
- return "", err
|
|
|
- }
|
|
|
- return loginResp.Token, nil
|
|
|
-}
|
|
|
-
|
|
|
-// CreateEmqxUser - creates an EMQX user
|
|
|
-func CreateEmqxUser(username, password string, admin bool) error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- payload, err := json.Marshal(&emqxUser{
|
|
|
- UserID: username,
|
|
|
- Password: password,
|
|
|
- Admin: admin,
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users", bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode >= 300 {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- if !strings.Contains(string(msg), already_exists) {
|
|
|
- return fmt.Errorf("error creating EMQX user %v", string(msg))
|
|
|
- }
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// DeleteEmqxUser - deletes an EMQX user
|
|
|
-func DeleteEmqxUser(username string) error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodDelete, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication/password_based:built_in_database/users/"+username, nil)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode >= 300 {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return fmt.Errorf("error deleting EMQX user %v", string(msg))
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// CreateEmqxDefaultAuthenticator - creates a default authenticator based on password and using EMQX's built in database as storage
|
|
|
-func CreateEmqxDefaultAuthenticator() error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- payload, err := json.Marshal(&struct {
|
|
|
- Mechanism string `json:"mechanism"`
|
|
|
- Backend string `json:"backend"`
|
|
|
- UserIDType string `json:"user_id_type"`
|
|
|
- }{Mechanism: "password_based", Backend: "built_in_database", UserIDType: "username"})
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authentication", bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusOK {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return fmt.Errorf("error creating default EMQX authenticator %v", string(msg))
|
|
|
- }
|
|
|
- return nil
|
|
|
+type Emqx interface {
|
|
|
+ GetType() servercfg.Emqxdeploy
|
|
|
+ CreateEmqxUser(username, password string) error
|
|
|
+ CreateEmqxUserforServer() error
|
|
|
+ CreateEmqxDefaultAuthenticator() error
|
|
|
+ CreateEmqxDefaultAuthorizer() error
|
|
|
+ CreateDefaultDenyRule() error
|
|
|
+ CreateHostACL(hostID, serverName string) error
|
|
|
+ AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error
|
|
|
+ GetUserACL(username string) (*aclObject, error)
|
|
|
+ DeleteEmqxUser(username string) error
|
|
|
}
|
|
|
|
|
|
-// CreateEmqxDefaultAuthorizer - creates a default ACL authorization mechanism based on the built in database
|
|
|
-func CreateEmqxDefaultAuthorizer() error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- payload, err := json.Marshal(&struct {
|
|
|
- Enable bool `json:"enable"`
|
|
|
- Type string `json:"type"`
|
|
|
- }{Enable: true, Type: "built_in_database"})
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources", bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+func init() {
|
|
|
+ if servercfg.GetBrokerType() != servercfg.EmqxBrokerType {
|
|
|
+ return
|
|
|
}
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusNoContent {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+ if servercfg.GetEmqxDeployType() == servercfg.EmqxCloudDeploy {
|
|
|
+ emqx = &EmqxCloud{
|
|
|
+ URL: servercfg.GetEmqxRestEndpoint(),
|
|
|
+ AppID: servercfg.GetEmqxAppID(),
|
|
|
+ AppSecret: servercfg.GetEmqxAppSecret(),
|
|
|
}
|
|
|
- return fmt.Errorf("error creating default EMQX ACL authorization mechanism %v", string(msg))
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// GetUserACL - returns ACL rules by username
|
|
|
-func GetUserACL(username string) (*aclObject, error) {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- response, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- if resp.StatusCode != http.StatusOK {
|
|
|
- return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
|
|
|
- }
|
|
|
- body := new(aclObject)
|
|
|
- if err := json.Unmarshal(response, body); err != nil {
|
|
|
- return nil, err
|
|
|
- }
|
|
|
- return body, nil
|
|
|
-}
|
|
|
-
|
|
|
-// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
|
|
|
-// to allow user access to topics use the `mq.CreateUserAccessRule` function
|
|
|
-func CreateDefaultDenyRule() error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPost, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/all", bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusNoContent {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return fmt.Errorf("error creating default ACL rules %v", string(msg))
|
|
|
- }
|
|
|
- return nil
|
|
|
-}
|
|
|
-
|
|
|
-// CreateHostACL - create host ACL rules
|
|
|
-func CreateHostACL(hostID, serverName string) error {
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- payload, err := json.Marshal(&aclObject{
|
|
|
- Username: hostID,
|
|
|
- Rules: []aclRule{
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("host/update/%s/%s", hostID, serverName),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- },
|
|
|
- })
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusNoContent {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
+ } else {
|
|
|
+ emqx = &EmqxOnPrem{
|
|
|
+ URL: servercfg.GetEmqxRestEndpoint(),
|
|
|
+ UserName: servercfg.GetMqUserName(),
|
|
|
+ Password: servercfg.GetMqPassword(),
|
|
|
}
|
|
|
- return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
|
|
|
}
|
|
|
- return nil
|
|
|
}
|
|
|
|
|
|
-// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
|
|
|
-// might occur when multiple nodes belonging to the same host are created at the same time
|
|
|
-var nodeAclMux sync.Mutex
|
|
|
-
|
|
|
-// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
|
|
|
-func AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
|
|
|
- nodeAclMux.Lock()
|
|
|
- defer nodeAclMux.Unlock()
|
|
|
- token, err := getEmqxAuthToken()
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- aclObject, err := GetUserACL(hostID)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- aclObject.Rules = append(aclObject.Rules, []aclRule{
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "subscribe",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("ping/%s/%s", serverName, nodeID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("update/%s/%s", serverName, nodeID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("signal/%s/%s", serverName, nodeID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- {
|
|
|
- Topic: fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
|
|
|
- Permission: "allow",
|
|
|
- Action: "all",
|
|
|
- },
|
|
|
- }...)
|
|
|
- payload, err := json.Marshal(aclObject)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- req.Header.Add("content-type", "application/json")
|
|
|
- req.Header.Add("authorization", "Bearer "+token)
|
|
|
- resp, err := (&http.Client{}).Do(req)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- defer resp.Body.Close()
|
|
|
- if resp.StatusCode != http.StatusNoContent {
|
|
|
- msg, err := io.ReadAll(resp.Body)
|
|
|
- if err != nil {
|
|
|
- return err
|
|
|
- }
|
|
|
- return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
|
|
|
- }
|
|
|
- return nil
|
|
|
+// GetEmqxHandler - gets emqx handler
|
|
|
+func GetEmqxHandler() Emqx {
|
|
|
+ return emqx
|
|
|
}
|