Przeglądaj źródła

Added schedule() method to ThreadPool

Paul-Louis Ageneau 4 lat temu
rodzic
commit
4629e57931
2 zmienionych plików z 41 dodań i 8 usunięć
  1. 15 3
      src/threadpool.cpp
  2. 26 5
      src/threadpool.hpp

+ 15 - 3
src/threadpool.cpp

@@ -66,11 +66,23 @@ bool ThreadPool::runOne() {
 
 std::function<void()> ThreadPool::dequeue() {
 	std::unique_lock lock(mMutex);
-	mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
+	while(!mJoining) {
+		if(!mTasks.empty()) {
+			clock::time_point time = mTasks.begin()->first;
+			if(time <= clock::now())
+				break;
+
+			mCondition.wait_until(lock, time);
+
+		} else {
+			mCondition.wait(lock);
+		}
+	}
 	if (mTasks.empty())
 		return nullptr;
-	auto task = std::move(mTasks.front());
-	mTasks.pop();
+
+	auto task = std::move(mTasks.begin()->second);
+	mTasks.erase(mTasks.begin());
 	return task;
 }
 

+ 26 - 5
src/threadpool.hpp

@@ -22,12 +22,13 @@
 #include "include.hpp"
 #include "init.hpp"
 
+#include <chrono>
 #include <condition_variable>
 #include <functional>
 #include <future>
+#include <map>
 #include <memory>
 #include <mutex>
-#include <queue>
 #include <stdexcept>
 #include <thread>
 #include <vector>
@@ -39,6 +40,8 @@ using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::d
 
 class ThreadPool final {
 public:
+	using clock = std::chrono::steady_clock;
+
 	static ThreadPool &Instance();
 
 	ThreadPool(const ThreadPool &) = delete;
@@ -53,7 +56,13 @@ public:
 	bool runOne();
 
 	template <class F, class... Args>
-	auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
+	auto enqueue(F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
+
+	template <class F, class... Args>
+	auto schedule(clock::duration delay, F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
+
+	template <class F, class... Args>
+	auto schedule(clock::time_point time, F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
 
 protected:
 	ThreadPool() = default;
@@ -62,7 +71,7 @@ protected:
 	std::function<void()> dequeue(); // returns null function if joining
 
 	std::vector<std::thread> mWorkers;
-	std::queue<std::function<void()>> mTasks;
+	std::multimap<clock::time_point, std::function<void()>> mTasks;
 	std::atomic<bool> mJoining = false;
 
 	mutable std::mutex mMutex, mWorkersMutex;
@@ -70,7 +79,19 @@ protected:
 };
 
 template <class F, class... Args>
-auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
+auto ThreadPool::enqueue(F &&f, Args &&...args) -> invoke_future_t<F, Args...> {
+	return schedule(clock::now(), std::forward<F>(f), std::forward<Args>(args)...);
+}
+
+template <class F, class... Args>
+auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
+    -> invoke_future_t<F, Args...> {
+	return schedule(clock::now() + delay, std::forward<F>(f), std::forward<Args>(args)...);
+}
+
+template <class F, class... Args>
+auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
+    -> invoke_future_t<F, Args...> {
 	std::unique_lock lock(mMutex);
 	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
@@ -84,7 +105,7 @@ auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>
 	});
 	std::future<R> result = task->get_future();
 
-	mTasks.emplace([task = std::move(task), token = Init::Token()]() { return (*task)(); });
+	mTasks.emplace(time, [task = std::move(task), token = Init::Token()]() { return (*task)(); });
 	mCondition.notify_one();
 	return result;
 }