package main import ( "encoding/json" "fmt" "html/template" "io" "log" "net/http" "os" "runtime" "sort" "strconv" "time" "code.google.com/p/go.net/websocket" "github.com/rcrowley/go-metrics" ) // Initial status message on websocket type statusStreamMsgStart struct { Hostname string `json:"h,omitemty"` Version string `json:"v"` ID string `json:"id"` IP string `json:"ip"` Uptime int `json:"up"` Started int `json:"started"` Groups []string `json:"groups"` } // Update message on websocket type statusStreamMsgUpdate struct { Uptime int `json:"up"` QueryCount int64 `json:"qs"` Qps int64 `json:"qps"` Qps1m float64 `json:"qps1m,omitempty"` } type wsConnection struct { // The websocket connection. ws *websocket.Conn // Buffered channel of outbound messages. send chan string } type monitorHub struct { connections map[*wsConnection]bool broadcast chan string register chan *wsConnection unregister chan *wsConnection } var hub = monitorHub{ broadcast: make(chan string), register: make(chan *wsConnection, 10), unregister: make(chan *wsConnection, 10), connections: make(map[*wsConnection]bool), } func (h *monitorHub) run() { for { select { case c := <-h.register: h.connections[c] = true log.Println("Queuing initial status") c.send <- initialStatus() case c := <-h.unregister: log.Println("Unregistering connection") delete(h.connections, c) case m := <-h.broadcast: for c := range h.connections { if len(c.send)+5 > cap(c.send) { log.Println("WS connection too close to cap") c.send <- `{"error": "too slow"}` close(c.send) go c.ws.Close() h.unregister <- c continue } select { case c.send <- m: default: close(c.send) delete(h.connections, c) log.Println("Closing channel when sending") go c.ws.Close() } } } } } func (c *wsConnection) reader() { for { var message string err := websocket.Message.Receive(c.ws, &message) if err != nil { if err == io.EOF { log.Println("WS connection closed") } else { log.Println("WS read error:", err) } break } log.Println("WS message", message) // TODO(ask) take configuration options etc //h.broadcast <- message } c.ws.Close() } func (c *wsConnection) writer() { for message := range c.send { err := websocket.Message.Send(c.ws, message) if err != nil { log.Println("WS write error:", err) break } } c.ws.Close() } func wsHandler(ws *websocket.Conn) { log.Println("Starting new WS connection") c := &wsConnection{send: make(chan string, 180), ws: ws} hub.register <- c defer func() { log.Println("sending unregister message") hub.unregister <- c }() go c.writer() c.reader() } func initialStatus() string { status := new(statusStreamMsgStart) status.Version = VERSION status.ID = serverID status.IP = serverIP if len(serverGroups) > 0 { status.Groups = serverGroups } hostname, err := os.Hostname() if err == nil { status.Hostname = hostname } status.Uptime = int(time.Since(timeStarted).Seconds()) status.Started = int(timeStarted.Unix()) message, err := json.Marshal(status) return string(message) } func logStatus() { log.Println(initialStatus()) qCounter := metrics.Get("queries").(metrics.Meter) lastQueryCount := qCounter.Count() for { current := qCounter.Count() newQueries := current - lastQueryCount lastQueryCount = current log.Println("goroutines", runtime.NumGoroutine(), "queries", newQueries) time.Sleep(60 * time.Second) } } func monitor(zones Zones) { go logStatus() if len(*flaghttp) == 0 { return } go hub.run() go httpHandler(zones) qCounter := metrics.Get("queries").(metrics.Meter) lastQueryCount := qCounter.Count() status := new(statusStreamMsgUpdate) var lastQps1m float64 for { current := qCounter.Count() newQueries := current - lastQueryCount lastQueryCount = current status.Uptime = int(time.Since(timeStarted).Seconds()) status.QueryCount = qCounter.Count() status.Qps = newQueries newQps1m := qCounter.Rate1() if newQps1m != lastQps1m { status.Qps1m = newQps1m lastQps1m = newQps1m } else { status.Qps1m = 0 } message, err := json.Marshal(status) if err == nil { hub.broadcast <- string(message) } time.Sleep(1 * time.Second) } } func MainServer(w http.ResponseWriter, req *http.Request) { if req.RequestURI != "/version" { http.NotFound(w, req) return } io.WriteString(w, `