WaitFreeQueue.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340
  1. /* Copyright The kNet Project.
  2. Licensed under the Apache License, Version 2.0 (the "License");
  3. you may not use this file except in compliance with the License.
  4. You may obtain a copy of the License at
  5. http://www.apache.org/licenses/LICENSE-2.0
  6. Unless required by applicable law or agreed to in writing, software
  7. distributed under the License is distributed on an "AS IS" BASIS,
  8. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  9. See the License for the specific language governing permissions and
  10. limitations under the License. */
  11. #pragma once
  12. /** @file WaitFreeQueue.h
  13. @brief The WaitFreeQueue<T> template class. */
  14. #include <stddef.h>
  15. #include "Alignment.h"
  16. namespace kNet
  17. {
  18. /// A wait-free queue for communication unidirectionally between two threads.
  19. /** This data structure is useful for simple and efficient lockless data/message passing between two-threads.
  20. It is implemented using a lockless circular ring buffer, and has the following properties:
  21. - At most one thread can act as a publisher/producer/writer and call Insert() to add new items to the queue.
  22. - At most one thread can consume/read the items from the queue by calling Front() and PopFront().
  23. - Does not use locks or spin-waits, and is hence wait-free.
  24. - Does not perform any memory allocation after initialization.
  25. - Only POD types are supported. If you need non-POD objects, store pointers to objects instead.
  26. - The queue has a fixed upper size that must be a power-of-2 and must be speficied in the constructor. */
  27. template<typename T>
  28. class WaitFreeQueue
  29. {
  30. public:
  31. /// @param maxElements A power-of-2 number, > 2, that specifies the size of the ring buffer to construct. The number of elements the queue can store is maxElements-1.
  32. explicit WaitFreeQueue(size_t maxElements)
  33. :head(0), tail(0)
  34. {
  35. assert(IS_POW2(maxElements)); // The caller really needs to round to correct pow2,
  36. assert(maxElements > 2);
  37. maxElements = (size_t)RoundUpToNextPow2((u32)maxElements); // but avoid any silliness in release anyways.
  38. data = new T[maxElements];
  39. maxElementsMask = (unsigned long)maxElements - 1;
  40. }
  41. /// Warning: This is not thread-safe.
  42. WaitFreeQueue(const WaitFreeQueue &rhs)
  43. :maxElementsMask(rhs.maxElementsMask), head(rhs.head), tail(rhs.tail)
  44. {
  45. size_t maxElements = rhs.maxElementsMask+1;
  46. data = new T[maxElements];
  47. for(size_t i = 0; i < maxElements; ++i)
  48. data[i] = rhs.data[i];
  49. }
  50. /// Warning: This is not thread-safe.
  51. WaitFreeQueue &operator =(const WaitFreeQueue &rhs)
  52. {
  53. if (this == &rhs)
  54. return *this;
  55. head = rhs.head;
  56. tail = rhs.tail;
  57. size_t maxElements = rhs.maxElementsMask+1;
  58. maxElementsMask = rhs.maxElementsMask;
  59. delete[] data;
  60. data = new T[maxElements];
  61. for(size_t i = 0; i < maxElements; ++i)
  62. data[i] = rhs.data[i];
  63. return *this;
  64. }
  65. ~WaitFreeQueue()
  66. {
  67. delete[] data;
  68. }
  69. /// Returns the total capacity of the queue, i.e. the total maximum number of items that can be contained in it.
  70. /// Thread-safe.
  71. int Capacity() const { return maxElementsMask; }
  72. /// Returns the number of items that can still be inserted in the queue, i.e. the total space left.
  73. /// Thread-safe.
  74. int CapacityLeft() const { return Capacity() - Size(); }
  75. /// Starts a new item insert operation.
  76. /// This function returns a pointer to the next element to be added to this queue.
  77. /// You can fill there the next value to add, and call FinishInsert() when done. The consumer
  78. /// will not see the object in the queue before FinishInsert() is called.
  79. /// Use Begin/FinishInsert when the type T has a heavy copy-ctor to avoid generating a temporary.
  80. /// \note Do not call BeginInsert() several times before calling FinishInsert() in between. You can
  81. /// have only one outstanding BeginInsert() call active at a time (multiple consecutive BeginInsert()
  82. /// calls will keep returning the same pointer until FinishInsert() is called)
  83. /// @return A pointer where to fill the next value to produce to the queue, or 0 if the queue is full
  84. /// and no value can be added. For each returned pointer, call FinishInsert after filling the value
  85. /// to commit it to the queue. If 0 is returned, FinishInsert does not need to be called.
  86. /// This function can be called only by a single producer thread.
  87. T *BeginInsert()
  88. {
  89. unsigned long tail_ = tail;
  90. unsigned long nextTail = (tail_ + 1) & maxElementsMask;
  91. if (nextTail == head)
  92. return 0;
  93. return &data[tail];
  94. }
  95. /// Commits to the end of the queue the item filled with a previous call to BeginInsert.
  96. /// This function can be called only by a single producer thread.
  97. void FinishInsert()
  98. {
  99. unsigned long tail_ = tail;
  100. unsigned long nextTail = (tail_ + 1) & maxElementsMask;
  101. assert(nextTail != head && "Error: Calling FinishInsert after BeginInsert failed, or was not even called!");
  102. tail = nextTail;
  103. }
  104. /// Inserts the new value into the queue. Can be called only by a single producer thread.
  105. bool Insert(const T &value)
  106. {
  107. // Inserts are made to the 'tail' of the queue, incrementing the tail index.
  108. unsigned long tail_ = tail;
  109. unsigned long nextTail = (tail_ + 1) & maxElementsMask;
  110. if (nextTail == head)
  111. return false;
  112. data[tail_] = value;
  113. tail = nextTail;
  114. return true;
  115. }
  116. /// Inserts the new value into the queue. If there is not enough free space in the queue, the capacity
  117. /// of the queue is doubled.
  118. /// \note This function is not thread-safe. Do not call this if you cannot guarantee that the other
  119. /// thread will not be accessing the queue at the same time.
  120. void InsertWithResize(const T &value)
  121. {
  122. bool success = Insert(value);
  123. if (!success)
  124. {
  125. DoubleCapacity();
  126. #ifdef _DEBUG
  127. success =
  128. #endif
  129. Insert(value);
  130. }
  131. #ifdef _DEBUG
  132. assert(success);
  133. #endif
  134. }
  135. /// Re-allocates the queue to the new maximum size. All old elements are copied over.
  136. /// \note This function is not thread-safe. Do not call this if you cannot guarantee that the other
  137. /// thread will not be accessing the queue at the same time.
  138. void Resize(size_t newSize)
  139. {
  140. assert(IS_POW2(newSize)); // The caller really needs to round to correct pow2,
  141. newSize = (size_t)RoundUpToNextPow2((u32)newSize); // but avoid any silliness in release anyways.
  142. T *newData = new T[newSize];
  143. unsigned long newTail = 0;
  144. for(int i = 0; i < Size(); ++i)
  145. newData[newTail++] = *ItemAt(i);
  146. delete[] data;
  147. data = newData;
  148. head = 0;
  149. tail = newTail;
  150. maxElementsMask = (unsigned long)newSize - 1;
  151. }
  152. /// Resizes this queue to hold twice the amount of maximum elements.
  153. /// \note This function is not thread-safe. Do not call this if you cannot guarantee that the other
  154. /// thread will not be accessing the queue at the same time.
  155. void DoubleCapacity() { Resize(2*(maxElementsMask+1)); }
  156. /// Returns a pointer to the first item in the queue (the item that is coming off next, i.e. the one that has
  157. /// been in the queue the longest). Can be called only from a single consumer thread.
  158. /// This function can safely be called even if the queue is empty, in which case 0 is returned.
  159. T *Front()
  160. {
  161. if (head == tail)
  162. return 0;
  163. return &data[head];
  164. }
  165. /// Returns a pointer to the first item in the queue (the item that is coming off next, i.e. the one that has
  166. /// been in the queue the longest). Can be called only from a single consumer thread.
  167. /// This function can safely be called even if the queue is empty, in which case 0 is returned.
  168. const T *Front() const
  169. {
  170. if (head == tail)
  171. return 0;
  172. return &data[head];
  173. }
  174. /// Returns a copy of the first item in the queue and pops that item off the queue. Can be called only from a single consumer thread.
  175. /// Requires that there exists an element in the queue.
  176. T TakeFront()
  177. {
  178. assert(Front());
  179. T frontVal = *Front();
  180. PopFront();
  181. return frontVal;
  182. }
  183. /// Returns a pointer to the last item (the item that was just added) in the queue.
  184. /// Can be called only from a single consumer thread.
  185. T *Back()
  186. {
  187. if (head == tail)
  188. return 0;
  189. return &data[(tail + maxElementsMask) & maxElementsMask];
  190. }
  191. /// Returns a pointer to the last item (the item that was just added) in the queue.
  192. /// Can be called only from a single consumer thread.
  193. const T *Back() const
  194. {
  195. if (head == tail)
  196. return 0;
  197. return &data[(tail + maxElementsMask) & maxElementsMask];
  198. }
  199. /// Returns a pointer to the item at the given index. ItemAt(0) will return the first item (the front item)
  200. /// in the queue. Can be called only from a single consumer thread.
  201. /// Never returns a null pointer.
  202. T *ItemAt(int index)
  203. {
  204. assert(index >= 0 && index < (int)Size());
  205. return &data[(head + index) & maxElementsMask];
  206. }
  207. /// Returns a pointer to the item at the given index. Can be called only from a single consumer thread.
  208. /// Never returns a null pointer.
  209. const T *ItemAt(int index) const
  210. {
  211. assert(index >= 0 && index < (int)Size());
  212. return &data[(head + index) & maxElementsMask];
  213. }
  214. /// Returns true if the given element exists in the queue. Can be called only from a single consumer thread.
  215. bool Contains(const T &item) const
  216. {
  217. for(int i = 0; i < (int)Size(); ++i)
  218. if (*ItemAt(i) == item)
  219. return true;
  220. return false;
  221. }
  222. /// Removes the element at the given index.
  223. ///\note Not thread-safe.
  224. void EraseItemAt(int index)
  225. {
  226. if (index <= Size()>>1)
  227. EraseItemAtMoveFront(index);
  228. else
  229. EraseItemAtMoveBack(index);
  230. }
  231. /// Removes all elements in the queue. Does not call dtors for removed objects, as this queue is only for POD types.
  232. /// Can be called only from a single consumer thread.
  233. void Clear()
  234. {
  235. tail = head;
  236. }
  237. /// Returns the number of elements currently filled in the queue. Can be called from either the consumer or producer thread.
  238. int Size() const
  239. {
  240. unsigned long head_ = head;
  241. unsigned long tail_ = tail;
  242. if (tail_ >= head_)
  243. return tail_ - head_;
  244. else
  245. return maxElementsMask + 1 - (head_ - tail_);
  246. }
  247. /// Removes the first item in the queue. Can be called only from a single consumer thread.
  248. void PopFront()
  249. {
  250. assert(head != tail);
  251. if (head == tail)
  252. return;
  253. size_t head_ = (head + 1) & maxElementsMask;
  254. head = (unsigned long)head_;
  255. }
  256. private:
  257. T *data;
  258. /// Stores the AND mask (2^Size-1) used to perform the modulo check.
  259. unsigned long maxElementsMask;
  260. /// Stores the index of the first element in the queue. The next item to come off the queue is at this position,
  261. /// unless head==tail, and the queue is empty. \todo Convert to C++0x atomic<unsigned long> head;
  262. volatile unsigned long head;
  263. /// Stores the index of one past the last element in the queue. \todo Convert to C++0x atomic<unsigned long> head;
  264. volatile unsigned long tail;
  265. /// Removes the element at the given index, but instead of filling the contiguous gap that forms by moving elements to the
  266. /// right, this function will instead move items at the front of the queue.
  267. ///\note Not thread-safe.
  268. void EraseItemAtMoveFront(int index)
  269. {
  270. assert(Size() > 0);
  271. int numItemsToMove = index;
  272. for(int i = 0; i < numItemsToMove; ++i)
  273. data[(head+index + maxElementsMask+1 -i)&maxElementsMask] = data[(head+index + maxElementsMask+1 -i-1) &maxElementsMask];
  274. head = (head+1) & maxElementsMask;
  275. }
  276. /// Removes the element at the given index, and fills the contiguous gap that forms by shuffling each item after index one space down.
  277. ///\note Not thread-safe.
  278. void EraseItemAtMoveBack(int index)
  279. {
  280. assert(Size() > 0);
  281. int numItemsToMove = Size()-1-index;
  282. for(int i = 0; i < numItemsToMove; ++i)
  283. data[(head+index+i)&maxElementsMask] = data[(head+index+i+1)&maxElementsMask];
  284. tail = (tail + maxElementsMask+1 - 1) & maxElementsMask;
  285. }
  286. };
  287. /// Checks that the specified conditions for the container apply.
  288. /// Warning: This is a non-threadsafe check for the container, only to be used for debugging.
  289. /// Warning #2: This function is very slow, as it performs a N^2 search through the container.
  290. template<typename T>
  291. bool ContainerUniqueAndNoNullElements(const WaitFreeQueue<T> &cont)
  292. {
  293. for(size_t i = 0; i < cont.Size(); ++i)
  294. for(size_t j = i+1; j < cont.Size(); ++j)
  295. if (*cont.ItemAt(i) == *cont.ItemAt(j) || *cont.ItemAt(i) == 0)
  296. return false;
  297. return true;
  298. }
  299. } // ~kNet