123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310 |
- package dashboard
- import (
- "runtime"
- "sync"
- "time"
- log "github.com/Sirupsen/logrus"
- )
// topClientsSize is the number of entries to show in the "top N" charts.
const topClientsSize = 5

// Redux action type names, used as the Type of outgoing messages.
const (
	initMessageType = "INIT"
	tickMessageType = "TICK"
)
- var (
- tickInterval = time.Second * 5
- maxWindow = time.Hour * 24
- rankingUpdateInterval = time.Hour * 6
- uniqueHeloRatioMax = 0.8
- maxTicks = int(maxWindow / tickInterval)
- nRankingBuffers = int(maxWindow / rankingUpdateInterval)
- LogHook = logHook(1)
- store = newDataStore()
- )
// conn is the connection data queued for the top-clients rankings. It is
// buffered there so that it can be dropped again once it is older than
// `maxWindow`.
type conn struct {
	helo   string
	domain string
	ip     string
}
- type dataStore struct {
- lock sync.Mutex
- // List of samples of RAM usage
- ramTicks []point
- // List of samples of number of connected clients
- nClientTicks []point
- // Up-to-date number of clients
- nClients uint64
- // Total number of clients in the current aggregation buffer
- nClientsInBuffer uint64
- topDomain bufferedRanking
- topHelo bufferedRanking
- topIP bufferedRanking
- // For notifying the store about new connections
- newConns chan conn
- subs map[string]chan<- *message
- }
- func newDataStore() *dataStore {
- newConns := make(chan conn, 64)
- subs := make(map[string]chan<- *message)
- ds := &dataStore{
- ramTicks: make([]point, 0, maxTicks),
- nClientTicks: make([]point, 0, maxTicks),
- topDomain: newBufferedRanking(nRankingBuffers),
- topHelo: newBufferedRanking(nRankingBuffers),
- topIP: newBufferedRanking(nRankingBuffers),
- newConns: newConns,
- subs: subs,
- }
- return ds
- }
// bufferedRanking stores a top domain/helo/ip ranking as a list of count
// maps rather than one map, so that old records can be tracked cheaply and
// removed quickly by dropping a whole map.
type bufferedRanking []map[string]int

// newBufferedRanking returns a ranking made of nBuffers empty, independent
// maps.
func newBufferedRanking(nBuffers int) bufferedRanking {
	buffers := make(bufferedRanking, nBuffers)
	for i := range buffers {
		buffers[i] = map[string]int{}
	}
	return buffers
}
- // Manages the list of top clients by domain, helo, and IP by updating buffered
- // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
- // oldest, so rankings are always at most as old as `maxWindow`
- func (ds *dataStore) rankingManager() {
- ticker := time.NewTicker(rankingUpdateInterval)
- for {
- select {
- case c := <-ds.newConns:
- nHelos := len(ds.topHelo)
- if nHelos > 5 &&
- float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax {
- // If too many unique HELO messages are detected as a ratio to the total
- // number of clients, quit collecting data until we roll over into the next
- // aggregation buffer.
- continue
- }
- ds.lock.Lock()
- ds.nClientsInBuffer++
- ds.topDomain[0][c.domain]++
- ds.topHelo[0][c.helo]++
- ds.topIP[0][c.ip]++
- ds.lock.Unlock()
- case <-ticker.C:
- ds.lock.Lock()
- // Add empty map at index 0 and shift other maps one down
- ds.nClientsInBuffer = 0
- ds.topDomain = append(
- []map[string]int{map[string]int{}},
- ds.topDomain[:len(ds.topDomain)-1]...)
- ds.topHelo = append(
- []map[string]int{map[string]int{}},
- ds.topHelo[:len(ds.topHelo)-1]...)
- ds.topIP = append(
- []map[string]int{map[string]int{}},
- ds.topHelo[:len(ds.topIP)-1]...)
- ds.lock.Unlock()
- case <-stopRankingManager:
- return
- }
- }
- }
- // Aggregates the rankings from the ranking buffer into a single map
- // for each of domain, helo, ip. This is what we send to the frontend.
- func (ds *dataStore) aggregateRankings() ranking {
- topDomain := make(map[string]int, len(ds.topDomain[0]))
- topHelo := make(map[string]int, len(ds.topHelo[0]))
- topIP := make(map[string]int, len(ds.topIP[0]))
- for i := 0; i < nRankingBuffers; i++ {
- for domain, count := range ds.topDomain[i] {
- topDomain[domain] += count
- }
- for helo, count := range ds.topHelo[i] {
- topHelo[helo] += count
- }
- for ip, count := range ds.topIP[i] {
- topIP[ip] += count
- }
- }
- return ranking{
- TopDomain: topDomain,
- TopHelo: topHelo,
- TopIP: topIP,
- }
- }
- // Adds a new ram point, removing old points if necessary
- func (ds *dataStore) addRAMPoint(p point) {
- if len(ds.ramTicks) == int(maxTicks) {
- ds.ramTicks = append(ds.ramTicks[1:], p)
- } else {
- ds.ramTicks = append(ds.ramTicks, p)
- }
- }
- // Adds a new nClients point, removing old points if necessary
- func (ds *dataStore) addNClientPoint(p point) {
- if len(ds.nClientTicks) == int(maxTicks) {
- ds.nClientTicks = append(ds.nClientTicks[1:], p)
- } else {
- ds.nClientTicks = append(ds.nClientTicks, p)
- }
- }
- func (ds *dataStore) subscribe(id string, c chan<- *message) {
- ds.subs[id] = c
- }
- func (ds *dataStore) unsubscribe(id string) {
- delete(ds.subs, id)
- }
- func (ds *dataStore) notify(m *message) {
- // Prevent concurrent read/write to maps in the store
- ds.lock.Lock()
- defer ds.lock.Unlock()
- for _, c := range ds.subs {
- select {
- case c <- m:
- default:
- }
- }
- }
- // Initiates a session with all historic data in the store
- func (ds *dataStore) initSession(sess *session) {
- store.subs[sess.id] <- &message{initMessageType, initFrame{
- Ram: store.ramTicks,
- NClients: store.nClientTicks,
- }}
- }
// point is a single chart sample: a value Y observed at time X. It is
// serialised to JSON with the keys "x" and "y".
type point struct {
	// X is the time the sample was taken.
	X time.Time `json:"x"`
	// Y is the sampled value.
	Y uint64 `json:"y"`
}
- // Measures RAM and number of connected clients and sends a tick
- // message to all connected clients on the given interval
- func dataListener(interval time.Duration) {
- ticker := time.Tick(interval)
- memStats := &runtime.MemStats{}
- for {
- select {
- case t := <-ticker:
- runtime.ReadMemStats(memStats)
- ramPoint := point{t, memStats.Alloc}
- nClientPoint := point{t, store.nClients}
- log.WithFields(map[string]interface{}{
- "ram": ramPoint.Y,
- "clients": nClientPoint.Y,
- }).Info("Logging analytics data")
- store.addRAMPoint(ramPoint)
- store.addNClientPoint(nClientPoint)
- store.notify(&message{tickMessageType, dataFrame{
- Ram: ramPoint,
- NClients: nClientPoint,
- ranking: store.aggregateRankings(),
- }})
- case <-stopDataListener:
- return
- }
- }
- }
// ranking keeps track of top clients by helo, ip, and domain. Each map goes
// from the client's value (HELO name, IP or domain) to its count.
type ranking struct {
	// TopHelo counts clients per HELO name.
	TopHelo map[string]int `json:"topHelo"`
	// TopIP counts clients per IP address.
	TopIP map[string]int `json:"topIP"`
	// TopDomain counts clients per domain.
	TopDomain map[string]int `json:"topDomain"`
}
- type dataFrame struct {
- Ram point `json:"ram"`
- NClients point `json:"nClients"`
- ranking
- }
- type initFrame struct {
- Ram []point `json:"ram"`
- NClients []point `json:"nClients"`
- ranking
- }
// message is the format of messages to be sent over WebSocket.
type message struct {
	// Type names the action, e.g. the INIT or TICK message type.
	Type string `json:"type"`
	// Payload is the data that goes with the action.
	Payload interface{} `json:"payload"`
}
- type logHook int
- func (h logHook) Levels() []log.Level {
- return log.AllLevels
- }
- // Checks fired logs for information that is relevant to the dashboard
- func (h logHook) Fire(e *log.Entry) error {
- event, ok := e.Data["event"].(string)
- if !ok {
- return nil
- }
- var helo, ip, domain string
- if event == "mailfrom" {
- helo, ok = e.Data["helo"].(string)
- if !ok {
- return nil
- }
- if len(helo) > 16 {
- helo = helo[:16]
- }
- ip, ok = e.Data["address"].(string)
- if !ok {
- return nil
- }
- domain, ok = e.Data["domain"].(string)
- if !ok {
- return nil
- }
- }
- switch event {
- case "connect":
- store.lock.Lock()
- store.nClients++
- store.lock.Unlock()
- case "mailfrom":
- store.newConns <- conn{
- domain: domain,
- helo: helo,
- ip: ip,
- }
- case "disconnect":
- store.lock.Lock()
- store.nClients--
- store.lock.Unlock()
- }
- return nil
- }
|