datastore.go 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. // Frequency at which we measure stats and send messages to clients
  10. tickInterval = time.Second * 5
  11. // Maximum history of stats kept in store and displayed on frontend
  12. maxWindow = time.Hour * 24
  13. maxTicks = int(maxWindow / tickInterval)
  14. // Number of entries to show in top N charts
  15. topClientsSize = 5
  16. // Frequency at which we update top client rankings
  17. rankingUpdateInterval = time.Hour * 6
  18. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  19. // Redux action type names
  20. initMessageType = "INIT"
  21. tickMessageType = "TICK"
  22. )
  23. var (
  24. // Log for sending client events from the server to the dashboard.
  25. LogHook = logHook(1)
  26. store = newDataStore()
  27. )
  28. // Keeps track of connection data that is buffered in the topClients
  29. // so the data can be removed after `maxWindow` interval has occurred.
  30. type conn struct {
  31. helo, domain, ip string
  32. }
// dataStore holds the dashboard's in-memory state: the RAM and client-count
// samples, the current client count, the buffered top-client rankings and
// the channels of subscribed sessions.
type dataStore struct {
	// lock is taken by notify, rankingManager and the log hook around
	// their accesses to the store.
	lock sync.Mutex
	// List of samples of RAM usage
	ramTicks []point
	// List of samples of number of connected clients
	nClientTicks []point
	// Up-to-date number of clients
	nClients uint64
	// Buffered rankings by domain, helo and IP. Index 0 is the buffer that
	// rankingManager currently increments.
	topDomain bufferedRanking
	topHelo   bufferedRanking
	topIP     bufferedRanking
	// For notifying the store about new connections
	newConns chan conn
	// subs maps an id to the channel on which that subscriber receives
	// messages.
	subs map[string]chan<- *message
}
  48. func newDataStore() *dataStore {
  49. newConns := make(chan conn, 64)
  50. subs := make(map[string]chan<- *message)
  51. ds := &dataStore{
  52. ramTicks: make([]point, 0, maxTicks),
  53. nClientTicks: make([]point, 0, maxTicks),
  54. topDomain: newBufferedRanking(nRankingBuffers),
  55. topHelo: newBufferedRanking(nRankingBuffers),
  56. topIP: newBufferedRanking(nRankingBuffers),
  57. newConns: newConns,
  58. subs: subs,
  59. }
  60. go ds.rankingManager()
  61. return ds
  62. }
  63. // Keeps track of top domain/helo/ip rankings, but buffered into multiple
  64. // maps so that old records can be efficiently kept track of and quickly removed
  65. type bufferedRanking []map[string]int
  66. func newBufferedRanking(nBuffers int) bufferedRanking {
  67. br := make([]map[string]int, nBuffers)
  68. for i := 0; i < nBuffers; i++ {
  69. br[i] = make(map[string]int)
  70. }
  71. return br
  72. }
  73. // Manages the list of top clients by domain, helo, and IP by updating buffered
  74. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  75. // oldest, so rankings are always at most as old as `maxWindow`
  76. func (ds *dataStore) rankingManager() {
  77. ticker := time.NewTicker(rankingUpdateInterval)
  78. for {
  79. select {
  80. case c := <-ds.newConns:
  81. ds.lock.Lock()
  82. ds.topDomain[0][c.domain]++
  83. ds.topHelo[0][c.helo]++
  84. ds.topIP[0][c.ip]++
  85. ds.lock.Unlock()
  86. case <-ticker.C:
  87. ds.lock.Lock()
  88. // Add empty map at index 0 and shift other maps one down
  89. ds.topDomain = append(
  90. []map[string]int{map[string]int{}},
  91. ds.topDomain[:len(ds.topDomain)-1]...)
  92. ds.topHelo = append(
  93. []map[string]int{map[string]int{}},
  94. ds.topHelo[:len(ds.topHelo)-1]...)
  95. ds.topIP = append(
  96. []map[string]int{map[string]int{}},
  97. ds.topHelo[:len(ds.topIP)-1]...)
  98. ds.lock.Unlock()
  99. }
  100. }
  101. }
  102. // Aggregates the rankings from the ranking buffer into a single map
  103. // for each of domain, helo, ip. This is what we send to the frontend.
  104. func (ds *dataStore) aggregateRankings() ranking {
  105. topDomain := make(map[string]int, len(ds.topDomain[0]))
  106. topHelo := make(map[string]int, len(ds.topHelo[0]))
  107. topIP := make(map[string]int, len(ds.topIP[0]))
  108. for i := 0; i < nRankingBuffers; i++ {
  109. for domain, count := range ds.topDomain[i] {
  110. topDomain[domain] += count
  111. }
  112. for helo, count := range ds.topHelo[i] {
  113. topHelo[helo] += count
  114. }
  115. for ip, count := range ds.topIP[i] {
  116. topIP[ip] += count
  117. }
  118. }
  119. return ranking{
  120. TopDomain: topDomain,
  121. TopHelo: topHelo,
  122. TopIP: topIP,
  123. }
  124. }
  125. // Adds a new ram point, removing old points if necessary
  126. func (ds *dataStore) addRAMPoint(p point) {
  127. if len(ds.ramTicks) == int(maxTicks) {
  128. ds.ramTicks = append(ds.ramTicks[1:], p)
  129. } else {
  130. ds.ramTicks = append(ds.ramTicks, p)
  131. }
  132. }
  133. // Adds a new nClients point, removing old points if necessary
  134. func (ds *dataStore) addNClientPoint(p point) {
  135. if len(ds.nClientTicks) == int(maxTicks) {
  136. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  137. } else {
  138. ds.nClientTicks = append(ds.nClientTicks, p)
  139. }
  140. }
  141. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  142. ds.subs[id] = c
  143. }
  144. func (ds *dataStore) unsubscribe(id string) {
  145. delete(ds.subs, id)
  146. }
  147. func (ds *dataStore) notify(m *message) {
  148. // Prevent concurrent read/write to maps in the store
  149. ds.lock.Lock()
  150. defer ds.lock.Unlock()
  151. for _, c := range ds.subs {
  152. select {
  153. case c <- m:
  154. default:
  155. }
  156. }
  157. }
  158. // Initiates a session with all historic data in the store
  159. func (ds *dataStore) initSession(sess *session) {
  160. store.subs[sess.id] <- &message{initMessageType, initFrame{
  161. Ram: store.ramTicks,
  162. NClients: store.nClientTicks,
  163. }}
  164. }
// point is one timestamped sample. dataListener stores the tick time in X
// and the measured value (allocated bytes or number of clients) in Y.
// dataListener builds points positionally, so the field order matters.
type point struct {
	X time.Time `json:"x"`
	Y uint64    `json:"y"`
}
  169. // Measures RAM and number of connected clients and sends a tick
  170. // message to all connected clients on the given interval
  171. func dataListener(interval time.Duration) {
  172. ticker := time.Tick(interval)
  173. memStats := &runtime.MemStats{}
  174. for {
  175. t := <-ticker
  176. runtime.ReadMemStats(memStats)
  177. ramPoint := point{t, memStats.Alloc}
  178. nClientPoint := point{t, store.nClients}
  179. log.WithFields(map[string]interface{}{
  180. "ram": ramPoint.Y,
  181. "clients": nClientPoint.Y,
  182. }).Info("Logging analytics data")
  183. store.addRAMPoint(ramPoint)
  184. store.addNClientPoint(nClientPoint)
  185. store.notify(&message{tickMessageType, dataFrame{
  186. Ram: ramPoint,
  187. NClients: nClientPoint,
  188. ranking: store.aggregateRankings(),
  189. }})
  190. }
  191. }
// ranking keeps track of top clients by helo, ip, and domain. Each map goes
// from a helo, IP or domain string to its count; aggregateRankings fills
// them by summing the store's ranking buffers.
type ranking struct {
	TopHelo   map[string]int `json:"topHelo"`
	TopIP     map[string]int `json:"topIP"`
	TopDomain map[string]int `json:"topDomain"`
}
// dataFrame is the payload of a tick message: the latest RAM and
// client-count samples together with the aggregated rankings.
type dataFrame struct {
	Ram      point `json:"ram"`
	NClients point `json:"nClients"`
	ranking
}
// initFrame is the payload of an init message: the full history of RAM and
// client-count samples. initSession does not populate the embedded ranking.
type initFrame struct {
	Ram      []point `json:"ram"`
	NClients []point `json:"nClients"`
	ranking
}
// message is the format of messages to be sent over WebSocket. Type is a
// Redux action type name (initMessageType or tickMessageType in this file)
// and Payload is the frame that goes with it. Messages are built
// positionally in this file, so the field order matters.
type message struct {
	Type    string      `json:"type"`
	Payload interface{} `json:"payload"`
}
  213. type logHook int
  214. func (h logHook) Levels() []log.Level {
  215. return log.AllLevels
  216. }
  217. // Checks fired logs for information that is relevant to the dashboard
  218. func (h logHook) Fire(e *log.Entry) error {
  219. event, ok := e.Data["event"].(string)
  220. if !ok {
  221. return nil
  222. }
  223. var helo, ip, domain string
  224. if event == "mailfrom" {
  225. helo, ok = e.Data["helo"].(string)
  226. if !ok {
  227. return nil
  228. }
  229. ip, ok = e.Data["address"].(string)
  230. if !ok {
  231. return nil
  232. }
  233. domain, ok = e.Data["domain"].(string)
  234. if !ok {
  235. return nil
  236. }
  237. }
  238. switch event {
  239. case "connect":
  240. store.lock.Lock()
  241. store.nClients++
  242. store.lock.Unlock()
  243. case "mailfrom":
  244. store.newConns <- conn{
  245. domain: domain,
  246. helo: helo,
  247. ip: ip,
  248. }
  249. case "disconnect":
  250. store.lock.Lock()
  251. store.nClients--
  252. store.lock.Unlock()
  253. }
  254. return nil
  255. }