Quellcode durchsuchen

NET-1226: Scalability Improvements (#2987)

* add api to check if failover node existed

* remove 5 minute peerUpdate

* update peerUpdate to trigger pull

* update Action name to SignalPull

* revert the peerUpdate from SignalPull

* fix getfailover error issue

* rm acls creation for on-prem emqx

* remove use of acls

* add additional broker status field on status api

* NET-1165: Remove creation of acls on emqx (#2996)

* rm acls creation for on-prem emqx

* remove use of acls

* add additional broker status field on status api

* comment out mq reconnect logic

* configure mq conn params

* add metric_interval in ENV for publishing metrics

* add metric_interval in ENV for publishing metrics

* update PUBLISH_METRIC_INTERVAL env name

* revert the mq setttings back

* fix error nil issue

---------

Co-authored-by: abhishek9686 <[email protected]>
Co-authored-by: Abhishek K <[email protected]>
Max Ma vor 1 Jahr
Ursprung
Commit
65faf73fe9

+ 1 - 1
Dockerfile

@@ -1,5 +1,5 @@
 #first stage - builder
-FROM gravitl/go-builder as builder
+FROM gravitl/go-builder AS builder
 ARG tags 
 WORKDIR /app
 COPY . .

+ 0 - 4
auth/host_session.go

@@ -164,10 +164,6 @@ func SessionHandler(conn *websocket.Conn) {
 					logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 					return
 				}
-				if err := mq.GetEmqxHandler().CreateHostACL(result.Host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-					logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
-					return
-				}
 			}
 			logic.CheckHostPorts(&result.Host)
 			if err := logic.CreateHost(&result.Host); err != nil {

+ 1 - 0
config/config.go

@@ -94,6 +94,7 @@ type ServerConfig struct {
 	CacheEnabled               string        `yaml:"caching_enabled"`
 	EndpointDetection          bool          `json:"endpoint_detection"`
 	AllowedEmailDomains        string        `yaml:"allowed_email_domains"`
+	MetricInterval             string        `yaml:"metric_interval"`
 }
 
 // SQLConfig - Generic SQL Config

+ 0 - 4
controllers/enrollmentkeys.go

@@ -315,10 +315,6 @@ func handleHostRegister(w http.ResponseWriter, r *http.Request) {
 				logger.Log(0, "failed to create host credentials for EMQX: ", err.Error())
 				return
 			}
-			if err := mq.GetEmqxHandler().CreateHostACL(newHost.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-				logger.Log(0, "failed to add host ACL rules to EMQX: ", err.Error())
-				return
-			}
 		}
 		if err = logic.CreateHost(&newHost); err != nil {
 			logger.Log(

+ 1 - 14
controllers/hosts.go

@@ -555,23 +555,10 @@ func authenticateHost(response http.ResponseWriter, request *http.Request) {
 		return
 	}
 	go func() {
-		// Create EMQX creds and ACLs if not found
+		// Create EMQX creds
 		if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
 			if err := mq.GetEmqxHandler().CreateEmqxUser(host.ID.String(), authRequest.Password); err != nil {
 				slog.Error("failed to create host credentials for EMQX: ", err.Error())
-			} else {
-				if err := mq.GetEmqxHandler().CreateHostACL(host.ID.String(), servercfg.GetServerInfo().Server); err != nil {
-					slog.Error("failed to add host ACL rules to EMQX: ", err.Error())
-				}
-				for _, nodeID := range host.Nodes {
-					if node, err := logic.GetNodeByID(nodeID); err == nil {
-						if err = mq.GetEmqxHandler().AppendNodeUpdateACL(host.ID.String(), node.Network, node.ID.String(), servercfg.GetServer()); err != nil {
-							slog.Error("failed to add ACLs for EMQX node", "error", err)
-						}
-					} else {
-						slog.Error("failed to get node", "nodeid", nodeID, "error", err)
-					}
-				}
 			}
 		}
 	}()

+ 2 - 0
controllers/server.go

@@ -117,6 +117,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
 	type status struct {
 		DB               bool      `json:"db_connected"`
 		Broker           bool      `json:"broker_connected"`
+		IsBrokerConnOpen bool      `json:"is_broker_conn_open"`
 		LicenseError     string    `json:"license_error"`
 		IsPro            bool      `json:"is_pro"`
 		TrialEndDate     time.Time `json:"trial_end_date"`
@@ -141,6 +142,7 @@ func getStatus(w http.ResponseWriter, r *http.Request) {
 	currentServerStatus := status{
 		DB:               database.IsConnected(),
 		Broker:           mq.IsConnected(),
+		IsBrokerConnOpen: mq.IsConnectionOpen(),
 		LicenseError:     licenseErr,
 		IsPro:            servercfg.IsPro,
 		TrialEndDate:     trialEndDate,

+ 2 - 0
models/host.go

@@ -116,6 +116,8 @@ const (
 	UpdateKeys HostMqAction = "UPDATE_KEYS"
 	// RequestPull - request a pull from a host
 	RequestPull HostMqAction = "REQ_PULL"
+	// SignalPull - request a pull from a host without restart
+	SignalPull HostMqAction = "SIGNAL_PULL"
 	// UpdateMetrics - updates metrics data
 	UpdateMetrics HostMqAction = "UPDATE_METRICS"
 )

+ 14 - 13
models/structs.go

@@ -273,19 +273,20 @@ type NodeJoinResponse struct {
 
 // ServerConfig - struct for dealing with the server information for a netclient
 type ServerConfig struct {
-	CoreDNSAddr string `yaml:"corednsaddr"`
-	API         string `yaml:"api"`
-	APIPort     string `yaml:"apiport"`
-	DNSMode     string `yaml:"dnsmode"`
-	Version     string `yaml:"version"`
-	MQPort      string `yaml:"mqport"`
-	MQUserName  string `yaml:"mq_username"`
-	MQPassword  string `yaml:"mq_password"`
-	BrokerType  string `yaml:"broker_type"`
-	Server      string `yaml:"server"`
-	Broker      string `yaml:"broker"`
-	IsPro       bool   `yaml:"isee" json:"Is_EE"`
-	TrafficKey  []byte `yaml:"traffickey"`
+	CoreDNSAddr    string `yaml:"corednsaddr"`
+	API            string `yaml:"api"`
+	APIPort        string `yaml:"apiport"`
+	DNSMode        string `yaml:"dnsmode"`
+	Version        string `yaml:"version"`
+	MQPort         string `yaml:"mqport"`
+	MQUserName     string `yaml:"mq_username"`
+	MQPassword     string `yaml:"mq_password"`
+	BrokerType     string `yaml:"broker_type"`
+	Server         string `yaml:"server"`
+	Broker         string `yaml:"broker"`
+	IsPro          bool   `yaml:"isee" json:"Is_EE"`
+	TrafficKey     []byte `yaml:"traffickey"`
+	MetricInterval string `yaml:"metric_interval"`
 }
 
 // User.NameInCharset - returns if name is in charset below or not

+ 1 - 4
mq/emqx.go

@@ -10,10 +10,7 @@ type Emqx interface {
 	CreateEmqxUserforServer() error
 	CreateEmqxDefaultAuthenticator() error
 	CreateEmqxDefaultAuthorizer() error
-	CreateDefaultDenyRule() error
-	CreateHostACL(hostID, serverName string) error
-	AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error
-	GetUserACL(username string) (*aclObject, error)
+	CreateDefaultAllowRule() error
 	DeleteEmqxUser(username string) error
 }
 

+ 1 - 12
mq/emqx_cloud.go

@@ -89,21 +89,10 @@ func (e *EmqxCloud) CreateEmqxDefaultAuthenticator() error { return nil } // ign
 
 func (e *EmqxCloud) CreateEmqxDefaultAuthorizer() error { return nil } // ignore
 
-func (e *EmqxCloud) CreateDefaultDenyRule() error {
+func (e *EmqxCloud) CreateDefaultAllowRule() error {
 	return nil
 }
 
-func (e *EmqxCloud) CreateHostACL(hostID, serverName string) error {
-	return nil
-}
-
-func (e *EmqxCloud) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	return nil
-
-}
-
-func (e *EmqxCloud) GetUserACL(username string) (*aclObject, error) { return nil, nil } // ununsed on cloud since it doesn't overwrite acls list
-
 func (e *EmqxCloud) DeleteEmqxUser(username string) error {
 
 	client := &http.Client{}

+ 3 - 153
mq/emqx_on_prem.go

@@ -7,7 +7,6 @@ import (
 	"io"
 	"net/http"
 	"strings"
-	"sync"
 
 	"github.com/gravitl/netmaker/servercfg"
 )
@@ -246,45 +245,14 @@ func (e *EmqxOnPrem) CreateEmqxDefaultAuthorizer() error {
 	return nil
 }
 
-// GetUserACL - returns ACL rules by username
-func (e *EmqxOnPrem) GetUserACL(username string) (*aclObject, error) {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return nil, err
-	}
-	req, err := http.NewRequest(http.MethodGet, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+username, nil)
-	if err != nil {
-		return nil, err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return nil, err
-	}
-	defer resp.Body.Close()
-	response, err := io.ReadAll(resp.Body)
-	if err != nil {
-		return nil, err
-	}
-	if resp.StatusCode != http.StatusOK {
-		return nil, fmt.Errorf("error fetching ACL rules %v", string(response))
-	}
-	body := new(aclObject)
-	if err := json.Unmarshal(response, body); err != nil {
-		return nil, err
-	}
-	return body, nil
-}
-
-// CreateDefaultDenyRule - creates a rule to deny access to all topics for all users by default
+// CreateDefaultAllowRule - creates a rule to deny access to all topics for all users by default
 // to allow user access to topics use the `mq.CreateUserAccessRule` function
-func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
+func (e *EmqxOnPrem) CreateDefaultAllowRule() error {
 	token, err := getEmqxAuthToken()
 	if err != nil {
 		return err
 	}
-	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "deny", Action: "all"}}})
+	payload, err := json.Marshal(&aclObject{Rules: []aclRule{{Topic: "#", Permission: "allow", Action: "all"}}})
 	if err != nil {
 		return err
 	}
@@ -308,121 +276,3 @@ func (e *EmqxOnPrem) CreateDefaultDenyRule() error {
 	}
 	return nil
 }
-
-// CreateHostACL - create host ACL rules
-func (e *EmqxOnPrem) CreateHostACL(hostID, serverName string) error {
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	payload, err := json.Marshal(&aclObject{
-		Username: hostID,
-		Rules: []aclRule{
-			{
-				Topic:      fmt.Sprintf("peers/host/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/update/%s/%s", hostID, serverName),
-				Permission: "allow",
-				Action:     "all",
-			},
-			{
-				Topic:      fmt.Sprintf("host/serverupdate/%s/%s", serverName, hostID),
-				Permission: "allow",
-				Action:     "all",
-			},
-		},
-	})
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
-	}
-	return nil
-}
-
-// a lock required for preventing simultaneous updates to the same ACL object leading to overwriting each other
-// might occur when multiple nodes belonging to the same host are created at the same time
-var nodeAclMux sync.Mutex
-
-// AppendNodeUpdateACL - adds ACL rule for subscribing to node updates for a node ID
-func (e *EmqxOnPrem) AppendNodeUpdateACL(hostID, nodeNetwork, nodeID, serverName string) error {
-	nodeAclMux.Lock()
-	defer nodeAclMux.Unlock()
-	token, err := getEmqxAuthToken()
-	if err != nil {
-		return err
-	}
-	aclObject, err := emqx.GetUserACL(hostID)
-	if err != nil {
-		return err
-	}
-	aclObject.Rules = append(aclObject.Rules, []aclRule{
-		{
-			Topic:      fmt.Sprintf("node/update/%s/%s", nodeNetwork, nodeID),
-			Permission: "allow",
-			Action:     "subscribe",
-		},
-		{
-			Topic:      fmt.Sprintf("ping/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("update/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("signal/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-		{
-			Topic:      fmt.Sprintf("metrics/%s/%s", serverName, nodeID),
-			Permission: "allow",
-			Action:     "all",
-		},
-	}...)
-	payload, err := json.Marshal(aclObject)
-	if err != nil {
-		return err
-	}
-	req, err := http.NewRequest(http.MethodPut, servercfg.GetEmqxRestEndpoint()+"/api/v5/authorization/sources/built_in_database/username/"+hostID, bytes.NewReader(payload))
-	if err != nil {
-		return err
-	}
-	req.Header.Add("content-type", "application/json")
-	req.Header.Add("authorization", "Bearer "+token)
-	resp, err := (&http.Client{}).Do(req)
-	if err != nil {
-		return err
-	}
-	defer resp.Body.Close()
-	if resp.StatusCode != http.StatusNoContent {
-		msg, err := io.ReadAll(resp.Body)
-		if err != nil {
-			return err
-		}
-		return fmt.Errorf("error adding ACL Rules for user %s Error: %v", hostID, string(msg))
-	}
-	return nil
-}

+ 0 - 6
mq/handlers.go

@@ -113,12 +113,6 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				slog.Error("failed to send new node to host", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 				return
 			} else {
-				if servercfg.GetBrokerType() == servercfg.EmqxBrokerType {
-					if err = emqx.AppendNodeUpdateACL(hu.Host.ID.String(), hu.Node.Network, hu.Node.ID.String(), servercfg.GetServer()); err != nil {
-						slog.Error("failed to add ACLs for EMQX node", "error", err)
-						return
-					}
-				}
 				nodes, err := logic.GetAllNodes()
 				if err != nil {
 					return

+ 6 - 1
mq/mq.go

@@ -58,7 +58,7 @@ func SetupMQTT(fatal bool) {
 				logger.Log(0, err.Error())
 			}
 			// create a default deny ACL to all topics for all users
-			if err := emqx.CreateDefaultDenyRule(); err != nil {
+			if err := emqx.CreateDefaultAllowRule(); err != nil {
 				log.Fatal(err)
 			}
 		} else {
@@ -142,6 +142,11 @@ func Keepalive(ctx context.Context) {
 
 // IsConnected - function for determining if the mqclient is connected or not
 func IsConnected() bool {
+	return mqclient != nil && mqclient.IsConnected()
+}
+
+// IsConnectionOpen - function for determining if the mqclient is connected or not
+func IsConnectionOpen() bool {
 	return mqclient != nil && mqclient.IsConnectionOpen()
 }
 

+ 1 - 18
mq/publishers.go

@@ -35,7 +35,6 @@ func PublishPeerUpdate(replacePeers bool) error {
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
 		}(host)
-
 	}
 	return err
 }
@@ -217,30 +216,14 @@ func sendPeers() {
 	if err != nil && len(hosts) > 0 {
 		logger.Log(1, "error retrieving networks for keepalive", err.Error())
 	}
-	nodes, err := logic.GetAllNodes()
-	if err != nil {
-		return
-	}
-	var force bool
+
 	peer_force_send++
 	if peer_force_send == 5 {
 		servercfg.SetHost()
-		force = true
 		peer_force_send = 0
 		err := logic.TimerCheckpoint() // run telemetry & log dumps if 24 hours has passed..
 		if err != nil {
 			logger.Log(3, "error occurred on timer,", err.Error())
 		}
-
-		//collectServerMetrics(networks[:])
-	}
-	if force {
-		for _, host := range hosts {
-			host := host
-			logger.Log(2, "sending scheduled peer update (5 min)")
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, nil, false); err != nil {
-				logger.Log(1, "error publishing peer updates for host: ", host.ID.String(), " Err: ", err.Error())
-			}
-		}
 	}
 }

+ 32 - 0
pro/controllers/failover.go

@@ -19,12 +19,44 @@ import (
 
 // FailOverHandlers - handlers for FailOver
 func FailOverHandlers(r *mux.Router) {
+	r.HandleFunc("/api/v1/node/{nodeid}/failover", http.HandlerFunc(getfailOver)).Methods(http.MethodGet)
 	r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(createfailOver))).Methods(http.MethodPost)
 	r.HandleFunc("/api/v1/node/{nodeid}/failover", logic.SecurityCheck(true, http.HandlerFunc(deletefailOver))).Methods(http.MethodDelete)
 	r.HandleFunc("/api/v1/node/{network}/failover/reset", logic.SecurityCheck(true, http.HandlerFunc(resetFailOver))).Methods(http.MethodPost)
 	r.HandleFunc("/api/v1/node/{nodeid}/failover_me", controller.Authorize(true, false, "host", http.HandlerFunc(failOverME))).Methods(http.MethodPost)
 }
 
+// swagger:route GET /api/v1/node/failover node getfailOver
+//
+// get failover node.
+//
+//			Schemes: https
+//
+//			Security:
+//	  		oauth
+//
+//			Responses:
+//				200: nodeResponse
+func getfailOver(w http.ResponseWriter, r *http.Request) {
+	var params = mux.Vars(r)
+	nodeid := params["nodeid"]
+	// confirm host exists
+	node, err := logic.GetNodeByID(nodeid)
+	if err != nil {
+		slog.Error("failed to get node:", "error", err.Error())
+		logic.ReturnErrorResponse(w, r, logic.FormatError(err, "badrequest"))
+		return
+	}
+
+	failOverNode, exists := proLogic.FailOverExists(node.Network)
+	if !exists {
+		logic.ReturnErrorResponse(w, r, logic.FormatError(errors.New("failover node not found"), "notfound"))
+		return
+	}
+	w.Header().Set("Content-Type", "application/json")
+	logic.ReturnSuccessResponseWithJson(w, r, failOverNode, "get failover node successfully")
+}
+
 // swagger:route POST /api/v1/node/failover node createfailOver
 //
 // Create a relay.

+ 12 - 1
servercfg/serverconf.go

@@ -91,7 +91,7 @@ func GetServerConfig() config.ServerConfig {
 	}
 	cfg.JwtValidityDuration = GetJwtValidityDuration()
 	cfg.RacAutoDisable = GetRacAutoDisable()
-
+	cfg.MetricInterval = GetMetricInterval()
 	return cfg
 }
 
@@ -135,6 +135,7 @@ func GetServerInfo() models.ServerConfig {
 	}
 	cfg.Version = GetVersion()
 	cfg.IsPro = IsPro
+	cfg.MetricInterval = GetMetricInterval()
 	return cfg
 }
 
@@ -586,6 +587,16 @@ func GetMqUserName() string {
 	return password
 }
 
+// GetMetricInterval - get the publish metric interval
+func GetMetricInterval() string {
+	//default 15 minutes
+	mi := "15"
+	if os.Getenv("PUBLISH_METRIC_INTERVAL") != "" {
+		mi = os.Getenv("PUBLISH_METRIC_INTERVAL")
+	}
+	return mi
+}
+
 // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
 func GetEmqxRestEndpoint() string {
 	return os.Getenv("EMQX_REST_ENDPOINT")