datastore.go 6.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. )
  7. const (
  8. // Number of entries to show in top N charts
  9. topClientsSize = 5
  10. // Redux action type names
  11. initMessageType = "INIT"
  12. tickMessageType = "TICK"
  13. )
  14. var (
  15. tickInterval = time.Second * 5
  16. maxWindow = time.Hour * 24
  17. rankingUpdateInterval = time.Hour * 6
  18. uniqueHeloRatioMax = 0.8
  19. maxTicks = int(maxWindow / tickInterval)
  20. nRankingBuffers = int(maxWindow / rankingUpdateInterval)
  21. LogHook = logHook(1)
  22. store = newDataStore()
  23. )
  24. // Keeps track of connection data that is buffered in the topClients
  25. // so the data can be removed after `maxWindow` interval has occurred.
  26. type conn struct {
  27. helo, domain, ip string
  28. }
  29. type dataStore struct {
  30. lock sync.Mutex
  31. // List of samples of RAM usage
  32. ramTicks []point
  33. // List of samples of number of connected clients
  34. nClientTicks []point
  35. // Up-to-date number of clients
  36. nClients uint64
  37. // Total number of clients in the current aggregation buffer
  38. nClientsInBuffer uint64
  39. topDomain bufferedRanking
  40. topHelo bufferedRanking
  41. topIP bufferedRanking
  42. // For notifying the store about new connections
  43. newConns chan conn
  44. subs map[string]chan<- *message
  45. }
  46. func newDataStore() *dataStore {
  47. newConns := make(chan conn, 64)
  48. subs := make(map[string]chan<- *message)
  49. ds := &dataStore{
  50. ramTicks: make([]point, 0, maxTicks),
  51. nClientTicks: make([]point, 0, maxTicks),
  52. topDomain: newBufferedRanking(nRankingBuffers),
  53. topHelo: newBufferedRanking(nRankingBuffers),
  54. topIP: newBufferedRanking(nRankingBuffers),
  55. newConns: newConns,
  56. subs: subs,
  57. }
  58. return ds
  59. }
  60. // Keeps track of top domain/helo/ip rankings, but buffered into multiple
  61. // maps so that old records can be efficiently kept track of and quickly removed
  62. type bufferedRanking []map[string]int
  63. func newBufferedRanking(nBuffers int) bufferedRanking {
  64. br := make([]map[string]int, nBuffers)
  65. for i := 0; i < nBuffers; i++ {
  66. br[i] = make(map[string]int)
  67. }
  68. return br
  69. }
  70. // Manages the list of top clients by domain, helo, and IP by updating buffered
  71. // record maps. At each `rankingUpdateInterval` we shift the maps and remove the
  72. // oldest, so rankings are always at most as old as `maxWindow`
  73. func (ds *dataStore) rankingManager() {
  74. ticker := time.NewTicker(rankingUpdateInterval)
  75. for {
  76. select {
  77. case c := <-ds.newConns:
  78. nHelos := len(ds.topHelo)
  79. if nHelos > 5 &&
  80. float64(nHelos)/float64(ds.nClientsInBuffer) > uniqueHeloRatioMax {
  81. // If too many unique HELO messages are detected as a ratio to the total
  82. // number of clients, quit collecting data until we roll over into the next
  83. // aggregation buffer.
  84. continue
  85. }
  86. ds.lock.Lock()
  87. ds.nClientsInBuffer++
  88. ds.topDomain[0][c.domain]++
  89. ds.topHelo[0][c.helo]++
  90. ds.topIP[0][c.ip]++
  91. ds.lock.Unlock()
  92. case <-ticker.C:
  93. ds.lock.Lock()
  94. // Add empty map at index 0 and shift other maps one down
  95. ds.nClientsInBuffer = 0
  96. ds.topDomain = append(
  97. []map[string]int{map[string]int{}},
  98. ds.topDomain[:len(ds.topDomain)-1]...)
  99. ds.topHelo = append(
  100. []map[string]int{map[string]int{}},
  101. ds.topHelo[:len(ds.topHelo)-1]...)
  102. ds.topIP = append(
  103. []map[string]int{map[string]int{}},
  104. ds.topHelo[:len(ds.topIP)-1]...)
  105. ds.lock.Unlock()
  106. case <-stopRankingManager:
  107. return
  108. }
  109. }
  110. }
  111. // Aggregates the rankings from the ranking buffer into a single map
  112. // for each of domain, helo, ip. This is what we send to the frontend.
  113. func (ds *dataStore) aggregateRankings() ranking {
  114. topDomain := make(map[string]int, len(ds.topDomain[0]))
  115. topHelo := make(map[string]int, len(ds.topHelo[0]))
  116. topIP := make(map[string]int, len(ds.topIP[0]))
  117. ds.lock.Lock()
  118. // Aggregate buffers
  119. for i := 0; i < nRankingBuffers; i++ {
  120. for domain, count := range ds.topDomain[i] {
  121. topDomain[domain] += count
  122. }
  123. for helo, count := range ds.topHelo[i] {
  124. topHelo[helo] += count
  125. }
  126. for ip, count := range ds.topIP[i] {
  127. topIP[ip] += count
  128. }
  129. }
  130. ds.lock.Unlock()
  131. return ranking{
  132. TopDomain: topDomain,
  133. TopHelo: topHelo,
  134. TopIP: topIP,
  135. }
  136. }
  137. // Adds a new ram point, removing old points if necessary
  138. func (ds *dataStore) addRAMPoint(p point) {
  139. if len(ds.ramTicks) == int(maxTicks) {
  140. ds.ramTicks = append(ds.ramTicks[1:], p)
  141. } else {
  142. ds.ramTicks = append(ds.ramTicks, p)
  143. }
  144. }
  145. // Adds a new nClients point, removing old points if necessary
  146. func (ds *dataStore) addNClientPoint(p point) {
  147. if len(ds.nClientTicks) == int(maxTicks) {
  148. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  149. } else {
  150. ds.nClientTicks = append(ds.nClientTicks, p)
  151. }
  152. }
  153. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  154. ds.subs[id] = c
  155. }
  156. func (ds *dataStore) unsubscribe(id string) {
  157. delete(ds.subs, id)
  158. }
  159. func (ds *dataStore) notify(m *message) {
  160. // Prevent concurrent read/write to maps in the store
  161. ds.lock.Lock()
  162. defer ds.lock.Unlock()
  163. for _, c := range ds.subs {
  164. select {
  165. case c <- m:
  166. default:
  167. }
  168. }
  169. }
  170. // Initiates a session with all historic data in the store
  171. func (ds *dataStore) initSession(sess *session) {
  172. store.subs[sess.id] <- &message{initMessageType, initFrame{
  173. Ram: store.ramTicks,
  174. NClients: store.nClientTicks,
  175. }}
  176. }
  177. type point struct {
  178. X time.Time `json:"x"`
  179. Y uint64 `json:"y"`
  180. }
  181. // Measures RAM and number of connected clients and sends a tick
  182. // message to all connected clients on the given interval
  183. func dataListener(interval time.Duration) {
  184. ticker := time.Tick(interval)
  185. memStats := &runtime.MemStats{}
  186. for {
  187. select {
  188. case t := <-ticker:
  189. runtime.ReadMemStats(memStats)
  190. ramPoint := point{t, memStats.Alloc}
  191. nClientPoint := point{t, store.nClients}
  192. mainlog().WithFields(map[string]interface{}{
  193. "ram": ramPoint.Y,
  194. "clients": nClientPoint.Y,
  195. }).Info("Logging analytics data")
  196. store.addRAMPoint(ramPoint)
  197. store.addNClientPoint(nClientPoint)
  198. store.notify(&message{tickMessageType, dataFrame{
  199. Ram: ramPoint,
  200. NClients: nClientPoint,
  201. ranking: store.aggregateRankings(),
  202. }})
  203. case <-stopDataListener:
  204. return
  205. }
  206. }
  207. }
  208. // Keeps track of top clients by helo, ip, and domain
  209. type ranking struct {
  210. TopHelo map[string]int `json:"topHelo"`
  211. TopIP map[string]int `json:"topIP"`
  212. TopDomain map[string]int `json:"topDomain"`
  213. }
  214. type dataFrame struct {
  215. Ram point `json:"ram"`
  216. NClients point `json:"nClients"`
  217. ranking
  218. }
  219. type initFrame struct {
  220. Ram []point `json:"ram"`
  221. NClients []point `json:"nClients"`
  222. ranking
  223. }
  224. // Format of messages to be sent over WebSocket
  225. type message struct {
  226. Type string `json:"type"`
  227. Payload interface{} `json:"payload"`
  228. }