push_pop.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "pqueue_type.h"
  6. #include "item.h"
  7. namespace {
  8. static size_t s_nPushThreadCount = 4;
  9. static size_t s_nPopThreadCount = 4;
  10. static size_t s_nQueueSize = 2000000;
  11. atomics::atomic<size_t> s_nProducerCount(0);
  12. class pqueue_push_pop: public cds_test::stress_fixture
  13. {
  14. typedef cds_test::stress_fixture base_class;
  15. public:
  16. enum {
  17. producer_thread,
  18. consumer_thread
  19. };
  20. template <class PQueue>
  21. class Producer: public cds_test::thread
  22. {
  23. typedef cds_test::thread base_class;
  24. public:
  25. Producer( cds_test::thread_pool& pool, PQueue& queue )
  26. : base_class( pool, producer_thread )
  27. , m_Queue( queue )
  28. {}
  29. Producer( Producer& src )
  30. : base_class( src )
  31. , m_Queue( src.m_Queue )
  32. {}
  33. virtual thread * clone()
  34. {
  35. return new Producer( *this );
  36. }
  37. virtual void test()
  38. {
  39. typedef typename PQueue::value_type value_type;
  40. for ( auto it = m_arr.begin(); it != m_arr.end(); ++it ) {
  41. if ( !m_Queue.push( value_type( *it )))
  42. ++m_nPushError;
  43. }
  44. s_nProducerCount.fetch_sub( 1, atomics::memory_order_relaxed );
  45. }
  46. void prepare( size_t nStart, size_t nEnd )
  47. {
  48. m_arr.reserve( nEnd - nStart );
  49. for ( size_t i = nStart; i < nEnd; ++i )
  50. m_arr.push_back( i );
  51. shuffle( m_arr.begin(), m_arr.end());
  52. }
  53. public:
  54. PQueue& m_Queue;
  55. size_t m_nPushError = 0;
  56. typedef std::vector<size_t> array_type;
  57. array_type m_arr;
  58. };
  59. template <class PQueue>
  60. class Consumer: public cds_test::thread
  61. {
  62. typedef cds_test::thread base_class;
  63. public:
  64. Consumer( cds_test::thread_pool& pool, PQueue& queue )
  65. : base_class( pool, consumer_thread )
  66. , m_Queue( queue )
  67. {}
  68. Consumer( Consumer& src )
  69. : base_class( src )
  70. , m_Queue( src.m_Queue )
  71. {}
  72. virtual thread * clone()
  73. {
  74. return new Consumer( *this );
  75. }
  76. virtual void test()
  77. {
  78. typename PQueue::value_type val;
  79. while ( s_nProducerCount.load( atomics::memory_order_relaxed ) != 0 || !m_Queue.empty()) {
  80. if ( m_Queue.pop( val ))
  81. ++m_nPopSuccess;
  82. else
  83. ++m_nPopFailed;
  84. }
  85. }
  86. public:
  87. PQueue& m_Queue;
  88. size_t m_nPopSuccess = 0;
  89. size_t m_nPopFailed = 0;
  90. typedef std::vector<size_t> array_type;
  91. array_type m_arr;
  92. };
  93. protected:
  94. template <class PQueue>
  95. void test( PQueue& q )
  96. {
  97. size_t const nThreadItemCount = s_nQueueSize / s_nPushThreadCount;
  98. s_nQueueSize = nThreadItemCount * s_nPushThreadCount;
  99. propout() << std::make_pair( "producer_count", s_nPushThreadCount )
  100. << std::make_pair( "consunmer_count", s_nPopThreadCount )
  101. << std::make_pair( "queue_size", s_nQueueSize );
  102. cds_test::thread_pool& pool = get_pool();
  103. pool.add( new Producer<PQueue>( pool, q ), s_nPushThreadCount );
  104. size_t nStart = 0;
  105. for ( size_t i = 0; i < pool.size(); ++i ) {
  106. static_cast<Producer<PQueue>&>(pool.get( i )).prepare( nStart, nStart + nThreadItemCount );
  107. nStart += nThreadItemCount;
  108. }
  109. pool.add( new Consumer<PQueue>( pool, q ), s_nPopThreadCount );
  110. s_nProducerCount.store( s_nPushThreadCount, atomics::memory_order_release );
  111. std::chrono::milliseconds duration = pool.run();
  112. propout() << std::make_pair( "duration", duration );
  113. // Analyze result
  114. size_t nTotalPopped = 0;
  115. size_t nPushFailed = 0;
  116. size_t nPopFailed = 0;
  117. for ( size_t i = 0; i < pool.size(); ++i ) {
  118. cds_test::thread& t = pool.get(i);
  119. if ( t.type() == consumer_thread ) {
  120. Consumer<PQueue>& cons = static_cast<Consumer<PQueue>&>( t );
  121. nTotalPopped += cons.m_nPopSuccess;
  122. nPopFailed += cons.m_nPopFailed;
  123. }
  124. else {
  125. assert( t.type() == producer_thread );
  126. Producer<PQueue>& prod = static_cast<Producer<PQueue>&>(t);
  127. nPushFailed += prod.m_nPushError;
  128. EXPECT_EQ( prod.m_nPushError , 0u ) << "producer " << i;
  129. }
  130. }
  131. propout() << std::make_pair( "total_popped", nTotalPopped )
  132. << std::make_pair( "empty_pop", nPopFailed )
  133. << std::make_pair( "push_error", nPushFailed );
  134. EXPECT_EQ( nTotalPopped, s_nQueueSize );
  135. EXPECT_EQ( nPushFailed, 0u );
  136. //check_statistics( testQueue.statistics());
  137. propout() << q.statistics();
  138. }
  139. public:
  140. static void SetUpTestCase()
  141. {
  142. cds_test::config const& cfg = get_config( "pqueue_push_pop" );
  143. s_nPushThreadCount = cfg.get_size_t( "PushThreadCount", s_nPushThreadCount );
  144. s_nPopThreadCount = cfg.get_size_t( "PopThreadCount", s_nPopThreadCount );
  145. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  146. if ( s_nPushThreadCount == 0u )
  147. s_nPushThreadCount = 1;
  148. if ( s_nPopThreadCount == 0u )
  149. s_nPopThreadCount = 1;
  150. if ( s_nQueueSize == 0u )
  151. s_nQueueSize = 1000;
  152. }
  153. //static void TearDownTestCase();
  154. };
  // Declares the test fixture_t.pqueue_t. The queue type is
  // pqueue::Types<pqueue::simple_value>::pqueue_t; the queue object is
  // constructed with s_nQueueSize as its only constructor argument and then
  // handed to the fixture's test() method.
  #define CDSSTRESS_MSPriorityQueue( fixture_t, pqueue_t ) \
      TEST_F( fixture_t, pqueue_t ) \
      { \
          using queue_impl = pqueue::Types<pqueue::simple_value>::pqueue_t; \
          queue_impl testQueue( s_nQueueSize ); \
          test( testQueue ); \
      }
  // Tests for the MSPriorityQueue_dyn_* queue types; each queue is constructed
  // with s_nQueueSize by the CDSSTRESS_MSPriorityQueue macro.
  CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_less )
  CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_less_stat )
  CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_cmp )
  //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_dyn_mutex ) // too slow
  // Declares the test fixture_t.pqueue_t for a queue type that is
  // default-constructed. The queue object is allocated with new and owned by
  // a std::unique_ptr for the duration of the test body.
  #define CDSSTRESS_MSPriorityQueue_static( fixture_t, pqueue_t ) \
      TEST_F( fixture_t, pqueue_t ) \
      { \
          using queue_impl = pqueue::Types<pqueue::simple_value>::pqueue_t; \
          std::unique_ptr< queue_impl > holder( new queue_impl ); \
          test( *holder ); \
      }
  173. //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_static_less )
  174. //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_static_less_stat )
  175. //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_static_cmp )
  176. //CDSSTRESS_MSPriorityQueue( pqueue_push_pop, MSPriorityQueue_static_mutex )
  // Declares the test fixture_t.pqueue_t. The queue type is
  // pqueue::Types<pqueue::simple_value>::pqueue_t; the queue object is
  // default-constructed as a local variable and handed to the fixture's
  // test() method.
  #define CDSSTRESS_PriorityQueue( fixture_t, pqueue_t ) \
      TEST_F( fixture_t, pqueue_t ) \
      { \
          using queue_impl = pqueue::Types<pqueue::simple_value>::pqueue_t; \
          queue_impl testQueue; \
          test( testQueue ); \
      }
  // One test per queue type; every queue below is default-constructed by the
  // CDSSTRESS_PriorityQueue macro.

  // FCPQueue_* queue types
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_vector )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_vector_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_deque )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_deque_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_deque )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_deque_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_stable_vector )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, FCPQueue_boost_stable_vector_stat )
  // EllenBinTree_* queue types: HP and DHP
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_HP_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_DHP_min_stat )
  // EllenBinTree_RCU_gpi_* tests are disabled
  // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_max )
  // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_max_stat )
  // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_min )
  // CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpi_min_stat )
  // EllenBinTree_RCU_* queue types: gpb and gpt
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpb_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_gpt_min_stat )
  // The shb tests are compiled only when CDS_URCU_SIGNAL_HANDLING_ENABLED is defined
  #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, EllenBinTree_RCU_shb_min_stat )
  #endif
  // SkipList{32,24,16}_HP_* queue types
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_HP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_HP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_HP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_HP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_HP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_HP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_HP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_HP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_HP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_HP_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_HP_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_HP_min_stat )
  // SkipList{32,24,16}_DHP_* queue types
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_DHP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_DHP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_DHP_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_DHP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_DHP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_DHP_max_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_DHP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_DHP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_DHP_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_DHP_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_DHP_min_stat )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_DHP_min_stat )
  // SkipList*_RCU_* queue types: gpi, gpb and gpt
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpi_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpi_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_RCU_gpb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_RCU_gpb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpb_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList24_RCU_gpb_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList16_RCU_gpb_min )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpt_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_gpt_min )
  // The shb tests are compiled only when CDS_URCU_SIGNAL_HANDLING_ENABLED is defined
  #ifdef CDS_URCU_SIGNAL_HANDLING_ENABLED
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_shb_max )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, SkipList32_RCU_shb_min )
  #endif
  // StdPQueue_* queue types
  CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_vector_spin )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_vector_mutex )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_deque_spin )
  CDSSTRESS_PriorityQueue( pqueue_push_pop, StdPQueue_deque_mutex )
  260. } // namespace