BlockingQueue.hpp 2.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123
  1. /* This Source Code Form is subject to the terms of the Mozilla Public
  2. * License, v. 2.0. If a copy of the MPL was not distributed with this
  3. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  4. *
  5. * (c) ZeroTier, Inc.
  6. * https://www.zerotier.com/
  7. */
  8. #ifndef ZT_BLOCKINGQUEUE_HPP
  9. #define ZT_BLOCKINGQUEUE_HPP
  10. #include <atomic>
  11. #include <chrono>
  12. #include <condition_variable>
  13. #include <mutex>
  14. #include <queue>
  15. #include <vector>
  16. namespace ZeroTier {
  17. /**
  18. * Simple C++11 thread-safe queue
  19. *
  20. * Do not use in node/ since we have not gone C++11 there yet.
  21. */
  22. template <class T> class BlockingQueue {
  23. public:
  24. BlockingQueue(void) : r(true)
  25. {
  26. }
  27. inline void post(T t)
  28. {
  29. std::lock_guard<std::mutex> lock(m);
  30. q.push(t);
  31. c.notify_one();
  32. }
  33. inline void postLimit(T t, const unsigned long limit)
  34. {
  35. std::unique_lock<std::mutex> lock(m);
  36. for (;;) {
  37. if (q.size() < limit) {
  38. q.push(t);
  39. c.notify_one();
  40. break;
  41. }
  42. if (! r)
  43. break;
  44. gc.wait(lock);
  45. }
  46. }
  47. inline void stop(void)
  48. {
  49. std::lock_guard<std::mutex> lock(m);
  50. r = false;
  51. c.notify_all();
  52. gc.notify_all();
  53. }
  54. inline bool get(T& value)
  55. {
  56. std::unique_lock<std::mutex> lock(m);
  57. if (! r)
  58. return false;
  59. while (q.empty()) {
  60. c.wait(lock);
  61. if (! r) {
  62. gc.notify_all();
  63. return false;
  64. }
  65. }
  66. value = q.front();
  67. q.pop();
  68. gc.notify_all();
  69. return true;
  70. }
  71. inline std::vector<T> drain()
  72. {
  73. std::vector<T> v;
  74. while (! q.empty()) {
  75. v.push_back(q.front());
  76. q.pop();
  77. }
  78. return v;
  79. }
  80. enum TimedWaitResult { OK, TIMED_OUT, STOP };
  81. inline TimedWaitResult get(T& value, const unsigned long ms)
  82. {
  83. const std::chrono::milliseconds ms2 { ms };
  84. std::unique_lock<std::mutex> lock(m);
  85. if (! r)
  86. return STOP;
  87. while (q.empty()) {
  88. if (c.wait_for(lock, ms2) == std::cv_status::timeout)
  89. return ((r) ? TIMED_OUT : STOP);
  90. else if (! r)
  91. return STOP;
  92. }
  93. value = q.front();
  94. q.pop();
  95. return OK;
  96. }
  97. inline size_t size() const
  98. {
  99. return q.size();
  100. }
  101. private:
  102. std::queue<T> q;
  103. mutable std::mutex m;
  104. mutable std::condition_variable c, gc;
  105. std::atomic_bool r;
  106. };
  107. } // namespace ZeroTier
  108. #endif