push.cpp 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. // Multi-threaded queue test for push operation
  7. namespace {
  8. static size_t s_nThreadCount = 8;
  9. static size_t s_nQueueSize = 20000000 ; // no more than 20 million records
  10. class queue_push: public cds_test::stress_fixture
  11. {
  12. protected:
  13. struct value_type
  14. {
  15. size_t nNo;
  16. value_type()
  17. : nNo( 0 )
  18. {}
  19. value_type( size_t n )
  20. : nNo( n )
  21. {}
  22. };
  23. template <class Queue>
  24. class Producer: public cds_test::thread
  25. {
  26. typedef cds_test::thread base_class;
  27. public:
  28. Producer( cds_test::thread_pool& pool, Queue& queue )
  29. : base_class( pool )
  30. , m_Queue( queue )
  31. , m_nStartItem( 0 )
  32. , m_nEndItem( 0 )
  33. , m_nPushError( 0 )
  34. {}
  35. Producer( Producer& src )
  36. : base_class( src )
  37. , m_Queue( src.m_Queue )
  38. , m_nStartItem( 0 )
  39. , m_nEndItem( 0 )
  40. , m_nPushError( 0 )
  41. {}
  42. virtual thread * clone()
  43. {
  44. return new Producer( *this );
  45. }
  46. virtual void test()
  47. {
  48. for ( size_t nItem = m_nStartItem; nItem < m_nEndItem; ++nItem ) {
  49. if ( !m_Queue.push( nItem ))
  50. ++m_nPushError;
  51. }
  52. }
  53. public:
  54. Queue& m_Queue;
  55. size_t m_nStartItem;
  56. size_t m_nEndItem;
  57. size_t m_nPushError;
  58. };
  59. public:
  60. static void SetUpTestCase()
  61. {
  62. cds_test::config const& cfg = get_config( "queue_push" );
  63. s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
  64. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  65. if ( s_nThreadCount == 0u )
  66. s_nThreadCount = 1;
  67. if ( s_nQueueSize == 0u )
  68. s_nQueueSize = 1000;
  69. }
  70. //static void TearDownTestCase();
  71. protected:
  72. template <class Queue>
  73. void test( Queue& q )
  74. {
  75. cds_test::thread_pool& pool = get_pool();
  76. pool.add( new Producer<Queue>( pool, q ), s_nThreadCount );
  77. size_t nStart = 0;
  78. size_t nThreadItemCount = s_nQueueSize / s_nThreadCount;
  79. for ( size_t i = 0; i < pool.size(); ++i ) {
  80. Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
  81. thread.m_nStartItem = nStart;
  82. nStart += nThreadItemCount;
  83. thread.m_nEndItem = nStart;
  84. }
  85. s_nQueueSize = nThreadItemCount * s_nThreadCount;
  86. propout() << std::make_pair( "thread_count", s_nThreadCount )
  87. << std::make_pair( "push_count", s_nQueueSize );
  88. std::chrono::milliseconds duration = pool.run();
  89. propout() << std::make_pair( "duration", duration );
  90. analyze( q );
  91. propout() << q.statistics();
  92. }
  93. template <class Queue>
  94. void analyze( Queue& q )
  95. {
  96. size_t nThreadItems = s_nQueueSize / s_nThreadCount;
  97. cds_test::thread_pool& pool = get_pool();
  98. for ( size_t i = 0; i < pool.size(); ++i ) {
  99. Producer<Queue>& thread = static_cast<Producer<Queue>&>(pool.get( i ));
  100. EXPECT_EQ( thread.m_nPushError, 0u ) << " producer thread " << i;
  101. }
  102. EXPECT_TRUE( !q.empty());
  103. std::unique_ptr< uint8_t[] > arr( new uint8_t[s_nQueueSize] );
  104. memset( arr.get(), 0, sizeof(arr[0]) * s_nQueueSize );
  105. size_t nPopped = 0;
  106. value_type val;
  107. while ( q.pop( val )) {
  108. nPopped++;
  109. ++arr[ val.nNo ];
  110. }
  111. size_t nTotalItems = nThreadItems * s_nThreadCount;
  112. for ( size_t i = 0; i < nTotalItems; ++i ) {
  113. EXPECT_EQ( arr[i], 1 ) << "i=" << i;
  114. }
  115. }
  116. };
// Instantiate the queue_push stress test for each queue family.
// The CDSSTRESS_* macros are not defined in this file; presumably they come
// from queue_type.h and expand to CDSSTRESS_Queue_F( queue_push, <type> ) for
// every queue type of the family - confirm there.
// NOTE(review): "Optimstic" is spelled this way in the macro name; it must
// match the definition, so do not correct it here alone.
CDSSTRESS_MSQueue( queue_push )
CDSSTRESS_MoirQueue( queue_push )
CDSSTRESS_BasketQueue( queue_push )
CDSSTRESS_OptimsticQueue( queue_push )
CDSSTRESS_FCQueue( queue_push )
CDSSTRESS_FCDeque( queue_push )
CDSSTRESS_RWQueue( queue_push )
CDSSTRESS_StdQueue( queue_push )
  125. #undef CDSSTRESS_Queue_F
  126. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  127. TEST_F( test_fixture, type_name ) \
  128. { \
  129. typedef queue::Types< value_type >::type_name queue_type; \
  130. queue_type queue( s_nQueueSize ); \
  131. test( queue ); \
  132. }
  133. CDSSTRESS_VyukovQueue( queue_push )
  134. #undef CDSSTRESS_Queue_F
  135. // ********************************************************************
  136. // SegmentedQueue test
  137. class segmented_queue_push
  138. : public queue_push
  139. , public ::testing::WithParamInterface< size_t >
  140. {
  141. typedef queue_push base_class;
  142. protected:
  143. template <typename Queue>
  144. void test()
  145. {
  146. size_t quasi_factor = GetParam();
  147. Queue q( quasi_factor );
  148. propout() << std::make_pair( "quasi_factor", quasi_factor );
  149. base_class::test( q );
  150. }
  151. public:
  152. static std::vector< size_t > get_test_parameters()
  153. {
  154. cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
  155. bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
  156. size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
  157. std::vector<size_t> args;
  158. if ( bIterative && quasi_factor > 4 ) {
  159. for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
  160. args.push_back( qf );
  161. }
  162. else {
  163. if ( quasi_factor > 2 )
  164. args.push_back( quasi_factor );
  165. else
  166. args.push_back( 2 );
  167. }
  168. return args;
  169. }
  170. };
  171. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  172. TEST_P( test_fixture, type_name ) \
  173. { \
  174. typedef typename queue::Types<value_type>::type_name queue_type; \
  175. test< queue_type >(); \
  176. }
  177. CDSSTRESS_SegmentedQueue( segmented_queue_push )
  178. #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
  179. static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
  180. {
  181. return std::to_string( p.param );
  182. }
  183. INSTANTIATE_TEST_CASE_P( SQ,
  184. segmented_queue_push,
  185. ::testing::ValuesIn( segmented_queue_push::get_test_parameters()), get_test_parameter_name );
  186. #else
  187. INSTANTIATE_TEST_CASE_P( SQ,
  188. segmented_queue_push,
  189. ::testing::ValuesIn( segmented_queue_push::get_test_parameters()));
  190. #endif
  191. } // namespace