
Refactor and optimize the ThreadHive

Panagiotis Christopoulos Charitos committed 8 years ago
commit c5e5e5a037
5 changed files with 190 additions and 93 deletions
  1. CMakeLists.txt (+5 -3)
  2. src/anki/util/ThreadHive.cpp (+99 -78)
  3. src/anki/util/ThreadHive.h (+4 -7)
  4. tests/framework/Framework.h (+4 -4)
  5. tests/util/ThreadHive.cpp (+78 -1)

CMakeLists.txt (+5 -3)

@@ -70,6 +70,7 @@ endif()
 set(ANKI_CPU_ADDR_SPACE "0" CACHE STRING "The CPU architecture (0 or 32 or 64). If zero go native")
 
 option(ANKI_ENABLE_SIMD "Enable or not SIMD optimizations" ON)
+option(ANKI_ADDRESS_SANITIZER "Enable address sanitizer (-fsanitize=address)" OFF)
 
 # Take a wild guess on the windowing system
 if(LINUX)
@@ -87,9 +88,6 @@ else()
 	message(FATAL_ERROR "Couldn't determine the window backend. You need to specify it manually")
 endif()
 
-# Valgrind
-option(ANKI_VALGRIND_HAPPY "Make valgrind happy" OFF)
-
 set(ANKI_GR_BACKEND "GL" CACHE STRING "The graphics API (GL or VULKAN)")
 
 if(${ANKI_GR_BACKEND} STREQUAL "GL")
@@ -171,6 +169,10 @@ if(ANKI_STRIP)
 	set(COMPILER_FLAGS "${COMPILER_FLAGS} -s ")
 endif()
 
+if(ANKI_ADDRESS_SANITIZER)
+	set(COMPILER_FLAGS "${COMPILER_FLAGS} -fsanitize=address ")
+endif()
+
 if(${CMAKE_BUILD_TYPE} STREQUAL "Release")
 	set(COMPILER_FLAGS "${COMPILER_FLAGS} -O3 -DNDEBUG ")
 

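Note on the build change: the Valgrind convenience switch is dropped and an address-sanitizer toggle takes its place. Assuming a standard CMake workflow, configuring with cmake -DANKI_ADDRESS_SANITIZER=ON appends -fsanitize=address to COMPILER_FLAGS, so the whole build (engine and tests) runs under ASan.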
src/anki/util/ThreadHive.cpp (+99 -78)

@@ -54,7 +54,7 @@ public:
 	ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
 	void* m_arg; ///< Args for the callback.
 
-	U16* m_deps;
+	Task** m_deps;
 	U16 m_depCount;
 	Bool8 m_othersDepend; ///< Other tasks depend on this one.
 
@@ -72,24 +72,21 @@ public:
 };
 
 ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc)
-	: m_alloc(alloc)
+	: m_slowAlloc(alloc)
+	, m_alloc(alloc.getMemoryPool().getAllocationCallback(),
+		  alloc.getMemoryPool().getAllocationCallbackUserData(),
+		  1024 * 4)
 	, m_threadCount(threadCount)
 {
-	m_threads = reinterpret_cast<Thread*>(alloc.allocate(sizeof(Thread) * threadCount));
+	m_threads = reinterpret_cast<Thread*>(m_slowAlloc.allocate(sizeof(Thread) * threadCount));
 	for(U i = 0; i < threadCount; ++i)
 	{
 		::new(&m_threads[i]) Thread(i, this);
 	}
-
-	m_storage.create(m_alloc, MAX_TASKS_PER_SESSION);
-	m_deps.create(m_alloc, MAX_TASKS_PER_SESSION * threadCount);
 }
 
 ThreadHive::~ThreadHive()
 {
-	m_storage.destroy(m_alloc);
-	m_deps.destroy(m_alloc);
-
 	if(m_threads)
 	{
 		{
@@ -109,15 +106,59 @@ ThreadHive::~ThreadHive()
 			m_threads[threadCount].~Thread();
 		}
 
-		m_alloc.deallocate(static_cast<void*>(m_threads), m_threadCount * sizeof(Thread));
+		m_slowAlloc.deallocate(static_cast<void*>(m_threads), m_threadCount * sizeof(Thread));
 	}
 }
 
-void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
+void ThreadHive::submitTasks(ThreadHiveTask* tasks, const U taskCount)
 {
 	ANKI_ASSERT(tasks && taskCount > 0);
 
-	U allocatedTasks;
+	// Allocate tasks
+	Task* const htasks = m_alloc.newArray<Task>(taskCount);
+
+	// Allocate the dependency handles
+	U depCount = 0;
+	for(U i = 0; i < taskCount; ++i)
+	{
+		depCount += tasks[i].m_inDependencies.getSize();
+	}
+
+	Task** depHandles;
+	if(depCount)
+	{
+		depHandles = m_alloc.newArray<Task*>(depCount);
+	}
+	else
+	{
+		depHandles = nullptr;
+	}
+
+	depCount = 0;
+
+	// Initialize tasks
+	for(U i = 0; i < taskCount; ++i)
+	{
+		const ThreadHiveTask& inTask = tasks[i];
+		Task& outTask = htasks[i];
+
+		outTask.m_cb = inTask.m_callback;
+		outTask.m_arg = inTask.m_argument;
+		outTask.m_depCount = 0;
+		outTask.m_next = nullptr;
+		outTask.m_othersDepend = false;
+
+		// Set the dependencies
+		if(inTask.m_inDependencies.getSize() > 0)
+		{
+			outTask.m_deps = &depHandles[depCount];
+			depCount += inTask.m_inDependencies.getSize();
+		}
+		else
+		{
+			outTask.m_deps = nullptr;
+		}
+	}
 
 	// Push work
 	{
@@ -125,58 +166,39 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
 
 		for(U i = 0; i < taskCount; ++i)
 		{
-			const auto& inTask = tasks[i];
-
-			Task& outTask = m_storage[m_allocatedTasks];
-
-			outTask.m_cb = inTask.m_callback;
-			outTask.m_arg = inTask.m_argument;
-			outTask.m_depCount = 0;
-			outTask.m_next = nullptr;
-			outTask.m_othersDepend = false;
-
-			// Set the dependencies
-			if(inTask.m_inDependencies.getSize() > 0)
-			{
-				outTask.m_deps = &m_deps[m_allocatedDeps];
-				m_allocatedDeps += inTask.m_inDependencies.getSize();
-				ANKI_ASSERT(m_allocatedDeps <= m_deps.getSize());
-			}
-			else
-			{
-				outTask.m_deps = nullptr;
-			}
+			const ThreadHiveTask& inTask = tasks[i];
+			Task& outTask = htasks[i];
 
 			for(U j = 0; j < inTask.m_inDependencies.getSize(); ++j)
 			{
 				ThreadHiveDependencyHandle dep = inTask.m_inDependencies[j];
-				ANKI_ASSERT(dep < m_allocatedTasks);
+				ANKI_ASSERT(dep);
+				Task* depTask = static_cast<Task*>(dep);
 
-				if(!m_storage[dep].done())
+				if(!depTask->done())
 				{
-					outTask.m_deps[outTask.m_depCount++] = dep;
-					m_storage[dep].m_othersDepend = true;
+					outTask.m_deps[outTask.m_depCount++] = depTask;
+					depTask->m_othersDepend = true;
 				}
 			}
 
 			// Push to the list
-			if(m_head == nullptr)
-			{
-				ANKI_ASSERT(m_tail == nullptr);
-				m_head = &m_storage[m_allocatedTasks];
-				m_tail = m_head;
-			}
-			else
+			ANKI_HIVE_DEBUG_PRINT(
+				"pushing back %p (udata %p)\n", static_cast<void*>(&outTask), static_cast<void*>(outTask.m_arg));
+			if(m_head != nullptr)
 			{
 				ANKI_ASSERT(m_tail && m_head);
 				m_tail->m_next = &outTask;
 				m_tail = &outTask;
 			}
-
-			++m_allocatedTasks;
+			else
+			{
+				ANKI_ASSERT(m_tail == nullptr);
+				m_head = &outTask;
+				m_tail = m_head;
+			}
 		}
 
-		allocatedTasks = m_allocatedTasks;
 		m_pendingTasks += taskCount;
 
 		ANKI_HIVE_DEBUG_PRINT("submit tasks\n");
@@ -187,7 +209,7 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
 	// Set the out dependencies
 	for(U i = 0; i < taskCount; ++i)
 	{
-		tasks[i].m_outDependency = allocatedTasks - taskCount + i;
+		tasks[i].m_outDependency = static_cast<ThreadHiveDependencyHandle>(&htasks[i]);
 	}
 }
 
@@ -199,8 +221,9 @@ void ThreadHive::threadRun(U threadId)
 	{
 		// Run the task
 		ANKI_ASSERT(task && task->m_cb);
+		ANKI_HIVE_DEBUG_PRINT(
+			"tid: %lu will exec %p (udata: %p)\n", threadId, static_cast<void*>(task), static_cast<void*>(task->m_arg));
 		task->m_cb(task->m_arg, threadId, *this);
-		ANKI_HIVE_DEBUG_PRINT("tid: %lu executed\n", threadId);
 	}
 
 	ANKI_HIVE_DEBUG_PRINT("tid: %lu thread quits!\n", threadId);
@@ -243,46 +266,43 @@ ThreadHive::Task* ThreadHive::getNewTask()
 	Task* task = m_head;
 	while(task)
 	{
-		if(!task->done())
+		ANKI_ASSERT(!task->done());
+
+		// Check if there are dependencies
+		Bool allDepsCompleted = true;
+		for(U j = 0; j < task->m_depCount; ++j)
 		{
-			// We may have a candidate
+			Task* depTask = task->m_deps[j];
 
-			// Check if there are dependencies
-			Bool allDepsCompleted = true;
-			for(U j = 0; j < task->m_depCount; ++j)
+			if(!depTask->done())
 			{
-				U dep = task->m_deps[j];
-
-				if(!m_storage[dep].done())
-				{
-					allDepsCompleted = false;
-					break;
-				}
+				allDepsCompleted = false;
+				break;
 			}
+		}
 
-			if(allDepsCompleted)
+		if(allDepsCompleted)
+		{
+			// Found something, pop it
+			if(prevTask)
 			{
-				// Found something, pop it
-				if(prevTask)
-				{
-					prevTask->m_next = task->m_next;
-				}
+				prevTask->m_next = task->m_next;
+			}
 
-				if(m_head == task)
-				{
-					m_head = task->m_next;
-				}
+			if(m_head == task)
+			{
+				m_head = task->m_next;
+			}
 
-				if(m_tail == task)
-				{
-					m_tail = prevTask;
-				}
+			if(m_tail == task)
+			{
+				m_tail = prevTask;
+			}
 
 #if ANKI_EXTRA_CHECKS
-				task->m_next = nullptr;
+			task->m_next = nullptr;
 #endif
-				break;
-			}
+			break;
 		}
 
 		prevTask = task;
@@ -306,6 +326,7 @@ void ThreadHive::waitAllTasks()
 	m_tail = nullptr;
 	m_allocatedTasks = 0;
 	m_allocatedDeps = 0;
+	m_alloc.getMemoryPool().reset();
 
 	ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
 }

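For context, a minimal sketch (not from the commit) of how the reworked dependency flow is meant to be used: a handle is now an opaque pointer into the per-session stack pool instead of an index into a fixed-size array. The names produce, consume, and runChain are illustrative; the callback signature and the WeakArray wrapping follow the tests in this commit.

#include <anki/util/ThreadHive.h>

using namespace anki;

static void produce(void* arg, U32 threadId, ThreadHive& hive)
{
	// Fill the shared payload here.
}

static void consume(void* arg, U32 threadId, ThreadHive& hive)
{
	// Runs only after the producer task is done.
}

static void runChain(ThreadHive& hive, void* payload)
{
	ThreadHiveTask producer;
	producer.m_callback = produce;
	producer.m_argument = payload;
	hive.submitTasks(&producer, 1);

	// After submission, m_outDependency holds a Task* disguised as void*.
	ThreadHiveDependencyHandle dep = producer.m_outDependency;

	ThreadHiveTask consumer;
	consumer.m_callback = consume;
	consumer.m_argument = payload;
	consumer.m_inDependencies = WeakArray<ThreadHiveDependencyHandle>(&dep, 1);
	hive.submitTasks(&consumer, 1);

	hive.waitAllTasks();
}

The trade-off is lifetime: a handle from one session must not be reused after waitAllTasks(), since that call rewinds the stack pool the tasks live in.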
src/anki/util/ThreadHive.h (+4 -7)

@@ -19,7 +19,7 @@ class ThreadHive;
 /// @{
 
 /// Opaque handle that defines a ThreadHive dependency. @memberof ThreadHive
-using ThreadHiveDependencyHandle = U16;
+using ThreadHiveDependencyHandle = void*;
 
 /// The callback that defines a ThreadHive task.
 /// @memberof ThreadHive
@@ -60,7 +60,7 @@ public:
 	}
 
 	/// Submit tasks. The ThreadHiveTaskCallback callbacks can also call this.
-	void submitTasks(ThreadHiveTask* tasks, U taskCount);
+	void submitTasks(ThreadHiveTask* tasks, const U taskCount);
 
 	/// Submit a single task without dependencies. The ThreadHiveTaskCallback callbacks can also call this.
 	void submitTask(ThreadHiveTaskCallback callback, void* arg)
@@ -75,19 +75,16 @@ public:
 	void waitAllTasks();
 
 private:
-	static const U MAX_TASKS_PER_SESSION = 1024 * 4;
-
 	class Thread;
 
 	/// Lightweight task.
 	class Task;
 
-	GenericMemoryPoolAllocator<U8> m_alloc;
+	GenericMemoryPoolAllocator<U8> m_slowAlloc;
+	StackAllocator<U8> m_alloc;
 	Thread* m_threads = nullptr;
 	U32 m_threadCount = 0;
 
-	DynamicArray<Task> m_storage; ///< Task storage.
-	DynamicArray<ThreadHiveDependencyHandle> m_deps; ///< Dependencies storage.
 	Task* m_head = nullptr; ///< Head of the task list.
 	Task* m_tail = nullptr; ///< Tail of the task list.
 	Bool m_quit = false;

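A sketch of what the allocator split means for callers, assuming the construction pattern shown in the benchmark below: the hive is built with a general-purpose allocator for long-lived state (the threads), while every submitted task lives in the internal StackAllocator until the next waitAllTasks(), which recycles the whole pool in one reset instead of tearing down per-task storage. The runSessions name is illustrative.

#include <anki/util/ThreadHive.h>
#include <anki/util/System.h>

using namespace anki;

static void work(void* arg, U32 threadId, ThreadHive& hive)
{
	static_cast<Atomic<U32>*>(arg)->fetchAdd(1);
}

static void runSessions()
{
	HeapAllocator<U8> alloc(allocAligned, nullptr);
	ThreadHive hive(getCpuCoresCount(), alloc);

	Atomic<U32> counter = {0};
	for(U i = 0; i < 2; ++i)
	{
		// Each iteration is one "session": submit, then drain.
		hive.submitTask(work, &counter);
		hive.submitTask(work, &counter);

		// Drains the task list and rewinds the stack pool, so the
		// next session reuses the same memory.
		hive.waitAllTasks();
	}
}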
tests/framework/Framework.h (+4 -4)

@@ -21,10 +21,10 @@ class TestSuite;
 class Test;
 class Tester;
 
-#define ANKI_TEST_LOGI(...) ANKI_LOGI("TESTS: " __VA_ARGS__)
-#define ANKI_TEST_LOGE(...) ANKI_LOGE("TESTS: " __VA_ARGS__)
-#define ANKI_TEST_LOGW(...) ANKI_LOGW("TESTS: " __VA_ARGS__)
-#define ANKI_TEST_LOGF(...) ANKI_LOGF("TESTS: " __VA_ARGS__)
+#define ANKI_TEST_LOGI(...) ANKI_LOG("TEST", NORMAL, __VA_ARGS__)
+#define ANKI_TEST_LOGE(...) ANKI_LOG("TEST", ERROR, __VA_ARGS__)
+#define ANKI_TEST_LOGW(...) ANKI_LOG("TEST", WARNING, __VA_ARGS__)
+#define ANKI_TEST_LOGF(...) ANKI_LOG("TEST", FATAL, __VA_ARGS__)
 
 /// The actual test
 using TestCallback = void (*)(Test&);

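The test log macros keep their call syntax; only the dispatch changed, from per-severity wrappers to the generic ANKI_LOG with a "TEST" subsystem tag. A hypothetical smoke test (the suite and test names are invented, the ANKI_TEST registration macro is used as in tests/util/ThreadHive.cpp below):

#include <tests/framework/Framework.h>

namespace anki
{

ANKI_TEST(Util, LogSmoke)
{
	// Formats exactly as before; routed through ANKI_LOG("TEST", ...).
	ANKI_TEST_LOGI("running with %u threads", 4u);
	ANKI_TEST_LOGW("timings are indicative only");
}

} // end namespace anki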
tests/util/ThreadHive.cpp (+78 -1)

@@ -6,6 +6,7 @@
 #include <tests/framework/Framework.h>
 #include <anki/util/ThreadHive.h>
 #include <anki/util/HighRezTimer.h>
+#include <anki/util/System.h>
 
 namespace anki
 {
@@ -143,7 +144,7 @@ ANKI_TEST(Util, ThreadHive)
 				task.m_callback = (cb) ? incNumber : decNumber;
 				task.m_argument = &ctx;
 
-				if((rand() % 3) == 0 && j > 0)
+				if((rand() % 3) == 0 && j > 0 && dep)
 				{
 					task.m_inDependencies = WeakArray<ThreadHiveDependencyHandle>(&dep, 1);
 				}
@@ -164,4 +165,80 @@ ANKI_TEST(Util, ThreadHive)
 	}
 }
 
+class FibTask
+{
+public:
+	Atomic<U64>* m_sum;
+	StackAllocator<U8> m_alloc;
+	U64 m_n;
+
+	FibTask(Atomic<U64>* sum, StackAllocator<U8>& alloc, U64 n)
+		: m_sum(sum)
+		, m_alloc(alloc)
+		, m_n(n)
+	{
+	}
+
+	void doWork(ThreadHive& hive)
+	{
+		if(m_n > 1)
+		{
+			FibTask* a = m_alloc.newInstance<FibTask>(m_sum, m_alloc, m_n - 1);
+			FibTask* b = m_alloc.newInstance<FibTask>(m_sum, m_alloc, m_n - 2);
+
+			Array<ThreadHiveTask, 2> tasks;
+			tasks[0].m_callback = tasks[1].m_callback = FibTask::callback;
+			tasks[0].m_argument = a;
+			tasks[1].m_argument = b;
+
+			hive.submitTasks(&tasks[0], tasks.getSize());
+		}
+		else
+		{
+			m_sum->fetchAdd(m_n);
+		}
+	}
+
+	static void callback(void* arg, U32, ThreadHive& hive)
+	{
+		static_cast<FibTask*>(arg)->doWork(hive);
+	}
+};
+
+static U64 fib(U64 n)
+{
+	if(n > 1)
+	{
+		return fib(n - 1) + fib(n - 2);
+	}
+	else
+	{
+		return n;
+	}
+}
+
+ANKI_TEST(Util, ThreadHiveBench)
+{
+	static const U FIB_N = 32;
+
+	const U32 threadCount = getCpuCoresCount();
+	HeapAllocator<U8> alloc(allocAligned, nullptr);
+	ThreadHive hive(threadCount, alloc);
+
+	StackAllocator<U8> salloc(allocAligned, nullptr, 1024);
+	Atomic<U64> sum = {0};
+	FibTask task(&sum, salloc, FIB_N);
+
+	HighRezTimer timer;
+	timer.start();
+	hive.submitTask(FibTask::callback, &task);
+
+	hive.waitAllTasks();
+	timer.stop();
+
+	ANKI_TEST_LOGI("Total time %fms", timer.getElapsedTime() * 1000.0);
+	const U64 serialFib = fib(FIB_N);
+	ANKI_TEST_EXPECT_EQ(sum.get(), serialFib);
+}
+
 } // end namespace anki
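For scale: with fib(0) = 0 and fib(1) = 1 as defined above, fib(32) = 2178309, and the naive recursion makes 2 * fib(33) - 1 = 7049155 calls. FibTask mirrors that call tree one task per call, so the benchmark pushes roughly seven million short-lived tasks through the hive and its stack allocator, which is what makes it a reasonable stress of the per-task overhead this commit trims.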