queue.go 1.2 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849
  1. package queue
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/enriquebris/goconcurrentqueue"
  6. "github.com/gravitl/netmaker/logger"
  7. "github.com/gravitl/netmaker/models"
  8. "github.com/gravitl/netmaker/servercfg"
  9. )
  10. // EventQueue - responsible for queueing and handling events sent to the server
  // It holds no queue until initQueue (called from StartQueue) assigns one:
  // goconcurrentqueue.NewFixedFIFO(size) when servercfg.GetQueueSize() is
  // positive, goconcurrentqueue.NewFIFO() otherwise.
  // The consumer loop in StartQueue asserts each dequeued element to
  // models.Event, so producers are expected to enqueue values of that type.
  11. var EventQueue goconcurrentqueue.Queue
  12. // StartQueue - starts the queue and listens for messages
  13. func StartQueue(ctx context.Context) {
  14. initQueue()
  15. go func(ctx context.Context) {
  16. logger.Log(2, "initialized queue service!")
  17. for {
  18. msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
  19. if err != nil { // handle dequeue error
  20. logger.Log(0, "error when dequeuing event -", err.Error())
  21. continue
  22. } else { // handle event
  23. event := msg.(models.Event)
  24. switch event.Topic {
  25. case "test":
  26. fmt.Printf("received test topic event %+v \n", event)
  27. default:
  28. fmt.Printf("topic unknown\n")
  29. }
  30. }
  31. logger.Log(0, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
  32. }
  33. }(ctx)
  34. }
  35. // == private ==
  36. func initQueue() {
  37. size := servercfg.GetQueueSize()
  38. if size > 0 {
  39. EventQueue = goconcurrentqueue.NewFixedFIFO(size)
  40. } else {
  41. EventQueue = goconcurrentqueue.NewFIFO()
  42. }
  43. }