瀏覽代碼

Added start/shutdown policy to ThreadPool

Marko Pintera 11 年之前
父節點
當前提交
b95193a9b4

+ 2 - 0
CamelotCore/CamelotCore.vcxproj

@@ -272,6 +272,7 @@
     </Link>
     </Link>
   </ItemDefinitionGroup>
   </ItemDefinitionGroup>
   <ItemGroup>
   <ItemGroup>
+    <ClInclude Include="Include\BsThreadPolicy.h" />
     <ClInclude Include="Include\CmBindableGpuParamBlock.h" />
     <ClInclude Include="Include\CmBindableGpuParamBlock.h" />
     <ClInclude Include="Include\CmBindableGpuParams.h" />
     <ClInclude Include="Include\CmBindableGpuParams.h" />
     <ClInclude Include="Include\CmCoreThread.h" />
     <ClInclude Include="Include\CmCoreThread.h" />
@@ -423,6 +424,7 @@
   </ItemGroup>
   </ItemGroup>
   <ItemGroup>
   <ItemGroup>
     <ClCompile Include="Include\CmMaterialManager.cpp" />
     <ClCompile Include="Include\CmMaterialManager.cpp" />
+    <ClCompile Include="Source\BsThreadPolicy.cpp" />
     <ClCompile Include="Source\CmBindableGpuParamBlock.cpp" />
     <ClCompile Include="Source\CmBindableGpuParamBlock.cpp" />
     <ClCompile Include="Source\CmBindableGpuParams.cpp" />
     <ClCompile Include="Source\CmBindableGpuParams.cpp" />
     <ClCompile Include="Source\CmCoreThread.cpp" />
     <ClCompile Include="Source\CmCoreThread.cpp" />

+ 6 - 0
CamelotCore/CamelotCore.vcxproj.filters

@@ -543,6 +543,9 @@
     <ClInclude Include="Include\CmFolderMonitor.h">
     <ClInclude Include="Include\CmFolderMonitor.h">
       <Filter>Header Files\Platform</Filter>
       <Filter>Header Files\Platform</Filter>
     </ClInclude>
     </ClInclude>
+    <ClInclude Include="Include\BsThreadPolicy.h">
+      <Filter>Header Files</Filter>
+    </ClInclude>
   </ItemGroup>
   </ItemGroup>
   <ItemGroup>
   <ItemGroup>
     <ClCompile Include="Source\CmApplication.cpp">
     <ClCompile Include="Source\CmApplication.cpp">
@@ -851,5 +854,8 @@
     <ClCompile Include="Source\CmWin32FolderMonitor.cpp">
     <ClCompile Include="Source\CmWin32FolderMonitor.cpp">
       <Filter>Source Files\Platform</Filter>
       <Filter>Source Files\Platform</Filter>
     </ClCompile>
     </ClCompile>
+    <ClCompile Include="Source\BsThreadPolicy.cpp">
+      <Filter>Source Files</Filter>
+    </ClCompile>
   </ItemGroup>
   </ItemGroup>
 </Project>
 </Project>

+ 14 - 0
CamelotCore/Include/BsThreadPolicy.h

@@ -0,0 +1,14 @@
+#pragma once
+
+#include "CmPrerequisites.h"
+#include "CmProfiler.h"
+
+namespace BansheeEngine
+{
+	class CM_EXPORT ThreadBansheePolicy
+	{
+	public:
+		static void onThreadStarted(const String& name);
+		static void onThreadEnded(const String& name);
+	};
+}

+ 16 - 0
CamelotCore/Source/BsThreadPolicy.cpp

@@ -0,0 +1,16 @@
+#include "BsThreadPolicy.h"
+
+namespace BansheeEngine
+{
+	void ThreadBansheePolicy::onThreadStarted(const String& name) 
+	{
+		MemStack::beginThread();
+		Profiler::instance().beginThread(name.c_str());
+	}
+
+	void ThreadBansheePolicy::onThreadEnded(const String& name) 
+	{
+		Profiler::instance().endThread();
+		MemStack::endThread();
+	}
+}

+ 6 - 0
CamelotCore/Source/CmApplication.cpp

@@ -32,6 +32,8 @@
 #include "CmStringTable.h"
 #include "CmStringTable.h"
 #include "CmProfiler.h"
 #include "CmProfiler.h"
 #include "CmQueryManager.h"
 #include "CmQueryManager.h"
+#include "BsThreadPool.h"
+#include "BsThreadPolicy.h"
 
 
 #include "CmMaterial.h"
 #include "CmMaterial.h"
 #include "CmShader.h"
 #include "CmShader.h"
@@ -50,10 +52,13 @@ namespace BansheeEngine
 
 
 	void Application::startUp(START_UP_DESC& desc)
 	void Application::startUp(START_UP_DESC& desc)
 	{
 	{
+		UINT32 numWorkerThreads = CM_THREAD_HARDWARE_CONCURRENCY - 1; // Number of cores while excluding current thread.
+
 		Platform::startUp();
 		Platform::startUp();
 		MemStack::beginThread();
 		MemStack::beginThread();
 
 
 		Profiler::startUp(cm_new<Profiler>());
 		Profiler::startUp(cm_new<Profiler>());
+		ThreadPool<ThreadBansheePolicy>::startUp(cm_new<ThreadPool<ThreadBansheePolicy>>(numWorkerThreads));
 		StringTable::startUp(cm_new<StringTable>());
 		StringTable::startUp(cm_new<StringTable>());
 		DeferredCallManager::startUp(cm_new<DeferredCallManager>());
 		DeferredCallManager::startUp(cm_new<DeferredCallManager>());
 		Time::startUp(cm_new<Time>());
 		Time::startUp(cm_new<Time>());
@@ -188,6 +193,7 @@ namespace BansheeEngine
 		DeferredCallManager::shutDown();
 		DeferredCallManager::shutDown();
 		StringTable::shutDown();
 		StringTable::shutDown();
 
 
+		ThreadPool<ThreadBansheePolicy>::shutDown();
 		Profiler::shutDown();
 		Profiler::shutDown();
 		MemStack::endThread();
 		MemStack::endThread();
 		Platform::shutDown();
 		Platform::shutDown();

+ 82 - 8
CamelotUtility/Include/BsThreadPool.h

@@ -5,15 +5,65 @@
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
-	class PooledThread;
+	class CM_UTILITY_EXPORT PooledThread
+	{
+	public:
+		PooledThread(const String& name);
+		virtual ~PooledThread();
+
+		void start(std::function<void()> workerMethod);
+		void run();
+		void destroy();
+
+		bool isIdle();
+		time_t idleTime();
+
+		void setName(const String& name);
+
+		virtual void onThreadStarted(const String& name) = 0;
+		virtual void onThreadEnded(const String& name) = 0;
+
+		std::function<void()> mWorkerMethod;
+
+		String mName;
+		bool mIdle;
+		bool mThreadStarted;
+		bool mThreadReady;
+
+		time_t mIdleTime;
+
+		CM_THREAD_TYPE* mThread;
+		CM_MUTEX(mMutex);
+		CM_THREAD_SYNCHRONISER(mStartedCond);
+		CM_THREAD_SYNCHRONISER(mReadyCond);
+	};
+
+	template<class ThreadPolicy>
+	class TPooledThread : public PooledThread
+	{
+	public:
+		TPooledThread(const String& name)
+			:PooledThread(name)
+		{ }
+
+		void onThreadStarted(const String& name)
+		{
+			ThreadPolicy::onThreadStarted(name);
+		}
+
+		void onThreadEnded(const String& name)
+		{
+			ThreadPolicy::onThreadEnded(name);
+		}
+	};
 
 
-	class CM_UTILITY_EXPORT ThreadPool : public Module<ThreadPool>
+	class CM_UTILITY_EXPORT ThreadPoolBase : public Module<ThreadPoolBase>
 	{
 	{
 	public:
 	public:
-		ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity = 16, UINT32 idleTimeout = 60);
-		~ThreadPool();
+		ThreadPoolBase(UINT32 threadCapacity, UINT32 maxCapacity = 16, UINT32 idleTimeout = 60);
+		virtual ~ThreadPoolBase();
 
 
-		void run(std::function<void()> workerMethod);
+		void run(const String& name, std::function<void()> workerMethod);
 
 
 		void stopAll();
 		void stopAll();
 
 
@@ -23,12 +73,12 @@ namespace BansheeEngine
 		UINT32 getNumActive() const;
 		UINT32 getNumActive() const;
 		UINT32 getNumAllocated() const;
 		UINT32 getNumAllocated() const;
 
 
-	private:
+	protected:
 		Vector<PooledThread*>::type mThreads;
 		Vector<PooledThread*>::type mThreads;
 		
 		
-		PooledThread* createThread();
+		virtual PooledThread* createThread(const String& name) = 0;
 		void destroyThread(PooledThread* thread);
 		void destroyThread(PooledThread* thread);
-		PooledThread* getThread();
+		PooledThread* getThread(const String& name);
 
 
 		UINT32 mDefaultCapacity;
 		UINT32 mDefaultCapacity;
 		UINT32 mMaxCapacity;
 		UINT32 mMaxCapacity;
@@ -37,4 +87,28 @@ namespace BansheeEngine
 		
 		
 		CM_MUTEX(mMutex);
 		CM_MUTEX(mMutex);
 	};
 	};
+
+	class ThreadNoPolicy
+	{
+	public:
+		static void onThreadStarted(const String& name) { }
+		static void onThreadEnded(const String& name) { }
+	};
+
+	template<class ThreadPolicy = ThreadNoPolicy>
+	class ThreadPool : public ThreadPoolBase
+	{
+	public:
+		ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity = 16, UINT32 idleTimeout = 60)
+			:ThreadPoolBase(threadCapacity, maxCapacity, idleTimeout)
+		{
+
+		}
+
+	protected:
+		PooledThread* createThread(const String& name)
+		{
+			return cm_new<TPooledThread<ThreadPolicy>>(name);
+		}
+	};
 }
 }

+ 84 - 96
CamelotUtility/Source/BsThreadPool.cpp

@@ -2,136 +2,126 @@
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
-	class PooledThread
+	PooledThread::PooledThread(const String& name)
+		:mName(name), mIdle(true), mThreadStarted(false),
+			mThreadReady(false), mIdleTime(0)
 	{
 	{
-	public:
-		PooledThread()
-			:mIdle(true), mThreadStarted(false),
-			 mThreadReady(false), mIdleTime(0)
-		{
-			CM_THREAD_CREATE(t, std::bind(&PooledThread::run, this));
+		CM_THREAD_CREATE(t, std::bind(&PooledThread::run, this));
 
 
-			CM_LOCK_MUTEX_NAMED(mMutex, lock);
+		CM_LOCK_MUTEX_NAMED(mMutex, lock);
 
 
-			while(!mThreadStarted)
-				CM_THREAD_WAIT(mStartedCond, mMutex, lock);
-		}
+		while(!mThreadStarted)
+			CM_THREAD_WAIT(mStartedCond, mMutex, lock);
+	}
 
 
-		~PooledThread()
-		{
+	PooledThread::~PooledThread()
+	{
 
 
-		}
+	}
 
 
-		void start(std::function<void()> workerMethod)
+	void PooledThread::start(std::function<void()> workerMethod)
+	{
 		{
 		{
-			{
-				CM_LOCK_MUTEX(mMutex);
-				mWorkerMethod = workerMethod;
-				mThreadReady = true;
-			}
-
-			CM_THREAD_NOTIFY_ONE(mReadyCond);
+			CM_LOCK_MUTEX(mMutex);
+			mWorkerMethod = workerMethod;
+			mThreadReady = true;
 		}
 		}
 
 
-		void run()
-		{
-			{
-				CM_LOCK_MUTEX(mMutex);
-				mThreadStarted = true;
-			}
-
-			CM_THREAD_NOTIFY_ONE(mStartedCond);
+		CM_THREAD_NOTIFY_ONE(mReadyCond);
+	}
 
 
-			while(true)
-			{
-				std::function<void()> worker = nullptr;
+	void PooledThread::run()
+	{
+		onThreadStarted(mName);
 
 
-				{
-					CM_LOCK_MUTEX_NAMED(mMutex, lock);
+		{
+			CM_LOCK_MUTEX(mMutex);
+			mThreadStarted = true;
+		}
 
 
-					while(!mThreadReady)
-						CM_THREAD_WAIT(mReadyCond, mMutex, lock);
+		CM_THREAD_NOTIFY_ONE(mStartedCond);
 
 
-					if(mWorkerMethod == nullptr)
-						return;
+		while(true)
+		{
+			std::function<void()> worker = nullptr;
 
 
-					worker = mWorkerMethod;
-					mIdle = false;
-				}
+			{
+				CM_LOCK_MUTEX_NAMED(mMutex, lock);
 
 
-				worker();
+				while(!mThreadReady)
+					CM_THREAD_WAIT(mReadyCond, mMutex, lock);
 
 
+				if(mWorkerMethod == nullptr)
 				{
 				{
-					CM_LOCK_MUTEX(mMutex);
-
-					mIdle = true;
-					mIdleTime = std::time(nullptr);
-					mThreadReady = false;
+					onThreadEnded(mName);
+					return;
 				}
 				}
+
+				worker = mWorkerMethod;
+				mIdle = false;
 			}
 			}
-		}
 
 
-		void destroy()
-		{
+			worker();
+
 			{
 			{
 				CM_LOCK_MUTEX(mMutex);
 				CM_LOCK_MUTEX(mMutex);
-				mWorkerMethod = nullptr;
-				mThreadReady = true;
-			}
 
 
-			CM_THREAD_NOTIFY_ONE(mReadyCond);
-			CM_THREAD_JOIN((*mThread));
-			CM_THREAD_DESTROY(mThread);
+				mIdle = true;
+				mIdleTime = std::time(nullptr);
+				mThreadReady = false;
+			}
 		}
 		}
+	}
 
 
-		bool isIdle()
+	void PooledThread::destroy()
+	{
 		{
 		{
 			CM_LOCK_MUTEX(mMutex);
 			CM_LOCK_MUTEX(mMutex);
-
-			return mIdle;
+			mWorkerMethod = nullptr;
+			mThreadReady = true;
 		}
 		}
 
 
-		time_t idleTime()
-		{
-			CM_LOCK_MUTEX(mMutex);
+		CM_THREAD_NOTIFY_ONE(mReadyCond);
+		CM_THREAD_JOIN((*mThread));
+		CM_THREAD_DESTROY(mThread);
+	}
 
 
-			return (time(nullptr) - mIdleTime);
-		}
+	bool PooledThread::isIdle()
+	{
+		CM_LOCK_MUTEX(mMutex);
 
 
-		std::function<void()> mWorkerMethod;
+		return mIdle;
+	}
 
 
-		bool mIdle;
-		bool mThreadStarted;
-		bool mThreadReady;
+	time_t PooledThread::idleTime()
+	{
+		CM_LOCK_MUTEX(mMutex);
 
 
-		time_t mIdleTime;
+		return (time(nullptr) - mIdleTime);
+	}
 
 
-		CM_THREAD_TYPE* mThread;
-		CM_MUTEX(mMutex);
-		CM_THREAD_SYNCHRONISER(mStartedCond);
-		CM_THREAD_SYNCHRONISER(mReadyCond);
-	};
+	void PooledThread::setName(const String& name)
+	{
+		mName = name;
+	}
 
 
-	ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
+	ThreadPoolBase::ThreadPoolBase(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
 		:mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
 		:mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
 	{
 	{
-		for(UINT32 i = 0; i < mDefaultCapacity; i++)
-		{
-			mThreads.push_back(createThread());
-		}
+
 	}
 	}
 
 
-	ThreadPool::~ThreadPool()
+	ThreadPoolBase::~ThreadPoolBase()
 	{
 	{
 		stopAll();
 		stopAll();
 	}
 	}
 
 
-	void ThreadPool::run(std::function<void()> workerMethod)
+	void ThreadPoolBase::run(const String& name, std::function<void()> workerMethod)
 	{
 	{
-		getThread()->start(workerMethod);
+		getThread(name)->start(workerMethod);
 	}
 	}
 
 
-	void ThreadPool::stopAll()
+	void ThreadPoolBase::stopAll()
 	{
 	{
 		CM_LOCK_MUTEX(mMutex);
 		CM_LOCK_MUTEX(mMutex);
 		for(auto& thread : mThreads)
 		for(auto& thread : mThreads)
@@ -142,7 +132,7 @@ namespace BansheeEngine
 		mThreads.clear();
 		mThreads.clear();
 	}
 	}
 
 
-	void ThreadPool::clearUnused()
+	void ThreadPoolBase::clearUnused()
 	{
 	{
 		CM_LOCK_MUTEX(mMutex);
 		CM_LOCK_MUTEX(mMutex);
 		mAge = 0;
 		mAge = 0;
@@ -188,18 +178,13 @@ namespace BansheeEngine
 		mThreads.insert(mThreads.end(), activeThreads.begin(), activeThreads.end());
 		mThreads.insert(mThreads.end(), activeThreads.begin(), activeThreads.end());
 	}
 	}
 
 
-	PooledThread* ThreadPool::createThread()
-	{
-		return cm_new<PooledThread>();
-	}
-
-	void ThreadPool::destroyThread(PooledThread* thread)
+	void ThreadPoolBase::destroyThread(PooledThread* thread)
 	{
 	{
 		thread->destroy();
 		thread->destroy();
 		cm_delete(thread);
 		cm_delete(thread);
 	}
 	}
 
 
-	PooledThread* ThreadPool::getThread()
+	PooledThread* ThreadPoolBase::getThread(const String& name)
 	{
 	{
 		UINT32 age = 0;
 		UINT32 age = 0;
 		{
 		{
@@ -216,7 +201,10 @@ namespace BansheeEngine
 		for(auto& thread : mThreads)
 		for(auto& thread : mThreads)
 		{
 		{
 			if(thread->isIdle())
 			if(thread->isIdle())
+			{
+				thread->setName(name);
 				return thread;
 				return thread;
+			}
 		}
 		}
 
 
 		if(newThread == nullptr)
 		if(newThread == nullptr)
@@ -224,14 +212,14 @@ namespace BansheeEngine
 			if(mThreads.size() >= mMaxCapacity)
 			if(mThreads.size() >= mMaxCapacity)
 				CM_EXCEPT(InvalidStateException, "Unable to create a new thread in the pool because maximum capacity has been reached.");
 				CM_EXCEPT(InvalidStateException, "Unable to create a new thread in the pool because maximum capacity has been reached.");
 
 
-			newThread = createThread();
+			newThread = createThread(name);
 			mThreads.push_back(newThread);
 			mThreads.push_back(newThread);
 		}
 		}
 
 
 		return newThread;
 		return newThread;
 	}
 	}
 
 
-	UINT32 ThreadPool::getNumAvailable() const
+	UINT32 ThreadPoolBase::getNumAvailable() const
 	{
 	{
 		UINT32 numAvailable = 0;
 		UINT32 numAvailable = 0;
 
 
@@ -245,7 +233,7 @@ namespace BansheeEngine
 		return numAvailable;
 		return numAvailable;
 	}
 	}
 
 
-	UINT32 ThreadPool::getNumActive() const
+	UINT32 ThreadPoolBase::getNumActive() const
 	{
 	{
 		UINT32 numActive = 0;
 		UINT32 numActive = 0;
 
 
@@ -259,7 +247,7 @@ namespace BansheeEngine
 		return numActive;
 		return numActive;
 	}
 	}
 
 
-	UINT32 ThreadPool::getNumAllocated() const
+	UINT32 ThreadPoolBase::getNumAllocated() const
 	{
 	{
 		CM_LOCK_MUTEX(mMutex);
 		CM_LOCK_MUTEX(mMutex);