dispatchqueue.cpp 2.3 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485
  1. /**
  2. * libdatachannel streamer example
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "dispatchqueue.hpp"
  10. DispatchQueue::DispatchQueue(std::string name, size_t threadCount) :
  11. name{std::move(name)}, threads(threadCount) {
  12. for(size_t i = 0; i < threads.size(); i++)
  13. {
  14. threads[i] = std::thread(&DispatchQueue::dispatchThreadHandler, this);
  15. }
  16. }
  17. DispatchQueue::~DispatchQueue() {
  18. // Signal to dispatch threads that it's time to wrap up
  19. std::unique_lock<std::mutex> lock(lockMutex);
  20. quit = true;
  21. lock.unlock();
  22. condition.notify_all();
  23. // Wait for threads to finish before we exit
  24. for(size_t i = 0; i < threads.size(); i++)
  25. {
  26. if(threads[i].joinable())
  27. {
  28. threads[i].join();
  29. }
  30. }
  31. }
  32. void DispatchQueue::removePending() {
  33. std::unique_lock<std::mutex> lock(lockMutex);
  34. queue = {};
  35. }
  36. void DispatchQueue::dispatch(const fp_t& op) {
  37. std::unique_lock<std::mutex> lock(lockMutex);
  38. queue.push(op);
  39. // Manual unlocking is done before notifying, to avoid waking up
  40. // the waiting thread only to block again (see notify_one for details)
  41. lock.unlock();
  42. condition.notify_one();
  43. }
  44. void DispatchQueue::dispatch(fp_t&& op) {
  45. std::unique_lock<std::mutex> lock(lockMutex);
  46. queue.push(std::move(op));
  47. // Manual unlocking is done before notifying, to avoid waking up
  48. // the waiting thread only to block again (see notify_one for details)
  49. lock.unlock();
  50. condition.notify_one();
  51. }
  52. void DispatchQueue::dispatchThreadHandler(void) {
  53. std::unique_lock<std::mutex> lock(lockMutex);
  54. do {
  55. //Wait until we have data or a quit signal
  56. condition.wait(lock, [this]{
  57. return (queue.size() || quit);
  58. });
  59. //after wait, we own the lock
  60. if(!quit && queue.size())
  61. {
  62. auto op = std::move(queue.front());
  63. queue.pop();
  64. //unlock now that we're done messing with the queue
  65. lock.unlock();
  66. op();
  67. lock.lock();
  68. }
  69. } while (!quit);
  70. }