Bläddra i källkod

Refactoring the Job manager

Panagiotis Christopoulos Charitos 14 år sedan
förälder
incheckning
38d4a5cdf6

+ 25 - 23
shaders/GaussianBlurGeneric.glsl

@@ -10,12 +10,14 @@
 
 layout(location = 0) in vec2 position;
 
-uniform float imgDimension = 0.0; ///< the img width for hspass or the img height for vpass
+uniform float imgDimension = 0.0; ///< The img width for hspass or the img 
+                                  ///< height for vpass
 
 out vec2 vTexCoords;
 out vec2 vOffsets; ///< For side pixels. Its actually a 2D array
 
-const vec2 BLURRING_OFFSET = vec2(1.3846153846, 3.2307692308); ///< The offset of side pixels. Its actually a 2D array
+/// The offset of side pixels. Its actually a 2D array
+const vec2 BLURRING_OFFSET = vec2(1.3846153846, 3.2307692308); 
 
 
 void main()
@@ -33,11 +35,11 @@ void main()
 
 // Preprocessor switches sanity checks
 #if !defined(VPASS) && !defined(HPASS)
-	#error "See file"
+#	error "See file"
 #endif
 
 #if !(defined(COL_RGBA) || defined(COL_RGB) || defined(COL_R))
-	#error "See file"
+#	error "See file"
 #endif
 
 
@@ -50,20 +52,20 @@ in vec2 vOffsets;
 
 // Determine color type
 #if defined(COL_RGBA)
-	#define COL_TYPE vec4
+#	define COL_TYPE vec4
 #elif defined(COL_RGB)
-	#define COL_TYPE vec3
+#	define COL_TYPE vec3
 #elif defined(COL_R)
-	#define COL_TYPE float
+#	define COL_TYPE float
 #endif
 
 // Determine tex fetch
 #if defined(COL_RGBA)
-	#define TEX_FETCH rgba
+#	define TEX_FETCH rgba
 #elif defined(COL_RGB)
-	#define TEX_FETCH rgb
+#	define TEX_FETCH rgb
 #elif defined(COL_R)
-	#define TEX_FETCH r
+#	define TEX_FETCH r
 #endif
 
 
@@ -82,19 +84,19 @@ void main()
 	// side pixels
 	for(int i = 0; i < 2; i++)
 	{
-		#if defined(HPASS)
-			vec2 texCoords = vec2(vTexCoords.x + blurringDist + vOffsets[i], vTexCoords.y);
-			col += texture2D(img, texCoords).TEX_FETCH * WEIGHTS[i];
-
-			texCoords.x = vTexCoords.x - blurringDist - vOffsets[i];
-			col += texture2D(img, texCoords).TEX_FETCH * WEIGHTS[i];
-		#elif defined(VPASS)
-			vec2 texCoords = vec2(vTexCoords.x, vTexCoords.y + blurringDist + vOffsets[i]);
-			col += texture2D(img, texCoords).TEX_FETCH * WEIGHTS[i];
-
-			texCoords.y = vTexCoords.y - blurringDist - vOffsets[i];
-			col += texture2D(img, texCoords).TEX_FETCH * WEIGHTS[i];
-		#endif
+#if defined(HPASS)
+		vec2 tc = vec2(vTexCoords.x + blurringDist + vOffsets[i], vTexCoords.y);
+		col += texture2D(img, tc).TEX_FETCH * WEIGHTS[i];
+
+		tc.x = vTexCoords.x - blurringDist - vOffsets[i];
+		col += texture2D(img, tc).TEX_FETCH * WEIGHTS[i];
+#elif defined(VPASS)
+		vec2 tc = vec2(vTexCoords.x, vTexCoords.y + blurringDist + vOffsets[i]);
+		col += texture2D(img, tc).TEX_FETCH * WEIGHTS[i];
+
+		tc.y = vTexCoords.y - blurringDist - vOffsets[i];
+		col += texture2D(img, tc).TEX_FETCH * WEIGHTS[i];
+#endif
 	}
 
 	fFragColor = col;

+ 4 - 12
src/Core/App.cpp

@@ -13,7 +13,7 @@
 #include "Input/Input.h"
 #include "Logger.h"
 #include "Core/Globals.h"
-#include "JobManager.h"
+#include "ParallelJobs/Manager.h"
 #include "Renderer/Drawers/PhyDbgDrawer.h"
 #include "Scene/Scene.h"
 
@@ -24,16 +24,8 @@
 void App::handleMessageHanlderMsgs(const char* file, int line,
 	const char* func, const char* msg)
 {
-	if(boost::find_first(msg, "Warning") || boost::find_first(msg, "Error"))
-	{
-		std::cerr << "(" << file << ":" << line << " "<< func << ") " << msg <<
-			std::flush;
-	}
-	else
-	{
-		std::cout << "(" << file << ":" << line << " "<< func << ") " << msg <<
-			std::flush;
-	}
+	std::cerr << "(" << file << ":" << line << " "<< func <<
+		") " << msg << std::flush;
 }
 
 
@@ -116,7 +108,7 @@ void App::init(int argc, char* argv[])
 
 	initWindow();
 	initRenderer();
-	JobManagerSingleton::getInstance().init(4);
+	ParallelJobs::ManagerSingleton::getInstance().init(4);
 	SceneSingleton::getInstance().getPhysMasterContainer().setDebugDrawer(
 		new R::PhyDbgDrawer(R::MainRendererSingleton::getInstance().getDbg()));
 

+ 2 - 2
src/Core/CMakeLists.txt

@@ -1,4 +1,4 @@
-FILE(GLOB CORE_SOURCES *.cpp)
-FILE(GLOB CORE_HEADERS *.h)
+FILE(GLOB_RECURSE CORE_SOURCES *.cpp)
+FILE(GLOB_RECURSE CORE_HEADERS *.h)
 
 ADD_LIBRARY(Core ${CORE_SOURCES} ${CORE_HEADERS})

+ 3 - 1
src/Core/Globals.h

@@ -22,7 +22,9 @@ namespace Event {
 typedef Singleton<class Manager> ManagerSingleton;
 }
 
-typedef Singleton<class JobManager> JobManagerSingleton;
+namespace ParallelJobs {
+typedef Singleton<class Manager> ManagerSingleton;
+}
 
 
 #endif

+ 0 - 109
src/Core/JobManager.h

@@ -1,109 +0,0 @@
-#ifndef JOB_MANAGER_H
-#define JOB_MANAGER_H
-
-#include <boost/thread.hpp>
-#include <boost/ptr_container/ptr_vector.hpp>
-#include <boost/scoped_ptr.hpp>
-#include "Util/Accessors.h"
-
-
-class JobManager;
-
-
-/// The thread that executes a job
-class WorkerThread
-{
-	public:
-		typedef void (*JobCallback)(void*, const WorkerThread&);
-
-		/// Constructor
-		WorkerThread(int id, const JobManager& jobManager,
-			boost::barrier* barrier);
-
-		/// @name Accessors
-		/// @{
-		GETTER_R(uint, id, getId)
-		const JobManager& getJobManager() const {return jobManager;}
-		/// @}
-
-		/// Assign new job to the thread
-		void assignNewJob(JobCallback job, void* jobParams);
-
-	private:
-		uint id; ///< An ID
-		boost::thread thread; ///< Runs the workingFunc
-		boost::mutex mutex; ///< Protect the WorkerThread::job
-		boost::condition_variable condVar; ///< To wake up the thread
-		boost::barrier* barrier;
-		JobCallback job; ///< Its NULL if there are no pending jobs
-		void* jobParams;
-		/// Know your father and pass him to the jobs
-		const JobManager& jobManager;
-
-		/// Start thread
-		void start();
-
-		/// Thread loop
-		void workingFunc();
-};
-
-
-inline WorkerThread::WorkerThread(int id_, const JobManager& jobManager_,
-	boost::barrier* barrier_)
-:	id(id_),
-	barrier(barrier_),
-	job(NULL),
-	jobManager(jobManager_)
-{
-	start();
-}
-
-
-inline void WorkerThread::start()
-{
-	thread = boost::thread(&WorkerThread::workingFunc, this);
-}
-
-
-/// The job manager
-class JobManager
-{
-	public:
-		/// Default constructor
-		JobManager() {}
-
-		/// Constructor #2
-		JobManager(uint threadsNum) {init(threadsNum);}
-
-		/// Init the manager
-		void init(uint threadsNum);
-
-		/// Assign a job to a working thread
-		void assignNewJob(uint threadId, WorkerThread::JobCallback job,
-			void* jobParams);
-
-		/// Wait for all jobs to finish
-		void waitForAllJobsToFinish();
-
-		uint getThreadsNum() const {return workers.size();}
-
-	private:
-		boost::ptr_vector<WorkerThread> workers; ///< Worker threads
-		boost::scoped_ptr<boost::barrier> barrier; ///< Synchronization barrier
-};
-
-
-inline void JobManager::assignNewJob(uint threadId,
-	WorkerThread::JobCallback job, void* jobParams)
-{
-	workers[threadId].assignNewJob(job, jobParams);
-}
-
-
-inline void JobManager::waitForAllJobsToFinish()
-{
-	barrier->wait();
-}
-
-
-#endif

+ 28 - 24
src/Core/JobManager.cpp → src/Core/ParallelJobs/Job.cpp

@@ -1,14 +1,31 @@
-#include "JobManager.h"
+#include "Job.h"
+#include "Manager.h"
+
+
+namespace ParallelJobs {
+
+
+//==============================================================================
+// Constructor                                                                 =
+//==============================================================================
+Job::Job(int id_, const Manager& manager_, boost::barrier& barrier_)
+:	id(id_),
+ 	barrier(barrier_),
+	callback(NULL),
+	manager(manager_)
+{
+	start();
+}
 
 
 //==============================================================================
 // assignNewJob                                                                =
 //==============================================================================
-void WorkerThread::assignNewJob(JobCallback job_, void* jobParams_)
+void Job::assignNewJob(JobCallback callback_, JobParameters& jobParams_)
 {
 	boost::mutex::scoped_lock lock(mutex);
-	job = job_;
-	jobParams = jobParams_;
+	callback = callback_;
+	params = &jobParams_;
 
 	lock.unlock();
 	condVar.notify_one();
@@ -18,45 +35,32 @@ void WorkerThread::assignNewJob(JobCallback job_, void* jobParams_)
 //==============================================================================
 // workingFunc                                                                 =
 //==============================================================================
-void WorkerThread::workingFunc()
+void Job::workingFunc()
 {
 	while(1)
 	{
 		// Wait for something
 		{
 			boost::mutex::scoped_lock lock(mutex);
-			while(job == NULL)
+			while(callback == NULL)
 			{
 				condVar.wait(lock);
 			}
 		}
 
 		// Exec
-		job(jobParams, *this);
+		params->job = this;
+		callback(*params);
 
 		// Nullify
 		{
 			boost::mutex::scoped_lock lock(mutex);
-			job = NULL;
+			callback = NULL;
 		}
 
-		barrier->wait();
-	}
-}
-
-
-//==============================================================================
-// init                                                                        =
-//==============================================================================
-void JobManager::init(uint threadsNum)
-{
-	barrier.reset(new boost::barrier(threadsNum + 1));
-
-	for(uint i = 0; i < threadsNum; i++)
-	{
-		workers.push_back(new WorkerThread(i, *this, barrier.get()));
+		barrier.wait();
 	}
 }
 
 
-
+} // end namespace

+ 70 - 0
src/Core/ParallelJobs/Job.h

@@ -0,0 +1,70 @@
+#ifndef PARALLEL_JOBS_JOB_H
+#define PARALLEL_JOBS_JOB_H
+
+#include <boost/thread.hpp>
+#include "Util/Accessors.h"
+
+
+namespace ParallelJobs {
+
+
+class Manager;
+class Job;
+
+
+/// The base class of the parameters the we pass in the job
+struct JobParameters
+{
+	const Job* job;
+};
+
+
+/// The callback that we feed to the job
+typedef void (*JobCallback)(JobParameters&);
+
+
+/// The thread that executes a JobCallback
+class Job
+{
+	public:
+		/// Constructor
+		Job(int id, const Manager& manager, boost::barrier& barrier);
+
+		/// @name Accessors
+		/// @{
+		GETTER_R(uint, id, getId)
+		GETTER_R(Manager, manager, getManager)
+		/// @}
+
+		/// Assign new job to the thread
+		void assignNewJob(JobCallback callback, JobParameters& jobParams);
+
+	private:
+		uint id; ///< An ID
+		boost::thread thread; ///< Runs the workingFunc
+		boost::mutex mutex; ///< Protect the Job::job
+		boost::condition_variable condVar; ///< To wake up the thread
+		boost::barrier& barrier; ///< For synchronization
+		JobCallback callback; ///< Its NULL if there are no pending job
+		JobParameters* params;
+		/// Know your father and pass him to the jobs
+		const Manager& manager; ///< Know your father
+
+		/// Start thread
+		void start();
+
+		/// Thread loop
+		void workingFunc();
+};
+
+
+inline void Job::start()
+{
+	thread = boost::thread(&Job::workingFunc, this);
+}
+
+
+} // end namespace
+
+
+#endif

+ 21 - 0
src/Core/ParallelJobs/Manager.cpp

@@ -0,0 +1,21 @@
+#include "Manager.h"
+
+
+namespace ParallelJobs {
+
+
+//==============================================================================
+// init                                                                        =
+//==============================================================================
+void Manager::init(uint threadsNum)
+{
+	barrier.reset(new boost::barrier(threadsNum + 1));
+
+	for(uint i = 0; i < threadsNum; i++)
+	{
+		jobs.push_back(new Job(i, *this, *barrier.get()));
+	}
+}
+
+
+} // end namespace

+ 56 - 0
src/Core/ParallelJobs/Manager.h

@@ -0,0 +1,56 @@
+#ifndef PARALLEL_JOBS_MANAGER_H
+#define PARALLEL_JOBS_MANAGER_H
+
+#include "Job.h"
+#include <boost/thread.hpp>
+#include <boost/ptr_container/ptr_vector.hpp>
+
+
+namespace ParallelJobs {
+
+
+/// The job manager
+class Manager
+{
+	public:
+		/// Default constructor
+		Manager() {}
+
+		/// Constructor #2
+		Manager(uint threadsNum) {init(threadsNum);}
+
+		/// Init the manager
+		void init(uint threadsNum);
+
+		/// Assign a job to a working thread
+		void assignNewJob(uint threadId, JobCallback job,
+			JobParameters& jobParams);
+
+		/// Wait for all jobs to finish
+		void waitForAllJobsToFinish();
+
+		uint getThreadsNum() const {return jobs.size();}
+
+	private:
+		boost::ptr_vector<Job> jobs; ///< Worker threads
+		boost::scoped_ptr<boost::barrier> barrier; ///< Synchronization barrier
+};
+
+
+inline void Manager::assignNewJob(uint jobId,
+	JobCallback callback, JobParameters& jobParams)
+{
+	jobs[jobId].assignNewJob(callback, jobParams);
+}
+
+
+inline void Manager::waitForAllJobsToFinish()
+{
+	barrier->wait();
+}
+
+
+} // end namespace
+
+
+#endif

+ 16 - 10
src/Scene/VisibilityTester.cpp

@@ -8,7 +8,7 @@
 #include "Collision/Sphere.h"
 #include "PointLight.h"
 #include "SpotLight.h"
-#include "Core/JobManager.h"
+#include "Core/ParallelJobs/Manager.h"
 #include "Core/Logger.h"
 
 
@@ -129,12 +129,16 @@ void VisibilityTester::getRenderableNodes(bool skipShadowless_,
 	storage.getVisibleMsRenderableNodes().clear();
 	storage.getVisibleBsRenderableNodes().clear();
 
-	for(uint i = 0; i < JobManagerSingleton::getInstance().getThreadsNum(); i++)
+	// Run in parallel
+	JobParameters jobParameters;
+	jobParameters.visTester = this;
+	for(uint i = 0;
+		i < ParallelJobs::ManagerSingleton::getInstance().getThreadsNum(); i++)
 	{
-		JobManagerSingleton::getInstance().assignNewJob(i,
-			getRenderableNodesJobCallback, this);
+		ParallelJobs::ManagerSingleton::getInstance().assignNewJob(i,
+			getRenderableNodesJobCallback, jobParameters);
 	}
-	JobManagerSingleton::getInstance().waitForAllJobsToFinish();
+	ParallelJobs::ManagerSingleton::getInstance().waitForAllJobsToFinish();
 
 	//
 	// Sort the renderables from closest to the camera to the farthest
@@ -151,12 +155,14 @@ void VisibilityTester::getRenderableNodes(bool skipShadowless_,
 //==============================================================================
 // getRenderableNodesJobCallback                                               =
 //==============================================================================
-void VisibilityTester::getRenderableNodesJobCallback(void* args,
-	const WorkerThread& workerThread)
+void VisibilityTester::getRenderableNodesJobCallback(
+	ParallelJobs::JobParameters& data)
 {
-	uint id = workerThread.getId();
-	uint threadsNum = workerThread.getJobManager().getThreadsNum();
-	VisibilityTester* visTester = reinterpret_cast<VisibilityTester*>(args);
+	JobParameters& jobParameters = static_cast<JobParameters&>(data);
+
+	uint id = jobParameters.job->getId();
+	uint threadsNum = jobParameters.job->getManager().getThreadsNum();
+	VisibilityTester* visTester = jobParameters.visTester;
 	Scene& scene = visTester->scene;
 
 	uint count, from, to;

+ 10 - 4
src/Scene/VisibilityTester.h

@@ -3,6 +3,7 @@
 
 #include "Util/Accessors.h"
 #include "Math/Math.h"
+#include "Core/ParallelJobs/Job.h"
 #include <deque>
 #include <boost/thread.hpp>
 
@@ -14,7 +15,6 @@ class SpotLight;
 class PointLight;
 class SceneNode;
 class VisibilityInfo;
-class WorkerThread;
 
 
 /// Performs visibility determination tests and fills a few containers with the
@@ -42,6 +42,12 @@ class VisibilityTester
 			bool operator()(const SceneNode* a, const SceneNode* b) const;
 		};
 
+		/// The JobParameters that we feed in the ParallelJobs::Manager
+		struct JobParameters: ParallelJobs::JobParameters
+		{
+			VisibilityTester* visTester;
+		};
+
 		Scene& scene; ///< Know your father
 
 		/// @name Needed by getRenderableNodesJobCallback
@@ -68,10 +74,10 @@ class VisibilityTester
 		void getRenderableNodes(bool skipShadowless, const Camera& cam,
 			VisibilityInfo& storage);
 
-		/// This static method will be fed into the JobManager
+		/// This static method will be fed into the ParallelJobs::Manager
 		/// @param data This is actually a pointer to VisibilityTester
-		static void getRenderableNodesJobCallback(void* data,
-			const WorkerThread& workerThread);
+		static void getRenderableNodesJobCallback(
+			ParallelJobs::JobParameters& data);
 };