queue.go 1.0 KB

1234567891011121314151617181920212223242526272829303132333435363738
  1. package queue
  2. import (
  3. "context"
  4. "fmt"
  5. "github.com/enriquebris/goconcurrentqueue"
  6. "github.com/gravitl/netmaker/logger"
  7. "github.com/gravitl/netmaker/models"
  8. )
  9. // EventQueue - responsible for queueing and handling events sent to the server
  10. var EventQueue goconcurrentqueue.Queue
  11. // StartQueue - starts the queue and listens for messages
  12. func StartQueue(ctx context.Context) {
  13. EventQueue = goconcurrentqueue.NewFIFO()
  14. go func(ctx context.Context) {
  15. logger.Log(2, "initialized queue service!")
  16. for {
  17. msg, err := EventQueue.DequeueOrWaitForNextElementContext(ctx)
  18. if err != nil { // handle dequeue error
  19. logger.Log(0, "error when dequeuing event -", err.Error())
  20. continue
  21. } else { // handle event
  22. event := msg.(models.Event)
  23. switch event.Topic {
  24. case "test":
  25. fmt.Printf("received test topic event %+v \n", event)
  26. default:
  27. fmt.Printf("topic unknown\n")
  28. }
  29. }
  30. logger.Log(0, fmt.Sprintf("queue stats: queued elements %d, openCapacity: %d \n", EventQueue.GetLen(), EventQueue.GetCap()))
  31. }
  32. }(ctx)
  33. }