BlockingQueue.hpp 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2025-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_BLOCKINGQUEUE_HPP
  14. #define ZT_BLOCKINGQUEUE_HPP
  15. #include <queue>
  16. #include <mutex>
  17. #include <condition_variable>
  18. #include <chrono>
  19. #include <atomic>
  20. #include <vector>
  21. namespace ZeroTier {
  /**
   * Simple C++11 thread-safe FIFO queue with blocking consumers and
   * optionally bounded producers.
   *
   * All access to the underlying std::queue is serialized by one mutex.
   * Two condition variables are used:
   *   - c  : signalled when an item is added (wakes consumers in get())
   *   - gc : signalled when items are removed (wakes producers blocked in
   *          postLimit())
   *
   * Once stop() has been called, both get() overloads report "stopped"
   * without dequeuing anything; items still queued can be retrieved with
   * drain().
   *
   * Do not use in node/ since we have not gone C++11 there yet.
   */
  template <class T>
  class BlockingQueue
  {
  public:
      /** Construct an empty queue in the running (not stopped) state. */
      BlockingQueue(void) : r(true) {}

      /**
       * Append an item unconditionally and wake one waiting consumer.
       *
       * @param t Item to enqueue (copied into the queue)
       */
      inline void post(T t)
      {
          std::lock_guard<std::mutex> lock(m);
          q.push(t);
          c.notify_one();
      }

      /**
       * Append an item, blocking while the queue already holds `limit` or
       * more items.
       *
       * If the queue is stopped while there is still no room, the item is
       * dropped and the call returns. If there is room the item is enqueued
       * even after stop().
       *
       * @param t Item to enqueue (copied into the queue)
       * @param limit Maximum number of queued items allowed before blocking
       */
      inline void postLimit(T t,const unsigned long limit)
      {
          std::unique_lock<std::mutex> lock(m);
          for(;;) {
              if (q.size() < limit) {
                  q.push(t);
                  c.notify_one();
                  break;
              }
              if (!r)
                  break;
              // Full and still running: sleep until a consumer frees a slot
              // or stop() is called.
              gc.wait(lock);
          }
      }

      /**
       * Put the queue into the stopped state and wake every blocked
       * consumer and producer so they can return.
       */
      inline void stop(void)
      {
          std::lock_guard<std::mutex> lock(m);
          r = false;
          c.notify_all();
          gc.notify_all();
      }

      /**
       * Dequeue the oldest item, blocking until one is available or the
       * queue is stopped.
       *
       * @param value Receives the dequeued item on success
       * @return True if an item was stored in `value`, false if the queue
       *         has been stopped (in which case `value` is untouched)
       */
      inline bool get(T &value)
      {
          std::unique_lock<std::mutex> lock(m);
          if (!r)
              return false;
          // The predicate form re-checks the condition after every wakeup,
          // so spurious wakeups are handled by the library.
          c.wait(lock,[this]{ return (!q.empty()) || (!r); });
          if (!r)
              return false; // stop() already woke all gc waiters
          value = q.front();
          q.pop();
          gc.notify_all(); // a slot was freed: release blocked producers
          return true;
      }

      /**
       * Remove and return everything currently queued, oldest first.
       *
       * Does not block waiting for items and works whether or not the
       * queue has been stopped. Producers blocked in postLimit() are woken
       * because the queue is empty afterwards.
       *
       * @return All items that were in the queue (possibly empty)
       */
      inline std::vector<T> drain()
      {
          std::vector<T> v;
          // The queue is shared state; it must not be touched without the
          // mutex held or this races with post()/get() on other threads.
          std::lock_guard<std::mutex> lock(m);
          v.reserve(q.size());
          while (!q.empty()) {
              v.push_back(q.front());
              q.pop();
          }
          gc.notify_all();
          return v;
      }

      /** Outcome of the timed get() overload. */
      enum TimedWaitResult
      {
          OK,        // an item was dequeued
          TIMED_OUT, // no item became available within the timeout
          STOP       // the queue has been stopped
      };

      /**
       * Dequeue the oldest item, waiting at most `ms` milliseconds in total.
       *
       * @param value Receives the dequeued item when OK is returned
       * @param ms Maximum time to wait, in milliseconds
       * @return OK if an item was stored in `value`, TIMED_OUT if the queue
       *         stayed empty for the whole timeout, STOP if the queue has
       *         been stopped
       */
      inline TimedWaitResult get(T &value,const unsigned long ms)
      {
          const std::chrono::milliseconds ms2{ms};
          std::unique_lock<std::mutex> lock(m);
          if (!r)
              return STOP;
          // Predicate wait: the timeout bounds the whole call rather than
          // restarting on each wakeup, and an item that arrives right at
          // the deadline is still delivered. A false result means the
          // queue is still empty and still running.
          if (!c.wait_for(lock,ms2,[this]{ return (!q.empty()) || (!r); }))
              return TIMED_OUT;
          if (!r)
              return STOP;
          value = q.front();
          q.pop();
          gc.notify_all(); // a slot was freed: release blocked producers
          return OK;
      }

  private:
      std::queue<T> q;                       // pending items, guarded by m
      mutable std::mutex m;                  // guards q and orders changes to r
      mutable std::condition_variable c,gc;  // c: item added, gc: item removed
      std::atomic_bool r;                    // true while running, false after stop()
  };
  113. } // namespace ZeroTier
  114. #endif