Browse Source

NET-1440:batch peerUpdate (#3024)

* batch peerUpdate

* add peerUpdate batch size to config

* update batch peerUpdate

* add batch peerUpdate switch

* set batch peerUpdate to true by default
Max Ma 1 year ago
parent
commit
73aca7cbdc

+ 1 - 1
controllers/ext_client.go

@@ -602,7 +602,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					slog.Error("Failed to get nodes", "error", err)
 					slog.Error("Failed to get nodes", "error", err)
 					return
 					return
 				}
 				}
-				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
+				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil)
 			}
 			}
 		}
 		}
 
 

+ 1 - 1
controllers/network.go

@@ -319,7 +319,7 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) {
 		}
 		}
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 			if host, ok := hostsMap[hostId]; ok {
 			if host, ok := hostsMap[hostId]; ok {
-				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil {
+				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil {
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 				}
 				}
 			}
 			}

+ 1 - 1
controllers/node.go

@@ -592,7 +592,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				return
 				return
 			}
 			}
 			go func() {
 			go func() {
-				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil {
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 				}
 				}
 				if err := mq.NodeUpdate(&node); err != nil {
 				if err := mq.NodeUpdate(&node); err != nil {

+ 2 - 2
mq/handlers.go

@@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			}
 			}
 			allNodes, err := logic.GetAllNodes()
 			allNodes, err := logic.GetAllNodes()
 			if err == nil {
 			if err == nil {
-				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
+				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil)
 			}
 			}
 		} else {
 		} else {
 			err = PublishPeerUpdate(false)
 			err = PublishPeerUpdate(false)
@@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				if err != nil {
 				if err != nil {
 					return
 					return
 				}
 				}
-				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 					return
 				}
 				}

+ 40 - 11
mq/publishers.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logger"
@@ -13,6 +14,9 @@ import (
 	"golang.org/x/exp/slog"
 	"golang.org/x/exp/slog"
 )
 )
 
 
+var (
+	// batchSize is the value of servercfg.GetPeerUpdateBatchSize, read once
+	// when the package is initialized.
+	batchSize = servercfg.GetPeerUpdateBatchSize()
+	// batchUpdate is the value of servercfg.GetBatchPeerUpdate, read once
+	// when the package is initialized.
+	batchUpdate = servercfg.GetBatchPeerUpdate()
+)
+
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
 	if !servercfg.IsMessageQueueBackend() {
@@ -28,15 +32,37 @@ func PublishPeerUpdate(replacePeers bool) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	for _, host := range hosts {
-		host := host
-		go func(host models.Host) {
-			if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
-				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-			}
-		}(host)
+
+	// Batching disabled: fire one goroutine per host and return without
+	// waiting for any of them.
+	if !batchUpdate {
+		for _, host := range hosts {
+			// host is passed as an argument, so each goroutine gets its own copy.
+			go func(host models.Host) {
+				// err is scoped to this goroutine; assigning to the enclosing
+				// function's err from many goroutines would be a data race.
+				if err := PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		return nil
+	}
 	}
-	return err
+
+	// Batching enabled: publish to the hosts one batch at a time, waiting for
+	// every publish in a batch to finish before starting the next batch.
+	batchHost := BatchItems(hosts, batchSize)
+	var wg sync.WaitGroup
+	for _, batch := range batchHost {
+		// PublishSingleHostPeerUpdate calls wg.Done when given a non-nil wg,
+		// so add one count per host in this batch before launching.
+		wg.Add(len(batch))
+		// Range over the batch itself. Indexing into hosts here would publish
+		// to the first hosts of the full list for every batch and never reach
+		// the hosts in later batches.
+		for _, host := range batch {
+			go func(host models.Host) {
+				// err is scoped to this goroutine; assigning to the enclosing
+				// function's err from many goroutines would be a data race.
+				if err := PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		wg.Wait()
+	}
+	return nil
 }
 }
 
 
 // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
 // PublishDeletedNodePeerUpdate --- determines and publishes a peer update
@@ -57,7 +83,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 	}
 	}
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -83,7 +109,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
 		if host.OS != models.OS_Types.IoT {
 		if host.OS != models.OS_Types.IoT {
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
 			}
 		}
 		}
@@ -92,7 +118,10 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 }
 
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
+	if wg != nil {
+		defer wg.Done()
+	}
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 21 - 0
mq/util.go

@@ -3,6 +3,7 @@ package mq
 import (
 import (
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"math"
 	"strings"
 	"strings"
 	"time"
 	"time"
 
 
@@ -45,6 +46,26 @@ func DecryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 	return decryptMsgWithHost(host, msg)
 	return decryptMsgWithHost(host, msg)
 }
 }
 
 
+// BatchItems partitions items into consecutive groups of batchSize elements,
+// placing any remainder in a final, shorter group. Every group is a newly
+// allocated slice holding copies of the elements. A non-positive batchSize
+// yields nil.
+func BatchItems[T any](items []T, batchSize int) [][]T {
+	if batchSize <= 0 {
+		return nil
+	}
+	total := len(items)
+	count := int(math.Ceil(float64(total) / float64(batchSize)))
+	out := make([][]T, count)
+	for idx := range out {
+		lo := idx * batchSize
+		hi := lo + batchSize
+		// Only the last group can run past the end of items; clamp it.
+		if hi > total {
+			hi = total
+		}
+		chunk := make([]T, hi-lo)
+		copy(chunk, items[lo:hi])
+		out[idx] = chunk
+	}
+	return out
+}
+
 func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
 func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
 	if host.OS == models.OS_Types.IoT {
 	if host.OS == models.OS_Types.IoT {
 		return msg, nil
 		return msg, nil

+ 1 - 1
pro/controllers/relay.go

@@ -108,7 +108,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 						return
 						return
 					}
 					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 					}
 				}
 				}

+ 1 - 1
pro/remote_access_client.go

@@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error {
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
+			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil)
 		} else {
 		} else {
 			return err
 			return err
 		}
 		}

+ 22 - 0
servercfg/serverconf.go

@@ -597,6 +597,28 @@ func GetMetricInterval() string {
 	return mi
 	return mi
 }
 }
 
 
+// GetBatchPeerUpdate - reports the PEER_UPDATE_BATCH setting. The result is
+// true when the variable is unset, empty, or exactly "true"; any other value
+// gives false.
+func GetBatchPeerUpdate() bool {
+	switch os.Getenv("PEER_UPDATE_BATCH") {
+	case "", "true":
+		return true
+	default:
+		return false
+	}
+}
+
+// GetPeerUpdateBatchSize - get the batch size for peer update
+func GetPeerUpdateBatchSize() int {
+	//default 50
+	batchSize := 50
+	if os.Getenv("PEER_UPDATE_BATCH_SIZE") != "" {
+		b, e := strconv.Atoi(os.Getenv("PEER_UPDATE_BATCH_SIZE"))
+		if e == nil && b > 0 && b < 1000 {
+			batchSize = b
+		}
+	}
+	return batchSize
+}
+
 // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
 // GetEmqxRestEndpoint - returns the REST API Endpoint of EMQX
 func GetEmqxRestEndpoint() string {
 func GetEmqxRestEndpoint() string {
 	return os.Getenv("EMQX_REST_ENDPOINT")
 	return os.Getenv("EMQX_REST_ENDPOINT")