datastore.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. // Number of entries to show in top N charts
  10. topClientsSize = 5
  11. // Redux action type names
  12. initMessageType = "INIT"
  13. tickMessageType = "TICK"
  14. )
  15. var (
  16. tickInterval = time.Second * 5
  17. maxWindow = time.Hour * 24
  18. rankingUpdateInterval = time.Hour * 6
  19. uniqueHeloRatioMax = 0.8
  20. maxTicks = int(maxWindow / tickInterval)
  21. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  22. LogHook = logHook(1)
  23. store = newDataStore()
  24. )
  25. // Keeps track of connection data that is buffered in the topClients
  26. // so the data can be removed after `maxWindow` interval has occurred.
  27. type conn struct {
  28. helo, domain, ip string
  29. }
  30. type dataStore struct {
  31. lock sync.Mutex
  32. // List of samples of RAM usage
  33. ramTicks []point
  34. // List of samples of number of connected clients
  35. nClientTicks []point
  36. // Up-to-date number of clients
  37. nClients uint64
  38. // Total number of clients in the current aggregation buffer
  39. nClientsInBuffer uint64
  40. topDomain bufferedRanking
  41. topHelo bufferedRanking
  42. topIP bufferedRanking
  43. // For notifying the store about new connections
  44. newConns chan conn
  45. subs map[string]chan<- *message
  46. }
  47. func newDataStore() *dataStore {
  48. newConns := make(chan conn, 64)
  49. subs := make(map[string]chan<- *message)
  50. ds := &dataStore{
  51. ramTicks: make([]point, 0, maxTicks),
  52. nClientTicks: make([]point, 0, maxTicks),
  53. topDomain: newBufferedRanking(nRankingBuffers),
  54. topHelo: newBufferedRanking(nRankingBuffers),
  55. topIP: newBufferedRanking(nRankingBuffers),
  56. newConns: newConns,
  57. subs: subs,
  58. }
  59. return ds
  60. }
  61. // Keeps track of top domain/helo/ip rankings, but buffered into multiple
  62. // maps so that old records can be efficiently kept track of and quickly removed
  63. type bufferedRanking []map[string]int
  64. func newBufferedRanking(nBuffers int) bufferedRanking {
  65. br := make([]map[string]int, nBuffers)
  66. for i := 0; i < nBuffers; i++ {
  67. br[i] = make(map[string]int)
  68. }
  69. return br
  70. }
  71. // Manages the list of top clients by domain, helo, and IP by updating buffered
  72. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  73. // oldest, so rankings are always at most as old as `maxWindow`
  74. func (ds *dataStore) rankingManager() {
  75. ticker := time.NewTicker(rankingUpdateInterval)
  76. for {
  77. select {
  78. case c := <-ds.newConns:
  79. nHelos := len(ds.topHelo)
  80. if nHelos > 5 &&
  81. float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax {
  82. // If too many unique HELO messages are detected as a ratio to the total
  83. // number of clients, quit collecting data until we roll over into the next
  84. // aggregation buffer.
  85. continue
  86. }
  87. ds.lock.Lock()
  88. ds.nClientsInBuffer++
  89. ds.topDomain[0][c.domain]++
  90. ds.topHelo[0][c.helo]++
  91. ds.topIP[0][c.ip]++
  92. ds.lock.Unlock()
  93. case <-ticker.C:
  94. ds.lock.Lock()
  95. // Add empty map at index 0 and shift other maps one down
  96. ds.nClientsInBuffer = 0
  97. ds.topDomain = append(
  98. []map[string]int{map[string]int{}},
  99. ds.topDomain[:len(ds.topDomain)-1]...)
  100. ds.topHelo = append(
  101. []map[string]int{map[string]int{}},
  102. ds.topHelo[:len(ds.topHelo)-1]...)
  103. ds.topIP = append(
  104. []map[string]int{map[string]int{}},
  105. ds.topHelo[:len(ds.topIP)-1]...)
  106. ds.lock.Unlock()
  107. }
  108. }
  109. }
  110. // Aggregates the rankings from the ranking buffer into a single map
  111. // for each of domain, helo, ip. This is what we send to the frontend.
  112. func (ds *dataStore) aggregateRankings() ranking {
  113. topDomain := make(map[string]int, len(ds.topDomain[0]))
  114. topHelo := make(map[string]int, len(ds.topHelo[0]))
  115. topIP := make(map[string]int, len(ds.topIP[0]))
  116. for i := 0; i < nRankingBuffers; i++ {
  117. for domain, count := range ds.topDomain[i] {
  118. topDomain[domain] += count
  119. }
  120. for helo, count := range ds.topHelo[i] {
  121. topHelo[helo] += count
  122. }
  123. for ip, count := range ds.topIP[i] {
  124. topIP[ip] += count
  125. }
  126. }
  127. return ranking{
  128. TopDomain: topDomain,
  129. TopHelo: topHelo,
  130. TopIP: topIP,
  131. }
  132. }
  133. // Adds a new ram point, removing old points if necessary
  134. func (ds *dataStore) addRAMPoint(p point) {
  135. if len(ds.ramTicks) == int(maxTicks) {
  136. ds.ramTicks = append(ds.ramTicks[1:], p)
  137. } else {
  138. ds.ramTicks = append(ds.ramTicks, p)
  139. }
  140. }
  141. // Adds a new nClients point, removing old points if necessary
  142. func (ds *dataStore) addNClientPoint(p point) {
  143. if len(ds.nClientTicks) == int(maxTicks) {
  144. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  145. } else {
  146. ds.nClientTicks = append(ds.nClientTicks, p)
  147. }
  148. }
  149. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  150. ds.subs[id] = c
  151. }
  152. func (ds *dataStore) unsubscribe(id string) {
  153. delete(ds.subs, id)
  154. }
  155. func (ds *dataStore) notify(m *message) {
  156. // Prevent concurrent read/write to maps in the store
  157. ds.lock.Lock()
  158. defer ds.lock.Unlock()
  159. for _, c := range ds.subs {
  160. select {
  161. case c <- m:
  162. default:
  163. }
  164. }
  165. }
  166. // Initiates a session with all historic data in the store
  167. func (ds *dataStore) initSession(sess *session) {
  168. store.subs[sess.id] <- &message{initMessageType, initFrame{
  169. Ram: store.ramTicks,
  170. NClients: store.nClientTicks,
  171. }}
  172. }
  173. type point struct {
  174. X time.Time `json:"x"`
  175. Y uint64 `json:"y"`
  176. }
  177. // Measures RAM and number of connected clients and sends a tick
  178. // message to all connected clients on the given interval
  179. func dataListener(interval time.Duration) {
  180. ticker := time.Tick(interval)
  181. memStats := &runtime.MemStats{}
  182. for {
  183. t := <-ticker
  184. runtime.ReadMemStats(memStats)
  185. ramPoint := point{t, memStats.Alloc}
  186. nClientPoint := point{t, store.nClients}
  187. log.WithFields(map[string]interface{}{
  188. "ram": ramPoint.Y,
  189. "clients": nClientPoint.Y,
  190. }).Info("Logging analytics data")
  191. store.addRAMPoint(ramPoint)
  192. store.addNClientPoint(nClientPoint)
  193. store.notify(&message{tickMessageType, dataFrame{
  194. Ram: ramPoint,
  195. NClients: nClientPoint,
  196. ranking: store.aggregateRankings(),
  197. }})
  198. }
  199. }
  200. // Keeps track of top clients by helo, ip, and domain
  201. type ranking struct {
  202. TopHelo map[string]int `json:"topHelo"`
  203. TopIP map[string]int `json:"topIP"`
  204. TopDomain map[string]int `json:"topDomain"`
  205. }
  206. type dataFrame struct {
  207. Ram point `json:"ram"`
  208. NClients point `json:"nClients"`
  209. ranking
  210. }
  211. type initFrame struct {
  212. Ram []point `json:"ram"`
  213. NClients []point `json:"nClients"`
  214. ranking
  215. }
  216. // Format of messages to be sent over WebSocket
  217. type message struct {
  218. Type string `json:"type"`
  219. Payload interface{} `json:"payload"`
  220. }
  221. type logHook int
  222. func (h logHook) Levels() []log.Level {
  223. return log.AllLevels
  224. }
  225. // Checks fired logs for information that is relevant to the dashboard
  226. func (h logHook) Fire(e *log.Entry) error {
  227. event, ok := e.Data["event"].(string)
  228. if !ok {
  229. return nil
  230. }
  231. var helo, ip, domain string
  232. if event == "mailfrom" {
  233. helo, ok = e.Data["helo"].(string)
  234. if !ok {
  235. return nil
  236. }
  237. if len(helo) > 16 {
  238. helo = helo[:16]
  239. }
  240. ip, ok = e.Data["address"].(string)
  241. if !ok {
  242. return nil
  243. }
  244. domain, ok = e.Data["domain"].(string)
  245. if !ok {
  246. return nil
  247. }
  248. }
  249. switch event {
  250. case "connect":
  251. store.lock.Lock()
  252. store.nClients++
  253. store.lock.Unlock()
  254. case "mailfrom":
  255. store.newConns <- conn{
  256. domain: domain,
  257. helo: helo,
  258. ip: ip,
  259. }
  260. case "disconnect":
  261. store.lock.Lock()
  262. store.nClients--
  263. store.lock.Unlock()
  264. }
  265. return nil
  266. }