|
@@ -17,7 +17,8 @@ const (
|
|
// Number of entries to show in top N charts
|
|
// Number of entries to show in top N charts
|
|
topClientsSize = 5
|
|
topClientsSize = 5
|
|
// Frequency at which we update top client rankings
|
|
// Frequency at which we update top client rankings
|
|
- topClientsUpdateInterval = time.Minute * 5
|
|
|
|
|
|
+ rankingUpdateInterval = time.Hour * 6
|
|
|
|
+ nRankingBuffers = int(maxWindow / rankingUpdateInterval)
|
|
// Redux action type names
|
|
// Redux action type names
|
|
initMessageType = "INIT"
|
|
initMessageType = "INIT"
|
|
tickMessageType = "TICK"
|
|
tickMessageType = "TICK"
|
|
@@ -32,7 +33,6 @@ var (
|
|
// Keeps track of connection data that is buffered in the topClients
|
|
// Keeps track of connection data that is buffered in the topClients
|
|
// so the data can be removed after `maxWindow` interval has occurred.
|
|
// so the data can be removed after `maxWindow` interval has occurred.
|
|
type conn struct {
|
|
type conn struct {
|
|
- addedTime int64
|
|
|
|
helo, domain, ip string
|
|
helo, domain, ip string
|
|
}
|
|
}
|
|
|
|
|
|
@@ -43,8 +43,10 @@ type dataStore struct {
|
|
// List of samples of number of connected clients
|
|
// List of samples of number of connected clients
|
|
nClientTicks []point
|
|
nClientTicks []point
|
|
// Up-to-date number of clients
|
|
// Up-to-date number of clients
|
|
- nClients uint64
|
|
|
|
- topClients
|
|
|
|
|
|
+ nClients uint64
|
|
|
|
+ topDomain bufferedRanking
|
|
|
|
+ topHelo bufferedRanking
|
|
|
|
+ topIP bufferedRanking
|
|
// For notifying the store about new connections
|
|
// For notifying the store about new connections
|
|
newConns chan conn
|
|
newConns chan conn
|
|
subs map[string]chan<- *message
|
|
subs map[string]chan<- *message
|
|
@@ -56,56 +58,83 @@ func newDataStore() *dataStore {
|
|
ds := &dataStore{
|
|
ds := &dataStore{
|
|
ramTicks: make([]point, 0, maxTicks),
|
|
ramTicks: make([]point, 0, maxTicks),
|
|
nClientTicks: make([]point, 0, maxTicks),
|
|
nClientTicks: make([]point, 0, maxTicks),
|
|
- topClients: topClients{
|
|
|
|
- TopDomain: make(map[string]int),
|
|
|
|
- TopHelo: make(map[string]int),
|
|
|
|
- TopIP: make(map[string]int),
|
|
|
|
- },
|
|
|
|
- newConns: newConns,
|
|
|
|
- subs: subs,
|
|
|
|
|
|
+ topDomain: newBufferedRanking(nRankingBuffers),
|
|
|
|
+ topHelo: newBufferedRanking(nRankingBuffers),
|
|
|
|
+ topIP: newBufferedRanking(nRankingBuffers),
|
|
|
|
+ newConns: newConns,
|
|
|
|
+ subs: subs,
|
|
}
|
|
}
|
|
- go ds.topClientsManager()
|
|
|
|
|
|
+ go ds.rankingManager()
|
|
return ds
|
|
return ds
|
|
}
|
|
}
|
|
|
|
|
|
-// Manages the list of top clients by domain, helo, and IP by incrementing
|
|
|
|
-// records upon a new connection and scheduling a decrement after the `maxWindow`
|
|
|
|
-// interval has passed.
|
|
|
|
-func (ds *dataStore) topClientsManager() {
|
|
|
|
- bufferedConns := []conn{}
|
|
|
|
- ticker := time.NewTicker(topClientsUpdateInterval)
|
|
|
|
|
|
+// Keeps track of top domain/helo/ip rankings, but buffered into multiple
|
|
|
|
+// maps so that old records can be efficiently kept track of and quickly removed
|
|
|
|
+type bufferedRanking []map[string]int
|
|
|
|
+
|
|
|
|
+func newBufferedRanking(nBuffers int) bufferedRanking {
|
|
|
|
+ br := make([]map[string]int, nBuffers)
|
|
|
|
+ for i := 0; i < nBuffers; i++ {
|
|
|
|
+ br[i] = make(map[string]int)
|
|
|
|
+ }
|
|
|
|
+ return br
|
|
|
|
+}
|
|
|
|
+
|
|
|
|
+// Manages the list of top clients by domain, helo, and IP by updating buffered
|
|
|
|
+// record maps. At each `rankingUpdateInterval` we shift the maps and remove the
|
|
|
|
+// oldest, so rankings are always at most as old as `maxWindow`
|
|
|
|
+func (ds *dataStore) rankingManager() {
|
|
|
|
+ ticker := time.NewTicker(rankingUpdateInterval)
|
|
|
|
|
|
for {
|
|
for {
|
|
select {
|
|
select {
|
|
case c := <-ds.newConns:
|
|
case c := <-ds.newConns:
|
|
- bufferedConns = append(bufferedConns, c)
|
|
|
|
-
|
|
|
|
ds.lock.Lock()
|
|
ds.lock.Lock()
|
|
- ds.TopDomain[c.domain]++
|
|
|
|
- ds.TopHelo[c.helo]++
|
|
|
|
- ds.TopIP[c.ip]++
|
|
|
|
|
|
+ ds.topDomain[0][c.domain]++
|
|
|
|
+ ds.topHelo[0][c.helo]++
|
|
|
|
+ ds.topIP[0][c.ip]++
|
|
ds.lock.Unlock()
|
|
ds.lock.Unlock()
|
|
|
|
|
|
case <-ticker.C:
|
|
case <-ticker.C:
|
|
- cutoff := time.Now().Add(-maxWindow).Unix()
|
|
|
|
- cutoffI := 0
|
|
|
|
-
|
|
|
|
ds.lock.Lock()
|
|
ds.lock.Lock()
|
|
- for i, bc := range bufferedConns {
|
|
|
|
- // We make an assumption here that conns come in in-order, which probably
|
|
|
|
- // isn't exactly true, but close enough to not make much of a difference
|
|
|
|
- if bc.addedTime > cutoff {
|
|
|
|
- cutoffI = i
|
|
|
|
- break
|
|
|
|
- }
|
|
|
|
- ds.TopDomain[bc.domain]--
|
|
|
|
- ds.TopHelo[bc.helo]--
|
|
|
|
- ds.TopIP[bc.ip]--
|
|
|
|
- }
|
|
|
|
|
|
+ // Add empty map at index 0 and shift other maps one down
|
|
|
|
+ ds.topDomain = append(
|
|
|
|
+ []map[string]int{map[string]int{}},
|
|
|
|
+ ds.topDomain[:len(ds.topDomain)-1]...)
|
|
|
|
+ ds.topHelo = append(
|
|
|
|
+ []map[string]int{map[string]int{}},
|
|
|
|
+ ds.topHelo[:len(ds.topHelo)-1]...)
|
|
|
|
+ ds.topIP = append(
|
|
|
|
+ []map[string]int{map[string]int{}},
|
|
|
|
+ ds.topHelo[:len(ds.topIP)-1]...)
|
|
ds.lock.Unlock()
|
|
ds.lock.Unlock()
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+}
|
|
|
|
|
|
- bufferedConns = bufferedConns[cutoffI:]
|
|
|
|
|
|
+// Aggregates the rankings from the ranking buffer into a single map
|
|
|
|
+// for each of domain, helo, ip. This is what we send to the frontend.
|
|
|
|
+func (ds *dataStore) aggregateRankings() ranking {
|
|
|
|
+ topDomain := make(map[string]int, len(ds.topDomain[0]))
|
|
|
|
+ topHelo := make(map[string]int, len(ds.topHelo[0]))
|
|
|
|
+ topIP := make(map[string]int, len(ds.topIP[0]))
|
|
|
|
+
|
|
|
|
+ for i := 0; i < nRankingBuffers; i++ {
|
|
|
|
+ for domain, count := range ds.topDomain[i] {
|
|
|
|
+ topDomain[domain] += count
|
|
|
|
+ }
|
|
|
|
+ for helo, count := range ds.topHelo[i] {
|
|
|
|
+ topHelo[helo] += count
|
|
}
|
|
}
|
|
|
|
+ for ip, count := range ds.topIP[i] {
|
|
|
|
+ topIP[ip] += count
|
|
|
|
+ }
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ return ranking{
|
|
|
|
+ TopDomain: topDomain,
|
|
|
|
+ TopHelo: topHelo,
|
|
|
|
+ TopIP: topIP,
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
@@ -179,15 +208,15 @@ func dataListener(interval time.Duration) {
|
|
store.addRAMPoint(ramPoint)
|
|
store.addRAMPoint(ramPoint)
|
|
store.addNClientPoint(nClientPoint)
|
|
store.addNClientPoint(nClientPoint)
|
|
store.notify(&message{tickMessageType, dataFrame{
|
|
store.notify(&message{tickMessageType, dataFrame{
|
|
- Ram: ramPoint,
|
|
|
|
- NClients: nClientPoint,
|
|
|
|
- topClients: store.topClients,
|
|
|
|
|
|
+ Ram: ramPoint,
|
|
|
|
+ NClients: nClientPoint,
|
|
|
|
+ ranking: store.aggregateRankings(),
|
|
}})
|
|
}})
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
|
|
// Keeps track of top clients by helo, ip, and domain
|
|
// Keeps track of top clients by helo, ip, and domain
|
|
-type topClients struct {
|
|
|
|
|
|
+type ranking struct {
|
|
TopHelo map[string]int `json:"topHelo"`
|
|
TopHelo map[string]int `json:"topHelo"`
|
|
TopIP map[string]int `json:"topIP"`
|
|
TopIP map[string]int `json:"topIP"`
|
|
TopDomain map[string]int `json:"topDomain"`
|
|
TopDomain map[string]int `json:"topDomain"`
|
|
@@ -196,13 +225,13 @@ type topClients struct {
|
|
type dataFrame struct {
|
|
type dataFrame struct {
|
|
Ram point `json:"ram"`
|
|
Ram point `json:"ram"`
|
|
NClients point `json:"nClients"`
|
|
NClients point `json:"nClients"`
|
|
- topClients
|
|
|
|
|
|
+ ranking
|
|
}
|
|
}
|
|
|
|
|
|
type initFrame struct {
|
|
type initFrame struct {
|
|
Ram []point `json:"ram"`
|
|
Ram []point `json:"ram"`
|
|
NClients []point `json:"nClients"`
|
|
NClients []point `json:"nClients"`
|
|
- topClients
|
|
|
|
|
|
+ ranking
|
|
}
|
|
}
|
|
|
|
|
|
// Format of messages to be sent over WebSocket
|
|
// Format of messages to be sent over WebSocket
|
|
@@ -247,10 +276,9 @@ func (h logHook) Fire(e *log.Entry) error {
|
|
store.lock.Unlock()
|
|
store.lock.Unlock()
|
|
case "mailfrom":
|
|
case "mailfrom":
|
|
store.newConns <- conn{
|
|
store.newConns <- conn{
|
|
- addedTime: time.Now().Unix(),
|
|
|
|
- domain: domain,
|
|
|
|
- helo: helo,
|
|
|
|
- ip: ip,
|
|
|
|
|
|
+ domain: domain,
|
|
|
|
+ helo: helo,
|
|
|
|
+ ip: ip,
|
|
}
|
|
}
|
|
case "disconnect":
|
|
case "disconnect":
|
|
store.lock.Lock()
|
|
store.lock.Lock()
|