peerconnection.cpp 40 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "common.hpp"
  12. #include "dtlstransport.hpp"
  13. #include "icetransport.hpp"
  14. #include "internals.hpp"
  15. #include "logcounter.hpp"
  16. #include "peerconnection.hpp"
  17. #include "processor.hpp"
  18. #include "rtp.hpp"
  19. #include "sctptransport.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. PeerConnection::PeerConnection(Configuration config_)
  41. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  44. throw std::invalid_argument("Invalid port range");
  45. if (config.mtu) {
  46. if (*config.mtu < 576) // Min MTU for IPv4
  47. throw std::invalid_argument("Invalid MTU value");
  48. if (*config.mtu > 1500) { // Standard Ethernet
  49. PLOG_WARNING << "MTU set to " << *config.mtu;
  50. } else {
  51. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  52. }
  53. }
  54. }
  55. PeerConnection::~PeerConnection() {
  56. PLOG_VERBOSE << "Destroying PeerConnection";
  57. mProcessor.join();
  58. }
  59. void PeerConnection::close() {
  60. negotiationNeeded = false;
  61. if (!closing.exchange(true)) {
  62. PLOG_VERBOSE << "Closing PeerConnection";
  63. if (auto transport = std::atomic_load(&mSctpTransport))
  64. transport->stop();
  65. else
  66. remoteClose();
  67. }
  68. }
  69. void PeerConnection::remoteClose() {
  70. close();
  71. if (state.load() != State::Closed) {
  72. // Close data channels and tracks asynchronously
  73. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  74. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  75. closeTransports();
  76. }
  77. }
  78. optional<Description> PeerConnection::localDescription() const {
  79. std::lock_guard lock(mLocalDescriptionMutex);
  80. return mLocalDescription;
  81. }
  82. optional<Description> PeerConnection::remoteDescription() const {
  83. std::lock_guard lock(mRemoteDescriptionMutex);
  84. return mRemoteDescription;
  85. }
  86. size_t PeerConnection::remoteMaxMessageSize() const {
  87. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  88. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  89. std::lock_guard lock(mRemoteDescriptionMutex);
  90. if (mRemoteDescription)
  91. if (auto *application = mRemoteDescription->application())
  92. if (auto max = application->maxMessageSize()) {
  93. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  94. // size value of zero, it indicates that the SCTP endpoint will handle messages
  95. // of any size, subject to memory capacity, etc.
  96. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  97. }
  98. return std::min(remoteMax, localMax);
  99. }
  100. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  101. template <typename T>
  102. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  103. std::atomic_store(member, transport);
  104. try {
  105. transport->start();
  106. } catch (...) {
  107. std::atomic_store(member, decltype(transport)(nullptr));
  108. throw;
  109. }
  110. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  111. std::atomic_store(member, decltype(transport)(nullptr));
  112. transport->stop();
  113. return nullptr;
  114. }
  115. return transport;
  116. }
  117. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  118. try {
  119. if (auto transport = std::atomic_load(&mIceTransport))
  120. return transport;
  121. PLOG_VERBOSE << "Starting ICE transport";
  122. auto transport = std::make_shared<IceTransport>(
  123. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  124. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  125. auto shared_this = weak_this.lock();
  126. if (!shared_this)
  127. return;
  128. switch (transportState) {
  129. case IceTransport::State::Connecting:
  130. changeState(State::Connecting);
  131. break;
  132. case IceTransport::State::Connected:
  133. initDtlsTransport();
  134. break;
  135. case IceTransport::State::Failed:
  136. changeState(State::Failed);
  137. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  138. break;
  139. case IceTransport::State::Disconnected:
  140. changeState(State::Disconnected);
  141. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  142. break;
  143. default:
  144. // Ignore
  145. break;
  146. }
  147. },
  148. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  149. auto shared_this = weak_this.lock();
  150. if (!shared_this)
  151. return;
  152. switch (gatheringState) {
  153. case IceTransport::GatheringState::InProgress:
  154. changeGatheringState(GatheringState::InProgress);
  155. break;
  156. case IceTransport::GatheringState::Complete:
  157. endLocalCandidates();
  158. changeGatheringState(GatheringState::Complete);
  159. break;
  160. default:
  161. // Ignore
  162. break;
  163. }
  164. });
  165. return emplaceTransport(this, &mIceTransport, std::move(transport));
  166. } catch (const std::exception &e) {
  167. PLOG_ERROR << e.what();
  168. changeState(State::Failed);
  169. throw std::runtime_error("ICE transport initialization failed");
  170. }
  171. }
  172. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  173. try {
  174. if (auto transport = std::atomic_load(&mDtlsTransport))
  175. return transport;
  176. PLOG_VERBOSE << "Starting DTLS transport";
  177. auto lower = std::atomic_load(&mIceTransport);
  178. if (!lower)
  179. throw std::logic_error("No underlying ICE transport for DTLS transport");
  180. auto certificate = mCertificate.get();
  181. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  182. auto dtlsStateChangeCallback =
  183. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  184. auto shared_this = weak_this.lock();
  185. if (!shared_this)
  186. return;
  187. switch (transportState) {
  188. case DtlsTransport::State::Connected:
  189. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  190. initSctpTransport();
  191. else
  192. changeState(State::Connected);
  193. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  194. break;
  195. case DtlsTransport::State::Failed:
  196. changeState(State::Failed);
  197. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  198. break;
  199. case DtlsTransport::State::Disconnected:
  200. changeState(State::Disconnected);
  201. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  202. break;
  203. default:
  204. // Ignore
  205. break;
  206. }
  207. };
  208. shared_ptr<DtlsTransport> transport;
  209. auto local = localDescription();
  210. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  211. #if RTC_ENABLE_MEDIA
  212. PLOG_INFO << "This connection requires media support";
  213. // DTLS-SRTP
  214. transport = std::make_shared<DtlsSrtpTransport>(
  215. lower, certificate, config.mtu, verifierCallback,
  216. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  217. #else
  218. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  219. #endif
  220. }
  221. if (!transport) {
  222. // DTLS only
  223. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  224. verifierCallback, dtlsStateChangeCallback);
  225. }
  226. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  227. } catch (const std::exception &e) {
  228. PLOG_ERROR << e.what();
  229. changeState(State::Failed);
  230. throw std::runtime_error("DTLS transport initialization failed");
  231. }
  232. }
  233. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  234. try {
  235. if (auto transport = std::atomic_load(&mSctpTransport))
  236. return transport;
  237. PLOG_VERBOSE << "Starting SCTP transport";
  238. auto lower = std::atomic_load(&mDtlsTransport);
  239. if (!lower)
  240. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  241. auto local = localDescription();
  242. if (!local || !local->application())
  243. throw std::logic_error("Starting SCTP transport without local application description");
  244. auto remote = remoteDescription();
  245. if (!remote || !remote->application())
  246. throw std::logic_error(
  247. "Starting SCTP transport without remote application description");
  248. SctpTransport::Ports ports = {};
  249. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  250. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  251. auto transport = std::make_shared<SctpTransport>(
  252. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  253. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  254. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  255. auto shared_this = weak_this.lock();
  256. if (!shared_this)
  257. return;
  258. switch (transportState) {
  259. case SctpTransport::State::Connected:
  260. changeState(State::Connected);
  261. assignDataChannels();
  262. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  263. break;
  264. case SctpTransport::State::Failed:
  265. changeState(State::Failed);
  266. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  267. break;
  268. case SctpTransport::State::Disconnected:
  269. changeState(State::Disconnected);
  270. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  271. break;
  272. default:
  273. // Ignore
  274. break;
  275. }
  276. });
  277. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  278. } catch (const std::exception &e) {
  279. PLOG_ERROR << e.what();
  280. changeState(State::Failed);
  281. throw std::runtime_error("SCTP transport initialization failed");
  282. }
  283. }
  284. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  285. return std::atomic_load(&mIceTransport);
  286. }
  287. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  288. return std::atomic_load(&mDtlsTransport);
  289. }
  290. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  291. return std::atomic_load(&mSctpTransport);
  292. }
  293. void PeerConnection::closeTransports() {
  294. PLOG_VERBOSE << "Closing transports";
  295. // Change state to sink state Closed
  296. if (!changeState(State::Closed))
  297. return; // already closed
  298. // Reset callbacks now that state is changed
  299. resetCallbacks();
  300. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  301. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  302. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  303. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  304. if (sctp) {
  305. sctp->onRecv(nullptr);
  306. sctp->onBufferedAmount(nullptr);
  307. }
  308. using array = std::array<shared_ptr<Transport>, 3>;
  309. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  310. for (const auto &t : transports)
  311. if (t)
  312. t->onStateChange(nullptr);
  313. TearDownProcessor::Instance().enqueue(
  314. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  315. for (const auto &t : transports) {
  316. if (t) {
  317. t->stop();
  318. break;
  319. }
  320. }
  321. for (auto &t : transports)
  322. t.reset();
  323. });
  324. }
  325. void PeerConnection::endLocalCandidates() {
  326. std::lock_guard lock(mLocalDescriptionMutex);
  327. if (mLocalDescription)
  328. mLocalDescription->endCandidates();
  329. }
  330. void PeerConnection::rollbackLocalDescription() {
  331. PLOG_DEBUG << "Rolling back pending local description";
  332. std::unique_lock lock(mLocalDescriptionMutex);
  333. if (mCurrentLocalDescription) {
  334. std::vector<Candidate> existingCandidates;
  335. if (mLocalDescription)
  336. existingCandidates = mLocalDescription->extractCandidates();
  337. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  338. mLocalDescription->addCandidates(std::move(existingCandidates));
  339. mCurrentLocalDescription.reset();
  340. }
  341. }
  342. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  343. std::lock_guard lock(mRemoteDescriptionMutex);
  344. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  345. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  346. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  347. return true;
  348. }
  349. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  350. << expectedFingerprint.value_or("[none]") << "\"";
  351. return false;
  352. }
  353. void PeerConnection::forwardMessage(message_ptr message) {
  354. if (!message) {
  355. remoteCloseDataChannels();
  356. return;
  357. }
  358. auto iceTransport = std::atomic_load(&mIceTransport);
  359. auto sctpTransport = std::atomic_load(&mSctpTransport);
  360. if (!iceTransport || !sctpTransport)
  361. return;
  362. const uint16_t stream = uint16_t(message->stream);
  363. auto [channel, found] = findDataChannel(stream);
  364. if (DataChannel::IsOpenMessage(message)) {
  365. if (found) {
  366. // The stream is already used, the receiver must close the DataChannel
  367. PLOG_WARNING << "Got open message on already used stream " << stream;
  368. if(channel && !channel->isClosed())
  369. channel->close();
  370. else
  371. sctpTransport->closeStream(message->stream);
  372. return;
  373. }
  374. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  375. if (stream % 2 != remoteParity) {
  376. // The odd/even rule is violated, the receiver must close the DataChannel
  377. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  378. sctpTransport->closeStream(message->stream);
  379. return;
  380. }
  381. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  382. channel->assignStream(stream);
  383. channel->openCallback =
  384. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  385. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  386. mDataChannels.emplace(stream, channel);
  387. }
  388. else if (!found) {
  389. if (message->type == Message::Reset)
  390. return; // ignore
  391. // Invalid, close the DataChannel
  392. PLOG_WARNING << "Got unexpected message on stream " << stream;
  393. sctpTransport->closeStream(message->stream);
  394. return;
  395. }
  396. if (message->type == Message::Reset) {
  397. // Incoming stream is reset, unregister it
  398. removeDataChannel(stream);
  399. }
  400. if (channel) {
  401. // Forward the message
  402. channel->incoming(message);
  403. } else {
  404. // DataChannel was destroyed, ignore
  405. PLOG_DEBUG << "Ignored message on stream " << stream << ", DataChannel is destroyed";
  406. }
  407. }
  408. void PeerConnection::forwardMedia(message_ptr message) {
  409. if (!message)
  410. return;
  411. // Browsers like to compound their packets with a random SSRC.
  412. // we have to do this monstrosity to distribute the report blocks
  413. if (message->type == Message::Control) {
  414. std::set<uint32_t> ssrcs;
  415. size_t offset = 0;
  416. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  417. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  418. if (header->lengthInBytes() > message->size() - offset) {
  419. COUNTER_MEDIA_TRUNCATED++;
  420. break;
  421. }
  422. offset += header->lengthInBytes();
  423. if (header->payloadType() == 205 || header->payloadType() == 206) {
  424. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  425. ssrcs.insert(rtcpfb->packetSenderSSRC());
  426. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  427. } else if (header->payloadType() == 200) {
  428. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  429. ssrcs.insert(rtcpsr->senderSSRC());
  430. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  431. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  432. } else if (header->payloadType() == 201) {
  433. auto rtcprr = reinterpret_cast<RtcpRr *>(header);
  434. ssrcs.insert(rtcprr->senderSSRC());
  435. for (int i = 0; i < rtcprr->header.reportCount(); ++i)
  436. ssrcs.insert(rtcprr->getReportBlock(i)->getSSRC());
  437. } else if (header->payloadType() == 202) {
  438. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  439. if (!sdes->isValid()) {
  440. PLOG_WARNING << "RTCP SDES packet is invalid";
  441. continue;
  442. }
  443. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  444. auto chunk = sdes->getChunk(i);
  445. ssrcs.insert(chunk->ssrc());
  446. }
  447. } else {
  448. // PT=203 == Goodbye
  449. // PT=204 == Application Specific
  450. // PT=207 == Extended Report
  451. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  452. header->payloadType() != 207) {
  453. COUNTER_UNKNOWN_PACKET_TYPE++;
  454. }
  455. }
  456. }
  457. if (!ssrcs.empty()) {
  458. std::shared_lock lock(mTracksMutex); // read-only
  459. for (uint32_t ssrc : ssrcs) {
  460. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  461. if (auto track = it->second.lock())
  462. track->incoming(message);
  463. }
  464. }
  465. return;
  466. }
  467. }
  468. uint32_t ssrc = uint32_t(message->stream);
  469. std::shared_lock lock(mTracksMutex); // read-only
  470. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  471. if (auto track = it->second.lock())
  472. track->incoming(message);
  473. } else {
  474. /*
  475. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  476. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  477. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  478. * this ideal? No! Do I know how to fix it? No!
  479. */
  480. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  481. return;
  482. }
  483. }
  484. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  485. [[maybe_unused]] auto [channel, found] = findDataChannel(stream);
  486. if (channel)
  487. channel->triggerBufferedAmount(amount);
  488. }
  489. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  490. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  491. // If the DataChannel is user-negotiated, do not negotiate it in-band
  492. auto channel =
  493. init.negotiated
  494. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  495. std::move(init.protocol), std::move(init.reliability))
  496. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  497. std::move(init.protocol),
  498. std::move(init.reliability));
  499. // If the user supplied a stream id, use it, otherwise assign it later
  500. if (init.id) {
  501. uint16_t stream = *init.id;
  502. if (stream > maxDataChannelStream())
  503. throw std::invalid_argument("DataChannel stream id is too high");
  504. channel->assignStream(stream);
  505. mDataChannels.emplace(std::make_pair(stream, channel));
  506. } else {
  507. mUnassignedDataChannels.push_back(channel);
  508. }
  509. lock.unlock(); // we are going to call assignDataChannels()
  510. // If SCTP is connected, assign and open now
  511. auto sctpTransport = std::atomic_load(&mSctpTransport);
  512. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  513. assignDataChannels();
  514. channel->open(sctpTransport);
  515. }
  516. return channel;
  517. }
  518. std::pair<shared_ptr<DataChannel>, bool> PeerConnection::findDataChannel(uint16_t stream) {
  519. std::shared_lock lock(mDataChannelsMutex); // read-only
  520. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  521. return std::make_pair(it->second.lock(), true);
  522. else
  523. return std::make_pair(nullptr, false);
  524. }
  525. bool PeerConnection::removeDataChannel(uint16_t stream) {
  526. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  527. return mDataChannels.erase(stream) != 0;
  528. }
  529. uint16_t PeerConnection::maxDataChannelStream() const {
  530. auto sctpTransport = std::atomic_load(&mSctpTransport);
  531. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  532. }
  533. void PeerConnection::assignDataChannels() {
  534. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  535. auto iceTransport = std::atomic_load(&mIceTransport);
  536. if (!iceTransport)
  537. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  538. const uint16_t maxStream = maxDataChannelStream();
  539. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  540. auto channel = it->lock();
  541. if (!channel)
  542. continue;
  543. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  544. // for which the corresponding incoming and outgoing streams are unused. If the side is
  545. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  546. // acting as the DTLS server, it MUST choose an odd one. See
  547. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  548. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  549. while (true) {
  550. if (stream > maxStream)
  551. throw std::runtime_error("Too many DataChannels");
  552. if (mDataChannels.find(stream) == mDataChannels.end())
  553. break;
  554. stream += 2;
  555. }
  556. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  557. channel->assignStream(stream);
  558. mDataChannels.emplace(std::make_pair(stream, channel));
  559. }
  560. mUnassignedDataChannels.clear();
  561. }
  562. void PeerConnection::iterateDataChannels(
  563. std::function<void(shared_ptr<DataChannel> channel)> func) {
  564. std::vector<shared_ptr<DataChannel>> locked;
  565. {
  566. std::shared_lock lock(mDataChannelsMutex); // read-only
  567. locked.reserve(mDataChannels.size());
  568. auto it = mDataChannels.begin();
  569. while (it != mDataChannels.end()) {
  570. auto channel = it->second.lock();
  571. if (channel && !channel->isClosed())
  572. locked.push_back(std::move(channel));
  573. ++it;
  574. }
  575. }
  576. for (auto &channel : locked) {
  577. try {
  578. func(std::move(channel));
  579. } catch (const std::exception &e) {
  580. PLOG_WARNING << e.what();
  581. }
  582. }
  583. }
  584. void PeerConnection::openDataChannels() {
  585. if (auto transport = std::atomic_load(&mSctpTransport))
  586. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  587. if (!channel->isOpen())
  588. channel->open(transport);
  589. });
  590. }
  591. void PeerConnection::closeDataChannels() {
  592. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  593. }
  594. void PeerConnection::remoteCloseDataChannels() {
  595. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  596. }
  597. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  598. #if !RTC_ENABLE_MEDIA
  599. // No media support, mark as removed
  600. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  601. description.markRemoved();
  602. #endif
  603. shared_ptr<Track> track;
  604. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  605. if (track = it->second.lock(); track)
  606. track->setDescription(std::move(description));
  607. if (!track) {
  608. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  609. mTracks.emplace(std::make_pair(track->mid(), track));
  610. mTrackLines.emplace_back(track);
  611. }
  612. if (track->description().isRemoved())
  613. track->close();
  614. return track;
  615. }
  616. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  617. std::shared_lock lock(mTracksMutex); // read-only
  618. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  619. auto track = it->lock();
  620. if (track && !track->isClosed()) {
  621. try {
  622. func(std::move(track));
  623. } catch (const std::exception &e) {
  624. PLOG_WARNING << e.what();
  625. }
  626. }
  627. }
  628. }
  629. void PeerConnection::openTracks() {
  630. #if RTC_ENABLE_MEDIA
  631. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  632. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  633. iterateTracks([&](const shared_ptr<Track> &track) {
  634. if (!track->isOpen()) {
  635. if (srtpTransport) {
  636. track->open(srtpTransport);
  637. } else {
  638. // A track was added during a latter renegotiation, whereas SRTP transport was
  639. // not initialized. This is an optimization to use the library with data
  640. // channels only. Set forceMediaTransport to true to initialize the transport
  641. // before dynamically adding tracks.
  642. auto errorMsg = "The connection has no media transport";
  643. PLOG_ERROR << errorMsg;
  644. track->triggerError(errorMsg);
  645. }
  646. }
  647. });
  648. }
  649. #endif
  650. }
  651. void PeerConnection::closeTracks() {
  652. std::shared_lock lock(mTracksMutex); // read-only
  653. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  654. }
  655. void PeerConnection::validateRemoteDescription(const Description &description) {
  656. if (!description.iceUfrag())
  657. throw std::invalid_argument("Remote description has no ICE user fragment");
  658. if (!description.icePwd())
  659. throw std::invalid_argument("Remote description has no ICE password");
  660. if (!description.fingerprint())
  661. throw std::invalid_argument("Remote description has no valid fingerprint");
  662. if (description.mediaCount() == 0)
  663. throw std::invalid_argument("Remote description has no media line");
  664. int activeMediaCount = 0;
  665. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  666. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  667. if (!application->isRemoved())
  668. ++activeMediaCount;
  669. },
  670. [&](const Description::Media *media) {
  671. if (!media->isRemoved() ||
  672. media->direction() != Description::Direction::Inactive)
  673. ++activeMediaCount;
  674. }},
  675. description.media(i));
  676. if (activeMediaCount == 0)
  677. throw std::invalid_argument("Remote description has no active media");
  678. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  679. if (*description.iceUfrag() == *local->iceUfrag() &&
  680. *description.icePwd() == *local->icePwd())
  681. throw std::logic_error("Got the local description as remote description");
  682. PLOG_VERBOSE << "Remote description looks valid";
  683. }
  684. void PeerConnection::processLocalDescription(Description description) {
  685. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  686. const size_t localMaxMessageSize =
  687. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  688. // Clean up the application entry the ICE transport might have added already (libnice)
  689. description.clearMedia();
  690. if (auto remote = remoteDescription()) {
  691. // Reciprocate remote description
  692. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  693. std::visit( // reciprocate each media
  694. rtc::overloaded{
  695. [&](Description::Application *remoteApp) {
  696. std::shared_lock lock(mDataChannelsMutex);
  697. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  698. // Prefer local description
  699. Description::Application app(remoteApp->mid());
  700. app.setSctpPort(localSctpPort);
  701. app.setMaxMessageSize(localMaxMessageSize);
  702. PLOG_DEBUG << "Adding application to local description, mid=\""
  703. << app.mid() << "\"";
  704. description.addMedia(std::move(app));
  705. return;
  706. }
  707. auto reciprocated = remoteApp->reciprocate();
  708. reciprocated.hintSctpPort(localSctpPort);
  709. reciprocated.setMaxMessageSize(localMaxMessageSize);
  710. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  711. << reciprocated.mid() << "\"";
  712. description.addMedia(std::move(reciprocated));
  713. },
  714. [&](Description::Media *remoteMedia) {
  715. std::shared_lock lock(mTracksMutex);
  716. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  717. // Prefer local description
  718. if (auto track = it->second.lock()) {
  719. auto media = track->description();
  720. PLOG_DEBUG << "Adding media to local description, mid=\""
  721. << media.mid() << "\", removed=" << std::boolalpha
  722. << media.isRemoved();
  723. description.addMedia(std::move(media));
  724. } else {
  725. auto reciprocated = remoteMedia->reciprocate();
  726. reciprocated.markRemoved();
  727. PLOG_DEBUG << "Adding media to local description, mid=\""
  728. << reciprocated.mid()
  729. << "\", removed=true (track is destroyed)";
  730. description.addMedia(std::move(reciprocated));
  731. }
  732. return;
  733. }
  734. auto reciprocated = remoteMedia->reciprocate();
  735. #if !RTC_ENABLE_MEDIA
  736. if (!reciprocated.isRemoved()) {
  737. // No media support, mark as removed
  738. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  739. reciprocated.markRemoved();
  740. }
  741. #endif
  742. PLOG_DEBUG << "Reciprocating media in local description, mid=\""
  743. << reciprocated.mid() << "\", removed=" << std::boolalpha
  744. << reciprocated.isRemoved();
  745. // Create incoming track
  746. auto track =
  747. std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  748. mTracks.emplace(std::make_pair(track->mid(), track));
  749. mTrackLines.emplace_back(track);
  750. triggerTrack(track); // The user may modify the track description
  751. if (track->description().isRemoved())
  752. track->close();
  753. description.addMedia(track->description());
  754. },
  755. },
  756. remote->media(i));
  757. // We need to update the SSRC cache for newly-created incoming tracks
  758. updateTrackSsrcCache(*remote);
  759. }
  760. if (description.type() == Description::Type::Offer) {
  761. // This is an offer, add locally created data channels and tracks
  762. // Add media for local tracks
  763. std::shared_lock lock(mTracksMutex);
  764. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  765. if (auto track = it->lock()) {
  766. if (description.hasMid(track->mid()))
  767. continue;
  768. auto media = track->description();
  769. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  770. << "\", removed=" << std::boolalpha << media.isRemoved();
  771. description.addMedia(std::move(media));
  772. }
  773. }
  774. // Add application for data channels
  775. if (!description.hasApplication()) {
  776. std::shared_lock lock(mDataChannelsMutex);
  777. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  778. // Prevents mid collision with remote or local tracks
  779. unsigned int m = 0;
  780. while (description.hasMid(std::to_string(m)))
  781. ++m;
  782. Description::Application app(std::to_string(m));
  783. app.setSctpPort(localSctpPort);
  784. app.setMaxMessageSize(localMaxMessageSize);
  785. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  786. << "\"";
  787. description.addMedia(std::move(app));
  788. }
  789. }
  790. // There might be no media at this point if the user created a Track, deleted it,
  791. // then called setLocalDescription().
  792. if (description.mediaCount() == 0)
  793. throw std::runtime_error("No DataChannel or Track to negotiate");
  794. }
  795. // Set local fingerprint (wait for certificate if necessary)
  796. description.setFingerprint(mCertificate.get()->fingerprint());
  797. PLOG_VERBOSE << "Issuing local description: " << description;
  798. if (description.mediaCount() == 0)
  799. throw std::logic_error("Local description has no media line");
  800. updateTrackSsrcCache(description);
  801. {
  802. // Set as local description
  803. std::lock_guard lock(mLocalDescriptionMutex);
  804. std::vector<Candidate> existingCandidates;
  805. if (mLocalDescription) {
  806. existingCandidates = mLocalDescription->extractCandidates();
  807. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  808. }
  809. mLocalDescription.emplace(description);
  810. mLocalDescription->addCandidates(std::move(existingCandidates));
  811. }
  812. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  813. &localDescriptionCallback, std::move(description));
  814. // Reciprocated tracks might need to be open
  815. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  816. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  817. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  818. }
  819. void PeerConnection::processLocalCandidate(Candidate candidate) {
  820. std::lock_guard lock(mLocalDescriptionMutex);
  821. if (!mLocalDescription)
  822. throw std::logic_error("Got a local candidate without local description");
  823. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  824. candidate.type() != Candidate::Type::Relayed) {
  825. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  826. return;
  827. }
  828. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  829. candidate.resolve(Candidate::ResolveMode::Simple);
  830. mLocalDescription->addCandidate(candidate);
  831. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  832. &localCandidateCallback, std::move(candidate));
  833. }
  834. void PeerConnection::processRemoteDescription(Description description) {
  835. // Update the SSRC cache for existing tracks
  836. updateTrackSsrcCache(description);
  837. {
  838. // Set as remote description
  839. std::lock_guard lock(mRemoteDescriptionMutex);
  840. std::vector<Candidate> existingCandidates;
  841. if (mRemoteDescription)
  842. existingCandidates = mRemoteDescription->extractCandidates();
  843. mRemoteDescription.emplace(description);
  844. mRemoteDescription->addCandidates(std::move(existingCandidates));
  845. }
  846. if (description.hasApplication()) {
  847. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  848. auto sctpTransport = std::atomic_load(&mSctpTransport);
  849. if (!sctpTransport && dtlsTransport &&
  850. dtlsTransport->state() == Transport::State::Connected)
  851. initSctpTransport();
  852. } else {
  853. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  854. }
  855. }
  856. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  857. auto iceTransport = std::atomic_load(&mIceTransport);
  858. {
  859. // Set as remote candidate
  860. std::lock_guard lock(mRemoteDescriptionMutex);
  861. if (!mRemoteDescription)
  862. throw std::logic_error("Got a remote candidate without remote description");
  863. if (!iceTransport)
  864. throw std::logic_error("Got a remote candidate without ICE transport");
  865. candidate.hintMid(mRemoteDescription->bundleMid());
  866. if (mRemoteDescription->hasCandidate(candidate))
  867. return; // already in description, ignore
  868. candidate.resolve(Candidate::ResolveMode::Simple);
  869. mRemoteDescription->addCandidate(candidate);
  870. }
  871. if (candidate.isResolved()) {
  872. iceTransport->addRemoteCandidate(std::move(candidate));
  873. } else {
  874. // We might need a lookup, do it asynchronously
  875. // We don't use the thread pool because we have no control on the timeout
  876. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  877. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  878. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  879. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  880. if (auto iceTransport = weakIceTransport.lock())
  881. iceTransport->addRemoteCandidate(std::move(candidate));
  882. });
  883. t.detach();
  884. }
  885. }
  886. }
  887. string PeerConnection::localBundleMid() const {
  888. std::lock_guard lock(mLocalDescriptionMutex);
  889. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  890. }
  891. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  892. auto dataChannel = weakDataChannel.lock();
  893. if (dataChannel) {
  894. dataChannel->resetOpenCallback(); // might be set internally
  895. mPendingDataChannels.push(std::move(dataChannel));
  896. }
  897. triggerPendingDataChannels();
  898. }
  899. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  900. auto track = weakTrack.lock();
  901. if (track) {
  902. track->resetOpenCallback(); // might be set internally
  903. mPendingTracks.push(std::move(track));
  904. }
  905. triggerPendingTracks();
  906. }
  907. void PeerConnection::triggerPendingDataChannels() {
  908. while (dataChannelCallback) {
  909. auto next = mPendingDataChannels.tryPop();
  910. if (!next)
  911. break;
  912. auto impl = std::move(*next);
  913. try {
  914. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  915. } catch (const std::exception &e) {
  916. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  917. }
  918. impl->triggerOpen();
  919. }
  920. }
  921. void PeerConnection::triggerPendingTracks() {
  922. while (trackCallback) {
  923. auto next = mPendingTracks.tryPop();
  924. if (!next)
  925. break;
  926. auto impl = std::move(*next);
  927. try {
  928. trackCallback(std::make_shared<rtc::Track>(impl));
  929. } catch (const std::exception &e) {
  930. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  931. }
  932. // Do not trigger open immediately for tracks as it'll be done later
  933. }
  934. }
  935. void PeerConnection::flushPendingDataChannels() {
  936. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  937. }
  938. void PeerConnection::flushPendingTracks() {
  939. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  940. }
  941. bool PeerConnection::changeState(State newState) {
  942. State current;
  943. do {
  944. current = state.load();
  945. if (current == State::Closed)
  946. return false;
  947. if (current == newState)
  948. return false;
  949. } while (!state.compare_exchange_weak(current, newState));
  950. std::ostringstream s;
  951. s << newState;
  952. PLOG_INFO << "Changed state to " << s.str();
  953. if (newState == State::Closed) {
  954. auto callback = std::move(stateChangeCallback); // steal the callback
  955. callback(State::Closed); // call it synchronously
  956. } else {
  957. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  958. &stateChangeCallback, newState);
  959. }
  960. return true;
  961. }
  962. bool PeerConnection::changeGatheringState(GatheringState newState) {
  963. if (gatheringState.exchange(newState) == newState)
  964. return false;
  965. std::ostringstream s;
  966. s << newState;
  967. PLOG_INFO << "Changed gathering state to " << s.str();
  968. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  969. &gatheringStateChangeCallback, newState);
  970. return true;
  971. }
  972. bool PeerConnection::changeSignalingState(SignalingState newState) {
  973. if (signalingState.exchange(newState) == newState)
  974. return false;
  975. std::ostringstream s;
  976. s << newState;
  977. PLOG_INFO << "Changed signaling state to " << s.str();
  978. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  979. &signalingStateChangeCallback, newState);
  980. return true;
  981. }
  982. void PeerConnection::resetCallbacks() {
  983. // Unregister all callbacks
  984. dataChannelCallback = nullptr;
  985. localDescriptionCallback = nullptr;
  986. localCandidateCallback = nullptr;
  987. stateChangeCallback = nullptr;
  988. gatheringStateChangeCallback = nullptr;
  989. signalingStateChangeCallback = nullptr;
  990. trackCallback = nullptr;
  991. }
  992. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  993. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  994. // Setup SSRC -> Track mapping
  995. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  996. std::visit( // ssrc -> track mapping
  997. rtc::overloaded{
  998. [&](Description::Application const *) { return; },
  999. [&](Description::Media const *media) {
  1000. const auto ssrcs = media->getSSRCs();
  1001. // Note: We don't want to lock (or do any other lookups), if we
  1002. // already know there's no SSRCs to loop over.
  1003. if (ssrcs.size() <= 0) {
  1004. return;
  1005. }
  1006. std::shared_ptr<Track> track{nullptr};
  1007. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1008. if (auto track_for_mid = it->second.lock())
  1009. track = track_for_mid;
  1010. if (!track) {
  1011. // Unable to find track for MID
  1012. return;
  1013. }
  1014. for (auto ssrc : ssrcs) {
  1015. mTracksBySsrc.insert_or_assign(ssrc, track);
  1016. }
  1017. },
  1018. },
  1019. description.media(i));
  1020. }
  1021. } // namespace rtc::impl