// spsc_queue.cpp
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. #include <vector>
  7. #include <algorithm>
  8. #include <type_traits>
  9. // Single producer/single consumer queue push/pop test
  10. namespace {
  // Stress-test parameters. The defaults below are overwritten from the
  // "spsc_queue" configuration section in spsc_queue::SetUpTestCase().
  // The enclosing unnamed namespace already gives these internal linkage,
  // so the redundant `static` keyword is not repeated.

  // Value passed to the constructor of the queue under test.
  size_t s_nQueueSize = 1024;

  // Multiplier: the producer pushes s_nQueueSize * s_nPassCount items in total.
  size_t s_nPassCount = 1024;

  // Incremented by the producer thread after its last successful push; the
  // consumer polls it to decide when an empty queue means "finished".
  std::atomic<size_t> s_nProducerDone( 0 );
  14. class spsc_queue: public cds_test::stress_fixture
  15. {
  16. protected:
  17. typedef size_t value_type;
  18. enum {
  19. producer_thread,
  20. consumer_thread
  21. };
  22. template <class Queue>
  23. class Producer: public cds_test::thread
  24. {
  25. typedef cds_test::thread base_class;
  26. public:
  27. Producer( cds_test::thread_pool& pool, Queue& queue )
  28. : base_class( pool, producer_thread )
  29. , m_Queue( queue )
  30. {}
  31. Producer( Producer& src )
  32. : base_class( src )
  33. , m_Queue( src.m_Queue )
  34. {}
  35. virtual thread * clone()
  36. {
  37. return new Producer( *this );
  38. }
  39. virtual void test()
  40. {
  41. size_t const nPushCount = s_nQueueSize * s_nPassCount;
  42. m_nPushFailed = 0;
  43. for ( value_type v = 0; v < nPushCount; ++v ) {
  44. if ( !m_Queue.push( v )) {
  45. ++m_nPushFailed;
  46. --v;
  47. }
  48. }
  49. s_nProducerDone.fetch_add( 1 );
  50. }
  51. public:
  52. Queue& m_Queue;
  53. size_t m_nPushFailed = 0;
  54. };
  55. template <class Queue>
  56. class Consumer: public cds_test::thread
  57. {
  58. typedef cds_test::thread base_class;
  59. public:
  60. Queue& m_Queue;
  61. size_t m_nPopEmpty = 0;
  62. size_t m_nPopped = 0;
  63. size_t m_nBadValue = 0;
  64. public:
  65. Consumer( cds_test::thread_pool& pool, Queue& queue )
  66. : base_class( pool, consumer_thread )
  67. , m_Queue( queue )
  68. {}
  69. Consumer( Consumer& src )
  70. : base_class( src )
  71. , m_Queue( src.m_Queue )
  72. {}
  73. virtual thread * clone()
  74. {
  75. return new Consumer( *this );
  76. }
  77. virtual void test()
  78. {
  79. value_type v;
  80. value_type prev = 0 - 1;
  81. while ( true ) {
  82. if ( m_Queue.pop( v )) {
  83. ++m_nPopped;
  84. if ( v != prev + 1 )
  85. ++m_nBadValue;
  86. prev = v;
  87. }
  88. else {
  89. ++m_nPopEmpty;
  90. if ( s_nProducerDone.load() != 0 ) {
  91. if ( m_Queue.empty())
  92. break;
  93. }
  94. }
  95. }
  96. }
  97. };
  98. protected:
  99. size_t m_nThreadPushCount;
  100. protected:
  101. template <class Queue>
  102. void test_queue( Queue& q )
  103. {
  104. cds_test::thread_pool& pool = get_pool();
  105. auto producer = new Producer<Queue>( pool, q );
  106. auto consumer = new Consumer<Queue>( pool, q );
  107. pool.add( producer, 1 );
  108. pool.add( consumer, 1 );
  109. s_nProducerDone.store( 0 );
  110. propout() << std::make_pair( "queue_size", s_nQueueSize )
  111. << std::make_pair( "pass_count", s_nPassCount );
  112. std::chrono::milliseconds duration = pool.run();
  113. propout() << std::make_pair( "duration", duration );
  114. // analyze result
  115. EXPECT_EQ( consumer->m_nBadValue, 0u );
  116. EXPECT_EQ( consumer->m_nPopped, s_nQueueSize * s_nPassCount );
  117. propout()
  118. << std::make_pair( "producer_push_count", s_nQueueSize * s_nPassCount )
  119. << std::make_pair( "producer_push_failed", producer->m_nPushFailed )
  120. << std::make_pair( "consumer_pop_count", consumer->m_nPopped )
  121. << std::make_pair( "consumer_pop_empty", consumer->m_nPopEmpty )
  122. << std::make_pair( "consumer_bad_value", consumer->m_nBadValue );
  123. }
  124. template <class Queue>
  125. void test( Queue& q )
  126. {
  127. test_queue( q );
  128. propout() << q.statistics();
  129. }
  130. public:
  131. static void SetUpTestCase()
  132. {
  133. cds_test::config const& cfg = get_config( "spsc_queue" );
  134. s_nQueueSize = cfg.get_size_t( "QueueSize", s_nQueueSize );
  135. s_nPassCount = cfg.get_size_t( "PassCount", s_nPassCount );
  136. if ( s_nQueueSize == 0u )
  137. s_nQueueSize = 1024;
  138. if ( s_nPassCount == 0u )
  139. s_nPassCount = 1024;
  140. }
  141. };
  142. //CDSSTRESS_MSQueue( spsc_queue )
  143. //CDSSTRESS_MoirQueue( spsc_queue )
  144. //CDSSTRESS_BasketQueue( spsc_queue )
  145. //CDSSTRESS_OptimsticQueue( spsc_queue )
  146. //CDSSTRESS_FCQueue( spsc_queue )
  147. //CDSSTRESS_FCDeque( spsc_queue )
  148. //CDSSTRESS_RWQueue( spsc_queue )
  149. //CDSSTRESS_StdQueue( spsc_queue )
  150. #undef CDSSTRESS_Queue_F
  151. #define CDSSTRESS_Queue_F( test_fixture, type_name ) \
  152. TEST_F( test_fixture, type_name ) \
  153. { \
  154. typedef queue::Types< value_type >::type_name queue_type; \
  155. queue_type queue( s_nQueueSize ); \
  156. test( queue ); \
  157. }
  158. CDSSTRESS_WeakRingBuffer( spsc_queue )
  159. CDSSTRESS_VyukovQueue( spsc_queue )
  160. CDSSTRESS_VyukovSingleConsumerQueue( spsc_queue )
  161. #undef CDSSTRESS_Queue_F
  162. } // namespace