thread.h 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298
  1. // Copyright (c) 2006-2018 Maxim Khizhinsky
  2. //
  3. // Distributed under the Boost Software License, Version 1.0. (See accompanying
  4. // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
  5. #ifndef CDSTEST_THREAD_H
  6. #define CDSTEST_THREAD_H
  7. #include <cds_test/ext_gtest.h>
  8. #include <vector>
  9. #include <thread>
  10. #include <condition_variable>
  11. #include <mutex>
  12. #include <chrono>
  13. #include <cds/threading/model.h>
  14. namespace cds_test {
  15. // Forwards
  16. class thread;
  17. class thread_pool;
  18. // Test thread
  19. class thread
  20. {
  21. void run();
  22. protected: // thread_pool interface
  23. thread( thread const& sample );
  24. virtual ~thread()
  25. {}
  26. protected:
  27. virtual thread * clone() = 0;
  28. virtual void test() = 0;
  29. virtual void SetUp()
  30. {
  31. cds::threading::Manager::attachThread();
  32. }
  33. virtual void TearDown()
  34. {
  35. cds::threading::Manager::detachThread();
  36. }
  37. public:
  38. explicit thread( thread_pool& master, int type = 0 );
  39. thread_pool& pool() { return m_pool; }
  40. int type() const { return m_type; }
  41. size_t id() const { return m_id; }
  42. bool time_elapsed() const;
  43. private:
  44. friend class thread_pool;
  45. thread_pool& m_pool;
  46. int const m_type;
  47. size_t const m_id;
  48. };
  49. // Pool of test threads
  50. class thread_pool
  51. {
  52. class barrier
  53. {
  54. public:
  55. barrier()
  56. : m_count( 0 )
  57. {}
  58. void reset( size_t count )
  59. {
  60. std::unique_lock< std::mutex > lock( m_mtx );
  61. m_count = count;
  62. }
  63. bool wait()
  64. {
  65. std::unique_lock< std::mutex > lock( m_mtx );
  66. if ( --m_count == 0 ) {
  67. m_cv.notify_all();
  68. return true;
  69. }
  70. while ( m_count != 0 )
  71. m_cv.wait( lock );
  72. return false;
  73. }
  74. private:
  75. size_t m_count;
  76. std::mutex m_mtx;
  77. std::condition_variable m_cv;
  78. };
  79. class initial_gate
  80. {
  81. public:
  82. initial_gate()
  83. : m_ready( false )
  84. {}
  85. void wait()
  86. {
  87. std::unique_lock< std::mutex > lock( m_mtx );
  88. while ( !m_ready )
  89. m_cv.wait( lock );
  90. }
  91. void ready()
  92. {
  93. std::unique_lock< std::mutex > lock( m_mtx );
  94. m_ready = true;
  95. m_cv.notify_all();
  96. }
  97. void reset()
  98. {
  99. std::unique_lock< std::mutex > lock( m_mtx );
  100. m_ready = false;
  101. }
  102. private:
  103. std::mutex m_mtx;
  104. std::condition_variable m_cv;
  105. bool m_ready;
  106. };
  107. public:
  108. explicit thread_pool( ::testing::Test& fx )
  109. : m_fixture( fx )
  110. , m_bTimeElapsed( false )
  111. {}
  112. ~thread_pool()
  113. {
  114. clear();
  115. }
  116. void add( thread * what )
  117. {
  118. m_workers.push_back( what );
  119. }
  120. void add( thread * what, size_t count )
  121. {
  122. add( what );
  123. for ( size_t i = 1; i < count; ++i ) {
  124. thread * p = what->clone();
  125. add( p );
  126. }
  127. }
  128. std::chrono::milliseconds run()
  129. {
  130. return run( std::chrono::seconds::zero());
  131. }
  132. std::chrono::milliseconds run( std::chrono::seconds duration )
  133. {
  134. m_startBarrier.reset( m_workers.size() + 1 );
  135. m_stopBarrier.reset( m_workers.size() + 1 );
  136. // Create threads
  137. std::vector< std::thread > threads;
  138. threads.reserve( m_workers.size());
  139. for ( auto w : m_workers )
  140. threads.emplace_back( &thread::run, w );
  141. // The pool is intialized
  142. m_startPoint.ready();
  143. m_bTimeElapsed.store( false, std::memory_order_release );
  144. auto native_duration = std::chrono::duration_cast<std::chrono::steady_clock::duration>(duration);
  145. // The pool is ready to start all workers
  146. m_startBarrier.wait();
  147. auto time_start = std::chrono::steady_clock::now();
  148. auto const expected_end = time_start + native_duration;
  149. if ( duration != std::chrono::seconds::zero()) {
  150. for ( ;; ) {
  151. std::this_thread::sleep_for( native_duration );
  152. auto time_now = std::chrono::steady_clock::now();
  153. if ( time_now >= expected_end )
  154. break;
  155. native_duration = expected_end - time_now;
  156. }
  157. }
  158. m_bTimeElapsed.store( true, std::memory_order_release );
  159. // Waiting for all workers done
  160. m_stopBarrier.wait();
  161. auto time_end = std::chrono::steady_clock::now();
  162. for ( auto& t : threads )
  163. t.join();
  164. return m_testDuration = std::chrono::duration_cast<std::chrono::milliseconds>(time_end - time_start);
  165. }
  166. size_t size() const { return m_workers.size(); }
  167. thread& get( size_t idx ) const { return *m_workers.at( idx ); }
  168. template <typename Fixture>
  169. Fixture& fixture()
  170. {
  171. return static_cast<Fixture&>(m_fixture);
  172. }
  173. std::chrono::milliseconds duration() const { return m_testDuration; }
  174. void clear()
  175. {
  176. for ( auto t : m_workers )
  177. delete t;
  178. m_workers.clear();
  179. m_startPoint.reset();
  180. }
  181. void reset()
  182. {
  183. clear();
  184. }
  185. protected: // thread interface
  186. size_t get_next_id()
  187. {
  188. return m_workers.size();
  189. }
  190. void ready_to_start( thread& /*who*/ )
  191. {
  192. // Called from test thread
  193. // Wait until the pool is ready
  194. m_startPoint.wait();
  195. // Wait until all thread ready
  196. m_startBarrier.wait();
  197. }
  198. void thread_done( thread& /*who*/ )
  199. {
  200. // Called from test thread
  201. m_stopBarrier.wait();
  202. }
  203. private:
  204. friend class thread;
  205. ::testing::Test& m_fixture;
  206. std::vector<thread *> m_workers;
  207. initial_gate m_startPoint;
  208. barrier m_startBarrier;
  209. barrier m_stopBarrier;
  210. std::atomic<bool> m_bTimeElapsed;
  211. std::chrono::milliseconds m_testDuration;
  212. };
  213. inline thread::thread( thread_pool& master, int type /*= 0*/ )
  214. : m_pool( master )
  215. , m_type( type )
  216. , m_id( master.get_next_id())
  217. {}
  218. inline thread::thread( thread const& sample )
  219. : m_pool( sample.m_pool )
  220. , m_type( sample.m_type )
  221. , m_id( m_pool.get_next_id())
  222. {}
  223. inline void thread::run()
  224. {
  225. SetUp();
  226. m_pool.ready_to_start( *this );
  227. test();
  228. m_pool.thread_done( *this );
  229. TearDown();
  230. }
  231. inline bool thread::time_elapsed() const
  232. {
  233. return m_pool.m_bTimeElapsed.load( std::memory_order_acquire );
  234. }
  235. } // namespace cds_test
  236. #endif // CDSTEST_THREAD_H