parallel_for.cpp 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647
  1. #include <test_common.h>
  2. #include <igl/parallel_for.h>
  3. #include <atomic>
  4. #include <vector>
  5. #include <numeric>
  6. #include <thread>
  7. #include <set>
  8. TEST_CASE("parallel_for: serial_fallback", "[igl][parallel_for]")
  9. {
  10. // loop_size < min_parallel ⇒ must run serial
  11. std::vector<int> vals(10, 0);
  12. bool used_parallel = igl::parallel_for(
  13. (int)vals.size(),
  14. [&](int i){ vals[i] = 1; },
  15. /*min_parallel=*/1000
  16. );
  17. REQUIRE(used_parallel == false);
  18. for (int v : vals)
  19. REQUIRE(v == 1);
  20. }
  21. TEST_CASE("parallel_for: basic_parallelism", "[igl][parallel_for]")
  22. {
  23. const int N = 20000;
  24. std::vector<int> hit(N, 0);
  25. std::atomic<int> counter(0);
  26. bool used_parallel = igl::parallel_for(
  27. N,
  28. [&](int i)
  29. {
  30. hit[i] = 1;
  31. counter.fetch_add(1, std::memory_order_relaxed);
  32. },
  33. /*min_parallel=*/1
  34. );
  35. REQUIRE(used_parallel == true);
  36. REQUIRE(counter.load() == N);
  37. for (int v : hit)
  38. REQUIRE(v == 1);
  39. }
  40. TEST_CASE("parallel_for: accumulation", "[igl][parallel_for]")
  41. {
  42. const int N = 10000;
  43. // Per-thread buckets
  44. std::vector<double> buckets;
  45. const auto prep = [&](size_t nt)
  46. {
  47. buckets.assign(nt, 0.0);
  48. };
  49. const auto func = [&](int /*i*/, size_t t)
  50. {
  51. buckets[t] += 1.0; // increment per-thread
  52. };
  53. double total = 0.0;
  54. const auto accum = [&](size_t t)
  55. {
  56. total += buckets[t];
  57. };
  58. bool used_parallel = igl::parallel_for(N, prep, func, accum, 1);
  59. REQUIRE(used_parallel == true);
  60. REQUIRE(total == Approx((double)N));
  61. }
  62. TEST_CASE("parallel_for: equivalence_to_serial", "[igl][parallel_for]")
  63. {
  64. const int N = 15000;
  65. // serial result
  66. std::vector<int> S(N);
  67. for (int i = 0; i < N; ++i) S[i] = i*i;
  68. // parallel result
  69. std::vector<int> P(N);
  70. igl::parallel_for(
  71. N,
  72. [&](int i){ P[i] = i*i; },
  73. /*min_parallel=*/1
  74. );
  75. for (int i = 0; i < N; ++i)
  76. REQUIRE(P[i] == S[i]);
  77. }
  78. TEST_CASE("parallel_for: min_parallel_threshold", "[igl][parallel_for]")
  79. {
  80. const int N = 500;
  81. std::vector<int> A(N,0), B(N,0);
  82. bool p1 = igl::parallel_for(
  83. N, [&](int i){ A[i] = i; },
  84. /*min_parallel=*/1000 // too large → serial
  85. );
  86. bool p2 = igl::parallel_for(
  87. N, [&](int i){ B[i] = i; },
  88. /*min_parallel=*/1 // small → parallel
  89. );
  90. REQUIRE(p1 == false);
  91. REQUIRE(p2 == true);
  92. REQUIRE(A == B);
  93. }
  94. TEST_CASE("parallel_for: nested_calls", "[igl][parallel_for]")
  95. {
  96. const int N = 2000;
  97. std::vector<int> out(N, 0);
  98. bool ok = igl::parallel_for(
  99. N,
  100. [&](int i)
  101. {
  102. // a tiny nested parallel_for
  103. igl::parallel_for(
  104. 10,
  105. [&](int j){ (void)j; }, 1
  106. );
  107. out[i] = 1;
  108. },
  109. /*min_parallel=*/1
  110. );
  111. REQUIRE(ok == true);
  112. for (int v : out)
  113. REQUIRE(v == 1);
  114. }
  115. // -----------------------------------------------------------------------------
  116. // Additional tests
  117. // -----------------------------------------------------------------------------
  118. TEST_CASE("parallel_for: zero_iterations_does_nothing", "[igl][parallel_for]")
  119. {
  120. std::atomic<int> prep_calls(0);
  121. std::atomic<int> func_calls(0);
  122. std::atomic<int> accum_calls(0);
  123. const auto prep = [&](size_t /*nt*/)
  124. {
  125. prep_calls.fetch_add(1, std::memory_order_relaxed);
  126. };
  127. const auto func = [&](int /*i*/, size_t /*t*/)
  128. {
  129. func_calls.fetch_add(1, std::memory_order_relaxed);
  130. };
  131. const auto accum = [&](size_t /*t*/)
  132. {
  133. accum_calls.fetch_add(1, std::memory_order_relaxed);
  134. };
  135. bool used_parallel = igl::parallel_for(
  136. 0,
  137. prep,
  138. func,
  139. accum,
  140. /*min_parallel=*/1
  141. );
  142. REQUIRE(used_parallel == false);
  143. REQUIRE(prep_calls.load() == 0);
  144. REQUIRE(func_calls.load() == 0);
  145. REQUIRE(accum_calls.load() == 0);
  146. }
  147. TEST_CASE("parallel_for: min_parallel_equal_threshold", "[igl][parallel_for]")
  148. {
  149. const int N = 1024;
  150. std::vector<int> A(N,0), B(N,0);
  151. // spec: serial if loop_size < min_parallel, so == should be parallel
  152. bool serial_used = igl::parallel_for(
  153. N, [&](int i){ A[i] = i; },
  154. /*min_parallel=*/N+1
  155. );
  156. bool parallel_used = igl::parallel_for(
  157. N, [&](int i){ B[i] = i; },
  158. /*min_parallel=*/N
  159. );
  160. REQUIRE(serial_used == false);
  161. REQUIRE(parallel_used == true);
  162. REQUIRE(A == B);
  163. }
TEST_CASE("parallel_for: thread_id_range_and_accum_calls", "[igl][parallel_for]")
{
  // Verifies the contract of the prep/func/accum overload:
  //  * prep receives the number of worker slots nt,
  //  * every thread id t handed to func satisfies t < nt,
  //  * accum is invoked exactly once per slot,
  //  * the per-slot counts together cover all N iterations.
  const int N = 10000;
  std::vector<long long> bucket_counts;
  std::atomic<size_t> prep_nt(0);
  std::atomic<size_t> max_t_seen(0);
  std::atomic<size_t> accum_calls(0);
  const auto prep = [&](size_t nt)
  {
    prep_nt.store(nt, std::memory_order_relaxed);
    bucket_counts.assign(nt, 0); // plain ints, no atomics needed
  };
  const auto func = [&](int /*i*/, size_t t)
  {
    // track max t across threads
    // CAS retry loop: compare_exchange_weak refreshes `cur` on failure, so we
    // stop as soon as we either publish t or observe a value >= t.
    size_t cur = max_t_seen.load(std::memory_order_relaxed);
    while (t > cur && !max_t_seen.compare_exchange_weak(
      cur, t,
      std::memory_order_relaxed,
      std::memory_order_relaxed))
    {
      // spin until we either win or see a newer/bigger cur
    }
    // Each logical t corresponds to a single job in this implementation,
    // so all calls with the same t are executed on the same worker thread,
    // sequentially. No race here; we can use a plain increment.
    // NOTE(review): this couples the test to that implementation detail —
    // if jobs ever share a t concurrently, this becomes a data race.
    bucket_counts[t] += 1;
  };
  const auto accum = [&](size_t /*t*/)
  {
    accum_calls.fetch_add(1, std::memory_order_relaxed);
  };
  bool used_parallel = igl::parallel_for(
    N, prep, func, accum,
    /*min_parallel=*/1
  );
  REQUIRE(used_parallel == true);
  const size_t nt = prep_nt.load();
  REQUIRE(nt >= 1);
  // t must always be < nt
  REQUIRE(max_t_seen.load() < nt);
  // accum must be called once per potential thread
  REQUIRE(accum_calls.load() == nt);
  // Sanity: total counted iterations == N
  long long total = 0;
  for (size_t t = 0; t < nt; ++t)
    total += bucket_counts[t];
  REQUIRE(total == N);
}
  213. TEST_CASE("parallel_for: nested_inner_serial_fallback", "[igl][parallel_for]")
  214. {
  215. const int N = 1000;
  216. std::vector<int> outer_hits(N, 0);
  217. std::atomic<bool> inner_parallel_seen(false);
  218. bool outer_parallel = igl::parallel_for(
  219. N,
  220. [&](int i)
  221. {
  222. bool inner_used_parallel = igl::parallel_for(
  223. 10,
  224. [&](int j){ (void)j; },
  225. /*min_parallel=*/1
  226. );
  227. if (inner_used_parallel)
  228. {
  229. inner_parallel_seen.store(true, std::memory_order_relaxed);
  230. }
  231. outer_hits[i] = 1;
  232. },
  233. /*min_parallel=*/1
  234. );
  235. REQUIRE(outer_parallel == true);
  236. for (int v : outer_hits)
  237. REQUIRE(v == 1);
  238. // With the is_worker_thread() guard in the implementation,
  239. // inner calls from pool workers should always be serial.
  240. REQUIRE(inner_parallel_seen.load() == false);
  241. }
  242. TEST_CASE("parallel_for: deep_nested_calls", "[igl][parallel_for]")
  243. {
  244. const int N = 256;
  245. std::vector<int> hits(N, 0);
  246. bool outer_parallel = igl::parallel_for(
  247. N,
  248. [&](int i)
  249. {
  250. igl::parallel_for(
  251. 8,
  252. [&](int j)
  253. {
  254. // third level
  255. igl::parallel_for(
  256. 4,
  257. [&](int k){ (void)k; },
  258. 1
  259. );
  260. (void)j;
  261. },
  262. 1
  263. );
  264. hits[i] = 1;
  265. },
  266. /*min_parallel=*/1
  267. );
  268. REQUIRE(outer_parallel == true);
  269. for (int v : hits)
  270. REQUIRE(v == 1);
  271. }
  272. TEST_CASE("parallel_for: many_small_jobs_reuse_pool", "[igl][parallel_for]")
  273. {
  274. const int iterations = 200;
  275. const int N = 64;
  276. std::vector<int> buf(N);
  277. for (int it = 0; it < iterations; ++it)
  278. {
  279. std::fill(buf.begin(), buf.end(), 0);
  280. bool used_parallel = igl::parallel_for(
  281. N,
  282. [&](int i){ buf[i] = it; },
  283. /*min_parallel=*/1
  284. );
  285. REQUIRE(used_parallel == true);
  286. for (int i = 0; i < N; ++i)
  287. REQUIRE(buf[i] == it);
  288. }
  289. }
  290. TEST_CASE("parallel_for: different_index_types", "[igl][parallel_for]")
  291. {
  292. const long long N = 12345;
  293. std::vector<int> buf((size_t)N, 0);
  294. bool used_parallel = igl::parallel_for(
  295. N,
  296. [&](long long i)
  297. {
  298. buf[(size_t)i] = 1;
  299. },
  300. /*min_parallel=*/1
  301. );
  302. REQUIRE(used_parallel == true);
  303. for (int v : buf)
  304. REQUIRE(v == 1);
  305. }
  306. TEST_CASE("parallel_for: accumulation_equivalence_to_serial_sum", "[igl][parallel_for]")
  307. {
  308. const int N = 10000;
  309. // serial sum
  310. long long serial_sum = 0;
  311. for (int i = 0; i < N; ++i)
  312. {
  313. serial_sum += i;
  314. }
  315. // parallel sum: S[t] accumulates partial sums, then accum collects.
  316. std::vector<long long> S;
  317. long long parallel_sum = 0;
  318. const auto prep = [&](size_t nt)
  319. {
  320. S.assign(nt, 0);
  321. };
  322. const auto func = [&](int i, size_t t)
  323. {
  324. S[t] += i;
  325. };
  326. const auto accum = [&](size_t t)
  327. {
  328. parallel_sum += S[t];
  329. };
  330. bool used_parallel = igl::parallel_for(
  331. N, prep, func, accum,
  332. /*min_parallel=*/1
  333. );
  334. REQUIRE(used_parallel == true);
  335. REQUIRE(parallel_sum == serial_sum);
  336. }
  337. #ifdef IGL_PARALLEL_FOR_FORCE_SERIAL
  338. TEST_CASE("parallel_for: force_serial_macro", "[igl][parallel_for]")
  339. {
  340. // If compiled with IGL_PARALLEL_FOR_FORCE_SERIAL, we must never see parallel.
  341. const int N = 1000;
  342. std::vector<int> buf(N, 0);
  343. bool used_parallel = igl::parallel_for(
  344. N,
  345. [&](int i){ buf[i] = i; },
  346. /*min_parallel=*/1
  347. );
  348. REQUIRE(used_parallel == false);
  349. for (int i = 0; i < N; ++i)
  350. REQUIRE(buf[i] == i);
  351. }
  352. #endif
// Timing tests are opt-in: uncomment the define below to compile them.
//#define IGL_PARALLEL_FOR_TIMING_TESTS
#ifdef IGL_PARALLEL_FOR_TIMING_TESTS
#include <chrono>
#include <cmath>
// Helper alias: monotonic clock used for all timing measurements below.
using igl_pf_clock = std::chrono::steady_clock;
TEST_CASE("parallel_for: timing_large_loop", "[igl][parallel_for][timing]")
{
  // Benchmarks one large elementwise loop: serial baseline vs parallel_for.
  // Correctness is asserted strictly; performance only via a very loose
  // ratio bound, to catch pathological regressions without flaky failures.
  const int N = 5'000'000;
  std::vector<double> a(N), b(N);
  for (int i = 0; i < N; ++i)
  {
    a[i] = 0.5 * i;
  }
  // --- Serial baseline ---
  auto serial_start = igl_pf_clock::now();
  for (int i = 0; i < N; ++i)
  {
    // mildly non-trivial work to avoid being optimized away
    b[i] = std::sqrt(a[i] * a[i] + 1.0);
  }
  auto serial_end = igl_pf_clock::now();
  auto serial_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      serial_end - serial_start).count();
  // --- Parallel version ---
  // Reset b so the parallel run cannot pass by inheriting serial results.
  std::fill(b.begin(), b.end(), 0.0);
  auto parallel_start = igl_pf_clock::now();
  bool used_parallel = igl::parallel_for(
    N,
    [&](int i)
    {
      b[i] = std::sqrt(a[i] * a[i] + 1.0);
    },
    /*min_parallel=*/1
  );
  auto parallel_end = igl_pf_clock::now();
  auto parallel_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      parallel_end - parallel_start).count();
  INFO("timing_large_loop: serial_ms = " << serial_ms);
  INFO("timing_large_loop: parallel_ms = " << parallel_ms);
  INFO("timing_large_loop: used_parallel = " << used_parallel);
  // Sanity: results should match a re-run of serial
  std::vector<double> c(N);
  for (int i = 0; i < N; ++i)
  {
    c[i] = std::sqrt(a[i] * a[i] + 1.0);
  }
  for (int i = 0; i < N; ++i)
  {
    REQUIRE(b[i] == Approx(c[i]));
  }
  // Very soft performance assertion:
  // If we actually ran in parallel and the serial baseline took > 0 ms,
  // then parallel should not be crazy slower (e.g., 10x).
  if (used_parallel && serial_ms > 0)
  {
    double ratio = (parallel_ms > 0)
      ? double(parallel_ms) / double(serial_ms)
      : 0.0;
    INFO("timing_large_loop: parallel / serial ratio = " << ratio);
    // Soft bound: allow parallel to be up to 10x slower in worst case.
    CHECK(ratio < 10.0);
  }
}
TEST_CASE("parallel_for: timing_many_small_jobs", "[igl][parallel_for][timing]")
{
  // This is meant to stress the thread pool reuse behavior: many small jobs.
  // Also exercises the prep/func/accum overload on each of those jobs.
  const int iterations = 500;
  const int N = 1024;
  std::vector<double> data(N, 1.0);
  // --- Serial: do all work in a single loop ---
  auto serial_start = igl_pf_clock::now();
  double serial_sum = 0.0;
  for (int it = 0; it < iterations; ++it)
  {
    for (int i = 0; i < N; ++i)
    {
      serial_sum += data[i] * 0.5;
    }
  }
  auto serial_end = igl_pf_clock::now();
  auto serial_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      serial_end - serial_start).count();
  // --- Parallel: same total work, but split into many parallel_for calls ---
  auto parallel_start = igl_pf_clock::now();
  double parallel_sum = 0.0;
  for (int it = 0; it < iterations; ++it)
  {
    double local_sum = 0.0;
    // Here we use the accum-variant to test that path too.
    std::vector<double> buckets;
    const auto prep = [&](size_t nt)
    {
      buckets.assign(nt, 0.0);
    };
    const auto func = [&](int i, size_t t)
    {
      // per-slot partial sum; no synchronization needed
      buckets[t] += data[i] * 0.5;
    };
    const auto accum = [&](size_t t)
    {
      local_sum += buckets[t];
    };
    (void)igl::parallel_for(
      N, prep, func, accum,
      /*min_parallel=*/1
    );
    parallel_sum += local_sum;
  }
  auto parallel_end = igl_pf_clock::now();
  auto parallel_ms =
    std::chrono::duration_cast<std::chrono::milliseconds>(
      parallel_end - parallel_start).count();
  INFO("timing_many_small_jobs: serial_ms = " << serial_ms);
  INFO("timing_many_small_jobs: parallel_ms = " << parallel_ms);
  INFO("timing_many_small_jobs: serial_sum = " << serial_sum);
  INFO("timing_many_small_jobs: parallel_sum = " << parallel_sum);
  // Check correctness first
  REQUIRE(parallel_sum == Approx(serial_sum));
  if (serial_ms > 0)
  {
    double ratio = (parallel_ms > 0)
      ? double(parallel_ms) / double(serial_ms)
      : 0.0;
    INFO("timing_many_small_jobs: parallel / serial ratio = " << ratio);
    // Again: super loose bound just to catch pathological regressions.
    CHECK(ratio < 20.0);
  }
}
  485. TEST_CASE("parallel_for: nested_serial_fallback", "[igl][parallel_for]")
  486. {
  487. // If there is only a single hardware thread available, this test
  488. // can't meaningfully distinguish nested/parallel behavior.
  489. if(igl::default_num_threads() <= 1)
  490. {
  491. SUCCEED("Only one hardware thread; nested parallel test skipped.");
  492. return;
  493. }
  494. const int outer_loop_size = 4;
  495. const int inner_loop_size = 4;
  496. std::atomic<bool> any_inner_parallel(false);
  497. std::atomic<int> counter(0);
  498. // Outer parallel_for should use multiple threads.
  499. bool outer_used_parallel = igl::parallel_for(
  500. outer_loop_size,
  501. [&](int /*i*/)
  502. {
  503. bool inner_used_parallel = igl::parallel_for(
  504. inner_loop_size,
  505. [&](int /*j*/)
  506. {
  507. // Just do some work so we know the inner loop ran.
  508. counter.fetch_add(1, std::memory_order_relaxed);
  509. }
  510. );
  511. if(inner_used_parallel)
  512. {
  513. any_inner_parallel.store(true, std::memory_order_relaxed);
  514. }
  515. }
  516. );
  517. // Sanity: outer loop should be parallel when threads > 1.
  518. REQUIRE(outer_used_parallel == true);
  519. // Sanity: all iterations of both loops ran.
  520. REQUIRE(counter.load(std::memory_order_relaxed)
  521. == outer_loop_size * inner_loop_size);
  522. // The key assertion: inner parallel_for must fall back to serial when nested.
  523. REQUIRE(any_inner_parallel.load(std::memory_order_relaxed) == false);
  524. }
  525. #endif // IGL_PARALLEL_FOR_TIMING_TESTS