BlockingQueue.hpp 2.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116
  1. /*
  2. * Copyright (c)2019 ZeroTier, Inc.
  3. *
  4. * Use of this software is governed by the Business Source License included
  5. * in the LICENSE.TXT file in the project's root directory.
  6. *
  7. * Change Date: 2023-01-01
  8. *
  9. * On the date above, in accordance with the Business Source License, use
  10. * of this software will be governed by version 2.0 of the Apache License.
  11. */
  12. /****/
  13. #ifndef ZT_BLOCKINGQUEUE_HPP
  14. #define ZT_BLOCKINGQUEUE_HPP
  15. #include <queue>
  16. #include <mutex>
  17. #include <condition_variable>
  18. #include <chrono>
  19. #include "Thread.hpp"
  20. namespace ZeroTier {
  21. /**
  22. * Simple C++11 thread-safe queue
  23. *
  24. * Do not use in node/ since we have not gone C++11 there yet.
  25. */
  26. template <class T>
  27. class BlockingQueue
  28. {
  29. public:
  30. BlockingQueue(void) : r(true) {}
  31. inline void post(T t)
  32. {
  33. std::lock_guard<std::mutex> lock(m);
  34. q.push(t);
  35. c.notify_one();
  36. }
  37. inline void postLimit(T t,const unsigned long limit)
  38. {
  39. std::unique_lock<std::mutex> lock(m);
  40. for(;;) {
  41. if (q.size() < limit) {
  42. q.push(t);
  43. c.notify_one();
  44. break;
  45. }
  46. if (!r)
  47. break;
  48. gc.wait(lock);
  49. }
  50. }
  51. inline void stop(void)
  52. {
  53. std::lock_guard<std::mutex> lock(m);
  54. r = false;
  55. c.notify_all();
  56. gc.notify_all();
  57. }
  58. inline bool get(T &value)
  59. {
  60. std::unique_lock<std::mutex> lock(m);
  61. if (!r) return false;
  62. while (q.empty()) {
  63. c.wait(lock);
  64. if (!r) {
  65. gc.notify_all();
  66. return false;
  67. }
  68. }
  69. value = q.front();
  70. q.pop();
  71. gc.notify_all();
  72. return true;
  73. }
  74. enum TimedWaitResult
  75. {
  76. OK,
  77. TIMED_OUT,
  78. STOP
  79. };
  80. inline TimedWaitResult get(T &value,const unsigned long ms)
  81. {
  82. const std::chrono::milliseconds ms2{ms};
  83. std::unique_lock<std::mutex> lock(m);
  84. if (!r) return STOP;
  85. while (q.empty()) {
  86. if (c.wait_for(lock,ms2) == std::cv_status::timeout)
  87. return ((r) ? TIMED_OUT : STOP);
  88. else if (!r)
  89. return STOP;
  90. }
  91. value = q.front();
  92. q.pop();
  93. return OK;
  94. }
  95. private:
  96. volatile bool r;
  97. std::queue<T> q;
  98. mutable std::mutex m;
  99. mutable std::condition_variable c,gc;
  100. };
  101. } // namespace ZeroTier
  102. #endif