Max Ma 1 год назад
Родитель
Commit
500d783861
9 измененных файлов с 177 добавлено и 32 удалено
  1. 1 1
      controllers/ext_client.go
  2. 1 1
      controllers/network.go
  3. 1 1
      controllers/node.go
  4. 2 2
      mq/handlers.go
  5. 2 1
      mq/mq.go
  6. 163 18
      mq/publishers.go
  7. 5 6
      mq/util.go
  8. 1 1
      pro/controllers/relay.go
  9. 1 1
      pro/remote_access_client.go

+ 1 - 1
controllers/ext_client.go

@@ -602,7 +602,7 @@ func updateExtClient(w http.ResponseWriter, r *http.Request) {
 					slog.Error("Failed to get nodes", "error", err)
 					return
 				}
-				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false)
+				go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{oldExtClient}, false, nil)
 			}
 		}
 

+ 1 - 1
controllers/network.go

@@ -319,7 +319,7 @@ func updateNetworkACLv2(w http.ResponseWriter, r *http.Request) {
 		}
 		for hostId, clients := range assocClientsToDisconnectPerHost {
 			if host, ok := hostsMap[hostId]; ok {
-				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false); err != nil {
+				if err = mq.PublishSingleHostPeerUpdate(&host, allNodes, nil, clients, false, nil); err != nil {
 					slog.Error("failed to publish peer update to ingress after ACL update on network", "network", netname, "host", hostId)
 				}
 			}

+ 1 - 1
controllers/node.go

@@ -592,7 +592,7 @@ func deleteIngressGateway(w http.ResponseWriter, r *http.Request) {
 				return
 			}
 			go func() {
-				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false); err != nil {
+				if err := mq.PublishSingleHostPeerUpdate(host, allNodes, nil, removedClients[:], false, nil); err != nil {
 					slog.Error("publishSingleHostUpdate", "host", host.Name, "error", err)
 				}
 				if err := mq.NodeUpdate(&node); err != nil {

+ 2 - 2
mq/handlers.go

@@ -65,7 +65,7 @@ func UpdateNode(client mqtt.Client, msg mqtt.Message) {
 			}
 			allNodes, err := logic.GetAllNodes()
 			if err == nil {
-				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false)
+				PublishSingleHostPeerUpdate(host, allNodes, nil, nil, false, nil)
 			}
 		} else {
 			err = PublishPeerUpdate(false)
@@ -117,7 +117,7 @@ func UpdateHost(client mqtt.Client, msg mqtt.Message) {
 				if err != nil {
 					return
 				}
-				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false); err != nil {
+				if err = PublishSingleHostPeerUpdate(currentHost, nodes, nil, nil, false, nil); err != nil {
 					slog.Error("failed peers publish after join acknowledged", "name", hostUpdate.Host.Name, "id", currentHost.ID, "error", err)
 					return
 				}

+ 2 - 1
mq/mq.go

@@ -93,6 +93,7 @@ func SetupMQTT(fatal bool) {
 	opts.SetConnectionLostHandler(func(c mqtt.Client, e error) {
 		slog.Warn("detected broker connection lost", "err", e.Error())
 		//c.Disconnect(250)
+		mqclient = nil
 		slog.Info("re-initiating MQ connection")
 		//SetupMQTT(false)
 
@@ -134,7 +135,7 @@ func Keepalive(ctx context.Context) {
 			if mqclient == nil {
 				SetupMQTT(false)
 			}
-			ServerStatusUpdate()
+			serverStatusUpdate()
 			sendPeers()
 		}
 	}

+ 163 - 18
mq/publishers.go

@@ -4,6 +4,7 @@ import (
 	"encoding/json"
 	"errors"
 	"fmt"
+	"sync"
 	"time"
 
 	"github.com/gravitl/netmaker/database"
@@ -27,6 +28,8 @@ type ServerStatus struct {
 
 var serverStatusCache = ServerStatus{}
 
+const batchSize = 20
+
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 	if !servercfg.IsMessageQueueBackend() {
@@ -42,14 +45,59 @@ func PublishPeerUpdate(replacePeers bool) error {
 	if err != nil {
 		return err
 	}
-	for _, host := range hosts {
-		host := host
-		go func(host models.Host) {
-			if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
-				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+
+	var wg sync.WaitGroup
+	hostLen := len(hosts)
+	batch := batchSize
+	div := hostLen / batch
+	mod := hostLen % batch
+
+	if div == 0 {
+		wg.Add(hostLen)
+		for i := 0; i < hostLen; i++ {
+			host := hosts[i]
+			go func(host models.Host) {
+				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		wg.Wait()
+	} else {
+		for i := 0; i < div*batch; i += batch {
+			wg.Add(batch)
+			for j := 0; j < batch; j++ {
+				host := hosts[i+j]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
 			}
-		}(host)
+			wg.Wait()
+		}
+		if mod != 0 {
+			wg.Add(hostLen - (div * batch))
+			for k := div * batch; k < hostLen; k++ {
+				host := hosts[k]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
+			}
+			wg.Wait()
+		}
 	}
+
+	// for _, host := range hosts {
+	// 	host := host
+	// 	go func(host models.Host) {
+	// 		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
+	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+	// 		}
+	// 	}(host)
+	// }
 	return err
 }
 
@@ -69,12 +117,56 @@ func PublishDeletedNodePeerUpdate(delNode *models.Node) error {
 	if err != nil {
 		return err
 	}
-	for _, host := range hosts {
-		host := host
-		if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false); err != nil {
-			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+
+	var wg sync.WaitGroup
+	hostLen := len(hosts)
+	batch := batchSize
+	div := hostLen / batch
+	mod := hostLen % batch
+
+	if div == 0 {
+		wg.Add(hostLen)
+		for i := 0; i < hostLen; i++ {
+			host := hosts[i]
+			go func(host models.Host) {
+				if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		wg.Wait()
+	} else {
+		for i := 0; i < div*batch; i += batch {
+			wg.Add(batch)
+			for j := 0; j < batch; j++ {
+				host := hosts[i+j]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
+			}
+			wg.Wait()
+		}
+		if mod != 0 {
+			wg.Add(hostLen - (div * batch))
+			for k := div * batch; k < hostLen; k++ {
+				host := hosts[k]
+				go func(host models.Host) {
+					if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}(host)
+			}
+			wg.Wait()
 		}
 	}
+	// for _, host := range hosts {
+	// 	host := host
+	// 	if err = PublishSingleHostPeerUpdate(&host, allNodes, delNode, nil, false, nil); err != nil {
+	// 		logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+	// 	}
+	// }
 	return err
 }
 
@@ -94,20 +186,73 @@ func PublishDeletedClientPeerUpdate(delClient *models.ExtClient) error {
 	if err != nil {
 		return err
 	}
-	for _, host := range hosts {
-		host := host
-		if host.OS != models.OS_Types.IoT {
-			if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false); err != nil {
-				logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+
+	var wg sync.WaitGroup
+	hostLen := len(hosts)
+	batch := batchSize
+	div := hostLen / batch
+	mod := hostLen % batch
+
+	if div == 0 {
+		wg.Add(hostLen)
+		for i := 0; i < hostLen; i++ {
+			host := hosts[i]
+			go func(host models.Host) {
+				if host.OS != models.OS_Types.IoT {
+					if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
+						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+					}
+				}
+			}(host)
+		}
+		wg.Wait()
+	} else {
+		for i := 0; i < div*batch; i += batch {
+			wg.Add(batch)
+			for j := 0; j < batch; j++ {
+				host := hosts[i+j]
+				go func(host models.Host) {
+					if host.OS != models.OS_Types.IoT {
+						if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
+							logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+						}
+					}
+				}(host)
 			}
+			wg.Wait()
+		}
+		if mod != 0 {
+			wg.Add(hostLen - (div * batch))
+			for k := div * batch; k < hostLen; k++ {
+				host := hosts[k]
+				go func(host models.Host) {
+					if host.OS != models.OS_Types.IoT {
+						if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, &wg); err != nil {
+							logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+						}
+					}
+				}(host)
+			}
+			wg.Wait()
 		}
 	}
+
+	// for _, host := range hosts {
+	// 	host := host
+	// 	if host.OS != models.OS_Types.IoT {
+	// 		if err = PublishSingleHostPeerUpdate(&host, nodes, nil, []models.ExtClient{*delClient}, false, nil); err != nil {
+	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+	// 		}
+	// 	}
+	// }
 	return err
 }
 
 // PublishSingleHostPeerUpdate --- determines and publishes a peer update to one host
-func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool) error {
-
+func PublishSingleHostPeerUpdate(host *models.Host, allNodes []models.Node, deletedNode *models.Node, deletedClients []models.ExtClient, replacePeers bool, wg *sync.WaitGroup) error {
+	if wg != nil {
+		defer wg.Done()
+	}
 	peerUpdate, err := logic.GetPeerUpdateForHost("", host, allNodes, deletedNode, deletedClients)
 	if err != nil {
 		return err
@@ -148,7 +293,7 @@ func NodeUpdate(node *models.Node) error {
 	return nil
 }
 
-func ServerStatusUpdate() error {
+func serverStatusUpdate() error {
 
 	licenseErr := ""
 	if servercfg.ErrLicenseValidation != nil {

+ 5 - 6
mq/util.go

@@ -8,6 +8,7 @@ import (
 	"github.com/gravitl/netmaker/logic"
 	"github.com/gravitl/netmaker/models"
 	"github.com/gravitl/netmaker/netclient/ncutils"
+	"golang.org/x/exp/slog"
 )
 
 func decryptMsgWithHost(host *models.Host, msg []byte) ([]byte, error) {
@@ -82,13 +83,11 @@ func publish(host *models.Host, dest string, msg []byte) error {
 	}
 
 	if token := mqclient.Publish(dest, 0, true, encrypted); token.Wait() && token.Error() != nil {
-		var err error
-		if token.Error() == nil {
-			err = errors.New("connection timeout")
-		} else {
-			err = token.Error()
+		slog.Error("publish to mq error", "error", token.Error().Error())
+		if strings.Contains(token.Error().Error(), "use of closed network connection") || strings.Contains(token.Error().Error(), "publish was broken by timeout") {
+			mqclient = nil
 		}
-		return err
+		return token.Error()
 	}
 	return nil
 }

+ 1 - 1
pro/controllers/relay.go

@@ -108,7 +108,7 @@ func deleteRelay(w http.ResponseWriter, r *http.Request) {
 						return
 					}
 					node.IsRelay = true // for iot update to recognise that it has to delete relay peer
-					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false); err != nil {
+					if err = mq.PublishSingleHostPeerUpdate(h, nodes, &node, nil, false, nil); err != nil {
 						logger.Log(1, "failed to publish peer update to host", h.ID.String(), ": ", err.Error())
 					}
 				}

+ 1 - 1
pro/remote_access_client.go

@@ -78,7 +78,7 @@ func disableExtClient(client *models.ExtClient) error {
 			if err != nil {
 				return err
 			}
-			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false)
+			go mq.PublishSingleHostPeerUpdate(ingressHost, nodes, nil, []models.ExtClient{*client}, false, nil)
 		} else {
 			return err
 		}