// File: spsc_buffer.cpp (8.1 KB)

  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #include "queue_type.h"
  6. #include <vector>
  7. #include <algorithm>
  8. #include <type_traits>
  9. #include <cmath>
  10. // Single producer/single consumer buffer push/pop test
  11. namespace {
// Size value handed to the queue constructor by every generated test.
// SetUpTestCase() may override it from the "spsc_buffer" config section
// ("BufferSize") and raises it to at least 1024 * 64.
static size_t s_nBufferSize = 1024*1024;
// Number of push attempts made by the producer thread. SetUpTestCase() may
// override it from the config ("PushCount"); a configured 0 becomes 1024.
static size_t s_nPushCount = 1000000;
// Incremented by the producer once its push loop has finished and reset to 0
// before each run; the consumer polls it to decide when it may stop.
static std::atomic<size_t> s_nProducerDone( 0 );
  15. class spsc_buffer: public cds_test::stress_fixture
  16. {
  17. protected:
  18. typedef size_t value_type;
  19. enum {
  20. producer_thread,
  21. consumer_thread
  22. };
  23. class empty_functor
  24. {
  25. public:
  26. void operator()()
  27. {}
  28. double result()
  29. {
  30. return 0.0;
  31. }
  32. };
  33. class payload_functor
  34. {
  35. public:
  36. void operator()()
  37. {
  38. std::random_device rd;
  39. std::mt19937 gen( rd());
  40. std::uniform_int_distribution<unsigned> dis( 0, 64 * 1024* 1024 );
  41. quad_sum += std::sqrt( static_cast<double>( dis(gen)));
  42. }
  43. double result()
  44. {
  45. return quad_sum;
  46. }
  47. private:
  48. double quad_sum = 0.0;
  49. };
  50. template <class Queue, class Payload = empty_functor>
  51. class Producer: public cds_test::thread
  52. {
  53. typedef cds_test::thread base_class;
  54. typedef Payload payload_type;
  55. public:
  56. Producer( cds_test::thread_pool& pool, Queue& queue )
  57. : base_class( pool, producer_thread )
  58. , m_Queue( queue )
  59. {}
  60. Producer( Producer& src )
  61. : base_class( src )
  62. , m_Queue( src.m_Queue )
  63. {}
  64. virtual thread * clone()
  65. {
  66. return new Producer( *this );
  67. }
  68. virtual void test()
  69. {
  70. size_t const nPushCount = s_nPushCount;
  71. payload_type func;
  72. for ( size_t i = 0; i < nPushCount; ++i ) {
  73. func();
  74. size_t len = rand( 1024 ) + 64;
  75. void* buf = m_Queue.back( len );
  76. if ( buf ) {
  77. memset( buf, len % 256, len );
  78. m_Queue.push_back();
  79. m_nPushed += len;
  80. }
  81. else
  82. ++m_nPushFailed;
  83. }
  84. s_nProducerDone.fetch_add( 1 );
  85. m_PayloadResult = func.result();
  86. }
  87. public:
  88. Queue& m_Queue;
  89. size_t m_nPushFailed = 0;
  90. size_t m_nPushed = 0;
  91. double m_PayloadResult = 0.0;
  92. };
  93. template <class Queue, class Payload = empty_functor>
  94. class Consumer: public cds_test::thread
  95. {
  96. typedef cds_test::thread base_class;
  97. typedef Payload payload_type;
  98. public:
  99. Queue& m_Queue;
  100. size_t m_nPopEmpty = 0;
  101. size_t m_nPopped = 0;
  102. size_t m_nBadValue = 0;
  103. size_t m_nPopFrontFailed = 0;
  104. double m_PayloadResult = 0.0;
  105. public:
  106. Consumer( cds_test::thread_pool& pool, Queue& queue )
  107. : base_class( pool, consumer_thread )
  108. , m_Queue( queue )
  109. {}
  110. Consumer( Consumer& src )
  111. : base_class( src )
  112. , m_Queue( src.m_Queue )
  113. {}
  114. virtual thread * clone()
  115. {
  116. return new Consumer( *this );
  117. }
  118. virtual void test()
  119. {
  120. payload_type func;
  121. while ( true ) {
  122. func();
  123. auto buf = m_Queue.front();
  124. if ( buf.first ) {
  125. m_nPopped += buf.second;
  126. uint8_t val = static_cast<uint8_t>( buf.second % 256 );
  127. uint8_t const* p = reinterpret_cast<uint8_t*>( buf.first );
  128. for ( uint8_t const* pEnd = p + buf.second; p < pEnd; ++p ) {
  129. if ( *p != val ) {
  130. ++m_nBadValue;
  131. break;
  132. }
  133. }
  134. if ( !m_Queue.pop_front())
  135. ++m_nPopFrontFailed;
  136. }
  137. else {
  138. ++m_nPopEmpty;
  139. if ( s_nProducerDone.load() != 0 ) {
  140. if ( m_Queue.empty())
  141. break;
  142. }
  143. }
  144. }
  145. m_PayloadResult = func.result();
  146. }
  147. };
  148. protected:
  149. size_t m_nThreadPushCount;
  150. protected:
  151. template <class ProducerPayload, class ConsumerPayload, class Queue >
  152. void test_queue( Queue& q )
  153. {
  154. cds_test::thread_pool& pool = get_pool();
  155. auto producer = new Producer<Queue, ProducerPayload>( pool, q );
  156. auto consumer = new Consumer<Queue, ConsumerPayload>( pool, q );
  157. pool.add( producer, 1 );
  158. pool.add( consumer, 1 );
  159. s_nProducerDone.store( 0 );
  160. propout() << std::make_pair( "buffer_size", s_nBufferSize )
  161. << std::make_pair( "push_count", s_nPushCount );
  162. std::chrono::milliseconds duration = pool.run();
  163. propout() << std::make_pair( "duration", duration );
  164. // analyze result
  165. EXPECT_EQ( consumer->m_nBadValue, 0u );
  166. EXPECT_EQ( consumer->m_nPopFrontFailed, 0u );
  167. EXPECT_EQ( consumer->m_nPopped, producer->m_nPushed );
  168. propout()
  169. << std::make_pair( "producer_push_length", producer->m_nPushed )
  170. << std::make_pair( "producer_push_failed", producer->m_nPushFailed )
  171. << std::make_pair( "consumer_pop_length", consumer->m_nPopped )
  172. << std::make_pair( "consumer_pop_empty", consumer->m_nPopEmpty )
  173. << std::make_pair( "consumer_bad_value", consumer->m_nBadValue )
  174. << std::make_pair( "consumer_pop_front_failed", consumer->m_nPopFrontFailed );
  175. }
  176. template <class Queue>
  177. void test( Queue& q )
  178. {
  179. test_queue<empty_functor, empty_functor>( q );
  180. propout() << q.statistics();
  181. }
  182. public:
  183. static void SetUpTestCase()
  184. {
  185. cds_test::config const& cfg = get_config( "spsc_buffer" );
  186. s_nBufferSize = cfg.get_size_t( "BufferSize", s_nBufferSize );
  187. s_nPushCount = cfg.get_size_t( "PushCount", s_nPushCount );
  188. if ( s_nBufferSize < 1024 * 64 )
  189. s_nBufferSize = 1024 * 64;
  190. if ( s_nPushCount == 0u )
  191. s_nPushCount = 1024;
  192. }
  193. };
  194. class spsc_buffer_slow_producer: public spsc_buffer
  195. {
  196. public:
  197. template <class Queue>
  198. void test( Queue& q )
  199. {
  200. test_queue<payload_functor, empty_functor>( q );
  201. propout() << q.statistics();
  202. }
  203. };
  204. class spsc_buffer_slow_consumer: public spsc_buffer
  205. {
  206. public:
  207. template <class Queue>
  208. void test( Queue& q )
  209. {
  210. test_queue<empty_functor, payload_functor>( q );
  211. propout() << q.statistics();
  212. }
  213. };
// Test generator for this file. Any earlier definition of CDSSTRESS_Queue_F
// is dropped and replaced by one that, for a given fixture and queue type
// name, emits a TEST_F which:
//   - resolves the queue type as queue::Types< value_type >::type_name
//     (value_type is the fixture's typedef),
//   - constructs the queue with s_nBufferSize,
//   - calls the fixture's test() on it (the derived fixtures hide
//     spsc_buffer::test with their own payload combination).
#undef CDSSTRESS_Queue_F
#define CDSSTRESS_Queue_F( test_fixture, type_name ) \
TEST_F( test_fixture, type_name ) \
{ \
typedef queue::Types< value_type >::type_name queue_type; \
queue_type queue( s_nBufferSize ); \
test( queue ); \
}
// Instantiate the tests for each fixture.
// NOTE(review): CDSSTRESS_WeakRingBuffer_void is not defined in this file;
// presumably it comes from queue_type.h and expands to CDSSTRESS_Queue_F
// invocations - confirm.
CDSSTRESS_WeakRingBuffer_void( spsc_buffer )
CDSSTRESS_WeakRingBuffer_void( spsc_buffer_slow_producer )
CDSSTRESS_WeakRingBuffer_void( spsc_buffer_slow_consumer )
// Do not leak the local generator definition past this point
#undef CDSSTRESS_Queue_F
  226. } // namespace