Browse Source

Some ThreadHive refactoring

Panagiotis Christopoulos Charitos 9 years ago
parent
commit
044cbaa792
4 changed files with 69 additions and 41 deletions
  1. 9 24
      include/anki/util/ThreadHive.h
  2. 1 1
      sandbox/Main.cpp
  3. 58 15
      src/util/ThreadHive.cpp
  4. 1 1
      tests/util/ThreadHive.cpp

+ 9 - 24
include/anki/util/ThreadHive.h

@@ -14,7 +14,6 @@ namespace anki
 
 // Forward
 class ThreadHive;
-class ThreadHiveThread;
 
 /// @addtogroup util_thread
 /// @{
@@ -51,8 +50,6 @@ public:
 /// completely independent.
 class ThreadHive : public NonCopyable
 {
-	friend class ThreadHiveThread;
-
 public:
 	/// Create the hive.
 	ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc);
@@ -81,39 +78,25 @@ public:
 	void waitAllTasks();
 
 private:
-	static const U MAX_DEPS = 15;
-
-	/// Lightweight task.
-	class Task
-	{
-	public:
-		Task* m_next; ///< Next in the list.
+	static const U MAX_TASKS_PER_SESSION = 1024 * 2;
 
-		ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
-		void* m_arg; ///< Args for the callback.
+	class Thread;
 
-		U8 m_depCount;
-		Bool8 m_othersDepend; ///< Other tasks depend on this one.
-		Array<ThreadHiveDependencyHandle, MAX_DEPS> m_deps;
-
-		Bool done() const
-		{
-			return m_cb == nullptr;
-		}
-	};
-
-	static_assert(sizeof(Task) == (sizeof(void*) * 3 + 32), "Wrong size");
+	/// Lightweight task.
+	class Task;
 
 	GenericMemoryPoolAllocator<U8> m_alloc;
-	ThreadHiveThread* m_threads = nullptr;
+	Thread* m_threads = nullptr;
 	U32 m_threadCount = 0;
 
 	DArray<Task> m_storage; ///< Task storage.
+	DArray<ThreadHiveDependencyHandle> m_deps; ///< Dependencies storage.
 	Task* m_head = nullptr; ///< Head of the task list.
 	Task* m_tail = nullptr; ///< Tail of the task list.
 	Bool m_quit = false;
 	U32 m_pendingTasks = 0;
 	U32 m_allocatedTasks = 0;
+	U32 m_allocatedDeps = 0;
 
 	Mutex m_mtx;
 	ConditionVariable m_cvar;
@@ -128,6 +111,8 @@ private:
 
 	/// Complete a task.
 	void completeTask(U taskId);
+
+	/// Inject dummy depedency tasks if there are
 };
 /// @}
 

+ 1 - 1
sandbox/Main.cpp

@@ -76,7 +76,7 @@ Error MyApp::init(int argc, char* argv[])
 //==============================================================================
 Error MyApp::userMainLoop(Bool& quit)
 {
-	F32 dist = 0.05;
+	F32 dist = 0.1;
 	F32 ang = toRad(2.5);
 	F32 scale = 0.01;
 	F32 mouseSensivity = 9.0;

+ 58 - 15
src/util/ThreadHive.cpp

@@ -22,15 +22,18 @@ namespace anki
 #define ANKI_HIVE_DEBUG_PRINT(...) ((void)0)
 #endif
 
-class ThreadHiveThread
+//==============================================================================
+// ThreadHive::Thread                                                          =
+//==============================================================================
+class ThreadHive::Thread
 {
 public:
 	U32 m_id; ///< An ID
-	Thread m_thread; ///< Runs the workingFunc
+	anki::Thread m_thread; ///< Runs the workingFunc
 	ThreadHive* m_hive;
 
 	/// Constructor
-	ThreadHiveThread(U32 id, ThreadHive* hive)
+	Thread(U32 id, ThreadHive* hive)
 		: m_id(id)
 		, m_thread("anki_threadhive")
 		, m_hive(hive)
@@ -41,16 +44,43 @@ public:
 
 private:
 	/// Thread callaback
-	static Error threadCallback(Thread::Info& info)
+	static Error threadCallback(anki::Thread::Info& info)
 	{
-		ThreadHiveThread& self =
-			*reinterpret_cast<ThreadHiveThread*>(info.m_userData);
+		Thread& self = *reinterpret_cast<Thread*>(info.m_userData);
 
 		self.m_hive->threadRun(self.m_id);
 		return ErrorCode::NONE;
 	}
 };
 
+//==============================================================================
+// ThreadHive::Task                                                            =
+//==============================================================================
+class ThreadHive::Task
+{
+public:
+	Task* m_next; ///< Next in the list.
+
+	ThreadHiveTaskCallback m_cb; ///< Callback that defines the task.
+	void* m_arg; ///< Args for the callback.
+
+	U16* m_deps;
+	U16 m_depCount;
+	Bool8 m_othersDepend; ///< Other tasks depend on this one.
+
+	Task()
+	{
+	}
+
+	Task(const Task& b) = delete;
+	Task& operator=(const Task& b) = delete;
+
+	Bool done() const
+	{
+		return m_cb == nullptr;
+	}
+};
+
 //==============================================================================
 // ThreadHive                                                                  =
 //==============================================================================
@@ -60,20 +90,22 @@ ThreadHive::ThreadHive(U threadCount, GenericMemoryPoolAllocator<U8> alloc)
 	: m_alloc(alloc)
 	, m_threadCount(threadCount)
 {
-	m_threads = reinterpret_cast<ThreadHiveThread*>(
-		alloc.allocate(sizeof(ThreadHiveThread) * threadCount));
+	m_threads =
+		reinterpret_cast<Thread*>(alloc.allocate(sizeof(Thread) * threadCount));
 	for(U i = 0; i < threadCount; ++i)
 	{
-		::new(&m_threads[i]) ThreadHiveThread(i, this);
+		::new(&m_threads[i]) Thread(i, this);
 	}
 
-	m_storage.create(m_alloc, 1024 * 2);
+	m_storage.create(m_alloc, MAX_TASKS_PER_SESSION);
+	m_deps.create(m_alloc, MAX_TASKS_PER_SESSION * threadCount);
 }
 
 //==============================================================================
 ThreadHive::~ThreadHive()
 {
 	m_storage.destroy(m_alloc);
+	m_deps.destroy(m_alloc);
 
 	if(m_threads)
 	{
@@ -91,11 +123,11 @@ ThreadHive::~ThreadHive()
 		{
 			Error err = m_threads[threadCount].m_thread.join();
 			(void)err;
-			m_threads[threadCount].~ThreadHiveThread();
+			m_threads[threadCount].~Thread();
 		}
 
-		m_alloc.deallocate(static_cast<void*>(m_threads),
-			m_threadCount * sizeof(ThreadHiveThread));
+		m_alloc.deallocate(
+			static_cast<void*>(m_threads), m_threadCount * sizeof(Thread));
 	}
 }
 
@@ -113,6 +145,7 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
 		for(U i = 0; i < taskCount; ++i)
 		{
 			const auto& inTask = tasks[i];
+
 			Task& outTask = m_storage[m_allocatedTasks];
 
 			outTask.m_cb = inTask.m_callback;
@@ -122,8 +155,17 @@ void ThreadHive::submitTasks(ThreadHiveTask* tasks, U taskCount)
 			outTask.m_othersDepend = false;
 
 			// Set the dependencies
-			ANKI_ASSERT(inTask.m_inDependencies.getSize() <= MAX_DEPS
-				&& "For now only limited deps");
+			if(inTask.m_inDependencies.getSize() > 0)
+			{
+				outTask.m_deps = &m_deps[m_allocatedDeps];
+				m_allocatedDeps += inTask.m_inDependencies.getSize();
+				ANKI_ASSERT(m_allocatedDeps <= m_deps.getSize());
+			}
+			else
+			{
+				outTask.m_deps = nullptr;
+			}
+
 			for(U j = 0; j < inTask.m_inDependencies.getSize(); ++j)
 			{
 				ThreadHiveDependencyHandle dep = inTask.m_inDependencies[j];
@@ -286,6 +328,7 @@ void ThreadHive::waitAllTasks()
 	m_head = nullptr;
 	m_tail = nullptr;
 	m_allocatedTasks = 0;
+	m_allocatedDeps = 0;
 
 	ANKI_HIVE_DEBUG_PRINT("mt: done waiting all\n");
 }

+ 1 - 1
tests/util/ThreadHive.cpp

@@ -138,7 +138,7 @@ ANKI_TEST(Util, ThreadHive)
 		ThreadHiveDependencyHandle dep = 0;
 
 		const U SUBMISSION_COUNT = 100;
-		const U TASK_COUNT = 100;
+		const U TASK_COUNT = 1000;
 		for(U i = 0; i < SUBMISSION_COUNT; ++i)
 		{
 			for(U j = 0; j < TASK_COUNT; ++j)