pop.cpp 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "pqueue_type.h"
  6. #include "item.h"
  7. namespace {
  8. static size_t s_nThreadCount = 8;
  9. static size_t s_nQueueSize = 2000000;
  10. class pqueue_pop: public cds_test::stress_fixture
  11. {
  12. typedef cds_test::stress_fixture base_class;
  13. protected:
  14. template <class PQueue>
  15. class Consumer: public cds_test::thread
  16. {
  17. typedef cds_test::thread base_class;
  18. public:
  19. Consumer( cds_test::thread_pool& pool, PQueue& queue )
  20. : base_class( pool )
  21. , m_Queue( queue )
  22. {}
  23. Consumer( Consumer& src )
  24. : base_class( src )
  25. , m_Queue( src.m_Queue )
  26. {}
  27. virtual thread * clone()
  28. {
  29. return new Consumer( *this );
  30. }
  31. virtual void test()
  32. {
  33. typedef typename PQueue::value_type value_type;
  34. size_t nPrevKey;
  35. value_type val;
  36. if ( m_Queue.pop( val )) {
  37. ++m_nPopSuccess;
  38. nPrevKey = val.key;
  39. bool prevPopFailed = false;
  40. while ( m_Queue.pop( val )) {
  41. ++m_nPopSuccess;
  42. if ( val.key > nPrevKey ) {
  43. ++m_nPopError;
  44. m_arrFailedPops.emplace_back( failed_pops{ nPrevKey, val.key, static_cast<size_t>(-1) } );
  45. prevPopFailed = true;
  46. }
  47. else if ( val.key == nPrevKey ) {
  48. ++m_nPopErrorEq;
  49. m_arrFailedPops.emplace_back( failed_pops{ nPrevKey, val.key, static_cast<size_t>(-1) } );
  50. }
  51. else {
  52. if ( prevPopFailed )
  53. m_arrFailedPops.back().next_key = val.key;
  54. prevPopFailed = false;
  55. }
  56. if ( nPrevKey > val.key )
  57. nPrevKey = val.key;
  58. }
  59. }
  60. else
  61. ++m_nPopFailed;
  62. }
  63. public:
  64. PQueue& m_Queue;
  65. size_t m_nPopError = 0;
  66. size_t m_nPopErrorEq = 0;
  67. size_t m_nPopSuccess = 0;
  68. size_t m_nPopFailed = 0;
  69. struct failed_pops {
  70. size_t prev_key;
  71. size_t popped_key;
  72. size_t next_key;
  73. };
  74. std::vector< failed_pops > m_arrFailedPops;
  75. };
  76. protected:
  77. template <class PQueue>
  78. void test( PQueue& q )
  79. {
  80. cds_test::thread_pool& pool = get_pool();
  81. // push
  82. {
  83. std::vector< size_t > arr;
  84. arr.reserve( s_nQueueSize );
  85. for ( size_t i = 0; i < s_nQueueSize; ++i )
  86. arr.push_back( i );
  87. shuffle( arr.begin(), arr.end());
  88. size_t nPushError = 0;
  89. typedef typename PQueue::value_type value_type;
  90. for ( auto it = arr.begin(); it != arr.end(); ++it ) {
  91. if ( !q.push( value_type( *it )))
  92. ++nPushError;
  93. }
  94. s_nQueueSize -= nPushError;
  95. }
  96. propout() << std::make_pair( "thread_count", s_nThreadCount )
  97. << std::make_pair( "push_count", s_nQueueSize );
  98. // pop
  99. {
  100. pool.add( new Consumer<PQueue>( pool, q ), s_nThreadCount );
  101. std::chrono::milliseconds duration = pool.run();
  102. propout() << std::make_pair( "consumer_duration", duration );
  103. // Analyze result
  104. size_t nTotalPopped = 0;
  105. size_t nTotalError = 0;
  106. size_t nTotalErrorEq = 0;
  107. size_t nTotalFailed = 0;
  108. for ( size_t i = 0; i < pool.size(); ++i ) {
  109. Consumer<PQueue>& cons = static_cast<Consumer<PQueue>&>( pool.get(i));
  110. nTotalPopped += cons.m_nPopSuccess;
  111. nTotalError += cons.m_nPopError;
  112. nTotalErrorEq += cons.m_nPopErrorEq;
  113. nTotalFailed += cons.m_nPopFailed;
  114. if ( !cons.m_arrFailedPops.empty()) {
  115. std::cerr << "Priority violations, thread " << i;
  116. for ( size_t k = 0; k < cons.m_arrFailedPops.size(); ++k ) {
  117. std::cerr << "\n " << "prev_key=" << cons.m_arrFailedPops[k].prev_key << " popped_key=" << cons.m_arrFailedPops[k].popped_key;
  118. if ( cons.m_arrFailedPops[k].next_key != static_cast<size_t>(-1))
  119. std::cerr << " next_key=" << cons.m_arrFailedPops[k].next_key;
  120. else
  121. std::cerr << " next_key unspecified";
  122. }
  123. std::cerr << std::endl;
  124. }
  125. }
  126. propout()
  127. << std::make_pair( "total_popped", nTotalPopped )
  128. << std::make_pair( "error_pop_double", nTotalErrorEq )
  129. << std::make_pair( "error_priority_violation", nTotalError );
  130. EXPECT_EQ( nTotalPopped, s_nQueueSize );
  131. EXPECT_EQ( nTotalError, 0u ) << "priority violations";
  132. EXPECT_EQ( nTotalErrorEq, 0u ) << "double key";
  133. }
  134. propout() << q.statistics();
  135. }
  136. public:
  137. static void SetUpTestCase()
  138. {
  139. cds_test::config const& cfg = get_config( "pqueue_pop" );
  140. s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
  141. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  142. if ( s_nThreadCount == 0u )
  143. s_nThreadCount = 1;
  144. if ( s_nQueueSize == 0u )
  145. s_nQueueSize = 1000;
  146. }
  147. //static void TearDownTestCase();
  148. };
  149. #define CDSSTRESS_MSPriorityQueue( fixture_t, pqueue_t ) \
  150. TEST_F( fixture_t, pqueue_t ) \
  151. { \
  152. typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
  153. pqueue_type pq( s_nQueueSize + 1 ); \
  154. test( pq ); \
  155. }
  156. CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less )
  157. CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_less_stat )
  158. CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_cmp )
  159. //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_dyn_mutex ) // too slow
  160. #define CDSSTRESS_MSPriorityQueue_static( fixture_t, pqueue_t ) \
  161. TEST_F( fixture_t, pqueue_t ) \
  162. { \
  163. typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
  164. std::unique_ptr< pqueue_type > pq( new pqueue_type ); \
  165. test( *pq.get()); \
  166. }
  167. //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_static_less )
  168. //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_static_less_stat )
  169. //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_static_cmp )
  170. //CDSSTRESS_MSPriorityQueue( pqueue_pop, MSPriorityQueue_static_mutex )
  171. #define CDSSTRESS_PriorityQueue( fixture_t, pqueue_t ) \
  172. TEST_F( fixture_t, pqueue_t ) \
  173. { \
  174. typedef pqueue::Types<pqueue::simple_value>::pqueue_t pqueue_type; \
  175. pqueue_type pq; \
  176. test( pq ); \
  177. }
  178. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_vector )
  179. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_vector_stat )
  180. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_deque )
  181. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_deque_stat )
  182. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_deque )
  183. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_deque_stat )
  184. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_stable_vector )
  185. CDSSTRESS_PriorityQueue( pqueue_pop, FCPQueue_boost_stable_vector_stat )
  186. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_max )
  187. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_max_stat )
  188. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_min )
  189. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_HP_min_stat )
  190. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_max )
  191. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_max_stat )
  192. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_min )
  193. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_DHP_min_stat )
  194. // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_max )
  195. // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_max_stat )
  196. // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_min )
  197. // CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpi_min_stat )
  198. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_max )
  199. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_max_stat )
  200. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_min )
  201. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpb_min_stat )
  202. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_max )
  203. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_max_stat )
  204. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_min )
  205. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_gpt_min_stat )
  206. #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
  207. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_max )
  208. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_max_stat )
  209. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_min )
  210. CDSSTRESS_PriorityQueue( pqueue_pop, EllenBinTree_RCU_shb_min_stat )
  211. #endif
  212. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_HP_max )
  213. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_HP_max_stat )
  214. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_HP_min )
  215. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_HP_min_stat )
  216. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_DHP_max )
  217. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_DHP_max_stat )
  218. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_DHP_min )
  219. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_DHP_min_stat )
  220. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpi_max )
  221. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpi_min )
  222. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpb_max )
  223. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpb_min )
  224. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpt_max )
  225. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_gpt_min )
  226. #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
  227. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_shb_max )
  228. CDSSTRESS_PriorityQueue( pqueue_pop, SkipList32_RCU_shb_min )
  229. #endif
  230. CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_vector_spin )
  231. CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_vector_mutex )
  232. CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_deque_spin )
  233. CDSSTRESS_PriorityQueue( pqueue_pop, StdPQueue_deque_mutex )
  234. } // namespace