Selaa lähdekoodia

Merge pull request #326 from paullouisageneau/fix-deadlock-at-exit

Fix possible deadlock at exit
Paul-Louis Ageneau 4 vuotta sitten
vanhempi
commit
690653f8ef
4 muutettua tiedostoa jossa 45 lisäystä ja 12 poistoa
  1. 27 6
      src/processor.cpp
  2. 5 5
      src/processor.hpp
  3. 4 0
      src/threadpool.cpp
  4. 9 1
      src/threadpool.hpp

+ 27 - 6
src/processor.cpp

@@ -25,18 +25,39 @@ Processor::Processor(size_t limit) : mTasks(limit) {}
 Processor::~Processor() { join(); }
 
 void Processor::join() {
-	std::unique_lock lock(mMutex);
-	mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
+	// We need to detect situations where the thread pool does not execute a pending task at exit
+	std::optional<unsigned int> counter;
+	while (true) {
+		std::shared_future<void> pending;
+		{
+			std::unique_lock lock(mMutex);
+			if (!mPending                               // no pending task
+			    || (counter && *counter == mCounter)) { // or no scheduled task after the last one
+
+				// Processing is stopped, clear everything and return
+				mPending.reset();
+				while (!mTasks.empty())
+					mTasks.pop();
+
+				return;
+			}
+
+			pending = *mPending;
+			counter = mCounter;
+		}
+
+		// Wait for the pending task
+		pending.wait();
+	}
 }
 
 void Processor::schedule() {
 	std::unique_lock lock(mMutex);
 	if (auto next = mTasks.tryPop()) {
-		ThreadPool::Instance().enqueue(std::move(*next));
+		mPending = ThreadPool::Instance().enqueue(std::move(*next)).share();
+		++mCounter;
 	} else {
-		// No more tasks
-		mPending = false;
-		mCondition.notify_all();
+		mPending.reset(); // No more tasks
 	}
 }
 

+ 5 - 5
src/processor.hpp

@@ -54,10 +54,10 @@ protected:
 	const init_token mInitToken = Init::Token();
 
 	Queue<std::function<void()>> mTasks;
-	bool mPending = false; // true iff a task is pending in the thread pool
+	std::optional<std::shared_future<void>> mPending; // future of the pending task
+	unsigned int mCounter = 0; // Number of scheduled tasks
 
 	mutable std::mutex mMutex;
-	std::condition_variable mCondition;
 };
 
 template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) {
@@ -65,12 +65,12 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args)
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
 	auto task = [this, bound = std::move(bound)]() mutable {
 		scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task
-		return bound();
+		bound();
 	};
 
 	if (!mPending) {
-		ThreadPool::Instance().enqueue(std::move(task));
-		mPending = true;
+		mPending = ThreadPool::Instance().enqueue(std::move(task)).share();
+		++mCounter;
 	} else {
 		mTasks.push(std::move(task));
 	}

+ 4 - 0
src/threadpool.cpp

@@ -96,6 +96,10 @@ std::function<void()> ThreadPool::dequeue() {
 			mCondition.wait(lock);
 		}
 	}
+
+	while (!mTasks.empty())
+		mTasks.pop();
+
 	return nullptr;
 }
 

+ 9 - 1
src/threadpool.hpp

@@ -100,8 +100,16 @@ auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
 template <class F, class... Args>
 auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
     -> invoke_future_t<F, Args...> {
-	std::unique_lock lock(mMutex);
 	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
+	std::unique_lock lock(mMutex);
+	if (mJoining) {
+		std::promise<R> promise;
+		std::future<R> result = promise.get_future();
+		promise.set_exception(std::make_exception_ptr(
+		    std::runtime_error("Scheduled a task while joining the thread pool")));
+		return result;
+	}
+
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
 	auto task = std::make_shared<std::packaged_task<R()>>([bound = std::move(bound)]() mutable {
 		try {