Przeglądaj źródła

set peerUpdate run in sequence

Max Ma 9 miesięcy temu
rodzic
commit
590479f514
1 zmienionych plików z 9 dodań i 4 usunięć
  1. 9 4
      mq/publishers.go

+ 9 - 4
mq/publishers.go

@@ -19,16 +19,16 @@ import (
 var batchSize = servercfg.GetPeerUpdateBatchSize()
 var batchUpdate = servercfg.GetBatchPeerUpdate()
 
-var peerUpdateTS = time.Now().Unix()
+var running bool
 
 // PublishPeerUpdate --- determines and publishes a peer update to all the hosts
 func PublishPeerUpdate(replacePeers bool) error {
 	slog.Error("entering PublishPeerUpdate", "Debug")
-	t1 := time.Now().Unix()
-	if time.Now().Unix()-peerUpdateTS < 60 {
+	if running {
 		return nil
 	}
-	peerUpdateTS = time.Now().Unix()
+	running = true
+	t1 := time.Now().Unix()
 
 	pc, file, no, ok := runtime.Caller(1)
 	if ok {
@@ -60,6 +60,7 @@ func PublishPeerUpdate(replacePeers bool) error {
 	if !batchUpdate {
 		for _, host := range hosts {
 			host := host
+			time.Sleep(30 * time.Millisecond)
 			go func(host models.Host) {
 				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, nil); err != nil {
 					id := host.Name
@@ -70,6 +71,8 @@ func PublishPeerUpdate(replacePeers bool) error {
 				}
 			}(host)
 		}
+		running = false
+		slog.Error("leaving PublishPeerUpdate, time cost: ", "Debug", time.Now().Unix()-t1)
 		return nil
 	}
 
@@ -81,6 +84,7 @@ func PublishPeerUpdate(replacePeers bool) error {
 		wg.Add(hostLen)
 		for i := 0; i < hostLen; i++ {
 			host := hosts[i]
+			//time.Sleep(1 * time.Millisecond)
 			go func(host models.Host) {
 				if err = PublishSingleHostPeerUpdate(&host, allNodes, nil, nil, replacePeers, &wg); err != nil {
 					id := host.Name
@@ -93,6 +97,7 @@ func PublishPeerUpdate(replacePeers bool) error {
 		}
 		wg.Wait()
 	}
+	running = false
 	slog.Error("leaving PublishPeerUpdate, time cost: ", "Debug", time.Now().Unix()-t1)
 	return nil
 }