datastore.go 2.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. package dashboard
  2. import (
  3. "runtime"
  4. "time"
  5. log "github.com/Sirupsen/logrus"
  6. )
  // Sizing parameters for the dashboard time series.
  const (
  	// tickInterval is the spacing between two samples.
  	// NOTE(review): presumably the value handed to dataListener; confirm at the call site.
  	tickInterval = 5 * time.Second
  	// maxWindow is the span of history the sample buffers are sized for.
  	maxWindow = 24 * time.Hour
  	// maxTicks is how many tickInterval-spaced samples fit into maxWindow;
  	// it bounds the length of each sample slice in dataStore.
  	maxTicks = int(maxWindow / tickInterval)
  )
  12. // Log for sending client events from the server to the dashboard.
  13. var (
  14. LogHook = logHook(1)
  15. store = newDataStore()
  16. )
  17. type dataStore struct {
  18. // List of samples of RAM usage
  19. ramTicks []point
  20. // List of samples of number of connected clients
  21. nClientTicks []point
  22. // Up-to-date number of clients
  23. nClients uint64
  24. subs map[string]chan<- *dataFrame
  25. }
  26. func newDataStore() *dataStore {
  27. subs := make(map[string]chan<- *dataFrame)
  28. return &dataStore{
  29. ramTicks: make([]point, 0, maxTicks),
  30. nClientTicks: make([]point, 0, maxTicks),
  31. subs: subs,
  32. }
  33. }
  34. func (ds *dataStore) addRAMPoint(p point) {
  35. if len(ds.ramTicks) == int(maxTicks) {
  36. ds.ramTicks = append(ds.ramTicks[1:], p)
  37. } else {
  38. ds.ramTicks = append(ds.ramTicks, p)
  39. }
  40. }
  41. func (ds *dataStore) addNClientPoint(p point) {
  42. if len(ds.nClientTicks) == int(maxTicks) {
  43. ds.nClientTicks = append(ds.nClientTicks[1:], p)
  44. } else {
  45. ds.nClientTicks = append(ds.nClientTicks, p)
  46. }
  47. }
  48. func (ds *dataStore) subscribe(id string, c chan<- *dataFrame) {
  49. ds.subs[id] = c
  50. }
  51. func (ds *dataStore) unsubscribe(id string) {
  52. delete(ds.subs, id)
  53. }
  54. func (ds *dataStore) notify(p *dataFrame) {
  55. for _, c := range ds.subs {
  56. select {
  57. case c <- p:
  58. default:
  59. }
  60. }
  61. }
  // point is a single sample of a time series, serialised to JSON as
  // {"x": <time>, "y": <value>}. The field order matters: points are also
  // built with positional literals.
  type point struct {
  	// X is the instant the sample was taken.
  	X time.Time `json:"x"`
  	// Y is the sampled value.
  	Y uint64 `json:"y"`
  }
  66. func dataListener(interval time.Duration) {
  67. ticker := time.Tick(interval)
  68. memStats := &runtime.MemStats{}
  69. for {
  70. t := <-ticker
  71. runtime.ReadMemStats(memStats)
  72. ramPoint := point{t, memStats.Alloc}
  73. nClientPoint := point{t, store.nClients}
  74. log.Info("datastore:89", ramPoint, nClientPoint)
  75. store.addRAMPoint(ramPoint)
  76. store.addNClientPoint(nClientPoint)
  77. store.notify(&dataFrame{
  78. Ram: ramPoint,
  79. NClients: nClientPoint,
  80. })
  81. }
  82. }
  83. type dataFrame struct {
  84. Ram point `json:"ram"`
  85. NClients point `json:"n_clients"`
  86. // top5Helo []string // TODO add for aggregation
  87. // top5IP []string
  88. }
  // logHook is a stateless log hook type; its methods translate logged
  // client events into updates of the package-level store. The underlying
  // int value carries no meaning.
  type logHook int
  90. func (h logHook) Levels() []log.Level {
  91. return []log.Level{log.InfoLevel}
  92. }
  93. func (h logHook) Fire(e *log.Entry) error {
  94. event, ok := e.Data["event"]
  95. if !ok {
  96. return nil
  97. }
  98. event, ok = event.(string)
  99. if !ok {
  100. return nil
  101. }
  102. switch event {
  103. case "connect":
  104. store.nClients++
  105. case "disconnect":
  106. store.nClients--
  107. }
  108. return nil
  109. }