threadpool.cpp 2.0 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "threadpool.hpp"
  19. namespace rtc {
  20. ThreadPool &ThreadPool::Instance() {
  21. // Init handles joining on cleanup
  22. static ThreadPool *instance = new ThreadPool;
  23. return *instance;
  24. }
  25. ThreadPool::~ThreadPool() { join(); }
  26. int ThreadPool::count() const {
  27. std::unique_lock lock(mWorkersMutex);
  28. return int(mWorkers.size());
  29. }
  30. void ThreadPool::spawn(int count) {
  31. std::unique_lock lock(mWorkersMutex);
  32. mJoining = false;
  33. while (count-- > 0)
  34. mWorkers.emplace_back(std::bind(&ThreadPool::run, this));
  35. }
  36. void ThreadPool::join() {
  37. std::unique_lock lock(mWorkersMutex);
  38. mJoining = true;
  39. mCondition.notify_all();
  40. for (auto &w : mWorkers)
  41. w.join();
  42. mWorkers.clear();
  43. }
  44. void ThreadPool::run() {
  45. while (runOne()) {
  46. }
  47. }
  48. bool ThreadPool::runOne() {
  49. if (auto task = dequeue()) {
  50. task();
  51. return true;
  52. }
  53. return false;
  54. }
  55. std::function<void()> ThreadPool::dequeue() {
  56. std::unique_lock lock(mMutex);
  57. while (true) {
  58. if (!mTasks.empty()) {
  59. if (mTasks.top().time <= clock::now()) {
  60. auto func = std::move(mTasks.top().func);
  61. mTasks.pop();
  62. return func;
  63. }
  64. if (mJoining)
  65. break;
  66. mCondition.wait_until(lock, mTasks.top().time);
  67. } else {
  68. if (mJoining)
  69. break;
  70. mCondition.wait(lock);
  71. }
  72. }
  73. return nullptr;
  74. }
  75. } // namespace rtc