random.cpp 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. #include <vector>
  7. // Multi-threaded queue test for random push/pop operation
  8. namespace {
  9. static size_t s_nThreadCount = 16;
  10. static size_t s_nQueueSize = 10000000;
  11. std::atomic< size_t > s_nProducerCount(0);
  12. class queue_random: public cds_test::stress_fixture
  13. {
  14. typedef cds_test::stress_fixture base_class;
  15. protected:
  16. struct value_type {
  17. size_t nNo;
  18. size_t nThread;
  19. value_type() {}
  20. value_type( size_t n ) : nNo( n ) {}
  21. };
  22. template <class Queue>
  23. class Strain: public cds_test::thread
  24. {
  25. typedef cds_test::thread base_class;
  26. public:
  27. Strain( cds_test::thread_pool& pool, Queue& q, size_t nPushCount, size_t nSpread = 0 )
  28. : base_class( pool )
  29. , m_Queue( q )
  30. , m_nSpread( nSpread )
  31. , m_nTotalPushCount( nPushCount )
  32. {}
  33. Strain( Strain& src )
  34. : base_class( src )
  35. , m_Queue( src.m_Queue )
  36. , m_nSpread( src.m_nSpread )
  37. , m_nTotalPushCount( src.m_nTotalPushCount )
  38. {}
  39. virtual thread * clone()
  40. {
  41. return new Strain( *this );
  42. }
  43. virtual void test()
  44. {
  45. size_t const nThreadCount = s_nThreadCount;
  46. size_t const nTotalPush = m_nTotalPushCount;
  47. m_arrLastRead.resize( nThreadCount, 0 );
  48. m_arrPopCountPerThread.resize( nThreadCount, 0 );
  49. value_type node;
  50. while ( m_nPushCount < nTotalPush ) {
  51. if ( ( std::rand() & 3) != 3 ) {
  52. node.nThread = id();
  53. node.nNo = ++m_nPushCount;
  54. if ( !m_Queue.push( node )) {
  55. ++m_nPushError;
  56. --m_nPushCount;
  57. }
  58. }
  59. else
  60. pop( nThreadCount );
  61. }
  62. s_nProducerCount.fetch_sub( 1, std::memory_order_relaxed );
  63. while ( !m_Queue.empty() || s_nProducerCount.load( std::memory_order_relaxed ) != 0 )
  64. pop( nThreadCount );
  65. }
  66. bool pop( size_t nThreadCount )
  67. {
  68. value_type node;
  69. node.nThread = nThreadCount;
  70. node.nNo = ~0;
  71. if ( m_Queue.pop( node )) {
  72. ++m_nPopCount;
  73. if ( node.nThread < nThreadCount ) {
  74. m_arrPopCountPerThread[ node.nThread ] += 1;
  75. if ( m_nSpread ) {
  76. if ( m_arrLastRead[ node.nThread ] > node.nNo ) {
  77. if ( m_arrLastRead[ node.nThread ] - node.nNo > m_nSpread )
  78. ++m_nRepeatValue;
  79. }
  80. else if ( m_arrLastRead[ node.nThread ] == node.nNo )
  81. ++m_nRepeatValue;
  82. m_arrLastRead[ node.nThread ] = node.nNo;
  83. }
  84. else {
  85. if ( m_arrLastRead[ node.nThread ] < node.nNo )
  86. m_arrLastRead[ node.nThread ] = node.nNo;
  87. else
  88. ++m_nRepeatValue;
  89. }
  90. }
  91. else
  92. ++m_nUndefWriter;
  93. }
  94. else {
  95. ++m_nEmptyPop;
  96. return false;
  97. }
  98. return true;
  99. }
  100. public:
  101. Queue& m_Queue;
  102. size_t m_nPushCount = 0;
  103. size_t m_nPopCount = 0;
  104. size_t m_nEmptyPop = 0;
  105. size_t m_nUndefWriter = 0;
  106. size_t m_nRepeatValue = 0;
  107. size_t m_nPushError = 0;
  108. std::vector<size_t> m_arrLastRead;
  109. std::vector<size_t> m_arrPopCountPerThread;
  110. size_t const m_nSpread;
  111. size_t const m_nTotalPushCount;
  112. };
  113. public:
  114. static void SetUpTestCase()
  115. {
  116. cds_test::config const& cfg = get_config( "queue_random" );
  117. s_nThreadCount = cfg.get_size_t( "ThreadCount", s_nThreadCount );
  118. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  119. if ( s_nThreadCount == 0u )
  120. s_nThreadCount = 1;
  121. if ( s_nQueueSize == 0u )
  122. s_nQueueSize = 1000;
  123. }
  124. //static void TearDownTestCase();
  125. protected:
  126. template <class Queue>
  127. void analyze( Queue& q )
  128. {
  129. EXPECT_TRUE( q.empty());
  130. std::vector< size_t > arrPushCount;
  131. arrPushCount.resize( s_nThreadCount, 0 );
  132. size_t nPushTotal = 0;
  133. size_t nPopTotal = 0;
  134. size_t nPushError = 0;
  135. cds_test::thread_pool& pool = get_pool();
  136. for ( size_t i = 0; i < pool.size(); ++i ) {
  137. Strain<Queue>& thr = static_cast<Strain<Queue> &>( pool.get(i));
  138. EXPECT_EQ( thr.m_nUndefWriter, 0u );
  139. EXPECT_EQ( thr.m_nRepeatValue, 0u );
  140. EXPECT_EQ( thr.m_nPushError, 0u );
  141. nPushError += thr.m_nPushError;
  142. arrPushCount[ thr.id() ] += thr.m_nPushCount;
  143. nPushTotal += thr.m_nPushCount;
  144. nPopTotal += thr.m_nPopCount;
  145. }
  146. EXPECT_EQ( nPushTotal, s_nQueueSize );
  147. EXPECT_EQ( nPopTotal, s_nQueueSize );
  148. size_t const nThreadPushCount = s_nQueueSize / s_nThreadCount;
  149. for ( size_t i = 0; i < s_nThreadCount; ++i )
  150. EXPECT_EQ( arrPushCount[i], nThreadPushCount ) << "thread=" << i;
  151. }
  152. template <class Queue>
  153. void test( Queue& q )
  154. {
  155. size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
  156. cds_test::thread_pool& pool = get_pool();
  157. pool.add( new Strain<Queue>( pool, q, nThreadPushCount ), s_nThreadCount );
  158. s_nQueueSize = nThreadPushCount * s_nThreadCount;
  159. propout() << std::make_pair( "thread_count", s_nThreadCount )
  160. << std::make_pair( "push_count", s_nQueueSize );
  161. s_nProducerCount.store( pool.size(), std::memory_order_release );
  162. std::chrono::milliseconds duration = pool.run();
  163. propout() << std::make_pair( "duration", duration );
  164. analyze( q );
  165. propout() << q.statistics();
  166. }
  167. };
  168. CDSSTRESS_MSQueue( queue_random )
  169. CDSSTRESS_MoirQueue( queue_random )
  170. CDSSTRESS_BasketQueue( queue_random )
  171. CDSSTRESS_OptimsticQueue( queue_random )
  172. CDSSTRESS_FCQueue( queue_random )
  173. CDSSTRESS_FCDeque( queue_random )
  174. CDSSTRESS_RWQueue( queue_random )
  175. CDSSTRESS_StdQueue( queue_random )
  176. #undef CDSSTRESS_Queue_F
  177. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  178. TEST_F( test_fixture, type_name ) \
  179. { \
  180. typedef queue::Types< value_type >::type_name queue_type; \
  181. queue_type queue( s_nQueueSize ); \
  182. test( queue ); \
  183. }
  184. CDSSTRESS_VyukovQueue( queue_random )
  185. #undef CDSSTRESS_Queue_F
  186. // ********************************************************************
  187. // SegmentedQueue test
  188. class segmented_queue_random
  189. : public queue_random
  190. , public ::testing::WithParamInterface< size_t >
  191. {
  192. typedef queue_random base_class;
  193. protected:
  194. template <typename Queue>
  195. void test()
  196. {
  197. size_t quasi_factor = GetParam();
  198. Queue q( quasi_factor );
  199. propout() << std::make_pair( "quasi_factor", quasi_factor );
  200. size_t nThreadPushCount = s_nQueueSize / s_nThreadCount;
  201. cds_test::thread_pool& pool = get_pool();
  202. pool.add( new Strain<Queue>( pool, q, nThreadPushCount, quasi_factor * 2 ), s_nThreadCount );
  203. s_nQueueSize = nThreadPushCount * s_nThreadCount;
  204. propout() << std::make_pair( "thread_count", s_nThreadCount )
  205. << std::make_pair( "push_count", s_nQueueSize );
  206. s_nProducerCount.store( pool.size(), std::memory_order_release );
  207. std::chrono::milliseconds duration = pool.run();
  208. propout() << std::make_pair( "duration", duration );
  209. analyze( q );
  210. propout() << q.statistics();
  211. }
  212. public:
  213. static std::vector< size_t > get_test_parameters()
  214. {
  215. cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push" );
  216. bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
  217. size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
  218. std::vector<size_t> args;
  219. if ( bIterative && quasi_factor > 4 ) {
  220. for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
  221. args.push_back( qf );
  222. }
  223. else {
  224. if ( quasi_factor > 2 )
  225. args.push_back( quasi_factor );
  226. else
  227. args.push_back( 2 );
  228. }
  229. return args;
  230. }
  231. };
  232. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  233. TEST_P( test_fixture, type_name ) \
  234. { \
  235. typedef typename queue::Types<value_type>::type_name queue_type; \
  236. test< queue_type >(); \
  237. }
  238. CDSSTRESS_SegmentedQueue( segmented_queue_random )
  239. #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
  240. static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
  241. {
  242. return std::to_string( p.param );
  243. }
  244. INSTANTIATE_TEST_CASE_P( SQ,
  245. segmented_queue_random,
  246. ::testing::ValuesIn( segmented_queue_random::get_test_parameters()), get_test_parameter_name );
  247. #else
  248. INSTANTIATE_TEST_CASE_P( SQ,
  249. segmented_queue_random,
  250. ::testing::ValuesIn( segmented_queue_random::get_test_parameters()));
  251. #endif
  252. } // namespace