Browse Source

batch peerUpdate

Max Ma 1 year ago
parent
commit
629e99a0a2

+ 1 - 1
controllers/ext_client.go

@@ -602,7 +602,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					slog.Error("Failed to get nodes", "error", err)
 					slog.Error("Failed to get nodes", "error", err)
 					return
 					return
 				}
 				}
-				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
+				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil)
 			}
 			}
 		}
 		}
 
 

+ 1 - 1
controllers/network.go

@@ -319,7 +319,7 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) {
 		}
 		}
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 			if host, ok := hostsMap[hostId]; ok {
 			if host, ok := hostsMap[hostId]; ok {
-				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil {
+				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil {
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 				}
 				}
 			}
 			}

+ 1 - 1
controllers/node.go

@@ -592,7 +592,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				return
 				return
 			}
 			}
 			go func() {
 			go func() {
-				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil {
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 				}
 				}
 				if err := mq.NodeUpdate(&node); err != nil {
 				if err := mq.NodeUpdate(&node); err != nil {

+ 2 - 2
mq/handlers.go

@@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			}
 			}
 			allNodes, err := logic.GetAllNodes()
 			allNodes, err := logic.GetAllNodes()
 			if err == nil {
 			if err == nil {
-				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
+				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil)
 			}
 			}
 		} else {
 		} else {
 			err = PublishPeerUpdate(false)
 			err = PublishPeerUpdate(false)
@@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				if err != nil {
 				if err != nil {
 					return
 					return
 				}
 				}
-				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 					return
 				}
 				}

+ 60 - 9
mq/publishers.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"encoding/json"
 	"errors"
 	"errors"
 	"fmt"
 	"fmt"
+	"sync"
 	"time"
 	"time"
 
 
 	"github.com/gravitl/netmaker/logger"
 	"github.com/gravitl/netmaker/logger"
@@ -13,6 +14,8 @@ import (
 	"golang.org/x/exp/slog"
 	"golang.org/x/exp/slog"
 )
 )
 
 
+const batchSize = 20
+
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
 	if !servercfg.IsMessageQueueBackend() {
@@ -28,14 +31,59 @@ func PublishPeerUpdate(replacePeers bool) error {
 	if err != nil {
 	if err != nil {
 		return err
 		return err
 	}
 	}
-	for _, host := range hosts {
-		host := host
-		go func(host models.Host) {
-			if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
-				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+
+	var wg sync.WaitGroup
+	hostLen := len(hosts)
+	batch := batchSize
+	div := hostLen / batch
+	mod := hostLen % batch
+
+	if div == 0 {
+		wg.Add(hostLen)
+		for i := 0; i < hostLen; i++ {
+			host := hosts[i]
+			go func(host models.Host) {
+				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		wg.Wait()
+	} else {
+		for i := 0; i < div*batch; i += batch {
+			wg.Add(batch)
+			for j := 0; j < batch; j++ {
+				host := hosts[i+j]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
 			}
 			}
-		}(host)
+			wg.Wait()
+		}
+		if mod != 0 {
+			wg.Add(hostLen - (div * batch))
+			for k := div * batch; k < hostLen; k++ {
+				host := hosts[k]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
+			}
+			wg.Wait()
+		}
 	}
 	}
+
+	// for _, host := range hosts {
+	// 	host := host
+	// 	go func(host models.Host) {
+	// 		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
+	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+	// 		}
+	// 	}(host)
+	// }
 	return err
 	return err
 }
 }
 
 
@@ -57,7 +105,7 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 	}
 	}
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 		}
 	}
 	}
@@ -83,7 +131,7 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 	for _, host := range hosts {
 	for _, host := range hosts {
 		host := host
 		host := host
 		if host.OS != models.OS_Types.IoT {
 		if host.OS != models.OS_Types.IoT {
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
 			}
 		}
 		}
@@ -92,7 +140,10 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 }
 }
 
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
+	if wg != nil {
+		defer wg.Done()
+	}
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 	if err != nil {
 		return err
 		return err

+ 1 - 1
pro/controllers/relay.go

@@ -108,7 +108,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 						return
 						return
 					}
 					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 					}
 				}
 				}

+ 1 - 1
pro/remote_access_client.go

@@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error {
 			if err != nil {
 			if err != nil {
 				return err
 				return err
 			}
 			}
-			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
+			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil)
 		} else {
 		} else {
 			return err
 			return err
 		}
 		}