threading.cpp 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167
  1. // zlib open source license
  2. //
  3. // Copyright (c) 2017 to 2019 David Forsgren Piuva
  4. //
  5. // This software is provided 'as-is', without any express or implied
  6. // warranty. In no event will the authors be held liable for any damages
  7. // arising from the use of this software.
  8. //
  9. // Permission is granted to anyone to use this software for any purpose,
  10. // including commercial applications, and to alter it and redistribute it
  11. // freely, subject to the following restrictions:
  12. //
  13. // 1. The origin of this software must not be misrepresented; you must not
  14. // claim that you wrote the original software. If you use this software
  15. // in a product, an acknowledgment in the product documentation would be
  16. // appreciated but is not required.
  17. //
  18. // 2. Altered source versions must be plainly marked as such, and must not be
  19. // misrepresented as being the original software.
  20. //
  21. // 3. This notice may not be removed or altered from any source
  22. // distribution.
#include "threading.h"
// Requires -pthread for linking
#include <algorithm>
#include <atomic>
#include <functional>
#include <future>
#include <mutex>
#include <thread>
#include <vector>
  29. namespace dsr {
  30. // Enable this macro to disable multi-threading
  31. // If your application still crashes when using a single thread, it's probably not a concurrency problem
  32. //#define DISABLE_MULTI_THREADING
  33. // Prevent doing other multi-threaded work at the same time
  34. // As a side effect, this makes it safe to use global variables to prevent unsafe use of stack memory
  35. static std::mutex workLock, getTaskLock;
  36. static std::atomic<int> nextJobIndex{0};
  37. // TODO: This method really needs a thread pool for starting jobs faster,
  38. // but third-party libraries often use low-level platform specific solutions.
  39. // TODO: Let each worker have one future doing scheduling on it's own to prevent stalling on a scheduling main thread.
  40. // When a worker is done with a task, it will use a mutex protected volatile variable to pick the next task from the queue.
  41. void threadedWorkFromArray(std::function<void()>* jobs, int jobCount) {
  42. #ifdef DISABLE_MULTI_THREADING
  43. // Reference implementation
  44. for (int i = 0; i < jobCount; i++) {
  45. jobs[i]();
  46. }
  47. #else
  48. if (jobCount <= 0) {
  49. return;
  50. } else if (jobCount == 1) {
  51. jobs[0]();
  52. } else {
  53. workLock.lock();
  54. nextJobIndex = 0;
  55. // Multi-threaded work loop
  56. int workerCount = std::min((int)std::thread::hardware_concurrency() - 1, jobCount); // All used threads
  57. int helperCount = workerCount - 1; // Excluding the main thread
  58. std::function<void()> workers[workerCount];
  59. std::future<void> helpers[helperCount];
  60. for (int w = 0; w < workerCount; w++) {
  61. workers[w] = [jobs, jobCount]() {
  62. while (true) {
  63. getTaskLock.lock();
  64. int taskIndex = nextJobIndex;
  65. nextJobIndex++;
  66. getTaskLock.unlock();
  67. if (taskIndex < jobCount) {
  68. jobs[taskIndex]();
  69. } else {
  70. break;
  71. }
  72. }
  73. };
  74. }
  75. // Start working in the helper threads
  76. for (int h = 0; h < helperCount; h++) {
  77. helpers[h] = std::async(std::launch::async, workers[h]);
  78. }
  79. // Perform the same work on the main thread
  80. workers[workerCount - 1]();
  81. // Wait for all helpers to complete their work once all tasks have been handed out
  82. for (int h = 0; h < helperCount; h++) {
  83. if (helpers[h].valid()) {
  84. helpers[h].wait();
  85. }
  86. }
  87. workLock.unlock();
  88. }
  89. #endif
  90. }
  91. void threadedWorkFromList(List<std::function<void()>> jobs) {
  92. threadedWorkFromArray(&jobs[0], jobs.length());
  93. jobs.clear();
  94. }
  95. void threadedSplit(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task, int minimumJobSize, int jobsPerThread) {
  96. int totalCount = stopIndex - startIndex;
  97. int maxJobs = totalCount / minimumJobSize;
  98. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  99. if (jobCount > maxJobs) { jobCount = maxJobs; }
  100. if (jobCount < 1) { jobCount = 1; }
  101. if (jobCount == 1) {
  102. // Too little work for multi-threading
  103. task(startIndex, stopIndex);
  104. } else {
  105. // Use multiple threads
  106. std::function<void()> jobs[jobCount];
  107. int givenRow = startIndex;
  108. for (int s = 0; s < jobCount; s++) {
  109. int remainingJobs = jobCount - s;
  110. int remainingRows = stopIndex - givenRow;
  111. int y1 = givenRow; // Inclusive
  112. int taskSize = remainingRows / remainingJobs;
  113. givenRow = givenRow + taskSize;
  114. int y2 = givenRow; // Exclusive
  115. jobs[s] = [task, y1, y2]() {
  116. task(y1, y2);
  117. };
  118. }
  119. threadedWorkFromArray(jobs, jobCount);
  120. }
  121. }
  122. void threadedSplit_disabled(int startIndex, int stopIndex, std::function<void(int startIndex, int stopIndex)> task) {
  123. task(startIndex, stopIndex);
  124. }
  125. void threadedSplit(const IRect& bound, std::function<void(const IRect& bound)> task, int minimumRowsPerJob, int jobsPerThread) {
  126. int maxJobs = bound.height() / minimumRowsPerJob;
  127. int jobCount = std::thread::hardware_concurrency() * jobsPerThread;
  128. if (jobCount > maxJobs) { jobCount = maxJobs; }
  129. if (jobCount < 1) { jobCount = 1; }
  130. if (jobCount == 1) {
  131. // Too little work for multi-threading
  132. task(bound);
  133. } else {
  134. // Use multiple threads
  135. std::function<void()> jobs[jobCount];
  136. int givenRow = bound.top();
  137. for (int s = 0; s < jobCount; s++) {
  138. int remainingJobs = jobCount - s;
  139. int remainingRows = bound.bottom() - givenRow;
  140. int y1 = givenRow;
  141. int taskSize = remainingRows / remainingJobs;
  142. givenRow = givenRow + taskSize;
  143. IRect subBound = IRect(bound.left(), y1, bound.width(), taskSize);
  144. jobs[s] = [task, subBound]() {
  145. task(subBound);
  146. };
  147. }
  148. threadedWorkFromArray(jobs, jobCount);
  149. }
  150. }
  151. void threadedSplit_disabled(const IRect& bound, std::function<void(const IRect& bound)> task) {
  152. task(bound);
  153. }
  154. }