소스 검색

add batch peerUpdate switch

Max Ma 1 년 전
부모
커밋
e92af2af4a
2개의 변경된 파일25개의 추가작업 그리고 6개의 파일을 삭제
  1. 16 6
      mq/publishers.go
  2. 9 0
      servercfg/serverconf.go

+ 16 - 6
mq/publishers.go

@@ -14,7 +14,8 @@ import (
 	"golang.org/x/exp/slog"
 )
 
-var batch int
+var batchSize = servercfg.GetPeerUpdateBatchSize()
+var batchUpdate = servercfg.GetBatchPeerUpdate()
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
@@ -32,11 +33,21 @@ func PublishPeerUpdate(replacePeers bool) error {
 		return err
 	}
 
-	if batch == 0 {
-		batch = servercfg.GetPeerUpdateBatchSize()
+	//if batch peer update disabled
+	if !batchUpdate {
+		for _, host := range hosts {
+			host := host
+			go func(host models.Host) {
+				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
+					logger.Log(1, "failed to publish peer update to host", host.ID.String(), ": ", err.Error())
+				}
+			}(host)
+		}
+		return nil
 	}
-	batchHost := BatchItems(hosts, batch)
 
+	//if batch peer update enabled
+	batchHost := BatchItems(hosts, batchSize)
 	var wg sync.WaitGroup
 	for _, v := range batchHost {
 		hostLen := len(v)
@@ -51,8 +62,7 @@ func PublishPeerUpdate(replacePeers bool) error {
 		}
 		wg.Wait()
 	}
-
-	return err
+	return nil
 }
 
 // PublishDeletedNodePeerUpdate --- determines and publishes a peer update

+ 9 - 0
servercfg/serverconf.go

@@ -597,6 +597,15 @@ func GetMetricInterval() string {
 	return mi
 }
 
+// GetBatchPeerUpdate - if batch peer update
+func GetBatchPeerUpdate() bool {
+	enabled := false
+	if os.Getenv("PEER_UPDATE_BATCH") != "" {
+		enabled = os.Getenv("PEER_UPDATE_BATCH") == "true"
+	}
+	return enabled
+}
+
 // GetPeerUpdateBatchSize - get the batch size for peer update
 func GetPeerUpdateBatchSize() int {
 	//default 50