fcqueue.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef CDSLIB_CONTAINER_FCQUEUE_H
  6. #define CDSLIB_CONTAINER_FCQUEUE_H
  7. #include <cds/algo/flat_combining.h>
  8. #include <cds/algo/elimination_opt.h>
  9. #include <queue>
  10. namespace cds { namespace container {
  11. /// FCQueue related definitions
  12. /** @ingroup cds_nonintrusive_helper
  13. */
  14. namespace fcqueue {
  15. /// FCQueue internal statistics
  16. template <typename Counter = cds::atomicity::event_counter >
  17. struct stat: public cds::algo::flat_combining::stat<Counter>
  18. {
  19. typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
  20. typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
  21. counter_type m_nEnqueue ; ///< Count of enqueue operations
  22. counter_type m_nEnqMove ; ///< Count of enqueue operations with move semantics
  23. counter_type m_nDequeue ; ///< Count of success dequeue operations
  24. counter_type m_nFailedDeq ; ///< Count of failed dequeue operations (pop from empty queue)
  25. counter_type m_nCollided ; ///< How many pairs of enqueue/dequeue were collided, if elimination is enabled
  26. //@cond
  27. void onEnqueue() { ++m_nEnqueue; }
  28. void onEnqMove() { ++m_nEnqMove; }
  29. void onDequeue( bool bFailed ) { if ( bFailed ) ++m_nFailedDeq; else ++m_nDequeue; }
  30. void onCollide() { ++m_nCollided; }
  31. //@endcond
  32. };
  33. /// FCQueue dummy statistics, no overhead
  34. struct empty_stat: public cds::algo::flat_combining::empty_stat
  35. {
  36. //@cond
  37. void onEnqueue() {}
  38. void onEnqMove() {}
  39. void onDequeue(bool) {}
  40. void onCollide() {}
  41. //@endcond
  42. };
  43. /// FCQueue type traits
  44. struct traits: public cds::algo::flat_combining::traits
  45. {
  46. typedef empty_stat stat; ///< Internal statistics
  47. static constexpr const bool enable_elimination = false; ///< Enable \ref cds_elimination_description "elimination"
  48. };
  49. /// Metafunction converting option list to traits
  50. /**
  51. \p Options are:
  52. - any \p cds::algo::flat_combining::make_traits options
  53. - \p opt::stat - internal statistics, possible type: \p fcqueue::stat, \p fcqueue::empty_stat (the default)
  54. - \p opt::enable_elimination - enable/disable operation \ref cds_elimination_description "elimination"
  55. By default, the elimination is disabled. For queue, the elimination is possible if the queue
  56. is empty.
  57. */
  58. template <typename... Options>
  59. struct make_traits {
  60. # ifdef CDS_DOXYGEN_INVOKED
  61. typedef implementation_defined type ; ///< Metafunction result
  62. # else
  63. typedef typename cds::opt::make_options<
  64. typename cds::opt::find_type_traits< traits, Options... >::type
  65. ,Options...
  66. >::type type;
  67. # endif
  68. };
  69. } // namespace fcqueue
  70. /// Flat-combining queue
  71. /**
  72. @ingroup cds_nonintrusive_queue
  73. @ingroup cds_flat_combining_container
  74. \ref cds_flat_combining_description "Flat combining" sequential queue.
  75. The class can be considered as a concurrent FC-based wrapper for \p std::queue.
  76. Template parameters:
  77. - \p T - a value type stored in the queue
  78. - \p Queue - sequential queue implementation, default is \p std::queue<T>
  79. - \p Trats - type traits of flat combining, default is \p fcqueue::traits.
  80. \p fcqueue::make_traits metafunction can be used to construct \p %fcqueue::traits specialization.
  81. */
  82. template <typename T,
  83. class Queue = std::queue<T>,
  84. typename Traits = fcqueue::traits
  85. >
  86. class FCQueue
  87. #ifndef CDS_DOXYGEN_INVOKED
  88. : public cds::algo::flat_combining::container
  89. #endif
  90. {
  91. public:
  92. typedef T value_type; ///< Value type
  93. typedef Queue queue_type; ///< Sequential queue class
  94. typedef Traits traits; ///< Queue type traits
  95. typedef typename traits::stat stat; ///< Internal statistics type
  96. static constexpr const bool c_bEliminationEnabled = traits::enable_elimination; ///< \p true if elimination is enabled
  97. protected:
  98. //@cond
  99. /// Queue operation IDs
  100. enum fc_operation {
  101. op_enq = cds::algo::flat_combining::req_Operation, ///< Enqueue
  102. op_enq_move, ///< Enqueue (move semantics)
  103. op_deq, ///< Dequeue
  104. op_clear ///< Clear
  105. };
  106. /// Flat combining publication list record
  107. struct fc_record: public cds::algo::flat_combining::publication_record
  108. {
  109. union {
  110. value_type const * pValEnq; ///< Value to enqueue
  111. value_type * pValDeq; ///< Dequeue destination
  112. };
  113. bool bEmpty; ///< \p true if the queue is empty
  114. };
  115. //@endcond
  116. /// Flat combining kernel
  117. typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
  118. protected:
  119. //@cond
  120. mutable fc_kernel m_FlatCombining;
  121. queue_type m_Queue;
  122. //@endcond
  123. public:
  124. /// Initializes empty queue object
  125. FCQueue()
  126. {}
  127. /// Initializes empty queue object and gives flat combining parameters
  128. FCQueue(
  129. unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
  130. ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
  131. )
  132. : m_FlatCombining( nCompactFactor, nCombinePassCount )
  133. {}
  134. /// Inserts a new element at the end of the queue
  135. /**
  136. The content of the new element initialized to a copy of \p val.
  137. The function always returns \p true
  138. */
  139. bool enqueue( value_type const& val )
  140. {
  141. auto pRec = m_FlatCombining.acquire_record();
  142. pRec->pValEnq = &val;
  143. constexpr_if ( c_bEliminationEnabled )
  144. m_FlatCombining.batch_combine( op_enq, pRec, *this );
  145. else
  146. m_FlatCombining.combine( op_enq, pRec, *this );
  147. assert( pRec->is_done());
  148. m_FlatCombining.release_record( pRec );
  149. m_FlatCombining.internal_statistics().onEnqueue();
  150. return true;
  151. }
  152. /// Inserts a new element at the end of the queue (a synonym for \ref enqueue)
  153. bool push( value_type const& val )
  154. {
  155. return enqueue( val );
  156. }
  157. /// Inserts a new element at the end of the queue (move semantics)
  158. /**
  159. \p val is moved to inserted element
  160. */
  161. bool enqueue( value_type&& val )
  162. {
  163. auto pRec = m_FlatCombining.acquire_record();
  164. pRec->pValEnq = &val;
  165. constexpr_if ( c_bEliminationEnabled )
  166. m_FlatCombining.batch_combine( op_enq_move, pRec, *this );
  167. else
  168. m_FlatCombining.combine( op_enq_move, pRec, *this );
  169. assert( pRec->is_done());
  170. m_FlatCombining.release_record( pRec );
  171. m_FlatCombining.internal_statistics().onEnqMove();
  172. return true;
  173. }
  174. /// Inserts a new element at the end of the queue (move semantics, synonym for \p enqueue)
  175. bool push( value_type&& val )
  176. {
  177. return enqueue( val );
  178. }
  179. /// Removes the next element from the queue
  180. /**
  181. \p val takes a copy of the element
  182. */
  183. bool dequeue( value_type& val )
  184. {
  185. auto pRec = m_FlatCombining.acquire_record();
  186. pRec->pValDeq = &val;
  187. constexpr_if ( c_bEliminationEnabled )
  188. m_FlatCombining.batch_combine( op_deq, pRec, *this );
  189. else
  190. m_FlatCombining.combine( op_deq, pRec, *this );
  191. assert( pRec->is_done());
  192. m_FlatCombining.release_record( pRec );
  193. m_FlatCombining.internal_statistics().onDequeue( pRec->bEmpty );
  194. return !pRec->bEmpty;
  195. }
  196. /// Removes the next element from the queue (a synonym for \ref dequeue)
  197. bool pop( value_type& val )
  198. {
  199. return dequeue( val );
  200. }
  201. /// Clears the queue
  202. void clear()
  203. {
  204. auto pRec = m_FlatCombining.acquire_record();
  205. constexpr_if ( c_bEliminationEnabled )
  206. m_FlatCombining.batch_combine( op_clear, pRec, *this );
  207. else
  208. m_FlatCombining.combine( op_clear, pRec, *this );
  209. assert( pRec->is_done());
  210. m_FlatCombining.release_record( pRec );
  211. }
  212. /// Exclusive access to underlying queue object
  213. /**
  214. The functor \p f can do any operation with underlying \p queue_type in exclusive mode.
  215. For example, you can iterate over the queue.
  216. \p Func signature is:
  217. \code
  218. void f( queue_type& queue );
  219. \endcode
  220. */
  221. template <typename Func>
  222. void apply( Func f )
  223. {
  224. auto& queue = m_Queue;
  225. m_FlatCombining.invoke_exclusive( [&queue, &f]() { f( queue ); } );
  226. }
  227. /// Exclusive access to underlying queue object
  228. /**
  229. The functor \p f can do any operation with underlying \p queue_type in exclusive mode.
  230. For example, you can iterate over the queue.
  231. \p Func signature is:
  232. \code
  233. void f( queue_type const& queue );
  234. \endcode
  235. */
  236. template <typename Func>
  237. void apply( Func f ) const
  238. {
  239. auto const& queue = m_Queue;
  240. m_FlatCombining.invoke_exclusive( [&queue, &f]() { f( queue ); } );
  241. }
  242. /// Returns the number of elements in the queue.
  243. /**
  244. Note that <tt>size() == 0</tt> is not mean that the queue is empty because
  245. combining record can be in process.
  246. To check emptiness use \ref empty function.
  247. */
  248. size_t size() const
  249. {
  250. return m_Queue.size();
  251. }
  252. /// Checks if the queue is empty
  253. /**
  254. If the combining is in process the function waits while combining done.
  255. */
  256. bool empty() const
  257. {
  258. bool bRet = false;
  259. auto const& queue = m_Queue;
  260. m_FlatCombining.invoke_exclusive( [&queue, &bRet]() { bRet = queue.empty(); } );
  261. return bRet;
  262. }
  263. /// Internal statistics
  264. stat const& statistics() const
  265. {
  266. return m_FlatCombining.statistics();
  267. }
  268. public: // flat combining cooperation, not for direct use!
  269. //@cond
  270. /// Flat combining supporting function. Do not call it directly!
  271. /**
  272. The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
  273. object if the current thread becomes a combiner. Invocation of the function means that
  274. the queue should perform an action recorded in \p pRec.
  275. */
  276. void fc_apply( fc_record * pRec )
  277. {
  278. assert( pRec );
  279. switch ( pRec->op()) {
  280. case op_enq:
  281. assert( pRec->pValEnq );
  282. m_Queue.push( *(pRec->pValEnq ));
  283. break;
  284. case op_enq_move:
  285. assert( pRec->pValEnq );
  286. m_Queue.push( std::move( *(pRec->pValEnq )));
  287. break;
  288. case op_deq:
  289. assert( pRec->pValDeq );
  290. pRec->bEmpty = m_Queue.empty();
  291. if ( !pRec->bEmpty ) {
  292. *(pRec->pValDeq) = std::move( m_Queue.front());
  293. m_Queue.pop();
  294. }
  295. break;
  296. case op_clear:
  297. while ( !m_Queue.empty())
  298. m_Queue.pop();
  299. break;
  300. default:
  301. assert(false);
  302. break;
  303. }
  304. }
  305. /// Batch-processing flat combining
  306. void fc_process( typename fc_kernel::iterator itBegin, typename fc_kernel::iterator itEnd )
  307. {
  308. typedef typename fc_kernel::iterator fc_iterator;
  309. for ( fc_iterator it = itBegin, itPrev = itEnd; it != itEnd; ++it ) {
  310. switch ( it->op( atomics::memory_order_acquire )) {
  311. case op_enq:
  312. case op_enq_move:
  313. case op_deq:
  314. if ( m_Queue.empty()) {
  315. if ( itPrev != itEnd && collide( *itPrev, *it ))
  316. itPrev = itEnd;
  317. else
  318. itPrev = it;
  319. }
  320. break;
  321. }
  322. }
  323. }
  324. //@endcond
  325. private:
  326. //@cond
  327. bool collide( fc_record& rec1, fc_record& rec2 )
  328. {
  329. switch ( rec1.op()) {
  330. case op_enq:
  331. if ( rec2.op() == op_deq ) {
  332. assert(rec1.pValEnq);
  333. assert(rec2.pValDeq);
  334. *rec2.pValDeq = *rec1.pValEnq;
  335. rec2.bEmpty = false;
  336. goto collided;
  337. }
  338. break;
  339. case op_enq_move:
  340. if ( rec2.op() == op_deq ) {
  341. assert(rec1.pValEnq);
  342. assert(rec2.pValDeq);
  343. *rec2.pValDeq = std::move( *rec1.pValEnq );
  344. rec2.bEmpty = false;
  345. goto collided;
  346. }
  347. break;
  348. case op_deq:
  349. switch ( rec2.op()) {
  350. case op_enq:
  351. case op_enq_move:
  352. return collide( rec2, rec1 );
  353. }
  354. }
  355. return false;
  356. collided:
  357. m_FlatCombining.operation_done( rec1 );
  358. m_FlatCombining.operation_done( rec2 );
  359. m_FlatCombining.internal_statistics().onCollide();
  360. return true;
  361. }
  362. //@endcond
  363. };
  364. }} // namespace cds::container
  365. #endif // #ifndef CDSLIB_CONTAINER_FCQUEUE_H