Procházet zdrojové kódy

Thread refactoring

Panagiotis Christopoulos Charitos před 12 roky
rodič
revize
20ebc211eb

+ 2 - 2
bench/Main.cpp

@@ -22,7 +22,7 @@
 #include "anki/resource/Skin.h"
 #include "anki/resource/ShaderProgramPrePreprocessor.h"
 #include "anki/resource/Material.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 #include "anki/core/NativeWindow.h"
 #include "anki/core/Counters.h"
 #include "anki/Scene.h"
@@ -154,7 +154,7 @@ void initSubsystems()
 	StdinListenerSingleton::get().start();
 
 	// Parallel jobs
-	ThreadPoolSingleton::get().init(getCpuCoresCount());
+	ThreadpoolSingleton::get().init(getCpuCoresCount());
 }
 
 //==============================================================================

+ 1 - 1
include/anki/Util.h

@@ -4,7 +4,7 @@
 #include "anki/util/Allocator.h"
 #include "anki/util/Array.h"
 #include "anki/util/Assert.h"
-#include "anki/util/Barrier.h"
+#include "anki/util/Thread.h"
 #include "anki/util/Bitset.h"
 #include "anki/util/Dictionary.h"
 #include "anki/util/DynamicArray.h"

+ 0 - 135
include/anki/core/ThreadPool.h

@@ -1,135 +0,0 @@
-#ifndef ANKI_CORE_PARALLEL_MANAGER_H
-#define ANKI_CORE_PARALLEL_MANAGER_H
-
-#include "anki/util/Barrier.h"
-#include "anki/util/Singleton.h"
-#include "anki/util/Vector.h"
-#include "anki/util/StdTypes.h"
-#include <thread>
-
-namespace anki {
-
-/// Debug flag to disable threadpool threading capabilities
-#define ANKI_DISABLE_THREADPOOL_THREADING 0
-
-// Forward
-class ThreadPool;
-
-/// A job assignment for a ThreadWorker
-struct ThreadJob
-{
-	virtual ~ThreadJob()
-	{}
-
-	virtual void operator()(U threadId, U threadsCount) = 0;
-
-	/// Chose a starting and end index
-	void choseStartEnd(U threadId, U threadsCount, U64 elementsCount,
-		U64& start, U64& end)
-	{
-		start = threadId * (elementsCount / threadsCount);
-		end = (threadId == threadsCount - 1)
-			? elementsCount
-			: start + elementsCount / threadsCount;
-	}
-};
-
-/// A dummy job
-struct ThreadJobDummy: ThreadJob
-{
-	void operator()(U threadId, U threadsCount)
-	{
-		(void)threadId;
-		(void)threadsCount;
-	}
-};
-
-/// The thread that executes a ThreadJob
-class ThreadWorker
-{
-public:
-	/// Constructor
-	ThreadWorker(U32 id, Barrier* barrier, ThreadPool* threadPool);
-
-	/// @name Accessors
-	/// @{
-	U32 getId() const
-	{
-		return id;
-	}
-	/// @}
-
-	/// Assign new job to the thread
-	void assignNewJob(ThreadJob* job);
-
-private:
-	U32 id = 0; ///< An ID
-	std::thread thread; ///< Runs the workingFunc
-	std::mutex mutex; ///< Protect the ThreadWorker::job
-	std::condition_variable condVar; ///< To wake up the thread
-	Barrier* barrier = nullptr; ///< For synchronization
-	ThreadJob* job = nullptr; ///< Its NULL if there are no pending job
-	ThreadPool* threadPool;
-
-	/// Start thread
-	void start()
-	{
-		thread = std::thread(&ThreadWorker::workingFunc, this);
-	}
-
-	/// Thread loop
-	void workingFunc();
-};
-
-/// Parallel job dispatcher.You feed it with jobs and sends them for
-/// execution in parallel and then waits for all to finish
-class ThreadPool
-{
-public:
-	static constexpr U MAX_THREADS = 32; ///< An absolute limit
-
-	/// Default constructor
-	ThreadPool()
-	{}
-
-	/// Constructor #2
-	ThreadPool(U threadsNum)
-	{
-		init(threadsNum);
-	}
-
-	~ThreadPool();
-
-	/// Init the manager
-	void init(U threadsNum);
-
-	/// Assign a job to a working thread
-	void assignNewJob(U jobId, ThreadJob* job)
-	{
-		jobs[jobId]->assignNewJob(job);
-	}
-
-	/// Wait for all jobs to finish
-	void waitForAllJobsToFinish()
-	{
-#if !ANKI_DISABLE_THREADPOOL_THREADING
-		barrier->wait();
-#endif
-	}
-
-	U32 getThreadsCount() const
-	{
-		return jobs.size();
-	}
-
-private:
-	Vector<ThreadWorker*> jobs; ///< Worker threads
-	std::unique_ptr<Barrier> barrier; ///< Synchronization barrier
-};
-
-/// Singleton
-typedef Singleton<ThreadPool> ThreadPoolSingleton;
-
-} // end namespace anki
-
-#endif

+ 14 - 0
include/anki/core/Threadpool.h

@@ -0,0 +1,14 @@
+#ifndef ANKI_CORE_THREADPOOL_H
+#define ANKI_CORE_THREADPOOL_H
+
+#include "anki/util/Thread.h"
+#include "anki/util/Singleton.h"
+
+namespace anki {
+
+/// Singleton
+typedef Singleton<Threadpool> ThreadpoolSingleton;
+
+} // end namespace anki
+
+#endif

+ 0 - 53
include/anki/core/WorkingThread.h

@@ -1,53 +0,0 @@
-#ifndef ANKI_CORE_WORKING_THREAD_H
-#define ANKI_CORE_WORKING_THREAD_H
-
-#include "anki/util/Barrier.h"
-#include "anki/util/StdTypes.h"
-#include <thread>
-#include <memory>
-
-namespace anki {
-
-/// A job assignment for a WorkingThread
-struct WorkingThreadJob
-{
-	virtual ~WorkingThreadJob()
-	{}
-
-	virtual void operator()(U32 id) = 0;
-};
-
-/// The thread that executes a WorkingThreadJob
-class WorkingThread
-{
-public:
-	/// Constructor
-	WorkingThread(U32 id, const std::shared_ptr<WorkingThreadJob>& job);
-
-	/// @name Accessors
-	/// @{
-	U32 getId() const
-	{
-		return id;
-	}
-	/// @}
-
-private:
-	U32 id = 0; ///< An ID just to identify the thread
-	std::thread thread; ///< Runs the workingFunc
-	std::mutex mutex; ///< Protect the jobDone
-	std::condition_variable condVar; ///< To wake up the thread
-	std::shared_ptr<WorkingThreadJob> job;
-	Bool jobDone = false;
-
-	/// Start thread
-	void start();
-
-	/// Thread loop
-	void workingFunc();
-};
-
-} // end namespace anki
-
-#endif
-

+ 5 - 5
include/anki/scene/Visibility.h

@@ -6,7 +6,7 @@
 #include "anki/scene/SceneNode.h"
 #include "anki/scene/SpatialComponent.h"
 #include "anki/scene/RenderComponent.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 
 namespace anki {
 
@@ -118,13 +118,13 @@ struct MaterialSortFunctor
 };
 
 /// Thread job to short scene nodes by distance
-struct DistanceSortJob: ThreadJob
+struct DistanceSortJob: ThreadpoolTask
 {
 	U nodesCount;
 	VisibilityTestResults::Container::iterator nodes;
 	Vec3 origin;
 
-	void operator()(U /*threadId*/, U /*threadsCount*/)
+	void operator()(ThreadId /*threadId*/, U /*threadsCount*/)
 	{
 		DistanceSortFunctor comp;
 		comp.origin = origin;
@@ -133,12 +133,12 @@ struct DistanceSortJob: ThreadJob
 };
 
 /// Thread job to short renderable scene nodes by material
-struct MaterialSortJob: ThreadJob
+struct MaterialSortJob: ThreadpoolTask
 {
 	U nodesCount;
 	VisibilityTestResults::Container::iterator nodes;
 
-	void operator()(U /*threadId*/, U /*threadsCount*/)
+	void operator()(ThreadId /*threadId*/, U /*threadsCount*/)
 	{
 		std::sort(nodes, nodes + nodesCount, MaterialSortFunctor());
 	}

+ 0 - 52
include/anki/util/Barrier.h

@@ -1,52 +0,0 @@
-#ifndef ANKI_UTIL_BARRIER_H
-#define ANKI_UTIL_BARRIER_H
-
-#include "anki/util/Assert.h"
-#include "anki/util/StdTypes.h"
-#include <thread>
-#include <condition_variable>
-#include <mutex>
-
-namespace anki {
-
-/// A barrier for thread synchronization. It works just like boost::barrier
-class Barrier
-{
-public:
-	Barrier(U32 count_)
-		: threshold(count_), count(count_), generation(0)
-	{
-		ANKI_ASSERT(count_ != 0);
-	}
-
-	Bool wait()
-	{
-		std::unique_lock<std::mutex> lock(mtx);
-		U32 gen = generation;
-
-		if(--count == 0)
-		{
-			generation++;
-			count = threshold;
-			cond.notify_all();
-			return true;
-		}
-
-		while(gen == generation)
-		{
-			cond.wait(lock);
-		}
-		return false;
-	}
-
-private:
-	std::mutex mtx;
-	std::condition_variable cond;
-	U32 threshold;
-	U32 count;
-	U32 generation;
-};
-
-} // end namespace anki
-
-#endif

+ 215 - 0
include/anki/util/Thread.h

@@ -0,0 +1,215 @@
+#ifndef ANKI_UTIL_THREAD_H
+#define ANKI_UTIL_THREAD_H
+
+#include "anki/Config.h"
+#include "anki/util/StdTypes.h"
+#include "anki/util/Array.h"
+#include "anki/util/Vector.h"
+#include <condition_variable>
+#include <mutex>
+#include <thread>
+
+namespace anki {
+
+#define ANKI_DISABLE_THREADPOOL_THREADING 0
+
+class Threadpool;
+
+typedef U32 ThreadId;
+
+/// A barrier for thread synchronization. It works just like boost::barrier
+class Barrier
+{
+public:
+	Barrier(U32 count_)
+		: threshold(count_), count(count_), generation(0)
+	{
+		ANKI_ASSERT(count_ != 0);
+	}
+
+	Bool wait()
+	{
+		std::unique_lock<std::mutex> lock(mtx);
+		U32 gen = generation;
+
+		if(--count == 0)
+		{
+			generation++;
+			count = threshold;
+			cond.notify_all();
+			return true;
+		}
+
+		while(gen == generation)
+		{
+			cond.wait(lock);
+		}
+		return false;
+	}
+
+private:
+	std::mutex mtx;
+	std::condition_variable cond;
+	U32 threshold;
+	U32 count;
+	U32 generation;
+};
+
+/// A task assignment to threads
+struct ThreadTask
+{
+	virtual ~ThreadTask()
+	{}
+
+	virtual void operator()(ThreadId threadId) = 0;
+};
+
+/// This is a thread with 2 sync points
+class DualSyncThread
+{
+public:
+	DualSyncThread(ThreadId threadId_)
+		:	id(threadId_),
+			barriers{{{2}, {2}}},
+			task(nullptr)
+	{}
+
+	/// The thread does not own the task
+	/// @note This operation is not thread safe. Call it between syncs
+	/// @note This class will not own the task_
+	void setTask(ThreadTask* task_)
+	{
+		task = task_;
+	}
+
+	/// Start the thread
+	void start()
+	{
+		thread = std::thread(&DualSyncThread::workingFunc, this);
+	}
+
+	/// Sync with one of the 2 sync points
+	void sync(U syncPoint)
+	{
+		barriers[syncPoint].wait();
+	}
+
+private:
+	ThreadId id;
+	std::thread thread; ///< Runs the workingFunc
+	Array<Barrier, 2> barriers;
+	ThreadTask* task; ///< Its nullptr if there are no pending task
+
+	/// Thread loop
+	void workingFunc();
+};
+
+/// A task assignment for a ThreadpoolThread
+struct ThreadpoolTask
+{
+	virtual ~ThreadpoolTask()
+	{}
+
+	virtual void operator()(ThreadId threadId, U threadsCount) = 0;
+
+	/// Chose a starting and end index
+	void choseStartEnd(ThreadId threadId, PtrSize threadsCount, 
+		PtrSize elementsCount, PtrSize& start, PtrSize& end)
+	{
+		start = threadId * (elementsCount / threadsCount);
+		end = (threadId == threadsCount - 1)
+			? elementsCount
+			: start + elementsCount / threadsCount;
+	}
+};
+
+/// A dummy thread pool task
+struct DummyThreadpoolTask: ThreadpoolTask
+{
+	void operator()(ThreadId threadId, U threadsCount)
+	{
+		(void)threadId;
+		(void)threadsCount;
+	}
+};
+
+/// The thread that executes a ThreadpoolTask
+class ThreadpoolThread
+{
+public:
+	/// Constructor
+	ThreadpoolThread(ThreadId id, Barrier* barrier, Threadpool* threadpool);
+
+	/// Assign new task to the thread
+	/// @note 
+	void assignNewTask(ThreadpoolTask* task);
+
+private:
+	ThreadId id; ///< An ID
+	std::thread thread; ///< Runs the workingFunc
+	std::mutex mutex; ///< Protect the task
+	std::condition_variable condVar; ///< To wake up the thread
+	Barrier* barrier; ///< For synchronization
+	ThreadpoolTask* task; ///< Its NULL if there are no pending task
+	Threadpool* threadpool;
+
+	/// Start thread
+	void start()
+	{
+		thread = std::thread(&ThreadpoolThread::workingFunc, this);
+	}
+
+	/// Thread loop
+	void workingFunc();
+};
+
+/// Parallel task dispatcher.You feed it with tasks and sends them for
+/// execution in parallel and then waits for all to finish
+class Threadpool
+{
+public:
+	static constexpr U MAX_THREADS = 32; ///< An absolute limit
+
+	/// Default constructor
+	Threadpool()
+	{}
+
+	/// Constructor #2
+	Threadpool(U threadsNum)
+	{
+		init(threadsNum);
+	}
+
+	~Threadpool();
+
+	/// Init the manager
+	void init(U threadsNum);
+
+	/// Assign a task to a working thread
+	void assignNewTask(U taskId, ThreadpoolTask* task)
+	{
+		threads[taskId]->assignNewTask(task);
+	}
+
+	/// Wait for all tasks to finish
+	void waitForAllThreadsToFinish()
+	{
+#if !ANKI_DISABLE_THREADPOOL_THREADING
+		barrier->wait();
+#endif
+	}
+
+	U32 getThreadsCount() const
+	{
+		return threads.size();
+	}
+
+private:
+	Vector<ThreadpoolThread*> threads; ///< Worker threads
+	std::unique_ptr<Barrier> barrier; ///< Synchronization barrier
+};
+
+} // end namespace anki
+
+#endif
+

+ 1 - 1
src/core/CMakeLists.txt

@@ -1,4 +1,4 @@
-SET(ANKI_CORE_SOURCES App.cpp Async.cpp Logger.cpp StdinListener.cpp ThreadPool.cpp WorkingThread.cpp Timestamp.cpp Counters.cpp)
+SET(ANKI_CORE_SOURCES App.cpp Async.cpp Logger.cpp StdinListener.cpp Timestamp.cpp Counters.cpp)
 
 IF(${ANKI_WINDOW_BACKEND} STREQUAL "GLXX11")
 	SET(ANKI_CORE_SOURCES ${ANKI_CORE_SOURCES} NativeWindowGlxX11.cpp)

+ 0 - 87
src/core/ThreadPool.cpp

@@ -1,87 +0,0 @@
-#include "anki/core/ThreadPool.h"
-#include "anki/util/Memory.h"
-
-namespace anki {
-
-//==============================================================================
-// ThreadWorker                                                                =
-//==============================================================================
-
-//==============================================================================
-ThreadWorker::ThreadWorker(U32 id_, Barrier* barrier_, ThreadPool* threadPool_)
-	: id(id_), barrier(barrier_), threadPool(threadPool_)
-{
-#if !ANKI_DISABLE_THREADPOOL_THREADING
-	start();
-#endif
-}
-
-//==============================================================================
-void ThreadWorker::assignNewJob(ThreadJob* job_)
-{
-#if !ANKI_DISABLE_THREADPOOL_THREADING
-	mutex.lock();
-	ANKI_ASSERT(job == nullptr && "Probably forgot to wait for all jobs");
-	job = job_;
-	mutex.unlock();
-	condVar.notify_one(); // Wake the thread
-#else
-	(*job_)(id, threadPool->getThreadsCount());
-#endif
-}
-
-//==============================================================================
-void ThreadWorker::workingFunc()
-{
-	while(1)
-	{
-		// Wait for something
-		{
-			std::unique_lock<std::mutex> lock(mutex);
-			while(job == nullptr)
-			{
-				condVar.wait(lock);
-			}
-		}
-
-		// Exec
-		(*job)(id, threadPool->getThreadsCount());
-
-		// Nullify
-		{
-			std::lock_guard<std::mutex> lock(mutex);
-			job = nullptr;
-		}
-
-		barrier->wait();
-	}
-}
-
-//==============================================================================
-// ThreadPool                                                                  =
-//==============================================================================
-
-//==============================================================================
-ThreadPool::~ThreadPool()
-{
-	for(ThreadWorker* thread : jobs)
-	{
-		delete thread;
-	}
-}
-
-//==============================================================================
-void ThreadPool::init(U threadsCount)
-{
-	ANKI_ASSERT(threadsCount <= MAX_THREADS && threadsCount > 0);
-
-	barrier.reset(new Barrier(threadsCount + 1));
-
-	jobs.resize(threadsCount);
-	for(U i = 0; i < threadsCount; i++)
-	{
-		jobs[i] = new ThreadWorker(i, barrier.get(), this);
-	}
-}
-
-} // end namespace anki

+ 0 - 49
src/core/WorkingThread.cpp

@@ -1,49 +0,0 @@
-#include "anki/core/WorkingThread.h"
-#include "anki/core/Logger.h"
-
-namespace anki {
-
-#if 0
-
-//==============================================================================
-WorkingThread::WorkingThread(
-	U32 id, 
-	const std::shared_ptr<WorkingThreadJob>& job)
-{}
-
-//==============================================================================
-void WorkingThread::start()
-{
-	ANKI_LOGI("Starting working thread %d", id);
-	thread = std::thread(&WorkingThread::workingFunc, this);
-}
-
-//==============================================================================
-void WorkingThread::workingFunc()
-{
-	while(1)
-	{
-		// Wait for something
-		{
-			std::unique_lock<std::mutex> lock(mutex);
-			while(jobDone == true)
-			{
-				condVar.wait(lock);
-			}
-		}
-
-		// Exec
-		(*job)(id);
-
-		// Nullify
-		{
-			std::lock_guard<std::mutex> lock(mutex);
-			jobDone = true;
-		}
-
-		barrier->wait();
-	}
-}
-#endif
-
-} // end namespace anki

+ 7 - 7
src/renderer/Is.cpp

@@ -3,7 +3,7 @@
 #include "anki/scene/SceneGraph.h"
 #include "anki/scene/Camera.h"
 #include "anki/scene/Light.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 #include "anki/core/Counters.h"
 #include "anki/core/Logger.h"
 #include <sstream>
@@ -86,7 +86,7 @@ static Bool useCompute()
 
 //==============================================================================
 /// Write the lights to a client buffer
-struct WriteLightsJob: ThreadJob
+struct WriteLightsJob: ThreadpoolTask
 {
 	shader::PointLight* pointLights = nullptr;
 	shader::SpotLight* spotLights = nullptr;
@@ -116,7 +116,7 @@ struct WriteLightsJob: ThreadJob
 	/// Bin lights on CPU path
 	Bool binLights = true;
 
-	void operator()(U threadId, U threadsCount)
+	void operator()(ThreadId threadId, U threadsCount)
 	{
 		U ligthsCount = lightsEnd - lightsBegin;
 
@@ -576,7 +576,7 @@ void Is::initInternal(const RendererInitializer& initializer)
 //==============================================================================
 void Is::lightPass()
 {
-	ThreadPool& threadPool = ThreadPoolSingleton::get();
+	Threadpool& threadPool = ThreadpoolSingleton::get();
 	VisibilityTestResults& vi = 
 		cam->getFrustumComponent()->getVisibilityTestResults();
 
@@ -656,7 +656,7 @@ void Is::lightPass()
 	ANKI_ASSERT(spotTexLightsOffset + spotTexLightsSize <= calcLightsUboSize());
 
 	// Fire the super jobs
-	Array<WriteLightsJob, ThreadPool::MAX_THREADS> jobs;
+	Array<WriteLightsJob, Threadpool::MAX_THREADS> jobs;
 
 	U8* lightsClientBuffer = alloc.allocate(lightsUbo.getSizeInBytes());
 
@@ -719,14 +719,14 @@ void Is::lightPass()
 		job.tiler = &r->getTiler();
 		job.is = this;
 
-		threadPool.assignNewJob(i, &job);
+		threadPool.assignNewTask(i, &job);
 	}
 
 	// In the meantime set the state
 	setState();
 
 	// Sync
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 
 	// Write the light count for each tile
 	for(U y = 0; y < r->getTilesCount().y(); y++)

+ 8 - 8
src/renderer/Tiler.cpp

@@ -1,7 +1,7 @@
 #include "anki/renderer/Tiler.h"
 #include "anki/renderer/Renderer.h"
 #include "anki/resource/ShaderProgramResource.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 #include "anki/scene/Camera.h"
 #include <sstream>
 
@@ -19,7 +19,7 @@ namespace anki {
 	ANKI_ASSERT(p_ < &tiler->allPlanes[tiler->allPlanes.size()]);
 
 /// Job that updates the left, right, top and buttom tile planes
-struct UpdatePlanesPerspectiveCameraJob: ThreadJob
+struct UpdatePlanesPerspectiveCameraJob: ThreadpoolTask
 {
 	Tiler* tiler = nullptr;
 	PerspectiveCamera* cam = nullptr;
@@ -28,13 +28,13 @@ struct UpdatePlanesPerspectiveCameraJob: ThreadJob
 	const PixelArray* pixels = nullptr;
 #endif
 
-	void operator()(U threadId, U threadsCount)
+	void operator()(ThreadId threadId, U threadsCount)
 	{
 #if ANKI_TILER_ENABLE_GPU
 		ANKI_ASSERT(tiler && cam && pixels);
 #endif
 
-		U64 start, end;
+		PtrSize start, end;
 		Transform trf = Transform(cam->getWorldTransform());
 
 		if(frustumChanged)
@@ -302,7 +302,7 @@ void Tiler::updateTiles(Camera& cam)
 	//
 	// Issue parallel jobs
 	//
-	Array<UpdatePlanesPerspectiveCameraJob, ThreadPool::MAX_THREADS> jobs;
+	Array<UpdatePlanesPerspectiveCameraJob, Threadpool::MAX_THREADS> jobs;
 	U32 camTimestamp = cam.FrustumComponent::getTimestamp();
 
 	// Do a job that transforms only the planes when:
@@ -311,7 +311,7 @@ void Tiler::updateTiles(Camera& cam)
 	Bool frustumChanged =
 		camTimestamp >= planes4UpdateTimestamp || prevCam != &cam;
 
-	ThreadPool& threadPool = ThreadPoolSingleton::get();
+	Threadpool& threadPool = ThreadpoolSingleton::get();
 
 	switch(cam.getCameraType())
 	{
@@ -324,7 +324,7 @@ void Tiler::updateTiles(Camera& cam)
 			jobs[i].pixels = &pixels;
 #endif
 			jobs[i].frustumChanged = frustumChanged;
-			threadPool.assignNewJob(i, &jobs[i]);
+			threadPool.assignNewTask(i, &jobs[i]);
 		}
 		break;
 	default:
@@ -339,7 +339,7 @@ void Tiler::updateTiles(Camera& cam)
 	}
 
 	// Sync threads
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 
 	// 
 	// Misc

+ 12 - 12
src/scene/SceneGraph.cpp

@@ -2,7 +2,7 @@
 #include "anki/scene/Camera.h"
 #include "anki/scene/ModelNode.h"
 #include "anki/util/Exception.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 #include "anki/core/Counters.h"
 #include "anki/renderer/Renderer.h"
 #include "anki/misc/Xml.h"
@@ -14,11 +14,11 @@ namespace anki {
 //==============================================================================
 
 //==============================================================================
-struct UpdateMoveComponentsJob: ThreadJob
+struct UpdateMoveComponentsJob: ThreadpoolTask
 {
 	SceneGraph* scene = nullptr;
 
-	void operator()(U threadId, U threadsCount)
+	void operator()(ThreadId threadId, U threadsCount)
 	{
 		U64 start, end;
 		ANKI_ASSERT(scene);
@@ -65,14 +65,14 @@ static void updateSceneNode(SceneNode& sn, F32 prevUpdateTime,
 }
 
 //==============================================================================
-struct UpdateSceneNodesJob: ThreadJob
+struct UpdateSceneNodesJob: ThreadpoolTask
 {
 	SceneGraph* scene = nullptr;
 	F32 prevUpdateTime;
 	F32 crntTime;
 	SectorGroup* sectorGroup;
 
-	void operator()(U threadId, U threadsCount)
+	void operator()(ThreadId threadId, U threadsCount)
 	{
 		ANKI_ASSERT(scene);
 		U64 start, end;
@@ -231,7 +231,7 @@ void SceneGraph::update(F32 prevUpdateTime, F32 crntTime, Renderer& renderer)
 
 	deleteNodesMarkedForDeletion();
 
-	ThreadPool& threadPool = ThreadPoolSingleton::get();
+	Threadpool& threadPool = ThreadpoolSingleton::get();
 	(void)threadPool;
 
 	// XXX Do that in parallel
@@ -251,16 +251,16 @@ void SceneGraph::update(F32 prevUpdateTime, F32 crntTime, Renderer& renderer)
 		}
 	}
 #else
-	UpdateMoveComponentsJob jobs[ThreadPool::MAX_THREADS];
+	UpdateMoveComponentsJob jobs[Threadpool::MAX_THREADS];
 
 	for(U i = 0; i < threadPool.getThreadsCount(); i++)
 	{
 		jobs[i].scene = this;
 
-		threadPool.assignNewJob(i, &jobs[i]);
+		threadPool.assignNewTask(i, &jobs[i]);
 	}
 
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 #endif
 
 	// Then the rest
@@ -270,7 +270,7 @@ void SceneGraph::update(F32 prevUpdateTime, F32 crntTime, Renderer& renderer)
 		updateSceneNode(*n, prevUpdateTime, crntTime, sectorGroup);
 	}
 #else
-	Array<UpdateSceneNodesJob, ThreadPool::MAX_THREADS> jobs2;
+	Array<UpdateSceneNodesJob, Threadpool::MAX_THREADS> jobs2;
 
 	for(U i = 0; i < threadPool.getThreadsCount(); i++)
 	{
@@ -281,10 +281,10 @@ void SceneGraph::update(F32 prevUpdateTime, F32 crntTime, Renderer& renderer)
 		job.crntTime = crntTime;
 		job.sectorGroup = &sectorGroup;
 
-		threadPool.assignNewJob(i, &job);
+		threadPool.assignNewTask(i, &job);
 	}
 
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 #endif
 
 	doVisibilityTests(*mainCam, *this, renderer);

+ 3 - 3
src/scene/Sector.cpp

@@ -8,7 +8,7 @@
 #include "anki/scene/SceneGraph.h"
 #include "anki/core/Logger.h"
 #include "anki/renderer/Renderer.h"
-#include "anki/core/ThreadPool.h"
+#include "anki/core/Threadpool.h"
 
 namespace anki {
 
@@ -378,7 +378,7 @@ void SectorGroup::doVisibilityTestsInternal(SceneNode& sn, VisibilityTest test,
 	// Sort
 	//
 
-	ThreadPool& threadPool = ThreadPoolSingleton::get();
+	Threadpool.h& threadPool = Threadpool.hSingleton::get();
 
 	// Sort the renderables in a another thread
 	DistanceSortJob dsjob;
@@ -388,7 +388,7 @@ void SectorGroup::doVisibilityTestsInternal(SceneNode& sn, VisibilityTest test,
 	threadPool.assignNewJob(0, &dsjob);
 
 	// The rest of the jobs are dummy
-	Array<ThreadJobDummy, ThreadPool::MAX_THREADS>  dummyjobs;
+	Array<ThreadJobDummy, Threadpool.h::MAX_THREADS>  dummyjobs;
 	for(U i = 1; i < threadPool.getThreadsCount(); i++)
 	{
 		threadPool.assignNewJob(i, &dummyjobs[i]);

+ 10 - 10
src/scene/Visibility.cpp

@@ -33,7 +33,7 @@ struct SortSubspatialsFunctor
 };
 
 //==============================================================================
-struct VisibilityTestJob: ThreadJob
+struct VisibilityTestJob: ThreadpoolTask
 {
 	U nodesCount = 0;
 	SceneGraph* scene = nullptr;
@@ -89,7 +89,7 @@ struct VisibilityTestJob: ThreadJob
 	}
 
 	/// Do the tests
-	void operator()(U threadId, U threadsCount)
+	void operator()(ThreadId threadId, U threadsCount)
 	{
 		U64 start, end;
 		choseStartEnd(threadId, threadsCount, nodesCount, start, end);
@@ -222,8 +222,8 @@ void doVisibilityTests(SceneNode& fsn, SceneGraph& scene,
 	//
 	// Do the tests in parallel
 	//
-	ThreadPool& threadPool = ThreadPoolSingleton::get();
-	VisibilityTestJob jobs[ThreadPool::MAX_THREADS];
+	Threadpool& threadPool = ThreadpoolSingleton::get();
+	VisibilityTestJob jobs[Threadpool::MAX_THREADS];
 	for(U i = 0; i < threadPool.getThreadsCount(); i++)
 	{
 		jobs[i].nodesCount = scene.getSceneNodesCount();
@@ -232,10 +232,10 @@ void doVisibilityTests(SceneNode& fsn, SceneGraph& scene,
 		jobs[i].tiler = &r.getTiler();
 		jobs[i].frameAlloc = scene.getFrameAllocator();
 
-		threadPool.assignNewJob(i, &jobs[i]);
+		threadPool.assignNewTask(i, &jobs[i]);
 	}
 
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 
 	//
 	// Combine results
@@ -293,13 +293,13 @@ void doVisibilityTests(SceneNode& fsn, SceneGraph& scene,
 	dsjob.nodes = visible->lights.begin();
 	dsjob.nodesCount = visible->lights.size();
 	dsjob.origin = fr->getFrustumOrigin();
-	threadPool.assignNewJob(0, &dsjob);
+	threadPool.assignNewTask(0, &dsjob);
 
 	// The rest of the jobs are dummy
-	ThreadJobDummy dummyjobs[ThreadPool::MAX_THREADS];
+	DummyThreadpoolTask dummyjobs[Threadpool::MAX_THREADS];
 	for(U i = 1; i < threadPool.getThreadsCount(); i++)
 	{
-		threadPool.assignNewJob(i, &dummyjobs[i]);
+		threadPool.assignNewTask(i, &dummyjobs[i]);
 	}
 
 	// Sort the renderables in the main thread
@@ -307,7 +307,7 @@ void doVisibilityTests(SceneNode& fsn, SceneGraph& scene,
 	dsfunc.origin = fr->getFrustumOrigin();
 	std::sort(visible->renderables.begin(), visible->renderables.end(), dsfunc);
 
-	threadPool.waitForAllJobsToFinish();
+	threadPool.waitForAllThreadsToFinish();
 }
 
 } // end namespace anki

+ 1 - 1
src/util/CMakeLists.txt

@@ -1,4 +1,4 @@
-set(ANKI_UTIL_SOURCES Assert.cpp Exception.cpp Functions.cpp StringList.cpp File.cpp Allocator.cpp Memory.cpp System.cpp HighRezTimer.cpp)
+set(ANKI_UTIL_SOURCES Assert.cpp Exception.cpp Functions.cpp StringList.cpp File.cpp Allocator.cpp Memory.cpp System.cpp HighRezTimer.cpp Thread.cpp)
 
 if(LINUX OR ANDROID OR MACOS)
 	set(ANKI_UTIL_SOURCES ${ANKI_UTIL_SOURCES} HighRezTimerPosix.cpp FilePosix.cpp)

+ 111 - 0
src/util/Thread.cpp

@@ -0,0 +1,111 @@
+#include "anki/util/Thread.h"
+#include "anki/util/Assert.h"
+
+namespace anki {
+
+//==============================================================================
+// DualSyncThread                                                              =
+//==============================================================================
+
+//==============================================================================
+void DualSyncThread::workingFunc()
+{
+	while(task)
+	{
+		// Exec the task
+		(*task)(id);
+
+		// Wait for the other thread to sync
+		sync(0);
+
+		// Do nothing
+
+		// Wait for the other thread again
+		sync(1);
+	}
+}
+
+//==============================================================================
+// ThreadpoolThread                                                            =
+//==============================================================================
+
+//==============================================================================
+ThreadpoolThread::ThreadpoolThread(U32 id_, Barrier* barrier_, 
+	Threadpool* threadpool_)
+	: id(id_), barrier(barrier_), task(nullptr), threadpool(threadpool_)
+{
+	ANKI_ASSERT(barrier && threadpool);
+#if !ANKI_DISABLE_THREADPOOL_THREADING
+	start();
+#endif
+}
+
+//==============================================================================
+void ThreadpoolThread::assignNewTask(ThreadpoolTask* task_)
+{
+#if !ANKI_DISABLE_THREADPOOL_THREADING
+	mutex.lock();
+	ANKI_ASSERT(task == nullptr && "Probably forgot to wait for all tasks");
+	task = task_;
+	mutex.unlock();
+	condVar.notify_one(); // Wake the thread
+#else
+	(*task_)(id, threadpool->getThreadsCount());
+#endif
+}
+
+//==============================================================================
+void ThreadpoolThread::workingFunc()
+{
+	while(1)
+	{
+		// Wait for something
+		{
+			std::unique_lock<std::mutex> lock(mutex);
+			while(task == nullptr)
+			{
+				condVar.wait(lock);
+			}
+		}
+
+		// Exec
+		(*task)(id, threadpool->getThreadsCount());
+
+		// Nullify
+		{
+			std::lock_guard<std::mutex> lock(mutex);
+			task = nullptr;
+		}
+
+		barrier->wait();
+	}
+}
+
+//==============================================================================
+// Threadpool                                                                  =
+//==============================================================================
+
+//==============================================================================
+Threadpool::~Threadpool()
+{
+	for(ThreadpoolThread* thread : threads)
+	{
+		delete thread;
+	}
+}
+
+//==============================================================================
+void Threadpool::init(U threadsCount)
+{
+	ANKI_ASSERT(threadsCount <= MAX_THREADS && threadsCount > 0);
+
+	barrier.reset(new Barrier(threadsCount + 1));
+
+	threads.resize(threadsCount);
+	for(U i = 0; i < threadsCount; i++)
+	{
+		threads[i] = new ThreadpoolThread(i, barrier.get(), this);
+	}
+}
+
+} // end namespace

+ 1 - 2
testapp/Main.cpp

@@ -24,7 +24,6 @@
 #include "anki/event/MainRendererPpsHdrEvent.h"
 #include "anki/resource/ShaderProgramPrePreprocessor.h"
 #include "anki/resource/Material.h"
-#include "anki/core/ThreadPool.h"
 #include "anki/core/Timestamp.h"
 #include "anki/core/NativeWindow.h"
 #include "anki/Scene.h"
@@ -580,7 +579,7 @@ void initSubsystems(int argc, char* argv[])
 	StdinListenerSingleton::get().start();
 
 	// Parallel jobs
-	ThreadPoolSingleton::get().init(getCpuCoresCount());
+	ThreadpoolSingleton::get().init(getCpuCoresCount());
 }
 
 //==============================================================================