threading.cpp 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172
  1. // zlib open source license
  2. //
  3. // Copyright (c) 2017 to 2019 David Forsgren Piuva
  4. //
  5. // This software is provided 'as-is', without any express or implied
  6. // warranty. In no event will the authors be held liable for any damages
  7. // arising from the use of this software.
  8. //
  9. // Permission is granted to anyone to use this software for any purpose,
  10. // including commercial applications, and to alter it and redistribute it
  11. // freely, subject to the following restrictions:
  12. //
  13. // 1. The origin of this software must not be misrepresented; you must not
  14. // claim that you wrote the original software. If you use this software
  15. // in a product, an acknowledgment in the product documentation would be
  16. // appreciated but is not required.
  17. //
  18. // 2. Altered source versions must be plainly marked as such, and must not be
  19. // misrepresented as being the original software.
  20. //
  21. // 3. This notice may not be removed or altered from any source
  22. // distribution.
  23. #include "threading.h"
  24. #include "virtualStack.h"
  25. // Requires -pthread for linking
  26. #include <future>
  27. #include <thread>
  28. #include <mutex>
  29. #include <atomic>
  30. namespace dsr {
  31. // Enable this macro to disable multi-threading
  32. // If your application still crashes when using a single thread, it's probably not a concurrency problem
  33. //#define DISABLE_MULTI_THREADING
  34. // Prevent doing other multi-threaded work at the same time
  35. // As a side effect, this makes it safe to use global variables to prevent unsafe use of stack memory
  36. static std::mutex workLock, getTaskLock;
  37. static std::atomic<int> nextJobIndex{0};
  38. // TODO: This method really needs a thread pool for starting jobs faster,
  39. // but third-party libraries often use low-level platform specific solutions.
  40. // TODO: Let each worker have one future doing scheduling on it's own to prevent stalling on a scheduling main thread.
  41. // When a worker is done with a task, it will use a mutex protected volatile variable to pick the next task from the queue.
  42. void threadedWorkFromArray(std::function<void()>* jobs, int jobCount) {
  43. #ifdef DISABLE_MULTI_THREADING
  44. // Reference implementation
  45. for (int i = 0; i < jobCount; i++) {
  46. jobs[i]();
  47. }
  48. #else
  49. if (jobCount <= 0) {
  50. return;
  51. } else if (jobCount == 1) {
  52. jobs[0]();
  53. } else {
  54. workLock.lock();
  55. nextJobIndex = 0;
  56. // Multi-threaded work loop
  57. int workerCount = std::min((int)std::thread::hardware_concurrency() - 1, jobCount); // All used threads
  58. int helperCount = workerCount - 1; // Excluding the main thread
  59. VirtualStackAllocation<std::function<void()>> workers(workerCount);
  60. VirtualStackAllocation<std::future<void>> helpers(helperCount);
  61. for (int w = 0; w < workerCount; w++) {
  62. workers[w] = [jobs, jobCount]() {
  63. while (true) {
  64. getTaskLock.lock();
  65. int taskIndex = nextJobIndex;
  66. nextJobIndex++;
  67. getTaskLock.unlock();
  68. if (taskIndex < jobCount) {
  69. jobs[taskIndex]();
  70. } else {
  71. break;
  72. }
  73. }
  74. };
  75. }
  76. // Start working in the helper threads
  77. for (int h = 0; h < helperCount; h++) {
  78. helpers[h] = std::async(std::launch::async, workers[h]);
  79. }
  80. // Perform the same work on the main thread
  81. workers[workerCount - 1]();
  82. // Wait for all helpers to complete their work once all tasks have been handed out
  83. for (int h = 0; h < helperCount; h++) {
  84. if (helpers[h].valid()) {
  85. helpers[h].wait();
  86. }
  87. }
  88. workLock.unlock();
  89. }
  90. #endif
  91. }
  92. void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount) {
  93. threadedWorkFromArray(jobs.getUnsafe(), jobCount);
  94. }
  95. void threadedWorkFromList(List<std::function<void()>> jobs) {
  96. threadedWorkFromArray(&jobs[0], jobs.length());
  97. jobs.clear();
  98. }
  99. void threadedSplit(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task, int minimumJobSize, int jobsPerThread) {
  100. int totalCount = stopIndex - startIndex;
  101. int maxJobs = totalCount / minimumJobSize;
  102. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  103. if (jobCount > maxJobs) { jobCount = maxJobs; }
  104. if (jobCount < 1) { jobCount = 1; }
  105. if (jobCount == 1) {
  106. // Too little work for multi-threading
  107. task(startIndex, stopIndex);
  108. } else {
  109. // Use multiple threads
  110. VirtualStackAllocation<std::function<void()>> jobs(jobCount);
  111. int givenRow = startIndex;
  112. for (int s = 0; s < jobCount; s++) {
  113. int remainingJobs = jobCount - s;
  114. int remainingRows = stopIndex - givenRow;
  115. int y1 = givenRow; // Inclusive
  116. int taskSize = remainingRows / remainingJobs;
  117. givenRow = givenRow + taskSize;
  118. int y2 = givenRow; // Exclusive
  119. jobs[s] = [task, y1, y2]() {
  120. task(y1, y2);
  121. };
  122. }
  123. threadedWorkFromArray(jobs, jobCount);
  124. }
  125. }
  // Single-threaded counterpart of the index range version of threadedSplit.
  //   Nothing is split: task is called exactly once with the given startIndex and stopIndex on the calling thread.
  void threadedSplit_disabled(
      int startIndex,
      int stopIndex,
      std::function<void(int startIndex, int stopIndex)> task
  ) {
      // The whole range is handed over in one call
      task(startIndex, stopIndex);
  }
  129. void threadedSplit(const IRect& bound, std::function<void(const IRect& bound)> task, int minimumRowsPerJob, int jobsPerThread) {
  130. int maxJobs = bound.height() / minimumRowsPerJob;
  131. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  132. if (jobCount > maxJobs) { jobCount = maxJobs; }
  133. if (jobCount < 1) { jobCount = 1; }
  134. if (jobCount == 1) {
  135. // Too little work for multi-threading
  136. task(bound);
  137. } else {
  138. // Use multiple threads
  139. VirtualStackAllocation<std::function<void()>> jobs(jobCount);
  140. int givenRow = bound.top();
  141. for (int s = 0; s < jobCount; s++) {
  142. int remainingJobs = jobCount - s;
  143. int remainingRows = bound.bottom() - givenRow;
  144. int y1 = givenRow;
  145. int taskSize = remainingRows / remainingJobs;
  146. givenRow = givenRow + taskSize;
  147. IRect subBound = IRect(bound.left(), y1, bound.width(), taskSize);
  148. jobs[s] = [task, subBound]() {
  149. task(subBound);
  150. };
  151. }
  152. threadedWorkFromArray(jobs, jobCount);
  153. }
  154. }
  155. void threadedSplit_disabled(const IRect& bound, std::function<void(const IRect& bound)> task) {
  156. task(bound);
  157. }
  158. }