WorkQueue.h 4.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181
  1. /*
  2. * Copyright (c) 2016-present, Facebook, Inc.
  3. * All rights reserved.
  4. *
  5. * This source code is licensed under both the BSD-style license (found in the
  6. * LICENSE file in the root directory of this source tree) and the GPLv2 (found
  7. * in the COPYING file in the root directory of this source tree).
  8. */
  9. #pragma once
  10. #include "utils/Buffer.h"
  11. #include <atomic>
  12. #include <cassert>
  13. #include <cstddef>
  14. #include <condition_variable>
  15. #include <cstddef>
  16. #include <functional>
  17. #include <mutex>
  18. #include <queue>
  19. namespace pzstd {
  20. /// Unbounded thread-safe work queue.
  21. template <typename T>
  22. class WorkQueue {
  23. // Protects all member variable access
  24. std::mutex mutex_;
  25. std::condition_variable readerCv_;
  26. std::condition_variable writerCv_;
  27. std::condition_variable finishCv_;
  28. std::queue<T> queue_;
  29. bool done_;
  30. std::size_t maxSize_;
  31. // Must have lock to call this function
  32. bool full() const {
  33. if (maxSize_ == 0) {
  34. return false;
  35. }
  36. return queue_.size() >= maxSize_;
  37. }
  38. public:
  39. /**
  40. * Constructs an empty work queue with an optional max size.
  41. * If `maxSize == 0` the queue size is unbounded.
  42. *
  43. * @param maxSize The maximum allowed size of the work queue.
  44. */
  45. WorkQueue(std::size_t maxSize = 0) : done_(false), maxSize_(maxSize) {}
  46. /**
  47. * Push an item onto the work queue. Notify a single thread that work is
  48. * available. If `finish()` has been called, do nothing and return false.
  49. * If `push()` returns false, then `item` has not been moved from.
  50. *
  51. * @param item Item to push onto the queue.
  52. * @returns True upon success, false if `finish()` has been called. An
  53. * item was pushed iff `push()` returns true.
  54. */
  55. bool push(T&& item) {
  56. {
  57. std::unique_lock<std::mutex> lock(mutex_);
  58. while (full() && !done_) {
  59. writerCv_.wait(lock);
  60. }
  61. if (done_) {
  62. return false;
  63. }
  64. queue_.push(std::move(item));
  65. }
  66. readerCv_.notify_one();
  67. return true;
  68. }
  69. /**
  70. * Attempts to pop an item off the work queue. It will block until data is
  71. * available or `finish()` has been called.
  72. *
  73. * @param[out] item If `pop` returns `true`, it contains the popped item.
  74. * If `pop` returns `false`, it is unmodified.
  75. * @returns True upon success. False if the queue is empty and
  76. * `finish()` has been called.
  77. */
  78. bool pop(T& item) {
  79. {
  80. std::unique_lock<std::mutex> lock(mutex_);
  81. while (queue_.empty() && !done_) {
  82. readerCv_.wait(lock);
  83. }
  84. if (queue_.empty()) {
  85. assert(done_);
  86. return false;
  87. }
  88. item = std::move(queue_.front());
  89. queue_.pop();
  90. }
  91. writerCv_.notify_one();
  92. return true;
  93. }
  94. /**
  95. * Sets the maximum queue size. If `maxSize == 0` then it is unbounded.
  96. *
  97. * @param maxSize The new maximum queue size.
  98. */
  99. void setMaxSize(std::size_t maxSize) {
  100. {
  101. std::lock_guard<std::mutex> lock(mutex_);
  102. maxSize_ = maxSize;
  103. }
  104. writerCv_.notify_all();
  105. }
  106. /**
  107. * Promise that `push()` won't be called again, so once the queue is empty
  108. * there will never any more work.
  109. */
  110. void finish() {
  111. {
  112. std::lock_guard<std::mutex> lock(mutex_);
  113. assert(!done_);
  114. done_ = true;
  115. }
  116. readerCv_.notify_all();
  117. writerCv_.notify_all();
  118. finishCv_.notify_all();
  119. }
  120. /// Blocks until `finish()` has been called (but the queue may not be empty).
  121. void waitUntilFinished() {
  122. std::unique_lock<std::mutex> lock(mutex_);
  123. while (!done_) {
  124. finishCv_.wait(lock);
  125. }
  126. }
  127. };
  128. /// Work queue for `Buffer`s that knows the total number of bytes in the queue.
  129. class BufferWorkQueue {
  130. WorkQueue<Buffer> queue_;
  131. std::atomic<std::size_t> size_;
  132. public:
  133. BufferWorkQueue(std::size_t maxSize = 0) : queue_(maxSize), size_(0) {}
  134. void push(Buffer buffer) {
  135. size_.fetch_add(buffer.size());
  136. queue_.push(std::move(buffer));
  137. }
  138. bool pop(Buffer& buffer) {
  139. bool result = queue_.pop(buffer);
  140. if (result) {
  141. size_.fetch_sub(buffer.size());
  142. }
  143. return result;
  144. }
  145. void setMaxSize(std::size_t maxSize) {
  146. queue_.setMaxSize(maxSize);
  147. }
  148. void finish() {
  149. queue_.finish();
  150. }
  151. /**
  152. * Blocks until `finish()` has been called.
  153. *
  154. * @returns The total number of bytes of all the `Buffer`s currently in the
  155. * queue.
  156. */
  157. std::size_t size() {
  158. queue_.waitUntilFinished();
  159. return size_.load();
  160. }
  161. };
  162. }