monitor.go 7.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355
  1. package main
  2. //go:generate esc -o templates.go templates/
  3. import (
  4. "encoding/json"
  5. "fmt"
  6. "html/template"
  7. "io"
  8. "log"
  9. "net/http"
  10. "os"
  11. "runtime"
  12. "sort"
  13. "strconv"
  14. "time"
  15. "github.com/abh/geodns/Godeps/_workspace/src/github.com/rcrowley/go-metrics"
  16. "github.com/abh/geodns/Godeps/_workspace/src/golang.org/x/net/websocket"
  17. )
  // statusStreamMsgStart is the initial status message queued for a websocket
  // client when it registers with the hub. initialStatus fills it in and
  // serializes it to JSON.
  type statusStreamMsgStart struct {
  	// Hostname is omitted from the JSON when empty; initialStatus only sets
  	// it when os.Hostname succeeds.
  	Hostname string `json:"h,omitempty"`
  	// Version is the server version string.
  	Version string `json:"v"`
  	// ID is the server ID.
  	ID string `json:"id"`
  	// IP is the server IP.
  	IP string `json:"ip"`
  	// Uptime is the number of whole seconds since the server started.
  	Uptime int `json:"up"`
  	// Started is the server start time as a Unix timestamp in seconds.
  	Started int `json:"started"`
  	// Groups lists the server groups; it stays nil (JSON null) when there
  	// are none.
  	Groups []string `json:"groups"`
  }
  // statusStreamMsgUpdate is the periodic status update that monitor
  // serializes to JSON and broadcasts to connected websocket clients.
  type statusStreamMsgUpdate struct {
  	// Uptime is the number of whole seconds since the server started.
  	Uptime int `json:"up"`
  	// QueryCount is the running total from the "queries" meter.
  	QueryCount int64 `json:"qs"`
  	// Qps is the number of queries counted since the previous update.
  	Qps int64 `json:"qps"`
  	// Qps1m is the meter's one-minute rate. A zero value is dropped from the
  	// JSON, which monitor uses to skip the field when the rate is unchanged.
  	Qps1m float64 `json:"qps1m,omitempty"`
  }
  35. type wsConnection struct {
  36. // The websocket connection.
  37. ws *websocket.Conn
  38. // Buffered channel of outbound messages.
  39. send chan string
  40. }
  41. type monitorHub struct {
  42. connections map[*wsConnection]bool
  43. broadcast chan string
  44. register chan *wsConnection
  45. unregister chan *wsConnection
  46. }
  47. var hub = monitorHub{
  48. broadcast: make(chan string),
  49. register: make(chan *wsConnection, 10),
  50. unregister: make(chan *wsConnection, 10),
  51. connections: make(map[*wsConnection]bool),
  52. }
  53. func (h *monitorHub) run() {
  54. for {
  55. select {
  56. case c := <-h.register:
  57. h.connections[c] = true
  58. log.Println("Queuing initial status")
  59. c.send <- initialStatus()
  60. case c := <-h.unregister:
  61. log.Println("Unregistering connection")
  62. delete(h.connections, c)
  63. case m := <-h.broadcast:
  64. for c := range h.connections {
  65. if len(c.send)+5 > cap(c.send) {
  66. log.Println("WS connection too close to cap")
  67. c.send <- `{"error": "too slow"}`
  68. close(c.send)
  69. go c.ws.Close()
  70. h.unregister <- c
  71. continue
  72. }
  73. select {
  74. case c.send <- m:
  75. default:
  76. close(c.send)
  77. delete(h.connections, c)
  78. log.Println("Closing channel when sending")
  79. go c.ws.Close()
  80. }
  81. }
  82. }
  83. }
  84. }
  85. func (c *wsConnection) reader() {
  86. for {
  87. var message string
  88. err := websocket.Message.Receive(c.ws, &message)
  89. if err != nil {
  90. if err == io.EOF {
  91. log.Println("WS connection closed")
  92. } else {
  93. log.Println("WS read error:", err)
  94. }
  95. break
  96. }
  97. log.Println("WS message", message)
  98. // TODO(ask) take configuration options etc
  99. //h.broadcast <- message
  100. }
  101. c.ws.Close()
  102. }
  103. func (c *wsConnection) writer() {
  104. for message := range c.send {
  105. err := websocket.Message.Send(c.ws, message)
  106. if err != nil {
  107. log.Println("WS write error:", err)
  108. break
  109. }
  110. }
  111. c.ws.Close()
  112. }
  113. func wsHandler(ws *websocket.Conn) {
  114. log.Println("Starting new WS connection")
  115. c := &wsConnection{send: make(chan string, 180), ws: ws}
  116. hub.register <- c
  117. defer func() {
  118. log.Println("sending unregister message")
  119. hub.unregister <- c
  120. }()
  121. go c.writer()
  122. c.reader()
  123. }
  124. func initialStatus() string {
  125. status := new(statusStreamMsgStart)
  126. status.Version = VERSION
  127. status.ID = serverID
  128. status.IP = serverIP
  129. if len(serverGroups) > 0 {
  130. status.Groups = serverGroups
  131. }
  132. hostname, err := os.Hostname()
  133. if err == nil {
  134. status.Hostname = hostname
  135. }
  136. status.Uptime = int(time.Since(timeStarted).Seconds())
  137. status.Started = int(timeStarted.Unix())
  138. message, err := json.Marshal(status)
  139. return string(message)
  140. }
  141. func logStatus() {
  142. log.Println(initialStatus())
  143. qCounter := metrics.Get("queries").(metrics.Meter)
  144. lastQueryCount := qCounter.Count()
  145. for {
  146. current := qCounter.Count()
  147. newQueries := current - lastQueryCount
  148. lastQueryCount = current
  149. log.Println("goroutines", runtime.NumGoroutine(), "queries", newQueries)
  150. time.Sleep(60 * time.Second)
  151. }
  152. }
  153. func monitor(zones Zones) {
  154. go logStatus()
  155. if len(*flaghttp) == 0 {
  156. return
  157. }
  158. go hub.run()
  159. go httpHandler(zones)
  160. qCounter := metrics.Get("queries").(metrics.Meter)
  161. lastQueryCount := qCounter.Count()
  162. status := new(statusStreamMsgUpdate)
  163. var lastQps1m float64
  164. for {
  165. current := qCounter.Count()
  166. newQueries := current - lastQueryCount
  167. lastQueryCount = current
  168. status.Uptime = int(time.Since(timeStarted).Seconds())
  169. status.QueryCount = qCounter.Count()
  170. status.Qps = newQueries
  171. newQps1m := qCounter.Rate1()
  172. if newQps1m != lastQps1m {
  173. status.Qps1m = newQps1m
  174. lastQps1m = newQps1m
  175. } else {
  176. status.Qps1m = 0
  177. }
  178. message, err := json.Marshal(status)
  179. if err == nil {
  180. hub.broadcast <- string(message)
  181. }
  182. time.Sleep(1 * time.Second)
  183. }
  184. }
  185. func MainServer(w http.ResponseWriter, req *http.Request) {
  186. if req.RequestURI != "/version" {
  187. http.NotFound(w, req)
  188. return
  189. }
  190. io.WriteString(w, `<html><head><title>GeoDNS `+
  191. VERSION+`</title><body>`+
  192. initialStatus()+
  193. `</body></html>`)
  194. }
  195. type rate struct {
  196. Name string
  197. Count int64
  198. Metrics ZoneMetrics
  199. }
  200. type Rates []*rate
  201. func (s Rates) Len() int { return len(s) }
  202. func (s Rates) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  203. type RatesByCount struct{ Rates }
  204. func (s RatesByCount) Less(i, j int) bool {
  205. ic := s.Rates[i].Count
  206. jc := s.Rates[j].Count
  207. if ic == jc {
  208. return s.Rates[i].Name < s.Rates[j].Name
  209. }
  210. return ic > jc
  211. }
  // histogramData is a flat snapshot of a histogram's summary statistics for
  // use by the status template. setupHistogramData fills it in.
  type histogramData struct {
  	// Max and Min are the histogram's extreme values.
  	Max int64
  	Min int64

  	// Mean is the histogram's mean.
  	Mean float64

  	// Pct90, Pct99 and Pct999 are the 0.90, 0.99 and 0.999 percentiles.
  	Pct90  float64
  	Pct99  float64
  	Pct999 float64

  	// StdDev is the histogram's standard deviation.
  	StdDev float64
  }
  221. func setupHistogramData(met *metrics.StandardHistogram, dat *histogramData) {
  222. dat.Max = met.Max()
  223. dat.Min = met.Min()
  224. dat.Mean = met.Mean()
  225. dat.StdDev = met.StdDev()
  226. percentiles := met.Percentiles([]float64{0.90, 0.99, 0.999})
  227. dat.Pct90 = percentiles[0]
  228. dat.Pct99 = percentiles[1]
  229. dat.Pct999 = percentiles[2]
  230. }
  231. func StatusServer(zones Zones) func(http.ResponseWriter, *http.Request) {
  232. return func(w http.ResponseWriter, req *http.Request) {
  233. req.ParseForm()
  234. topOption := 10
  235. topParam := req.Form["top"]
  236. if len(topParam) > 0 {
  237. var err error
  238. topOption, err = strconv.Atoi(topParam[0])
  239. if err != nil {
  240. topOption = 10
  241. }
  242. }
  243. statusTemplate, err := FSString(development, "/templates/status.html")
  244. if err != nil {
  245. log.Println("Could not read template:", err)
  246. w.WriteHeader(500)
  247. return
  248. }
  249. tmpl, err := template.New("status_html").Parse(statusTemplate)
  250. if err != nil {
  251. str := fmt.Sprintf("Could not parse template: %s", err)
  252. io.WriteString(w, str)
  253. return
  254. }
  255. rates := make(Rates, 0)
  256. for name, zone := range zones {
  257. count := zone.Metrics.Queries.Count()
  258. rates = append(rates, &rate{
  259. Name: name,
  260. Count: count,
  261. Metrics: zone.Metrics,
  262. })
  263. }
  264. sort.Sort(RatesByCount{rates})
  265. type statusData struct {
  266. Version string
  267. Zones Rates
  268. Uptime DayDuration
  269. Platform string
  270. Global struct {
  271. Queries *metrics.StandardMeter
  272. Histogram histogramData
  273. HistogramRecent histogramData
  274. }
  275. TopOption int
  276. }
  277. uptime := DayDuration{time.Since(timeStarted)}
  278. status := statusData{
  279. Version: VERSION,
  280. Zones: rates,
  281. Uptime: uptime,
  282. Platform: runtime.GOARCH + "-" + runtime.GOOS,
  283. TopOption: topOption,
  284. }
  285. status.Global.Queries = metrics.Get("queries").(*metrics.StandardMeter)
  286. setupHistogramData(metrics.Get("queries-histogram").(*metrics.StandardHistogram), &status.Global.Histogram)
  287. err = tmpl.Execute(w, status)
  288. if err != nil {
  289. log.Println("Status template error", err)
  290. }
  291. }
  292. }
  293. func httpHandler(zones Zones) {
  294. http.Handle("/monitor", websocket.Handler(wsHandler))
  295. http.HandleFunc("/status", StatusServer(zones))
  296. http.HandleFunc("/", MainServer)
  297. log.Println("Starting HTTP interface on", *flaghttp)
  298. log.Fatal(http.ListenAndServe(*flaghttp, nil))
  299. }