monitor.go 9.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. package main
  2. //go:generate esc -o templates.go templates/
import (
	"crypto/subtle"
	"encoding/json"
	"fmt"
	"html/template"
	"io"
	"log"
	"net/http"
	"os"
	"runtime"
	"sort"
	"strconv"
	"time"

	"github.com/abh/geodns/zones"
	"github.com/rcrowley/go-metrics"
	"golang.org/x/net/websocket"
)
  19. // Initial status message on websocket
  20. type statusStreamMsgStart struct {
  21. Hostname string `json:"h,omitemty"`
  22. Version string `json:"v"`
  23. GoVersion string `json:"gov"`
  24. ID string `json:"id"`
  25. IP string `json:"ip"`
  26. UUID string `json:"uuid"`
  27. Uptime int `json:"up"`
  28. Started int `json:"started"`
  29. Groups []string `json:"groups"`
  30. }
  31. // Update message on websocket
  32. type statusStreamMsgUpdate struct {
  33. Uptime int `json:"up"`
  34. QueryCount int64 `json:"qs"`
  35. Qps int64 `json:"qps"`
  36. Qps1m float64 `json:"qps1m,omitempty"`
  37. }
  38. type wsConnection struct {
  39. // The websocket connection.
  40. ws *websocket.Conn
  41. // Buffered channel of outbound messages.
  42. send chan string
  43. }
  44. type monitorHub struct {
  45. connections map[*wsConnection]bool
  46. broadcast chan string
  47. register chan *wsConnection
  48. unregister chan *wsConnection
  49. }
  50. var hub = monitorHub{
  51. broadcast: make(chan string),
  52. register: make(chan *wsConnection, 10),
  53. unregister: make(chan *wsConnection, 10),
  54. connections: make(map[*wsConnection]bool),
  55. }
  56. func (h *monitorHub) run() {
  57. for {
  58. select {
  59. case c := <-h.register:
  60. h.connections[c] = true
  61. log.Println("Queuing initial status")
  62. c.send <- initialStatus()
  63. case c := <-h.unregister:
  64. log.Println("Unregistering connection")
  65. delete(h.connections, c)
  66. case m := <-h.broadcast:
  67. for c := range h.connections {
  68. if len(c.send)+5 > cap(c.send) {
  69. log.Println("WS connection too close to cap")
  70. c.send <- `{"error": "too slow"}`
  71. close(c.send)
  72. go c.ws.Close()
  73. h.unregister <- c
  74. continue
  75. }
  76. select {
  77. case c.send <- m:
  78. default:
  79. close(c.send)
  80. delete(h.connections, c)
  81. log.Println("Closing channel when sending")
  82. go c.ws.Close()
  83. }
  84. }
  85. }
  86. }
  87. }
  88. func (c *wsConnection) reader() {
  89. for {
  90. var message string
  91. err := websocket.Message.Receive(c.ws, &message)
  92. if err != nil {
  93. if err == io.EOF {
  94. log.Println("WS connection closed")
  95. } else {
  96. log.Println("WS read error:", err)
  97. }
  98. break
  99. }
  100. log.Println("WS message", message)
  101. // TODO(ask) take configuration options etc
  102. //h.broadcast <- message
  103. }
  104. c.ws.Close()
  105. }
  106. func (c *wsConnection) writer() {
  107. for message := range c.send {
  108. err := websocket.Message.Send(c.ws, message)
  109. if err != nil {
  110. log.Println("WS write error:", err)
  111. break
  112. }
  113. }
  114. c.ws.Close()
  115. }
  116. func wsHandler(ws *websocket.Conn) {
  117. log.Println("Starting new WS connection")
  118. c := &wsConnection{send: make(chan string, 180), ws: ws}
  119. hub.register <- c
  120. defer func() {
  121. log.Println("sending unregister message")
  122. hub.unregister <- c
  123. }()
  124. go c.writer()
  125. c.reader()
  126. }
  127. func initialStatus() string {
  128. status := new(statusStreamMsgStart)
  129. status.Version = VERSION
  130. status.ID = serverID
  131. status.IP = serverIP
  132. status.UUID = serverUUID
  133. status.GoVersion = runtime.Version()
  134. if len(serverGroups) > 0 {
  135. status.Groups = serverGroups
  136. }
  137. hostname, err := os.Hostname()
  138. if err == nil {
  139. status.Hostname = hostname
  140. }
  141. status.Uptime = int(time.Since(timeStarted).Seconds())
  142. status.Started = int(timeStarted.Unix())
  143. message, err := json.Marshal(status)
  144. return string(message)
  145. }
  146. func monitor(zones zones.Zones) {
  147. if len(*flaghttp) == 0 {
  148. return
  149. }
  150. go hub.run()
  151. go httpHandler(zones)
  152. qCounter := metrics.Get("queries").(metrics.Meter)
  153. lastQueryCount := qCounter.Count()
  154. status := new(statusStreamMsgUpdate)
  155. var lastQps1m float64
  156. for {
  157. current := qCounter.Count()
  158. newQueries := current - lastQueryCount
  159. lastQueryCount = current
  160. status.Uptime = int(time.Since(timeStarted).Seconds())
  161. status.QueryCount = qCounter.Count()
  162. status.Qps = newQueries
  163. newQps1m := qCounter.Rate1()
  164. if newQps1m != lastQps1m {
  165. status.Qps1m = newQps1m
  166. lastQps1m = newQps1m
  167. } else {
  168. status.Qps1m = 0
  169. }
  170. message, err := json.Marshal(status)
  171. if err == nil {
  172. hub.broadcast <- string(message)
  173. }
  174. time.Sleep(1 * time.Second)
  175. }
  176. }
  177. func MainServer(w http.ResponseWriter, req *http.Request) {
  178. if req.RequestURI != "/version" {
  179. http.NotFound(w, req)
  180. return
  181. }
  182. io.WriteString(w, `<html><head><title>GeoDNS `+
  183. VERSION+`</title><body>`+
  184. initialStatus()+
  185. `</body></html>`)
  186. }
  187. type rate struct {
  188. Name string
  189. Count int64
  190. Metrics zones.ZoneMetrics
  191. }
  192. type Rates []*rate
  193. func (s Rates) Len() int { return len(s) }
  194. func (s Rates) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
  195. type RatesByCount struct{ Rates }
  196. func (s RatesByCount) Less(i, j int) bool {
  197. ic := s.Rates[i].Count
  198. jc := s.Rates[j].Count
  199. if ic == jc {
  200. return s.Rates[i].Name < s.Rates[j].Name
  201. }
  202. return ic > jc
  203. }
  204. type histogramData struct {
  205. Max int64
  206. Min int64
  207. Mean float64
  208. Pct90 float64
  209. Pct99 float64
  210. Pct999 float64
  211. StdDev float64
  212. }
  213. func setupHistogramData(met metrics.Histogram, dat *histogramData) {
  214. dat.Max = met.Max()
  215. dat.Min = met.Min()
  216. dat.Mean = met.Mean()
  217. dat.StdDev = met.StdDev()
  218. percentiles := met.Percentiles([]float64{0.90, 0.99, 0.999})
  219. dat.Pct90 = percentiles[0]
  220. dat.Pct99 = percentiles[1]
  221. dat.Pct999 = percentiles[2]
  222. }
  223. func topParam(req *http.Request, def int) int {
  224. req.ParseForm()
  225. topOption := def
  226. topParam := req.Form["top"]
  227. if len(topParam) > 0 {
  228. var err error
  229. topOption, err = strconv.Atoi(topParam[0])
  230. if err != nil {
  231. topOption = def
  232. }
  233. }
  234. return topOption
  235. }
  236. func StatusJSONHandler(zones zones.Zones) func(http.ResponseWriter, *http.Request) {
  237. return func(w http.ResponseWriter, req *http.Request) {
  238. zonemetrics := make(map[string]metrics.Registry)
  239. for name, zone := range zones {
  240. zone.Lock()
  241. zonemetrics[name] = zone.Metrics.Registry
  242. zone.Unlock()
  243. }
  244. type statusData struct {
  245. Version string
  246. GoVersion string
  247. Uptime int64
  248. Platform string
  249. Zones map[string]metrics.Registry
  250. Global metrics.Registry
  251. ID string
  252. IP string
  253. UUID string
  254. Groups []string
  255. }
  256. uptime := int64(time.Since(timeStarted).Seconds())
  257. status := statusData{
  258. Version: VERSION,
  259. GoVersion: runtime.Version(),
  260. Uptime: uptime,
  261. Platform: runtime.GOARCH + "-" + runtime.GOOS,
  262. Zones: zonemetrics,
  263. Global: metrics.DefaultRegistry,
  264. ID: serverID,
  265. IP: serverIP,
  266. UUID: serverUUID,
  267. Groups: serverGroups,
  268. }
  269. b, err := json.Marshal(status)
  270. if err != nil {
  271. http.Error(w, "Error encoding JSON", 500)
  272. return
  273. }
  274. w.Header().Set("Content-Type", "application/json")
  275. w.Write(b)
  276. return
  277. }
  278. }
  279. func StatusHandler(zones zones.Zones) func(http.ResponseWriter, *http.Request) {
  280. return func(w http.ResponseWriter, req *http.Request) {
  281. topOption := topParam(req, 10)
  282. rates := make(Rates, 0)
  283. for name, zone := range zones {
  284. count := zone.Metrics.Queries.Count()
  285. rates = append(rates, &rate{
  286. Name: name,
  287. Count: count,
  288. Metrics: zone.Metrics,
  289. })
  290. }
  291. sort.Sort(RatesByCount{rates})
  292. type statusData struct {
  293. Version string
  294. Zones Rates
  295. Uptime DayDuration
  296. Platform string
  297. Global struct {
  298. Queries metrics.Meter
  299. Histogram histogramData
  300. HistogramRecent histogramData
  301. }
  302. TopOption int
  303. }
  304. uptime := DayDuration{time.Since(timeStarted)}
  305. status := statusData{
  306. Version: VERSION,
  307. Zones: rates,
  308. Uptime: uptime,
  309. Platform: runtime.GOARCH + "-" + runtime.GOOS,
  310. TopOption: topOption,
  311. }
  312. status.Global.Queries = metrics.Get("queries").(*metrics.StandardMeter).Snapshot()
  313. setupHistogramData(metrics.Get("queries-histogram").(*metrics.StandardHistogram).Snapshot(), &status.Global.Histogram)
  314. statusTemplate, err := FSString(development, "/templates/status.html")
  315. if err != nil {
  316. log.Println("Could not read template:", err)
  317. w.WriteHeader(500)
  318. return
  319. }
  320. tmpl, err := template.New("status_html").Parse(statusTemplate)
  321. if err != nil {
  322. str := fmt.Sprintf("Could not parse template: %s", err)
  323. io.WriteString(w, str)
  324. return
  325. }
  326. err = tmpl.Execute(w, status)
  327. if err != nil {
  328. log.Println("Status template error", err)
  329. }
  330. }
  331. }
  332. type basicauth struct {
  333. h http.Handler
  334. }
  335. func (b *basicauth) ServeHTTP(w http.ResponseWriter, r *http.Request) {
  336. // don't request passwords for the websocket interface (for now)
  337. // because 'wscat' doesn't support that.
  338. if r.RequestURI == "/monitor" {
  339. b.h.ServeHTTP(w, r)
  340. return
  341. }
  342. cfgMutex.RLock()
  343. user := Config.HTTP.User
  344. password := Config.HTTP.Password
  345. cfgMutex.RUnlock()
  346. if len(user) == 0 {
  347. b.h.ServeHTTP(w, r)
  348. return
  349. }
  350. ruser, rpass, ok := r.BasicAuth()
  351. if ok {
  352. if ruser == user && rpass == password {
  353. b.h.ServeHTTP(w, r)
  354. return
  355. }
  356. }
  357. w.Header().Set("WWW-Authenticate", fmt.Sprintf(`Basic realm=%q`, "GeoDNS Status"))
  358. http.Error(w, http.StatusText(http.StatusUnauthorized), http.StatusUnauthorized)
  359. return
  360. }
  361. func httpHandler(zones zones.Zones) {
  362. http.Handle("/monitor", websocket.Handler(wsHandler))
  363. http.HandleFunc("/status", StatusHandler(zones))
  364. http.HandleFunc("/status.json", StatusJSONHandler(zones))
  365. http.HandleFunc("/", MainServer)
  366. log.Println("Starting HTTP interface on", *flaghttp)
  367. log.Fatal(http.ListenAndServe(*flaghttp, &basicauth{h: http.DefaultServeMux}))
  368. }