parallel_for.h 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370
  1. // This file is part of libigl, a simple c++ geometry processing library.
  2. //
  3. // Copyright (C) 2016 Alec Jacobson <[email protected]>
  4. //
  5. // This Source Code Form is subject to the terms of the Mozilla Public License
  6. // v. 2.0. If a copy of the MPL was not distributed with this file, You can
  7. // obtain one at http://mozilla.org/MPL/2.0/.
  8. #ifndef IGL_PARALLEL_FOR_H
  9. #define IGL_PARALLEL_FOR_H
  10. #include "igl_inline.h"
  11. #include <functional>
  12. //#warning "Defining IGL_PARALLEL_FOR_FORCE_SERIAL"
  13. //#define IGL_PARALLEL_FOR_FORCE_SERIAL
  14. namespace igl
  15. {
  16. /// Functional implementation of a basic, open-mp style, parallel
  17. /// for loop. If the inner block of a for-loop can be rewritten/encapsulated in
  18. /// a single (anonymous/lambda) function call `func` so that the serial code
  19. /// looks like:
  20. ///
  21. /// \code{cpp}
  22. /// for(int i = 0;i<loop_size;i++)
  23. /// {
  24. /// func(i);
  25. /// }
  26. /// \endcode
  27. ///
  28. /// then `parallel_for(loop_size,func,min_parallel)` will use as many threads as
  29. /// available on the current hardware to parallelize this for loop so long as
  30. /// loop_size<min_parallel, otherwise it will just use a serial for loop.
  31. ///
  32. /// Often if your code looks like:
  33. ///
  34. /// \code{cpp}
  35. /// for(int i = 0;i<loop_size;i++)
  36. /// {
  37. /// …
  38. /// }
  39. /// \endcode
  40. ///
  41. /// Then you can make a minimal two-line change to parallelize it:
  42. ///
  43. /// \code{cpp}
  44. /// //for(int i = 0;i<loop_size;i++)
  45. /// parallel_for(loop_size,[&](int i)
  46. /// {
  47. /// …
  48. /// }
  49. /// ,1000);
  50. /// \endcode
  51. ///
  52. /// @param[in] loop_size number of iterations. I.e. for(int i = 0;i<loop_size;i++) ...
  53. /// @param[in] func function handle taking iteration index as only argument to compute
  54. /// inner block of for loop I.e. for(int i ...){ func(i); }
  55. /// @param[in] min_parallel min size of loop_size such that parallel (non-serial)
  56. /// thread pooling should be attempted {0}
  57. /// @return true iff thread pool was invoked
  58. template<typename Index, typename FunctionType >
  59. inline bool parallel_for(
  60. const Index loop_size,
  61. const FunctionType & func,
  62. const size_t min_parallel=0);
  63. /// Functional implementation of an open-mp style, parallel for loop with
  64. /// accumulation. For example, serial code separated into n chunks (each to be
  65. /// parallelized with a thread) might look like:
  66. ///
  67. /// \code{cpp}
  68. /// Eigen::VectorXd S;
  69. /// const auto & prep_func = [&S](int n){ S = Eigen:VectorXd::Zero(n); };
  70. /// const auto & func = [&X,&S](int i, int t){ S(t) += X(i); };
  71. /// const auto & accum_func = [&S,&sum](int t){ sum += S(t); };
  72. /// prep_func(n);
  73. /// for(int i = 0;i<loop_size;i++)
  74. /// {
  75. /// func(i,i%n);
  76. /// }
  77. /// double sum = 0;
  78. /// for(int t = 0;t<n;t++)
  79. /// {
  80. /// accum_func(t);
  81. /// }
  82. /// \endcode
  83. ///
  84. /// @param[in] loop_size number of iterations. I.e. for(int i = 0;i<loop_size;i++) ...
  85. /// @param[in] prep_func function handle taking n >= number of threads as only
  86. /// argument
  87. /// @param[in] func function handle taking iteration index i and thread id t as only
  88. /// arguments to compute inner block of for loop I.e.
  89. /// for(int i ...){ func(i,t); }
  90. /// @param[in] accum_func function handle taking thread index as only argument, to be
  91. /// called after all calls of func, e.g., for serial accumulation across
  92. /// all n (potential) threads, see n in description of prep_func.
  93. /// @param[in] min_parallel min size of loop_size such that parallel (non-serial)
  94. /// thread pooling should be attempted {0}
  95. /// @return true iff thread pool was invoked
  96. template<
  97. typename Index,
  98. typename PrepFunctionType,
  99. typename FunctionType,
  100. typename AccumFunctionType
  101. >
  102. inline bool parallel_for(
  103. const Index loop_size,
  104. const PrepFunctionType & prep_func,
  105. const FunctionType & func,
  106. const AccumFunctionType & accum_func,
  107. const size_t min_parallel=0);
  108. }
  109. // Implementation
#include "default_num_threads.h"
#include <algorithm>
#include <atomic>
#include <cassert>
#include <cmath>
#include <condition_variable>
#include <cstddef>
#include <exception>
#include <memory>
#include <mutex>
#include <queue>
#include <thread>
#include <vector>
  120. namespace igl {
  121. namespace internal
  122. {
  123. inline bool & worker_flag()
  124. {
  125. static thread_local bool flag = false;
  126. return flag;
  127. }
  128. inline bool is_worker_thread()
  129. {
  130. return worker_flag();
  131. }
  132. inline void set_worker_thread(bool v)
  133. {
  134. worker_flag() = v;
  135. }
  136. } // namespace internal
  137. } // namespace igl
  138. namespace igl {
  139. namespace internal
  140. {
  141. // Simple shared thread pool using only std::
  142. class ThreadPool
  143. {
  144. public:
  145. using Task = std::function<void()>;
  146. // First call fixes the pool size; later calls ignore nthreads_hint.
  147. static ThreadPool & instance(size_t nthreads_hint)
  148. {
  149. static ThreadPool pool(nthreads_hint);
  150. return pool;
  151. }
  152. size_t size() const
  153. {
  154. return workers.size();
  155. }
  156. void enqueue(Task task)
  157. {
  158. {
  159. std::unique_lock<std::mutex> lock(mutex);
  160. tasks.emplace(std::move(task));
  161. }
  162. cv.notify_one();
  163. }
  164. private:
  165. ThreadPool(size_t nthreads_hint)
  166. : stop(false)
  167. {
  168. size_t nthreads = nthreads_hint == 0 ? 1 : nthreads_hint;
  169. workers.reserve(nthreads);
  170. for(size_t i = 0; i < nthreads; ++i)
  171. {
  172. workers.emplace_back([this]()
  173. {
  174. igl::internal::set_worker_thread(true); // <- mark this thread as a pool worker
  175. for(;;)
  176. {
  177. Task task;
  178. {
  179. std::unique_lock<std::mutex> lock(mutex);
  180. cv.wait(lock, [this]()
  181. {
  182. return stop || !tasks.empty();
  183. });
  184. if(stop && tasks.empty())
  185. {
  186. return;
  187. }
  188. task = std::move(tasks.front());
  189. tasks.pop();
  190. }
  191. task();
  192. }
  193. });
  194. }
  195. }
  196. ~ThreadPool()
  197. {
  198. {
  199. std::unique_lock<std::mutex> lock(mutex);
  200. stop = true;
  201. }
  202. cv.notify_all();
  203. for(std::thread & t : workers)
  204. {
  205. if(t.joinable()) t.join();
  206. }
  207. }
  208. std::vector<std::thread> workers;
  209. std::queue<Task> tasks;
  210. mutable std::mutex mutex;
  211. std::condition_variable cv;
  212. bool stop;
  213. };
  214. } // namespace internal
  215. } // namespace igl
  216. template<typename Index, typename FunctionType >
  217. inline bool igl::parallel_for(
  218. const Index loop_size,
  219. const FunctionType & func,
  220. const size_t min_parallel)
  221. {
  222. // no-op preparation/accumulation
  223. const auto & no_op = [](const size_t /*n_or_t*/){};
  224. // two-parameter wrapper ignoring thread id
  225. const auto & wrapper = [&func](Index i, size_t /*t*/){ func(i); };
  226. return parallel_for(loop_size, no_op, wrapper, no_op, min_parallel);
  227. }
  228. template<
  229. typename Index,
  230. typename PreFunctionType,
  231. typename FunctionType,
  232. typename AccumFunctionType>
  233. inline bool igl::parallel_for(
  234. const Index loop_size,
  235. const PreFunctionType & prep_func,
  236. const FunctionType & func,
  237. const AccumFunctionType & accum_func,
  238. const size_t min_parallel)
  239. {
  240. assert(loop_size >= 0);
  241. if (loop_size == 0) return false;
  242. // If we're already inside a ThreadPool worker, run serial to avoid nested
  243. // deadlock with the global pool.
  244. if (igl::internal::is_worker_thread())
  245. {
  246. prep_func(1);
  247. for (Index i = 0; i < loop_size; ++i)
  248. {
  249. func(i, 0);
  250. }
  251. accum_func(0);
  252. return false;
  253. }
  254. #ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
  255. const size_t configured_threads = 1;
  256. #else
  257. const size_t configured_threads = igl::default_num_threads();
  258. #endif
  259. if (loop_size < static_cast<Index>(min_parallel) || configured_threads <= 1)
  260. {
  261. // Serial fallback
  262. prep_func(1);
  263. for (Index i = 0; i < loop_size; ++i)
  264. {
  265. func(i, 0);
  266. }
  267. accum_func(0);
  268. return false;
  269. }
  270. // --- Parallel branch using shared thread pool ---
  271. auto & pool = igl::internal::ThreadPool::instance(configured_threads);
  272. const size_t pool_threads = std::max<size_t>(1, pool.size());
  273. // Match old semantics: prep called with number of *potential* threads.
  274. prep_func(pool_threads);
  275. // Number of "logical jobs" (chunks of the index range).
  276. const size_t jobs = static_cast<size_t>(
  277. std::min<Index>(loop_size, static_cast<Index>(pool_threads)));
  278. struct Group
  279. {
  280. std::mutex mutex;
  281. std::condition_variable cv;
  282. std::atomic<size_t> remaining;
  283. };
  284. auto group = std::make_shared<Group>();
  285. group->remaining.store(jobs, std::memory_order_relaxed);
  286. const Index total = loop_size;
  287. const Index base = total / static_cast<Index>(jobs);
  288. const Index rem = total % static_cast<Index>(jobs);
  289. for (size_t t = 0; t < jobs; ++t)
  290. {
  291. const Index start =
  292. static_cast<Index>(t) * base
  293. + std::min<Index>(static_cast<Index>(t), rem);
  294. const Index end = start + base + (t < static_cast<size_t>(rem) ? 1 : 0);
  295. pool.enqueue([group, &func, start, end, t]()
  296. {
  297. // Each job processes its contiguous slice [start, end)
  298. for (Index k = start; k < end; ++k)
  299. {
  300. func(k, t);
  301. }
  302. // Signal completion of this job.
  303. if (group->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
  304. {
  305. std::unique_lock<std::mutex> lock(group->mutex);
  306. group->cv.notify_one();
  307. }
  308. });
  309. }
  310. // Wait for all jobs for this parallel_for call to finish.
  311. {
  312. std::unique_lock<std::mutex> lock(group->mutex);
  313. group->cv.wait(lock, [&group]()
  314. {
  315. return group->remaining.load(std::memory_order_acquire) == 0;
  316. });
  317. }
  318. // Accumulate across all potential threads (same as original implementation).
  319. for (size_t t = 0; t < pool_threads; ++t)
  320. {
  321. accum_func(t);
  322. }
  323. return true;
  324. }
  325. #endif