monitor.go 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319
  1. package main
  2. import (
  3. "code.google.com/p/go.net/websocket"
  4. "encoding/json"
  5. "fmt"
  6. "github.com/abh/go-metrics"
  7. "html/template"
  8. "io"
  9. "log"
  10. "math"
  11. "net/http"
  12. "os"
  13. "runtime"
  14. "sort"
  15. "strconv"
  16. "time"
  17. )
  18. type wsConnection struct {
  19. // The websocket connection.
  20. ws *websocket.Conn
  21. // Buffered channel of outbound messages.
  22. send chan string
  23. }
  24. type monitorHub struct {
  25. connections map[*wsConnection]bool
  26. broadcast chan string
  27. register chan *wsConnection
  28. unregister chan *wsConnection
  29. }
  30. var hub = monitorHub{
  31. broadcast: make(chan string),
  32. register: make(chan *wsConnection, 10),
  33. unregister: make(chan *wsConnection, 10),
  34. connections: make(map[*wsConnection]bool),
  35. }
  36. func (h *monitorHub) run() {
  37. for {
  38. select {
  39. case c := <-h.register:
  40. h.connections[c] = true
  41. log.Println("Queuing initial status")
  42. c.send <- initialStatus()
  43. case c := <-h.unregister:
  44. log.Println("Unregistering connection")
  45. delete(h.connections, c)
  46. case m := <-h.broadcast:
  47. for c := range h.connections {
  48. if len(c.send)+5 > cap(c.send) {
  49. log.Println("WS connection too close to cap")
  50. c.send <- `{"error": "too slow"}`
  51. close(c.send)
  52. go c.ws.Close()
  53. h.unregister <- c
  54. continue
  55. }
  56. select {
  57. case c.send <- m:
  58. default:
  59. close(c.send)
  60. delete(h.connections, c)
  61. log.Println("Closing channel when sending")
  62. go c.ws.Close()
  63. }
  64. }
  65. }
  66. }
  67. }
  68. func (c *wsConnection) reader() {
  69. for {
  70. var message string
  71. err := websocket.Message.Receive(c.ws, &message)
  72. if err != nil {
  73. if err == io.EOF {
  74. log.Println("WS connection closed")
  75. } else {
  76. log.Println("WS read error:", err)
  77. }
  78. break
  79. }
  80. log.Println("WS message", message)
  81. // TODO(ask) take configuration options etc
  82. //h.broadcast <- message
  83. }
  84. c.ws.Close()
  85. }
  86. func (c *wsConnection) writer() {
  87. for message := range c.send {
  88. err := websocket.Message.Send(c.ws, message)
  89. if err != nil {
  90. log.Println("WS write error:", err)
  91. break
  92. }
  93. }
  94. c.ws.Close()
  95. }
  96. func wsHandler(ws *websocket.Conn) {
  97. log.Println("Starting new WS connection")
  98. c := &wsConnection{send: make(chan string, 180), ws: ws}
  99. hub.register <- c
  100. defer func() {
  101. log.Println("sending unregister message")
  102. hub.unregister <- c
  103. }()
  104. go c.writer()
  105. c.reader()
  106. }
  107. func initialStatus() string {
  108. status := make(map[string]interface{})
  109. status["v"] = VERSION
  110. status["id"] = serverId
  111. status["ip"] = serverIP
  112. if len(serverGroups) > 0 {
  113. status["groups"] = serverGroups
  114. }
  115. hostname, err := os.Hostname()
  116. if err == nil {
  117. status["h"] = hostname
  118. }
  119. status["up"] = strconv.Itoa(int(time.Since(timeStarted).Seconds()))
  120. status["started"] = strconv.Itoa(int(timeStarted.Unix()))
  121. message, err := json.Marshal(status)
  122. return string(message)
  123. }
  124. func logStatus() {
  125. log.Println(initialStatus())
  126. // Does not impact performance too much
  127. lastQueryCount := qCounter.Count()
  128. for {
  129. current := qCounter.Count()
  130. newQueries := current - lastQueryCount
  131. lastQueryCount = current
  132. log.Println("goroutines", runtime.NumGoroutine(), "queries", newQueries)
  133. time.Sleep(60 * time.Second)
  134. }
  135. }
  136. func monitor(zones Zones) {
  137. go logStatus()
  138. if len(*flaghttp) == 0 {
  139. return
  140. }
  141. go hub.run()
  142. go httpHandler(zones)
  143. lastQueryCount := qCounter.Count()
  144. for {
  145. current := qCounter.Count()
  146. newQueries := current - lastQueryCount
  147. lastQueryCount = current
  148. status := map[string]string{}
  149. status["up"] = strconv.Itoa(int(time.Since(timeStarted).Seconds()))
  150. status["qs"] = strconv.FormatInt(qCounter.Count(), 10)
  151. status["qps"] = strconv.FormatInt(newQueries, 10)
  152. status["qps1"] = fmt.Sprintf("%.3f", qCounter.Rate1())
  153. message, err := json.Marshal(status)
  154. if err == nil {
  155. hub.broadcast <- string(message)
  156. }
  157. time.Sleep(1 * time.Second)
  158. }
  159. }
  160. func MainServer(w http.ResponseWriter, req *http.Request) {
  161. if req.RequestURI != "/version" {
  162. http.NotFound(w, req)
  163. return
  164. }
  165. io.WriteString(w, `<html><head><title>GeoDNS `+
  166. VERSION+`</title><body>`+
  167. initialStatus()+
  168. `</body></html>`)
  169. }
  170. type rate struct {
  171. Name string
  172. Count int64
  173. Metrics ZoneMetrics
  174. }
  175. type Rates []*rate
  176. func (s Rates) Len() int { return len(s) }
  177. func (s Rates) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  178. type RatesByCount struct{ Rates }
  179. func (s RatesByCount) Less(i, j int) bool {
  180. ic := s.Rates[i].Count
  181. jc := s.Rates[j].Count
  182. if ic == jc {
  183. return s.Rates[i].Name < s.Rates[j].Name
  184. }
  185. return ic > jc
  186. }
  // round rounds val to prec decimal places, with halves rounded away from
  // zero: round(1.25, 1) == 1.3 and round(-1.25, 1) == -1.3. The status
  // page template uses it as "round".
  func round(val float64, prec int) float64 {
  	pow := math.Pow(10, float64(prec))
  	// Decide on the fractional part of the scaled value. The old code
  	// compared val itself with 0.5, so every value >= 0.5 was rounded up
  	// (round(1.21, 1) gave 1.3) and every smaller one down.
  	whole, frac := math.Modf(val * pow)
  	switch {
  	case frac >= 0.5:
  		whole++
  	case frac <= -0.5:
  		whole--
  	}
  	return whole / pow
  }
  // histogramData is a snapshot of summary statistics from a histogram,
  // filled in by setupHistogramData for the status page.
  type histogramData struct {
  	Max, Min            int64   // extremes
  	Mean                float64 // arithmetic mean
  	Pct90, Pct99, Pct999 float64 // 90th, 99th and 99.9th percentiles
  	StdDev              float64 // standard deviation
  }
  206. func setupHistogramData(met *metrics.StandardHistogram, dat *histogramData) {
  207. dat.Max = met.Max()
  208. dat.Min = met.Min()
  209. dat.Mean = met.Mean()
  210. dat.StdDev = met.StdDev()
  211. percentiles := met.Percentiles([]float64{0.90, 0.99, 0.999})
  212. dat.Pct90 = percentiles[0]
  213. dat.Pct99 = percentiles[1]
  214. dat.Pct999 = percentiles[2]
  215. }
  216. func StatusServer(zones Zones) func(http.ResponseWriter, *http.Request) {
  217. return func(w http.ResponseWriter, req *http.Request) {
  218. tmpl := template.New("status_html")
  219. tmpl, err := tmpl.Parse(string(status_html()))
  220. if err != nil {
  221. str := fmt.Sprintf("Could not parse template: %s", err)
  222. io.WriteString(w, str)
  223. return
  224. }
  225. tmpl.Funcs(map[string]interface{}{
  226. "round": round,
  227. })
  228. rates := make(Rates, 0)
  229. for name, zone := range zones {
  230. count := zone.Metrics.Queries.Count()
  231. rates = append(rates, &rate{Name: name, Count: count, Metrics: zone.Metrics})
  232. }
  233. sort.Sort(RatesByCount{rates})
  234. type statusData struct {
  235. Version string
  236. Zones Rates
  237. Uptime DayDuration
  238. Platform string
  239. Global struct {
  240. Queries *metrics.StandardMeter
  241. Histogram histogramData
  242. HistogramRecent histogramData
  243. }
  244. }
  245. uptime := DayDuration{time.Since(timeStarted)}
  246. status := statusData{
  247. Version: VERSION,
  248. Zones: rates,
  249. Uptime: uptime,
  250. Platform: runtime.GOARCH + "-" + runtime.GOOS,
  251. }
  252. status.Global.Queries = metrics.Get("queries").(*metrics.StandardMeter)
  253. setupHistogramData(metrics.Get("queries-histogram").(*metrics.StandardHistogram), &status.Global.Histogram)
  254. setupHistogramData(metrics.Get("queries-histogram-recent").(*metrics.StandardHistogram), &status.Global.HistogramRecent)
  255. err = tmpl.Execute(w, status)
  256. if err != nil {
  257. log.Println("Status template error", err)
  258. }
  259. }
  260. }
  261. func httpHandler(zones Zones) {
  262. http.Handle("/monitor", websocket.Handler(wsHandler))
  263. http.HandleFunc("/status", StatusServer(zones))
  264. http.HandleFunc("/", MainServer)
  265. log.Println("Starting HTTP interface on", *flaghttp)
  266. log.Fatal(http.ListenAndServe(*flaghttp, nil))
  267. }