monitor.go 2.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. package monitor
  2. //go:generate esc -o templates.go templates/
  3. import (
  4. "encoding/json"
  5. "os"
  6. "runtime"
  7. "time"
  8. "github.com/rcrowley/go-metrics"
  9. "golang.org/x/net/websocket"
  10. )
  11. type ServerInfo struct {
  12. Version string
  13. ID string
  14. IP string
  15. UUID string
  16. Groups []string
  17. Started time.Time
  18. }
  19. // Initial status message on websocket
  20. type statusStreamMsgStart struct {
  21. Hostname string `json:"h,omitemty"`
  22. Version string `json:"v"`
  23. GoVersion string `json:"gov"`
  24. ID string `json:"id"`
  25. IP string `json:"ip"`
  26. UUID string `json:"uuid"`
  27. Uptime int `json:"up"`
  28. Started int `json:"started"`
  29. Groups []string `json:"groups"`
  30. }
  31. // Update message on websocket
  32. type statusStreamMsgUpdate struct {
  33. Uptime int `json:"up"`
  34. QueryCount int64 `json:"qs"`
  35. Qps int64 `json:"qps"`
  36. Qps1m float64 `json:"qps1m,omitempty"`
  37. }
  38. type wsConnection struct {
  39. // The websocket connection.
  40. ws *websocket.Conn
  41. // Buffered channel of outbound messages.
  42. send chan string
  43. }
  44. type monitor struct {
  45. serverInfo *ServerInfo
  46. }
  47. func NewMonitor(serverInfo *ServerInfo) *monitor {
  48. return &monitor{serverInfo: serverInfo}
  49. }
  50. func (m *monitor) initialStatus() string {
  51. status := new(statusStreamMsgStart)
  52. status.Version = m.serverInfo.Version
  53. status.ID = m.serverInfo.ID
  54. status.IP = m.serverInfo.IP
  55. status.UUID = m.serverInfo.UUID
  56. status.GoVersion = runtime.Version()
  57. if len(m.serverInfo.Groups) > 0 {
  58. status.Groups = m.serverInfo.Groups
  59. }
  60. hostname, err := os.Hostname()
  61. if err == nil {
  62. status.Hostname = hostname
  63. }
  64. started := m.serverInfo.Started
  65. status.Started = int(started.Unix())
  66. status.Uptime = int(time.Since(started).Seconds())
  67. message, err := json.Marshal(status)
  68. return string(message)
  69. }
  70. func (m *monitor) Run() {
  71. go hub.run(m.initialStatus)
  72. qCounter := metrics.Get("queries").(metrics.Meter)
  73. lastQueryCount := qCounter.Count()
  74. status := new(statusStreamMsgUpdate)
  75. var lastQps1m float64
  76. for {
  77. current := qCounter.Count()
  78. newQueries := current - lastQueryCount
  79. lastQueryCount = current
  80. status.Uptime = int(time.Since(m.serverInfo.Started).Seconds())
  81. status.QueryCount = qCounter.Count()
  82. status.Qps = newQueries
  83. newQps1m := qCounter.Rate1()
  84. if newQps1m != lastQps1m {
  85. status.Qps1m = newQps1m
  86. lastQps1m = newQps1m
  87. } else {
  88. status.Qps1m = 0
  89. }
  90. message, err := json.Marshal(status)
  91. if err == nil {
  92. hub.broadcast <- string(message)
  93. }
  94. time.Sleep(1 * time.Second)
  95. }
  96. }
  97. func (m *monitor) Handler() websocket.Handler {
  98. return websocket.Handler(wsHandler)
  99. }