datastore.go 7.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  // topClientsSize is the number of entries to show in the "top N" charts.
  const topClientsSize = 5

  // Redux action type names, used as the Type of messages pushed to
  // subscribers.
  const (
  	initMessageType = "INIT"
  	tickMessageType = "TICK"
  )
  15. var (
  16. tickInterval = time.Second * 5
  17. maxWindow = time.Hour * 24
  18. rankingUpdateInterval = time.Hour * 6
  19. uniqueHeloRatioMax = 0.8
  20. maxTicks = int(maxWindow / tickInterval)
  21. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  22. LogHook = logHook(1)
  23. store = newDataStore()
  24. )
  // conn carries the connection data that is buffered in the top-client
  // rankings, so that it can be dropped again once the `maxWindow` interval
  // has passed.
  type conn struct {
  	helo   string // HELO string reported for the connection
  	domain string // domain reported for the connection
  	ip     string // address reported for the connection
  }
  30. type dataStore struct {
  31. lock sync.Mutex
  32. // List of samples of RAM usage
  33. ramTicks []point
  34. // List of samples of number of connected clients
  35. nClientTicks []point
  36. // Up-to-date number of clients
  37. nClients uint64
  38. // Total number of clients in the current aggregation buffer
  39. nClientsInBuffer uint64
  40. topDomain bufferedRanking
  41. topHelo bufferedRanking
  42. topIP bufferedRanking
  43. // For notifying the store about new connections
  44. newConns chan conn
  45. subs map[string]chan<- *message
  46. }
  47. func newDataStore() *dataStore {
  48. newConns := make(chan conn, 64)
  49. subs := make(map[string]chan<- *message)
  50. ds := &dataStore{
  51. ramTicks: make([]point, 0, maxTicks),
  52. nClientTicks: make([]point, 0, maxTicks),
  53. topDomain: newBufferedRanking(nRankingBuffers),
  54. topHelo: newBufferedRanking(nRankingBuffers),
  55. topIP: newBufferedRanking(nRankingBuffers),
  56. newConns: newConns,
  57. subs: subs,
  58. }
  59. return ds
  60. }
  // bufferedRanking keeps a top domain/helo/ip ranking split over several
  // maps (buffers), so that old records can be tracked cheaply and removed
  // quickly by discarding a whole buffer.
  type bufferedRanking []map[string]int

  // newBufferedRanking returns a ranking made of nBuffers empty, independent
  // maps.
  func newBufferedRanking(nBuffers int) bufferedRanking {
  	buffers := make([]map[string]int, nBuffers)
  	for idx := range buffers {
  		buffers[idx] = map[string]int{}
  	}
  	return buffers
  }
  71. // Manages the list of top clients by domain, helo, and IP by updating buffered
  72. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  73. // oldest, so rankings are always at most as old as `maxWindow`
  74. func (ds *dataStore) rankingManager() {
  75. ticker := time.NewTicker(rankingUpdateInterval)
  76. for {
  77. select {
  78. case c := <-ds.newConns:
  79. nHelos := len(ds.topHelo)
  80. if nHelos > 5 &&
  81. float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax {
  82. // If too many unique HELO messages are detected as a ratio to the total
  83. // number of clients, quit collecting data until we roll over into the next
  84. // aggregation buffer.
  85. continue
  86. }
  87. ds.lock.Lock()
  88. ds.nClientsInBuffer++
  89. ds.topDomain[0][c.domain]++
  90. ds.topHelo[0][c.helo]++
  91. ds.topIP[0][c.ip]++
  92. ds.lock.Unlock()
  93. case <-ticker.C:
  94. ds.lock.Lock()
  95. // Add empty map at index 0 and shift other maps one down
  96. ds.nClientsInBuffer = 0
  97. ds.topDomain = append(
  98. []map[string]int{map[string]int{}},
  99. ds.topDomain[:len(ds.topDomain)-1]...)
  100. ds.topHelo = append(
  101. []map[string]int{map[string]int{}},
  102. ds.topHelo[:len(ds.topHelo)-1]...)
  103. ds.topIP = append(
  104. []map[string]int{map[string]int{}},
  105. ds.topHelo[:len(ds.topIP)-1]...)
  106. ds.lock.Unlock()
  107. case <-stopRankingManager:
  108. return
  109. }
  110. }
  111. }
  112. // Aggregates the rankings from the ranking buffer into a single map
  113. // for each of domain, helo, ip. This is what we send to the frontend.
  114. func (ds *dataStore) aggregateRankings() ranking {
  115. topDomain := make(map[string]int, len(ds.topDomain[0]))
  116. topHelo := make(map[string]int, len(ds.topHelo[0]))
  117. topIP := make(map[string]int, len(ds.topIP[0]))
  118. for i := 0; i < nRankingBuffers; i++ {
  119. for domain, count := range ds.topDomain[i] {
  120. topDomain[domain] += count
  121. }
  122. for helo, count := range ds.topHelo[i] {
  123. topHelo[helo] += count
  124. }
  125. for ip, count := range ds.topIP[i] {
  126. topIP[ip] += count
  127. }
  128. }
  129. return ranking{
  130. TopDomain: topDomain,
  131. TopHelo: topHelo,
  132. TopIP: topIP,
  133. }
  134. }
  135. // Adds a new ram point, removing old points if necessary
  136. func (ds *dataStore) addRAMPoint(p point) {
  137. if len(ds.ramTicks) == int(maxTicks) {
  138. ds.ramTicks = append(ds.ramTicks[1:], p)
  139. } else {
  140. ds.ramTicks = append(ds.ramTicks, p)
  141. }
  142. }
  143. // Adds a new nClients point, removing old points if necessary
  144. func (ds *dataStore) addNClientPoint(p point) {
  145. if len(ds.nClientTicks) == int(maxTicks) {
  146. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  147. } else {
  148. ds.nClientTicks = append(ds.nClientTicks, p)
  149. }
  150. }
  151. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  152. ds.subs[id] = c
  153. }
  154. func (ds *dataStore) unsubscribe(id string) {
  155. delete(ds.subs, id)
  156. }
  157. func (ds *dataStore) notify(m *message) {
  158. // Prevent concurrent read/write to maps in the store
  159. ds.lock.Lock()
  160. defer ds.lock.Unlock()
  161. for _, c := range ds.subs {
  162. select {
  163. case c <- m:
  164. default:
  165. }
  166. }
  167. }
  168. // Initiates a session with all historic data in the store
  169. func (ds *dataStore) initSession(sess *session) {
  170. store.subs[sess.id] <- &message{initMessageType, initFrame{
  171. Ram: store.ramTicks,
  172. NClients: store.nClientTicks,
  173. }}
  174. }
  // point is a single sample of a time series: X is the time the sample was
  // taken and Y is the sampled value. It is encoded to JSON as
  // {"x": ..., "y": ...}.
  type point struct {
  	// X is the sample time.
  	X time.Time `json:"x"`
  	// Y is the sample value.
  	Y uint64 `json:"y"`
  }
  179. // Measures RAM and number of connected clients and sends a tick
  180. // message to all connected clients on the given interval
  181. func dataListener(interval time.Duration) {
  182. ticker := time.Tick(interval)
  183. memStats := &runtime.MemStats{}
  184. for {
  185. select {
  186. case t := <-ticker:
  187. runtime.ReadMemStats(memStats)
  188. ramPoint := point{t, memStats.Alloc}
  189. nClientPoint := point{t, store.nClients}
  190. log.WithFields(map[string]interface{}{
  191. "ram": ramPoint.Y,
  192. "clients": nClientPoint.Y,
  193. }).Info("Logging analytics data")
  194. store.addRAMPoint(ramPoint)
  195. store.addNClientPoint(nClientPoint)
  196. store.notify(&message{tickMessageType, dataFrame{
  197. Ram: ramPoint,
  198. NClients: nClientPoint,
  199. ranking: store.aggregateRankings(),
  200. }})
  201. case <-stopDataListener:
  202. return
  203. }
  204. }
  205. }
  // ranking keeps track of the top clients by helo, ip and domain. Every map
  // goes from a key (HELO string, IP or domain) to its count.
  type ranking struct {
  	// TopHelo counts clients per HELO string.
  	TopHelo map[string]int `json:"topHelo"`
  	// TopIP counts clients per IP.
  	TopIP map[string]int `json:"topIP"`
  	// TopDomain counts clients per domain.
  	TopDomain map[string]int `json:"topDomain"`
  }
  212. type dataFrame struct {
  213. Ram point `json:"ram"`
  214. NClients point `json:"nClients"`
  215. ranking
  216. }
  217. type initFrame struct {
  218. Ram []point `json:"ram"`
  219. NClients []point `json:"nClients"`
  220. ranking
  221. }
  // message is the format of messages to be sent over WebSocket: a type
  // name together with an arbitrary payload.
  type message struct {
  	// Type is the message type name.
  	Type string `json:"type"`
  	// Payload is the data carried by the message.
  	Payload interface{} `json:"payload"`
  }
  227. type logHook int
  228. func (h logHook) Levels() []log.Level {
  229. return log.AllLevels
  230. }
  231. // Checks fired logs for information that is relevant to the dashboard
  232. func (h logHook) Fire(e *log.Entry) error {
  233. event, ok := e.Data["event"].(string)
  234. if !ok {
  235. return nil
  236. }
  237. var helo, ip, domain string
  238. if event == "mailfrom" {
  239. helo, ok = e.Data["helo"].(string)
  240. if !ok {
  241. return nil
  242. }
  243. if len(helo) > 16 {
  244. helo = helo[:16]
  245. }
  246. ip, ok = e.Data["address"].(string)
  247. if !ok {
  248. return nil
  249. }
  250. domain, ok = e.Data["domain"].(string)
  251. if !ok {
  252. return nil
  253. }
  254. }
  255. switch event {
  256. case "connect":
  257. store.lock.Lock()
  258. store.nClients++
  259. store.lock.Unlock()
  260. case "mailfrom":
  261. store.newConns <- conn{
  262. domain: domain,
  263. helo: helo,
  264. ip: ip,
  265. }
  266. case "disconnect":
  267. store.lock.Lock()
  268. store.nClients--
  269. store.lock.Unlock()
  270. }
  271. return nil
  272. }