peerconnection.cpp 41 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "dtlstransport.hpp"
  12. #include "icetransport.hpp"
  13. #include "internals.hpp"
  14. #include "logcounter.hpp"
  15. #include "peerconnection.hpp"
  16. #include "processor.hpp"
  17. #include "rtp.hpp"
  18. #include "sctptransport.hpp"
  19. #include "utils.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. PeerConnection::PeerConnection(Configuration config_)
  41. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  44. throw std::invalid_argument("Invalid port range");
  45. if (config.mtu) {
  46. if (*config.mtu < 576) // Min MTU for IPv4
  47. throw std::invalid_argument("Invalid MTU value");
  48. if (*config.mtu > 1500) { // Standard Ethernet
  49. PLOG_WARNING << "MTU set to " << *config.mtu;
  50. } else {
  51. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  52. }
  53. }
  54. }
  55. PeerConnection::~PeerConnection() {
  56. PLOG_VERBOSE << "Destroying PeerConnection";
  57. mProcessor.join();
  58. }
  59. void PeerConnection::close() {
  60. negotiationNeeded = false;
  61. if (!closing.exchange(true)) {
  62. PLOG_VERBOSE << "Closing PeerConnection";
  63. if (auto transport = std::atomic_load(&mSctpTransport))
  64. transport->stop();
  65. else
  66. remoteClose();
  67. }
  68. }
  69. void PeerConnection::remoteClose() {
  70. close();
  71. if (state.load() != State::Closed) {
  72. // Close data channels and tracks asynchronously
  73. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  74. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  75. closeTransports();
  76. }
  77. }
  78. optional<Description> PeerConnection::localDescription() const {
  79. std::lock_guard lock(mLocalDescriptionMutex);
  80. return mLocalDescription;
  81. }
  82. optional<Description> PeerConnection::remoteDescription() const {
  83. std::lock_guard lock(mRemoteDescriptionMutex);
  84. return mRemoteDescription;
  85. }
  86. size_t PeerConnection::remoteMaxMessageSize() const {
  87. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  88. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  89. std::lock_guard lock(mRemoteDescriptionMutex);
  90. if (mRemoteDescription)
  91. if (auto *application = mRemoteDescription->application())
  92. if (auto max = application->maxMessageSize()) {
  93. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  94. // size value of zero, it indicates that the SCTP endpoint will handle messages
  95. // of any size, subject to memory capacity, etc.
  96. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  97. }
  98. return std::min(remoteMax, localMax);
  99. }
  100. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  101. template <typename T>
  102. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  103. std::atomic_store(member, transport);
  104. try {
  105. transport->start();
  106. } catch (...) {
  107. std::atomic_store(member, decltype(transport)(nullptr));
  108. throw;
  109. }
  110. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  111. std::atomic_store(member, decltype(transport)(nullptr));
  112. transport->stop();
  113. return nullptr;
  114. }
  115. return transport;
  116. }
  117. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  118. try {
  119. if (auto transport = std::atomic_load(&mIceTransport))
  120. return transport;
  121. PLOG_VERBOSE << "Starting ICE transport";
  122. auto transport = std::make_shared<IceTransport>(
  123. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  124. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  125. auto shared_this = weak_this.lock();
  126. if (!shared_this)
  127. return;
  128. switch (transportState) {
  129. case IceTransport::State::Connecting:
  130. changeIceState(IceState::Checking);
  131. changeState(State::Connecting);
  132. break;
  133. case IceTransport::State::Connected:
  134. changeIceState(IceState::Connected);
  135. initDtlsTransport();
  136. break;
  137. case IceTransport::State::Completed:
  138. changeIceState(IceState::Completed);
  139. break;
  140. case IceTransport::State::Failed:
  141. changeIceState(IceState::Failed);
  142. changeState(State::Failed);
  143. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  144. break;
  145. case IceTransport::State::Disconnected:
  146. changeIceState(IceState::Disconnected);
  147. changeState(State::Disconnected);
  148. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  149. break;
  150. default:
  151. // Ignore
  152. break;
  153. }
  154. },
  155. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  156. auto shared_this = weak_this.lock();
  157. if (!shared_this)
  158. return;
  159. switch (gatheringState) {
  160. case IceTransport::GatheringState::InProgress:
  161. changeGatheringState(GatheringState::InProgress);
  162. break;
  163. case IceTransport::GatheringState::Complete:
  164. endLocalCandidates();
  165. changeGatheringState(GatheringState::Complete);
  166. break;
  167. default:
  168. // Ignore
  169. break;
  170. }
  171. });
  172. return emplaceTransport(this, &mIceTransport, std::move(transport));
  173. } catch (const std::exception &e) {
  174. PLOG_ERROR << e.what();
  175. changeState(State::Failed);
  176. throw std::runtime_error("ICE transport initialization failed");
  177. }
  178. }
  179. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  180. try {
  181. if (auto transport = std::atomic_load(&mDtlsTransport))
  182. return transport;
  183. PLOG_VERBOSE << "Starting DTLS transport";
  184. auto lower = std::atomic_load(&mIceTransport);
  185. if (!lower)
  186. throw std::logic_error("No underlying ICE transport for DTLS transport");
  187. auto certificate = mCertificate.get();
  188. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  189. auto dtlsStateChangeCallback =
  190. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  191. auto shared_this = weak_this.lock();
  192. if (!shared_this)
  193. return;
  194. switch (transportState) {
  195. case DtlsTransport::State::Connected:
  196. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  197. initSctpTransport();
  198. else
  199. changeState(State::Connected);
  200. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  201. break;
  202. case DtlsTransport::State::Failed:
  203. changeState(State::Failed);
  204. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  205. break;
  206. case DtlsTransport::State::Disconnected:
  207. changeState(State::Disconnected);
  208. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  209. break;
  210. default:
  211. // Ignore
  212. break;
  213. }
  214. };
  215. shared_ptr<DtlsTransport> transport;
  216. auto local = localDescription();
  217. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  218. #if RTC_ENABLE_MEDIA
  219. PLOG_INFO << "This connection requires media support";
  220. // DTLS-SRTP
  221. transport = std::make_shared<DtlsSrtpTransport>(
  222. lower, certificate, config.mtu, verifierCallback,
  223. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  224. #else
  225. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  226. #endif
  227. }
  228. if (!transport) {
  229. // DTLS only
  230. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  231. verifierCallback, dtlsStateChangeCallback);
  232. }
  233. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  234. } catch (const std::exception &e) {
  235. PLOG_ERROR << e.what();
  236. changeState(State::Failed);
  237. throw std::runtime_error("DTLS transport initialization failed");
  238. }
  239. }
  240. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  241. try {
  242. if (auto transport = std::atomic_load(&mSctpTransport))
  243. return transport;
  244. PLOG_VERBOSE << "Starting SCTP transport";
  245. auto lower = std::atomic_load(&mDtlsTransport);
  246. if (!lower)
  247. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  248. auto local = localDescription();
  249. if (!local || !local->application())
  250. throw std::logic_error("Starting SCTP transport without local application description");
  251. auto remote = remoteDescription();
  252. if (!remote || !remote->application())
  253. throw std::logic_error(
  254. "Starting SCTP transport without remote application description");
  255. SctpTransport::Ports ports = {};
  256. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  257. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  258. auto transport = std::make_shared<SctpTransport>(
  259. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  260. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  261. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  262. auto shared_this = weak_this.lock();
  263. if (!shared_this)
  264. return;
  265. switch (transportState) {
  266. case SctpTransport::State::Connected:
  267. changeState(State::Connected);
  268. assignDataChannels();
  269. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  270. break;
  271. case SctpTransport::State::Failed:
  272. changeState(State::Failed);
  273. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  274. break;
  275. case SctpTransport::State::Disconnected:
  276. changeState(State::Disconnected);
  277. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  278. break;
  279. default:
  280. // Ignore
  281. break;
  282. }
  283. });
  284. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  285. } catch (const std::exception &e) {
  286. PLOG_ERROR << e.what();
  287. changeState(State::Failed);
  288. throw std::runtime_error("SCTP transport initialization failed");
  289. }
  290. }
  291. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  292. return std::atomic_load(&mIceTransport);
  293. }
  294. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  295. return std::atomic_load(&mDtlsTransport);
  296. }
  297. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  298. return std::atomic_load(&mSctpTransport);
  299. }
  300. void PeerConnection::closeTransports() {
  301. PLOG_VERBOSE << "Closing transports";
  302. // Change ICE state to sink state Closed
  303. changeIceState(IceState::Closed);
  304. // Change state to sink state Closed
  305. if (!changeState(State::Closed))
  306. return; // already closed
  307. // Reset interceptor and callbacks now that state is changed
  308. setMediaHandler(nullptr);
  309. resetCallbacks();
  310. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  311. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  312. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  313. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  314. if (sctp) {
  315. sctp->onRecv(nullptr);
  316. sctp->onBufferedAmount(nullptr);
  317. }
  318. using array = std::array<shared_ptr<Transport>, 3>;
  319. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  320. for (const auto &t : transports)
  321. if (t)
  322. t->onStateChange(nullptr);
  323. TearDownProcessor::Instance().enqueue(
  324. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  325. for (const auto &t : transports) {
  326. if (t) {
  327. t->stop();
  328. break;
  329. }
  330. }
  331. for (auto &t : transports)
  332. t.reset();
  333. });
  334. }
  335. void PeerConnection::endLocalCandidates() {
  336. std::lock_guard lock(mLocalDescriptionMutex);
  337. if (mLocalDescription)
  338. mLocalDescription->endCandidates();
  339. }
  340. void PeerConnection::rollbackLocalDescription() {
  341. PLOG_DEBUG << "Rolling back pending local description";
  342. std::unique_lock lock(mLocalDescriptionMutex);
  343. if (mCurrentLocalDescription) {
  344. std::vector<Candidate> existingCandidates;
  345. if (mLocalDescription)
  346. existingCandidates = mLocalDescription->extractCandidates();
  347. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  348. mLocalDescription->addCandidates(std::move(existingCandidates));
  349. mCurrentLocalDescription.reset();
  350. }
  351. }
  352. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  353. std::lock_guard lock(mRemoteDescriptionMutex);
  354. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  355. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  356. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  357. return true;
  358. }
  359. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  360. << expectedFingerprint.value_or("[none]") << "\"";
  361. return false;
  362. }
  363. void PeerConnection::forwardMessage(message_ptr message) {
  364. if (!message) {
  365. remoteCloseDataChannels();
  366. return;
  367. }
  368. auto iceTransport = std::atomic_load(&mIceTransport);
  369. auto sctpTransport = std::atomic_load(&mSctpTransport);
  370. if (!iceTransport || !sctpTransport)
  371. return;
  372. const uint16_t stream = uint16_t(message->stream);
  373. auto [channel, found] = findDataChannel(stream);
  374. if (DataChannel::IsOpenMessage(message)) {
  375. if (found) {
  376. // The stream is already used, the receiver must close the DataChannel
  377. PLOG_WARNING << "Got open message on already used stream " << stream;
  378. if(channel && channel->isOpen())
  379. channel->close();
  380. else
  381. sctpTransport->closeStream(message->stream);
  382. return;
  383. }
  384. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  385. if (stream % 2 != remoteParity) {
  386. // The odd/even rule is violated, the receiver must close the DataChannel
  387. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  388. sctpTransport->closeStream(message->stream);
  389. return;
  390. }
  391. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  392. channel->assignStream(stream);
  393. channel->openCallback =
  394. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  395. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  396. mDataChannels.emplace(stream, channel);
  397. }
  398. else if (!found) {
  399. if (message->type == Message::Reset)
  400. return; // ignore
  401. // Invalid, close the DataChannel
  402. PLOG_WARNING << "Got unexpected message on stream " << stream;
  403. sctpTransport->closeStream(message->stream);
  404. return;
  405. }
  406. if (message->type == Message::Reset) {
  407. // Incoming stream is reset, unregister it
  408. removeDataChannel(stream);
  409. }
  410. if (channel) {
  411. // Forward the message
  412. channel->incoming(message);
  413. } else {
  414. // DataChannel was destroyed, ignore
  415. PLOG_DEBUG << "Ignored message on stream " << stream << ", DataChannel is destroyed";
  416. }
  417. }
  418. void PeerConnection::forwardMedia(message_ptr message) {
  419. if (!message)
  420. return;
  421. auto handler = getMediaHandler();
  422. if (handler) {
  423. message = handler->incoming(message);
  424. if (!message)
  425. return;
  426. }
  427. // Browsers like to compound their packets with a random SSRC.
  428. // we have to do this monstrosity to distribute the report blocks
  429. if (message->type == Message::Control) {
  430. std::set<uint32_t> ssrcs;
  431. size_t offset = 0;
  432. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  433. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  434. if (header->lengthInBytes() > message->size() - offset) {
  435. COUNTER_MEDIA_TRUNCATED++;
  436. break;
  437. }
  438. offset += header->lengthInBytes();
  439. if (header->payloadType() == 205 || header->payloadType() == 206) {
  440. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  441. ssrcs.insert(rtcpfb->packetSenderSSRC());
  442. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  443. } else if (header->payloadType() == 200) {
  444. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  445. ssrcs.insert(rtcpsr->senderSSRC());
  446. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  447. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  448. } else if (header->payloadType() == 201) {
  449. auto rtcprr = reinterpret_cast<RtcpRr *>(header);
  450. ssrcs.insert(rtcprr->senderSSRC());
  451. for (int i = 0; i < rtcprr->header.reportCount(); ++i)
  452. ssrcs.insert(rtcprr->getReportBlock(i)->getSSRC());
  453. } else if (header->payloadType() == 202) {
  454. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  455. if (!sdes->isValid()) {
  456. PLOG_WARNING << "RTCP SDES packet is invalid";
  457. continue;
  458. }
  459. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  460. auto chunk = sdes->getChunk(i);
  461. ssrcs.insert(chunk->ssrc());
  462. }
  463. } else {
  464. // PT=203 == Goodbye
  465. // PT=204 == Application Specific
  466. // PT=207 == Extended Report
  467. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  468. header->payloadType() != 207) {
  469. COUNTER_UNKNOWN_PACKET_TYPE++;
  470. }
  471. }
  472. }
  473. if (!ssrcs.empty()) {
  474. std::shared_lock lock(mTracksMutex); // read-only
  475. for (uint32_t ssrc : ssrcs) {
  476. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  477. if (auto track = it->second.lock())
  478. track->incoming(message);
  479. }
  480. }
  481. return;
  482. }
  483. }
  484. uint32_t ssrc = uint32_t(message->stream);
  485. std::shared_lock lock(mTracksMutex); // read-only
  486. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  487. if (auto track = it->second.lock())
  488. track->incoming(message);
  489. } else {
  490. /*
  491. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  492. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  493. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  494. * this ideal? No! Do I know how to fix it? No!
  495. */
  496. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  497. return;
  498. }
  499. }
  500. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  501. [[maybe_unused]] auto [channel, found] = findDataChannel(stream);
  502. if (channel)
  503. channel->triggerBufferedAmount(amount);
  504. }
  505. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  506. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  507. // If the DataChannel is user-negotiated, do not negotiate it in-band
  508. auto channel =
  509. init.negotiated
  510. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  511. std::move(init.protocol), std::move(init.reliability))
  512. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  513. std::move(init.protocol),
  514. std::move(init.reliability));
  515. // If the user supplied a stream id, use it, otherwise assign it later
  516. if (init.id) {
  517. uint16_t stream = *init.id;
  518. if (stream > maxDataChannelStream())
  519. throw std::invalid_argument("DataChannel stream id is too high");
  520. channel->assignStream(stream);
  521. mDataChannels.emplace(std::make_pair(stream, channel));
  522. } else {
  523. mUnassignedDataChannels.push_back(channel);
  524. }
  525. lock.unlock(); // we are going to call assignDataChannels()
  526. // If SCTP is connected, assign and open now
  527. auto sctpTransport = std::atomic_load(&mSctpTransport);
  528. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  529. assignDataChannels();
  530. channel->open(sctpTransport);
  531. }
  532. return channel;
  533. }
  534. std::pair<shared_ptr<DataChannel>, bool> PeerConnection::findDataChannel(uint16_t stream) {
  535. std::shared_lock lock(mDataChannelsMutex); // read-only
  536. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  537. return std::make_pair(it->second.lock(), true);
  538. else
  539. return std::make_pair(nullptr, false);
  540. }
  541. bool PeerConnection::removeDataChannel(uint16_t stream) {
  542. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  543. return mDataChannels.erase(stream) != 0;
  544. }
  545. uint16_t PeerConnection::maxDataChannelStream() const {
  546. auto sctpTransport = std::atomic_load(&mSctpTransport);
  547. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  548. }
  549. void PeerConnection::assignDataChannels() {
  550. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  551. auto iceTransport = std::atomic_load(&mIceTransport);
  552. if (!iceTransport)
  553. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  554. const uint16_t maxStream = maxDataChannelStream();
  555. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  556. auto channel = it->lock();
  557. if (!channel)
  558. continue;
  559. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  560. // for which the corresponding incoming and outgoing streams are unused. If the side is
  561. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  562. // acting as the DTLS server, it MUST choose an odd one. See
  563. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  564. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  565. while (true) {
  566. if (stream > maxStream)
  567. throw std::runtime_error("Too many DataChannels");
  568. if (mDataChannels.find(stream) == mDataChannels.end())
  569. break;
  570. stream += 2;
  571. }
  572. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  573. channel->assignStream(stream);
  574. mDataChannels.emplace(std::make_pair(stream, channel));
  575. }
  576. mUnassignedDataChannels.clear();
  577. }
  578. void PeerConnection::iterateDataChannels(
  579. std::function<void(shared_ptr<DataChannel> channel)> func) {
  580. std::vector<shared_ptr<DataChannel>> locked;
  581. {
  582. std::shared_lock lock(mDataChannelsMutex); // read-only
  583. locked.reserve(mDataChannels.size());
  584. auto it = mDataChannels.begin();
  585. while (it != mDataChannels.end()) {
  586. auto channel = it->second.lock();
  587. if (channel && !channel->isClosed())
  588. locked.push_back(std::move(channel));
  589. ++it;
  590. }
  591. }
  592. for (auto &channel : locked) {
  593. try {
  594. func(std::move(channel));
  595. } catch (const std::exception &e) {
  596. PLOG_WARNING << e.what();
  597. }
  598. }
  599. }
  600. void PeerConnection::openDataChannels() {
  601. if (auto transport = std::atomic_load(&mSctpTransport))
  602. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  603. if (!channel->isOpen())
  604. channel->open(transport);
  605. });
  606. }
  607. void PeerConnection::closeDataChannels() {
  608. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  609. }
  610. void PeerConnection::remoteCloseDataChannels() {
  611. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  612. }
  613. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  614. #if !RTC_ENABLE_MEDIA
  615. // No media support, mark as removed
  616. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  617. description.markRemoved();
  618. #endif
  619. shared_ptr<Track> track;
  620. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  621. if (track = it->second.lock(); track)
  622. track->setDescription(std::move(description));
  623. if (!track) {
  624. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  625. mTracks.emplace(std::make_pair(track->mid(), track));
  626. mTrackLines.emplace_back(track);
  627. }
  628. if (track->description().isRemoved())
  629. track->close();
  630. return track;
  631. }
  632. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  633. std::shared_lock lock(mTracksMutex); // read-only
  634. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  635. auto track = it->lock();
  636. if (track && !track->isClosed()) {
  637. try {
  638. func(std::move(track));
  639. } catch (const std::exception &e) {
  640. PLOG_WARNING << e.what();
  641. }
  642. }
  643. }
  644. }
  645. void PeerConnection::openTracks() {
  646. #if RTC_ENABLE_MEDIA
  647. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  648. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  649. iterateTracks([&](const shared_ptr<Track> &track) {
  650. if (!track->isOpen()) {
  651. if (srtpTransport) {
  652. track->open(srtpTransport);
  653. } else {
  654. // A track was added during a latter renegotiation, whereas SRTP transport was
  655. // not initialized. This is an optimization to use the library with data
  656. // channels only. Set forceMediaTransport to true to initialize the transport
  657. // before dynamically adding tracks.
  658. auto errorMsg = "The connection has no media transport";
  659. PLOG_ERROR << errorMsg;
  660. track->triggerError(errorMsg);
  661. }
  662. }
  663. });
  664. }
  665. #endif
  666. }
  667. void PeerConnection::closeTracks() {
  668. std::shared_lock lock(mTracksMutex); // read-only
  669. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  670. }
  671. void PeerConnection::validateRemoteDescription(const Description &description) {
  672. if (!description.iceUfrag())
  673. throw std::invalid_argument("Remote description has no ICE user fragment");
  674. if (!description.icePwd())
  675. throw std::invalid_argument("Remote description has no ICE password");
  676. if (!description.fingerprint())
  677. throw std::invalid_argument("Remote description has no valid fingerprint");
  678. if (description.mediaCount() == 0)
  679. throw std::invalid_argument("Remote description has no media line");
  680. int activeMediaCount = 0;
  681. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  682. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  683. if (!application->isRemoved())
  684. ++activeMediaCount;
  685. },
  686. [&](const Description::Media *media) {
  687. if (!media->isRemoved() ||
  688. media->direction() != Description::Direction::Inactive)
  689. ++activeMediaCount;
  690. }},
  691. description.media(i));
  692. if (activeMediaCount == 0)
  693. throw std::invalid_argument("Remote description has no active media");
  694. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  695. if (*description.iceUfrag() == *local->iceUfrag() &&
  696. *description.icePwd() == *local->icePwd())
  697. throw std::logic_error("Got the local description as remote description");
  698. PLOG_VERBOSE << "Remote description looks valid";
  699. }
  700. void PeerConnection::processLocalDescription(Description description) {
  701. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  702. const size_t localMaxMessageSize =
  703. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  704. // Clean up the application entry the ICE transport might have added already (libnice)
  705. description.clearMedia();
  706. if (auto remote = remoteDescription()) {
  707. // Reciprocate remote description
  708. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  709. std::visit( // reciprocate each media
  710. rtc::overloaded{
  711. [&](Description::Application *remoteApp) {
  712. std::shared_lock lock(mDataChannelsMutex);
  713. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  714. // Prefer local description
  715. Description::Application app(remoteApp->mid());
  716. app.setSctpPort(localSctpPort);
  717. app.setMaxMessageSize(localMaxMessageSize);
  718. PLOG_DEBUG << "Adding application to local description, mid=\""
  719. << app.mid() << "\"";
  720. description.addMedia(std::move(app));
  721. return;
  722. }
  723. auto reciprocated = remoteApp->reciprocate();
  724. reciprocated.hintSctpPort(localSctpPort);
  725. reciprocated.setMaxMessageSize(localMaxMessageSize);
  726. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  727. << reciprocated.mid() << "\"";
  728. description.addMedia(std::move(reciprocated));
  729. },
  730. [&](Description::Media *remoteMedia) {
  731. std::shared_lock lock(mTracksMutex);
  732. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  733. // Prefer local description
  734. if (auto track = it->second.lock()) {
  735. auto media = track->description();
  736. PLOG_DEBUG << "Adding media to local description, mid=\""
  737. << media.mid() << "\", removed=" << std::boolalpha
  738. << media.isRemoved();
  739. description.addMedia(std::move(media));
  740. } else {
  741. auto reciprocated = remoteMedia->reciprocate();
  742. reciprocated.markRemoved();
  743. PLOG_DEBUG << "Adding media to local description, mid=\""
  744. << reciprocated.mid()
  745. << "\", removed=true (track is destroyed)";
  746. description.addMedia(std::move(reciprocated));
  747. }
  748. return;
  749. }
  750. auto reciprocated = remoteMedia->reciprocate();
  751. #if !RTC_ENABLE_MEDIA
  752. if (!reciprocated.isRemoved()) {
  753. // No media support, mark as removed
  754. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  755. reciprocated.markRemoved();
  756. }
  757. #endif
  758. PLOG_DEBUG << "Reciprocating media in local description, mid=\""
  759. << reciprocated.mid() << "\", removed=" << std::boolalpha
  760. << reciprocated.isRemoved();
  761. // Create incoming track
  762. auto track =
  763. std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  764. mTracks.emplace(std::make_pair(track->mid(), track));
  765. mTrackLines.emplace_back(track);
  766. triggerTrack(track); // The user may modify the track description
  767. if (track->description().isRemoved())
  768. track->close();
  769. description.addMedia(track->description());
  770. },
  771. },
  772. remote->media(i));
  773. // We need to update the SSRC cache for newly-created incoming tracks
  774. updateTrackSsrcCache(*remote);
  775. }
  776. if (description.type() == Description::Type::Offer) {
  777. // This is an offer, add locally created data channels and tracks
  778. // Add media for local tracks
  779. std::shared_lock lock(mTracksMutex);
  780. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  781. if (auto track = it->lock()) {
  782. if (description.hasMid(track->mid()))
  783. continue;
  784. auto media = track->description();
  785. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  786. << "\", removed=" << std::boolalpha << media.isRemoved();
  787. description.addMedia(std::move(media));
  788. }
  789. }
  790. // Add application for data channels
  791. if (!description.hasApplication()) {
  792. std::shared_lock lock(mDataChannelsMutex);
  793. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  794. // Prevents mid collision with remote or local tracks
  795. unsigned int m = 0;
  796. while (description.hasMid(std::to_string(m)))
  797. ++m;
  798. Description::Application app(std::to_string(m));
  799. app.setSctpPort(localSctpPort);
  800. app.setMaxMessageSize(localMaxMessageSize);
  801. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  802. << "\"";
  803. description.addMedia(std::move(app));
  804. }
  805. }
  806. // There might be no media at this point if the user created a Track, deleted it,
  807. // then called setLocalDescription().
  808. if (description.mediaCount() == 0)
  809. throw std::runtime_error("No DataChannel or Track to negotiate");
  810. }
  811. // Set local fingerprint (wait for certificate if necessary)
  812. description.setFingerprint(mCertificate.get()->fingerprint());
  813. PLOG_VERBOSE << "Issuing local description: " << description;
  814. if (description.mediaCount() == 0)
  815. throw std::logic_error("Local description has no media line");
  816. updateTrackSsrcCache(description);
  817. {
  818. // Set as local description
  819. std::lock_guard lock(mLocalDescriptionMutex);
  820. std::vector<Candidate> existingCandidates;
  821. if (mLocalDescription) {
  822. existingCandidates = mLocalDescription->extractCandidates();
  823. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  824. }
  825. mLocalDescription.emplace(description);
  826. mLocalDescription->addCandidates(std::move(existingCandidates));
  827. }
  828. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  829. &localDescriptionCallback, std::move(description));
  830. // Reciprocated tracks might need to be open
  831. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  832. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  833. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  834. }
  835. void PeerConnection::processLocalCandidate(Candidate candidate) {
  836. std::lock_guard lock(mLocalDescriptionMutex);
  837. if (!mLocalDescription)
  838. throw std::logic_error("Got a local candidate without local description");
  839. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  840. candidate.type() != Candidate::Type::Relayed) {
  841. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  842. return;
  843. }
  844. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  845. candidate.resolve(Candidate::ResolveMode::Simple);
  846. mLocalDescription->addCandidate(candidate);
  847. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  848. &localCandidateCallback, std::move(candidate));
  849. }
  850. void PeerConnection::processRemoteDescription(Description description) {
  851. // Update the SSRC cache for existing tracks
  852. updateTrackSsrcCache(description);
  853. {
  854. // Set as remote description
  855. std::lock_guard lock(mRemoteDescriptionMutex);
  856. std::vector<Candidate> existingCandidates;
  857. if (mRemoteDescription)
  858. existingCandidates = mRemoteDescription->extractCandidates();
  859. mRemoteDescription.emplace(description);
  860. mRemoteDescription->addCandidates(std::move(existingCandidates));
  861. }
  862. if (description.hasApplication()) {
  863. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  864. auto sctpTransport = std::atomic_load(&mSctpTransport);
  865. if (!sctpTransport && dtlsTransport &&
  866. dtlsTransport->state() == Transport::State::Connected)
  867. initSctpTransport();
  868. } else {
  869. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  870. }
  871. }
  872. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  873. auto iceTransport = std::atomic_load(&mIceTransport);
  874. {
  875. // Set as remote candidate
  876. std::lock_guard lock(mRemoteDescriptionMutex);
  877. if (!mRemoteDescription)
  878. throw std::logic_error("Got a remote candidate without remote description");
  879. if (!iceTransport)
  880. throw std::logic_error("Got a remote candidate without ICE transport");
  881. candidate.hintMid(mRemoteDescription->bundleMid());
  882. if (mRemoteDescription->hasCandidate(candidate))
  883. return; // already in description, ignore
  884. candidate.resolve(Candidate::ResolveMode::Simple);
  885. mRemoteDescription->addCandidate(candidate);
  886. }
  887. if (candidate.isResolved()) {
  888. iceTransport->addRemoteCandidate(std::move(candidate));
  889. } else {
  890. // We might need a lookup, do it asynchronously
  891. // We don't use the thread pool because we have no control on the timeout
  892. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  893. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  894. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  895. utils::this_thread::set_name("RTC resolver");
  896. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  897. if (auto iceTransport = weakIceTransport.lock())
  898. iceTransport->addRemoteCandidate(std::move(candidate));
  899. });
  900. t.detach();
  901. }
  902. }
  903. }
  904. string PeerConnection::localBundleMid() const {
  905. std::lock_guard lock(mLocalDescriptionMutex);
  906. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  907. }
  908. void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
  909. std::unique_lock lock(mMediaHandlerMutex);
  910. if (mMediaHandler)
  911. mMediaHandler->onOutgoing(nullptr);
  912. mMediaHandler = handler;
  913. }
  914. shared_ptr<MediaHandler> PeerConnection::getMediaHandler() {
  915. std::shared_lock lock(mMediaHandlerMutex);
  916. return mMediaHandler;
  917. }
  918. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  919. auto dataChannel = weakDataChannel.lock();
  920. if (dataChannel) {
  921. dataChannel->resetOpenCallback(); // might be set internally
  922. mPendingDataChannels.push(std::move(dataChannel));
  923. }
  924. triggerPendingDataChannels();
  925. }
  926. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  927. auto track = weakTrack.lock();
  928. if (track) {
  929. track->resetOpenCallback(); // might be set internally
  930. mPendingTracks.push(std::move(track));
  931. }
  932. triggerPendingTracks();
  933. }
  934. void PeerConnection::triggerPendingDataChannels() {
  935. while (dataChannelCallback) {
  936. auto next = mPendingDataChannels.pop();
  937. if (!next)
  938. break;
  939. auto impl = std::move(*next);
  940. try {
  941. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  942. } catch (const std::exception &e) {
  943. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  944. }
  945. impl->triggerOpen();
  946. }
  947. }
  948. void PeerConnection::triggerPendingTracks() {
  949. while (trackCallback) {
  950. auto next = mPendingTracks.pop();
  951. if (!next)
  952. break;
  953. auto impl = std::move(*next);
  954. try {
  955. trackCallback(std::make_shared<rtc::Track>(impl));
  956. } catch (const std::exception &e) {
  957. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  958. }
  959. // Do not trigger open immediately for tracks as it'll be done later
  960. }
  961. }
  962. void PeerConnection::flushPendingDataChannels() {
  963. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  964. }
  965. void PeerConnection::flushPendingTracks() {
  966. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  967. }
  968. bool PeerConnection::changeState(State newState) {
  969. State current;
  970. do {
  971. current = state.load();
  972. if (current == State::Closed)
  973. return false;
  974. if (current == newState)
  975. return false;
  976. } while (!state.compare_exchange_weak(current, newState));
  977. std::ostringstream s;
  978. s << newState;
  979. PLOG_INFO << "Changed state to " << s.str();
  980. if (newState == State::Closed) {
  981. auto callback = std::move(stateChangeCallback); // steal the callback
  982. callback(State::Closed); // call it synchronously
  983. } else {
  984. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  985. &stateChangeCallback, newState);
  986. }
  987. return true;
  988. }
  989. bool PeerConnection::changeIceState(IceState newState) {
  990. if (iceState.exchange(newState) == newState)
  991. return false;
  992. std::ostringstream s;
  993. s << newState;
  994. PLOG_INFO << "Changed ICE state to " << s.str();
  995. if (newState == IceState::Closed) {
  996. auto callback = std::move(iceStateChangeCallback); // steal the callback
  997. callback(IceState::Closed); // call it synchronously
  998. } else {
  999. mProcessor.enqueue(&PeerConnection::trigger<IceState>, shared_from_this(),
  1000. &iceStateChangeCallback, newState);
  1001. }
  1002. return true;
  1003. }
  1004. bool PeerConnection::changeGatheringState(GatheringState newState) {
  1005. if (gatheringState.exchange(newState) == newState)
  1006. return false;
  1007. std::ostringstream s;
  1008. s << newState;
  1009. PLOG_INFO << "Changed gathering state to " << s.str();
  1010. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  1011. &gatheringStateChangeCallback, newState);
  1012. return true;
  1013. }
  1014. bool PeerConnection::changeSignalingState(SignalingState newState) {
  1015. if (signalingState.exchange(newState) == newState)
  1016. return false;
  1017. std::ostringstream s;
  1018. s << newState;
  1019. PLOG_INFO << "Changed signaling state to " << s.str();
  1020. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  1021. &signalingStateChangeCallback, newState);
  1022. return true;
  1023. }
  1024. void PeerConnection::resetCallbacks() {
  1025. // Unregister all callbacks
  1026. dataChannelCallback = nullptr;
  1027. localDescriptionCallback = nullptr;
  1028. localCandidateCallback = nullptr;
  1029. stateChangeCallback = nullptr;
  1030. iceStateChangeCallback = nullptr;
  1031. gatheringStateChangeCallback = nullptr;
  1032. signalingStateChangeCallback = nullptr;
  1033. trackCallback = nullptr;
  1034. }
  1035. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  1036. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  1037. // Setup SSRC -> Track mapping
  1038. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  1039. std::visit( // ssrc -> track mapping
  1040. rtc::overloaded{
  1041. [&](Description::Application const *) { return; },
  1042. [&](Description::Media const *media) {
  1043. const auto ssrcs = media->getSSRCs();
  1044. // Note: We don't want to lock (or do any other lookups), if we
  1045. // already know there's no SSRCs to loop over.
  1046. if (ssrcs.size() <= 0) {
  1047. return;
  1048. }
  1049. std::shared_ptr<Track> track{nullptr};
  1050. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1051. if (auto track_for_mid = it->second.lock())
  1052. track = track_for_mid;
  1053. if (!track) {
  1054. // Unable to find track for MID
  1055. return;
  1056. }
  1057. for (auto ssrc : ssrcs) {
  1058. mTracksBySsrc.insert_or_assign(ssrc, track);
  1059. }
  1060. },
  1061. },
  1062. description.media(i));
  1063. }
  1064. } // namespace rtc::impl