Przeglądaj źródła

Giving manual control over thread count in the threading module.

David Piuva 11 miesięcy temu
rodzic
commit
ae1a8bf4ca

+ 1 - 1
Source/DFPSR/api/bufferAPI.cpp

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2019 to 2023 David Forsgren Piuva
+// Copyright (c) 2019 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages

+ 1 - 1
Source/DFPSR/api/bufferAPI.h

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2018 to 2023 David Forsgren Piuva
+// Copyright (c) 2018 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages

+ 1 - 1
Source/DFPSR/base/SafePointer.cpp

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2017 to 2023 David Forsgren Piuva
+// Copyright (c) 2017 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages

+ 1 - 1
Source/DFPSR/base/SafePointer.h

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2017 to 2023 David Forsgren Piuva
+// Copyright (c) 2017 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages

+ 32 - 21
Source/DFPSR/base/threading.cpp

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2017 to 2019 David Forsgren Piuva
+// Copyright (c) 2017 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages
@@ -23,6 +23,7 @@
 
 #include "threading.h"
 #include "virtualStack.h"
+#include "../math/scalar.h"
 
 // Requires -pthread for linking
 #include <future>
@@ -36,16 +37,11 @@ namespace dsr {
 //   If your application still crashes when using a single thread, it's probably not a concurrency problem
 //#define DISABLE_MULTI_THREADING
 
-// Prevent doing other multi-threaded work at the same time
-//   As a side effect, this makes it safe to use global variables to prevent unsafe use of stack memory
-static std::mutex workLock, getTaskLock;
-static std::atomic<int> nextJobIndex{0};
+int getThreadCount() {
+	return (int)std::thread::hardware_concurrency();
+}
 
-// TODO: This method really needs a thread pool for starting jobs faster,
-//       but third-party libraries often use low-level platform specific solutions.
-// TODO: Let each worker have one future doing scheduling on it's own to prevent stalling on a scheduling main thread.
-//       When a worker is done with a task, it will use a mutex protected volatile variable to pick the next task from the queue.
-void threadedWorkFromArray(std::function<void()>* jobs, int jobCount) {
+void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount) {
 	#ifdef DISABLE_MULTI_THREADING
 		// Reference implementation
 		for (int i = 0; i < jobCount; i++) {
@@ -57,15 +53,30 @@ void threadedWorkFromArray(std::function<void()>* jobs, int jobCount) {
 		} else if (jobCount == 1) {
 			jobs[0]();
 		} else {
-			workLock.lock();
-				nextJobIndex = 0;
-				// Multi-threaded work loop
-				int workerCount = std::min((int)std::thread::hardware_concurrency() - 1, jobCount); // All used threads
-				int helperCount = workerCount - 1; // Excluding the main thread
+			static std::recursive_mutex getTaskLock;
+			if (maxThreadCount <= 0) {
+				// No limit.
+				maxThreadCount = jobCount;
+			}
+			// When having more than one thread, one should be reserved for fast responses.
+			//   Otherwise one thread will keep the others waiting while struggling to manage interrupts with expensive context switches.
+			int availableThreads = max(getThreadCount() - 1, 1);
+			int workerCount = min(availableThreads, maxThreadCount, jobCount); // All used threads
+			int helperCount = workerCount - 1; // Excluding the main thread
+			// Multi-threaded work loop
+			if (workerCount == 1) {
+				// Run on the main thread if there is only one.
+				for (int i = 0; i < jobCount; i++) {
+					jobs[i]();
+				}
+			} else {
+				// A shared counter protected by getTaskLock.
+				int nextJobIndex = 0;
+
 				VirtualStackAllocation<std::function<void()>> workers(workerCount);
 				VirtualStackAllocation<std::future<void>> helpers(helperCount);
 				for (int w = 0; w < workerCount; w++) {
-					workers[w] = [jobs, jobCount]() {
+					workers[w] = [&nextJobIndex, jobs, jobCount]() {
 						while (true) {
 							getTaskLock.lock();
 							int taskIndex = nextJobIndex;
@@ -91,17 +102,17 @@ void threadedWorkFromArray(std::function<void()>* jobs, int jobCount) {
 						helpers[h].wait();
 					}
 				}
-			workLock.unlock();
+			}
 		}
 	#endif
 }
 
-void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount) {
-	threadedWorkFromArray(jobs.getUnsafe(), jobCount);
+void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount, int maxThreadCount) {
+	threadedWorkFromArray(jobs.getUnsafe(), jobCount, maxThreadCount);
 }
 
-void threadedWorkFromList(List<std::function<void()>> jobs) {
-	threadedWorkFromArray(&jobs[0], jobs.length());
+void threadedWorkFromList(List<std::function<void()>> jobs, int maxThreadCount) {
+	threadedWorkFromArray(&jobs[0], jobs.length(), maxThreadCount);
 	jobs.clear();
 }
 

+ 10 - 4
Source/DFPSR/base/threading.h

@@ -1,6 +1,6 @@
 // zlib open source license
 //
-// Copyright (c) 2017 to 2019 David Forsgren Piuva
+// Copyright (c) 2017 to 2024 David Forsgren Piuva
 // 
 // This software is provided 'as-is', without any express or implied
 // warranty. In no event will the authors be held liable for any damages
@@ -30,13 +30,19 @@
 
 namespace dsr {
 
+// Get the number of threads available.
+int getThreadCount();
+
 // Executes every function in the array of jobs from jobs[0] to jobs[jobCount - 1].
-void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount);
-void threadedWorkFromArray(std::function<void()>* jobs, int jobCount);
+//   The maxThreadCount argument is the maximum number of threads to use when enough threads are available.
+//     Letting maxThreadCount be 0 removes the limit and uses as many threads as possible, limited only by getThreadCount() - 1 and jobCount.
+//     Letting maxThreadCount be 1 forces single-threaded execution on the calling thread.
+void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount, int maxThreadCount = 0);
+void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount = 0);
 
 // Executes every function in the list of jobs.
 //   Also clears the list when done.
-void threadedWorkFromList(List<std::function<void()>> jobs);
+void threadedWorkFromList(List<std::function<void()>> jobs, int maxThreadCount = 0);
 
 // Calling the given function with sub-sets of the interval using multiple threads in parallel.
 //   Useful when you have lots of tiny jobs that can be grouped together into larger jobs.