datastore.go 2.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. package dashboard
  2. import (
  3. "runtime"
  4. "sync"
  5. "time"
  6. log "github.com/Sirupsen/logrus"
  7. )
  8. const (
  9. tickInterval = time.Second * 5
  10. maxWindow = time.Hour * 24
  11. maxTicks = int(maxWindow / tickInterval)
  12. )
  13. // Log for sending client events from the server to the dashboard.
  14. var (
  15. LogHook = logHook(1)
  16. store = newDataStore()
  17. )
  18. type dataStore struct {
  19. lock sync.Mutex
  20. // List of samples of RAM usage
  21. ramTicks []point
  22. // List of samples of number of connected clients
  23. nClientTicks []point
  24. // Up-to-date number of clients
  25. nClients uint64
  26. subs map[string]chan<- *dataFrame
  27. }
  28. func newDataStore() *dataStore {
  29. subs := make(map[string]chan<- *dataFrame)
  30. return &dataStore{
  31. ramTicks: make([]point, 0, maxTicks),
  32. nClientTicks: make([]point, 0, maxTicks),
  33. subs: subs,
  34. }
  35. }
  36. func (ds *dataStore) addRAMPoint(p point) {
  37. if len(ds.ramTicks) == int(maxTicks) {
  38. ds.ramTicks = append(ds.ramTicks[1:], p)
  39. } else {
  40. ds.ramTicks = append(ds.ramTicks, p)
  41. }
  42. }
  43. func (ds *dataStore) addNClientPoint(p point) {
  44. if len(ds.nClientTicks) == int(maxTicks) {
  45. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  46. } else {
  47. ds.nClientTicks = append(ds.nClientTicks, p)
  48. }
  49. }
  50. func (ds *dataStore) subscribe(id string, c chan<- *dataFrame) {
  51. ds.subs[id] = c
  52. }
  53. func (ds *dataStore) unsubscribe(id string) {
  54. delete(ds.subs, id)
  55. }
  56. func (ds *dataStore) notify(p *dataFrame) {
  57. for _, c := range ds.subs {
  58. select {
  59. case c <- p:
  60. default:
  61. }
  62. }
  63. }
  64. func (ds *dataStore) initSession(sess *session) {
  65. // TODO implement
  66. }
  67. type point struct {
  68. X time.Time `json:"x"`
  69. Y uint64 `json:"y"`
  70. }
  71. func dataListener(interval time.Duration) {
  72. ticker := time.Tick(interval)
  73. memStats := &runtime.MemStats{}
  74. for {
  75. t := <-ticker
  76. runtime.ReadMemStats(memStats)
  77. ramPoint := point{t, memStats.Alloc}
  78. nClientPoint := point{t, store.nClients}
  79. log.Info("datastore:89", ramPoint, nClientPoint)
  80. store.addRAMPoint(ramPoint)
  81. store.addNClientPoint(nClientPoint)
  82. store.notify(&dataFrame{
  83. Ram: ramPoint,
  84. NClients: nClientPoint,
  85. })
  86. }
  87. }
  88. type dataFrame struct {
  89. Ram point `json:"ram"`
  90. NClients point `json:"n_clients"`
  91. // top5Helo []string // TODO add for aggregation
  92. // top5IP []string
  93. }
  94. type logHook int
  95. func (h logHook) Levels() []log.Level {
  96. return []log.Level{log.InfoLevel}
  97. }
  98. func (h logHook) Fire(e *log.Entry) error {
  99. event, ok := e.Data["event"]
  100. if !ok {
  101. return nil
  102. }
  103. event, ok = event.(string)
  104. if !ok {
  105. return nil
  106. }
  107. store.lock.Lock()
  108. defer store.lock.Unlock()
  109. switch event {
  110. case "connect":
  111. store.nClients++
  112. case "disconnect":
  113. store.nClients--
  114. }
  115. return nil
  116. }