Browse Source

Working on the ThreadHive. Some additional tracing

Panagiotis Christopoulos Charitos 9 years ago
parent
commit
7b38f30640

+ 4 - 0
include/anki/core/Trace.h

@@ -38,6 +38,10 @@ enum class TraceEventType
 	RENDER_DRAWER,
 	RENDERER_COMMAND_BUFFER_BUILDING,
 	GL_THREAD,
+	GL_2ND_LEVEL_CMD_BUFFER,
+	GL_BIND_RESOURCES,
+	GL_BIND_PPLINE,
+	GL_CMD_BUFFER_DESTROY,
 	SWAP_BUFFERS,
 	BARRIER_WAIT,
 

+ 68 - 0
include/anki/util/DArray.h

@@ -360,6 +360,74 @@ private:
 		b.m_size = 0;
 	}
 };
+
+/// Same as SArray but copyable.
+template<typename T>
+class WArray : public DArray<T>
+{
+public:
+	using Base = DArray<T>;
+	using Value = T;
+
+	WArray()
+		: Base()
+	{
+	}
+
+	WArray(T* mem, PtrSize size)
+		: Base()
+	{
+		if(size)
+		{
+			ANKI_ASSERT(mem);
+		}
+
+		Base::m_data = mem;
+		Base::m_size = size;
+	}
+
+	/// Copy.
+	WArray(const WArray& b)
+		: Base::m_data(b.m_data)
+		, Base::m_size(b.m_size)
+	{
+	}
+
+	/// Move.
+	WArray(WArray&& b)
+		: Base::m_data(b.m_data)
+		, Base::m_size(b.m_size)
+	{
+		b.m_data = nullptr;
+		b.m_size = 0;
+	}
+
+	~WArray()
+	{
+#if ANKI_ASSERTIONS
+		Base::m_data = nullptr;
+		Base::m_size = 0;
+#endif
+	}
+
+	/// Copy.
+	WArray& operator=(const WArray& b)
+	{
+		Base::m_data = b.m_data;
+		Base::m_size = b.m_size;
+		return *this;
+	}
+
+	/// Move.
+	WArray& operator=(WArray&& b)
+	{
+		Base::m_data = b.m_data;
+		b.m_data = nullptr;
+		Base::m_size = b.m_size;
+		b.m_size = 0;
+		return *this;
+	}
+};
 /// @}
 
 } // end namespace anki

+ 102 - 2
include/anki/util/ThreadHive.h

@@ -5,16 +5,116 @@
 
 #pragma once
 
-#include <anki/util/ThreadPool.h>
+#include <anki/util/Thread.h>
+#include <anki/util/DArray.h>
+#include <anki/util/Allocator.h>
 
 namespace anki
 {
 
+// Forward
+class ThreadHive;
+class ThreadHiveThread;
+
 /// @addtogroup util_thread
 /// @{
 
-class ThreadHive
+using ThreadHiveDependencyHandle = U16;
+
+using ThreadHiveTaskCallback = void (*)(void*, U32 threadId, ThreadHive& hive);
+
+/// Task for the ThreadHive.
+class ThreadHiveTask
 {
+public:
+	/// What this task will do.
+	ThreadHiveTaskCallback m_callback ANKI_DBG_NULLIFY_PTR;
+
+	/// Arguments to pass to the m_callback.
+	void* m_argument ANKI_DBG_NULLIFY_PTR;
+
+	/// The tasks that this task will depend on.
+	WArray<ThreadHiveDependencyHandle> m_inDependencies;
+
+	/// Will be filled after the submission of the task. Can be used to set
+	/// dependencies to future tasks.
+	ThreadHiveDependencyHandle m_outDependency;
+};
+
+/// A scheduler of small tasks. It takes tasks to be executed and schedules them
+/// in one of the threads.
+class ThreadHive : public NonCopyable
+{
+	friend class ThreadHiveThread;
+
+public:
+	/// Create the hive.
+	ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc);
+
+	~ThreadHive();
+
+	/// Submit tasks. The ThreadHiveTaskCallback callbacks can also call this.
+	void submitTasks(ThreadHiveTask* tasks, U taskCount);
+
+	/// Submit a single task without dependencies. The ThreadHiveTaskCallback
+	/// callbacks can also call this.
+	void submitTask(ThreadHiveTaskCallback callback, void* arg)
+	{
+		ThreadHiveTask task;
+		task.m_callback = callback;
+		task.m_argument = arg;
+		submitTasks(&task, 1);
+	}
+
+	/// Wait for all tasks to finish. Will block.
+	void waitAllTasks();
+
+private:
+	static const U MAX_DEPS = 4;
+
+	/// Lightweight task.
+	class Task
+	{
+	public:
+		ThreadHiveTaskCallback m_cb;
+		void* m_arg;
+
+		union
+		{
+			Array<ThreadHiveDependencyHandle, MAX_DEPS> m_deps;
+			U64 m_depsU64;
+		};
+
+		Bool done() const
+		{
+			return m_cb == nullptr;
+		}
+	};
+
+	static_assert(sizeof(Task) == sizeof(void*) * 2 + 8, "Too big");
+
+	GenericMemoryPoolAllocator<U8> m_alloc;
+	ThreadHiveThread* m_threads = nullptr;
+	U32 m_threadCount = 0;
+
+	DArray<Task> m_queue; ///< Task queue.
+	I32 m_head = 0; ///< Head of m_queue.
+	I32 m_tail = -1; ///< Tail of m_queue.
+	U64 m_workingThreadsMask = 0; ///< Mask with the threads that have work.
+	Bool m_quit = false;
+	U64 m_waitingThreadsMask = 0;
+
+	Mutex m_mtx; ///< Protect the queue
+	ConditionVariable m_cvar;
+
+	Bool m_mainThreadStopWaiting = false;
+	Mutex m_mainThreadMtx;
+	ConditionVariable m_mainThreadCvar;
+
+	void run(U threadId);
+
+	/// Get new work from the queue.
+	ThreadHiveTaskCallback getNewWork(void*& arg);
 };
 /// @}
 

+ 1 - 2
shaders/LightResources.glsl

@@ -48,8 +48,7 @@ struct ReflectionProbe
 	vec4 cubemapIndexPad3;
 };
 
-layout(
-	std140, row_major, UBO_BINDING(LIGHT_SET, LIGHT_UBO_BINDING)) uniform u0_
+layout(std140, row_major, UBO_BINDING(LIGHT_SET, LIGHT_UBO_BINDING)) uniform u0_
 {
 	LightingUniforms u_lightingUniforms;
 };

+ 4 - 0
src/core/Trace.cpp

@@ -31,6 +31,10 @@ static Array<const char*, U(TraceEventType::COUNT)> eventNames = {
 		"RENDER_DRAWER",
 		"RENDERER_COMMAND_BUFFER_BUILDING",
 		"GL_THREAD",
+		"GL_2ND_LEVEL_CMD_BUFFER",
+		"GL_BIND_RESOURCES",
+		"GL_BIND_PPLINE",
+		"GL_CMD_BUFFER_DESTROY",
 		"SWAP_BUFFERS",
 		"BARRIER_WAIT"}};
 

+ 9 - 1
src/gr/gl/CommandBuffer.cpp

@@ -163,6 +163,8 @@ public:
 
 	Error operator()(GlState& state)
 	{
+		ANKI_TRACE_START_EVENT(GL_BIND_PPLINE);
+
 		PipelineImpl& impl = m_ppline->getImplementation();
 
 		auto name = impl.getGlName();
@@ -172,6 +174,7 @@ public:
 			state.m_crntPpline = name;
 		}
 
+		ANKI_TRACE_STOP_EVENT(GL_BIND_PPLINE);
 		return ErrorCode::NONE;
 	}
 };
@@ -239,7 +242,9 @@ public:
 
 	Error operator()(GlState& state)
 	{
+		ANKI_TRACE_START_EVENT(GL_BIND_RESOURCES);
 		m_rc->getImplementation().bind(m_slot, m_dynInfo, state);
+		ANKI_TRACE_STOP_EVENT(GL_BIND_RESOURCES);
 		return ErrorCode::NONE;
 	}
 };
@@ -576,7 +581,10 @@ public:
 
 	Error operator()(GlState&)
 	{
-		return m_cmdb->getImplementation().executeAllCommands();
+		ANKI_TRACE_START_EVENT(GL_2ND_LEVEL_CMD_BUFFER);
+		Error err = m_cmdb->getImplementation().executeAllCommands();
+		ANKI_TRACE_STOP_EVENT(GL_2ND_LEVEL_CMD_BUFFER);
+		return err;
 	}
 };
 

+ 5 - 0
src/gr/gl/CommandBufferImpl.cpp

@@ -9,6 +9,7 @@
 #include <anki/gr/gl/RenderingThread.h>
 #include <anki/gr/gl/Error.h>
 #include <anki/util/Logger.h>
+#include <anki/core/Trace.h>
 #include <cstring>
 
 namespace anki
@@ -30,6 +31,8 @@ void CommandBufferImpl::init(const InitHints& hints)
 //==============================================================================
 void CommandBufferImpl::destroy()
 {
+	ANKI_TRACE_START_EVENT(GL_CMD_BUFFER_DESTROY);
+
 #if ANKI_DEBUG
 	if(!m_executed && m_firstCommand)
 	{
@@ -50,6 +53,8 @@ void CommandBufferImpl::destroy()
 		&& "Someone is holding a reference to the command buffer's allocator");
 
 	m_alloc = CommandBufferAllocator<U8>();
+
+	ANKI_TRACE_STOP_EVENT(GL_CMD_BUFFER_DESTROY);
 }
 
 //==============================================================================

+ 1 - 1
src/util/CMakeLists.txt

@@ -1,4 +1,4 @@
-set(ANKI_UTIL_SOURCES Assert.cpp Functions.cpp File.cpp Filesystem.cpp Memory.cpp System.cpp HighRezTimer.cpp ThreadPool.cpp Hash.cpp Logger.cpp String.cpp)
+set(ANKI_UTIL_SOURCES Assert.cpp Functions.cpp File.cpp Filesystem.cpp Memory.cpp System.cpp HighRezTimer.cpp ThreadPool.cpp ThreadHive.cpp Hash.cpp Logger.cpp String.cpp)
 
 if(LINUX OR ANDROID OR MACOS)
 	set(ANKI_UTIL_SOURCES ${ANKI_UTIL_SOURCES} HighRezTimerPosix.cpp FilesystemPosix.cpp ThreadPosix.cpp)

+ 282 - 0
src/util/ThreadHive.cpp

@@ -0,0 +1,282 @@
+// Copyright (C) 2009-2016, Panagiotis Christopoulos Charitos and contributors.
+// All rights reserved.
+// Code licensed under the BSD License.
+// http://www.anki3d.org/LICENSE
+
+#include <anki/util/ThreadHive.h>
+#include <cstring>
+#include <cstdio>
+
+namespace anki
+{
+
+//==============================================================================
+// Misc                                                                        =
+//==============================================================================
+
+#define ANKI_ENABLE_HIVE_DEBUG_PRINT 1
+
+#if ANKI_ENABLE_HIVE_DEBUG_PRINT
+#define ANKI_HIVE_DEBUG_PRINT(...) printf(__VA_ARGS__)
+#else
+#define ANKI_HIVE_DEBUG_PRINT(...) ((void)0)
+#endif
+
+class ThreadHiveThread
+{
+public:
+	U32 m_id; ///< An ID
+	Thread m_thread; ///< Runs the workingFunc
+	ThreadHive* m_hive;
+
+	/// Constructor
+	ThreadHiveThread(U32 id, ThreadHive* hive)
+		: m_id(id)
+		, m_thread("anki_threadhive")
+		, m_hive(hive)
+	{
+		ANKI_ASSERT(hive);
+		m_thread.start(this, threadCallback);
+	}
+
+private:
+	/// Thread callaback
+	static Error threadCallback(Thread::Info& info)
+	{
+		ThreadHiveThread& self =
+			*reinterpret_cast<ThreadHiveThread*>(info.m_userData);
+
+		self.m_hive->run(self.m_id);
+		return ErrorCode::NONE;
+	}
+};
+
+//==============================================================================
+// ThreadHive                                                                  =
+//==============================================================================
+
+//==============================================================================
+ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc)
+	: m_alloc(alloc)
+	, m_threadCount(threadCount)
+{
+	m_threads = reinterpret_cast<ThreadHiveThread*>(
+		alloc.allocate(sizeof(ThreadHiveThread) * threadCount));
+	for(U i = 0; i < threadCount; ++i)
+	{
+		::new(&m_threads[i]) ThreadHiveThread(i, this);
+	}
+
+	m_queue.create(m_alloc, 1024);
+}
+
+//==============================================================================
+ThreadHive::~ThreadHive()
+{
+	m_queue.destroy(m_alloc);
+
+	if(m_threads)
+	{
+		{
+			LockGuard<Mutex> lock(m_mtx);
+			m_quit = true;
+
+			// Wake the threads
+			m_cvar.notifyAll();
+		}
+
+		// Join and destroy
+		U threadCount = m_threadCount;
+		while(threadCount-- != 0)
+		{
+			Error err = m_threads[threadCount].m_thread.join();
+			(void)err;
+			m_threads[threadCount].~ThreadHiveThread();
+		}
+
+		m_alloc.deallocate(static_cast<void*>(m_threads),
+			m_threadCount * sizeof(ThreadHiveThread));
+	}
+}
+
+//==============================================================================
+void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
+{
+	ANKI_ASSERT(tasks && taskCount > 0);
+
+	// Create the tasks to temp memory to decrease thread contention
+	Array<Task, 64> tempTasks;
+	for(U i = 0; i < taskCount; ++i)
+	{
+		tempTasks[i].m_cb = tasks[i].m_callback;
+		tempTasks[i].m_arg = tasks[i].m_argument;
+		tempTasks[i].m_depsU64 = 0;
+
+		ANKI_ASSERT(tasks[i].m_inDependencies.getSize() <= MAX_DEPS
+			&& "For now only limited deps");
+		for(U j = 0; j < tasks[i].m_inDependencies.getSize(); ++j)
+		{
+			tempTasks[i].m_deps[j] = tasks[i].m_inDependencies[j];
+		}
+	}
+
+	// Push work
+	I firstTaskIdx;
+
+	{
+		LockGuard<Mutex> lock(m_mtx);
+
+		// "Allocate" storage for tasks
+		firstTaskIdx = m_tail + 1;
+		m_tail += taskCount;
+
+		// Store tasks
+		memcpy(&m_queue[firstTaskIdx], &tempTasks[0], sizeof(Task) * taskCount);
+
+		// Notify all threads
+		m_cvar.notifyAll();
+	}
+
+	// Set the out dependencies
+	for(U i = 0; i < taskCount; ++i)
+	{
+		tasks[i].m_outDependency = firstTaskIdx + i;
+	}
+}
+
+//==============================================================================
+void ThreadHive::run(U threadId)
+{
+	U64 threadMask = 1 << threadId;
+
+	while(1)
+	{
+		// Wait for something
+		ThreadHiveTaskCallback cb;
+		void* arg;
+		Bool quit;
+
+		{
+			LockGuard<Mutex> lock(m_mtx);
+
+			ANKI_HIVE_DEBUG_PRINT("tid: %lu locking\n", threadId);
+
+			while(!m_quit && (cb = getNewWork(arg)) == nullptr)
+			{
+				ANKI_HIVE_DEBUG_PRINT("tid: %lu waiting, cb %p\n", 
+					threadId, 
+					reinterpret_cast<void*>(cb));
+
+				m_waitingThreadsMask |= threadMask;
+
+				if(__builtin_popcount(m_waitingThreadsMask) == m_threadCount)
+				{
+					ANKI_HIVE_DEBUG_PRINT("tid: %lu all threads done. 0x%lu\n", 
+						threadId, 
+						m_waitingThreadsMask);
+					LockGuard<Mutex> lock2(m_mainThreadMtx);
+
+					m_mainThreadStopWaiting = true;
+
+					// Everyone is waiting. Wake the main thread
+					m_mainThreadCvar.notifyOne();
+				}
+
+				// Wait if there is no work.
+				m_cvar.wait(m_mtx);
+			}
+
+			m_waitingThreadsMask &= ~threadMask;
+			quit = m_quit;
+		}
+
+		if(quit)
+		{
+			break;
+		}
+
+		// Run the task
+		cb(arg, threadId, *this);
+		ANKI_HIVE_DEBUG_PRINT("dit: %lu executed\n", threadId);
+	}
+
+	ANKI_HIVE_DEBUG_PRINT("dit: %lu thread quits!\n", threadId);
+}
+
+//==============================================================================
+ThreadHiveTaskCallback ThreadHive::getNewWork(void*& arg)
+{
+	ThreadHiveTaskCallback cb = nullptr;
+
+	for(I i = m_head; cb == nullptr && i <= m_tail; ++i)
+	{
+		Task& task = m_queue[i];
+		if(!task.done())
+		{
+			// We may have a candiate
+
+			// Check if there are dependencies
+			Bool allDepsCompleted = true;
+			if(task.m_depsU64 != 0)
+			{
+				for(U j = 0; j < MAX_DEPS; ++j)
+				{
+					I32 dep = task.m_deps[j];
+
+					if(dep < m_head || dep > m_tail || !m_queue[dep].done())
+					{
+						allDepsCompleted = false;
+						break;
+					}
+				}
+			}
+
+			if(allDepsCompleted)
+			{
+				// Found something
+				cb = task.m_cb;
+				arg = task.m_arg;
+
+				// "Complete" the task
+				task.m_cb = nullptr;
+
+				if(ANKI_UNLIKELY(m_head == m_tail))
+				{
+					// Reset it
+					m_head = 0;
+					m_tail = -1;
+				}
+				else if(i == m_head)
+				{
+					// Pop front
+					++m_head;
+				}
+				else if(i == m_tail)
+				{
+					// Pop back
+					--m_tail;
+				}
+			}
+		}
+	}
+
+	return cb;
+}
+
+//==============================================================================
+void ThreadHive::waitAllTasks()
+{
+	ANKI_HIVE_DEBUG_PRINT("mt: waiting all\n");
+	LockGuard<Mutex> lock(m_mainThreadMtx);
+
+	while(!m_mainThreadStopWaiting)
+	{
+		m_mainThreadCvar.wait(m_mainThreadMtx);
+	}
+
+	m_mainThreadStopWaiting = false;
+
+	ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
+}
+
+} // end namespace anki

+ 2 - 2
tests/util/Memory.cpp

@@ -6,7 +6,7 @@
 #include "tests/framework/Framework.h"
 #include "tests/util/Foo.h"
 #include "anki/util/Memory.h"
-#include "anki/util/Thread.h"
+#include "anki/util/ThreadPool.h"
 #include <type_traits>
 #include <cstring>
 
@@ -97,7 +97,7 @@ ANKI_TEST(Util, StackMemoryPool)
 		const U ALLOC_SIZE = 25;
 		ThreadPool threadPool(THREAD_COUNT);
 
-		class AllocateTask : public ThreadPool::Task
+		class AllocateTask : public ThreadPoolTask
 		{
 		public:
 			StackMemoryPool* m_pool = nullptr;

+ 2 - 1
tests/util/Thread.cpp

@@ -7,6 +7,7 @@
 #include "anki/util/Thread.h"
 #include "anki/util/StdTypes.h"
 #include "anki/util/HighRezTimer.h"
+#include "anki/util/ThreadPool.h"
 #include <cstring>
 
 namespace anki
@@ -103,7 +104,7 @@ ANKI_TEST(Util, Mutex)
 //==============================================================================
 
 /// Struct for our tests
-struct TestJobTP : ThreadPool::Task
+struct TestJobTP : ThreadPoolTask
 {
 	U32 in = 0;
 	U32 iterations = 0;

+ 116 - 0
tests/util/ThreadHive.cpp

@@ -0,0 +1,116 @@
+// Copyright (C) 2009-2016, Panagiotis Christopoulos Charitos and contributors.
+// All rights reserved.
+// Code licensed under the BSD License.
+// http://www.anki3d.org/LICENSE
+
+#include <tests/framework/Framework.h>
+#include <anki/util/ThreadHive.h>
+#include <chrono>
+#include <thread>
+
+namespace anki
+{
+
+class ThreadHiveTestContext
+{
+public:
+	ThreadHiveTestContext()
+	{
+	}
+
+	~ThreadHiveTestContext()
+	{
+	}
+
+	union
+	{
+		Atomic<U32> m_countAtomic;
+		U32 m_count;
+	};
+};
+
+//==============================================================================
+static void decNumber(void* arg, U32, ThreadHive& hive)
+{
+	ThreadHiveTestContext* ctx = static_cast<ThreadHiveTestContext*>(arg);
+	ctx->m_countAtomic.fetchSub(2);
+}
+
+//==============================================================================
+static void incNumber(void* arg, U32, ThreadHive& hive)
+{
+	ThreadHiveTestContext* ctx = static_cast<ThreadHiveTestContext*>(arg);
+	ctx->m_countAtomic.fetchAdd(4);
+
+	hive.submitTask(decNumber, arg);
+}
+
+//==============================================================================
+static void taskToWaitOn(void* arg, U32, ThreadHive& hive)
+{
+	ThreadHiveTestContext* ctx = static_cast<ThreadHiveTestContext*>(arg);
+	std::this_thread::sleep_for(std::chrono::seconds(1));
+	ctx->m_count = 10;
+	std::this_thread::sleep_for(std::chrono::seconds(1));
+}
+
+//==============================================================================
+static void taskToWait(void* arg, U32, ThreadHive& hive)
+{
+	ThreadHiveTestContext* ctx = static_cast<ThreadHiveTestContext*>(arg);
+	ANKI_TEST_EXPECT_EQ(ctx->m_count, 10);
+}
+
+//==============================================================================
+ANKI_TEST(Util, ThreadHive)
+{
+	const U32 threadCount = 4;
+	HeapAllocator<U8> alloc(allocAligned, nullptr);
+	ThreadHive hive(threadCount, alloc);
+
+	// Simple test
+	{
+		ThreadHiveTestContext ctx;
+		ctx.m_countAtomic.set(0);
+		const U INITIAL_TASK_COUNT = 10;
+
+		for(U i = 0; i < INITIAL_TASK_COUNT; ++i)
+		{
+			hive.submitTask(incNumber, &ctx);
+		}
+
+		hive.waitAllTasks();
+
+		ANKI_TEST_EXPECT_EQ(ctx.m_countAtomic.get(), INITIAL_TASK_COUNT * 2);
+	}
+
+	// Depedency tests
+	if(0)
+	{
+		ThreadHiveTestContext ctx;
+		ctx.m_count = 0;
+
+		ThreadHiveTask task;
+		task.m_callback = taskToWaitOn;
+		task.m_argument = &ctx;
+
+		hive.submitTasks(&task, 1);
+
+		const U DEP_TASKS = 10;
+		ThreadHiveTask dtasks[DEP_TASKS];
+
+		for(U i = 0; i < DEP_TASKS; ++i)
+		{
+			dtasks[i].m_callback = taskToWait;
+			dtasks[i].m_argument = &ctx;
+			dtasks[i].m_inDependencies =
+				WArray<ThreadHiveDependencyHandle>(&task.m_outDependency, 1);
+		}
+
+		hive.submitTasks(&dtasks[0], DEP_TASKS);
+
+		hive.waitAllTasks();
+	}
+}
+
+} // end namespace anki