2
0

datastore.go 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. tickInterval = time.Second * 5
  10. maxWindow = time.Hour * 24
  11. maxTicks = int(maxWindow / tickInterval)
  12. INIT_MESSAGE = "INIT"
  13. TICK_MESSAGE = "TICK"
  14. )
  15. // Log for sending client events from the server to the dashboard.
  16. var (
  17. LogHook = logHook(1)
  18. store = newDataStore()
  19. )
  20. type dataStore struct {
  21. lock sync.Mutex
  22. // List of samples of RAM usage
  23. ramTicks []point
  24. // List of samples of number of connected clients
  25. nClientTicks []point
  26. // Up-to-date number of clients
  27. nClients uint64
  28. subs map[string]chan<- *message
  29. }
  30. func newDataStore() *dataStore {
  31. subs := make(map[string]chan<- *message)
  32. return &dataStore{
  33. ramTicks: make([]point, 0, maxTicks),
  34. nClientTicks: make([]point, 0, maxTicks),
  35. subs: subs,
  36. }
  37. }
  38. func (ds *dataStore) addRAMPoint(p point) {
  39. if len(ds.ramTicks) == int(maxTicks) {
  40. ds.ramTicks = append(ds.ramTicks[1:], p)
  41. } else {
  42. ds.ramTicks = append(ds.ramTicks, p)
  43. }
  44. }
  45. func (ds *dataStore) addNClientPoint(p point) {
  46. if len(ds.nClientTicks) == int(maxTicks) {
  47. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  48. } else {
  49. ds.nClientTicks = append(ds.nClientTicks, p)
  50. }
  51. }
  52. func (ds *dataStore) subscribe(id string, c chan<- *message) {
  53. ds.subs[id] = c
  54. }
  55. func (ds *dataStore) unsubscribe(id string) {
  56. delete(ds.subs, id)
  57. }
  58. func (ds *dataStore) notify(m *message) {
  59. for _, c := range ds.subs {
  60. select {
  61. case c <- m:
  62. default:
  63. }
  64. }
  65. }
  66. func (ds *dataStore) initSession(sess *session) {
  67. store.subs[sess.id] <- &message{INIT_MESSAGE, initFrame{
  68. Ram: store.ramTicks,
  69. NClients: store.nClientTicks,
  70. }}
  71. }
  72. type point struct {
  73. X time.Time `json:"x"`
  74. Y uint64 `json:"y"`
  75. }
  76. func dataListener(interval time.Duration) {
  77. ticker := time.Tick(interval)
  78. memStats := &runtime.MemStats{}
  79. for {
  80. t := <-ticker
  81. runtime.ReadMemStats(memStats)
  82. ramPoint := point{t, memStats.Alloc}
  83. nClientPoint := point{t, store.nClients}
  84. log.Info("datastore:89", ramPoint, nClientPoint)
  85. store.addRAMPoint(ramPoint)
  86. store.addNClientPoint(nClientPoint)
  87. store.notify(&message{TICK_MESSAGE, dataFrame{
  88. Ram: ramPoint,
  89. NClients: nClientPoint,
  90. }})
  91. }
  92. }
  93. type dataFrame struct {
  94. Ram point `json:"ram"`
  95. NClients point `json:"nClients"`
  96. // top5Helo []string // TODO add for aggregation
  97. // top5IP []string
  98. }
  99. type initFrame struct {
  100. Ram []point `json:"ram"`
  101. NClients []point `json:"nClients"`
  102. // top5Helo []string // TODO add for aggregation
  103. // top5IP []string
  104. }
  105. // Format of messages to be sent over WebSocket
  106. type message struct {
  107. Type string `json:"type"`
  108. Payload interface{} `json:"payload"`
  109. }
  110. type logHook int
  111. func (h logHook) Levels() []log.Level {
  112. return log.AllLevels
  113. }
  114. func (h logHook) Fire(e *log.Entry) error {
  115. event, ok := e.Data["event"]
  116. if !ok {
  117. return nil
  118. }
  119. event, ok = event.(string)
  120. if !ok {
  121. return nil
  122. }
  123. store.lock.Lock()
  124. defer store.lock.Unlock()
  125. switch event {
  126. case "connect":
  127. store.nClients++
  128. case "disconnect":
  129. store.nClients--
  130. }
  131. return nil
  132. }