barrier.cpp 6.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289
  1. // Copyright 2009-2021 Intel Corporation
  2. // SPDX-License-Identifier: Apache-2.0
  3. #include "barrier.h"
  4. #include "condition.h"
  5. #include "regression.h"
  6. #include "thread.h"
  7. #if defined (__WIN32__)
  8. #define WIN32_LEAN_AND_MEAN
  9. #include <windows.h>
  10. namespace embree
  11. {
  12. struct BarrierSysImplementation
  13. {
  14. __forceinline BarrierSysImplementation (size_t N)
  15. : i(0), enterCount(0), exitCount(0), barrierSize(0)
  16. {
  17. events[0] = CreateEvent(nullptr, TRUE, FALSE, nullptr);
  18. events[1] = CreateEvent(nullptr, TRUE, FALSE, nullptr);
  19. init(N);
  20. }
  21. __forceinline ~BarrierSysImplementation ()
  22. {
  23. CloseHandle(events[0]);
  24. CloseHandle(events[1]);
  25. }
  26. __forceinline void init(size_t N)
  27. {
  28. barrierSize = N;
  29. enterCount.store(N);
  30. exitCount.store(N);
  31. }
  32. __forceinline void wait()
  33. {
  34. /* every thread entering the barrier decrements this count */
  35. size_t i0 = i;
  36. size_t cnt0 = enterCount--;
  37. /* all threads except the last one are wait in the barrier */
  38. if (cnt0 > 1)
  39. {
  40. if (WaitForSingleObject(events[i0], INFINITE) != WAIT_OBJECT_0)
  41. THROW_RUNTIME_ERROR("WaitForSingleObjects failed");
  42. }
  43. /* the last thread starts all threads waiting at the barrier */
  44. else
  45. {
  46. i = 1-i;
  47. enterCount.store(barrierSize);
  48. if (SetEvent(events[i0]) == 0)
  49. THROW_RUNTIME_ERROR("SetEvent failed");
  50. }
  51. /* every thread leaving the barrier decrements this count */
  52. size_t cnt1 = exitCount--;
  53. /* the last thread that left the barrier resets the event again */
  54. if (cnt1 == 1)
  55. {
  56. exitCount.store(barrierSize);
  57. if (ResetEvent(events[i0]) == 0)
  58. THROW_RUNTIME_ERROR("ResetEvent failed");
  59. }
  60. }
  61. public:
  62. HANDLE events[2];
  63. atomic<size_t> i;
  64. atomic<size_t> enterCount;
  65. atomic<size_t> exitCount;
  66. size_t barrierSize;
  67. };
  68. }
  69. #else
  70. namespace embree
  71. {
  72. struct BarrierSysImplementation
  73. {
  74. __forceinline BarrierSysImplementation (size_t N)
  75. : count(0), barrierSize(0)
  76. {
  77. init(N);
  78. }
  79. __forceinline void init(size_t N)
  80. {
  81. assert(count == 0);
  82. count = 0;
  83. barrierSize = N;
  84. }
  85. __forceinline void wait()
  86. {
  87. mutex.lock();
  88. count++;
  89. if (count == barrierSize) {
  90. count = 0;
  91. cond.notify_all();
  92. mutex.unlock();
  93. return;
  94. }
  95. cond.wait(mutex);
  96. mutex.unlock();
  97. return;
  98. }
  99. public:
  100. MutexSys mutex;
  101. ConditionSys cond;
  102. volatile size_t count;
  103. volatile size_t barrierSize;
  104. };
  105. }
  106. #endif
  107. namespace embree
  108. {
  109. BarrierSys::BarrierSys (size_t N) {
  110. opaque = new BarrierSysImplementation(N);
  111. }
  112. BarrierSys::~BarrierSys () {
  113. delete (BarrierSysImplementation*) opaque;
  114. }
  115. void BarrierSys::init(size_t count) {
  116. ((BarrierSysImplementation*) opaque)->init(count);
  117. }
  118. void BarrierSys::wait() {
  119. ((BarrierSysImplementation*) opaque)->wait();
  120. }
  121. LinearBarrierActive::LinearBarrierActive (size_t N)
  122. : count0(nullptr), count1(nullptr), mode(0), flag0(0), flag1(0), threadCount(0)
  123. {
  124. if (N == 0) N = getNumberOfLogicalThreads();
  125. init(N);
  126. }
  127. LinearBarrierActive::~LinearBarrierActive()
  128. {
  129. delete[] count0;
  130. delete[] count1;
  131. }
  132. void LinearBarrierActive::init(size_t N)
  133. {
  134. if (threadCount != N) {
  135. threadCount = N;
  136. if (count0) delete[] count0; count0 = new unsigned char[N];
  137. if (count1) delete[] count1; count1 = new unsigned char[N];
  138. }
  139. mode = 0;
  140. flag0 = 0;
  141. flag1 = 0;
  142. for (size_t i=0; i<N; i++) count0[i] = 0;
  143. for (size_t i=0; i<N; i++) count1[i] = 0;
  144. }
  145. void LinearBarrierActive::wait (const size_t threadIndex)
  146. {
  147. if (mode == 0)
  148. {
  149. if (threadIndex == 0)
  150. {
  151. for (size_t i=0; i<threadCount; i++)
  152. count1[i] = 0;
  153. for (size_t i=1; i<threadCount; i++)
  154. {
  155. while (likely(count0[i] == 0))
  156. pause_cpu();
  157. }
  158. mode = 1;
  159. flag1 = 0;
  160. __memory_barrier();
  161. flag0 = 1;
  162. }
  163. else
  164. {
  165. count0[threadIndex] = 1;
  166. {
  167. while (likely(flag0 == 0))
  168. pause_cpu();
  169. }
  170. }
  171. }
  172. else
  173. {
  174. if (threadIndex == 0)
  175. {
  176. for (size_t i=0; i<threadCount; i++)
  177. count0[i] = 0;
  178. for (size_t i=1; i<threadCount; i++)
  179. {
  180. while (likely(count1[i] == 0))
  181. pause_cpu();
  182. }
  183. mode = 0;
  184. flag0 = 0;
  185. __memory_barrier();
  186. flag1 = 1;
  187. }
  188. else
  189. {
  190. count1[threadIndex] = 1;
  191. {
  192. while (likely(flag1 == 0))
  193. pause_cpu();
  194. }
  195. }
  196. }
  197. }
  198. struct barrier_sys_regression_test : public RegressionTest
  199. {
  200. BarrierSys barrier;
  201. std::atomic<size_t> threadID;
  202. std::atomic<size_t> numFailed;
  203. std::vector<size_t> threadResults;
  204. barrier_sys_regression_test()
  205. : RegressionTest("barrier_sys_regression_test"), threadID(0), numFailed(0)
  206. {
  207. registerRegressionTest(this);
  208. }
  209. static void thread_alloc(barrier_sys_regression_test* This)
  210. {
  211. size_t tid = This->threadID++;
  212. for (size_t j=0; j<1000; j++)
  213. {
  214. This->barrier.wait();
  215. This->threadResults[tid] = tid;
  216. This->barrier.wait();
  217. }
  218. }
  219. bool run ()
  220. {
  221. threadID.store(0);
  222. numFailed.store(0);
  223. size_t numThreads = getNumberOfLogicalThreads();
  224. threadResults.resize(numThreads);
  225. barrier.init(numThreads+1);
  226. /* create threads */
  227. std::vector<thread_t> threads;
  228. for (size_t i=0; i<numThreads; i++)
  229. threads.push_back(createThread((thread_func)thread_alloc,this));
  230. /* run test */
  231. for (size_t i=0; i<1000; i++)
  232. {
  233. for (size_t i=0; i<numThreads; i++) threadResults[i] = 0;
  234. barrier.wait();
  235. barrier.wait();
  236. for (size_t i=0; i<numThreads; i++) numFailed += threadResults[i] != i;
  237. }
  238. /* destroy threads */
  239. for (size_t i=0; i<numThreads; i++)
  240. join(threads[i]);
  241. return numFailed == 0;
  242. }
  243. };
  244. barrier_sys_regression_test barrier_sys_regression_test;
  245. }