pacinghandler.cpp 1.9 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667
  1. /**
  2. * Copyright (c) 2024 Sean DuBois <[email protected]>
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #if RTC_ENABLE_MEDIA
#include <algorithm>
#include <memory>

#include "pacinghandler.hpp"

#include "impl/internals.hpp"
#include "impl/threadpool.hpp"
  13. namespace rtc {
  14. PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval)
  15. : mBytesPerSecond(bitsPerSecond / 8), mBudget(0.), mSendInterval(sendInterval) {};
  16. void PacingHandler::schedule(const message_callback &send) {
  17. if (!mHaveScheduled.exchange(true))
  18. impl::ThreadPool::Instance().schedule(mSendInterval,
  19. weak_bind(&PacingHandler::run, this, send));
  20. }
  21. void PacingHandler::run(const message_callback &send) {
  22. const std::lock_guard<std::mutex> lock(mMutex);
  23. mHaveScheduled.store(false);
  24. // Update the budget and cap it
  25. auto now = std::chrono::high_resolution_clock::now();
  26. auto newBudget = std::chrono::duration<double>(now - mLastRun).count() * mBytesPerSecond;
  27. auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
  28. mBudget = std::min(mBudget + newBudget, maxBudget);
  29. mLastRun = std::chrono::high_resolution_clock::now();
  30. // Send packets while there is budget, allow a single partial packet over budget
  31. while (!mRtpBuffer.empty() && mBudget > 0) {
  32. auto size = int(mRtpBuffer.front()->size());
  33. send(std::move(mRtpBuffer.front()));
  34. mRtpBuffer.pop();
  35. mBudget -= size;
  36. }
  37. if (!mRtpBuffer.empty()) {
  38. schedule(send);
  39. }
  40. }
  41. void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {
  42. std::lock_guard<std::mutex> lock(mMutex);
  43. for (auto &m : messages) {
  44. mRtpBuffer.push(std::move(m));
  45. }
  46. messages.clear();
  47. schedule(send);
  48. }
  49. } // namespace rtc
  50. #endif /* RTC_ENABLE_MEDIA */