Browse Source

revert and use simple flag to stop parallel nesting

Alec Jacobson 1 month ago
parent
commit
59c46302cd
2 changed files with 114 additions and 210 deletions
  1. 65 206
      include/igl/parallel_for.h
  2. 49 4
      tests/include/igl/parallel_for.cpp

+ 65 - 206
include/igl/parallel_for.h

@@ -62,7 +62,6 @@ namespace igl
     const Index loop_size,
     const FunctionType & func,
     const size_t min_parallel=0);
-
   /// Functional implementation of an open-mp style, parallel for loop with
   /// accumulation. For example, serial code separated into n chunks (each to be
   /// parallelized with a thread) might look like:
@@ -114,128 +113,22 @@ namespace igl
 
 #include "default_num_threads.h"
 
-#include <cmath>
-#include <cassert>
-#include <thread>
-#include <vector>
-#include <algorithm>
-#include <queue>
-#include <mutex>
-#include <condition_variable>
-#include <atomic>
-
-namespace igl {
-namespace internal
-{
-
-inline bool & worker_flag()
-{
-  static thread_local bool flag = false;
-  return flag;
-}
-
-inline bool is_worker_thread()
-{
-  return worker_flag();
-}
-
-inline void set_worker_thread(bool v)
-{
-  worker_flag() = v;
-}
-
-} // namespace internal
-} // namespace igl
-
 namespace igl {
-namespace internal
-{
-
-// Simple shared thread pool using only std::
-class ThreadPool
-{
-public:
-  using Task = std::function<void()>;
-
-  // First call fixes the pool size; later calls ignore nthreads_hint.
-  static ThreadPool & instance(size_t nthreads_hint)
-  {
-    static ThreadPool pool(nthreads_hint);
-    return pool;
-  }
-
-  size_t size() const
-  {
-    return workers.size();
-  }
-
-  void enqueue(Task task)
-  {
-    {
-      std::unique_lock<std::mutex> lock(mutex);
-      tasks.emplace(std::move(task));
-    }
-    cv.notify_one();
-  }
-
-private:
-  ThreadPool(size_t nthreads_hint)
-    : stop(false)
-  {
-    size_t nthreads = nthreads_hint == 0 ? 1 : nthreads_hint;
-    workers.reserve(nthreads);
-    for(size_t i = 0; i < nthreads; ++i)
-    {
-      workers.emplace_back([this]()
-          {
-          igl::internal::set_worker_thread(true); // <- mark this thread as a pool worker
-
-          for(;;)
-          {
-          Task task;
-          {
-          std::unique_lock<std::mutex> lock(mutex);
-          cv.wait(lock, [this]()
-              {
-              return stop || !tasks.empty();
-              });
-
-          if(stop && tasks.empty())
-          {
-          return;
-          }
-
-          task = std::move(tasks.front());
-          tasks.pop();
-          }
-          task();
-          }
-          });
-    }
-  }
-
-  ~ThreadPool()
-  {
+  namespace internal {
+    inline std::size_t & parallel_for_nesting_level()
     {
-      std::unique_lock<std::mutex> lock(mutex);
-      stop = true;
-    }
-    cv.notify_all();
-    for(std::thread & t : workers)
-    {
-      if(t.joinable()) t.join();
+      // One counter *per thread*
+      static thread_local std::size_t level = 0;
+      return level;
     }
   }
+}
 
-  std::vector<std::thread> workers;
-  std::queue<Task> tasks;
-  mutable std::mutex mutex;
-  std::condition_variable cv;
-  bool stop;
-};
-
-} // namespace internal
-} // namespace igl
+#include <cmath>
+#include <cassert>
+#include <thread>
+#include <vector>
+#include <algorithm>
 
 template<typename Index, typename FunctionType >
 inline bool igl::parallel_for(
@@ -243,14 +136,13 @@ inline bool igl::parallel_for(
   const FunctionType & func,
   const size_t min_parallel)
 {
-  // no-op preparation/accumulation
-  const auto & no_op = [](const size_t /*n_or_t*/){};
+  // no op preparation/accumulation
+  const auto & no_op = [](const size_t /*n/t*/){};
   // two-parameter wrapper ignoring thread id
-  const auto & wrapper = [&func](Index i, size_t /*t*/){ func(i); };
-  return parallel_for(loop_size, no_op, wrapper, no_op, min_parallel);
+  const auto & wrapper = [&func](Index i,size_t /*t*/){ func(i); };
+  return parallel_for(loop_size,no_op,wrapper,no_op,min_parallel);
 }
 
-
 template<
   typename Index,
   typename PreFunctionType,
@@ -263,108 +155,75 @@ inline bool igl::parallel_for(
   const AccumFunctionType & accum_func,
   const size_t min_parallel)
 {
-  assert(loop_size >= 0);
-  if (loop_size == 0) return false;
-
-  // If we're already inside a ThreadPool worker, run serial to avoid nested
-  // deadlock with the global pool.
-  if (igl::internal::is_worker_thread())
-  {
-    prep_func(1);
-    for (Index i = 0; i < loop_size; ++i)
-    {
-      func(i, 0);
-    }
-    accum_func(0);
-    return false;
-  }
+  assert(loop_size>=0);
+  if(loop_size==0) return false;
 
 #ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
-  const size_t configured_threads = 1;
+  const size_t nthreads = 1;
 #else
-  const size_t configured_threads = igl::default_num_threads();
+  const size_t nthreads = igl::default_num_threads();
 #endif
 
-  if (loop_size < static_cast<Index>(min_parallel) || configured_threads <= 1)
+  // NEW: are we already inside a parallel_for worker?
+  const bool nested = igl::internal::parallel_for_nesting_level() > 0;
+
+  if(loop_size<min_parallel || nthreads<=1 || nested)
   {
-    // Serial fallback
+    // serial
     prep_func(1);
-    for (Index i = 0; i < loop_size; ++i)
-    {
-      func(i, 0);
-    }
+    // (We do *not* change nesting_level here, so non-nested, purely serial
+    // uses of parallel_for can still parallelize their own inner loops.)
+    for(Index i = 0;i<loop_size;i++) func(i,0);
     accum_func(0);
     return false;
-  }
-
-  // --- Parallel branch using shared thread pool ---
-
-  auto & pool = igl::internal::ThreadPool::instance(configured_threads);
-  const size_t pool_threads = std::max<size_t>(1, pool.size());
-
-  // Match old semantics: prep called with number of *potential* threads.
-  prep_func(pool_threads);
-
-  // Number of "logical jobs" (chunks of the index range).
-  const size_t jobs = static_cast<size_t>(
-    std::min<Index>(loop_size, static_cast<Index>(pool_threads)));
-
-  struct Group
+  }else
   {
-    std::mutex mutex;
-    std::condition_variable cv;
-    std::atomic<size_t> remaining;
-  };
-
-  auto group = std::make_shared<Group>();
-  group->remaining.store(jobs, std::memory_order_relaxed);
+    // Size of a slice for the range functions
+    Index slice =
+      std::max(
+        (Index)std::round((loop_size+1)/static_cast<double>(nthreads)),(Index)1);
 
-  const Index total = loop_size;
-  const Index base  = total / static_cast<Index>(jobs);
-  const Index rem   = total % static_cast<Index>(jobs);
-
-  for (size_t t = 0; t < jobs; ++t)
-  {
-    const Index start =
-      static_cast<Index>(t) * base
-      + std::min<Index>(static_cast<Index>(t), rem);
-
-    const Index end = start + base + (t < static_cast<size_t>(rem) ? 1 : 0);
-
-    pool.enqueue([group, &func, start, end, t]()
+    // [Helper] Inner loop
+    const auto & range = [&func](const Index k1, const Index k2, const size_t t)
+    {
+      // NEW: mark this thread as being in a parallel_for while running func
+      auto & level = igl::internal::parallel_for_nesting_level();
+      level++;
+      for(Index k = k1; k < k2; k++) func(k,t);
+      level--;
+    };
+
+    prep_func(nthreads);
+    std::vector<std::thread> pool;
+    pool.reserve(nthreads);
+
+    Index i1 = 0;
+    Index i2 = std::min(0 + slice, loop_size);
     {
-      // Each job processes its contiguous slice [start, end)
-      for (Index k = start; k < end; ++k)
+      size_t t = 0;
+      for (; t+1 < nthreads && i1 < loop_size; ++t)
       {
-        func(k, t);
+        pool.emplace_back(range, i1, i2, t);
+        i1 = i2;
+        i2 = std::min(i2 + slice, loop_size);
       }
-
-      // Signal completion of this job.
-      if (group->remaining.fetch_sub(1, std::memory_order_acq_rel) == 1)
+      if (i1 < loop_size)
       {
-        std::unique_lock<std::mutex> lock(group->mutex);
-        group->cv.notify_one();
+        pool.emplace_back(range, i1, loop_size, t);
       }
-    });
-  }
+    }
+    for (std::thread &t : pool) if (t.joinable()) t.join();
 
-  // Wait for all jobs for this parallel_for call to finish.
-  {
-    std::unique_lock<std::mutex> lock(group->mutex);
-    group->cv.wait(lock, [&group]()
+    for(size_t t = 0;t<nthreads;t++)
     {
-      return group->remaining.load(std::memory_order_acquire) == 0;
-    });
-  }
-
-  // Accumulate across all potential threads (same as original implementation).
-  for (size_t t = 0; t < pool_threads; ++t)
-  {
-    accum_func(t);
+      accum_func(t);
+    }
+    return true;
   }
-
-  return true;
 }
 
-#endif
 
+//#ifndef IGL_STATIC_LIBRARY
+//#include "parallel_for.cpp"
+//#endif
+#endif

+ 49 - 4
tests/include/igl/parallel_for.cpp

@@ -6,10 +6,6 @@
 #include <thread>
 #include <set>
 
-// -----------------------------------------------------------------------------
-// Existing tests from your message
-// -----------------------------------------------------------------------------
-
 TEST_CASE("parallel_for: serial_fallback", "[igl][parallel_for]")
 {
   // loop_size < min_parallel ⇒ must run serial
@@ -598,5 +594,54 @@ TEST_CASE("parallel_for: timing_many_small_jobs", "[igl][parallel_for][timing]")
   }
 }
 
+
+TEST_CASE("parallel_for: nested_serial_fallback", "[igl][parallel_for]")
+{
+  // If there is only a single hardware thread available, this test
+  // can't meaningfully distinguish nested/parallel behavior.
+  if(igl::default_num_threads() <= 1)
+  {
+    SUCCEED("Only one hardware thread; nested parallel test skipped.");
+    return;
+  }
+
+  const int outer_loop_size = 4;
+  const int inner_loop_size = 4;
+
+  std::atomic<bool> any_inner_parallel(false);
+  std::atomic<int> counter(0);
+
+  // Outer parallel_for should use multiple threads.
+  bool outer_used_parallel = igl::parallel_for(
+    outer_loop_size,
+    [&](int /*i*/)
+    {
+      bool inner_used_parallel = igl::parallel_for(
+        inner_loop_size,
+        [&](int /*j*/)
+        {
+          // Just do some work so we know the inner loop ran.
+          counter.fetch_add(1, std::memory_order_relaxed);
+        }
+      );
+
+      if(inner_used_parallel)
+      {
+        any_inner_parallel.store(true, std::memory_order_relaxed);
+      }
+    }
+  );
+
+  // Sanity: outer loop should be parallel when threads > 1.
+  REQUIRE(outer_used_parallel == true);
+
+  // Sanity: all iterations of both loops ran.
+  REQUIRE(counter.load(std::memory_order_relaxed)
+          == outer_loop_size * inner_loop_size);
+
+  // The key assertion: inner parallel_for must fall back to serial when nested.
+  REQUIRE(any_inner_parallel.load(std::memory_order_relaxed) == false);
+}
+
 #endif // IGL_PARALLEL_FOR_TIMING_TESTS