monitor.go 3.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175
  1. package main
  2. import (
  3. "code.google.com/p/go.net/websocket"
  4. "encoding/json"
  5. "io"
  6. "log"
  7. "net/http"
  8. "os"
  9. "runtime"
  10. "strconv"
  11. "time"
  12. )
  13. type wsConnection struct {
  14. // The websocket connection.
  15. ws *websocket.Conn
  16. // Buffered channel of outbound messages.
  17. send chan string
  18. }
  19. type monitorHub struct {
  20. connections map[*wsConnection]bool
  21. broadcast chan string
  22. register chan *wsConnection
  23. unregister chan *wsConnection
  24. }
  25. var hub = monitorHub{
  26. broadcast: make(chan string),
  27. register: make(chan *wsConnection, 10),
  28. unregister: make(chan *wsConnection, 10),
  29. connections: make(map[*wsConnection]bool),
  30. }
  31. func (h *monitorHub) run() {
  32. for {
  33. select {
  34. case c := <-h.register:
  35. h.connections[c] = true
  36. log.Println("Queuing initial status")
  37. c.send <- initialStatus()
  38. case c := <-h.unregister:
  39. log.Println("Unregistering connection")
  40. delete(h.connections, c)
  41. case m := <-h.broadcast:
  42. for c := range h.connections {
  43. if len(c.send)+5 > cap(c.send) {
  44. log.Println("WS connection too close to cap")
  45. c.send <- `{"error": "too slow"}`
  46. close(c.send)
  47. go c.ws.Close()
  48. h.unregister <- c
  49. continue
  50. }
  51. select {
  52. case c.send <- m:
  53. default:
  54. close(c.send)
  55. delete(h.connections, c)
  56. log.Println("Closing channel when sending")
  57. go c.ws.Close()
  58. }
  59. }
  60. }
  61. }
  62. }
  63. func (c *wsConnection) reader() {
  64. for {
  65. var message string
  66. err := websocket.Message.Receive(c.ws, &message)
  67. if err != nil {
  68. log.Println("WS read error:", err)
  69. break
  70. }
  71. log.Println("WS message", message)
  72. // TODO(ask) take configuration options etc
  73. //h.broadcast <- message
  74. }
  75. c.ws.Close()
  76. }
  77. func (c *wsConnection) writer() {
  78. for message := range c.send {
  79. err := websocket.Message.Send(c.ws, message)
  80. if err != nil {
  81. log.Println("WS write error:", err)
  82. break
  83. }
  84. }
  85. c.ws.Close()
  86. }
  87. func wsHandler(ws *websocket.Conn) {
  88. log.Println("Starting new WS connection")
  89. c := &wsConnection{send: make(chan string, 180), ws: ws}
  90. hub.register <- c
  91. defer func() {
  92. log.Println("sending unregister message")
  93. hub.unregister <- c
  94. }()
  95. go c.writer()
  96. c.reader()
  97. }
  98. func initialStatus() string {
  99. status := map[string]string{"v": VERSION, "id": serverId}
  100. hostname, err := os.Hostname()
  101. if err == nil {
  102. status["h"] = hostname
  103. }
  104. message, err := json.Marshal(status)
  105. return string(message)
  106. }
  107. func logStatus() {
  108. log.Println(initialStatus())
  109. lastQueryCount := qCounter
  110. for {
  111. newQueries := qCounter - lastQueryCount
  112. lastQueryCount = qCounter
  113. log.Println("goroutines", runtime.NumGoroutine(), "queries", newQueries)
  114. time.Sleep(60 * time.Second)
  115. }
  116. }
  117. func monitor() {
  118. go logStatus()
  119. if len(*flaghttp) == 0 {
  120. return
  121. }
  122. go hub.run()
  123. go httpHandler()
  124. lastQueryCount := qCounter
  125. for {
  126. newQueries := qCounter - lastQueryCount
  127. lastQueryCount = qCounter
  128. status := map[string]string{}
  129. status["up"] = strconv.Itoa(int(time.Since(timeStarted).Seconds()))
  130. status["qs"] = strconv.FormatUint(qCounter, 10)
  131. status["qps"] = strconv.FormatUint(newQueries, 10)
  132. message, err := json.Marshal(status)
  133. if err == nil {
  134. hub.broadcast <- string(message)
  135. }
  136. time.Sleep(1 * time.Second)
  137. }
  138. }
  139. func MainServer(w http.ResponseWriter, req *http.Request) {
  140. if req.RequestURI != "/version" {
  141. http.NotFound(w, req)
  142. return
  143. }
  144. io.WriteString(w, `<html><head><title>GeoDNS `+
  145. VERSION+`</title><body>`+
  146. initialStatus()+
  147. `</body></html>`)
  148. }
  149. func httpHandler() {
  150. http.Handle("/monitor", websocket.Handler(wsHandler))
  151. http.HandleFunc("/", MainServer)
  152. log.Fatal(http.ListenAndServe(*flaghttp, nil))
  153. }