Browse Source

Added ThreadPool

Paul-Louis Ageneau 5 years ago
parent
commit
e41019a1f0
3 changed files with 168 additions and 0 deletions
  1. 1 0
      CMakeLists.txt
  2. 87 0
      src/threadpool.cpp
  3. 80 0
      src/threadpool.hpp

+ 1 - 0
CMakeLists.txt

@@ -43,6 +43,7 @@ set(LIBDATACHANNEL_SOURCES
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/rtc.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/sctptransport.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
 	${CMAKE_CURRENT_SOURCE_DIR}/src/tls.cpp
+	${CMAKE_CURRENT_SOURCE_DIR}/src/threadpool.cpp
 )
 )
 
 
 set(LIBDATACHANNEL_WEBSOCKET_SOURCES
 set(LIBDATACHANNEL_WEBSOCKET_SOURCES

+ 87 - 0
src/threadpool.cpp

@@ -0,0 +1,87 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#include "threadpool.hpp"
+
+namespace rtc {
+
+ThreadPool::ThreadPool(int count) { spawn(count); }
+
+ThreadPool::~ThreadPool() { join(); }
+
+int ThreadPool::count() const {
+	std::unique_lock lock(mMutex);
+	return mWorkers.size();
+}
+
+void ThreadPool::spawn(int count) {
+	std::unique_lock lock(mMutex);
+	while (count-- > 0)
+		mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
+}
+
+void ThreadPool::join() {
+	try {
+		std::unique_lock lock(mMutex);
+		mJoining = true;
+		mCondition.notify_all();
+
+		auto workers = std::move(mWorkers);
+		mWorkers.clear();
+
+		lock.unlock();
+		for (auto &w : workers)
+			w.join();
+
+	} catch (...) {
+		mJoining = false;
+		throw;
+	}
+
+	mJoining = false;
+}
+
+void ThreadPool::run() {
+	while (runOne()) {
+	}
+}
+
+bool ThreadPool::runOne() {
+	if (auto task = dequeue()) {
+		try {
+			task();
+		} catch (const std::exception &e) {
+			PLOG_WARNING << "Unhandled exception in task: " << e.what();
+		}
+		return true;
+	}
+	return false;
+}
+
+std::function<void()> ThreadPool::dequeue() {
+	std::unique_lock lock(mMutex);
+	mCondition.wait(lock, [this]() { return !mTasks.empty() || mJoining; });
+	if (mTasks.empty())
+		return nullptr;
+	auto task = std::move(mTasks.front());
+	mTasks.pop();
+	return task;
+}
+
+} // namespace rtc
+

+ 80 - 0
src/threadpool.hpp

@@ -0,0 +1,80 @@
+/**
+ * Copyright (c) 2020 Paul-Louis Ageneau
+ *
+ * This library is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU Lesser General Public
+ * License as published by the Free Software Foundation; either
+ * version 2.1 of the License, or (at your option) any later version.
+ *
+ * This library is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU
+ * Lesser General Public License for more details.
+ *
+ * You should have received a copy of the GNU Lesser General Public
+ * License along with this library; if not, write to the Free Software
+ * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
+ */
+
+#ifndef RTC_THREADPOOL_H
+#define RTC_THREADPOOL_H
+
+#include "include.hpp"
+
+#include <condition_variable>
+#include <functional>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <stdexcept>
+#include <thread>
+#include <vector>
+
+namespace rtc {
+
+template <class F, class... Args>
+using invoke_future_t = std::future<std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>>;
+
+class ThreadPool final {
+public:
+	explicit ThreadPool(int count);
+	ThreadPool(const ThreadPool &) = delete;
+	ThreadPool &operator=(const ThreadPool &) = delete;
+	ThreadPool(ThreadPool &&) = delete;
+	ThreadPool &operator=(ThreadPool &&) = delete;
+	~ThreadPool();
+
+	int count() const;
+	void spawn(int count = 1);
+	void join();
+	void run();
+	bool runOne();
+
+	template <class F, class... Args>
+	auto enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...>;
+
+protected:
+	std::function<void()> dequeue(); // returns null function if joining
+
+	std::vector<std::thread> mWorkers;
+	std::queue<std::function<void()>> mTasks;
+
+	std::mutex mMutex;
+	std::condition_variable mCondition;
+	bool mJoining = false;
+};
+
+template <class F, class... Args>
+auto ThreadPool::enqueue(F &&f, Args &&... args) -> invoke_future_t<F, Args...> {
+	std::unique_lock lock(mMutex);
+	using R = std::invoke_result_t<std::decay_t<F>, std::decay_t<Args>...>;
+	auto task = std::packaged_task<R()>(std::bind(std::forward<F>(f), std::forward<Args>(args)...));
+	std::future<R> result = task.get_future();
+	mTasks.emplace([task = std::move(task)]() { task(); });
+	mCondition.notify_one();
+	return result;
+}
+
+} // namespace rtc
+
+#endif