queue.go 1.6 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758
  1. package queue
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/enriquebris/goconcurrentqueue"
  7. "github.com/gorilla/websocket"
  8. "github.com/gravitl/netmaker/logger"
  9. "github.com/gravitl/netmaker/models"
  10. "github.com/gravitl/netmaker/servercfg"
  11. )
  12. // EventQueue - queue of events sent to the server; assigned by initQueue and drained by the goroutine started in StartQueue
  13. var EventQueue goconcurrentqueue.Queue
  14. // StartQueue - starts the queue and listens for messages
  15. func StartQueue(ctx context.Context) {
  16. initQueue()
  17. go func(ctx context.Context) {
  18. for {
  19. msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
  20. if err != nil { // handle dequeue error
  21. if errors.Is(err, context.Canceled) {
  22. logger.Log(0, "queue shutdown successfully")
  23. break
  24. }
  25. logger.Log(0, "error dequeuing event -", err.Error())
  26. continue
  27. }
  28. event := msg.(models.Event)
  29. switch event.Topic {
  30. case models.EventTopics.Test:
  31. conn, ok := ConnMap.Load(event.ID)
  32. if ok {
  33. conn.(*websocket.Conn).WriteMessage(websocket.TextMessage, []byte("success"))
  34. }
  35. default:
  36. logger.Log(0, fmt.Sprintf("received an unknown topic %d \n", event.Topic))
  37. }
  38. logger.Log(3, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
  39. }
  40. }(ctx)
  41. }
  42. // == private ==
  43. func initQueue() {
  44. size := servercfg.GetQueueSize()
  45. if size > 0 {
  46. logger.Log(0, "started queue with fixed allocation -", fmt.Sprintf("%d", size))
  47. EventQueue = goconcurrentqueue.NewFixedFIFO(size)
  48. } else {
  49. logger.Log(0, "started queue with dynamic allocation")
  50. EventQueue = goconcurrentqueue.NewFIFO()
  51. }
  52. }