datastore.go 6.8 KB


  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. // Number of entries to show in top N charts
  10. topClientsSize = 5
  11. // Redux action type names
  12. initMessageType = "INIT"
  13. tickMessageType = "TICK"
  14. )
  15. var (
  16. // Log for sending client events from the server to the dashboard.
  17. tickInterval = time.Second * 5
  18. maxWindow = time.Hour * 24
  19. rankingUpdateInterval = time.Hour * 6
  20. maxTicks = int(maxWindow / tickInterval)
  21. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  22. LogHook = logHook(1)
  23. store = newDataStore()
  24. )
  25. // Keeps track of connection data that is buffered in the topClients
  26. // so the data can be removed after `maxWindow` interval has occurred.
  27. type conn struct {
  28. helo, domain, ip string
  29. }
  30. type dataStore struct {
  31. lock sync.Mutex
  32. // List of samples of RAM usage
  33. ramTicks []point
  34. // List of samples of number of connected clients
  35. nClientTicks []point
  36. // Up-to-date number of clients
  37. nClients uint64
  38. topDomain bufferedRanking
  39. topHelo bufferedRanking
  40. topIP bufferedRanking
  41. // For notifying the store about new connections
  42. newConns chan conn
  43. subs map[string]chan<- *message
  44. }
  45. func newDataStore() *dataStore {
  46. newConns := make(chan conn, 64)
  47. subs := make(map[string]chan<- *message)
  48. ds := &dataStore{
  49. ramTicks: make([]point, 0, maxTicks),
  50. nClientTicks: make([]point, 0, maxTicks),
  51. topDomain: newBufferedRanking(nRankingBuffers),
  52. topHelo: newBufferedRanking(nRankingBuffers),
  53. topIP: newBufferedRanking(nRankingBuffers),
  54. newConns: newConns,
  55. subs: subs,
  56. }
  57. return ds
  58. }
  59. // Keeps track of top domain/helo/ip rankings, but buffered into multiple
  60. // maps so that old records can be efficiently kept track of and quickly removed
  61. type bufferedRanking []map[string]int
  62. func newBufferedRanking(nBuffers int) bufferedRanking {
  63. br := make([]map[string]int, nBuffers)
  64. for i := 0; i < nBuffers; i++ {
  65. br[i] = make(map[string]int)
  66. }
  67. return br
  68. }
  69. // Manages the list of top clients by domain, helo, and IP by updating buffered
  70. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  71. // oldest, so rankings are always at most as old as `maxWindow`
  72. func (ds *dataStore) rankingManager() {
  73. ticker := time.NewTicker(rankingUpdateInterval)
  74. for {
  75. select {
  76. case c := <-ds.newConns:
  77. ds.lock.Lock()
  78. ds.topDomain[0][c.domain]++
  79. ds.topHelo[0][c.helo]++
  80. ds.topIP[0][c.ip]++
  81. ds.lock.Unlock()
  82. case <-ticker.C:
  83. ds.lock.Lock()
  84. // Add empty map at index 0 and shift other maps one down
  85. ds.topDomain = append(
  86. []map[string]int{map[string]int{}},
  87. ds.topDomain[:len(ds.topDomain)-1]...)
  88. ds.topHelo = append(
  89. []map[string]int{map[string]int{}},
  90. ds.topHelo[:len(ds.topHelo)-1]...)
  91. ds.topIP = append(
  92. []map[string]int{map[string]int{}},
  93. ds.topHelo[:len(ds.topIP)-1]...)
  94. ds.lock.Unlock()
  95. }
  96. }
  97. }
  98. // Aggregates the rankings from the ranking buffer into a single map
  99. // for each of domain, helo, ip. This is what we send to the frontend.
  100. func (ds *dataStore) aggregateRankings() ranking {
  101. topDomain := make(map[string]int, len(ds.topDomain[0]))
  102. topHelo := make(map[string]int, len(ds.topHelo[0]))
  103. topIP := make(map[string]int, len(ds.topIP[0]))
  104. for i := 0; i < nRankingBuffers; i++ {
  105. for domain, count := range ds.topDomain[i] {
  106. topDomain[domain] += count
  107. }
  108. for helo, count := range ds.topHelo[i] {
  109. topHelo[helo] += count
  110. }
  111. for ip, count := range ds.topIP[i] {
  112. topIP[ip] += count
  113. }
  114. }
  115. return ranking{
  116. TopDomain: topDomain,
  117. TopHelo: topHelo,
  118. TopIP: topIP,
  119. }
  120. }
  121. // Adds a new ram point, removing old points if necessary
  122. func (ds *dataStore) addRAMPoint(p point) {
  123. if len(ds.ramTicks) == int(maxTicks) {
  124. ds.ramTicks = append(ds.ramTicks[1:], p)
  125. } else {
  126. ds.ramTicks = append(ds.ramTicks, p)
  127. }
  128. }
  129. // Adds a new nClients point, removing old points if necessary
  130. func (ds *dataStore) addNClientPoint(p point) {
  131. if len(ds.nClientTicks) == int(maxTicks) {
  132. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  133. } else {
  134. ds.nClientTicks = append(ds.nClientTicks, p)
  135. }
  136. }
  137. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  138. ds.subs[id] = c
  139. }
  140. func (ds *dataStore) unsubscribe(id string) {
  141. delete(ds.subs, id)
  142. }
  143. func (ds *dataStore) notify(m *message) {
  144. // Prevent concurrent read/write to maps in the store
  145. ds.lock.Lock()
  146. defer ds.lock.Unlock()
  147. for _, c := range ds.subs {
  148. select {
  149. case c <- m:
  150. default:
  151. }
  152. }
  153. }
  154. // Initiates a session with all historic data in the store
  155. func (ds *dataStore) initSession(sess *session) {
  156. store.subs[sess.id] <- &message{initMessageType, initFrame{
  157. Ram: store.ramTicks,
  158. NClients: store.nClientTicks,
  159. }}
  160. }
  161. type point struct {
  162. X time.Time `json:"x"`
  163. Y uint64 `json:"y"`
  164. }
  165. // Measures RAM and number of connected clients and sends a tick
  166. // message to all connected clients on the given interval
  167. func dataListener(interval time.Duration) {
  168. ticker := time.Tick(interval)
  169. memStats := &runtime.MemStats{}
  170. for {
  171. t := <-ticker
  172. runtime.ReadMemStats(memStats)
  173. ramPoint := point{t, memStats.Alloc}
  174. nClientPoint := point{t, store.nClients}
  175. log.WithFields(map[string]interface{}{
  176. "ram": ramPoint.Y,
  177. "clients": nClientPoint.Y,
  178. }).Info("Logging analytics data")
  179. store.addRAMPoint(ramPoint)
  180. store.addNClientPoint(nClientPoint)
  181. store.notify(&message{tickMessageType, dataFrame{
  182. Ram: ramPoint,
  183. NClients: nClientPoint,
  184. ranking: store.aggregateRankings(),
  185. }})
  186. }
  187. }
  188. // Keeps track of top clients by helo, ip, and domain
  189. type ranking struct {
  190. TopHelo map[string]int `json:"topHelo"`
  191. TopIP map[string]int `json:"topIP"`
  192. TopDomain map[string]int `json:"topDomain"`
  193. }
  194. type dataFrame struct {
  195. Ram point `json:"ram"`
  196. NClients point `json:"nClients"`
  197. ranking
  198. }
  199. type initFrame struct {
  200. Ram []point `json:"ram"`
  201. NClients []point `json:"nClients"`
  202. ranking
  203. }
  204. // Format of messages to be sent over WebSocket
  205. type message struct {
  206. Type string `json:"type"`
  207. Payload interface{} `json:"payload"`
  208. }
  209. type logHook int
  210. func (h logHook) Levels() []log.Level {
  211. return log.AllLevels
  212. }
  213. // Checks fired logs for information that is relevant to the dashboard
  214. func (h logHook) Fire(e *log.Entry) error {
  215. event, ok := e.Data["event"].(string)
  216. if !ok {
  217. return nil
  218. }
  219. var helo, ip, domain string
  220. if event == "mailfrom" {
  221. helo, ok = e.Data["helo"].(string)
  222. if !ok {
  223. return nil
  224. }
  225. if len(helo) > 16 {
  226. helo = helo[:16]
  227. }
  228. ip, ok = e.Data["address"].(string)
  229. if !ok {
  230. return nil
  231. }
  232. domain, ok = e.Data["domain"].(string)
  233. if !ok {
  234. return nil
  235. }
  236. }
  237. switch event {
  238. case "connect":
  239. store.lock.Lock()
  240. store.nClients++
  241. store.lock.Unlock()
  242. case "mailfrom":
  243. store.newConns <- conn{
  244. domain: domain,
  245. helo: helo,
  246. ip: ip,
  247. }
  248. case "disconnect":
  249. store.lock.Lock()
  250. store.nClients--
  251. store.lock.Unlock()
  252. }
  253. return nil
  254. }