hub.go 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. package monitor
  2. import (
  3. "io"
  4. "log"
  5. "golang.org/x/net/websocket"
  6. )
  7. type monitorHub struct {
  8. connections map[*wsConnection]bool
  9. broadcast chan string
  10. register chan *wsConnection
  11. unregister chan *wsConnection
  12. }
  13. var hub = monitorHub{
  14. broadcast: make(chan string),
  15. register: make(chan *wsConnection, 10),
  16. unregister: make(chan *wsConnection, 10),
  17. connections: make(map[*wsConnection]bool),
  18. }
  19. type initialStatusFn func() string
  20. func (h *monitorHub) run(statusFn initialStatusFn) {
  21. for {
  22. select {
  23. case c := <-h.register:
  24. h.connections[c] = true
  25. log.Println("Queuing initial status")
  26. c.send <- statusFn()
  27. case c := <-h.unregister:
  28. log.Println("Unregistering connection")
  29. delete(h.connections, c)
  30. case m := <-h.broadcast:
  31. for c := range h.connections {
  32. if len(c.send)+5 > cap(c.send) {
  33. log.Println("WS connection too close to cap")
  34. c.send <- `{"error": "too slow"}`
  35. close(c.send)
  36. go c.ws.Close()
  37. h.unregister <- c
  38. continue
  39. }
  40. select {
  41. case c.send <- m:
  42. default:
  43. close(c.send)
  44. delete(h.connections, c)
  45. log.Println("Closing channel when sending")
  46. go c.ws.Close()
  47. }
  48. }
  49. }
  50. }
  51. }
  52. func (c *wsConnection) reader() {
  53. for {
  54. var message string
  55. err := websocket.Message.Receive(c.ws, &message)
  56. if err != nil {
  57. if err == io.EOF {
  58. log.Println("WS connection closed")
  59. } else {
  60. log.Println("WS read error:", err)
  61. }
  62. break
  63. }
  64. log.Println("WS message", message)
  65. // TODO(ask) take configuration options etc
  66. //h.broadcast <- message
  67. }
  68. c.ws.Close()
  69. }
  70. func (c *wsConnection) writer() {
  71. for message := range c.send {
  72. err := websocket.Message.Send(c.ws, message)
  73. if err != nil {
  74. log.Println("WS write error:", err)
  75. break
  76. }
  77. }
  78. c.ws.Close()
  79. }
  80. func wsHandler(ws *websocket.Conn) {
  81. log.Println("Starting new WS connection")
  82. c := &wsConnection{send: make(chan string, 180), ws: ws}
  83. hub.register <- c
  84. defer func() {
  85. log.Println("sending unregister message")
  86. hub.unregister <- c
  87. }()
  88. go c.writer()
  89. c.reader()
  90. }