Procházet zdrojové kódy

NET-1165: Remove creation of acls on emqx (#2996)

* rm acls creation for on-prem emqx

* remove use of acls

* add additional broker status field on status api
Abhishek K před 1 rokem
rodič
revize
d593de2b6e

+ 0 - 4
auth/host_session.go

@@ -164,10 +164,6 @@ func SessionHandler(conn *websocket.Conn) {
 					logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 					return
 				}
-				if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-					logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
-					return
-				}
 			}
 			logic.CheckHostPorts(&result.Host)
 			if err := logic.CreateHost(&result.Host); err != nil {

+ 0 - 4
controllers/enrollmentkeys.go

@@ -315,10 +315,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 				logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 				return
 			}
-			if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-				logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
-				return
-			}
 		}
 		if err = logic.CreateHost(&newHost); err != nil {
 			logger.Log(

+ 1 - 14
controllers/hosts.go

@@ -555,23 +555,10 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 		return
 	}
 	go func() {
-		// Create EMQX creds and ACLs if not found
+		// Create EMQX creds
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 			if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
 				slog.Error("failed to create host credentials for EMQX: ", err.Error())
-			} else {
-				if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-					slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
-				}
-				for _, nodeID := range host.Nodes {
-					if node, err := logic.GetNodeByID(nodeID); err == nil {
-						if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
-							slog.Error("failed to add ACLs for EMQX node", "error", err)
-						}
-					} else {
-						slog.Error("failed to get node", "nodeid", nodeID, "error", err)
-					}
-				}
 			}
 		}
 	}()

+ 2 - 0
controllers/server.go

@@ -117,6 +117,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
 	type status struct {
 		DB               bool      `json:"db_connected"`
 		Broker           bool      `json:"broker_connected"`
+		IsBrokerConnOpen bool      `json:"is_broker_conn_open"`
 		LicenseError     string    `json:"license_error"`
 		IsPro            bool      `json:"is_pro"`
 		TrialEndDate     time.Time `json:"trial_end_date"`
@@ -141,6 +142,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
 	currentServerStatus := status{
 		DB:               database.IsConnected(),
 		Broker:           mq.IsConnected(),
+		IsBrokerConnOpen: mq.IsConnectionOpen(),
 		LicenseError:     licenseErr,
 		IsPro:            servercfg.IsPro,
 		TrialEndDate:     trialEndDate,

+ 1 - 4
mq/emqx.go

@@ -10,10 +10,7 @@ type Emqx interface {
 	CreateEmqxUserforServer() error
 	CreateEmqxDefaultAuthenticator() error
 	CreateEmqxDefaultAuthorizer() error
-	CreateDefaultDenyRule() error
-	CreateHostACL(hostID, serverName string) error
-	AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error
-	GetUserACL(username string) (*aclObject, error)
+	CreateDefaultAllowRule() error
 	DeleteEmqxUser(username string) error
 }
 

+ 1 - 12
mq/emqx_cloud.go

@@ -89,21 +89,10 @@ func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ign
 
 func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
 
-func (e *EmqxCloud) CreateDefaultDenyRule() error {
+func (e *EmqxCloud) CreateDefaultAllowRule() error {
 	return nil
 }
 
-func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
-	return nil
-}
-
-func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	return nil
-
-}
-
-func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
-
 func (e *EmqxCloud) DeleteEmqxUser(username string) error {
 
 	client := &http.Client{}

+ 3 - 153
mq/emqx_on_prem.go

@@ -7,7 +7,6 @@ import (
 	"io"
 	"net/http"
 	"strings"
-	"sync"
 
 	"github.com/gravitl/netmaker/servercfg"
 )
@@ -246,45 +245,14 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
 	return nil
 }
 
-// GetUserACL - returns ACL rules by username
-func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return nil, err
-	}
-	req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
-	if err != nil {
-		return nil, err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return nil, err
-	}
-	defer resp.Body.Close()
-	response, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return nil, err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
-	}
-	body := new(aclObject)
-	if err := json.Unmarshal(response, body); err != nil {
-		return nil, err
-	}
-	return body, nil
-}
-
-// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
+// CreateDefaultAllowRule - creates a rule to deny access to all topics for all users by default
 // to allow user access to topics use the `mq.CreateUserAccessRule` function
-func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
+func (e *EmqxOnPrem) CreateDefaultAllowRule() error {
 	token, err := getEmqxAuthToken()
 	if err != nil {
 		return err
 	}
-	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
+	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "allow", Action: "all"}}})
 	if err != nil {
 		return err
 	}
@@ -308,121 +276,3 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
 	}
 	return nil
 }
-
-// CreateHostACL - create host ACL rules
-func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&aclObject{
-		Username: hostID,
-		Rules: []aclRule{
-			{
-				Topic:      fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/update/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
-				Permission: "allow",
-				Action:     "all",
-			},
-		},
-	})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
-	}
-	return nil
-}
-
-// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
-// might occur when multiple nodes belonging to the same host are created at the same time
-var nodeAclMux sync.Mutex
-
-// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
-func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	nodeAclMux.Lock()
-	defer nodeAclMux.Unlock()
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	aclObject, err := emqx.GetUserACL(hostID)
-	if err != nil {
-		return err
-	}
-	aclObject.Rules = append(aclObject.Rules, []aclRule{
-		{
-			Topic:      fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
-			Permission: "allow",
-			Action:     "subscribe",
-		},
-		{
-			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-	}...)
-	payload, err := json.Marshal(aclObject)
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
-	}
-	return nil
-}

+ 0 - 6
mq/handlers.go

@@ -113,12 +113,6 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				slog.Error("failed to send new node to host", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 				return
 			} else {
-				if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-					if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
-						slog.Error("failed to add ACLs for EMQX node", "error", err)
-						return
-					}
-				}
 				nodes, err := logic.GetAllNodes()
 				if err != nil {
 					return

+ 6 - 1
mq/mq.go

@@ -58,7 +58,7 @@ func SetupMQTT(fatal bool) {
 				logger.Log(0, err.Error())
 			}
 			// create a default deny ACL to all topics for all users
-			if err := emqx.CreateDefaultDenyRule(); err != nil {
+			if err := emqx.CreateDefaultAllowRule(); err != nil {
 				log.Fatal(err)
 			}
 		} else {
@@ -142,6 +142,11 @@ func Keepalive(ctx context.Context) {
 
 // IsConnected - function for determining if the mqclient is connected or not
 func IsConnected() bool {
+	return mqclient != nil && mqclient.IsConnected()
+}
+
+// IsConnectionOpen - function for determining if the mqclient is connected or not
+func IsConnectionOpen() bool {
 	return mqclient != nil && mqclient.IsConnectionOpen()
 }