Browse Source

fix race condition when incrementing number of clients

flashmob 6 years ago
parent
commit
b314384533
2 changed files with 8 additions and 11 deletions
  1. 6 5
      dashboard/datastore.go
  2. 2 6
      dashboard/hook.go

+ 6 - 5
dashboard/datastore.go

@@ -1,6 +1,7 @@
 package dashboard
 package dashboard
 
 
 import (
 import (
+	"go.uber.org/atomic"
 	"runtime"
 	"runtime"
 	"sync"
 	"sync"
 	"time"
 	"time"
@@ -38,7 +39,7 @@ type dataStore struct {
 	// List of samples of number of connected clients
 	// List of samples of number of connected clients
 	nClientTicks []point
 	nClientTicks []point
 	// Up-to-date number of clients
 	// Up-to-date number of clients
-	nClients uint64
+	nClients atomic.Uint64
 	// Total number of clients in the current aggregation buffer
 	// Total number of clients in the current aggregation buffer
 	nClientsInBuffer uint64
 	nClientsInBuffer uint64
 	topDomain        bufferedRanking
 	topDomain        bufferedRanking
@@ -125,11 +126,13 @@ func (ds *dataStore) rankingManager() {
 // Aggregates the rankings from the ranking buffer into a single map
 // Aggregates the rankings from the ranking buffer into a single map
 // for each of domain, helo, ip. This is what we send to the frontend.
 // for each of domain, helo, ip. This is what we send to the frontend.
 func (ds *dataStore) aggregateRankings() ranking {
 func (ds *dataStore) aggregateRankings() ranking {
+	ds.lock.Lock()
+	defer ds.lock.Unlock()
+
 	topDomain := make(map[string]int, len(ds.topDomain[0]))
 	topDomain := make(map[string]int, len(ds.topDomain[0]))
 	topHelo := make(map[string]int, len(ds.topHelo[0]))
 	topHelo := make(map[string]int, len(ds.topHelo[0]))
 	topIP := make(map[string]int, len(ds.topIP[0]))
 	topIP := make(map[string]int, len(ds.topIP[0]))
 
 
-	ds.lock.Lock()
 	// Aggregate buffers
 	// Aggregate buffers
 	for i := 0; i < nRankingBuffers; i++ {
 	for i := 0; i < nRankingBuffers; i++ {
 		if len(ds.topDomain) > i {
 		if len(ds.topDomain) > i {
@@ -148,7 +151,6 @@ func (ds *dataStore) aggregateRankings() ranking {
 			}
 			}
 		}
 		}
 	}
 	}
-	ds.lock.Unlock()
 
 
 	return ranking{
 	return ranking{
 		TopDomain: topDomain,
 		TopDomain: topDomain,
@@ -219,7 +221,7 @@ func dataListener(interval time.Duration) {
 		case t := <-ticker:
 		case t := <-ticker:
 			runtime.ReadMemStats(memStats)
 			runtime.ReadMemStats(memStats)
 			ramPoint := point{t, memStats.Alloc}
 			ramPoint := point{t, memStats.Alloc}
-			nClientPoint := point{t, store.nClients}
+			nClientPoint := point{t, store.nClients.Load()}
 			mainlog().WithFields(map[string]interface{}{
 			mainlog().WithFields(map[string]interface{}{
 				"ram":     ramPoint.Y,
 				"ram":     ramPoint.Y,
 				"clients": nClientPoint.Y,
 				"clients": nClientPoint.Y,
@@ -235,7 +237,6 @@ func dataListener(interval time.Duration) {
 		case <-stopDataListener:
 		case <-stopDataListener:
 			return
 			return
 		}
 		}
-
 	}
 	}
 }
 }
 
 

+ 2 - 6
dashboard/hook.go

@@ -38,9 +38,7 @@ func (h logHook) Fire(e *log.Entry) error {
 
 
 	switch event {
 	switch event {
 	case "connect":
 	case "connect":
-		store.lock.Lock()
-		store.nClients++
-		store.lock.Unlock()
+		store.nClients.Add(1)
 	case "mailfrom":
 	case "mailfrom":
 		store.newConns <- conn{
 		store.newConns <- conn{
 			domain: domain,
 			domain: domain,
@@ -48,9 +46,7 @@ func (h logHook) Fire(e *log.Entry) error {
 			ip:     ip,
 			ip:     ip,
 		}
 		}
 	case "disconnect":
 	case "disconnect":
-		store.lock.Lock()
-		store.nClients--
-		store.lock.Unlock()
+		store.nClients.Sub(1)
 	}
 	}
 	return nil
 	return nil
 }
 }