Răsfoiți Sursa

update batch peerUpdate

Max Ma 1 an în urmă
părinte
comite
cbb54e35be
2 a modificat fișierele cu 25 adăugiri și 38 ștergeri
  1. 4 38
      mq/publishers.go
  2. 21 0
      mq/util.go

+ 4 - 38
mq/publishers.go

@@ -32,15 +32,14 @@ func PublishPeerUpdate(replacePeers bool) error {
 		return err
 	}
 
-	var wg sync.WaitGroup
-	hostLen := len(hosts)
 	if batch == 0 {
 		batch = servercfg.GetPeerUpdateBatchSize()
 	}
-	div := hostLen / batch
-	mod := hostLen % batch
+	batchHost := BatchItems(hosts, batch)
 
-	if div == 0 {
+	var wg sync.WaitGroup
+	for _, v := range batchHost {
+		hostLen := len(v)
 		wg.Add(hostLen)
 		for i := 0; i < hostLen; i++ {
 			host := hosts[i]
@@ -51,41 +50,8 @@ func PublishPeerUpdate(replacePeers bool) error {
 			}(host)
 		}
 		wg.Wait()
-	} else {
-		for i := 0; i < div*batch; i += batch {
-			wg.Add(batch)
-			for j := 0; j < batch; j++ {
-				host := hosts[i+j]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
-			}
-			wg.Wait()
-		}
-		if mod != 0 {
-			wg.Add(hostLen - (div * batch))
-			for k := div * batch; k < hostLen; k++ {
-				host := hosts[k]
-				go func(host models.Host) {
-					if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
-						logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-					}
-				}(host)
-			}
-			wg.Wait()
-		}
 	}
 
-	// for _, host := range hosts {
-	// 	host := host
-	// 	go func(host models.Host) {
-	// 		if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers); err != nil {
-	// 			logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
-	// 		}
-	// 	}(host)
-	// }
 	return err
 }
 

+ 21 - 0
mq/util.go

@@ -3,6 +3,7 @@ package mq
 import (
 	"errors"
 	"fmt"
+	"math"
 	"strings"
 	"time"
 
@@ -45,6 +46,26 @@ func DecryptMsg(node *models.Node, msg []byte) ([]byte, error) {
 	return decryptMsgWithHost(host, msg)
 }
 
+func BatchItems[T any](items []T, batchSize int) [][]T {
+	if batchSize <= 0 {
+		return nil
+	}
+	remainderBatchSize := len(items) % batchSize
+	nBatches := int(math.Ceil(float64(len(items)) / float64(batchSize)))
+	batches := make([][]T, nBatches)
+	for i := range batches {
+		if i == nBatches-1 && remainderBatchSize > 0 {
+			batches[i] = make([]T, remainderBatchSize)
+		} else {
+			batches[i] = make([]T, batchSize)
+		}
+		for j := range batches[i] {
+			batches[i][j] = items[i*batchSize+j]
+		}
+	}
+	return batches
+}
+
 func encryptMsg(host *models.Host, msg []byte) ([]byte, error) {
 	if host.OS == models.OS_Types.IoT {
 		return msg, nil