peerconnection.cpp 42 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "dtlstransport.hpp"
  12. #include "icetransport.hpp"
  13. #include "internals.hpp"
  14. #include "logcounter.hpp"
  15. #include "peerconnection.hpp"
  16. #include "processor.hpp"
  17. #include "rtp.hpp"
  18. #include "sctptransport.hpp"
  19. #include "utils.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. PeerConnection::PeerConnection(Configuration config_)
  41. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  44. throw std::invalid_argument("Invalid port range");
  45. if (config.mtu) {
  46. if (*config.mtu < 576) // Min MTU for IPv4
  47. throw std::invalid_argument("Invalid MTU value");
  48. if (*config.mtu > 1500) { // Standard Ethernet
  49. PLOG_WARNING << "MTU set to " << *config.mtu;
  50. } else {
  51. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  52. }
  53. }
  54. }
  55. PeerConnection::~PeerConnection() {
  56. PLOG_VERBOSE << "Destroying PeerConnection";
  57. mProcessor.join();
  58. }
  59. void PeerConnection::close() {
  60. negotiationNeeded = false;
  61. if (!closing.exchange(true)) {
  62. PLOG_VERBOSE << "Closing PeerConnection";
  63. if (auto transport = std::atomic_load(&mSctpTransport))
  64. transport->stop();
  65. else
  66. remoteClose();
  67. }
  68. }
  69. void PeerConnection::remoteClose() {
  70. close();
  71. if (state.load() != State::Closed) {
  72. // Close data channels and tracks asynchronously
  73. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  74. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  75. closeTransports();
  76. }
  77. }
  78. optional<Description> PeerConnection::localDescription() const {
  79. std::lock_guard lock(mLocalDescriptionMutex);
  80. return mLocalDescription;
  81. }
  82. optional<Description> PeerConnection::remoteDescription() const {
  83. std::lock_guard lock(mRemoteDescriptionMutex);
  84. return mRemoteDescription;
  85. }
  86. size_t PeerConnection::remoteMaxMessageSize() const {
  87. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  88. size_t remoteMax = DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
  89. std::lock_guard lock(mRemoteDescriptionMutex);
  90. if (mRemoteDescription)
  91. if (auto *application = mRemoteDescription->application())
  92. if (auto max = application->maxMessageSize()) {
  93. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  94. // size value of zero, it indicates that the SCTP endpoint will handle messages
  95. // of any size, subject to memory capacity, etc.
  96. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  97. }
  98. return std::min(remoteMax, localMax);
  99. }
  100. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  101. template <typename T>
  102. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  103. std::atomic_store(member, transport);
  104. try {
  105. transport->start();
  106. } catch (...) {
  107. std::atomic_store(member, decltype(transport)(nullptr));
  108. throw;
  109. }
  110. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  111. std::atomic_store(member, decltype(transport)(nullptr));
  112. transport->stop();
  113. return nullptr;
  114. }
  115. return transport;
  116. }
  117. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  118. try {
  119. if (auto transport = std::atomic_load(&mIceTransport))
  120. return transport;
  121. PLOG_VERBOSE << "Starting ICE transport";
  122. auto transport = std::make_shared<IceTransport>(
  123. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  124. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  125. auto shared_this = weak_this.lock();
  126. if (!shared_this)
  127. return;
  128. switch (transportState) {
  129. case IceTransport::State::Connecting:
  130. changeIceState(IceState::Checking);
  131. changeState(State::Connecting);
  132. break;
  133. case IceTransport::State::Connected:
  134. changeIceState(IceState::Connected);
  135. initDtlsTransport();
  136. break;
  137. case IceTransport::State::Completed:
  138. changeIceState(IceState::Completed);
  139. break;
  140. case IceTransport::State::Failed:
  141. changeIceState(IceState::Failed);
  142. changeState(State::Failed);
  143. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  144. break;
  145. case IceTransport::State::Disconnected:
  146. changeIceState(IceState::Disconnected);
  147. changeState(State::Disconnected);
  148. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  149. break;
  150. default:
  151. // Ignore
  152. break;
  153. }
  154. },
  155. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  156. auto shared_this = weak_this.lock();
  157. if (!shared_this)
  158. return;
  159. switch (gatheringState) {
  160. case IceTransport::GatheringState::InProgress:
  161. changeGatheringState(GatheringState::InProgress);
  162. break;
  163. case IceTransport::GatheringState::Complete:
  164. endLocalCandidates();
  165. changeGatheringState(GatheringState::Complete);
  166. break;
  167. default:
  168. // Ignore
  169. break;
  170. }
  171. });
  172. return emplaceTransport(this, &mIceTransport, std::move(transport));
  173. } catch (const std::exception &e) {
  174. PLOG_ERROR << e.what();
  175. changeState(State::Failed);
  176. throw std::runtime_error("ICE transport initialization failed");
  177. }
  178. }
  179. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  180. try {
  181. if (auto transport = std::atomic_load(&mDtlsTransport))
  182. return transport;
  183. PLOG_VERBOSE << "Starting DTLS transport";
  184. auto fingerprintAlgorithm = CertificateFingerprint::Algorithm::Sha256;
  185. if (auto remote = remoteDescription(); remote && remote->fingerprint()) {
  186. fingerprintAlgorithm = remote->fingerprint()->algorithm;
  187. }
  188. auto lower = std::atomic_load(&mIceTransport);
  189. if (!lower)
  190. throw std::logic_error("No underlying ICE transport for DTLS transport");
  191. auto certificate = mCertificate.get();
  192. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  193. auto dtlsStateChangeCallback =
  194. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  195. auto shared_this = weak_this.lock();
  196. if (!shared_this)
  197. return;
  198. switch (transportState) {
  199. case DtlsTransport::State::Connected:
  200. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  201. initSctpTransport();
  202. else
  203. changeState(State::Connected);
  204. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  205. break;
  206. case DtlsTransport::State::Failed:
  207. changeState(State::Failed);
  208. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  209. break;
  210. case DtlsTransport::State::Disconnected:
  211. changeState(State::Disconnected);
  212. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  213. break;
  214. default:
  215. // Ignore
  216. break;
  217. }
  218. };
  219. shared_ptr<DtlsTransport> transport;
  220. auto local = localDescription();
  221. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  222. #if RTC_ENABLE_MEDIA
  223. PLOG_INFO << "This connection requires media support";
  224. // DTLS-SRTP
  225. transport = std::make_shared<DtlsSrtpTransport>(
  226. lower, certificate, config.mtu, fingerprintAlgorithm, verifierCallback,
  227. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  228. #else
  229. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  230. #endif
  231. }
  232. if (!transport) {
  233. // DTLS only
  234. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  235. fingerprintAlgorithm, verifierCallback,
  236. dtlsStateChangeCallback);
  237. }
  238. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  239. } catch (const std::exception &e) {
  240. PLOG_ERROR << e.what();
  241. changeState(State::Failed);
  242. throw std::runtime_error("DTLS transport initialization failed");
  243. }
  244. }
  245. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  246. try {
  247. if (auto transport = std::atomic_load(&mSctpTransport))
  248. return transport;
  249. PLOG_VERBOSE << "Starting SCTP transport";
  250. auto lower = std::atomic_load(&mDtlsTransport);
  251. if (!lower)
  252. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  253. auto local = localDescription();
  254. if (!local || !local->application())
  255. throw std::logic_error("Starting SCTP transport without local application description");
  256. auto remote = remoteDescription();
  257. if (!remote || !remote->application())
  258. throw std::logic_error(
  259. "Starting SCTP transport without remote application description");
  260. SctpTransport::Ports ports = {};
  261. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  262. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  263. auto transport = std::make_shared<SctpTransport>(
  264. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  265. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  266. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  267. auto shared_this = weak_this.lock();
  268. if (!shared_this)
  269. return;
  270. switch (transportState) {
  271. case SctpTransport::State::Connected:
  272. changeState(State::Connected);
  273. assignDataChannels();
  274. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  275. break;
  276. case SctpTransport::State::Failed:
  277. changeState(State::Failed);
  278. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  279. break;
  280. case SctpTransport::State::Disconnected:
  281. changeState(State::Disconnected);
  282. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  283. break;
  284. default:
  285. // Ignore
  286. break;
  287. }
  288. });
  289. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  290. } catch (const std::exception &e) {
  291. PLOG_ERROR << e.what();
  292. changeState(State::Failed);
  293. throw std::runtime_error("SCTP transport initialization failed");
  294. }
  295. }
  296. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  297. return std::atomic_load(&mIceTransport);
  298. }
  299. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  300. return std::atomic_load(&mDtlsTransport);
  301. }
  302. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  303. return std::atomic_load(&mSctpTransport);
  304. }
  305. void PeerConnection::closeTransports() {
  306. PLOG_VERBOSE << "Closing transports";
  307. // Change ICE state to sink state Closed
  308. changeIceState(IceState::Closed);
  309. // Change state to sink state Closed
  310. if (!changeState(State::Closed))
  311. return; // already closed
  312. // Reset interceptor and callbacks now that state is changed
  313. setMediaHandler(nullptr);
  314. resetCallbacks();
  315. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  316. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  317. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  318. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  319. if (sctp) {
  320. sctp->onRecv(nullptr);
  321. sctp->onBufferedAmount(nullptr);
  322. }
  323. using array = std::array<shared_ptr<Transport>, 3>;
  324. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  325. for (const auto &t : transports)
  326. if (t)
  327. t->onStateChange(nullptr);
  328. TearDownProcessor::Instance().enqueue(
  329. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  330. for (const auto &t : transports) {
  331. if (t) {
  332. t->stop();
  333. break;
  334. }
  335. }
  336. for (auto &t : transports)
  337. t.reset();
  338. });
  339. }
  340. void PeerConnection::endLocalCandidates() {
  341. std::lock_guard lock(mLocalDescriptionMutex);
  342. if (mLocalDescription)
  343. mLocalDescription->endCandidates();
  344. }
  345. void PeerConnection::rollbackLocalDescription() {
  346. PLOG_DEBUG << "Rolling back pending local description";
  347. std::unique_lock lock(mLocalDescriptionMutex);
  348. if (mCurrentLocalDescription) {
  349. std::vector<Candidate> existingCandidates;
  350. if (mLocalDescription)
  351. existingCandidates = mLocalDescription->extractCandidates();
  352. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  353. mLocalDescription->addCandidates(std::move(existingCandidates));
  354. mCurrentLocalDescription.reset();
  355. }
  356. }
  357. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  358. std::lock_guard lock(mRemoteDescriptionMutex);
  359. if (!mRemoteDescription || !mRemoteDescription->fingerprint())
  360. return false;
  361. auto expectedFingerprint = mRemoteDescription->fingerprint()->value;
  362. if (expectedFingerprint == fingerprint) {
  363. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  364. return true;
  365. }
  366. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \"" << expectedFingerprint << "\"";
  367. return false;
  368. }
  369. void PeerConnection::forwardMessage(message_ptr message) {
  370. if (!message) {
  371. remoteCloseDataChannels();
  372. return;
  373. }
  374. auto iceTransport = std::atomic_load(&mIceTransport);
  375. auto sctpTransport = std::atomic_load(&mSctpTransport);
  376. if (!iceTransport || !sctpTransport)
  377. return;
  378. const uint16_t stream = uint16_t(message->stream);
  379. auto [channel, found] = findDataChannel(stream);
  380. if (DataChannel::IsOpenMessage(message)) {
  381. if (found) {
  382. // The stream is already used, the receiver must close the DataChannel
  383. PLOG_WARNING << "Got open message on already used stream " << stream;
  384. if (channel && !channel->isClosed())
  385. channel->close();
  386. else
  387. sctpTransport->closeStream(message->stream);
  388. return;
  389. }
  390. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  391. if (stream % 2 != remoteParity) {
  392. // The odd/even rule is violated, the receiver must close the DataChannel
  393. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  394. sctpTransport->closeStream(message->stream);
  395. return;
  396. }
  397. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  398. channel->assignStream(stream);
  399. channel->openCallback =
  400. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  401. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  402. mDataChannels.emplace(stream, channel);
  403. } else if (!found) {
  404. if (message->type == Message::Reset)
  405. return; // ignore
  406. // Invalid, close the DataChannel
  407. PLOG_WARNING << "Got unexpected message on stream " << stream;
  408. sctpTransport->closeStream(message->stream);
  409. return;
  410. }
  411. if (message->type == Message::Reset) {
  412. // Incoming stream is reset, unregister it
  413. removeDataChannel(stream);
  414. }
  415. if (channel) {
  416. // Forward the message
  417. channel->incoming(message);
  418. } else {
  419. // DataChannel was destroyed, ignore
  420. PLOG_DEBUG << "Ignored message on stream " << stream << ", DataChannel is destroyed";
  421. }
  422. }
  423. void PeerConnection::forwardMedia([[maybe_unused]] message_ptr message) {
  424. #if RTC_ENABLE_MEDIA
  425. if (!message)
  426. return;
  427. // TODO: outgoing
  428. if (auto handler = getMediaHandler()) {
  429. message_vector messages{std::move(message)};
  430. handler->incoming(messages, [this](message_ptr message) {
  431. auto transport = std::atomic_load(&mDtlsTransport);
  432. if (auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport))
  433. srtpTransport->send(std::move(message));
  434. });
  435. for (auto &m : messages)
  436. dispatchMedia(std::move(m));
  437. } else {
  438. dispatchMedia(std::move(message));
  439. }
  440. #endif
  441. }
  442. void PeerConnection::dispatchMedia([[maybe_unused]] message_ptr message) {
  443. #if RTC_ENABLE_MEDIA
  444. std::shared_lock lock(mTracksMutex); // read-only
  445. if (mTrackLines.size()==1) {
  446. if (auto track = mTrackLines.front().lock())
  447. track->incoming(message);
  448. return;
  449. }
  450. // Browsers like to compound their packets with a random SSRC.
  451. // we have to do this monstrosity to distribute the report blocks
  452. if (message->type == Message::Control) {
  453. std::set<uint32_t> ssrcs;
  454. size_t offset = 0;
  455. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  456. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  457. if (header->lengthInBytes() > message->size() - offset) {
  458. COUNTER_MEDIA_TRUNCATED++;
  459. break;
  460. }
  461. offset += header->lengthInBytes();
  462. if (header->payloadType() == 205 || header->payloadType() == 206) {
  463. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  464. ssrcs.insert(rtcpfb->packetSenderSSRC());
  465. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  466. } else if (header->payloadType() == 200) {
  467. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  468. ssrcs.insert(rtcpsr->senderSSRC());
  469. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  470. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  471. } else if (header->payloadType() == 201) {
  472. auto rtcprr = reinterpret_cast<RtcpRr *>(header);
  473. ssrcs.insert(rtcprr->senderSSRC());
  474. for (int i = 0; i < rtcprr->header.reportCount(); ++i)
  475. ssrcs.insert(rtcprr->getReportBlock(i)->getSSRC());
  476. } else if (header->payloadType() == 202) {
  477. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  478. if (!sdes->isValid()) {
  479. PLOG_WARNING << "RTCP SDES packet is invalid";
  480. continue;
  481. }
  482. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  483. auto chunk = sdes->getChunk(i);
  484. ssrcs.insert(chunk->ssrc());
  485. }
  486. } else {
  487. // PT=203 == Goodbye
  488. // PT=204 == Application Specific
  489. // PT=207 == Extended Report
  490. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  491. header->payloadType() != 207) {
  492. COUNTER_UNKNOWN_PACKET_TYPE++;
  493. }
  494. }
  495. }
  496. if (!ssrcs.empty()) {
  497. for (uint32_t ssrc : ssrcs) {
  498. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  499. if (auto track = it->second.lock())
  500. track->incoming(message);
  501. }
  502. }
  503. return;
  504. }
  505. }
  506. uint32_t ssrc = uint32_t(message->stream);
  507. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  508. if (auto track = it->second.lock())
  509. track->incoming(message);
  510. } else {
  511. /*
  512. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  513. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  514. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  515. * this ideal? No! Do I know how to fix it? No!
  516. */
  517. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  518. return;
  519. }
  520. #endif
  521. }
  522. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  523. [[maybe_unused]] auto [channel, found] = findDataChannel(stream);
  524. if (channel)
  525. channel->triggerBufferedAmount(amount);
  526. }
  527. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  528. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  529. // If the DataChannel is user-negotiated, do not negotiate it in-band
  530. auto channel =
  531. init.negotiated
  532. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  533. std::move(init.protocol), std::move(init.reliability))
  534. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  535. std::move(init.protocol),
  536. std::move(init.reliability));
  537. // If the user supplied a stream id, use it, otherwise assign it later
  538. if (init.id) {
  539. uint16_t stream = *init.id;
  540. if (stream > maxDataChannelStream())
  541. throw std::invalid_argument("DataChannel stream id is too high");
  542. channel->assignStream(stream);
  543. mDataChannels.emplace(std::make_pair(stream, channel));
  544. } else {
  545. mUnassignedDataChannels.push_back(channel);
  546. }
  547. lock.unlock(); // we are going to call assignDataChannels()
  548. // If SCTP is connected, assign and open now
  549. auto sctpTransport = std::atomic_load(&mSctpTransport);
  550. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  551. assignDataChannels();
  552. channel->open(sctpTransport);
  553. }
  554. return channel;
  555. }
  556. std::pair<shared_ptr<DataChannel>, bool> PeerConnection::findDataChannel(uint16_t stream) {
  557. std::shared_lock lock(mDataChannelsMutex); // read-only
  558. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  559. return std::make_pair(it->second.lock(), true);
  560. else
  561. return std::make_pair(nullptr, false);
  562. }
  563. bool PeerConnection::removeDataChannel(uint16_t stream) {
  564. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  565. return mDataChannels.erase(stream) != 0;
  566. }
  567. uint16_t PeerConnection::maxDataChannelStream() const {
  568. auto sctpTransport = std::atomic_load(&mSctpTransport);
  569. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  570. }
  571. void PeerConnection::assignDataChannels() {
  572. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  573. auto iceTransport = std::atomic_load(&mIceTransport);
  574. if (!iceTransport)
  575. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  576. const uint16_t maxStream = maxDataChannelStream();
  577. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  578. auto channel = it->lock();
  579. if (!channel)
  580. continue;
  581. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  582. // for which the corresponding incoming and outgoing streams are unused. If the side is
  583. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  584. // acting as the DTLS server, it MUST choose an odd one. See
  585. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  586. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  587. while (true) {
  588. if (stream > maxStream)
  589. throw std::runtime_error("Too many DataChannels");
  590. if (mDataChannels.find(stream) == mDataChannels.end())
  591. break;
  592. stream += 2;
  593. }
  594. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  595. channel->assignStream(stream);
  596. mDataChannels.emplace(std::make_pair(stream, channel));
  597. }
  598. mUnassignedDataChannels.clear();
  599. }
  600. void PeerConnection::iterateDataChannels(
  601. std::function<void(shared_ptr<DataChannel> channel)> func) {
  602. std::vector<shared_ptr<DataChannel>> locked;
  603. {
  604. std::shared_lock lock(mDataChannelsMutex); // read-only
  605. locked.reserve(mDataChannels.size());
  606. auto it = mDataChannels.begin();
  607. while (it != mDataChannels.end()) {
  608. auto channel = it->second.lock();
  609. if (channel && !channel->isClosed())
  610. locked.push_back(std::move(channel));
  611. ++it;
  612. }
  613. }
  614. for (auto &channel : locked) {
  615. try {
  616. func(std::move(channel));
  617. } catch (const std::exception &e) {
  618. PLOG_WARNING << e.what();
  619. }
  620. }
  621. }
  622. void PeerConnection::openDataChannels() {
  623. if (auto transport = std::atomic_load(&mSctpTransport))
  624. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  625. if (!channel->isOpen())
  626. channel->open(transport);
  627. });
  628. }
  629. void PeerConnection::closeDataChannels() {
  630. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  631. }
  632. void PeerConnection::remoteCloseDataChannels() {
  633. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  634. }
  635. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  636. #if !RTC_ENABLE_MEDIA
  637. // No media support, mark as removed
  638. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  639. description.markRemoved();
  640. #endif
  641. shared_ptr<Track> track;
  642. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  643. if (track = it->second.lock(); track)
  644. track->setDescription(std::move(description));
  645. if (!track) {
  646. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  647. mTracks.emplace(std::make_pair(track->mid(), track));
  648. mTrackLines.emplace_back(track);
  649. }
  650. auto handler = getMediaHandler();
  651. if (handler)
  652. handler->media(track->description());
  653. if (track->description().isRemoved())
  654. track->close();
  655. return track;
  656. }
  657. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  658. std::shared_lock lock(mTracksMutex); // read-only
  659. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  660. auto track = it->lock();
  661. if (track && !track->isClosed()) {
  662. try {
  663. func(std::move(track));
  664. } catch (const std::exception &e) {
  665. PLOG_WARNING << e.what();
  666. }
  667. }
  668. }
  669. }
  670. void PeerConnection::openTracks() {
  671. #if RTC_ENABLE_MEDIA
  672. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  673. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  674. iterateTracks([&](const shared_ptr<Track> &track) {
  675. if (!track->isOpen()) {
  676. if (srtpTransport) {
  677. track->open(srtpTransport);
  678. } else {
  679. // A track was added during a latter renegotiation, whereas SRTP transport was
  680. // not initialized. This is an optimization to use the library with data
  681. // channels only. Set forceMediaTransport to true to initialize the transport
  682. // before dynamically adding tracks.
  683. auto errorMsg = "The connection has no media transport";
  684. PLOG_ERROR << errorMsg;
  685. track->triggerError(errorMsg);
  686. }
  687. }
  688. });
  689. }
  690. #endif
  691. }
  692. void PeerConnection::closeTracks() {
  693. std::shared_lock lock(mTracksMutex); // read-only
  694. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  695. }
  696. void PeerConnection::validateRemoteDescription(const Description &description) {
  697. if (!description.iceUfrag())
  698. throw std::invalid_argument("Remote description has no ICE user fragment");
  699. if (!description.icePwd())
  700. throw std::invalid_argument("Remote description has no ICE password");
  701. if (!description.fingerprint())
  702. throw std::invalid_argument("Remote description has no valid fingerprint");
  703. if (description.mediaCount() == 0)
  704. throw std::invalid_argument("Remote description has no media line");
  705. int activeMediaCount = 0;
  706. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  707. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  708. if (!application->isRemoved())
  709. ++activeMediaCount;
  710. },
  711. [&](const Description::Media *media) {
  712. if (!media->isRemoved() ||
  713. media->direction() != Description::Direction::Inactive)
  714. ++activeMediaCount;
  715. }},
  716. description.media(i));
  717. if (activeMediaCount == 0)
  718. throw std::invalid_argument("Remote description has no active media");
  719. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  720. if (*description.iceUfrag() == *local->iceUfrag() &&
  721. *description.icePwd() == *local->icePwd())
  722. throw std::logic_error("Got the local description as remote description");
  723. PLOG_VERBOSE << "Remote description looks valid";
  724. }
  725. void PeerConnection::processLocalDescription(Description description) {
  726. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  727. const size_t localMaxMessageSize =
  728. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  729. // Clean up the application entry the ICE transport might have added already (libnice)
  730. description.clearMedia();
  731. if (auto remote = remoteDescription()) {
  732. // Reciprocate remote description
  733. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  734. std::visit( // reciprocate each media
  735. rtc::overloaded{
  736. [&](Description::Application *remoteApp) {
  737. std::shared_lock lock(mDataChannelsMutex);
  738. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  739. // Prefer local description
  740. Description::Application app(remoteApp->mid());
  741. app.setSctpPort(localSctpPort);
  742. app.setMaxMessageSize(localMaxMessageSize);
  743. PLOG_DEBUG << "Adding application to local description, mid=\""
  744. << app.mid() << "\"";
  745. description.addMedia(std::move(app));
  746. return;
  747. }
  748. auto reciprocated = remoteApp->reciprocate();
  749. reciprocated.hintSctpPort(localSctpPort);
  750. reciprocated.setMaxMessageSize(localMaxMessageSize);
  751. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  752. << reciprocated.mid() << "\"";
  753. description.addMedia(std::move(reciprocated));
  754. },
  755. [&](Description::Media *remoteMedia) {
  756. std::shared_lock lock(mTracksMutex);
  757. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  758. // Prefer local description
  759. if (auto track = it->second.lock()) {
  760. auto media = track->description();
  761. PLOG_DEBUG << "Adding media to local description, mid=\""
  762. << media.mid() << "\", removed=" << std::boolalpha
  763. << media.isRemoved();
  764. description.addMedia(std::move(media));
  765. } else {
  766. auto reciprocated = remoteMedia->reciprocate();
  767. reciprocated.markRemoved();
  768. PLOG_DEBUG << "Adding media to local description, mid=\""
  769. << reciprocated.mid()
  770. << "\", removed=true (track is destroyed)";
  771. description.addMedia(std::move(reciprocated));
  772. }
  773. return;
  774. }
  775. auto reciprocated = remoteMedia->reciprocate();
  776. #if !RTC_ENABLE_MEDIA
  777. if (!reciprocated.isRemoved()) {
  778. // No media support, mark as removed
  779. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  780. reciprocated.markRemoved();
  781. }
  782. #endif
  783. PLOG_DEBUG << "Reciprocating media in local description, mid=\""
  784. << reciprocated.mid() << "\", removed=" << std::boolalpha
  785. << reciprocated.isRemoved();
  786. // Create incoming track
  787. auto track =
  788. std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  789. mTracks.emplace(std::make_pair(track->mid(), track));
  790. mTrackLines.emplace_back(track);
  791. triggerTrack(track); // The user may modify the track description
  792. auto handler = getMediaHandler();
  793. if (handler)
  794. handler->media(track->description());
  795. if (track->description().isRemoved())
  796. track->close();
  797. description.addMedia(track->description());
  798. },
  799. },
  800. remote->media(i));
  801. // We need to update the SSRC cache for newly-created incoming tracks
  802. updateTrackSsrcCache(*remote);
  803. }
  804. if (description.type() == Description::Type::Offer) {
  805. // This is an offer, add locally created data channels and tracks
  806. // Add media for local tracks
  807. std::shared_lock lock(mTracksMutex);
  808. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  809. if (auto track = it->lock()) {
  810. if (description.hasMid(track->mid()))
  811. continue;
  812. auto media = track->description();
  813. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  814. << "\", removed=" << std::boolalpha << media.isRemoved();
  815. description.addMedia(std::move(media));
  816. }
  817. }
  818. // Add application for data channels
  819. if (!description.hasApplication()) {
  820. std::shared_lock lock(mDataChannelsMutex);
  821. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  822. // Prevents mid collision with remote or local tracks
  823. unsigned int m = 0;
  824. while (description.hasMid(std::to_string(m)))
  825. ++m;
  826. Description::Application app(std::to_string(m));
  827. app.setSctpPort(localSctpPort);
  828. app.setMaxMessageSize(localMaxMessageSize);
  829. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  830. << "\"";
  831. description.addMedia(std::move(app));
  832. }
  833. }
  834. // There might be no media at this point if the user created a Track, deleted it,
  835. // then called setLocalDescription().
  836. if (description.mediaCount() == 0)
  837. throw std::runtime_error("No DataChannel or Track to negotiate");
  838. }
  839. // Set local fingerprint (wait for certificate if necessary)
  840. description.setFingerprint(mCertificate.get()->fingerprint());
  841. PLOG_VERBOSE << "Issuing local description: " << description;
  842. if (description.mediaCount() == 0)
  843. throw std::logic_error("Local description has no media line");
  844. updateTrackSsrcCache(description);
  845. {
  846. // Set as local description
  847. std::lock_guard lock(mLocalDescriptionMutex);
  848. std::vector<Candidate> existingCandidates;
  849. if (mLocalDescription) {
  850. existingCandidates = mLocalDescription->extractCandidates();
  851. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  852. }
  853. mLocalDescription.emplace(description);
  854. mLocalDescription->addCandidates(std::move(existingCandidates));
  855. }
  856. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  857. &localDescriptionCallback, std::move(description));
  858. // Reciprocated tracks might need to be open
  859. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  860. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  861. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  862. }
  863. void PeerConnection::processLocalCandidate(Candidate candidate) {
  864. std::lock_guard lock(mLocalDescriptionMutex);
  865. if (!mLocalDescription)
  866. throw std::logic_error("Got a local candidate without local description");
  867. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  868. candidate.type() != Candidate::Type::Relayed) {
  869. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  870. return;
  871. }
  872. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  873. candidate.resolve(Candidate::ResolveMode::Simple);
  874. mLocalDescription->addCandidate(candidate);
  875. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  876. &localCandidateCallback, std::move(candidate));
  877. }
  878. void PeerConnection::processRemoteDescription(Description description) {
  879. // Update the SSRC cache for existing tracks
  880. updateTrackSsrcCache(description);
  881. {
  882. // Set as remote description
  883. std::lock_guard lock(mRemoteDescriptionMutex);
  884. std::vector<Candidate> existingCandidates;
  885. if (mRemoteDescription)
  886. existingCandidates = mRemoteDescription->extractCandidates();
  887. mRemoteDescription.emplace(description);
  888. mRemoteDescription->addCandidates(std::move(existingCandidates));
  889. }
  890. if (description.hasApplication()) {
  891. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  892. auto sctpTransport = std::atomic_load(&mSctpTransport);
  893. if (!sctpTransport && dtlsTransport &&
  894. dtlsTransport->state() == Transport::State::Connected)
  895. initSctpTransport();
  896. } else {
  897. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  898. }
  899. }
  900. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  901. auto iceTransport = std::atomic_load(&mIceTransport);
  902. {
  903. // Set as remote candidate
  904. std::lock_guard lock(mRemoteDescriptionMutex);
  905. if (!mRemoteDescription)
  906. throw std::logic_error("Got a remote candidate without remote description");
  907. if (!iceTransport)
  908. throw std::logic_error("Got a remote candidate without ICE transport");
  909. candidate.hintMid(mRemoteDescription->bundleMid());
  910. if (mRemoteDescription->hasCandidate(candidate))
  911. return; // already in description, ignore
  912. candidate.resolve(Candidate::ResolveMode::Simple);
  913. mRemoteDescription->addCandidate(candidate);
  914. }
  915. if (candidate.isResolved()) {
  916. iceTransport->addRemoteCandidate(std::move(candidate));
  917. } else {
  918. // We might need a lookup, do it asynchronously
  919. // We don't use the thread pool because we have no control on the timeout
  920. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  921. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  922. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  923. utils::this_thread::set_name("RTC resolver");
  924. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  925. if (auto iceTransport = weakIceTransport.lock())
  926. iceTransport->addRemoteCandidate(std::move(candidate));
  927. });
  928. t.detach();
  929. }
  930. }
  931. }
  932. string PeerConnection::localBundleMid() const {
  933. std::lock_guard lock(mLocalDescriptionMutex);
  934. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  935. }
  936. void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
  937. std::unique_lock lock(mMediaHandlerMutex);
  938. mMediaHandler = handler;
  939. }
  940. shared_ptr<MediaHandler> PeerConnection::getMediaHandler() {
  941. std::shared_lock lock(mMediaHandlerMutex);
  942. return mMediaHandler;
  943. }
  944. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  945. auto dataChannel = weakDataChannel.lock();
  946. if (dataChannel) {
  947. dataChannel->resetOpenCallback(); // might be set internally
  948. mPendingDataChannels.push(std::move(dataChannel));
  949. }
  950. triggerPendingDataChannels();
  951. }
  952. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  953. auto track = weakTrack.lock();
  954. if (track) {
  955. track->resetOpenCallback(); // might be set internally
  956. mPendingTracks.push(std::move(track));
  957. }
  958. triggerPendingTracks();
  959. }
  960. void PeerConnection::triggerPendingDataChannels() {
  961. while (dataChannelCallback) {
  962. auto next = mPendingDataChannels.pop();
  963. if (!next)
  964. break;
  965. auto impl = std::move(*next);
  966. try {
  967. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  968. } catch (const std::exception &e) {
  969. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  970. }
  971. impl->triggerOpen();
  972. }
  973. }
  974. void PeerConnection::triggerPendingTracks() {
  975. while (trackCallback) {
  976. auto next = mPendingTracks.pop();
  977. if (!next)
  978. break;
  979. auto impl = std::move(*next);
  980. try {
  981. trackCallback(std::make_shared<rtc::Track>(impl));
  982. } catch (const std::exception &e) {
  983. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  984. }
  985. // Do not trigger open immediately for tracks as it'll be done later
  986. }
  987. }
  988. void PeerConnection::flushPendingDataChannels() {
  989. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  990. }
  991. void PeerConnection::flushPendingTracks() {
  992. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  993. }
  994. bool PeerConnection::changeState(State newState) {
  995. State current;
  996. do {
  997. current = state.load();
  998. if (current == State::Closed)
  999. return false;
  1000. if (current == newState)
  1001. return false;
  1002. } while (!state.compare_exchange_weak(current, newState));
  1003. std::ostringstream s;
  1004. s << newState;
  1005. PLOG_INFO << "Changed state to " << s.str();
  1006. if (newState == State::Closed) {
  1007. auto callback = std::move(stateChangeCallback); // steal the callback
  1008. callback(State::Closed); // call it synchronously
  1009. } else {
  1010. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  1011. &stateChangeCallback, newState);
  1012. }
  1013. return true;
  1014. }
  1015. bool PeerConnection::changeIceState(IceState newState) {
  1016. if (iceState.exchange(newState) == newState)
  1017. return false;
  1018. std::ostringstream s;
  1019. s << newState;
  1020. PLOG_INFO << "Changed ICE state to " << s.str();
  1021. if (newState == IceState::Closed) {
  1022. auto callback = std::move(iceStateChangeCallback); // steal the callback
  1023. callback(IceState::Closed); // call it synchronously
  1024. } else {
  1025. mProcessor.enqueue(&PeerConnection::trigger<IceState>, shared_from_this(),
  1026. &iceStateChangeCallback, newState);
  1027. }
  1028. return true;
  1029. }
  1030. bool PeerConnection::changeGatheringState(GatheringState newState) {
  1031. if (gatheringState.exchange(newState) == newState)
  1032. return false;
  1033. std::ostringstream s;
  1034. s << newState;
  1035. PLOG_INFO << "Changed gathering state to " << s.str();
  1036. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  1037. &gatheringStateChangeCallback, newState);
  1038. return true;
  1039. }
  1040. bool PeerConnection::changeSignalingState(SignalingState newState) {
  1041. if (signalingState.exchange(newState) == newState)
  1042. return false;
  1043. std::ostringstream s;
  1044. s << newState;
  1045. PLOG_INFO << "Changed signaling state to " << s.str();
  1046. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  1047. &signalingStateChangeCallback, newState);
  1048. return true;
  1049. }
  1050. void PeerConnection::resetCallbacks() {
  1051. // Unregister all callbacks
  1052. dataChannelCallback = nullptr;
  1053. localDescriptionCallback = nullptr;
  1054. localCandidateCallback = nullptr;
  1055. stateChangeCallback = nullptr;
  1056. iceStateChangeCallback = nullptr;
  1057. gatheringStateChangeCallback = nullptr;
  1058. signalingStateChangeCallback = nullptr;
  1059. trackCallback = nullptr;
  1060. }
  1061. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  1062. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  1063. // Setup SSRC -> Track mapping
  1064. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  1065. std::visit( // ssrc -> track mapping
  1066. rtc::overloaded{
  1067. [&](Description::Application const *) { return; },
  1068. [&](Description::Media const *media) {
  1069. const auto ssrcs = media->getSSRCs();
  1070. // Note: We don't want to lock (or do any other lookups), if we
  1071. // already know there's no SSRCs to loop over.
  1072. if (ssrcs.size() <= 0) {
  1073. return;
  1074. }
  1075. std::shared_ptr<Track> track{nullptr};
  1076. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1077. if (auto track_for_mid = it->second.lock())
  1078. track = track_for_mid;
  1079. if (!track) {
  1080. // Unable to find track for MID
  1081. return;
  1082. }
  1083. for (auto ssrc : ssrcs) {
  1084. mTracksBySsrc.insert_or_assign(ssrc, track);
  1085. }
  1086. },
  1087. },
  1088. description.media(i));
  1089. }
  1090. } // namespace rtc::impl