rwqueue.h 13 KB


  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef CDSLIB_CONTAINER_RWQUEUE_H
  6. #define CDSLIB_CONTAINER_RWQUEUE_H
  7. #include <cds/sync/spinlock.h>
  8. #include <cds/opt/options.h>
  9. #include <cds/details/allocator.h>
  10. #include <mutex> // unique_lock
  11. #include <memory>
  12. namespace cds { namespace container {
  13. /// RWQueue related definitions
  14. /** @ingroup cds_nonintrusive_helper
  15. */
  16. namespace rwqueue {
  17. /// RWQueue default type traits
  18. struct traits
  19. {
  20. /// Lock policy
  21. typedef cds::sync::spin lock_type;
  22. /// Node allocator
  23. typedef CDS_DEFAULT_ALLOCATOR allocator;
  24. /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
  25. typedef cds::atomicity::empty_item_counter item_counter;
  26. /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
  27. enum { padding = opt::cache_line_padding };
  28. };
  29. /// Metafunction converting option list to \p rwqueue::traits
  30. /**
  31. Supported \p Options are:
  32. - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
  33. - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
  34. - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
  35. To enable item counting use \p cds::atomicity::item_counter.
  36. - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
  37. Example: declare mutex-based \p %RWQueue with item counting
  38. \code
  39. typedef cds::container::RWQueue< Foo,
  40. typename cds::container::rwqueue::make_traits<
  41. cds::opt::item_counter< cds::atomicity::item_counter >,
  42. cds::opt::lock_type< std::mutex >
  43. >::type
  44. > myQueue;
  45. \endcode
  46. */
  47. template <typename... Options>
  48. struct make_traits {
  49. # ifdef CDS_DOXYGEN_INVOKED
  50. typedef implementation_defined type; ///< Metafunction result
  51. # else
  52. typedef typename cds::opt::make_options<
  53. typename cds::opt::find_type_traits< traits, Options... >::type
  54. , Options...
  55. >::type type;
  56. # endif
  57. };
  58. } // namespace rwqueue
  59. /// Michael & Scott blocking queue with fine-grained synchronization schema
  60. /** @ingroup cds_nonintrusive_queue
  61. The queue has two different locks: one for reading and one for writing.
  62. Therefore, one writer and one reader can simultaneously access to the queue.
  63. The queue does not require any garbage collector.
  64. <b>Source</b>
  65. - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
  66. and blocking concurrent queue algorithms"
  67. <b>Template arguments</b>
  68. - \p T - value type to be stored in the queue
  69. - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
  70. metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
  71. \code
  72. struct myTraits: public cds::container::rwqueue::traits {
  73. typedef cds::atomicity::item_counter item_counter;
  74. };
  75. typedef cds::container::RWQueue< Foo, myTraits > myQueue;
  76. // Equivalent make_traits example:
  77. typedef cds::container::RWQueue< Foo,
  78. typename cds::container::rwqueue::make_traits<
  79. cds::opt::item_counter< cds::atomicity::item_counter >
  80. >::type
  81. > myQueue;
  82. \endcode
  83. */
  84. template <typename T, typename Traits = rwqueue::traits >
  85. class RWQueue
  86. {
  87. public:
  88. /// Rebind template arguments
  89. template <typename T2, typename Traits2>
  90. struct rebind {
  91. typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result
  92. };
  93. public:
  94. typedef T value_type; ///< Type of value to be stored in the queue
  95. typedef Traits traits; ///< Queue traits
  96. typedef typename traits::lock_type lock_type; ///< Locking primitive
  97. typedef typename traits::item_counter item_counter; ///< Item counting policy used
  98. protected:
  99. //@cond
  100. /// Node type
  101. struct node_type
  102. {
  103. atomics::atomic< node_type *> m_pNext; ///< Pointer to the next node in the queue
  104. value_type m_value; ///< Value stored in the node
  105. node_type( value_type const& v )
  106. : m_pNext( nullptr )
  107. , m_value(v)
  108. {}
  109. node_type()
  110. : m_pNext( nullptr )
  111. {}
  112. template <typename... Args>
  113. node_type( Args&&... args )
  114. : m_pNext( nullptr )
  115. , m_value( std::forward<Args>(args)...)
  116. {}
  117. };
  118. //@endcond
  119. public:
  120. /// Allocator type used for allocate/deallocate the queue nodes
  121. typedef typename std::allocator_traits<
  122. typename traits::allocator
  123. >::template rebind_alloc<node_type> allocator_type;
  124. protected:
  125. //@cond
  126. typedef std::unique_lock<lock_type> scoped_lock;
  127. typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
  128. struct head_type {
  129. mutable lock_type lock;
  130. node_type * ptr;
  131. };
  132. head_type m_Head;
  133. typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
  134. head_type m_Tail;
  135. item_counter m_ItemCounter;
  136. //@endcond
  137. protected:
  138. //@cond
  139. static node_type * alloc_node()
  140. {
  141. return node_allocator().New();
  142. }
  143. static node_type * alloc_node( T const& data )
  144. {
  145. return node_allocator().New( data );
  146. }
  147. template <typename... Args>
  148. static node_type * alloc_node_move( Args&&... args )
  149. {
  150. return node_allocator().MoveNew( std::forward<Args>( args )... );
  151. }
  152. static void free_node( node_type * pNode )
  153. {
  154. node_allocator().Delete( pNode );
  155. }
  156. bool enqueue_node( node_type * p )
  157. {
  158. assert( p != nullptr );
  159. {
  160. scoped_lock lock( m_Tail.lock );
  161. m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
  162. m_Tail.ptr = p;
  163. }
  164. ++m_ItemCounter;
  165. return true;
  166. }
  167. struct node_disposer {
  168. void operator()( node_type * pNode )
  169. {
  170. free_node( pNode );
  171. }
  172. };
  173. typedef std::unique_ptr< node_type, node_disposer > scoped_node_ptr;
  174. //@endcond
  175. public:
  176. /// Makes empty queue
  177. RWQueue()
  178. {
  179. node_type * pNode = alloc_node();
  180. m_Head.ptr =
  181. m_Tail.ptr = pNode;
  182. }
  183. /// Destructor clears queue
  184. ~RWQueue()
  185. {
  186. clear();
  187. assert( m_Head.ptr == m_Tail.ptr );
  188. free_node( m_Head.ptr );
  189. }
  190. /// Enqueues \p data. Always return \a true
  191. bool enqueue( value_type const& data )
  192. {
  193. scoped_node_ptr p( alloc_node( data ));
  194. if ( enqueue_node( p.get())) {
  195. p.release();
  196. return true;
  197. }
  198. return false;
  199. }
  200. /// Enqueues \p data, move semantics
  201. bool enqueue( value_type&& data )
  202. {
  203. scoped_node_ptr p( alloc_node_move( std::move( data )));
  204. if ( enqueue_node( p.get())) {
  205. p.release();
  206. return true;
  207. }
  208. return false;
  209. }
  210. /// Enqueues \p data to the queue using a functor
  211. /**
  212. \p Func is a functor called to create node.
  213. The functor \p f takes one argument - a reference to a new node of type \ref value_type :
  214. \code
  215. cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
  216. Bar bar;
  217. myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
  218. \endcode
  219. */
  220. template <typename Func>
  221. bool enqueue_with( Func f )
  222. {
  223. scoped_node_ptr p( alloc_node());
  224. f( p->m_value );
  225. if ( enqueue_node( p.get())) {
  226. p.release();
  227. return true;
  228. }
  229. return false;
  230. }
  231. /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
  232. template <typename... Args>
  233. bool emplace( Args&&... args )
  234. {
  235. scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
  236. if ( enqueue_node( p.get())) {
  237. p.release();
  238. return true;
  239. }
  240. return false;
  241. }
  242. /// Synonym for \p enqueue( value_type const& ) function
  243. bool push( value_type const& val )
  244. {
  245. return enqueue( val );
  246. }
  247. /// Synonym for \p enqueue( value_type&& ) function
  248. bool push( value_type&& val )
  249. {
  250. return enqueue( std::move( val ));
  251. }
  252. /// Synonym for \p enqueue_with() function
  253. template <typename Func>
  254. bool push_with( Func f )
  255. {
  256. return enqueue_with( f );
  257. }
  258. /// Dequeues a value to \p dest.
  259. /**
  260. If queue is empty returns \a false, \p dest can be corrupted.
  261. If queue is not empty returns \a true, \p dest contains the value dequeued
  262. */
  263. bool dequeue( value_type& dest )
  264. {
  265. return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
  266. }
  267. /// Dequeues a value using a functor
  268. /**
  269. \p Func is a functor called to copy dequeued value.
  270. The functor takes one argument - a reference to removed node:
  271. \code
  272. cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
  273. Bar bar;
  274. myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
  275. \endcode
  276. The functor is called only if the queue is not empty.
  277. */
  278. template <typename Func>
  279. bool dequeue_with( Func f )
  280. {
  281. node_type * pNode;
  282. {
  283. scoped_lock lock( m_Head.lock );
  284. pNode = m_Head.ptr;
  285. node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
  286. if ( pNewHead == nullptr )
  287. return false;
  288. f( pNewHead->m_value );
  289. m_Head.ptr = pNewHead;
  290. } // unlock here
  291. --m_ItemCounter;
  292. free_node( pNode );
  293. return true;
  294. }
  295. /// Synonym for \p dequeue() function
  296. bool pop( value_type& dest )
  297. {
  298. return dequeue( dest );
  299. }
  300. /// Synonym for \p dequeue_with() function
  301. template <typename Func>
  302. bool pop_with( Func f )
  303. {
  304. return dequeue_with( f );
  305. }
  306. /// Checks if queue is empty
  307. bool empty() const
  308. {
  309. scoped_lock lock( m_Head.lock );
  310. return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
  311. }
  312. /// Clears queue
  313. void clear()
  314. {
  315. scoped_lock lockR( m_Head.lock );
  316. scoped_lock lockW( m_Tail.lock );
  317. while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
  318. node_type * pHead = m_Head.ptr;
  319. m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
  320. free_node( pHead );
  321. }
  322. m_ItemCounter.reset();
  323. }
  324. /// Returns queue's item count
  325. /**
  326. The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
  327. this function always returns 0.
  328. @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
  329. is empty. To check queue emptyness use \p empty() method.
  330. */
  331. size_t size() const
  332. {
  333. return m_ItemCounter.value();
  334. }
  335. //@cond
  336. /// The class has no internal statistics. For test consistency only
  337. std::nullptr_t statistics() const
  338. {
  339. return nullptr;
  340. }
  341. //@endcond
  342. };
  343. }} // namespace cds::container
  344. #endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H