monitor.go 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353
  1. package main
  2. import (
  3. "encoding/json"
  4. "fmt"
  5. "html/template"
  6. "io"
  7. "log"
  8. "net/http"
  9. "os"
  10. "runtime"
  11. "sort"
  12. "strconv"
  13. "time"
  14. "code.google.com/p/go.net/websocket"
  15. "github.com/rcrowley/go-metrics"
  16. )
  // statusStreamMsgStart is the initial status message sent on the /monitor
// websocket. initialStatus marshals it, and the same JSON is also shown on
// /version and logged by logStatus.
type statusStreamMsgStart struct {
	// Hostname is omitted from the JSON when empty (e.g. os.Hostname failed).
	// The option must be spelled "omitempty"; encoding/json silently ignores
	// unknown options.
	Hostname string   `json:"h,omitempty"`
	Version  string   `json:"v"`
	ID       string   `json:"id"`
	IP       string   `json:"ip"`
	Uptime   int      `json:"up"`      // seconds since timeStarted
	Started  int      `json:"started"` // timeStarted as a Unix timestamp
	Groups   []string `json:"groups"`  // left nil (null) when there are no groups
}
  // statusStreamMsgUpdate is the periodic update that monitor() pushes to
// every /monitor websocket client. Field order defines the JSON key order.
type statusStreamMsgUpdate struct {
	Uptime     int     `json:"up"`              // seconds since start
	QueryCount int64   `json:"qs"`              // running count of the "queries" meter
	Qps        int64   `json:"qps"`             // queries since the previous update
	Qps1m      float64 `json:"qps1m,omitempty"` // 1-minute rate; left zero (omitted) when unchanged
}
  34. type wsConnection struct {
  35. // The websocket connection.
  36. ws *websocket.Conn
  37. // Buffered channel of outbound messages.
  38. send chan string
  39. }
  40. type monitorHub struct {
  41. connections map[*wsConnection]bool
  42. broadcast chan string
  43. register chan *wsConnection
  44. unregister chan *wsConnection
  45. }
  46. var hub = monitorHub{
  47. broadcast: make(chan string),
  48. register: make(chan *wsConnection, 10),
  49. unregister: make(chan *wsConnection, 10),
  50. connections: make(map[*wsConnection]bool),
  51. }
  52. func (h *monitorHub) run() {
  53. for {
  54. select {
  55. case c := <-h.register:
  56. h.connections[c] = true
  57. log.Println("Queuing initial status")
  58. c.send <- initialStatus()
  59. case c := <-h.unregister:
  60. log.Println("Unregistering connection")
  61. delete(h.connections, c)
  62. case m := <-h.broadcast:
  63. for c := range h.connections {
  64. if len(c.send)+5 > cap(c.send) {
  65. log.Println("WS connection too close to cap")
  66. c.send <- `{"error": "too slow"}`
  67. close(c.send)
  68. go c.ws.Close()
  69. h.unregister <- c
  70. continue
  71. }
  72. select {
  73. case c.send <- m:
  74. default:
  75. close(c.send)
  76. delete(h.connections, c)
  77. log.Println("Closing channel when sending")
  78. go c.ws.Close()
  79. }
  80. }
  81. }
  82. }
  83. }
  84. func (c *wsConnection) reader() {
  85. for {
  86. var message string
  87. err := websocket.Message.Receive(c.ws, &message)
  88. if err != nil {
  89. if err == io.EOF {
  90. log.Println("WS connection closed")
  91. } else {
  92. log.Println("WS read error:", err)
  93. }
  94. break
  95. }
  96. log.Println("WS message", message)
  97. // TODO(ask) take configuration options etc
  98. //h.broadcast <- message
  99. }
  100. c.ws.Close()
  101. }
  102. func (c *wsConnection) writer() {
  103. for message := range c.send {
  104. err := websocket.Message.Send(c.ws, message)
  105. if err != nil {
  106. log.Println("WS write error:", err)
  107. break
  108. }
  109. }
  110. c.ws.Close()
  111. }
  112. func wsHandler(ws *websocket.Conn) {
  113. log.Println("Starting new WS connection")
  114. c := &wsConnection{send: make(chan string, 180), ws: ws}
  115. hub.register <- c
  116. defer func() {
  117. log.Println("sending unregister message")
  118. hub.unregister <- c
  119. }()
  120. go c.writer()
  121. c.reader()
  122. }
  123. func initialStatus() string {
  124. status := new(statusStreamMsgStart)
  125. status.Version = VERSION
  126. status.ID = serverID
  127. status.IP = serverIP
  128. if len(serverGroups) > 0 {
  129. status.Groups = serverGroups
  130. }
  131. hostname, err := os.Hostname()
  132. if err == nil {
  133. status.Hostname = hostname
  134. }
  135. status.Uptime = int(time.Since(timeStarted).Seconds())
  136. status.Started = int(timeStarted.Unix())
  137. message, err := json.Marshal(status)
  138. return string(message)
  139. }
  140. func logStatus() {
  141. log.Println(initialStatus())
  142. qCounter := metrics.Get("queries").(metrics.Meter)
  143. lastQueryCount := qCounter.Count()
  144. for {
  145. current := qCounter.Count()
  146. newQueries := current - lastQueryCount
  147. lastQueryCount = current
  148. log.Println("goroutines", runtime.NumGoroutine(), "queries", newQueries)
  149. time.Sleep(60 * time.Second)
  150. }
  151. }
  152. func monitor(zones Zones) {
  153. go logStatus()
  154. if len(*flaghttp) == 0 {
  155. return
  156. }
  157. go hub.run()
  158. go httpHandler(zones)
  159. qCounter := metrics.Get("queries").(metrics.Meter)
  160. lastQueryCount := qCounter.Count()
  161. status := new(statusStreamMsgUpdate)
  162. var lastQps1m float64
  163. for {
  164. current := qCounter.Count()
  165. newQueries := current - lastQueryCount
  166. lastQueryCount = current
  167. status.Uptime = int(time.Since(timeStarted).Seconds())
  168. status.QueryCount = qCounter.Count()
  169. status.Qps = newQueries
  170. newQps1m := qCounter.Rate1()
  171. if newQps1m != lastQps1m {
  172. status.Qps1m = newQps1m
  173. lastQps1m = newQps1m
  174. } else {
  175. status.Qps1m = 0
  176. }
  177. message, err := json.Marshal(status)
  178. if err == nil {
  179. hub.broadcast <- string(message)
  180. }
  181. time.Sleep(1 * time.Second)
  182. }
  183. }
  184. func MainServer(w http.ResponseWriter, req *http.Request) {
  185. if req.RequestURI != "/version" {
  186. http.NotFound(w, req)
  187. return
  188. }
  189. io.WriteString(w, `<html><head><title>GeoDNS `+
  190. VERSION+`</title><body>`+
  191. initialStatus()+
  192. `</body></html>`)
  193. }
  194. type rate struct {
  195. Name string
  196. Count int64
  197. Metrics ZoneMetrics
  198. }
  199. type Rates []*rate
  200. func (s Rates) Len() int { return len(s) }
  201. func (s Rates) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  202. type RatesByCount struct{ Rates }
  203. func (s RatesByCount) Less(i, j int) bool {
  204. ic := s.Rates[i].Count
  205. jc := s.Rates[j].Count
  206. if ic == jc {
  207. return s.Rates[i].Name < s.Rates[j].Name
  208. }
  209. return ic > jc
  210. }
  // histogramData is a snapshot of histogram summary statistics for the
// status template. setupHistogramData fills it in.
type histogramData struct {
	Max, Min                           int64
	Mean, Pct90, Pct99, Pct999, StdDev float64
}
  220. func setupHistogramData(met *metrics.StandardHistogram, dat *histogramData) {
  221. dat.Max = met.Max()
  222. dat.Min = met.Min()
  223. dat.Mean = met.Mean()
  224. dat.StdDev = met.StdDev()
  225. percentiles := met.Percentiles([]float64{0.90, 0.99, 0.999})
  226. dat.Pct90 = percentiles[0]
  227. dat.Pct99 = percentiles[1]
  228. dat.Pct999 = percentiles[2]
  229. }
  230. func StatusServer(zones Zones) func(http.ResponseWriter, *http.Request) {
  231. return func(w http.ResponseWriter, req *http.Request) {
  232. req.ParseForm()
  233. topOption := 10
  234. topParam := req.Form["top"]
  235. if len(topParam) > 0 {
  236. var err error
  237. topOption, err = strconv.Atoi(topParam[0])
  238. if err != nil {
  239. topOption = 10
  240. }
  241. }
  242. statusTemplate, err := templates_status_html()
  243. if err != nil {
  244. log.Println("Could not read template", err)
  245. w.WriteHeader(500)
  246. return
  247. }
  248. tmpl, err := template.New("status_html").Parse(string(statusTemplate))
  249. if err != nil {
  250. str := fmt.Sprintf("Could not parse template: %s", err)
  251. io.WriteString(w, str)
  252. return
  253. }
  254. rates := make(Rates, 0)
  255. for name, zone := range zones {
  256. count := zone.Metrics.Queries.Count()
  257. rates = append(rates, &rate{
  258. Name: name,
  259. Count: count,
  260. Metrics: zone.Metrics,
  261. })
  262. }
  263. sort.Sort(RatesByCount{rates})
  264. type statusData struct {
  265. Version string
  266. Zones Rates
  267. Uptime DayDuration
  268. Platform string
  269. Global struct {
  270. Queries *metrics.StandardMeter
  271. Histogram histogramData
  272. HistogramRecent histogramData
  273. }
  274. TopOption int
  275. }
  276. uptime := DayDuration{time.Since(timeStarted)}
  277. status := statusData{
  278. Version: VERSION,
  279. Zones: rates,
  280. Uptime: uptime,
  281. Platform: runtime.GOARCH + "-" + runtime.GOOS,
  282. TopOption: topOption,
  283. }
  284. status.Global.Queries = metrics.Get("queries").(*metrics.StandardMeter)
  285. setupHistogramData(metrics.Get("queries-histogram").(*metrics.StandardHistogram), &status.Global.Histogram)
  286. err = tmpl.Execute(w, status)
  287. if err != nil {
  288. log.Println("Status template error", err)
  289. }
  290. }
  291. }
  292. func httpHandler(zones Zones) {
  293. http.Handle("/monitor", websocket.Handler(wsHandler))
  294. http.HandleFunc("/status", StatusServer(zones))
  295. http.HandleFunc("/", MainServer)
  296. log.Println("Starting HTTP interface on", *flaghttp)
  297. log.Fatal(http.ListenAndServe(*flaghttp, nil))
  298. }