track.cpp 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265
  1. /**
  2. * Copyright (c) 2020 Paul-Louis Ageneau
  3. *
  4. * This Source Code Form is subject to the terms of the Mozilla Public
  5. * License, v. 2.0. If a copy of the MPL was not distributed with this
  6. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  7. */
  8. #include "track.hpp"
  9. #include "internals.hpp"
  10. #include "logcounter.hpp"
  11. #include "peerconnection.hpp"
  12. #include "rtp.hpp"
  13. namespace rtc::impl {
  14. static LogCounter COUNTER_MEDIA_BAD_DIRECTION(plog::warning,
  15. "Number of media packets sent in invalid directions");
  16. static LogCounter COUNTER_QUEUE_FULL(plog::warning,
  17. "Number of media packets dropped due to a full queue");
  18. Track::Track(weak_ptr<PeerConnection> pc, Description::Media desc)
  19. : mPeerConnection(pc), mMediaDescription(std::move(desc)),
  20. mRecvQueue(RECV_QUEUE_LIMIT, [](const message_ptr &m) { return m->size(); }) {
  21. // Discard messages by default if track is send only
  22. if (mMediaDescription.direction() == Description::Direction::SendOnly)
  23. messageCallback = [](message_variant) {};
  24. }
  25. Track::~Track() {
  26. PLOG_VERBOSE << "Destroying Track";
  27. try {
  28. close();
  29. } catch (const std::exception &e) {
  30. PLOG_ERROR << e.what();
  31. }
  32. }
  33. string Track::mid() const {
  34. std::shared_lock lock(mMutex);
  35. return mMediaDescription.mid();
  36. }
  37. Description::Direction Track::direction() const {
  38. std::shared_lock lock(mMutex);
  39. return mMediaDescription.direction();
  40. }
  41. Description::Media Track::description() const {
  42. std::shared_lock lock(mMutex);
  43. return mMediaDescription;
  44. }
  45. void Track::setDescription(Description::Media desc) {
  46. {
  47. std::unique_lock lock(mMutex);
  48. if (desc.mid() != mMediaDescription.mid())
  49. throw std::logic_error("Media description mid does not match track mid");
  50. mMediaDescription = std::move(desc);
  51. }
  52. if (auto handler = getMediaHandler())
  53. handler->media(description());
  54. }
  55. void Track::close() {
  56. PLOG_VERBOSE << "Closing Track";
  57. if (!mIsClosed.exchange(true))
  58. triggerClosed();
  59. setMediaHandler(nullptr);
  60. resetCallbacks();
  61. }
  62. message_variant Track::trackMessageToVariant(message_ptr message) {
  63. if (message->type == Message::Control)
  64. return to_variant(*message); // The same message may be frowarded into multiple Tracks
  65. else
  66. return to_variant(std::move(*message));
  67. }
  68. optional<message_variant> Track::receive() {
  69. if (auto next = mRecvQueue.pop()) {
  70. return trackMessageToVariant(*next);
  71. }
  72. return nullopt;
  73. }
  74. optional<message_variant> Track::peek() {
  75. if (auto next = mRecvQueue.peek()) {
  76. return trackMessageToVariant(*next);
  77. }
  78. return nullopt;
  79. }
  80. size_t Track::availableAmount() const { return mRecvQueue.amount(); }
  81. bool Track::isOpen(void) const {
  82. #if RTC_ENABLE_MEDIA
  83. std::shared_lock lock(mMutex);
  84. return !mIsClosed && mDtlsSrtpTransport.lock();
  85. #else
  86. return false;
  87. #endif
  88. }
  89. bool Track::isClosed(void) const { return mIsClosed; }
  90. size_t Track::maxMessageSize() const {
  91. optional<size_t> mtu;
  92. if (auto pc = mPeerConnection.lock())
  93. mtu = pc->config.mtu;
  94. return mtu.value_or(DEFAULT_MTU) - 12 - 8 - 40; // SRTP/UDP/IPv6
  95. }
  96. #if RTC_ENABLE_MEDIA
  97. void Track::open(shared_ptr<DtlsSrtpTransport> transport) {
  98. {
  99. std::lock_guard lock(mMutex);
  100. mDtlsSrtpTransport = transport;
  101. }
  102. if (!mIsClosed)
  103. triggerOpen();
  104. }
  105. #endif
  106. void Track::incoming(message_ptr message) {
  107. if (!message)
  108. return;
  109. auto dir = direction();
  110. if ((dir == Description::Direction::SendOnly || dir == Description::Direction::Inactive) &&
  111. message->type != Message::Control) {
  112. COUNTER_MEDIA_BAD_DIRECTION++;
  113. return;
  114. }
  115. message_vector messages{std::move(message)};
  116. if (auto handler = getMediaHandler()) {
  117. try {
  118. handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
  119. if (auto locked = weak_this.lock()) {
  120. transportSend(m);
  121. }
  122. });
  123. } catch (const std::exception &e) {
  124. PLOG_WARNING << "Exception in incoming media handler: " << e.what();
  125. return;
  126. }
  127. }
  128. for (auto &m : messages) {
  129. // Tail drop if queue is full
  130. if (mRecvQueue.full()) {
  131. COUNTER_QUEUE_FULL++;
  132. return;
  133. }
  134. mRecvQueue.push(m);
  135. triggerAvailable(mRecvQueue.size());
  136. }
  137. }
  138. bool Track::outgoing(message_ptr message) {
  139. if (mIsClosed)
  140. throw std::runtime_error("Track is closed");
  141. auto handler = getMediaHandler();
  142. // If there is no handler, the track expects RTP or RTCP packets
  143. if (!handler && IsRtcp(*message))
  144. message->type = Message::Control; // to allow sending RTCP packets irrelevant of direction
  145. auto dir = direction();
  146. if ((dir == Description::Direction::RecvOnly || dir == Description::Direction::Inactive) &&
  147. message->type != Message::Control) {
  148. COUNTER_MEDIA_BAD_DIRECTION++;
  149. return false;
  150. }
  151. if (handler) {
  152. message_vector messages{std::move(message)};
  153. handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
  154. if (auto locked = weak_this.lock()) {
  155. transportSend(m);
  156. }
  157. });
  158. bool ret = false;
  159. for (auto &m : messages)
  160. ret = transportSend(std::move(m));
  161. return ret;
  162. } else {
  163. return transportSend(std::move(message));
  164. }
  165. }
  166. bool Track::transportSend([[maybe_unused]] message_ptr message) {
  167. #if RTC_ENABLE_MEDIA
  168. shared_ptr<DtlsSrtpTransport> transport;
  169. {
  170. std::shared_lock lock(mMutex);
  171. transport = mDtlsSrtpTransport.lock();
  172. if (!transport)
  173. throw std::runtime_error("Track is not open");
  174. // Set recommended medium-priority DSCP value
  175. // See https://www.rfc-editor.org/rfc/rfc8837.html#section-5
  176. if (mMediaDescription.type() == "audio")
  177. message->dscp = 46; // EF: Expedited Forwarding
  178. else
  179. message->dscp = 36; // AF42: Assured Forwarding class 4, medium drop probability
  180. }
  181. return transport->sendMedia(message);
  182. #else
  183. throw std::runtime_error("Track is disabled (not compiled with media support)");
  184. #endif
  185. }
  186. void Track::setMediaHandler(shared_ptr<MediaHandler> handler) {
  187. {
  188. std::unique_lock lock(mMutex);
  189. mMediaHandler = handler;
  190. }
  191. if (handler)
  192. handler->media(description());
  193. }
  194. shared_ptr<MediaHandler> Track::getMediaHandler() {
  195. std::shared_lock lock(mMutex);
  196. return mMediaHandler;
  197. }
  198. void Track::flushPendingMessages() {
  199. if (!mOpenTriggered)
  200. return;
  201. while (messageCallback || frameCallback) {
  202. auto next = mRecvQueue.pop();
  203. if (!next)
  204. break;
  205. auto message = next.value();
  206. try {
  207. if (message->frameInfo && frameCallback) {
  208. frameCallback(std::move(*message), std::move(*message->frameInfo));
  209. } else if (!message->frameInfo && messageCallback) {
  210. messageCallback(trackMessageToVariant(message));
  211. }
  212. } catch (const std::exception &e) {
  213. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  214. }
  215. }
  216. }
  217. } // namespace rtc::impl