spscqueue.h 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202
  1. /*
  2. * Copyright 2010-2013 Branimir Karadzic. All rights reserved.
  3. * License: http://www.opensource.org/licenses/BSD-2-Clause
  4. */
  5. #ifndef __BX_SPSCQUEUE_H__
  6. #define __BX_SPSCQUEUE_H__
  7. #include "bx.h"
  8. #include "cpu.h"
  9. #include "mutex.h"
  10. #include "uint32_t.h"
  11. #include <list>
  12. namespace bx
  13. {
  14. // http://drdobbs.com/article/print?articleId=210604448&siteSectionName=
  15. template <typename Ty>
  16. class SpScUnboundedQueueLf
  17. {
  18. BX_CLASS(SpScUnboundedQueueLf
  19. , NO_COPY
  20. , NO_ASSIGNMENT
  21. );
  22. public:
  23. SpScUnboundedQueueLf()
  24. : m_first(new Node(NULL) )
  25. , m_divider(m_first)
  26. , m_last(m_first)
  27. {
  28. }
  29. ~SpScUnboundedQueueLf()
  30. {
  31. while (NULL != m_first)
  32. {
  33. Node* node = m_first;
  34. m_first = node->m_next;
  35. delete node;
  36. }
  37. }
  38. void push(Ty* _ptr) // producer only
  39. {
  40. m_last->m_next = new Node( (void*)_ptr);
  41. atomicExchangePtr( (void**)&m_last, m_last->m_next);
  42. while (m_first != m_divider)
  43. {
  44. Node* node = m_first;
  45. m_first = m_first->m_next;
  46. delete node;
  47. }
  48. }
  49. Ty* peek() // consumer only
  50. {
  51. if (m_divider != m_last)
  52. {
  53. Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
  54. return ptr;
  55. }
  56. return NULL;
  57. }
  58. Ty* pop() // consumer only
  59. {
  60. if (m_divider != m_last)
  61. {
  62. Ty* ptr = (Ty*)m_divider->m_next->m_ptr;
  63. atomicExchangePtr( (void**)&m_divider, m_divider->m_next);
  64. return ptr;
  65. }
  66. return NULL;
  67. }
  68. private:
  69. struct Node
  70. {
  71. Node(void* _ptr)
  72. : m_ptr(_ptr)
  73. , m_next(NULL)
  74. {
  75. }
  76. void* m_ptr;
  77. Node* m_next;
  78. };
  79. Node* m_first;
  80. Node* m_divider;
  81. Node* m_last;
  82. };
  83. template<typename Ty>
  84. class SpScUnboundedQueueMutex
  85. {
  86. BX_CLASS(SpScUnboundedQueueMutex
  87. , NO_COPY
  88. , NO_ASSIGNMENT
  89. );
  90. public:
  91. SpScUnboundedQueueMutex()
  92. {
  93. }
  94. ~SpScUnboundedQueueMutex()
  95. {
  96. BX_CHECK(m_queue.empty(), "Queue is not empty!");
  97. }
  98. void push(Ty* _item)
  99. {
  100. bx::LwMutexScope lock(m_mutex);
  101. m_queue.push_back(_item);
  102. }
  103. Ty* peek()
  104. {
  105. bx::LwMutexScope lock(m_mutex);
  106. if (!m_queue.empty() )
  107. {
  108. return m_queue.front();
  109. }
  110. return NULL;
  111. }
  112. Ty* pop()
  113. {
  114. bx::LwMutexScope lock(m_mutex);
  115. if (!m_queue.empty() )
  116. {
  117. Ty* item = m_queue.front();
  118. m_queue.pop_front();
  119. return item;
  120. }
  121. return NULL;
  122. }
  123. private:
  124. bx::LwMutex m_mutex;
  125. std::list<Ty*> m_queue;
  126. };
  127. #if BX_CONFIG_SPSCQUEUE_USE_MUTEX
  128. # define SpScUnboundedQueue SpScUnboundedQueueMutex
  129. #else
  130. # define SpScUnboundedQueue SpScUnboundedQueueLf
  131. #endif // BX_CONFIG_SPSCQUEUE_USE_MUTEX
  132. template <typename Ty>
  133. class SpScBlockingUnboundedQueue
  134. {
  135. BX_CLASS(SpScBlockingUnboundedQueue
  136. , NO_COPY
  137. , NO_ASSIGNMENT
  138. );
  139. public:
  140. SpScBlockingUnboundedQueue()
  141. {
  142. }
  143. ~SpScBlockingUnboundedQueue()
  144. {
  145. }
  146. void push(Ty* _ptr) // producer only
  147. {
  148. m_queue.push( (void*)_ptr);
  149. m_count.post();
  150. }
  151. Ty* peek() // consumer only
  152. {
  153. return (Ty*)m_queue.peek();
  154. }
  155. Ty* pop(int32_t _msecs = -1) // consumer only
  156. {
  157. if (m_count.wait(_msecs) )
  158. {
  159. return (Ty*)m_queue.pop();
  160. }
  161. return NULL;
  162. }
  163. private:
  164. Semaphore m_count;
  165. SpScUnboundedQueue<void> m_queue;
  166. };
  167. } // namespace bx
  168. #endif // __BX_RINGBUFFER_H__