threading.cpp 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263
  1. // zlib open source license
  2. //
  3. // Copyright (c) 2017 to 2024 David Forsgren Piuva
  4. //
  5. // This software is provided 'as-is', without any express or implied
  6. // warranty. In no event will the authors be held liable for any damages
  7. // arising from the use of this software.
  8. //
  9. // Permission is granted to anyone to use this software for any purpose,
  10. // including commercial applications, and to alter it and redistribute it
  11. // freely, subject to the following restrictions:
  12. //
  13. // 1. The origin of this software must not be misrepresented; you must not
  14. // claim that you wrote the original software. If you use this software
  15. // in a product, an acknowledgment in the product documentation would be
  16. // appreciated but is not required.
  17. //
  18. // 2. Altered source versions must be plainly marked as such, and must not be
  19. // misrepresented as being the original software.
  20. //
  21. // 3. This notice may not be removed or altered from any source
  22. // distribution.
  #include "threading.h"
  #include "virtualStack.h"
  #include "../implementation/math/scalar.h"
  // Get settings from here.
  #include "../settings.h"
  #ifndef DISABLE_MULTI_THREADING
  // Requires -pthread for linking
  #include <atomic>
  #include <future>
  #include <mutex>
  #include <thread>
  #endif
  34. namespace dsr {
  #ifndef DISABLE_MULTI_THREADING
  // File-local lock with internal linkage (not visible outside this translation unit).
  // Only compiled when multi-threading is enabled.
  namespace {
      std::mutex getTaskLock;
  }
  #endif
  // Returns the number of hardware threads to plan work for, never less than one.
  // std::thread::hardware_concurrency() returns 0 when the count cannot be determined, and callers
  // multiply or subtract from this value to size their work, so 0 is replaced by 1 here.
  // Always returns 1 when DISABLE_MULTI_THREADING is defined.
  int getThreadCount() {
  #ifndef DISABLE_MULTI_THREADING
      const unsigned int hardwareThreads = std::thread::hardware_concurrency();
      return hardwareThreads == 0u ? 1 : static_cast<int>(hardwareThreads);
  #else
      return 1;
  #endif
  }
  45. void threadedWorkByIndex(std::function<void(void *context, int jobIndex)> job, void *context, int jobCount, int maxThreadCount) {
  46. #ifdef DISABLE_MULTI_THREADING
  47. // Reference implementation
  48. for (int i = 0; i < jobCount; i++) {
  49. job(context, i);
  50. }
  51. #else
  52. if (jobCount <= 0) {
  53. return;
  54. } else if (jobCount == 1) {
  55. job(context, 0);
  56. } else {
  57. if (maxThreadCount <= 0) {
  58. // No limit.
  59. maxThreadCount = jobCount;
  60. }
  61. // When having more than one thread, one should be reserved for fast responses.
  62. // Otherwise one thread will keep the others waiting while struggling to manage interrupts with expensive context switches.
  63. int availableThreads = max(getThreadCount() - 1, 1);
  64. int workerCount = min(availableThreads, maxThreadCount, jobCount); // All used threads
  65. int helperCount = workerCount - 1; // Excluding the main thread
  66. // Multi-threaded work loop
  67. if (workerCount == 1) {
  68. // Run on the main thread if there is only one.
  69. for (int i = 0; i < jobCount; i++) {
  70. job(context, i);
  71. }
  72. } else {
  73. // A shared counter protected by getTaskLock.
  74. int nextJobIndex = 0;
  75. DestructibleVirtualStackAllocation<std::function<void()>> workers(workerCount);
  76. DestructibleVirtualStackAllocation<std::future<void>> helpers(helperCount);
  77. for (int w = 0; w < workerCount; w++) {
  78. workers[w] = [&nextJobIndex, context, job, jobCount]() {
  79. while (true) {
  80. getTaskLock.lock();
  81. int taskIndex = nextJobIndex;
  82. nextJobIndex++;
  83. getTaskLock.unlock();
  84. if (taskIndex < jobCount) {
  85. job(context, taskIndex);
  86. } else {
  87. break;
  88. }
  89. }
  90. };
  91. }
  92. // Start working in the helper threads
  93. for (int h = 0; h < helperCount; h++) {
  94. helpers[h] = std::async(std::launch::async, workers[h]);
  95. }
  96. // Perform the same work on the main thread
  97. workers[workerCount - 1]();
  98. // Wait for all helpers to complete their work once all tasks have been handed out
  99. for (int h = 0; h < helperCount; h++) {
  100. if (helpers[h].valid()) {
  101. helpers[h].wait();
  102. }
  103. }
  104. }
  105. }
  106. #endif
  107. }
  108. void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount) {
  109. #ifdef DISABLE_MULTI_THREADING
  110. // Reference implementation
  111. for (int i = 0; i < jobCount; i++) {
  112. jobs[i]();
  113. }
  114. #else
  115. if (jobCount <= 0) {
  116. return;
  117. } else if (jobCount == 1) {
  118. jobs[0]();
  119. } else {
  120. if (maxThreadCount <= 0) {
  121. // No limit.
  122. maxThreadCount = jobCount;
  123. }
  124. // When having more than one thread, one should be reserved for fast responses.
  125. // Otherwise one thread will keep the others waiting while struggling to manage interrupts with expensive context switches.
  126. int availableThreads = max(getThreadCount() - 1, 1);
  127. int workerCount = min(availableThreads, maxThreadCount, jobCount); // All used threads
  128. int helperCount = workerCount - 1; // Excluding the main thread
  129. // Multi-threaded work loop
  130. if (workerCount == 1) {
  131. // Run on the main thread if there is only one.
  132. for (int i = 0; i < jobCount; i++) {
  133. jobs[i]();
  134. }
  135. } else {
  136. // A shared counter protected by getTaskLock.
  137. int nextJobIndex = 0;
  138. DestructibleVirtualStackAllocation<std::function<void()>> workers(workerCount);
  139. DestructibleVirtualStackAllocation<std::future<void>> helpers(helperCount);
  140. for (int w = 0; w < workerCount; w++) {
  141. workers[w] = [&nextJobIndex, jobs, jobCount]() {
  142. while (true) {
  143. getTaskLock.lock();
  144. int taskIndex = nextJobIndex;
  145. nextJobIndex++;
  146. getTaskLock.unlock();
  147. if (taskIndex < jobCount) {
  148. jobs[taskIndex]();
  149. } else {
  150. break;
  151. }
  152. }
  153. };
  154. }
  155. // Start working in the helper threads
  156. for (int h = 0; h < helperCount; h++) {
  157. helpers[h] = std::async(std::launch::async, workers[h]);
  158. }
  159. // Perform the same work on the main thread
  160. workers[workerCount - 1]();
  161. // Wait for all helpers to complete their work once all tasks have been handed out
  162. for (int h = 0; h < helperCount; h++) {
  163. if (helpers[h].valid()) {
  164. helpers[h].wait();
  165. }
  166. }
  167. }
  168. }
  169. #endif
  170. }
  171. void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount, int maxThreadCount) {
  172. threadedWorkFromArray(jobs.getUnsafe(), jobCount, maxThreadCount);
  173. }
  174. void threadedWorkFromList(List<std::function<void()>> jobs, int maxThreadCount) {
  175. if (jobs.length() > 0) {
  176. threadedWorkFromArray(&jobs[0], jobs.length(), maxThreadCount);
  177. }
  178. jobs.clear();
  179. }
  180. void threadedSplit(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task, int minimumJobSize, int jobsPerThread) {
  181. #ifndef DISABLE_MULTI_THREADING
  182. int totalCount = stopIndex - startIndex;
  183. int maxJobs = totalCount / minimumJobSize;
  184. int jobCount = getThreadCount() * jobsPerThread;
  185. if (jobCount > maxJobs) { jobCount = maxJobs; }
  186. if (jobCount < 1) { jobCount = 1; }
  187. #else
  188. int jobCount = 1;
  189. #endif
  190. if (jobCount == 1) {
  191. // Too little work for multi-threading
  192. task(startIndex, stopIndex);
  193. } else {
  194. // Use multiple threads
  195. DestructibleVirtualStackAllocation<std::function<void()>> jobs(jobCount);
  196. int givenRow = startIndex;
  197. for (int s = 0; s < jobCount; s++) {
  198. int remainingJobs = jobCount - s;
  199. int remainingRows = stopIndex - givenRow;
  200. int y1 = givenRow; // Inclusive
  201. int taskSize = remainingRows / remainingJobs;
  202. givenRow = givenRow + taskSize;
  203. int y2 = givenRow; // Exclusive
  204. jobs[s] = [task, y1, y2]() {
  205. task(y1, y2);
  206. };
  207. }
  208. threadedWorkFromArray(jobs, jobCount);
  209. }
  210. }
  // Single-threaded stand-in for threadedSplit.
  // Hands the whole range to task in one call on the calling thread.
  void threadedSplit_disabled(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task) {
      const int first = startIndex;
      const int last = stopIndex;
      task(first, last);
  }
  214. void threadedSplit(const IRect& bound, std::function<void(const IRect& bound)> task, int minimumRowsPerJob, int jobsPerThread) {
  215. #ifndef DISABLE_MULTI_THREADING
  216. int maxJobs = bound.height() / minimumRowsPerJob;
  217. int jobCount = getThreadCount() * jobsPerThread;
  218. if (jobCount > maxJobs) { jobCount = maxJobs; }
  219. if (jobCount < 1) { jobCount = 1; }
  220. #else
  221. int jobCount = 1;
  222. #endif
  223. if (jobCount == 1) {
  224. // Too little work for multi-threading
  225. task(bound);
  226. } else {
  227. // Use multiple threads
  228. DestructibleVirtualStackAllocation<std::function<void()>> jobs(jobCount);
  229. int givenRow = bound.top();
  230. for (int s = 0; s < jobCount; s++) {
  231. int remainingJobs = jobCount - s;
  232. int remainingRows = bound.bottom() - givenRow;
  233. int y1 = givenRow;
  234. int taskSize = remainingRows / remainingJobs;
  235. givenRow = givenRow + taskSize;
  236. IRect subBound = IRect(bound.left(), y1, bound.width(), taskSize);
  237. jobs[s] = [task, subBound]() {
  238. task(subBound);
  239. };
  240. }
  241. threadedWorkFromArray(jobs, jobCount);
  242. }
  243. }
  244. void threadedSplit_disabled(const IRect& bound, std::function<void(const IRect& bound)> task) {
  245. task(bound);
  246. }
  247. }