// parallel_for.cpp — unit tests for igl::parallel_for
  1. #include <test_common.h>
  2. #include <igl/parallel_for.h>
  3. #include <atomic>
  4. #include <vector>
  5. #include <numeric>
  6. #include <thread>
  7. #include <set>
  8. TEST_CASE("parallel_for: serial_fallback", "[igl][parallel_for]")
  9. {
  10. // loop_size < min_parallel ⇒ must run serial
  11. std::vector<int> vals(10, 0);
  12. bool used_parallel = igl::parallel_for(
  13. (int)vals.size(),
  14. [&](int i){ vals[i] = 1; },
  15. /*min_parallel=*/1000
  16. );
  17. REQUIRE(used_parallel == false);
  18. for (int v : vals)
  19. REQUIRE(v == 1);
  20. }
  21. TEST_CASE("parallel_for: basic_parallelism", "[igl][parallel_for]")
  22. {
  23. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  24. const int N = 20000;
  25. std::vector<int> hit(N, 0);
  26. std::atomic<int> counter(0);
  27. bool used_parallel = igl::parallel_for(
  28. N,
  29. [&](int i)
  30. {
  31. hit[i] = 1;
  32. counter.fetch_add(1, std::memory_order_relaxed);
  33. },
  34. /*min_parallel=*/1
  35. );
  36. REQUIRE(used_parallel == true);
  37. REQUIRE(counter.load() == N);
  38. for (int v : hit)
  39. REQUIRE(v == 1);
  40. }
  41. TEST_CASE("parallel_for: accumulation", "[igl][parallel_for]")
  42. {
  43. const int N = 10000;
  44. // Per-thread buckets
  45. std::vector<double> buckets;
  46. const auto prep = [&](size_t nt)
  47. {
  48. buckets.assign(nt, 0.0);
  49. };
  50. const auto func = [&](int /*i*/, size_t t)
  51. {
  52. buckets[t] += 1.0; // increment per-thread
  53. };
  54. double total = 0.0;
  55. const auto accum = [&](size_t t)
  56. {
  57. total += buckets[t];
  58. };
  59. bool used_parallel = igl::parallel_for(N, prep, func, accum, 1);
  60. if(igl::default_num_threads() > 1)
  61. {
  62. REQUIRE(used_parallel == true);
  63. }
  64. REQUIRE(total == Approx((double)N));
  65. }
  66. TEST_CASE("parallel_for: equivalence_to_serial", "[igl][parallel_for]")
  67. {
  68. const int N = 15000;
  69. // serial result
  70. std::vector<int> S(N);
  71. for (int i = 0; i < N; ++i) S[i] = i*i;
  72. // parallel result
  73. std::vector<int> P(N);
  74. igl::parallel_for(
  75. N,
  76. [&](int i){ P[i] = i*i; },
  77. /*min_parallel=*/1
  78. );
  79. for (int i = 0; i < N; ++i)
  80. REQUIRE(P[i] == S[i]);
  81. }
  82. TEST_CASE("parallel_for: min_parallel_threshold", "[igl][parallel_for]")
  83. {
  84. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  85. const int N = 500;
  86. std::vector<int> A(N,0), B(N,0);
  87. bool p1 = igl::parallel_for(
  88. N, [&](int i){ A[i] = i; },
  89. /*min_parallel=*/1000 // too large → serial
  90. );
  91. bool p2 = igl::parallel_for(
  92. N, [&](int i){ B[i] = i; },
  93. /*min_parallel=*/1 // small → parallel
  94. );
  95. REQUIRE(p1 == false);
  96. REQUIRE(p2 == true);
  97. REQUIRE(A == B);
  98. }
  99. TEST_CASE("parallel_for: nested_calls", "[igl][parallel_for]")
  100. {
  101. const int N = 2000;
  102. std::vector<int> out(N, 0);
  103. bool used_parallel = igl::parallel_for(
  104. N,
  105. [&](int i)
  106. {
  107. // a tiny nested parallel_for
  108. igl::parallel_for(
  109. 10,
  110. [&](int j){ (void)j; }, 1
  111. );
  112. out[i] = 1;
  113. },
  114. /*min_parallel=*/1
  115. );
  116. if(igl::default_num_threads() > 1)
  117. {
  118. REQUIRE(used_parallel == true);
  119. }
  120. for (int v : out)
  121. REQUIRE(v == 1);
  122. }
  123. // -----------------------------------------------------------------------------
  124. // Additional tests
  125. // -----------------------------------------------------------------------------
  126. TEST_CASE("parallel_for: zero_iterations_does_nothing", "[igl][parallel_for]")
  127. {
  128. std::atomic<int> prep_calls(0);
  129. std::atomic<int> func_calls(0);
  130. std::atomic<int> accum_calls(0);
  131. const auto prep = [&](size_t /*nt*/)
  132. {
  133. prep_calls.fetch_add(1, std::memory_order_relaxed);
  134. };
  135. const auto func = [&](int /*i*/, size_t /*t*/)
  136. {
  137. func_calls.fetch_add(1, std::memory_order_relaxed);
  138. };
  139. const auto accum = [&](size_t /*t*/)
  140. {
  141. accum_calls.fetch_add(1, std::memory_order_relaxed);
  142. };
  143. bool used_parallel = igl::parallel_for(
  144. 0,
  145. prep,
  146. func,
  147. accum,
  148. /*min_parallel=*/1
  149. );
  150. REQUIRE(used_parallel == false);
  151. REQUIRE(prep_calls.load() == 0);
  152. REQUIRE(func_calls.load() == 0);
  153. REQUIRE(accum_calls.load() == 0);
  154. }
  155. TEST_CASE("parallel_for: min_parallel_equal_threshold", "[igl][parallel_for]")
  156. {
  157. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  158. const int N = 1024;
  159. std::vector<int> A(N,0), B(N,0);
  160. // spec: serial if loop_size < min_parallel, so == should be parallel
  161. bool serial_used = igl::parallel_for(
  162. N, [&](int i){ A[i] = i; },
  163. /*min_parallel=*/N+1
  164. );
  165. bool parallel_used = igl::parallel_for(
  166. N, [&](int i){ B[i] = i; },
  167. /*min_parallel=*/N
  168. );
  169. REQUIRE(serial_used == false);
  170. REQUIRE(parallel_used == true);
  171. REQUIRE(A == B);
  172. }
  173. TEST_CASE("parallel_for: thread_id_range_and_accum_calls", "[igl][parallel_for]")
  174. {
  175. const int N = 10000;
  176. std::vector<long long> bucket_counts;
  177. std::atomic<size_t> prep_nt(0);
  178. std::atomic<size_t> max_t_seen(0);
  179. std::atomic<size_t> accum_calls(0);
  180. const auto prep = [&](size_t nt)
  181. {
  182. prep_nt.store(nt, std::memory_order_relaxed);
  183. bucket_counts.assign(nt, 0); // plain ints, no atomics needed
  184. };
  185. const auto func = [&](int /*i*/, size_t t)
  186. {
  187. // track max t across threads
  188. size_t cur = max_t_seen.load(std::memory_order_relaxed);
  189. while (t > cur && !max_t_seen.compare_exchange_weak(
  190. cur, t,
  191. std::memory_order_relaxed,
  192. std::memory_order_relaxed))
  193. {
  194. // spin until we either win or see a newer/bigger cur
  195. }
  196. // Each logical t corresponds to a single job in this implementation,
  197. // so all calls with the same t are executed on the same worker thread,
  198. // sequentially. No race here; we can use a plain increment.
  199. bucket_counts[t] += 1;
  200. };
  201. const auto accum = [&](size_t /*t*/)
  202. {
  203. accum_calls.fetch_add(1, std::memory_order_relaxed);
  204. };
  205. bool used_parallel = igl::parallel_for(
  206. N, prep, func, accum,
  207. /*min_parallel=*/1
  208. );
  209. const size_t nt = prep_nt.load();
  210. REQUIRE(nt >= 1);
  211. // t must always be < nt
  212. REQUIRE(max_t_seen.load() < nt);
  213. // accum must be called once per potential thread
  214. REQUIRE(accum_calls.load() == nt);
  215. // Sanity: total counted iterations == N
  216. long long total = 0;
  217. for (size_t t = 0; t < nt; ++t)
  218. total += bucket_counts[t];
  219. REQUIRE(total == N);
  220. }
  221. TEST_CASE("parallel_for: nested_inner_serial_fallback", "[igl][parallel_for]")
  222. {
  223. const int N = 1000;
  224. std::vector<int> outer_hits(N, 0);
  225. std::atomic<bool> inner_parallel_seen(false);
  226. bool outer_parallel = igl::parallel_for(
  227. N,
  228. [&](int i)
  229. {
  230. bool inner_used_parallel = igl::parallel_for(
  231. 10,
  232. [&](int j){ (void)j; },
  233. /*min_parallel=*/1
  234. );
  235. if (inner_used_parallel)
  236. {
  237. inner_parallel_seen.store(true, std::memory_order_relaxed);
  238. }
  239. outer_hits[i] = 1;
  240. },
  241. /*min_parallel=*/1
  242. );
  243. if(igl::default_num_threads() > 1)
  244. {
  245. REQUIRE(outer_parallel == true);
  246. }
  247. for (int v : outer_hits)
  248. REQUIRE(v == 1);
  249. // With the is_worker_thread() guard in the implementation,
  250. // inner calls from pool workers should always be serial.
  251. REQUIRE(inner_parallel_seen.load() == false);
  252. }
  253. TEST_CASE("parallel_for: deep_nested_calls", "[igl][parallel_for]")
  254. {
  255. const int N = 256;
  256. std::vector<int> hits(N, 0);
  257. bool outer_parallel = igl::parallel_for(
  258. N,
  259. [&](int i)
  260. {
  261. igl::parallel_for(
  262. 8,
  263. [&](int j)
  264. {
  265. // third level
  266. igl::parallel_for(
  267. 4,
  268. [&](int k){ (void)k; },
  269. 1
  270. );
  271. (void)j;
  272. },
  273. 1
  274. );
  275. hits[i] = 1;
  276. },
  277. /*min_parallel=*/1
  278. );
  279. if(igl::default_num_threads() > 1)
  280. {
  281. REQUIRE(outer_parallel == true);
  282. }
  283. for (int v : hits)
  284. REQUIRE(v == 1);
  285. }
  286. TEST_CASE("parallel_for: many_small_jobs_reuse_pool", "[igl][parallel_for]")
  287. {
  288. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  289. const int iterations = 200;
  290. const int N = 64;
  291. std::vector<int> buf(N);
  292. for (int it = 0; it < iterations; ++it)
  293. {
  294. std::fill(buf.begin(), buf.end(), 0);
  295. bool used_parallel = igl::parallel_for(
  296. N,
  297. [&](int i){ buf[i] = it; },
  298. /*min_parallel=*/1
  299. );
  300. if(igl::default_num_threads() > 1)
  301. {
  302. REQUIRE(used_parallel == true);
  303. }
  304. for (int i = 0; i < N; ++i)
  305. REQUIRE(buf[i] == it);
  306. }
  307. }
  308. TEST_CASE("parallel_for: different_index_types", "[igl][parallel_for]")
  309. {
  310. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  311. const long long N = 12345;
  312. std::vector<int> buf((size_t)N, 0);
  313. bool used_parallel = igl::parallel_for(
  314. N,
  315. [&](long long i)
  316. {
  317. buf[(size_t)i] = 1;
  318. },
  319. /*min_parallel=*/1
  320. );
  321. REQUIRE(used_parallel == true);
  322. for (int v : buf)
  323. REQUIRE(v == 1);
  324. }
  325. TEST_CASE("parallel_for: accumulation_equivalence_to_serial_sum", "[igl][parallel_for]")
  326. {
  327. const int N = 10000;
  328. // serial sum
  329. long long serial_sum = 0;
  330. for (int i = 0; i < N; ++i)
  331. {
  332. serial_sum += i;
  333. }
  334. // parallel sum: S[t] accumulates partial sums, then accum collects.
  335. std::vector<long long> S;
  336. long long parallel_sum = 0;
  337. const auto prep = [&](size_t nt)
  338. {
  339. S.assign(nt, 0);
  340. };
  341. const auto func = [&](int i, size_t t)
  342. {
  343. S[t] += i;
  344. };
  345. const auto accum = [&](size_t t)
  346. {
  347. parallel_sum += S[t];
  348. };
  349. bool used_parallel = igl::parallel_for(
  350. N, prep, func, accum,
  351. /*min_parallel=*/1
  352. );
  353. if(igl::default_num_threads() > 1)
  354. {
  355. REQUIRE(used_parallel == true);
  356. }
  357. REQUIRE(parallel_sum == serial_sum);
  358. }
  359. #ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
  360. TEST_CASE("parallel_for: force_serial_macro", "[igl][parallel_for]")
  361. {
  362. // If compiled with IGL_PARALLEL_FOR_FORCE_SERIAL, we must never see parallel.
  363. const int N = 1000;
  364. std::vector<int> buf(N, 0);
  365. bool used_parallel = igl::parallel_for(
  366. N,
  367. [&](int i){ buf[i] = i; },
  368. /*min_parallel=*/1
  369. );
  370. REQUIRE(used_parallel == false);
  371. for (int i = 0; i < N; ++i)
  372. REQUIRE(buf[i] == i);
  373. }
  374. #endif
  375. //#define IGL_PARALLEL_FOR_TIMING_TESTS
  376. #ifdef IGL_PARALLEL_FOR_TIMING_TESTS
  377. #include <chrono>
  378. #include <cmath>
  379. // Helper alias
  380. using igl_pf_clock = std::chrono::steady_clock;
  381. TEST_CASE("parallel_for: timing_large_loop", "[igl][parallel_for][timing]")
  382. {
  383. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  384. const int N = 5'000'000;
  385. std::vector<double> a(N), b(N);
  386. for (int i = 0; i < N; ++i)
  387. {
  388. a[i] = 0.5 * i;
  389. }
  390. // --- Serial baseline ---
  391. auto serial_start = igl_pf_clock::now();
  392. for (int i = 0; i < N; ++i)
  393. {
  394. // mildly non-trivial work to avoid being optimized away
  395. b[i] = std::sqrt(a[i] * a[i] + 1.0);
  396. }
  397. auto serial_end = igl_pf_clock::now();
  398. auto serial_ms =
  399. std::chrono::duration_cast<std::chrono::milliseconds>(
  400. serial_end - serial_start).count();
  401. // --- Parallel version ---
  402. std::fill(b.begin(), b.end(), 0.0);
  403. auto parallel_start = igl_pf_clock::now();
  404. bool used_parallel = igl::parallel_for(
  405. N,
  406. [&](int i)
  407. {
  408. b[i] = std::sqrt(a[i] * a[i] + 1.0);
  409. },
  410. /*min_parallel=*/1
  411. );
  412. auto parallel_end = igl_pf_clock::now();
  413. auto parallel_ms =
  414. std::chrono::duration_cast<std::chrono::milliseconds>(
  415. parallel_end - parallel_start).count();
  416. INFO("timing_large_loop: serial_ms = " << serial_ms);
  417. INFO("timing_large_loop: parallel_ms = " << parallel_ms);
  418. INFO("timing_large_loop: used_parallel = " << used_parallel);
  419. // Sanity: results should match a re-run of serial
  420. std::vector<double> c(N);
  421. for (int i = 0; i < N; ++i)
  422. {
  423. c[i] = std::sqrt(a[i] * a[i] + 1.0);
  424. }
  425. for (int i = 0; i < N; ++i)
  426. {
  427. REQUIRE(b[i] == Approx(c[i]));
  428. }
  429. // Very soft performance assertion:
  430. // If we actually ran in parallel and the serial baseline took > 0 ms,
  431. // then parallel should not be crazy slower (e.g., 10x).
  432. if (used_parallel && serial_ms > 0)
  433. {
  434. double ratio = (parallel_ms > 0)
  435. ? double(parallel_ms) / double(serial_ms)
  436. : 0.0;
  437. INFO("timing_large_loop: parallel / serial ratio = " << ratio);
  438. // Soft bound: allow parallel to be up to 10x slower in worst case.
  439. CHECK(ratio < 10.0);
  440. }
  441. }
  442. TEST_CASE("parallel_for: timing_many_small_jobs", "[igl][parallel_for][timing]")
  443. {
  444. if(igl::default_num_threads() <= 1) { SUCCEED("Only one hardware thread; nested parallel test skipped."); return; }
  445. // This is meant to stress the thread pool reuse behavior: many small jobs.
  446. const int iterations = 500;
  447. const int N = 1024;
  448. std::vector<double> data(N, 1.0);
  449. // --- Serial: do all work in a single loop ---
  450. auto serial_start = igl_pf_clock::now();
  451. double serial_sum = 0.0;
  452. for (int it = 0; it < iterations; ++it)
  453. {
  454. for (int i = 0; i < N; ++i)
  455. {
  456. serial_sum += data[i] * 0.5;
  457. }
  458. }
  459. auto serial_end = igl_pf_clock::now();
  460. auto serial_ms =
  461. std::chrono::duration_cast<std::chrono::milliseconds>(
  462. serial_end - serial_start).count();
  463. // --- Parallel: same total work, but split into many parallel_for calls ---
  464. auto parallel_start = igl_pf_clock::now();
  465. double parallel_sum = 0.0;
  466. for (int it = 0; it < iterations; ++it)
  467. {
  468. double local_sum = 0.0;
  469. // Here we use the accum-variant to test that path too.
  470. std::vector<double> buckets;
  471. const auto prep = [&](size_t nt)
  472. {
  473. buckets.assign(nt, 0.0);
  474. };
  475. const auto func = [&](int i, size_t t)
  476. {
  477. buckets[t] += data[i] * 0.5;
  478. };
  479. const auto accum = [&](size_t t)
  480. {
  481. local_sum += buckets[t];
  482. };
  483. (void)igl::parallel_for(
  484. N, prep, func, accum,
  485. /*min_parallel=*/1
  486. );
  487. parallel_sum += local_sum;
  488. }
  489. auto parallel_end = igl_pf_clock::now();
  490. auto parallel_ms =
  491. std::chrono::duration_cast<std::chrono::milliseconds>(
  492. parallel_end - parallel_start).count();
  493. INFO("timing_many_small_jobs: serial_ms = " << serial_ms);
  494. INFO("timing_many_small_jobs: parallel_ms = " << parallel_ms);
  495. INFO("timing_many_small_jobs: serial_sum = " << serial_sum);
  496. INFO("timing_many_small_jobs: parallel_sum = " << parallel_sum);
  497. // Check correctness first
  498. REQUIRE(parallel_sum == Approx(serial_sum));
  499. if (serial_ms > 0)
  500. {
  501. double ratio = (parallel_ms > 0)
  502. ? double(parallel_ms) / double(serial_ms)
  503. : 0.0;
  504. INFO("timing_many_small_jobs: parallel / serial ratio = " << ratio);
  505. // Again: super loose bound just to catch pathological regressions.
  506. CHECK(ratio < 20.0);
  507. }
  508. }
  509. TEST_CASE("parallel_for: nested_serial_fallback", "[igl][parallel_for]")
  510. {
  511. const int outer_loop_size = 4;
  512. const int inner_loop_size = 4;
  513. std::atomic<bool> any_inner_parallel(false);
  514. std::atomic<int> counter(0);
  515. // Outer parallel_for should use multiple threads.
  516. bool outer_used_parallel = igl::parallel_for(
  517. outer_loop_size,
  518. [&](int /*i*/)
  519. {
  520. bool inner_used_parallel = igl::parallel_for(
  521. inner_loop_size,
  522. [&](int /*j*/)
  523. {
  524. // Just do some work so we know the inner loop ran.
  525. counter.fetch_add(1, std::memory_order_relaxed);
  526. }
  527. );
  528. if(inner_used_parallel)
  529. {
  530. any_inner_parallel.store(true, std::memory_order_relaxed);
  531. }
  532. }
  533. );
  534. // Sanity: outer loop should be parallel when threads > 1.
  535. if(igl::default_num_threads() > 1)
  536. {
  537. REQUIRE(outer_used_parallel == true);
  538. }
  539. // Sanity: all iterations of both loops ran.
  540. REQUIRE(counter.load(std::memory_order_relaxed)
  541. == outer_loop_size * inner_loop_size);
  542. // The key assertion: inner parallel_for must fall back to serial when nested.
  543. REQUIRE(any_inner_parallel.load(std::memory_order_relaxed) == false);
  544. }
  545. #endif // IGL_PARALLEL_FOR_TIMING_TESTS