Browse Source

Fixed memory leak by assuming that std::function will always heap allocate captured data.

David Piuva 8 months ago
parent
commit
e32bf92d1a

+ 68 - 5
Source/DFPSR/base/threading.cpp

@@ -49,6 +49,70 @@ int getThreadCount() {
 	#endif
 }
 
+void threadedWorkByIndex(std::function<void(void *context, int jobIndex)> job, void *context, int jobCount, int maxThreadCount) {
+	#ifdef DISABLE_MULTI_THREADING
+		// Reference implementation
+		for (int i = 0; i < jobCount; i++) {
+			job(context, i);
+		}
+	#else
+		if (jobCount <= 0) {
+			return;
+		} else if (jobCount == 1) {
+			job(context, 0);
+		} else {
+			if (maxThreadCount <= 0) {
+				// No limit.
+				maxThreadCount = jobCount;
+			}
+			// When having more than one thread, one should be reserved for fast responses.
+			//   Otherwise one thread will keep the others waiting while struggling to manage interrupts with expensive context switches.
+			int availableThreads = max(getThreadCount() - 1, 1);
+			int workerCount = min(availableThreads, maxThreadCount, jobCount); // All used threads
+			int helperCount = workerCount - 1; // Excluding the main thread
+			// Multi-threaded work loop
+			if (workerCount == 1) {
+				// Run on the main thread if there is only one.
+				for (int i = 0; i < jobCount; i++) {
+					job(context, i);
+				}
+			} else {
+				// A shared counter protected by getTaskLock.
+				int nextJobIndex = 0;
+				DestructibleVirtualStackAllocation<std::function<void()>> workers(workerCount);
+				DestructibleVirtualStackAllocation<std::future<void>> helpers(helperCount);
+				for (int w = 0; w < workerCount; w++) {
+					workers[w] = [&nextJobIndex, context, job, jobCount]() {
+						while (true) {
+							getTaskLock.lock();
+							int taskIndex = nextJobIndex;
+							nextJobIndex++;
+							getTaskLock.unlock();
+							if (taskIndex < jobCount) {
+								job(context, taskIndex);
+							} else {
+								break;
+							}
+						}
+					};
+				}
+				// Start working in the helper threads
+				for (int h = 0; h < helperCount; h++) {
+					helpers[h] = std::async(std::launch::async, workers[h]);
+				}
+				// Perform the same work on the main thread
+				workers[workerCount - 1]();
+				// Wait for all helpers to complete their work once all tasks have been handed out
+				for (int h = 0; h < helperCount; h++) {
+					if (helpers[h].valid()) {
+						helpers[h].wait();
+					}
+				}
+			}
+		}
+	#endif
+}
+
 void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount) {
 	#ifdef DISABLE_MULTI_THREADING
 		// Reference implementation
@@ -79,9 +143,8 @@ void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThr
 			} else {
 				// A shared counter protected by getTaskLock.
 				int nextJobIndex = 0;
-
-				VirtualStackAllocation<std::function<void()>> workers(workerCount);
-				VirtualStackAllocation<std::future<void>> helpers(helperCount);
+				DestructibleVirtualStackAllocation<std::function<void()>> workers(workerCount);
+				DestructibleVirtualStackAllocation<std::future<void>> helpers(helperCount);
 				for (int w = 0; w < workerCount; w++) {
 					workers[w] = [&nextJobIndex, jobs, jobCount]() {
 						while (true) {
@@ -138,7 +201,7 @@ void threadedSplit(int startIndex, int stopIndex, std::function<void(int startIn
 		task(startIndex, stopIndex);
 	} else {
 		// Use multiple threads
-		VirtualStackAllocation<std::function<void()>> jobs(jobCount);
+		DestructibleVirtualStackAllocation<std::function<void()>> jobs(jobCount);
 		int givenRow = startIndex;
 		for (int s = 0; s < jobCount; s++) {
 			int remainingJobs = jobCount - s;
@@ -173,7 +236,7 @@ void threadedSplit(const IRect& bound, std::function<void(const IRect& bound)> t
 		task(bound);
 	} else {
 		// Use multiple threads
-		VirtualStackAllocation<std::function<void()>> jobs(jobCount);
+		DestructibleVirtualStackAllocation<std::function<void()>> jobs(jobCount);
 		int givenRow = bound.top();
 		for (int s = 0; s < jobCount; s++) {
 			int remainingJobs = jobCount - s;

+ 5 - 0
Source/DFPSR/base/threading.h

@@ -33,10 +33,15 @@ namespace dsr {
 // Get the number of threads available.
 int getThreadCount();
 
+// Calls the same job function with indices 0 to jobIndex - 1.
+//   This removes the need for capturing the same data over and over again when each task is identical with a different index.
+void threadedWorkByIndex(std::function<void(void *context, int jobIndex)> job, void *context, int jobCount, int maxThreadCount = 0);
+
 // Executes every function in the array of jobs from jobs[0] to jobs[jobCount - 1].
 //   The maxThreadCount argument is the maximum number of threads to use when enough threads are available.
 //     Letting maxThreadCount be 0 removes the limit and uses as many threads as possible, limited only by getThreadCount() - 1 and jobCount.
 //     Letting maxThreadCount be 1 forces single-threaded execution on the calling thread.
+//   Useful when each job to execute is different.
 void threadedWorkFromArray(SafePointer<std::function<void()>> jobs, int jobCount, int maxThreadCount = 0);
 void threadedWorkFromArray(std::function<void()>* jobs, int jobCount, int maxThreadCount = 0);
 

+ 15 - 0
Source/DFPSR/base/virtualStack.h

@@ -68,6 +68,21 @@ namespace dsr {
 			virtualStack_pop();
 		}
 	};
+
+	template <typename T>
+	class DestructibleVirtualStackAllocation : public SafePointer<T> {
+	public:
+		uint64_t elementCount;
+		DestructibleVirtualStackAllocation(uint64_t elementCount, const char *name = "Nameless virtual stack allocation", uintptr_t alignmentAndMask = ~uintptr_t(0u))
+		: SafePointer<T>(virtualStack_push<T>(elementCount, name, alignmentAndMask)), elementCount(elementCount) {}
+		~DestructibleVirtualStackAllocation() {
+			// Call destructors.
+			for (uint64_t e = 0; e < elementCount; e++) {
+				(*this)[e].~T();
+			}
+			virtualStack_pop();
+		}
+	};
 }
 
 #endif

+ 12 - 11
Source/DFPSR/implementation/render/renderCore.cpp

@@ -436,7 +436,8 @@ void CommandQueue::execute(const IRect &clipBound, int jobCount) const {
 			}
 		}
 	} else {
-		VirtualStackAllocation<std::function<void()>> jobs(jobCount, "Triangle draw jobs in CommandQueue::execute");
+		// Split the target region for multiple threads, with one slice per job.
+		VirtualStackAllocation<IRect> regions(jobCount, "Multi-threaded target pixel regions in CommandQueue::execute");
 		int y1 = clipBound.top();
 		for (int j = 0; j < jobCount; j++) {
 			int y2 = clipBound.top() + ((clipBound.bottom() * (j + 1)) / jobCount);
@@ -445,18 +446,18 @@ void CommandQueue::execute(const IRect &clipBound, int jobCount) const {
 				y2 = (y2 / 2) * 2;
 			}
 			int height = y2 - y1;
-			IRect subBound = IRect(clipBound.left(), y1, clipBound.width(), height);
-			jobs[j] = [this, subBound]() {
-				//this->execute(subBound, 1);
-				for (int i = 0; i < this->buffer.length(); i++) {
-					if (!this->buffer[i].occluded) {
-						executeTriangleDrawing(this->buffer[i], subBound);
-					}
-				}
-			};
+			regions[j] = IRect(clipBound.left(), y1, clipBound.width(), height);
 			y1 = y2;
 		}
-		threadedWorkFromArray(jobs, jobCount);
+		std::function<void(void *context, int jobIndex)> job = [&regions](void *context, int jobIndex) {
+			CommandQueue *commandQueue = (CommandQueue*)context;
+			for (int i = 0; i < commandQueue->buffer.length(); i++) {
+				if (!commandQueue->buffer[i].occluded) {
+					executeTriangleDrawing(commandQueue->buffer[i], regions[jobIndex]);
+				}
+			}
+		};
+		threadedWorkByIndex(job, (void*)this, jobCount);
 	}
 }