fcpriority_queue.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
  6. #define CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H
  7. #include <cds/algo/flat_combining.h>
  8. #include <cds/algo/elimination_opt.h>
  9. #include <queue>
  10. namespace cds { namespace container {
  11. /// FCPriorityQueue related definitions
  12. /** @ingroup cds_nonintrusive_helper
  13. */
  14. namespace fcpqueue {
  15. /// FCPriorityQueue internal statistics
  16. template <typename Counter = cds::atomicity::event_counter >
  17. struct stat: public cds::algo::flat_combining::stat<Counter>
  18. {
  19. typedef cds::algo::flat_combining::stat<Counter> flat_combining_stat; ///< Flat-combining statistics
  20. typedef typename flat_combining_stat::counter_type counter_type; ///< Counter type
  21. counter_type m_nPush ; ///< Count of push operations
  22. counter_type m_nPushMove ; ///< Count of push operations with move semantics
  23. counter_type m_nPop ; ///< Count of success pop operations
  24. counter_type m_nFailedPop; ///< Count of failed pop operations (pop from empty queue)
  25. //@cond
  26. void onPush() { ++m_nPush; }
  27. void onPushMove() { ++m_nPushMove; }
  28. void onPop( bool bFailed ) { if ( bFailed ) ++m_nFailedPop; else ++m_nPop; }
  29. //@endcond
  30. };
  31. /// FCPriorityQueue dummy statistics, no overhead
  32. struct empty_stat: public cds::algo::flat_combining::empty_stat
  33. {
  34. //@cond
  35. void onPush() {}
  36. void onPushMove() {}
  37. void onPop(bool) {}
  38. //@endcond
  39. };
  40. /// FCPriorityQueue traits
  41. struct traits: public cds::algo::flat_combining::traits
  42. {
  43. typedef empty_stat stat; ///< Internal statistics
  44. };
  45. /// Metafunction converting option list to traits
  46. /**
  47. \p Options are:
  48. - any \p cds::algo::flat_combining::make_traits options
  49. - \p opt::stat - internal statistics, possible type: \p fcpqueue::stat, \p fcpqueue::empty_stat (the default)
  50. */
  51. template <typename... Options>
  52. struct make_traits {
  53. # ifdef CDS_DOXYGEN_INVOKED
  54. typedef implementation_defined type ; ///< Metafunction result
  55. # else
  56. typedef typename cds::opt::make_options<
  57. typename cds::opt::find_type_traits< traits, Options... >::type
  58. ,Options...
  59. >::type type;
  60. # endif
  61. };
  62. } // namespace fcpqueue
  63. /// Flat-combining priority queue
  64. /**
  65. @ingroup cds_nonintrusive_priority_queue
  66. @ingroup cds_flat_combining_container
  67. \ref cds_flat_combining_description "Flat combining" sequential priority queue.
  68. The class can be considered as a concurrent FC-based wrapper for \p std::priority_queue.
  69. Template parameters:
  70. - \p T - a value type stored in the queue
  71. - \p PriorityQueue - sequential priority queue implementation, default is \p std::priority_queue<T>
  72. - \p Traits - type traits of flat combining, default is \p fcpqueue::traits.
  73. \p fcpqueue::make_traits metafunction can be used to construct specialized \p %fcpqueue::traits
  74. */
  75. template <typename T,
  76. class PriorityQueue = std::priority_queue<T>,
  77. typename Traits = fcpqueue::traits
  78. >
  79. class FCPriorityQueue
  80. #ifndef CDS_DOXYGEN_INVOKED
  81. : public cds::algo::flat_combining::container
  82. #endif
  83. {
  84. public:
  85. typedef T value_type; ///< Value type
  86. typedef PriorityQueue priority_queue_type; ///< Sequential priority queue class
  87. typedef Traits traits; ///< Priority queue type traits
  88. typedef typename traits::stat stat; ///< Internal statistics type
  89. protected:
  90. //@cond
  91. // Priority queue operation IDs
  92. enum fc_operation {
  93. op_push = cds::algo::flat_combining::req_Operation,
  94. op_push_move,
  95. op_pop,
  96. op_clear
  97. };
  98. // Flat combining publication list record
  99. struct fc_record: public cds::algo::flat_combining::publication_record
  100. {
  101. union {
  102. value_type const * pValPush; // Value to push
  103. value_type * pValPop; // Pop destination
  104. };
  105. bool bEmpty; // true if the queue is empty
  106. };
  107. //@endcond
  108. /// Flat combining kernel
  109. typedef cds::algo::flat_combining::kernel< fc_record, traits > fc_kernel;
  110. protected:
  111. //@cond
  112. mutable fc_kernel m_FlatCombining;
  113. priority_queue_type m_PQueue;
  114. //@endcond
  115. public:
  116. /// Initializes empty priority queue object
  117. FCPriorityQueue()
  118. {}
  119. /// Initializes empty priority queue object and gives flat combining parameters
  120. FCPriorityQueue(
  121. unsigned int nCompactFactor ///< Flat combining: publication list compacting factor
  122. ,unsigned int nCombinePassCount ///< Flat combining: number of combining passes for combiner thread
  123. )
  124. : m_FlatCombining( nCompactFactor, nCombinePassCount )
  125. {}
  126. /// Inserts a new element in the priority queue
  127. /**
  128. The function always returns \p true
  129. */
  130. bool push(
  131. value_type const& val ///< Value to be copied to inserted element
  132. )
  133. {
  134. auto pRec = m_FlatCombining.acquire_record();
  135. pRec->pValPush = &val;
  136. m_FlatCombining.combine( op_push, pRec, *this );
  137. assert( pRec->is_done());
  138. m_FlatCombining.release_record( pRec );
  139. m_FlatCombining.internal_statistics().onPush();
  140. return true;
  141. }
  142. /// Inserts a new element in the priority queue (move semantics)
  143. /**
  144. The function always returns \p true
  145. */
  146. bool push(
  147. value_type&& val ///< Value to be moved to inserted element
  148. )
  149. {
  150. auto pRec = m_FlatCombining.acquire_record();
  151. pRec->pValPush = &val;
  152. m_FlatCombining.combine( op_push_move, pRec, *this );
  153. assert( pRec->is_done());
  154. m_FlatCombining.release_record( pRec );
  155. m_FlatCombining.internal_statistics().onPushMove();
  156. return true;
  157. }
  158. /// Removes the top element from priority queue
  159. /**
  160. The function returns \p false if the queue is empty, \p true otherwise.
  161. If the queue is empty \p val is not changed.
  162. */
  163. bool pop(
  164. value_type& val ///< Target to be received the copy of top element
  165. )
  166. {
  167. auto pRec = m_FlatCombining.acquire_record();
  168. pRec->pValPop = &val;
  169. m_FlatCombining.combine( op_pop, pRec, *this );
  170. assert( pRec->is_done());
  171. m_FlatCombining.release_record( pRec );
  172. m_FlatCombining.internal_statistics().onPop( pRec->bEmpty );
  173. return !pRec->bEmpty;
  174. }
  175. /// Exclusive access to underlying priority queue object
  176. /**
  177. The functor \p f can do any operation with underlying \p priority_queue_type in exclusive mode.
  178. For example, you can iterate over the queue.
  179. \p Func signature is:
  180. \code
  181. void f( priority_queue_type& deque );
  182. \endcode
  183. */
  184. template <typename Func>
  185. void apply( Func f )
  186. {
  187. auto& pqueue = m_PQueue;
  188. m_FlatCombining.invoke_exclusive( [&pqueue, &f]() { f( pqueue ); } );
  189. }
  190. /// Exclusive access to underlying priority queue object
  191. /**
  192. The functor \p f can do any operation with underlying \p proiprity_queue_type in exclusive mode.
  193. For example, you can iterate over the queue.
  194. \p Func signature is:
  195. \code
  196. void f( priority_queue_type const& queue );
  197. \endcode
  198. */
  199. template <typename Func>
  200. void apply( Func f ) const
  201. {
  202. auto const& pqueue = m_PQueue;
  203. m_FlatCombining.invoke_exclusive( [&pqueue, &f]() { f( pqueue ); } );
  204. }
  205. /// Clears the priority queue
  206. void clear()
  207. {
  208. auto pRec = m_FlatCombining.acquire_record();
  209. m_FlatCombining.combine( op_clear, pRec, *this );
  210. assert( pRec->is_done());
  211. m_FlatCombining.release_record( pRec );
  212. }
  213. /// Returns the number of elements in the priority queue.
  214. /**
  215. Note that <tt>size() == 0</tt> does not mean that the queue is empty because
  216. combining record can be in process.
  217. To check emptiness use \ref empty function.
  218. */
  219. size_t size() const
  220. {
  221. return m_PQueue.size();
  222. }
  223. /// Checks if the priority queue is empty
  224. /**
  225. If the combining is in process the function waits while combining done.
  226. */
  227. bool empty()
  228. {
  229. bool bRet = false;
  230. auto const& pq = m_PQueue;
  231. m_FlatCombining.invoke_exclusive( [&pq, &bRet]() { bRet = pq.empty(); } );
  232. return bRet;
  233. }
  234. /// Internal statistics
  235. stat const& statistics() const
  236. {
  237. return m_FlatCombining.statistics();
  238. }
  239. public: // flat combining cooperation, not for direct use!
  240. //@cond
  241. /*
  242. The function is called by \ref cds::algo::flat_combining::kernel "flat combining kernel"
  243. object if the current thread becomes a combiner. Invocation of the function means that
  244. the priority queue should perform an action recorded in \p pRec.
  245. */
  246. void fc_apply( fc_record * pRec )
  247. {
  248. assert( pRec );
  249. // this function is called under FC mutex, so switch TSan off
  250. //CDS_TSAN_ANNOTATE_IGNORE_RW_BEGIN;
  251. switch ( pRec->op()) {
  252. case op_push:
  253. assert( pRec->pValPush );
  254. m_PQueue.push( *(pRec->pValPush));
  255. break;
  256. case op_push_move:
  257. assert( pRec->pValPush );
  258. m_PQueue.push( std::move( *(pRec->pValPush )));
  259. break;
  260. case op_pop:
  261. assert( pRec->pValPop );
  262. pRec->bEmpty = m_PQueue.empty();
  263. if ( !pRec->bEmpty ) {
  264. *(pRec->pValPop) = std::move( m_PQueue.top());
  265. m_PQueue.pop();
  266. }
  267. break;
  268. case op_clear:
  269. while ( !m_PQueue.empty())
  270. m_PQueue.pop();
  271. break;
  272. default:
  273. assert(false);
  274. break;
  275. }
  276. //CDS_TSAN_ANNOTATE_IGNORE_RW_END;
  277. }
  278. //@endcond
  279. };
  280. }} // namespace cds::container
  281. #endif // #ifndef CDSLIB_CONTAINER_FCPRIORITY_QUEUE_H