package monitor

import (
	"io"
	"log"

	"golang.org/x/net/websocket"
)

// monitorHub fans broadcast messages out to a set of websocket connections.
//
// Within this file the connections map is only read and written by the
// goroutine executing run; everything else talks to the hub through the
// broadcast, register and unregister channels.
type monitorHub struct {
	// connections is the set of currently registered connections.
	connections map[*wsConnection]bool
	// broadcast carries messages to deliver to every registered connection.
	broadcast chan string
	// register carries connections to add to the set.
	register chan *wsConnection
	// unregister carries connections to remove from the set.
	unregister chan *wsConnection
}

// hub is the package-level hub instance used by wsHandler.
var hub = monitorHub{
	broadcast:   make(chan string),
	register:    make(chan *wsConnection, 10),
	unregister:  make(chan *wsConnection, 10),
	connections: make(map[*wsConnection]bool),
}

// initialStatusFn produces the message queued for a connection right after it
// registers.
type initialStatusFn func() string

// drop removes c from the hub and closes its send channel, which ends the
// range loop in c.writer. It does nothing when c is not registered, so a
// connection evicted by the hub can still be unregistered later by wsHandler
// without closing the channel a second time.
func (h *monitorHub) drop(c *wsConnection) {
	if !h.connections[c] {
		return
	}
	delete(h.connections, c)
	close(c.send)
}

// run is the hub's event loop; it never returns. It handles registrations
// (queuing statusFn's result as the first message), unregistrations, and
// broadcasts. Connections whose send buffer is nearly full are evicted
// instead of being allowed to stall the loop.
func (h *monitorHub) run(statusFn initialStatusFn) {
	for {
		select {
		case c := <-h.register:
			h.connections[c] = true
			log.Println("Queuing initial status")
			c.send <- statusFn()

		case c := <-h.unregister:
			log.Println("Unregistering connection")
			// Closing the send channel here lets the writer goroutine exit
			// after a normal disconnect instead of blocking forever.
			h.drop(c)

		case m := <-h.broadcast:
			// Deleting map entries while ranging over the map is permitted.
			for c := range h.connections {
				// Evict once fewer than 5 free slots remain in the buffer.
				if len(c.send)+5 > cap(c.send) {
					log.Println("WS connection too close to cap")
					// Best-effort notice: the buffer may already be full, and
					// the hub must not block on a slow client.
					select {
					case c.send <- `{"error": "too slow"}`:
					default:
					}
					// Remove the connection immediately rather than queuing
					// an unregister to ourselves: this goroutine is the one
					// draining h.unregister, and leaving a closed channel in
					// the map would make the next broadcast panic.
					h.drop(c)
					// Close in a separate goroutine so the hub loop does not
					// wait on the socket.
					go c.ws.Close()
					continue
				}

				select {
				case c.send <- m:
				default:
					log.Println("Closing channel when sending")
					h.drop(c)
					go c.ws.Close()
				}
			}
		}
	}
}

// reader receives text messages from the websocket until Receive returns an
// error, logging each message, and closes the websocket before returning.
// Received messages are currently only logged.
func (c *wsConnection) reader() {
	for {
		var message string
		err := websocket.Message.Receive(c.ws, &message)
		if err != nil {
			if err == io.EOF {
				log.Println("WS connection closed")
			} else {
				log.Println("WS read error:", err)
			}
			break
		}
		log.Println("WS message", message)
		// TODO(ask) take configuration options etc
		//h.broadcast <- message
	}
	c.ws.Close()
}

// writer sends every message queued on c.send to the websocket. It stops when
// c.send is closed or a send fails, and closes the websocket before returning.
func (c *wsConnection) writer() {
	for message := range c.send {
		err := websocket.Message.Send(c.ws, message)
		if err != nil {
			log.Println("WS write error:", err)
			break
		}
	}
	c.ws.Close()
}

// wsHandler serves a single websocket connection: it registers the connection
// with the hub, starts the writer goroutine, and runs the reader on the
// calling goroutine. When the reader returns, the connection is unregistered.
func wsHandler(ws *websocket.Conn) {
	log.Println("Starting new WS connection")
	c := &wsConnection{send: make(chan string, 180), ws: ws}
	hub.register <- c
	defer func() {
		log.Println("sending unregister message")
		hub.unregister <- c
	}()
	go c.writer()
	c.reader()
}