Bläddra i källkod

revert batch peerUpdate

Max Ma 1 år sedan
förälder
incheckning
fb5773fa28
7 ändrade filer med 23 tillägg och 166 borttagningar
  1. 1 1
      controllers/ext_client.go
  2. 1 1
      controllers/network.go
  3. 1 1
      controllers/node.go
  4. 2 2
      mq/handlers.go
  5. 16 159
      mq/publishers.go
  6. 1 1
      pro/controllers/relay.go
  7. 1 1
      pro/remote_access_client.go

+ 1 - 1
controllers/ext_client.go

@@ -602,7 +602,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					slog.Error("Failed to get nodes", "error", err)
 					return
 				}
-				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil)
+				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
 			}
 		}
 

+ 1 - 1
controllers/network.go

@@ -319,7 +319,7 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) {
 		}
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 			if host, ok := hostsMap[hostId]; ok {
-				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil {
+				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil {
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 				}
 			}

+ 1 - 1
controllers/node.go

@@ -592,7 +592,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				return
 			}
 			go func() {
-				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 				}
 				if err := mq.NodeUpdate(&node); err != nil {

+ 2 - 2
mq/handlers.go

@@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			}
 			allNodes, err := logic.GetAllNodes()
 			if err == nil {
-				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil)
+				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
 			}
 		} else {
 			err = PublishPeerUpdate(false)
@@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				if err != nil {
 					return
 				}
-				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil {
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 				}

+ 16 - 159
mq/publishers.go

@@ -4,7 +4,6 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
-	"sync"
 	"time"
 
 	"github.com/gravitl/netmaker/logger"
@@ -25,8 +24,6 @@ type ServerStatus struct {
 	Failover         map[string]bool `json:"is_failover_existed"`
 }
 
-const batchSize = 50
-
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
@@ -43,58 +40,14 @@ func PublishPeerUpdate(replacePeers bool) error {
 		return err
 	}
 
-	var wg sync.WaitGroup
-	hostLen := len(hosts)
-	batch := batchSize
-	div := hostLen / batch
-	mod := hostLen % batch
-
-	if div == 0 {
-		wg.Add(hostLen)
-		for i := 0; i < hostLen; i++ {
-			host := hosts[i]
-			go func(host models.Host) {
-				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
-					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-				}
-			}(host)
-		}
-		wg.Wait()
-	} else {
-		for i := 0; i < div*batch; i += batch {
-			wg.Add(batch)
-			for j := 0; j < batch; j++ {
-				host := hosts[i+j]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
+	for _, host := range hosts {
+		host := host
+		go func(host models.Host) {
+			if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
+				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
-			wg.Wait()
-		}
-		if mod != 0 {
-			wg.Add(hostLen - (div * batch))
-			for k := div * batch; k < hostLen; k++ {
-				host := hosts[k]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
-			}
-			wg.Wait()
-		}
+		}(host)
 	}
-
-	// for _, host := range hosts {
-	// 	host := host
-	// 	go func(host models.Host) {
-	// 		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
-	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-	// 		}
-	// 	}(host)
-	// }
 	return err
 }
 
@@ -115,55 +68,12 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 		return err
 	}
 
-	var wg sync.WaitGroup
-	hostLen := len(hosts)
-	batch := batchSize
-	div := hostLen / batch
-	mod := hostLen % batch
-
-	if div == 0 {
-		wg.Add(hostLen)
-		for i := 0; i < hostLen; i++ {
-			host := hosts[i]
-			go func(host models.Host) {
-				if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
-					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-				}
-			}(host)
-		}
-		wg.Wait()
-	} else {
-		for i := 0; i < div*batch; i += batch {
-			wg.Add(batch)
-			for j := 0; j < batch; j++ {
-				host := hosts[i+j]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
-			}
-			wg.Wait()
-		}
-		if mod != 0 {
-			wg.Add(hostLen - (div * batch))
-			for k := div * batch; k < hostLen; k++ {
-				host := hosts[k]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
-			}
-			wg.Wait()
+	for _, host := range hosts {
+		host := host
+		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
+			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 		}
 	}
-	// for _, host := range hosts {
-	// 	host := host
-	// 	if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
-	// 		logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-	// 	}
-	// }
 	return err
 }
 
@@ -184,72 +94,19 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 		return err
 	}
 
-	var wg sync.WaitGroup
-	hostLen := len(hosts)
-	batch := batchSize
-	div := hostLen / batch
-	mod := hostLen % batch
-
-	if div == 0 {
-		wg.Add(hostLen)
-		for i := 0; i < hostLen; i++ {
-			host := hosts[i]
-			go func(host models.Host) {
-				if host.OS != models.OS_Types.IoT {
-					if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}
-			}(host)
-		}
-		wg.Wait()
-	} else {
-		for i := 0; i < div*batch; i += batch {
-			wg.Add(batch)
-			for j := 0; j < batch; j++ {
-				host := hosts[i+j]
-				go func(host models.Host) {
-					if host.OS != models.OS_Types.IoT {
-						if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
-							logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-						}
-					}
-				}(host)
+	for _, host := range hosts {
+		host := host
+		if host.OS != models.OS_Types.IoT {
+			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
+				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
 			}
-			wg.Wait()
-		}
-		if mod != 0 {
-			wg.Add(hostLen - (div * batch))
-			for k := div * batch; k < hostLen; k++ {
-				host := hosts[k]
-				go func(host models.Host) {
-					if host.OS != models.OS_Types.IoT {
-						if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
-							logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-						}
-					}
-				}(host)
-			}
-			wg.Wait()
 		}
 	}
-
-	// for _, host := range hosts {
-	// 	host := host
-	// 	if host.OS != models.OS_Types.IoT {
-	// 		if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
-	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-	// 		}
-	// 	}
-	// }
 	return err
 }
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
-	if wg != nil {
-		defer wg.Done()
-	}
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 		return err

+ 1 - 1
pro/controllers/relay.go

@@ -108,7 +108,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 						return
 					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 				}

+ 1 - 1
pro/remote_access_client.go

@@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error {
 			if err != nil {
 				return err
 			}
-			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil)
+			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
 		} else {
 			return err
 		}