push_pop.cpp 14 KB


  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. #include <vector>
  7. #include <algorithm>
  8. #include <type_traits>
  9. // Multi-threaded queue push/pop test
  10. namespace {
  11. static size_t s_nConsumerThreadCount = 4;
  12. static size_t s_nProducerThreadCount = 4;
  13. static size_t s_nQueueSize = 4000000;
  14. static size_t s_nHeavyValueSize = 100;
  15. static std::atomic<size_t> s_nProducerDone( 0 );
  16. struct old_value
  17. {
  18. size_t nNo;
  19. size_t nWriterNo;
  20. };
  21. template<class Value = old_value>
  22. class queue_push_pop: public cds_test::stress_fixture
  23. {
  24. protected:
  25. using value_type = Value;
  26. enum {
  27. producer_thread,
  28. consumer_thread
  29. };
  30. template <class Queue>
  31. class Producer: public cds_test::thread
  32. {
  33. typedef cds_test::thread base_class;
  34. public:
  35. Producer( cds_test::thread_pool& pool, Queue& queue, size_t nPushCount )
  36. : base_class( pool, producer_thread )
  37. , m_Queue( queue )
  38. , m_nPushFailed( 0 )
  39. , m_nPushCount( nPushCount )
  40. {}
  41. Producer( Producer& src )
  42. : base_class( src )
  43. , m_Queue( src.m_Queue )
  44. , m_nPushFailed( 0 )
  45. , m_nPushCount( src.m_nPushCount )
  46. {}
  47. virtual thread * clone()
  48. {
  49. return new Producer( *this );
  50. }
  51. virtual void test()
  52. {
  53. size_t const nPushCount = m_nPushCount;
  54. value_type v;
  55. v.nWriterNo = id();
  56. v.nNo = 0;
  57. m_nPushFailed = 0;
  58. while ( v.nNo < nPushCount ) {
  59. if ( m_Queue.push( v ))
  60. ++v.nNo;
  61. else
  62. ++m_nPushFailed;
  63. }
  64. s_nProducerDone.fetch_add( 1 );
  65. }
  66. public:
  67. Queue& m_Queue;
  68. size_t m_nPushFailed;
  69. size_t const m_nPushCount;
  70. };
  71. template <class Queue>
  72. class Consumer: public cds_test::thread
  73. {
  74. typedef cds_test::thread base_class;
  75. public:
  76. Queue& m_Queue;
  77. size_t const m_nPushPerProducer;
  78. size_t m_nPopEmpty;
  79. size_t m_nPopped;
  80. size_t m_nBadWriter;
  81. typedef std::vector<size_t> popped_data;
  82. typedef std::vector<size_t>::iterator data_iterator;
  83. typedef std::vector<size_t>::const_iterator const_data_iterator;
  84. std::vector<popped_data> m_WriterData;
  85. private:
  86. void initPoppedData()
  87. {
  88. const size_t nProducerCount = s_nProducerThreadCount;
  89. m_WriterData.resize( nProducerCount );
  90. for ( size_t i = 0; i < nProducerCount; ++i )
  91. m_WriterData[i].reserve( m_nPushPerProducer );
  92. }
  93. public:
  94. Consumer( cds_test::thread_pool& pool, Queue& queue, size_t nPushPerProducer )
  95. : base_class( pool, consumer_thread )
  96. , m_Queue( queue )
  97. , m_nPushPerProducer( nPushPerProducer )
  98. , m_nPopEmpty( 0 )
  99. , m_nPopped( 0 )
  100. , m_nBadWriter( 0 )
  101. {
  102. initPoppedData();
  103. }
  104. Consumer( Consumer& src )
  105. : base_class( src )
  106. , m_Queue( src.m_Queue )
  107. , m_nPushPerProducer( src.m_nPushPerProducer )
  108. , m_nPopEmpty( 0 )
  109. , m_nPopped( 0 )
  110. , m_nBadWriter( 0 )
  111. {
  112. initPoppedData();
  113. }
  114. virtual thread * clone()
  115. {
  116. return new Consumer( *this );
  117. }
  118. virtual void test()
  119. {
  120. m_nPopEmpty = 0;
  121. m_nPopped = 0;
  122. m_nBadWriter = 0;
  123. const size_t nTotalWriters = s_nProducerThreadCount;
  124. value_type v;
  125. while ( true ) {
  126. if ( m_Queue.pop( v )) {
  127. ++m_nPopped;
  128. if ( v.nWriterNo < nTotalWriters )
  129. m_WriterData[ v.nWriterNo ].push_back( v.nNo );
  130. else
  131. ++m_nBadWriter;
  132. }
  133. else {
  134. ++m_nPopEmpty;
  135. if ( s_nProducerDone.load() >= nTotalWriters ) {
  136. if ( m_Queue.empty())
  137. break;
  138. }
  139. }
  140. }
  141. }
  142. };
  143. protected:
  144. size_t m_nThreadPushCount;
  145. protected:
  146. template <class Queue>
  147. void analyze( Queue& q, size_t /*nLeftOffset*/ = 0, size_t nRightOffset = 0 )
  148. {
  149. cds_test::thread_pool& pool = get_pool();
  150. typedef Consumer<Queue> consumer_type;
  151. typedef Producer<Queue> producer_type;
  152. size_t nPostTestPops = 0;
  153. {
  154. value_type v;
  155. while ( q.pop( v ))
  156. ++nPostTestPops;
  157. }
  158. size_t nTotalPops = 0;
  159. size_t nPopFalse = 0;
  160. size_t nPoppedItems = 0;
  161. size_t nPushFailed = 0;
  162. std::vector< consumer_type * > arrConsumer;
  163. for ( size_t i = 0; i < pool.size(); ++i ) {
  164. cds_test::thread& thr = pool.get(i);
  165. if ( thr.type() == consumer_thread ) {
  166. consumer_type& consumer = static_cast<consumer_type&>( thr );
  167. nTotalPops += consumer.m_nPopped;
  168. nPopFalse += consumer.m_nPopEmpty;
  169. arrConsumer.push_back( &consumer );
  170. EXPECT_EQ( consumer.m_nBadWriter, 0u ) << "consumer_thread_no " << i;
  171. size_t nPopped = 0;
  172. for ( size_t n = 0; n < s_nProducerThreadCount; ++n )
  173. nPopped += consumer.m_WriterData[n].size();
  174. nPoppedItems += nPopped;
  175. }
  176. else {
  177. assert( thr.type() == producer_thread );
  178. producer_type& producer = static_cast<producer_type&>( thr );
  179. nPushFailed += producer.m_nPushFailed;
  180. EXPECT_EQ( producer.m_nPushFailed, 0u ) << "producer_thread_no " << i;
  181. }
  182. }
  183. EXPECT_EQ( nTotalPops, nPoppedItems );
  184. EXPECT_EQ( nTotalPops + nPostTestPops, s_nQueueSize ) << "nTotalPops=" << nTotalPops << ", nPostTestPops=" << nPostTestPops;
  185. EXPECT_TRUE( q.empty());
  186. // Test consistency of popped sequence
  187. for ( size_t nWriter = 0; nWriter < s_nProducerThreadCount; ++nWriter ) {
  188. std::vector<size_t> arrData;
  189. arrData.reserve( m_nThreadPushCount );
  190. for ( size_t nReader = 0; nReader < arrConsumer.size(); ++nReader ) {
  191. auto it = arrConsumer[nReader]->m_WriterData[nWriter].begin();
  192. auto itEnd = arrConsumer[nReader]->m_WriterData[nWriter].end();
  193. if ( it != itEnd ) {
  194. auto itPrev = it;
  195. for ( ++it; it != itEnd; ++it ) {
  196. EXPECT_LT( *itPrev, *it + nRightOffset ) << "consumer=" << nReader << ", producer=" << nWriter;
  197. itPrev = it;
  198. }
  199. }
  200. for ( it = arrConsumer[nReader]->m_WriterData[nWriter].begin(); it != itEnd; ++it )
  201. arrData.push_back( *it );
  202. }
  203. std::sort( arrData.begin(), arrData.end());
  204. for ( size_t i=1; i < arrData.size(); ++i ) {
  205. EXPECT_EQ( arrData[i - 1] + 1, arrData[i] ) << "producer=" << nWriter;
  206. }
  207. EXPECT_EQ( arrData[0], 0u ) << "producer=" << nWriter;
  208. EXPECT_EQ( arrData[arrData.size() - 1], m_nThreadPushCount - 1 ) << "producer=" << nWriter;
  209. }
  210. }
  211. template <class Queue>
  212. void test_queue( Queue& q )
  213. {
  214. m_nThreadPushCount = s_nQueueSize / s_nProducerThreadCount;
  215. cds_test::thread_pool& pool = get_pool();
  216. pool.add( new Producer<Queue>( pool, q, m_nThreadPushCount ), s_nProducerThreadCount );
  217. pool.add( new Consumer<Queue>( pool, q, m_nThreadPushCount ), s_nConsumerThreadCount );
  218. s_nProducerDone.store( 0 );
  219. s_nQueueSize = m_nThreadPushCount * s_nProducerThreadCount;
  220. propout() << std::make_pair( "producer_count", s_nProducerThreadCount )
  221. << std::make_pair( "consumer_count", s_nConsumerThreadCount )
  222. << std::make_pair( "push_count", s_nQueueSize );
  223. std::chrono::milliseconds duration = pool.run();
  224. propout() << std::make_pair( "duration", duration );
  225. }
  226. template <class Queue>
  227. void test( Queue& q )
  228. {
  229. test_queue( q );
  230. analyze( q );
  231. propout() << q.statistics();
  232. }
  233. private:
  234. static void set_array_size( size_t size ) {
  235. const bool tmp = fc_test::has_set_array_size<value_type>::value;
  236. set_array_size(size, std::integral_constant<bool, tmp>());
  237. }
  238. static void set_array_size(size_t size, std::true_type){
  239. value_type::set_array_size(size);
  240. }
  241. static void set_array_size(size_t, std::false_type)
  242. {
  243. }
  244. public:
  245. static void SetUpTestCase()
  246. {
  247. cds_test::config const& cfg = get_config( "queue_push_pop" );
  248. s_nConsumerThreadCount = cfg.get_size_t( "ConsumerCount", s_nConsumerThreadCount );
  249. s_nProducerThreadCount = cfg.get_size_t( "ProducerCount", s_nProducerThreadCount );
  250. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  251. s_nHeavyValueSize = cfg.get_size_t( "HeavyValueSize", s_nHeavyValueSize );
  252. if ( s_nConsumerThreadCount == 0u )
  253. s_nConsumerThreadCount = 1;
  254. if ( s_nProducerThreadCount == 0u )
  255. s_nProducerThreadCount = 1;
  256. if ( s_nQueueSize == 0u )
  257. s_nQueueSize = 1000;
  258. if ( s_nHeavyValueSize == 0 )
  259. s_nHeavyValueSize = 1;
  260. set_array_size( s_nHeavyValueSize );
  261. }
  262. //static void TearDownTestCase();
  263. };
  264. using fc_with_heavy_value = queue_push_pop< fc_test::heavy_value<36000> >;
  265. using simple_queue_push_pop = queue_push_pop<>;
  266. CDSSTRESS_MSQueue( simple_queue_push_pop )
  267. CDSSTRESS_MoirQueue( simple_queue_push_pop )
  268. CDSSTRESS_BasketQueue( simple_queue_push_pop )
  269. CDSSTRESS_OptimsticQueue( simple_queue_push_pop )
  270. CDSSTRESS_FCQueue( simple_queue_push_pop )
  271. CDSSTRESS_FCDeque( simple_queue_push_pop )
  272. CDSSTRESS_FCDeque_HeavyValue( fc_with_heavy_value )
  273. CDSSTRESS_RWQueue( simple_queue_push_pop )
  274. CDSSTRESS_StdQueue( simple_queue_push_pop )
  275. #undef CDSSTRESS_Queue_F
  276. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  277. TEST_F( test_fixture, type_name ) \
  278. { \
  279. typedef queue::Types< value_type >::type_name queue_type; \
  280. queue_type queue( s_nQueueSize ); \
  281. test( queue ); \
  282. }
  283. CDSSTRESS_VyukovQueue( simple_queue_push_pop )
  284. #undef CDSSTRESS_Queue_F
  285. // ********************************************************************
  286. // SegmentedQueue test
  287. class segmented_queue_push_pop
  288. : public queue_push_pop<>
  289. , public ::testing::WithParamInterface< size_t >
  290. {
  291. typedef queue_push_pop<> base_class;
  292. protected:
  293. template <typename Queue>
  294. void test()
  295. {
  296. size_t quasi_factor = GetParam();
  297. Queue q( quasi_factor );
  298. propout() << std::make_pair( "quasi_factor", quasi_factor );
  299. base_class::test_queue( q );
  300. analyze( q, quasi_factor * 2, quasi_factor );
  301. propout() << q.statistics();
  302. }
  303. public:
  304. static std::vector< size_t > get_test_parameters()
  305. {
  306. cds_test::config const& cfg = cds_test::stress_fixture::get_config( "queue_push_pop" );
  307. bool bIterative = cfg.get_bool( "SegmentedQueue_Iterate", false );
  308. size_t quasi_factor = cfg.get_size_t( "SegmentedQueue_SegmentSize", 256 );
  309. std::vector<size_t> args;
  310. if ( bIterative && quasi_factor > 4 ) {
  311. for ( size_t qf = 4; qf <= quasi_factor; qf *= 2 )
  312. args.push_back( qf );
  313. } else {
  314. if ( quasi_factor > 2 )
  315. args.push_back( quasi_factor );
  316. else
  317. args.push_back( 2 );
  318. }
  319. return args;
  320. }
  321. };
  322. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  323. TEST_P( test_fixture, type_name ) \
  324. { \
  325. typedef typename queue::Types<value_type>::type_name queue_type; \
  326. test< queue_type >(); \
  327. }
  328. CDSSTRESS_SegmentedQueue( segmented_queue_push_pop )
  329. #ifdef CDSTEST_GTEST_INSTANTIATE_TEST_CASE_P_HAS_4TH_ARG
  330. static std::string get_test_parameter_name( testing::TestParamInfo<size_t> const& p )
  331. {
  332. return std::to_string( p.param );
  333. }
  334. INSTANTIATE_TEST_CASE_P( SQ,
  335. segmented_queue_push_pop,
  336. ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()), get_test_parameter_name );
  337. #else
  338. INSTANTIATE_TEST_CASE_P( SQ,
  339. segmented_queue_push_pop,
  340. ::testing::ValuesIn( segmented_queue_push_pop::get_test_parameters()));
  341. #endif
  342. } // namespace