stream.cpp 2.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107
  1. /*
  2. * libdatachannel streamer example
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This program is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU General Public License
  7. * as published by the Free Software Foundation; either version 2
  8. * of the License, or (at your option) any later version.
  9. *
  10. * This program is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
  13. * GNU General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU General Public License
  16. * along with this program; If not, see <http://www.gnu.org/licenses/>.
  17. */
  18. #include "stream.hpp"
  19. #include "helpers.hpp"
  20. void StreamSource::stop() {
  21. sampleTime_us = 0;
  22. sample = {};
  23. }
  24. StreamSource::~StreamSource() {
  25. stop();
  26. }
  27. Stream::Stream(std::shared_ptr<StreamSource> video, std::shared_ptr<StreamSource> audio): std::enable_shared_from_this<Stream>(), video(video), audio(audio) { }
  28. Stream::~Stream() {
  29. stop();
  30. }
  31. std::pair<std::shared_ptr<StreamSource>, Stream::StreamSourceType> Stream::unsafePrepareForSample() {
  32. std::shared_ptr<StreamSource> ss;
  33. StreamSourceType sst;
  34. uint64_t nextTime;
  35. if (audio->getSampleTime_us() < video->getSampleTime_us()) {
  36. ss = audio;
  37. sst = StreamSourceType::Audio;
  38. nextTime = audio->getSampleTime_us();
  39. } else {
  40. ss = video;
  41. sst = StreamSourceType::Video;
  42. nextTime = video->getSampleTime_us();
  43. }
  44. auto currentTime = currentTimeInMicroSeconds();
  45. auto elapsed = currentTime - startTime;
  46. if (nextTime > elapsed) {
  47. auto waitTime = nextTime - elapsed;
  48. mutex.unlock();
  49. usleep(waitTime);
  50. mutex.lock();
  51. }
  52. return {ss, sst};
  53. }
  54. void Stream::sendSample() {
  55. std::lock_guard lock(mutex);
  56. if (!isRunning) {
  57. return;
  58. }
  59. auto ssSST = unsafePrepareForSample();
  60. auto ss = ssSST.first;
  61. auto sst = ssSST.second;
  62. auto sample = ss->getSample();
  63. sampleHandler(sst, ss->getSampleTime_us(), sample);
  64. ss->loadNextSample();
  65. dispatchQueue.dispatch([this]() {
  66. this->sendSample();
  67. });
  68. }
  69. void Stream::onSample(std::function<void (StreamSourceType, uint64_t, rtc::binary)> handler) {
  70. sampleHandler = handler;
  71. }
  72. void Stream::start() {
  73. std::lock_guard lock(mutex);
  74. if (isRunning) {
  75. return;
  76. }
  77. _isRunning = true;
  78. startTime = currentTimeInMicroSeconds();
  79. audio->start();
  80. video->start();
  81. dispatchQueue.dispatch([this]() {
  82. this->sendSample();
  83. });
  84. }
  85. void Stream::stop() {
  86. std::lock_guard lock(mutex);
  87. if (!isRunning) {
  88. return;
  89. }
  90. _isRunning = false;
  91. dispatchQueue.removePending();
  92. audio->stop();
  93. video->stop();
  94. };