Quellcode durchsuchen

Merge pull request #300 from paullouisageneau/threadpool-schedule

Add schedule method to ThreadPool
Paul-Louis Ageneau vor 5 Jahren
Ursprung
Commit
3acddc6897
2 geänderte Dateien mit 53 neuen und 10 gelöschten Zeilen
  1. 20 6
      src/threadpool.cpp
  2. 33 4
      src/threadpool.hpp

+ 20 - 6
src/threadpool.cpp

@@ -66,12 +66,26 @@ bool ThreadPool::runOne() {
 
 
 std::function<void()> ThreadPool::dequeue() {
 std::function<void()> ThreadPool::dequeue() {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
-	mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
-	if (mTasks.empty())
-		return nullptr;
-	auto task = std::move(mTasks.front());
-	mTasks.pop();
-	return task;
+	while (true) {
+		if (!mTasks.empty()) {
+			if (mTasks.top().time <= clock::now()) {
+				auto func = std::move(mTasks.top().func);
+				mTasks.pop();
+				return func;
+			}
+
+			if (mJoining)
+				break;
+
+			mCondition.wait_until(lock, mTasks.top().time);
+		} else {
+			if (mJoining)
+				break;
+
+			mCondition.wait(lock);
+		}
+	}
+	return nullptr;
 }
 }
 
 
 } // namespace rtc
 } // namespace rtc

+ 33 - 4
src/threadpool.hpp

@@ -22,7 +22,9 @@
 #include "include.hpp"
 #include "include.hpp"
 #include "init.hpp"
 #include "init.hpp"
 
 
+#include <chrono>
 #include <condition_variable>
 #include <condition_variable>
+#include <deque>
 #include <functional>
 #include <functional>
 #include <future>
 #include <future>
 #include <memory>
 #include <memory>
@@ -39,6 +41,8 @@ using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::d
 
 
 class ThreadPool final {
 class ThreadPool final {
 public:
 public:
+	using clock = std::chrono::steady_clock;
+
 	static ThreadPool &Instance();
 	static ThreadPool &Instance();
 
 
 	ThreadPool(const ThreadPool &) = delete;
 	ThreadPool(const ThreadPool &) = delete;
@@ -53,7 +57,13 @@ public:
 	bool runOne();
 	bool runOne();
 
 
 	template <class F, class... Args>
 	template <class F, class... Args>
-	auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
+	auto enqueue(F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
+
+	template <class F, class... Args>
+	auto schedule(clock::duration delay, F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
+
+	template <class F, class... Args>
+	auto schedule(clock::time_point time, F &&f, Args &&...args) -> invoke_future_t<F, Args...>;
 
 
 protected:
 protected:
 	ThreadPool() = default;
 	ThreadPool() = default;
@@ -62,15 +72,34 @@ protected:
 	std::function<void()> dequeue(); // returns null function if joining
 	std::function<void()> dequeue(); // returns null function if joining
 
 
 	std::vector<std::thread> mWorkers;
 	std::vector<std::thread> mWorkers;
-	std::queue<std::function<void()>> mTasks;
 	std::atomic<bool> mJoining = false;
 	std::atomic<bool> mJoining = false;
 
 
+	struct Task {
+		clock::time_point time;
+		std::function<void()> func;
+		bool operator>(const Task &other) const { return time > other.time; }
+		bool operator<(const Task &other) const { return time < other.time; }
+	};
+	std::priority_queue<Task, std::deque<Task>, std::greater<Task>> mTasks;
+
 	mutable std::mutex mMutex, mWorkersMutex;
 	mutable std::mutex mMutex, mWorkersMutex;
 	std::condition_variable mCondition;
 	std::condition_variable mCondition;
 };
 };
 
 
 template <class F, class... Args>
 template <class F, class... Args>
-auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
+auto ThreadPool::enqueue(F &&f, Args &&...args) -> invoke_future_t<F, Args...> {
+	return schedule(clock::now(), std::forward<F>(f), std::forward<Args>(args)...);
+}
+
+template <class F, class... Args>
+auto ThreadPool::schedule(clock::duration delay, F &&f, Args &&...args)
+    -> invoke_future_t<F, Args...> {
+	return schedule(clock::now() + delay, std::forward<F>(f), std::forward<Args>(args)...);
+}
+
+template <class F, class... Args>
+auto ThreadPool::schedule(clock::time_point time, F &&f, Args &&...args)
+    -> invoke_future_t<F, Args...> {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
 	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
 	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);
@@ -84,7 +113,7 @@ auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>
 	});
 	});
 	std::future<R> result = task->get_future();
 	std::future<R> result = task->get_future();
 
 
-	mTasks.emplace([task = std::move(task), token = Init::Token()]() { return (*task)(); });
+	mTasks.push({time, [task = std::move(task), token = Init::Token()]() { return (*task)(); }});
 	mCondition.notify_one();
 	mCondition.notify_one();
 	return result;
 	return result;
 }
 }