pop.cpp 6.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. // Multi-threaded queue test for pop operation
  7. namespace {
  // Default number of consumer threads; queue_pop::SetUpTestCase() may replace it
  // with the "ThreadCount" value of the "queue_pop" configuration section.
  static size_t s_nThreadCount = 8;

  // Default number of items pushed into the queue before the consumers start
  // (20 million); queue_pop::SetUpTestCase() may replace it with "QueueSize".
  static size_t s_nQueueSize = 20 * 1000 * 1000;
  // Plain payload carrying a single sequence number.
  struct SimpleValue {
      size_t nNo;     // sequence number of the item

      // Converting constructor (deliberately non-explicit, as in the original)
      SimpleValue( size_t n )
          : nNo( n )
      {}

      // A default-constructed value has number 0
      SimpleValue()
          : SimpleValue( 0 )
      {}

      // Returns the stored sequence number
      size_t getNo() const
      {
          return nNo;
      }
  };
  16. class queue_pop: public cds_test::stress_fixture
  17. {
  18. protected:
  19. struct value_type
  20. {
  21. size_t nNo;
  22. value_type()
  23. : nNo( 0 )
  24. {}
  25. value_type( size_t n )
  26. : nNo( n )
  27. {}
  28. };
  29. template <class Queue>
  30. class Consumer: public cds_test::thread
  31. {
  32. typedef cds_test::thread base_class;
  33. public:
  34. Consumer( cds_test::thread_pool& pool, Queue& queue )
  35. : base_class( pool )
  36. , m_Queue( queue )
  37. , m_arr( new uint8_t[ s_nQueueSize ])
  38. , m_nPopCount( 0 )
  39. {}
  40. Consumer( Consumer& src )
  41. : base_class( src )
  42. , m_Queue( src.m_Queue )
  43. , m_arr( new uint8_t[ s_nQueueSize ])
  44. , m_nPopCount( 0 )
  45. {}
  46. virtual thread * clone()
  47. {
  48. return new Consumer( *this );
  49. }
  50. virtual void test()
  51. {
  52. memset( m_arr.get(), 0, sizeof( m_arr[0] ) * s_nQueueSize );
  53. typedef typename Queue::value_type value_type;
  54. value_type value;
  55. size_t nPopCount = 0;
  56. while ( m_Queue.pop( value )) {
  57. ++m_arr[ value.nNo ];
  58. ++nPopCount;
  59. }
  60. m_nPopCount = nPopCount;
  61. }
  62. public:
  63. Queue& m_Queue;
  64. std::unique_ptr< uint8_t[] > m_arr;
  65. size_t m_nPopCount;
  66. };
  67. public:
  68. static void SetUpTestCase()
  69. {
  70. cds_test::config const& cfg = get_config( "queue_pop" );
  71. s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
  72. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  73. if ( s_nThreadCount == 0 )
  74. s_nThreadCount = 1;
  75. if ( s_nQueueSize == 0 )
  76. s_nQueueSize = 1000;
  77. }
  78. //static void TearDownTestCase();
  79. protected:
  80. template <class Queue>
  81. void analyze( Queue& q )
  82. {
  83. cds_test::thread_pool& pool = get_pool();
  84. std::unique_ptr< uint8_t[] > arr( new uint8_t[s_nQueueSize] );
  85. memset(arr.get(), 0, sizeof(arr[0]) * s_nQueueSize );
  86. size_t nTotalPops = 0;
  87. for ( size_t i = 0; i < pool.size(); ++i ) {
  88. Consumer<Queue>& thread = static_cast<Consumer<Queue>&>(pool.get( i ));
  89. for ( size_t j = 0; j < s_nQueueSize; ++j )
  90. arr[j] += thread.m_arr[j];
  91. nTotalPops += thread.m_nPopCount;
  92. }
  93. EXPECT_EQ( nTotalPops, s_nQueueSize );
  94. EXPECT_TRUE( q.empty());
  95. for ( size_t i = 0; i < s_nQueueSize; ++i ) {
  96. EXPECT_EQ( arr[i], 1 ) << "i=" << i;
  97. }
  98. }
  99. template <class Queue>
  100. void test( Queue& q )
  101. {
  102. cds_test::thread_pool& pool = get_pool();
  103. pool.add( new Consumer<Queue>( pool, q ), s_nThreadCount );
  104. for ( size_t i = 0; i < s_nQueueSize; ++i )
  105. q.push( i );
  106. propout() << std::make_pair( "thread_count", s_nThreadCount )
  107. << std::make_pair( "push_count", s_nQueueSize );
  108. std::chrono::milliseconds duration = pool.run();
  109. propout() << std::make_pair( "duration", duration );
  110. analyze( q );
  111. propout() << q.statistics();
  112. }
  113. };
  // Instantiate the pop stress test for each queue family.
  // NOTE(review): the CDSSTRESS_* macros are not defined in this file (presumably
  // they come from queue_type.h and expand to CDSSTRESS_Queue_F per queue type) - confirm.
  CDSSTRESS_MSQueue( queue_pop )
  CDSSTRESS_MoirQueue( queue_pop )
  CDSSTRESS_BasketQueue( queue_pop )
  CDSSTRESS_OptimsticQueue( queue_pop )
  CDSSTRESS_FCQueue( queue_pop )
  CDSSTRESS_FCDeque( queue_pop )
  CDSSTRESS_RWQueue( queue_pop )
  CDSSTRESS_StdQueue( queue_pop )
  122. #undef CDSSTRESS_Queue_F
  123. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  124. TEST_F( test_fixture, type_name ) \
  125. { \
  126. typedef queue::Types< value_type >::type_name queue_type; \
  127. queue_type queue( s_nQueueSize ); \
  128. test( queue ); \
  129. }
  130. CDSSTRESS_VyukovQueue( queue_pop )
  131. #undef CDSSTRESS_Queue_F
  132. // ********************************************************************
  133. // SegmentedQueue test
  134. class segmented_queue_pop
  135. : public queue_pop
  136. , public ::testing::WithParamInterface< size_t >
  137. {
  138. typedef queue_pop base_class;
  139. protected:
  140. template <typename Queue>
  141. void test()
  142. {
  143. size_t quasi_factor = GetParam();
  144. Queue q( quasi_factor );
  145. propout() << std::make_pair( "quasi_factor", quasi_factor );
  146. base_class::test( q );
  147. }
  148. public:
  149. static std::vector< size_t > get_test_parameters()
  150. {
  151. cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_pop" );
  152. bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
  153. size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
  154. std::vector<size_t> args;
  155. if ( bIterative && quasi_factor > 4 ) {
  156. for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
  157. args.push_back( qf );
  158. }
  159. else {
  160. if ( quasi_factor > 2 )
  161. args.push_back( quasi_factor );
  162. else
  163. args.push_back( 2 );
  164. }
  165. return args;
  166. }
  167. };
  168. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  169. TEST_P( test_fixture, type_name ) \
  170. { \
  171. typedef typename queue::Types<value_type>::type_name queue_type; \
  172. test< queue_type >(); \
  173. }
  174. CDSSTRESS_SegmentedQueue( segmented_queue_pop )
  175. #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
  176. static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
  177. {
  178. return std::to_string( p.param );
  179. }
  180. INSTANTIATE_TEST_CASE_P( SQ,
  181. segmented_queue_pop,
  182. ::testing::ValuesIn( segmented_queue_pop::get_test_parameters()), get_test_parameter_name );
  183. #else
  184. INSTANTIATE_TEST_CASE_P( SQ,
  185. segmented_queue_pop,
  186. ::testing::ValuesIn( segmented_queue_pop::get_test_parameters()));
  187. #endif
  188. } // namespace