threadpool.cpp 2.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "threadpool.hpp"
  19. namespace rtc::impl {
  20. ThreadPool &ThreadPool::Instance() {
  21. static ThreadPool *instance = new ThreadPool;
  22. return *instance;
  23. }
  24. ThreadPool::ThreadPool() {}
  25. ThreadPool::~ThreadPool() {}
  26. int ThreadPool::count() const {
  27. std::unique_lock lock(mWorkersMutex);
  28. return int(mWorkers.size());
  29. }
  30. void ThreadPool::spawn(int count) {
  31. std::unique_lock lock(mWorkersMutex);
  32. while (count-- > 0)
  33. mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
  34. }
  35. void ThreadPool::join() {
  36. {
  37. std::unique_lock lock(mMutex);
  38. mWaitingCondition.wait(lock, [&]() { return mBusyWorkers == 0; });
  39. mJoining = true;
  40. mTasksCondition.notify_all();
  41. }
  42. std::unique_lock lock(mWorkersMutex);
  43. for (auto &w : mWorkers)
  44. w.join();
  45. mWorkers.clear();
  46. mJoining = false;
  47. }
  48. void ThreadPool::run() {
  49. ++mBusyWorkers;
  50. scope_guard guard([&]() { --mBusyWorkers; });
  51. while (runOne()) {
  52. }
  53. }
  54. bool ThreadPool::runOne() {
  55. if (auto task = dequeue()) {
  56. task();
  57. return true;
  58. }
  59. return false;
  60. }
  61. std::function<void()> ThreadPool::dequeue() {
  62. std::unique_lock lock(mMutex);
  63. while (!mJoining) {
  64. std::optional<clock::time_point> time;
  65. if (!mTasks.empty()) {
  66. time = mTasks.top().time;
  67. if (*time <= clock::now()) {
  68. auto func = std::move(mTasks.top().func);
  69. mTasks.pop();
  70. return func;
  71. }
  72. }
  73. --mBusyWorkers;
  74. scope_guard guard([&]() { ++mBusyWorkers; });
  75. mWaitingCondition.notify_all();
  76. if(time)
  77. mTasksCondition.wait_until(lock, *time);
  78. else
  79. mTasksCondition.wait(lock);
  80. }
  81. return nullptr;
  82. }
  83. } // namespace rtc::impl