queue.go 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. package queue
  2. import (
  3. "context"
  4. "errors"
  5. "fmt"
  6. "github.com/enriquebris/goconcurrentqueue"
  7. "github.com/gravitl/netmaker/logger"
  8. "github.com/gravitl/netmaker/models"
  9. "github.com/gravitl/netmaker/servercfg"
  10. )
  11. // EventQueue - responsible for queueing and handling events sent to the server
  12. var EventQueue goconcurrentqueue.Queue
  13. // StartQueue - starts the queue and listens for messages
  14. func StartQueue(ctx context.Context) {
  15. initQueue()
  16. go func(ctx context.Context) {
  17. for {
  18. msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
  19. if err != nil { // handle dequeue error
  20. if errors.Is(err, context.Canceled) {
  21. logger.Log(0, "queue shutdown successfully")
  22. break
  23. }
  24. logger.Log(0, "error dequeuing event -", err.Error())
  25. continue
  26. }
  27. event := msg.(models.Event)
  28. if _, ok := handlerFuncs[event.Topic]; ok {
  29. handlerFuncs[event.Topic](&event)
  30. } else {
  31. logger.Log(0, fmt.Sprintf("received an unknown topic %d \n", event.Topic))
  32. }
  33. logger.Log(3, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
  34. }
  35. }(ctx)
  36. }
  37. // == private ==
  38. func initQueue() {
  39. size := servercfg.GetQueueSize()
  40. if size > 0 {
  41. logger.Log(0, "started queue with fixed allocation -", fmt.Sprintf("%d", size))
  42. EventQueue = goconcurrentqueue.NewFixedFIFO(size)
  43. } else {
  44. logger.Log(0, "started queue with dynamic allocation")
  45. EventQueue = goconcurrentqueue.NewFIFO()
  46. }
  47. initializeHandlers()
  48. }