parallel_for.cpp

#include <test_common.h>
#include <igl/parallel_for.h>
#include <atomic>
#include <vector>
#include <numeric>
#include <thread>
#include <set>
#include <algorithm> // std::fill
// -----------------------------------------------------------------------------
// Existing tests
// -----------------------------------------------------------------------------
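// The tests below exercise the two overloads of igl::parallel_for used in this
// file (signatures as exercised here; both return whether the loop actually ran
// in parallel):
//   bool igl::parallel_for(loop_size, func(i), min_parallel);
//   bool igl::parallel_for(loop_size, prep(nt), func(i,t), accum(t), min_parallel);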
TEST_CASE("parallel_for: serial_fallback", "[igl][parallel_for]")
{
  // loop_size < min_parallel ⇒ must run serial
  std::vector<int> vals(10, 0);
  bool used_parallel = igl::parallel_for(
    (int)vals.size(),
    [&](int i){ vals[i] = 1; },
    /*min_parallel=*/1000
  );
  REQUIRE(used_parallel == false);
  for (int v : vals)
    REQUIRE(v == 1);
}
TEST_CASE("parallel_for: basic_parallelism", "[igl][parallel_for]")
{
  const int N = 20000;
  std::vector<int> hit(N, 0);
  std::atomic<int> counter(0);
  bool used_parallel = igl::parallel_for(
    N,
    [&](int i)
    {
      hit[i] = 1;
      counter.fetch_add(1, std::memory_order_relaxed);
    },
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == true);
  REQUIRE(counter.load() == N);
  for (int v : hit)
    REQUIRE(v == 1);
}
TEST_CASE("parallel_for: accumulation", "[igl][parallel_for]")
{
  const int N = 10000;
  // Per-thread buckets
  std::vector<double> buckets;
  const auto prep = [&](size_t nt)
  {
    buckets.assign(nt, 0.0);
  };
  const auto func = [&](int /*i*/, size_t t)
  {
    buckets[t] += 1.0; // increment per-thread
  };
  double total = 0.0;
  const auto accum = [&](size_t t)
  {
    total += buckets[t];
  };
  bool used_parallel = igl::parallel_for(N, prep, func, accum, 1);
  REQUIRE(used_parallel == true);
  REQUIRE(total == Approx((double)N));
}
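// -----------------------------------------------------------------------------
// Illustrative sketch (not used by the tests): the per-thread-bucket reduction
// pattern exercised above, wrapped as a tiny helper. The name
// pf_bucket_sum_sketch is ours, not part of libigl; it only relies on the
// prep/func/accum overload shown in the "accumulation" test.
// -----------------------------------------------------------------------------
inline double pf_bucket_sum_sketch(const int n)
{
  std::vector<double> buckets; // one partial sum per potential thread
  double total = 0.0;
  igl::parallel_for(
    n,
    [&](size_t nt){ buckets.assign(nt, 0.0); },       // prep: size the buckets
    [&](int i, size_t t){ buckets[t] += double(i); }, // func: thread-local add
    [&](size_t t){ total += buckets[t]; },            // accum: serial merge
    /*min_parallel=*/1
  );
  return total; // equals 0 + 1 + ... + (n-1)
}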
TEST_CASE("parallel_for: equivalence_to_serial", "[igl][parallel_for]")
{
  const int N = 15000;
  // serial result
  std::vector<int> S(N);
  for (int i = 0; i < N; ++i) S[i] = i*i;
  // parallel result
  std::vector<int> P(N);
  igl::parallel_for(
    N,
    [&](int i){ P[i] = i*i; },
    /*min_parallel=*/1
  );
  for (int i = 0; i < N; ++i)
    REQUIRE(P[i] == S[i]);
}
TEST_CASE("parallel_for: min_parallel_threshold", "[igl][parallel_for]")
{
  const int N = 500;
  std::vector<int> A(N,0), B(N,0);
  bool p1 = igl::parallel_for(
    N, [&](int i){ A[i] = i; },
    /*min_parallel=*/1000 // too large → serial
  );
  bool p2 = igl::parallel_for(
    N, [&](int i){ B[i] = i; },
    /*min_parallel=*/1 // small → parallel
  );
  REQUIRE(p1 == false);
  REQUIRE(p2 == true);
  REQUIRE(A == B);
}
TEST_CASE("parallel_for: nested_calls", "[igl][parallel_for]")
{
  const int N = 2000;
  std::vector<int> out(N, 0);
  bool ok = igl::parallel_for(
    N,
    [&](int i)
    {
      // a tiny nested parallel_for
      igl::parallel_for(
        10,
        [&](int j){ (void)j; }, 1
      );
      out[i] = 1;
    },
    /*min_parallel=*/1
  );
  REQUIRE(ok == true);
  for (int v : out)
    REQUIRE(v == 1);
}
// -----------------------------------------------------------------------------
// Additional tests
// -----------------------------------------------------------------------------
TEST_CASE("parallel_for: zero_iterations_does_nothing", "[igl][parallel_for]")
{
  std::atomic<int> prep_calls(0);
  std::atomic<int> func_calls(0);
  std::atomic<int> accum_calls(0);
  const auto prep = [&](size_t /*nt*/)
  {
    prep_calls.fetch_add(1, std::memory_order_relaxed);
  };
  const auto func = [&](int /*i*/, size_t /*t*/)
  {
    func_calls.fetch_add(1, std::memory_order_relaxed);
  };
  const auto accum = [&](size_t /*t*/)
  {
    accum_calls.fetch_add(1, std::memory_order_relaxed);
  };
  bool used_parallel = igl::parallel_for(
    0,
    prep,
    func,
    accum,
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == false);
  REQUIRE(prep_calls.load() == 0);
  REQUIRE(func_calls.load() == 0);
  REQUIRE(accum_calls.load() == 0);
}
TEST_CASE("parallel_for: min_parallel_equal_threshold", "[igl][parallel_for]")
{
  const int N = 1024;
  std::vector<int> A(N,0), B(N,0);
  // spec: serial if loop_size < min_parallel, so == should be parallel
  bool serial_used = igl::parallel_for(
    N, [&](int i){ A[i] = i; },
    /*min_parallel=*/N+1
  );
  bool parallel_used = igl::parallel_for(
    N, [&](int i){ B[i] = i; },
    /*min_parallel=*/N
  );
  REQUIRE(serial_used == false);
  REQUIRE(parallel_used == true);
  REQUIRE(A == B);
}
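// -----------------------------------------------------------------------------
// Illustrative helper (ours, not libigl's): the threshold tests above encode the
// assumption that the serial fallback is taken exactly when
// loop_size < min_parallel (and an empty loop is also reported as not parallel).
// Whether a run is actually parallel can additionally depend on the build
// (e.g. IGL_PARALLEL_FOR_FORCE_SERIAL), so treat this only as the expectation
// these tests check against.
// -----------------------------------------------------------------------------
inline bool pf_expect_parallel_sketch(const long long loop_size, const size_t min_parallel)
{
  return loop_size > 0 && (size_t)loop_size >= min_parallel;
}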
TEST_CASE("parallel_for: thread_id_range_and_accum_calls", "[igl][parallel_for]")
{
  const int N = 10000;
  std::vector<long long> bucket_counts;
  std::atomic<size_t> prep_nt(0);
  std::atomic<size_t> max_t_seen(0);
  std::atomic<size_t> accum_calls(0);
  const auto prep = [&](size_t nt)
  {
    prep_nt.store(nt, std::memory_order_relaxed);
    bucket_counts.assign(nt, 0); // plain ints, no atomics needed
  };
  const auto func = [&](int /*i*/, size_t t)
  {
    // track max t across threads
    size_t cur = max_t_seen.load(std::memory_order_relaxed);
    while (t > cur && !max_t_seen.compare_exchange_weak(
      cur, t,
      std::memory_order_relaxed,
      std::memory_order_relaxed))
    {
      // spin until we either win or see a newer/bigger cur
    }
    // Each logical t corresponds to a single job in this implementation,
    // so all calls with the same t are executed on the same worker thread,
    // sequentially. No race here; we can use a plain increment.
    bucket_counts[t] += 1;
  };
  const auto accum = [&](size_t /*t*/)
  {
    accum_calls.fetch_add(1, std::memory_order_relaxed);
  };
  bool used_parallel = igl::parallel_for(
    N, prep, func, accum,
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == true);
  const size_t nt = prep_nt.load();
  REQUIRE(nt >= 1);
  // t must always be < nt
  REQUIRE(max_t_seen.load() < nt);
  // accum must be called once per potential thread
  REQUIRE(accum_calls.load() == nt);
  // Sanity: total counted iterations == N
  long long total = 0;
  for (size_t t = 0; t < nt; ++t)
    total += bucket_counts[t];
  REQUIRE(total == N);
}
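// -----------------------------------------------------------------------------
// Illustrative sketch (assumption, not the library's code): the test above relies
// on each thread id t owning one contiguous slice of [0, N), handled sequentially
// by a single worker. One plausible partition uses a slice size of ceil(N / nt),
// which yields at most nt slices; the real partition in igl/parallel_for.h may
// round differently.
// -----------------------------------------------------------------------------
inline size_t pf_slice_count_sketch(const long long n, const size_t nt)
{
  if (n <= 0 || nt == 0) { return 0; }
  const long long slice = (n + (long long)nt - 1) / (long long)nt; // ceil(n / nt)
  return (size_t)((n + slice - 1) / slice);                        // <= nt slices
}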
TEST_CASE("parallel_for: nested_inner_serial_fallback", "[igl][parallel_for]")
{
  const int N = 1000;
  std::vector<int> outer_hits(N, 0);
  std::atomic<bool> inner_parallel_seen(false);
  bool outer_parallel = igl::parallel_for(
    N,
    [&](int i)
    {
      bool inner_used_parallel = igl::parallel_for(
        10,
        [&](int j){ (void)j; },
        /*min_parallel=*/1
      );
      if (inner_used_parallel)
      {
        inner_parallel_seen.store(true, std::memory_order_relaxed);
      }
      outer_hits[i] = 1;
    },
    /*min_parallel=*/1
  );
  REQUIRE(outer_parallel == true);
  for (int v : outer_hits)
    REQUIRE(v == 1);
  // With the is_worker_thread() guard in the implementation,
  // inner calls from pool workers should always be serial.
  REQUIRE(inner_parallel_seen.load() == false);
}
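// -----------------------------------------------------------------------------
// Illustrative sketch (assumption): a guard like the is_worker_thread() mentioned
// above is typically a thread_local flag set inside pool jobs, roughly as below.
// This is only our sketch of the idea, not the code in igl/parallel_for.h.
// -----------------------------------------------------------------------------
namespace pf_guard_sketch
{
  thread_local bool is_worker = false; // a pool job would set this on entry
  inline bool is_worker_thread() { return is_worker; }
  // A parallel_for that checks is_worker_thread() before spawning jobs would run
  // nested calls serially, which is exactly what the test above asserts.
}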
TEST_CASE("parallel_for: deep_nested_calls", "[igl][parallel_for]")
{
  const int N = 256;
  std::vector<int> hits(N, 0);
  bool outer_parallel = igl::parallel_for(
    N,
    [&](int i)
    {
      igl::parallel_for(
        8,
        [&](int j)
        {
          // third level
          igl::parallel_for(
            4,
            [&](int k){ (void)k; },
            1
          );
          (void)j;
        },
        1
      );
      hits[i] = 1;
    },
    /*min_parallel=*/1
  );
  REQUIRE(outer_parallel == true);
  for (int v : hits)
    REQUIRE(v == 1);
}
TEST_CASE("parallel_for: many_small_jobs_reuse_pool", "[igl][parallel_for]")
{
  const int iterations = 200;
  const int N = 64;
  std::vector<int> buf(N);
  for (int it = 0; it < iterations; ++it)
  {
    std::fill(buf.begin(), buf.end(), 0);
    bool used_parallel = igl::parallel_for(
      N,
      [&](int i){ buf[i] = it; },
      /*min_parallel=*/1
    );
    REQUIRE(used_parallel == true);
    for (int i = 0; i < N; ++i)
      REQUIRE(buf[i] == it);
  }
}
TEST_CASE("parallel_for: different_index_types", "[igl][parallel_for]")
{
  const long long N = 12345;
  std::vector<int> buf((size_t)N, 0);
  bool used_parallel = igl::parallel_for(
    N,
    [&](long long i)
    {
      buf[(size_t)i] = 1;
    },
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == true);
  for (int v : buf)
    REQUIRE(v == 1);
}
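// Note: igl::parallel_for deduces the index type from loop_size (it is a template
// parameter), so the lambda above takes long long to match the long long N.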
TEST_CASE("parallel_for: accumulation_equivalence_to_serial_sum", "[igl][parallel_for]")
{
  const int N = 10000;
  // serial sum
  long long serial_sum = 0;
  for (int i = 0; i < N; ++i)
  {
    serial_sum += i;
  }
  // parallel sum: S[t] accumulates partial sums, then accum collects.
  std::vector<long long> S;
  long long parallel_sum = 0;
  const auto prep = [&](size_t nt)
  {
    S.assign(nt, 0);
  };
  const auto func = [&](int i, size_t t)
  {
    S[t] += i;
  };
  const auto accum = [&](size_t t)
  {
    parallel_sum += S[t];
  };
  bool used_parallel = igl::parallel_for(
    N, prep, func, accum,
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == true);
  REQUIRE(parallel_sum == serial_sum);
}
#ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
TEST_CASE("parallel_for: force_serial_macro", "[igl][parallel_for]")
{
  // If compiled with IGL_PARALLEL_FOR_FORCE_SERIAL, we must never see parallel.
  const int N = 1000;
  std::vector<int> buf(N, 0);
  bool used_parallel = igl::parallel_for(
    N,
    [&](int i){ buf[i] = i; },
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == false);
  for (int i = 0; i < N; ++i)
    REQUIRE(buf[i] == i);
}
#endif
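// To exercise the force_serial_macro test, define IGL_PARALLEL_FOR_FORCE_SERIAL at
// compile time (e.g. pass -DIGL_PARALLEL_FOR_FORCE_SERIAL, or define it before
// including igl/parallel_for.h in a dedicated translation unit).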
// Timing tests: the define below enables them unconditionally in this file;
// remove or guard it to opt out.
#define IGL_PARALLEL_FOR_TIMING_TESTS
#ifdef IGL_PARALLEL_FOR_TIMING_TESTS
#include <chrono>
#include <cmath>
// Helper alias
using igl_pf_clock = std::chrono::steady_clock;
TEST_CASE("parallel_for: timing_large_loop", "[igl][parallel_for][timing]")
{
  const int N = 5'000'000;
  std::vector<double> a(N), b(N);
  for (int i = 0; i < N; ++i)
  {
    a[i] = 0.5 * i;
  }
  // --- Serial baseline ---
  auto serial_start = igl_pf_clock::now();
  for (int i = 0; i < N; ++i)
  {
    // mildly non-trivial work to avoid being optimized away
    b[i] = std::sqrt(a[i] * a[i] + 1.0);
  }
  auto serial_end = igl_pf_clock::now();
  auto serial_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      serial_end - serial_start).count();
  // --- Parallel version ---
  std::fill(b.begin(), b.end(), 0.0);
  auto parallel_start = igl_pf_clock::now();
  bool used_parallel = igl::parallel_for(
    N,
    [&](int i)
    {
      b[i] = std::sqrt(a[i] * a[i] + 1.0);
    },
    /*min_parallel=*/1
  );
  auto parallel_end = igl_pf_clock::now();
  auto parallel_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      parallel_end - parallel_start).count();
  INFO("timing_large_loop: serial_ms = " << serial_ms);
  INFO("timing_large_loop: parallel_ms = " << parallel_ms);
  INFO("timing_large_loop: used_parallel = " << used_parallel);
  // Sanity: results should match a re-run of serial
  std::vector<double> c(N);
  for (int i = 0; i < N; ++i)
  {
    c[i] = std::sqrt(a[i] * a[i] + 1.0);
  }
  for (int i = 0; i < N; ++i)
  {
    REQUIRE(b[i] == Approx(c[i]));
  }
  // Very soft performance assertion:
  // If we actually ran in parallel and the serial baseline took > 0 ms,
  // then parallel should not be crazy slower (e.g., 10x).
  if (used_parallel && serial_ms > 0)
  {
    double ratio = (parallel_ms > 0)
      ? double(parallel_ms) / double(serial_ms)
      : 0.0;
    INFO("timing_large_loop: parallel / serial ratio = " << ratio);
    // Soft bound: allow parallel to be up to 10x slower in worst case.
    CHECK(ratio < 10.0);
  }
}
TEST_CASE("parallel_for: timing_many_small_jobs", "[igl][parallel_for][timing]")
{
  // This is meant to stress the thread pool reuse behavior: many small jobs.
  const int iterations = 500;
  const int N = 1024;
  std::vector<double> data(N, 1.0);
  // --- Serial: do all work in a single loop ---
  auto serial_start = igl_pf_clock::now();
  double serial_sum = 0.0;
  for (int it = 0; it < iterations; ++it)
  {
    for (int i = 0; i < N; ++i)
    {
      serial_sum += data[i] * 0.5;
    }
  }
  auto serial_end = igl_pf_clock::now();
  auto serial_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      serial_end - serial_start).count();
  // --- Parallel: same total work, but split into many parallel_for calls ---
  auto parallel_start = igl_pf_clock::now();
  double parallel_sum = 0.0;
  for (int it = 0; it < iterations; ++it)
  {
    double local_sum = 0.0;
    // Here we use the accum-variant to test that path too.
    std::vector<double> buckets;
    const auto prep = [&](size_t nt)
    {
      buckets.assign(nt, 0.0);
    };
    const auto func = [&](int i, size_t t)
    {
      buckets[t] += data[i] * 0.5;
    };
    const auto accum = [&](size_t t)
    {
      local_sum += buckets[t];
    };
    (void)igl::parallel_for(
      N, prep, func, accum,
      /*min_parallel=*/1
    );
    parallel_sum += local_sum;
  }
  auto parallel_end = igl_pf_clock::now();
  auto parallel_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      parallel_end - parallel_start).count();
  INFO("timing_many_small_jobs: serial_ms = " << serial_ms);
  INFO("timing_many_small_jobs: parallel_ms = " << parallel_ms);
  INFO("timing_many_small_jobs: serial_sum = " << serial_sum);
  INFO("timing_many_small_jobs: parallel_sum = " << parallel_sum);
  // Check correctness first
  REQUIRE(parallel_sum == Approx(serial_sum));
  if (serial_ms > 0)
  {
    double ratio = (parallel_ms > 0)
      ? double(parallel_ms) / double(serial_ms)
      : 0.0;
    INFO("timing_many_small_jobs: parallel / serial ratio = " << ratio);
    // Again: super loose bound just to catch pathological regressions.
    CHECK(ratio < 20.0);
  }
}
#endif // IGL_PARALLEL_FOR_TIMING_TESTS