/** * libdatachannel streamer example * Copyright (c) 2020 Filip Klembara (in2core) * * This Source Code Form is subject to the terms of the Mozilla Public * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at https://mozilla.org/MPL/2.0/. */ #include "dispatchqueue.hpp" DispatchQueue::DispatchQueue(std::string name, size_t threadCount) : name{std::move(name)}, threads(threadCount) { for(size_t i = 0; i < threads.size(); i++) { threads[i] = std::thread(&DispatchQueue::dispatchThreadHandler, this); } } DispatchQueue::~DispatchQueue() { // Signal to dispatch threads that it's time to wrap up std::unique_lock lock(lockMutex); quit = true; lock.unlock(); condition.notify_all(); // Wait for threads to finish before we exit for(size_t i = 0; i < threads.size(); i++) { if(threads[i].joinable()) { threads[i].join(); } } } void DispatchQueue::removePending() { std::unique_lock lock(lockMutex); queue = {}; } void DispatchQueue::dispatch(const fp_t& op) { std::unique_lock lock(lockMutex); queue.push(op); // Manual unlocking is done before notifying, to avoid waking up // the waiting thread only to block again (see notify_one for details) lock.unlock(); condition.notify_one(); } void DispatchQueue::dispatch(fp_t&& op) { std::unique_lock lock(lockMutex); queue.push(std::move(op)); // Manual unlocking is done before notifying, to avoid waking up // the waiting thread only to block again (see notify_one for details) lock.unlock(); condition.notify_one(); } void DispatchQueue::dispatchThreadHandler(void) { std::unique_lock lock(lockMutex); do { //Wait until we have data or a quit signal condition.wait(lock, [this]{ return (queue.size() || quit); }); //after wait, we own the lock if(!quit && queue.size()) { auto op = std::move(queue.front()); queue.pop(); //unlock now that we're done messing with the queue lock.unlock(); op(); lock.lock(); } } while (!quit); }