Browse Source

Revert "Prevent scheduling tasks while joining thread pool"

This reverts commit ab392fe0da0b3451fc16a607e0ca1040afc62e7c.
Paul-Louis Ageneau 4 years ago
parent
commit
82568e3aa0
4 changed files with 12 additions and 45 deletions
  1. 6 27
      src/processor.cpp
  2. 5 5
      src/processor.hpp
  3. 0 4
      src/threadpool.cpp
  4. 1 9
      src/threadpool.hpp

+ 6 - 27
src/processor.cpp

@@ -25,39 +25,18 @@ Processor::Processor(size_t limit) : mTasks(limit) {}
 Processor::~Processor() { join(); }
 
 void Processor::join() {
-	// We need to detect situations where the thread pool does not execute a pending task at exit
-	std::optional<unsigned int> counter;
-	while (true) {
-		std::shared_future<void> pending;
-		{
-			std::unique_lock lock(mMutex);
-			if (!mPending                               // no pending task
-			    || (counter && *counter == mCounter)) { // or no scheduled task after the last one
-
-				// Processing is stopped, clear everything and return
-				mPending.reset();
-				while (!mTasks.empty())
-					mTasks.pop();
-
-				return;
-			}
-
-			pending = *mPending;
-			counter = mCounter;
-		}
-
-		// Wait for the pending task
-		pending.wait();
-	}
+	std::unique_lock lock(mMutex);
+	mCondition.wait(lock, [this]() { return !mPending && mTasks.empty(); });
 }
 
 void Processor::schedule() {
 	std::unique_lock lock(mMutex);
 	if (auto next = mTasks.tryPop()) {
-		mPending = ThreadPool::Instance().enqueue(std::move(*next)).share();
-		++mCounter;
+		ThreadPool::Instance().enqueue(std::move(*next));
 	} else {
-		mPending.reset(); // No more tasks
+		// No more tasks
+		mPending = false;
+		mCondition.notify_all();
 	}
 }
 

+ 5 - 5
src/processor.hpp

@@ -54,10 +54,10 @@ protected:
 	const init_token mInitToken = Init::Token();
 
 	Queue<std::function<void()>> mTasks;
-	std::optional<std::shared_future<void>> mPending; // future of the pending task
-	unsigned int mCounter = 0; // Number of scheduled tasks
+	bool mPending = false; // true iff a task is pending in the thread pool
 
 	mutable std::mutex mMutex;
+	std::condition_variable mCondition;
 };
 
 template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) {
@@ -65,12 +65,12 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args)
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
 	auto task = [this, bound = std::move(bound)]() mutable {
 		scope_guard guard(std::bind(&Processor::schedule, this)); // chain the next task
-		bound();
+		return bound();
 	};
 
 	if (!mPending) {
-		mPending = ThreadPool::Instance().enqueue(std::move(task)).share();
-		++mCounter;
+		ThreadPool::Instance().enqueue(std::move(task));
+		mPending = true;
 	} else {
 		mTasks.push(std::move(task));
 	}

+ 0 - 4
src/threadpool.cpp

@@ -96,10 +96,6 @@ std::function<void()> ThreadPool::dequeue() {
 			mCondition.wait(lock);
 		}
 	}
-
-	while (!mTasks.empty())
-		mTasks.pop();
-
 	return nullptr;
 }
 

+ 1 - 9
src/threadpool.hpp

@@ -100,16 +100,8 @@ auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
 template <class F, class... Args>
 auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
     -> invoke_future_t<F, Args...> {
-	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
 	std::unique_lock lock(mMutex);
-	if (mJoining) {
-		std::promise<R> promise;
-		std::future<R> result = promise.get_future();
-		promise.set_exception(std::make_exception_ptr(
-		    std::runtime_error("Scheduled a task while joining the thread pool")));
-		return result;
-	}
-
+	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
 	auto task = std::make_shared<std::packaged_task<R()>>([bound = std::move(bound)]() mutable {
 		try {