123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899 |
- package monitor
- import (
- "io"
- "log"
- "golang.org/x/net/websocket"
- )
- type monitorHub struct {
- connections map[*wsConnection]bool
- broadcast chan string
- register chan *wsConnection
- unregister chan *wsConnection
- }
- var hub = monitorHub{
- broadcast: make(chan string),
- register: make(chan *wsConnection, 10),
- unregister: make(chan *wsConnection, 10),
- connections: make(map[*wsConnection]bool),
- }
- type initialStatusFn func() string
- func (h *monitorHub) run(statusFn initialStatusFn) {
- for {
- select {
- case c := <-h.register:
- h.connections[c] = true
- log.Println("Queuing initial status")
- c.send <- statusFn()
- case c := <-h.unregister:
- log.Println("Unregistering connection")
- delete(h.connections, c)
- case m := <-h.broadcast:
- for c := range h.connections {
- if len(c.send)+5 > cap(c.send) {
- log.Println("WS connection too close to cap")
- c.send <- `{"error": "too slow"}`
- close(c.send)
- go c.ws.Close()
- h.unregister <- c
- continue
- }
- select {
- case c.send <- m:
- default:
- close(c.send)
- delete(h.connections, c)
- log.Println("Closing channel when sending")
- go c.ws.Close()
- }
- }
- }
- }
- }
- func (c *wsConnection) reader() {
- for {
- var message string
- err := websocket.Message.Receive(c.ws, &message)
- if err != nil {
- if err == io.EOF {
- log.Println("WS connection closed")
- } else {
- log.Println("WS read error:", err)
- }
- break
- }
- log.Println("WS message", message)
- // TODO(ask) take configuration options etc
- //h.broadcast <- message
- }
- c.ws.Close()
- }
- func (c *wsConnection) writer() {
- for message := range c.send {
- err := websocket.Message.Send(c.ws, message)
- if err != nil {
- log.Println("WS write error:", err)
- break
- }
- }
- c.ws.Close()
- }
- func wsHandler(ws *websocket.Conn) {
- log.Println("Starting new WS connection")
- c := &wsConnection{send: make(chan string, 180), ws: ws}
- hub.register <- c
- defer func() {
- log.Println("sending unregister message")
- hub.unregister <- c
- }()
- go c.writer()
- c.reader()
- }
|