Browse Source

Various thread shutdown fixes:
TaskScheduler waits until its main worker thread shuts down before destructing, as well as waiting for all child tasks to complete
CoreThread wait until the main worker thread shuts down before destructing

Marko Pintera 11 years ago
parent
commit
aa6bcc7ef9

+ 2 - 0
BansheeCore/Include/BsCoreThread.h

@@ -4,6 +4,7 @@
 #include "BsModule.h"
 #include "BsModule.h"
 #include "BsCommandQueue.h"
 #include "BsCommandQueue.h"
 #include "BsCoreThreadAccessor.h"
 #include "BsCoreThreadAccessor.h"
+#include "BsThreadPool.h"
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
@@ -116,6 +117,7 @@ private:
 
 
 	volatile bool mCoreThreadShutdown;
 	volatile bool mCoreThreadShutdown;
 
 
+	HThread mCoreThread;
 	BS_THREAD_ID_TYPE mCoreThreadId;
 	BS_THREAD_ID_TYPE mCoreThreadId;
 	BS_MUTEX(mCommandQueueMutex)
 	BS_MUTEX(mCommandQueueMutex)
 	BS_MUTEX(mAccessorMutex)
 	BS_MUTEX(mAccessorMutex)

+ 3 - 1
BansheeCore/Source/BsCoreThread.cpp

@@ -55,7 +55,7 @@ namespace BansheeEngine
 	{
 	{
 #if !BS_FORCE_SINGLETHREADED_RENDERING
 #if !BS_FORCE_SINGLETHREADED_RENDERING
 #if BS_THREAD_SUPPORT
 #if BS_THREAD_SUPPORT
-		ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
+		mCoreThread = ThreadPool::instance().run("Core", std::bind(&CoreThread::runCoreThread, this));
 #else
 #else
 		BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
 		BS_EXCEPT(InternalErrorException, "Attempting to start a core thread but application isn't compiled with thread support.");
 #endif
 #endif
@@ -113,6 +113,8 @@ namespace BansheeEngine
 		BS_THREAD_NOTIFY_ALL(mCommandReadyCondition);
 		BS_THREAD_NOTIFY_ALL(mCommandReadyCondition);
 
 
 		mCoreThreadId = BS_THREAD_CURRENT_ID;
 		mCoreThreadId = BS_THREAD_CURRENT_ID;
+
+		mCoreThread.blockUntilComplete();
 #endif
 #endif
 	}
 	}
 
 

+ 1 - 0
BansheeUtility/Include/BsFwdDeclUtil.h

@@ -51,6 +51,7 @@ namespace BansheeEngine
 	class StringTable;
 	class StringTable;
 	struct LocalizedStringData;
 	struct LocalizedStringData;
 	class Path;
 	class Path;
+	class HThread;
 	// Reflection
 	// Reflection
 	class IReflectable;
 	class IReflectable;
 	class RTTITypeBase;
 	class RTTITypeBase;

+ 4 - 0
BansheeUtility/Include/BsTaskScheduler.h

@@ -2,6 +2,7 @@
 
 
 #include "BsPrerequisitesUtil.h"
 #include "BsPrerequisitesUtil.h"
 #include "BsModule.h"
 #include "BsModule.h"
+#include "BsThreadPool.h"
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
@@ -136,7 +137,9 @@ namespace BansheeEngine
 		 */
 		 */
 		static bool taskCompare(const TaskPtr& lhs, const TaskPtr& rhs);
 		static bool taskCompare(const TaskPtr& lhs, const TaskPtr& rhs);
 
 
+		HThread mTaskSchedulerThread;
 		Set<TaskPtr, std::function<bool(const TaskPtr&, const TaskPtr&)>> mTaskQueue;
 		Set<TaskPtr, std::function<bool(const TaskPtr&, const TaskPtr&)>> mTaskQueue;
+		Vector<TaskPtr> mActiveTasks;
 		UINT32 mNumActiveTasks;
 		UINT32 mNumActiveTasks;
 		UINT32 mMaxActiveTasks;
 		UINT32 mMaxActiveTasks;
 		UINT32 mNextTaskId;
 		UINT32 mNextTaskId;
@@ -144,6 +147,7 @@ namespace BansheeEngine
 
 
 		BS_MUTEX(mReadyMutex);
 		BS_MUTEX(mReadyMutex);
 		BS_MUTEX(mCompleteMutex);
 		BS_MUTEX(mCompleteMutex);
+		BS_MUTEX(mActiveTaskMutex);
 		BS_THREAD_SYNCHRONISER(mTaskReadyCond);
 		BS_THREAD_SYNCHRONISER(mTaskReadyCond);
 		BS_THREAD_SYNCHRONISER(mTaskCompleteCond);
 		BS_THREAD_SYNCHRONISER(mTaskCompleteCond);
 	};
 	};

+ 3 - 3
BansheeUtility/Include/BsThreadDefines.h

@@ -11,17 +11,17 @@
 #include "BsSpinLock.h"
 #include "BsSpinLock.h"
 
 
 #define BS_AUTO_MUTEX mutable std::mutex BS_AUTO_MUTEX_NAME;
 #define BS_AUTO_MUTEX mutable std::mutex BS_AUTO_MUTEX_NAME;
-#define BS_LOCK_AUTO_MUTEX std::unique_lock<std::mutex> cmAutoMutexLock(BS_AUTO_MUTEX_NAME);
+#define BS_LOCK_AUTO_MUTEX std::unique_lock<std::mutex> bsAutoMutexLock(BS_AUTO_MUTEX_NAME);
 #define BS_MUTEX(name) mutable std::mutex name;
 #define BS_MUTEX(name) mutable std::mutex name;
 #define BS_STATIC_MUTEX(name) static std::mutex name;
 #define BS_STATIC_MUTEX(name) static std::mutex name;
 #define BS_STATIC_MUTEX_INSTANCE(name) std::mutex name;
 #define BS_STATIC_MUTEX_INSTANCE(name) std::mutex name;
 #define BS_STATIC_MUTEX_CLASS_INSTANCE(name, classTypeName) std::mutex classTypeName##::name;
 #define BS_STATIC_MUTEX_CLASS_INSTANCE(name, classTypeName) std::mutex classTypeName##::name;
-#define BS_LOCK_MUTEX(name) std::unique_lock<std::mutex> cmnameLock(name);
+#define BS_LOCK_MUTEX(name) std::unique_lock<std::mutex> bsnameLock(name);
 #define BS_LOCK_MUTEX_NAMED(mutexName, lockName) std::unique_lock<std::mutex> lockName(mutexName);
 #define BS_LOCK_MUTEX_NAMED(mutexName, lockName) std::unique_lock<std::mutex> lockName(mutexName);
 #define BS_LOCK_TYPE std::unique_lock<std::mutex>
 #define BS_LOCK_TYPE std::unique_lock<std::mutex>
 // like BS_AUTO_MUTEX but mutex held by pointer
 // like BS_AUTO_MUTEX but mutex held by pointer
 #define BS_AUTO_SHARED_MUTEX mutable std::mutex *BS_AUTO_MUTEX_NAME;
 #define BS_AUTO_SHARED_MUTEX mutable std::mutex *BS_AUTO_MUTEX_NAME;
-#define BS_LOCK_AUTO_SHARED_MUTEX assert(BS_AUTO_MUTEX_NAME); std::lock_guard<std::mutex> cmAutoMutexLock(*BS_AUTO_MUTEX_NAME);
+#define BS_LOCK_AUTO_SHARED_MUTEX assert(BS_AUTO_MUTEX_NAME); std::lock_guard<std::mutex> bsAutoMutexLock(*BS_AUTO_MUTEX_NAME);
 #define BS_COPY_AUTO_SHARED_MUTEX(from) assert(!BS_AUTO_MUTEX_NAME); BS_AUTO_MUTEX_NAME = from;
 #define BS_COPY_AUTO_SHARED_MUTEX(from) assert(!BS_AUTO_MUTEX_NAME); BS_AUTO_MUTEX_NAME = from;
 #define BS_SET_AUTO_SHARED_MUTEX_NULL BS_AUTO_MUTEX_NAME = 0;
 #define BS_SET_AUTO_SHARED_MUTEX_NULL BS_AUTO_MUTEX_NAME = 0;
 #define BS_MUTEX_CONDITIONAL(mutex) if (mutex)
 #define BS_MUTEX_CONDITIONAL(mutex) if (mutex)

+ 35 - 1
BansheeUtility/Include/BsThreadPool.h

@@ -5,6 +5,27 @@
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
+	class ThreadPool;
+
+	/**
+	 * @brief	Handle to a thread managed by ThreadPool.
+	 */
+	class BS_UTILITY_EXPORT HThread
+	{
+	public:
+		HThread();
+		HThread(ThreadPool* pool, UINT32 threadId);
+
+		/**
+		 * @brief	Block the calling thread until the thread this handle points to completes.
+		 */
+		void blockUntilComplete();
+
+	private:
+		UINT32 mThreadId;
+		ThreadPool* mPool;
+	};
+
 	/**
 	/**
 	 * @brief	Wrapper around a thread that is used within ThreadPool.
 	 * @brief	Wrapper around a thread that is used within ThreadPool.
 	 */
 	 */
@@ -50,6 +71,11 @@ namespace BansheeEngine
 		 */
 		 */
 		void setName(const String& name);
 		void setName(const String& name);
 
 
+		/**
+		 * @brief	Gets unique ID of the currently executing thread.
+		 */
+		UINT32 getId() const;
+
 		/**
 		/**
 		 * @brief	Called when the thread is first created.
 		 * @brief	Called when the thread is first created.
 		 */
 		 */
@@ -61,6 +87,8 @@ namespace BansheeEngine
 		virtual void onThreadEnded(const String& name) = 0;
 		virtual void onThreadEnded(const String& name) = 0;
 
 
 	protected:
 	protected:
+		friend class HThread;
+
 		/**
 		/**
 		 * @brief	Primary worker method that is ran when the thread is first
 		 * @brief	Primary worker method that is ran when the thread is first
 		 * 			initialized.
 		 * 			initialized.
@@ -71,6 +99,7 @@ namespace BansheeEngine
 		std::function<void()> mWorkerMethod;
 		std::function<void()> mWorkerMethod;
 
 
 		String mName;
 		String mName;
+		UINT32 mId;
 		bool mIdle;
 		bool mIdle;
 		bool mThreadStarted;
 		bool mThreadStarted;
 		bool mThreadReady;
 		bool mThreadReady;
@@ -81,6 +110,7 @@ namespace BansheeEngine
 		BS_MUTEX(mMutex);
 		BS_MUTEX(mMutex);
 		BS_THREAD_SYNCHRONISER(mStartedCond);
 		BS_THREAD_SYNCHRONISER(mStartedCond);
 		BS_THREAD_SYNCHRONISER(mReadyCond);
 		BS_THREAD_SYNCHRONISER(mReadyCond);
+		BS_THREAD_SYNCHRONISER(mWorkerEndedCond);
 	};
 	};
 
 
 	/**
 	/**
@@ -140,8 +170,10 @@ namespace BansheeEngine
 		 *
 		 *
 		 * @param	name			A name you may use for more easily identifying the thread.
 		 * @param	name			A name you may use for more easily identifying the thread.
 		 * @param	workerMethod	The worker method to be called by the thread.
 		 * @param	workerMethod	The worker method to be called by the thread.
+		 *
+		 * @returns	A thread handle you may use for monitoring the thread execution.
 		 */
 		 */
-		void run(const String& name, std::function<void()> workerMethod);
+		HThread run(const String& name, std::function<void()> workerMethod);
 
 
 		/**
 		/**
 		 * @brief	Stops all threads and destroys them. Caller must ensure each threads workerMethod
 		 * @brief	Stops all threads and destroys them. Caller must ensure each threads workerMethod
@@ -171,6 +203,8 @@ namespace BansheeEngine
 		UINT32 getNumAllocated() const;
 		UINT32 getNumAllocated() const;
 
 
 	protected:
 	protected:
+		friend class HThread;
+
 		Vector<PooledThread*> mThreads;
 		Vector<PooledThread*> mThreads;
 		
 		
 		/**
 		/**

+ 35 - 5
BansheeUtility/Source/BsTaskScheduler.cpp

@@ -42,15 +42,33 @@ namespace BansheeEngine
 	{
 	{
 		mMaxActiveTasks = BS_THREAD_HARDWARE_CONCURRENCY;
 		mMaxActiveTasks = BS_THREAD_HARDWARE_CONCURRENCY;
 
 
-		ThreadPool::instance().run("TaskScheduler", std::bind(&TaskScheduler::runMain, this));
+		mTaskSchedulerThread = ThreadPool::instance().run("TaskScheduler", std::bind(&TaskScheduler::runMain, this));
 	}
 	}
 
 
 	TaskScheduler::~TaskScheduler()
 	TaskScheduler::~TaskScheduler()
 	{
 	{
-		BS_LOCK_MUTEX(mReadyMutex);
+		// Wait until all tasks complete
+		BS_LOCK_MUTEX_NAMED(mActiveTaskMutex, activeTaskLock);
+
+		while (mActiveTasks.size() > 0)
+		{
+			TaskPtr task = mActiveTasks[0];
+			activeTaskLock.unlock();
+
+			task->wait();
+			activeTaskLock.lock();
+		}
+
+		// Start shutdown of the main queue worker and wait until it exits
+		{
+			BS_LOCK_MUTEX(mReadyMutex);
+
+			mShutdown = true;
+		}
 
 
-		mShutdown = true;
 		BS_THREAD_NOTIFY_ONE(mTaskReadyCond);
 		BS_THREAD_NOTIFY_ONE(mTaskReadyCond);
+
+		mTaskSchedulerThread.blockUntilComplete();
 	}
 	}
 
 
 	void TaskScheduler::addTask(const TaskPtr& task)
 	void TaskScheduler::addTask(const TaskPtr& task)
@@ -107,8 +125,12 @@ namespace BansheeEngine
 				if(curTask->mTaskDependency != nullptr && !curTask->mTaskDependency->isComplete())
 				if(curTask->mTaskDependency != nullptr && !curTask->mTaskDependency->isComplete())
 					continue;
 					continue;
 
 
-				curTask->mState.store(1);
-				mNumActiveTasks++;
+				BS_LOCK_MUTEX(mActiveTaskMutex);
+				{
+					curTask->mState.store(1);
+					mActiveTasks.push_back(curTask);
+					mNumActiveTasks++;
+				}
 
 
 				ThreadPool::instance().run(curTask->mName, std::bind(&TaskScheduler::runTask, this, curTask));
 				ThreadPool::instance().run(curTask->mName, std::bind(&TaskScheduler::runTask, this, curTask));
 			}
 			}
@@ -119,6 +141,14 @@ namespace BansheeEngine
 	{
 	{
 		task->mTaskWorker();
 		task->mTaskWorker();
 
 
+		{
+			BS_LOCK_MUTEX(mActiveTaskMutex);
+
+			auto findIter = std::find(mActiveTasks.begin(), mActiveTasks.end(), task);
+			if (findIter != mActiveTasks.end())
+				mActiveTasks.erase(findIter);
+		}
+
 		{
 		{
 			BS_LOCK_MUTEX(mCompleteMutex);
 			BS_LOCK_MUTEX(mCompleteMutex);
 			task->mState.store(2);
 			task->mState.store(2);

+ 50 - 3
BansheeUtility/Source/BsThreadPool.cpp

@@ -2,9 +2,43 @@
 
 
 namespace BansheeEngine
 namespace BansheeEngine
 {
 {
+	HThread::HThread()
+		:mPool(nullptr), mThreadId(0)
+	{ }
+
+	HThread::HThread(ThreadPool* pool, UINT32 threadId)
+		:mPool(pool), mThreadId(threadId)
+	{ }
+
+	void HThread::blockUntilComplete()
+	{
+		PooledThread* parentThread = nullptr;
+
+		{
+			BS_LOCK_MUTEX(mPool->mMutex);
+
+			for (auto& thread : mPool->mThreads)
+			{
+				if (thread->getId() == mThreadId)
+					parentThread = thread;
+			}
+		}
+
+		if (parentThread != nullptr)
+		{
+			BS_LOCK_MUTEX_NAMED(parentThread->mMutex, lock);
+
+			if (parentThread->mId == mThreadId) // Check again in case it changed
+			{
+				while (!parentThread->mIdle)
+					BS_THREAD_WAIT(parentThread->mWorkerEndedCond, parentThread->mMutex, lock);
+			}
+		}
+	}
+
 	PooledThread::PooledThread(const String& name)
 	PooledThread::PooledThread(const String& name)
 		:mName(name), mIdle(true), mThreadStarted(false),
 		:mName(name), mIdle(true), mThreadStarted(false),
-			mThreadReady(false), mIdleTime(0)
+		mThreadReady(false), mIdleTime(0), mId(0)
 	{ }
 	{ }
 
 
 	PooledThread::~PooledThread()
 	PooledThread::~PooledThread()
@@ -30,6 +64,7 @@ namespace BansheeEngine
 			mIdle = false;
 			mIdle = false;
 			mIdleTime = std::time(nullptr);
 			mIdleTime = std::time(nullptr);
 			mThreadReady = true;
 			mThreadReady = true;
+			mId++;
 		}
 		}
 
 
 		BS_THREAD_NOTIFY_ONE(mReadyCond);
 		BS_THREAD_NOTIFY_ONE(mReadyCond);
@@ -73,6 +108,8 @@ namespace BansheeEngine
 				mIdle = true;
 				mIdle = true;
 				mIdleTime = std::time(nullptr);
 				mIdleTime = std::time(nullptr);
 				mThreadReady = false;
 				mThreadReady = false;
+
+				BS_THREAD_NOTIFY_ONE(mWorkerEndedCond);
 			}
 			}
 		}
 		}
 	}
 	}
@@ -109,6 +146,13 @@ namespace BansheeEngine
 		mName = name;
 		mName = name;
 	}
 	}
 
 
+	UINT32 PooledThread::getId() const
+	{ 
+		BS_LOCK_MUTEX(mMutex);
+
+		return mId; 
+	}
+
 	ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
 	ThreadPool::ThreadPool(UINT32 threadCapacity, UINT32 maxCapacity, UINT32 idleTimeout)
 		:mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
 		:mDefaultCapacity(threadCapacity), mMaxCapacity(maxCapacity), mIdleTimeout(idleTimeout), mAge(0)
 	{
 	{
@@ -120,9 +164,12 @@ namespace BansheeEngine
 		stopAll();
 		stopAll();
 	}
 	}
 
 
-	void ThreadPool::run(const String& name, std::function<void()> workerMethod)
+	HThread ThreadPool::run(const String& name, std::function<void()> workerMethod)
 	{
 	{
-		getThread(name)->start(workerMethod);
+		PooledThread* thread = getThread(name);
+		thread->start(workerMethod);
+
+		return HThread(this, thread->getId());
 	}
 	}
 
 
 	void ThreadPool::stopAll()
 	void ThreadPool::stopAll()

+ 2 - 0
Polish.txt

@@ -21,6 +21,8 @@ Polish TODO:
  - Updates links in git readme
  - Updates links in git readme
  - Update links in license text
  - Update links in license text
 
 
+TaskScheduler crashes on shutdown. My guess is because mutex and condition var go outs of scope but they're still being called on the task thread. Similar issue might exist in CoreThread.
+
 Automatically release resources on shutdown instead of forcing the user to manually call unload() and set handles to nullptr.
 Automatically release resources on shutdown instead of forcing the user to manually call unload() and set handles to nullptr.
  - e.g. just invalidate all handles automatically
  - e.g. just invalidate all handles automatically