threading.cpp 6.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183
  1. // zlib open source license
  2. //
  3. // Copyright (c) 2017 to 2024 David Forsgren Piuva
  4. //
  5. // This software is provided 'as-is', without any express or implied
  6. // warranty. In no event will the authors be held liable for any damages
  7. // arising from the use of this software.
  8. //
  9. // Permission is granted to anyone to use this software for any purpose,
  10. // including commercial applications, and to alter it and redistribute it
  11. // freely, subject to the following restrictions:
  12. //
  13. // 1. The origin of this software must not be misrepresented; you must not
  14. // claim that you wrote the original software. If you use this software
  15. // in a product, an acknowledgment in the product documentation would be
  16. // appreciated but is not required.
  17. //
  18. // 2. Altered source versions must be plainly marked as such, and must not be
  19. // misrepresented as being the original software.
  20. //
  21. // 3. This notice may not be removed or altered from any source
  22. // distribution.
  23. #include "threading.h"
  24. #include "virtualStack.h"
  25. #include "../math/scalar.h"
  26. // Requires -pthread for linking
  27. #include <future>
  28. #include <thread>
  29. #include <mutex>
  30. #include <atomic>
  31. namespace dsr {
  32. // Enable this macro to disable multi-threading
  33. // If your application still crashes when using a single thread, it's probably not a concurrency problem
  34. //#define DISABLE_MULTI_THREADING
  // Returns the number of concurrent threads reported by std::thread::hardware_concurrency, converted to int.
  // The standard library may report zero when the value cannot be determined, so callers should not assume a positive result.
  int getThreadCount() {
      const unsigned int reportedThreads = std::thread::hardware_concurrency();
      return static_cast<int>(reportedThreads);
  }
  38. void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount) {
  39. #ifdef DISABLE_MULTI_THREADING
  40. // Reference implementation
  41. for (int i = 0; i < jobCount; i++) {
  42. jobs[i]();
  43. }
  44. #else
  45. if (jobCount <= 0) {
  46. return;
  47. } else if (jobCount == 1) {
  48. jobs[0]();
  49. } else {
  50. static std::recursive_mutex getTaskLock;
  51. if (maxThreadCount <= 0) {
  52. // No limit.
  53. maxThreadCount = jobCount;
  54. }
  55. // When having more than one thread, one should be reserved for fast responses.
  56. // Otherwise one thread will keep the others waiting while struggling to manage interrupts with expensive context switches.
  57. int availableThreads = max(getThreadCount() - 1, 1);
  58. int workerCount = min(availableThreads, maxThreadCount, jobCount); // All used threads
  59. int helperCount = workerCount - 1; // Excluding the main thread
  60. // Multi-threaded work loop
  61. if (workerCount == 1) {
  62. // Run on the main thread if there is only one.
  63. for (int i = 0; i < jobCount; i++) {
  64. jobs[i]();
  65. }
  66. } else {
  67. // A shared counter protected by getTaskLock.
  68. int nextJobIndex = 0;
  69. VirtualStackAllocation<std::function<void()>> workers(workerCount);
  70. VirtualStackAllocation<std::future<void>> helpers(helperCount);
  71. for (int w = 0; w < workerCount; w++) {
  72. workers[w] = [&nextJobIndex, jobs, jobCount]() {
  73. while (true) {
  74. getTaskLock.lock();
  75. int taskIndex = nextJobIndex;
  76. nextJobIndex++;
  77. getTaskLock.unlock();
  78. if (taskIndex < jobCount) {
  79. jobs[taskIndex]();
  80. } else {
  81. break;
  82. }
  83. }
  84. };
  85. }
  86. // Start working in the helper threads
  87. for (int h = 0; h < helperCount; h++) {
  88. helpers[h] = std::async(std::launch::async, workers[h]);
  89. }
  90. // Perform the same work on the main thread
  91. workers[workerCount - 1]();
  92. // Wait for all helpers to complete their work once all tasks have been handed out
  93. for (int h = 0; h < helperCount; h++) {
  94. if (helpers[h].valid()) {
  95. helpers[h].wait();
  96. }
  97. }
  98. }
  99. }
  100. #endif
  101. }
  102. void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount, int maxThreadCount) {
  103. threadedWorkFromArray(jobs.getUnsafe(), jobCount, maxThreadCount);
  104. }
  105. void threadedWorkFromList(List<std::function<void()>> jobs, int maxThreadCount) {
  106. threadedWorkFromArray(&jobs[0], jobs.length(), maxThreadCount);
  107. jobs.clear();
  108. }
  109. void threadedSplit(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task, int minimumJobSize, int jobsPerThread) {
  110. int totalCount = stopIndex - startIndex;
  111. int maxJobs = totalCount / minimumJobSize;
  112. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  113. if (jobCount > maxJobs) { jobCount = maxJobs; }
  114. if (jobCount < 1) { jobCount = 1; }
  115. if (jobCount == 1) {
  116. // Too little work for multi-threading
  117. task(startIndex, stopIndex);
  118. } else {
  119. // Use multiple threads
  120. VirtualStackAllocation<std::function<void()>> jobs(jobCount);
  121. int givenRow = startIndex;
  122. for (int s = 0; s < jobCount; s++) {
  123. int remainingJobs = jobCount - s;
  124. int remainingRows = stopIndex - givenRow;
  125. int y1 = givenRow; // Inclusive
  126. int taskSize = remainingRows / remainingJobs;
  127. givenRow = givenRow + taskSize;
  128. int y2 = givenRow; // Exclusive
  129. jobs[s] = [task, y1, y2]() {
  130. task(y1, y2);
  131. };
  132. }
  133. threadedWorkFromArray(jobs, jobCount);
  134. }
  135. }
  // Single-threaded stand-in for threadedSplit over an index range.
  // Calls task exactly once with the whole range from startIndex to stopIndex on the calling thread.
  void threadedSplit_disabled(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task) {
      const int wholeStart = startIndex;
      const int wholeStop = stopIndex;
      task(wholeStart, wholeStop);
  }
  139. void threadedSplit(const IRect& bound, std::function<void(const IRect& bound)> task, int minimumRowsPerJob, int jobsPerThread) {
  140. int maxJobs = bound.height() / minimumRowsPerJob;
  141. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  142. if (jobCount > maxJobs) { jobCount = maxJobs; }
  143. if (jobCount < 1) { jobCount = 1; }
  144. if (jobCount == 1) {
  145. // Too little work for multi-threading
  146. task(bound);
  147. } else {
  148. // Use multiple threads
  149. VirtualStackAllocation<std::function<void()>> jobs(jobCount);
  150. int givenRow = bound.top();
  151. for (int s = 0; s < jobCount; s++) {
  152. int remainingJobs = jobCount - s;
  153. int remainingRows = bound.bottom() - givenRow;
  154. int y1 = givenRow;
  155. int taskSize = remainingRows / remainingJobs;
  156. givenRow = givenRow + taskSize;
  157. IRect subBound = IRect(bound.left(), y1, bound.width(), taskSize);
  158. jobs[s] = [task, subBound]() {
  159. task(subBound);
  160. };
  161. }
  162. threadedWorkFromArray(jobs, jobCount);
  163. }
  164. }
  165. void threadedSplit_disabled(const IRect& bound, std::function<void(const IRect& bound)> task) {
  166. task(bound);
  167. }
  168. }