@@ -250,6 +250,7 @@ inline bool igl::parallel_for(
   return parallel_for(loop_size, no_op, wrapper, no_op, min_parallel);
 }
 
+
 template<
   typename Index,
   typename PreFunctionType,
@@ -262,14 +263,15 @@ inline bool igl::parallel_for(
   const AccumFunctionType & accum_func,
   const size_t min_parallel)
 {
-  assert(loop_size>=0);
-  if(loop_size==0) return false;
+  assert(loop_size >= 0);
+  if (loop_size == 0) return false;
 
-  // If we're already inside a ThreadPool worker, run serial to avoid deadlock
-  if(igl::internal::is_worker_thread())
+  // If we're already inside a ThreadPool worker, run serially to avoid a
+  // nested deadlock with the global pool.
+  if (igl::internal::is_worker_thread())
   {
     prep_func(1);
-    for(Index i = 0; i < loop_size; ++i)
+    for (Index i = 0; i < loop_size; ++i)
     {
       func(i, 0);
     }
@@ -283,76 +285,94 @@ inline bool igl::parallel_for(
   const size_t configured_threads = igl::default_num_threads();
 #endif
 
-  if(loop_size < static_cast<Index>(min_parallel) || configured_threads <= 1)
+  if (loop_size < static_cast<Index>(min_parallel) || configured_threads <= 1)
   {
-    // serial
+    // Serial fallback
     prep_func(1);
-    for(Index i = 0; i < loop_size; ++i)
+    for (Index i = 0; i < loop_size; ++i)
     {
       func(i, 0);
     }
     accum_func(0);
     return false;
   }
-  else
-  {
-    // Use shared thread pool
-    auto & pool = igl::internal::ThreadPool::instance(configured_threads);
-    const size_t pool_threads = std::max<size_t>(1, pool.size());
 
-    // Keep semantics: prep called with number of potential threads
-    prep_func(pool_threads);
+  // --- Parallel branch using shared thread pool ---
 
-    // Number of logical jobs (chunks)
-    const size_t jobs = static_cast<size_t>(
-      std::min<Index>(loop_size, static_cast<Index>(pool_threads)));
+  auto & pool = igl::internal::ThreadPool::instance(configured_threads);
+  const size_t pool_threads = std::max<size_t>(1, pool.size());
 
-    struct SharedCounter
-    {
-      std::atomic<size_t> remaining;
-    };
+  // Match old semantics: prep called with number of *potential* threads.
+  prep_func(pool_threads);
 
-    auto counter = std::make_shared<SharedCounter>();
-    counter->remaining.store(jobs, std::memory_order_relaxed);
+  // Number of "logical jobs" (chunks of the index range).
+  const size_t jobs = static_cast<size_t>(
+    std::min<Index>(loop_size, static_cast<Index>(pool_threads)));
 
-    const Index total = loop_size;
-    const Index base = total / static_cast<Index>(jobs);
-    const Index rem = total % static_cast<Index>(jobs);
+  struct Group
+  {
+    std::mutex mutex;
+    std::condition_variable cv;
+    std::atomic<size_t> remaining;
+  };
 
-    for(size_t t = 0; t < jobs; ++t)
-    {
-      const Index start =
-        static_cast<Index>(t) * base
-        + std::min<Index>(static_cast<Index>(t), rem);
-      const Index end = start + base + (t < static_cast<size_t>(rem) ? 1 : 0);
+  auto group = std::make_shared<Group>();
+  group->remaining.store(jobs, std::memory_order_relaxed);
 
-      pool.enqueue([counter, &func, start, end, t]()
-      {
-        for(Index k = start; k < end; ++k)
-        {
-          func(k, t);
-        }
-        counter->remaining.fetch_sub(1, std::memory_order_acq_rel);
-      });
-    }
+  const Index total = loop_size;
+  const Index base = total / static_cast<Index>(jobs);
+  const Index rem = total % static_cast<Index>(jobs);
+
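+  // Split [0, loop_size) into contiguous chunks; the first rem jobs take
+  // base + 1 indices, the remaining jobs take base.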
+  for (size_t t = 0; t < jobs; ++t)
+  {
+    const Index start =
+      static_cast<Index>(t) * base
+      + std::min<Index>(static_cast<Index>(t), rem);
 
-    // Wait until all jobs for this parallel_for are finished.
-    // Busy-wait with yield to avoid hammering a core.
-    while(counter->remaining.load(std::memory_order_acquire) != 0)
+    const Index end = start + base + (t < static_cast<size_t>(rem) ? 1 : 0);
+
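+    // func is captured by reference; this is safe because parallel_for
+    // blocks below until every job has completed.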
+    pool.enqueue([group, &func, start, end, t]()
     {
-      std::this_thread::yield();
-    }
+      // Each job processes its contiguous slice [start, end)
+      for (Index k = start; k < end; ++k)
+      {
+        func(k, t);
+      }
 
-    // Accumulate across all potential threads (like original impl)
-    for(size_t t = 0; t < pool_threads; ++t)
+      // Signal completion of this job.
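+      // Locking the mutex before notify_one() ensures the waiter cannot miss
+      // the wakeup between its predicate check and blocking on the cv.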
+      if (group->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
+      {
+        std::unique_lock<std::mutex> lock(group->mutex);
+        group->cv.notify_one();
+      }
+    });
+  }
+
+  // Wait for all jobs for this parallel_for call to finish.
+  {
+    std::unique_lock<std::mutex> lock(group->mutex);
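+    // The predicate form handles spurious wakeups and the case where all
+    // jobs finished before the wait began.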
+    group->cv.wait(lock, [&group]()
     {
-      accum_func(t);
-    }
+      return group->remaining.load(std::memory_order_acquire) == 0;
+    });
+  }
 
-    return true;
+  // Accumulate across all potential threads (same as original implementation).
+  for (size_t t = 0; t < pool_threads; ++t)
+  {
+    accum_func(t);
   }
-}
 
+  return true;
+}
 
 #endif
 