peerconnection.cpp 45 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254125512561257125812591260126112621263126412651266126712681269127012711272127312741275127612771278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410141114121413141414151416141714181419142014211422142314241425
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "dtlstransport.hpp"
  12. #include "icetransport.hpp"
  13. #include "internals.hpp"
  14. #include "logcounter.hpp"
  15. #include "peerconnection.hpp"
  16. #include "processor.hpp"
  17. #include "rtp.hpp"
  18. #include "sctptransport.hpp"
  19. #include "utils.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. const string PemBeginCertificateTag = "-----BEGIN CERTIFICATE-----";
  41. PeerConnection::PeerConnection(Configuration config_) : config(std::move(config_)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.certificatePemFile && config.keyPemFile) {
  44. std::promise<certificate_ptr> cert;
  45. cert.set_value(std::make_shared<Certificate>(
  46. config.certificatePemFile->find(PemBeginCertificateTag) != string::npos
  47. ? Certificate::FromString(*config.certificatePemFile, *config.keyPemFile)
  48. : Certificate::FromFile(*config.certificatePemFile, *config.keyPemFile,
  49. config.keyPemPass.value_or(""))));
  50. mCertificate = cert.get_future();
  51. } else if (!config.certificatePemFile && !config.keyPemFile) {
  52. mCertificate = make_certificate(config.certificateType);
  53. } else {
  54. throw std::invalid_argument(
  55. "Either none or both certificate and key PEM files must be specified");
  56. }
  57. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  58. throw std::invalid_argument("Invalid port range");
  59. if (config.mtu) {
  60. if (*config.mtu < 576) // Min MTU for IPv4
  61. throw std::invalid_argument("Invalid MTU value");
  62. if (*config.mtu > 1500) { // Standard Ethernet
  63. PLOG_WARNING << "MTU set to " << *config.mtu;
  64. } else {
  65. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  66. }
  67. }
  68. }
  69. PeerConnection::~PeerConnection() {
  70. PLOG_VERBOSE << "Destroying PeerConnection";
  71. mProcessor.join();
  72. }
  73. void PeerConnection::close() {
  74. if (!closing.exchange(true)) {
  75. PLOG_VERBOSE << "Closing PeerConnection";
  76. if (auto transport = std::atomic_load(&mSctpTransport))
  77. transport->stop();
  78. else
  79. remoteClose();
  80. }
  81. }
  82. void PeerConnection::remoteClose() {
  83. close();
  84. if (state.load() != State::Closed) {
  85. // Close data channels and tracks asynchronously
  86. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  87. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  88. closeTransports();
  89. }
  90. }
  91. optional<Description> PeerConnection::localDescription() const {
  92. std::lock_guard lock(mLocalDescriptionMutex);
  93. return mLocalDescription;
  94. }
  95. optional<Description> PeerConnection::remoteDescription() const {
  96. std::lock_guard lock(mRemoteDescriptionMutex);
  97. return mRemoteDescription;
  98. }
  99. size_t PeerConnection::remoteMaxMessageSize() const {
  100. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  101. size_t remoteMax = DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
  102. std::lock_guard lock(mRemoteDescriptionMutex);
  103. if (mRemoteDescription)
  104. if (auto *application = mRemoteDescription->application())
  105. if (auto max = application->maxMessageSize()) {
  106. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  107. // size value of zero, it indicates that the SCTP endpoint will handle messages
  108. // of any size, subject to memory capacity, etc.
  109. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  110. }
  111. return std::min(remoteMax, localMax);
  112. }
  113. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  114. template <typename T>
  115. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  116. std::atomic_store(member, transport);
  117. try {
  118. transport->start();
  119. } catch (...) {
  120. std::atomic_store(member, decltype(transport)(nullptr));
  121. throw;
  122. }
  123. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  124. std::atomic_store(member, decltype(transport)(nullptr));
  125. transport->stop();
  126. return nullptr;
  127. }
  128. return transport;
  129. }
  130. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  131. try {
  132. if (auto transport = std::atomic_load(&mIceTransport))
  133. return transport;
  134. PLOG_VERBOSE << "Starting ICE transport";
  135. auto transport = std::make_shared<IceTransport>(
  136. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  137. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  138. auto shared_this = weak_this.lock();
  139. if (!shared_this)
  140. return;
  141. switch (transportState) {
  142. case IceTransport::State::Connecting:
  143. changeIceState(IceState::Checking);
  144. changeState(State::Connecting);
  145. break;
  146. case IceTransport::State::Connected:
  147. changeIceState(IceState::Connected);
  148. initDtlsTransport();
  149. break;
  150. case IceTransport::State::Completed:
  151. changeIceState(IceState::Completed);
  152. break;
  153. case IceTransport::State::Failed:
  154. changeIceState(IceState::Failed);
  155. changeState(State::Failed);
  156. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  157. break;
  158. case IceTransport::State::Disconnected:
  159. changeIceState(IceState::Disconnected);
  160. changeState(State::Disconnected);
  161. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  162. break;
  163. default:
  164. // Ignore
  165. break;
  166. }
  167. },
  168. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  169. auto shared_this = weak_this.lock();
  170. if (!shared_this)
  171. return;
  172. switch (gatheringState) {
  173. case IceTransport::GatheringState::InProgress:
  174. changeGatheringState(GatheringState::InProgress);
  175. break;
  176. case IceTransport::GatheringState::Complete:
  177. endLocalCandidates();
  178. changeGatheringState(GatheringState::Complete);
  179. break;
  180. default:
  181. // Ignore
  182. break;
  183. }
  184. });
  185. return emplaceTransport(this, &mIceTransport, std::move(transport));
  186. } catch (const std::exception &e) {
  187. PLOG_ERROR << e.what();
  188. changeState(State::Failed);
  189. throw std::runtime_error("ICE transport initialization failed");
  190. }
  191. }
  192. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  193. try {
  194. if (auto transport = std::atomic_load(&mDtlsTransport))
  195. return transport;
  196. PLOG_VERBOSE << "Starting DTLS transport";
  197. CertificateFingerprint::Algorithm fingerprintAlgorithm;
  198. {
  199. std::lock_guard lock(mRemoteDescriptionMutex);
  200. if (mRemoteDescription && mRemoteDescription->fingerprint()) {
  201. mRemoteFingerprintAlgorithm = mRemoteDescription->fingerprint()->algorithm;
  202. }
  203. fingerprintAlgorithm = mRemoteFingerprintAlgorithm;
  204. }
  205. auto lower = std::atomic_load(&mIceTransport);
  206. if (!lower)
  207. throw std::logic_error("No underlying ICE transport for DTLS transport");
  208. auto certificate = mCertificate.get();
  209. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  210. auto dtlsStateChangeCallback =
  211. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  212. auto shared_this = weak_this.lock();
  213. if (!shared_this)
  214. return;
  215. switch (transportState) {
  216. case DtlsTransport::State::Connected:
  217. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  218. initSctpTransport();
  219. else
  220. changeState(State::Connected);
  221. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  222. break;
  223. case DtlsTransport::State::Failed:
  224. changeState(State::Failed);
  225. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  226. break;
  227. case DtlsTransport::State::Disconnected:
  228. changeState(State::Disconnected);
  229. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  230. break;
  231. default:
  232. // Ignore
  233. break;
  234. }
  235. };
  236. shared_ptr<DtlsTransport> transport;
  237. auto local = localDescription();
  238. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  239. #if RTC_ENABLE_MEDIA
  240. PLOG_INFO << "This connection requires media support";
  241. // DTLS-SRTP
  242. transport = std::make_shared<DtlsSrtpTransport>(
  243. lower, certificate, config.mtu, fingerprintAlgorithm, verifierCallback,
  244. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  245. #else
  246. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  247. #endif
  248. }
  249. if (!transport) {
  250. // DTLS only
  251. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  252. fingerprintAlgorithm, verifierCallback,
  253. dtlsStateChangeCallback);
  254. }
  255. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  256. } catch (const std::exception &e) {
  257. PLOG_ERROR << e.what();
  258. changeState(State::Failed);
  259. throw std::runtime_error("DTLS transport initialization failed");
  260. }
  261. }
  262. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  263. try {
  264. if (auto transport = std::atomic_load(&mSctpTransport))
  265. return transport;
  266. PLOG_VERBOSE << "Starting SCTP transport";
  267. auto lower = std::atomic_load(&mDtlsTransport);
  268. if (!lower)
  269. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  270. auto local = localDescription();
  271. if (!local || !local->application())
  272. throw std::logic_error("Starting SCTP transport without local application description");
  273. auto remote = remoteDescription();
  274. if (!remote || !remote->application())
  275. throw std::logic_error(
  276. "Starting SCTP transport without remote application description");
  277. SctpTransport::Ports ports = {};
  278. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  279. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  280. auto transport = std::make_shared<SctpTransport>(
  281. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  282. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  283. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  284. auto shared_this = weak_this.lock();
  285. if (!shared_this)
  286. return;
  287. switch (transportState) {
  288. case SctpTransport::State::Connected:
  289. changeState(State::Connected);
  290. assignDataChannels();
  291. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  292. break;
  293. case SctpTransport::State::Failed:
  294. changeState(State::Failed);
  295. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  296. break;
  297. case SctpTransport::State::Disconnected:
  298. changeState(State::Disconnected);
  299. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  300. break;
  301. default:
  302. // Ignore
  303. break;
  304. }
  305. });
  306. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  307. } catch (const std::exception &e) {
  308. PLOG_ERROR << e.what();
  309. changeState(State::Failed);
  310. throw std::runtime_error("SCTP transport initialization failed");
  311. }
  312. }
  313. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  314. return std::atomic_load(&mIceTransport);
  315. }
  316. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  317. return std::atomic_load(&mDtlsTransport);
  318. }
  319. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  320. return std::atomic_load(&mSctpTransport);
  321. }
  322. void PeerConnection::closeTransports() {
  323. PLOG_VERBOSE << "Closing transports";
  324. // Change ICE state to sink state Closed
  325. changeIceState(IceState::Closed);
  326. // Change state to sink state Closed
  327. if (!changeState(State::Closed))
  328. return; // already closed
  329. // Reset interceptor and callbacks now that state is changed
  330. setMediaHandler(nullptr);
  331. resetCallbacks();
  332. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  333. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  334. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  335. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  336. if (sctp) {
  337. sctp->onRecv(nullptr);
  338. sctp->onBufferedAmount(nullptr);
  339. }
  340. using array = std::array<shared_ptr<Transport>, 3>;
  341. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  342. for (const auto &t : transports)
  343. if (t)
  344. t->onStateChange(nullptr);
  345. TearDownProcessor::Instance().enqueue(
  346. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  347. for (const auto &t : transports) {
  348. if (t) {
  349. t->stop();
  350. break;
  351. }
  352. }
  353. for (auto &t : transports)
  354. t.reset();
  355. });
  356. }
  357. void PeerConnection::endLocalCandidates() {
  358. std::lock_guard lock(mLocalDescriptionMutex);
  359. if (mLocalDescription)
  360. mLocalDescription->endCandidates();
  361. }
  362. void PeerConnection::rollbackLocalDescription() {
  363. PLOG_DEBUG << "Rolling back pending local description";
  364. std::unique_lock lock(mLocalDescriptionMutex);
  365. if (mCurrentLocalDescription) {
  366. std::vector<Candidate> existingCandidates;
  367. if (mLocalDescription)
  368. existingCandidates = mLocalDescription->extractCandidates();
  369. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  370. mLocalDescription->addCandidates(std::move(existingCandidates));
  371. mCurrentLocalDescription.reset();
  372. }
  373. }
  374. bool PeerConnection::checkFingerprint(const std::string &fingerprint) {
  375. std::lock_guard lock(mRemoteDescriptionMutex);
  376. mRemoteFingerprint = fingerprint;
  377. if (!mRemoteDescription || !mRemoteDescription->fingerprint()
  378. || mRemoteFingerprintAlgorithm != mRemoteDescription->fingerprint()->algorithm)
  379. return false;
  380. if (config.disableFingerprintVerification) {
  381. PLOG_VERBOSE << "Skipping fingerprint validation";
  382. return true;
  383. }
  384. auto expectedFingerprint = mRemoteDescription->fingerprint()->value;
  385. if (expectedFingerprint == fingerprint) {
  386. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  387. return true;
  388. }
  389. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  390. << expectedFingerprint << "\"";
  391. return false;
  392. }
  393. void PeerConnection::forwardMessage(message_ptr message) {
  394. if (!message) {
  395. remoteCloseDataChannels();
  396. return;
  397. }
  398. auto iceTransport = std::atomic_load(&mIceTransport);
  399. auto sctpTransport = std::atomic_load(&mSctpTransport);
  400. if (!iceTransport || !sctpTransport)
  401. return;
  402. const uint16_t stream = uint16_t(message->stream);
  403. auto [channel, found] = findDataChannel(stream);
  404. if (DataChannel::IsOpenMessage(message)) {
  405. if (found) {
  406. // The stream is already used, the receiver must close the DataChannel
  407. PLOG_WARNING << "Got open message on already used stream " << stream;
  408. if (channel && !channel->isClosed())
  409. channel->close();
  410. else
  411. sctpTransport->closeStream(message->stream);
  412. return;
  413. }
  414. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  415. if (stream % 2 != remoteParity) {
  416. // The odd/even rule is violated, the receiver must close the DataChannel
  417. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  418. sctpTransport->closeStream(message->stream);
  419. return;
  420. }
  421. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  422. channel->assignStream(stream);
  423. channel->openCallback =
  424. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  425. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  426. mDataChannels.emplace(stream, channel);
  427. } else if (!found) {
  428. if (message->type == Message::Reset)
  429. return; // ignore
  430. // Invalid, close the DataChannel
  431. PLOG_WARNING << "Got unexpected message on stream " << stream;
  432. sctpTransport->closeStream(message->stream);
  433. return;
  434. }
  435. if (message->type == Message::Reset) {
  436. // Incoming stream is reset, unregister it
  437. removeDataChannel(stream);
  438. }
  439. if (channel) {
  440. // Forward the message
  441. channel->incoming(message);
  442. } else {
  443. // DataChannel was destroyed, ignore
  444. PLOG_DEBUG << "Ignored message on stream " << stream << ", DataChannel is destroyed";
  445. }
  446. }
  447. void PeerConnection::forwardMedia([[maybe_unused]] message_ptr message) {
  448. #if RTC_ENABLE_MEDIA
  449. if (!message)
  450. return;
  451. // TODO: outgoing
  452. if (auto handler = getMediaHandler()) {
  453. message_vector messages{std::move(message)};
  454. try {
  455. handler->incomingChain(messages, [this](message_ptr message) {
  456. auto transport = std::atomic_load(&mDtlsTransport);
  457. if (auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport))
  458. srtpTransport->send(std::move(message));
  459. });
  460. } catch(const std::exception &e) {
  461. PLOG_WARNING << "Exception in global incoming media handler: " << e.what();
  462. return;
  463. }
  464. for (auto &m : messages)
  465. dispatchMedia(std::move(m));
  466. } else {
  467. dispatchMedia(std::move(message));
  468. }
  469. #endif
  470. }
  471. void PeerConnection::dispatchMedia([[maybe_unused]] message_ptr message) {
  472. #if RTC_ENABLE_MEDIA
  473. std::shared_lock lock(mTracksMutex); // read-only
  474. if (mTrackLines.size() == 1) {
  475. if (auto track = mTrackLines.front().lock())
  476. track->incoming(message);
  477. return;
  478. }
  479. // Browsers like to compound their packets with a random SSRC.
  480. // we have to do this monstrosity to distribute the report blocks
  481. if (message->type == Message::Control) {
  482. std::set<uint32_t> ssrcs;
  483. size_t offset = 0;
  484. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  485. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  486. if (header->lengthInBytes() > message->size() - offset) {
  487. COUNTER_MEDIA_TRUNCATED++;
  488. break;
  489. }
  490. offset += header->lengthInBytes();
  491. if (header->payloadType() == 205 || header->payloadType() == 206) {
  492. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  493. ssrcs.insert(rtcpfb->packetSenderSSRC());
  494. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  495. } else if (header->payloadType() == 200) {
  496. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  497. ssrcs.insert(rtcpsr->senderSSRC());
  498. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  499. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  500. } else if (header->payloadType() == 201) {
  501. auto rtcprr = reinterpret_cast<RtcpRr *>(header);
  502. ssrcs.insert(rtcprr->senderSSRC());
  503. for (int i = 0; i < rtcprr->header.reportCount(); ++i)
  504. ssrcs.insert(rtcprr->getReportBlock(i)->getSSRC());
  505. } else if (header->payloadType() == 202) {
  506. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  507. if (!sdes->isValid()) {
  508. PLOG_WARNING << "RTCP SDES packet is invalid";
  509. continue;
  510. }
  511. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  512. auto chunk = sdes->getChunk(i);
  513. ssrcs.insert(chunk->ssrc());
  514. }
  515. } else {
  516. // PT=203 == Goodbye
  517. // PT=204 == Application Specific
  518. // PT=207 == Extended Report
  519. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  520. header->payloadType() != 207) {
  521. COUNTER_UNKNOWN_PACKET_TYPE++;
  522. }
  523. }
  524. }
  525. if (!ssrcs.empty()) {
  526. for (uint32_t ssrc : ssrcs) {
  527. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  528. if (auto track = it->second.lock())
  529. track->incoming(message);
  530. }
  531. }
  532. return;
  533. }
  534. }
  535. uint32_t ssrc = uint32_t(message->stream);
  536. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  537. if (auto track = it->second.lock())
  538. track->incoming(message);
  539. } else {
  540. /*
  541. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  542. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  543. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  544. * this ideal? No! Do I know how to fix it? No!
  545. */
  546. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  547. return;
  548. }
  549. #endif
  550. }
  551. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  552. [[maybe_unused]] auto [channel, found] = findDataChannel(stream);
  553. if (channel)
  554. channel->triggerBufferedAmount(amount);
  555. }
  556. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  557. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  558. // If the DataChannel is user-negotiated, do not negotiate it in-band
  559. auto channel =
  560. init.negotiated
  561. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  562. std::move(init.protocol), std::move(init.reliability))
  563. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  564. std::move(init.protocol),
  565. std::move(init.reliability));
  566. // If the user supplied a stream id, use it, otherwise assign it later
  567. if (init.id) {
  568. uint16_t stream = *init.id;
  569. if (stream > maxDataChannelStream())
  570. throw std::invalid_argument("DataChannel stream id is too high");
  571. channel->assignStream(stream);
  572. mDataChannels.emplace(std::make_pair(stream, channel));
  573. } else {
  574. mUnassignedDataChannels.push_back(channel);
  575. }
  576. lock.unlock(); // we are going to call assignDataChannels()
  577. // If SCTP is connected, assign and open now
  578. auto sctpTransport = std::atomic_load(&mSctpTransport);
  579. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  580. assignDataChannels();
  581. channel->open(sctpTransport);
  582. }
  583. return channel;
  584. }
  585. std::pair<shared_ptr<DataChannel>, bool> PeerConnection::findDataChannel(uint16_t stream) {
  586. std::shared_lock lock(mDataChannelsMutex); // read-only
  587. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  588. return std::make_pair(it->second.lock(), true);
  589. else
  590. return std::make_pair(nullptr, false);
  591. }
  592. bool PeerConnection::removeDataChannel(uint16_t stream) {
  593. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  594. return mDataChannels.erase(stream) != 0;
  595. }
  596. uint16_t PeerConnection::maxDataChannelStream() const {
  597. auto sctpTransport = std::atomic_load(&mSctpTransport);
  598. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  599. }
  600. void PeerConnection::assignDataChannels() {
  601. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  602. auto iceTransport = std::atomic_load(&mIceTransport);
  603. if (!iceTransport)
  604. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  605. const uint16_t maxStream = maxDataChannelStream();
  606. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  607. auto channel = it->lock();
  608. if (!channel)
  609. continue;
  610. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  611. // for which the corresponding incoming and outgoing streams are unused. If the side is
  612. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  613. // acting as the DTLS server, it MUST choose an odd one. See
  614. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  615. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  616. while (true) {
  617. if (stream > maxStream)
  618. throw std::runtime_error("Too many DataChannels");
  619. if (mDataChannels.find(stream) == mDataChannels.end())
  620. break;
  621. stream += 2;
  622. }
  623. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  624. channel->assignStream(stream);
  625. mDataChannels.emplace(std::make_pair(stream, channel));
  626. }
  627. mUnassignedDataChannels.clear();
  628. }
  629. void PeerConnection::iterateDataChannels(
  630. std::function<void(shared_ptr<DataChannel> channel)> func) {
  631. std::vector<shared_ptr<DataChannel>> locked;
  632. {
  633. std::shared_lock lock(mDataChannelsMutex); // read-only
  634. locked.reserve(mDataChannels.size());
  635. for (auto it = mDataChannels.begin(); it != mDataChannels.end(); ++it) {
  636. auto channel = it->second.lock();
  637. if (channel && !channel->isClosed())
  638. locked.push_back(std::move(channel));
  639. }
  640. }
  641. for (auto &channel : locked) {
  642. try {
  643. func(std::move(channel));
  644. } catch (const std::exception &e) {
  645. PLOG_WARNING << e.what();
  646. }
  647. }
  648. }
  649. void PeerConnection::openDataChannels() {
  650. if (auto transport = std::atomic_load(&mSctpTransport))
  651. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  652. if (!channel->isOpen())
  653. channel->open(transport);
  654. });
  655. }
  656. void PeerConnection::closeDataChannels() {
  657. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  658. }
  659. void PeerConnection::remoteCloseDataChannels() {
  660. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  661. }
  662. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  663. std::unique_lock lock(mTracksMutex); // we are going to emplace
  664. #if !RTC_ENABLE_MEDIA
  665. // No media support, mark as removed
  666. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  667. description.markRemoved();
  668. #endif
  669. shared_ptr<Track> track;
  670. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  671. if (auto t = it->second.lock(); t && !t->isClosed())
  672. track = std::move(t);
  673. if (track) {
  674. track->setDescription(std::move(description));
  675. } else {
  676. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  677. mTracks.emplace(std::make_pair(track->mid(), track));
  678. mTrackLines.emplace_back(track);
  679. }
  680. auto handler = getMediaHandler();
  681. if (handler)
  682. handler->media(track->description());
  683. if (track->description().isRemoved())
  684. track->close();
  685. return track;
  686. }
  687. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  688. std::vector<shared_ptr<Track>> locked;
  689. {
  690. std::shared_lock lock(mTracksMutex); // read-only
  691. locked.reserve(mTrackLines.size());
  692. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  693. auto track = it->lock();
  694. if (track && !track->isClosed())
  695. locked.push_back(std::move(track));
  696. }
  697. }
  698. for (auto &track : locked) {
  699. try {
  700. func(std::move(track));
  701. } catch (const std::exception &e) {
  702. PLOG_WARNING << e.what();
  703. }
  704. }
  705. }
  706. void PeerConnection::iterateRemoteTracks(std::function<void(shared_ptr<Track> track)> func) {
  707. auto remote = remoteDescription();
  708. if(!remote)
  709. return;
  710. std::vector<shared_ptr<Track>> locked;
  711. {
  712. std::shared_lock lock(mTracksMutex); // read-only
  713. locked.reserve(remote->mediaCount());
  714. for(int i = 0; i < remote->mediaCount(); ++i) {
  715. if (std::holds_alternative<Description::Media *>(remote->media(i))) {
  716. auto remoteMedia = std::get<Description::Media *>(remote->media(i));
  717. if (!remoteMedia->isRemoved())
  718. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end())
  719. if (auto track = it->second.lock())
  720. locked.push_back(std::move(track));
  721. }
  722. }
  723. }
  724. for (auto &track : locked) {
  725. try {
  726. func(std::move(track));
  727. } catch (const std::exception &e) {
  728. PLOG_WARNING << e.what();
  729. }
  730. }
  731. }
  732. void PeerConnection::openTracks() {
  733. #if RTC_ENABLE_MEDIA
  734. auto transport = std::atomic_load(&mDtlsTransport);
  735. if (!transport)
  736. return;
  737. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  738. iterateRemoteTracks([&](shared_ptr<Track> track) {
  739. if(!track->isOpen()) {
  740. if (srtpTransport) {
  741. track->open(srtpTransport);
  742. } else {
  743. // A track was added during a latter renegotiation, whereas SRTP transport was
  744. // not initialized. This is an optimization to use the library with data
  745. // channels only. Set forceMediaTransport to true to initialize the transport
  746. // before dynamically adding tracks.
  747. auto errorMsg = "The connection has no media transport";
  748. PLOG_ERROR << errorMsg;
  749. track->triggerError(errorMsg);
  750. }
  751. }
  752. });
  753. #endif
  754. }
  755. void PeerConnection::closeTracks() {
  756. std::shared_lock lock(mTracksMutex); // read-only
  757. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  758. }
  759. void PeerConnection::validateRemoteDescription(const Description &description) {
  760. if (!description.iceUfrag())
  761. throw std::invalid_argument("Remote description has no ICE user fragment");
  762. if (!description.icePwd())
  763. throw std::invalid_argument("Remote description has no ICE password");
  764. if (!description.fingerprint())
  765. throw std::invalid_argument("Remote description has no valid fingerprint");
  766. if (description.mediaCount() == 0)
  767. throw std::invalid_argument("Remote description has no media line");
  768. int activeMediaCount = 0;
  769. for (int i = 0; i < description.mediaCount(); ++i)
  770. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  771. if (!application->isRemoved())
  772. ++activeMediaCount;
  773. },
  774. [&](const Description::Media *media) {
  775. if (!media->isRemoved() ||
  776. media->direction() != Description::Direction::Inactive)
  777. ++activeMediaCount;
  778. }},
  779. description.media(i));
  780. if (activeMediaCount == 0)
  781. throw std::invalid_argument("Remote description has no active media");
  782. PLOG_VERBOSE << "Remote description looks valid";
  783. }
  784. void PeerConnection::populateLocalDescription(Description &description) const {
  785. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  786. const size_t localMaxMessageSize =
  787. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  788. // Clean up the application entry the ICE transport might have added already (libnice)
  789. description.clearMedia();
  790. if (auto remote = remoteDescription()) {
  791. // Reciprocate remote description
  792. for (int i = 0; i < remote->mediaCount(); ++i) {
  793. std::visit( // reciprocate each media
  794. rtc::overloaded{
  795. [&](Description::Application *remoteApp) {
  796. std::shared_lock lock(mDataChannelsMutex);
  797. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  798. // Prefer local description
  799. Description::Application app(remoteApp->mid());
  800. app.setSctpPort(localSctpPort);
  801. app.setMaxMessageSize(localMaxMessageSize);
  802. PLOG_DEBUG << "Adding application to local description, mid=\""
  803. << app.mid() << "\"";
  804. description.addMedia(std::move(app));
  805. } else {
  806. auto reciprocated = remoteApp->reciprocate();
  807. reciprocated.hintSctpPort(localSctpPort);
  808. reciprocated.setMaxMessageSize(localMaxMessageSize);
  809. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  810. << reciprocated.mid() << "\"";
  811. description.addMedia(std::move(reciprocated));
  812. }
  813. },
  814. [&](Description::Media *remoteMedia) {
  815. std::shared_lock lock(mTracksMutex);
  816. auto it = mTracks.find(remoteMedia->mid());
  817. auto track = it != mTracks.end() ? it->second.lock() : nullptr;
  818. if(track) {
  819. // Prefer local description
  820. auto media = track->description();
  821. PLOG_DEBUG << "Adding media to local description, mid=\""
  822. << media.mid() << "\", removed=" << std::boolalpha
  823. << media.isRemoved();
  824. description.addMedia(std::move(media));
  825. } else {
  826. auto reciprocated = remoteMedia->reciprocate();
  827. reciprocated.markRemoved();
  828. PLOG_DEBUG << "Adding media to local description, mid=\""
  829. << reciprocated.mid()
  830. << "\", removed=true (track is destroyed)";
  831. description.addMedia(std::move(reciprocated));
  832. }
  833. },
  834. },
  835. remote->media(i));
  836. }
  837. }
  838. if (description.type() == Description::Type::Offer) {
  839. // This is an offer, add locally created data channels and tracks
  840. // Add media for local tracks
  841. std::shared_lock lock(mTracksMutex);
  842. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  843. if (auto track = it->lock()) {
  844. if (description.hasMid(track->mid()))
  845. continue;
  846. auto media = track->description();
  847. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  848. << "\", removed=" << std::boolalpha << media.isRemoved();
  849. description.addMedia(std::move(media));
  850. }
  851. }
  852. // Add application for data channels
  853. if (!description.hasApplication()) {
  854. std::shared_lock lock(mDataChannelsMutex);
  855. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  856. // Prevents mid collision with remote or local tracks
  857. unsigned int m = 0;
  858. while (description.hasMid(std::to_string(m)))
  859. ++m;
  860. Description::Application app(std::to_string(m));
  861. app.setSctpPort(localSctpPort);
  862. app.setMaxMessageSize(localMaxMessageSize);
  863. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  864. << "\"";
  865. description.addMedia(std::move(app));
  866. }
  867. }
  868. }
  869. // Set local fingerprint (wait for certificate if necessary)
  870. description.setFingerprint(mCertificate.get()->fingerprint());
  871. }
  872. void PeerConnection::processLocalDescription(Description description) {
  873. PLOG_VERBOSE << "Issuing local description: " << description;
  874. if (description.mediaCount() == 0)
  875. throw std::logic_error("Local description has no media line");
  876. {
  877. // Set as local description
  878. std::lock_guard lock(mLocalDescriptionMutex);
  879. std::vector<Candidate> existingCandidates;
  880. if (mLocalDescription) {
  881. existingCandidates = mLocalDescription->extractCandidates();
  882. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  883. }
  884. mLocalDescription.emplace(description);
  885. mLocalDescription->addCandidates(std::move(existingCandidates));
  886. }
  887. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  888. &localDescriptionCallback, std::move(description));
  889. }
  890. void PeerConnection::processLocalCandidate(Candidate candidate) {
  891. std::lock_guard lock(mLocalDescriptionMutex);
  892. if (!mLocalDescription)
  893. throw std::logic_error("Got a local candidate without local description");
  894. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  895. candidate.type() != Candidate::Type::Relayed) {
  896. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  897. return;
  898. }
  899. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  900. candidate.resolve(Candidate::ResolveMode::Simple);
  901. mLocalDescription->addCandidate(candidate);
  902. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  903. &localCandidateCallback, std::move(candidate));
  904. }
  905. void PeerConnection::processRemoteDescription(Description description) {
  906. // Create tracks from remote description
  907. for (int i = 0; i < description.mediaCount(); ++i) {
  908. if (std::holds_alternative<Description::Media *>(description.media(i))) {
  909. auto remoteMedia = std::get<Description::Media *>(description.media(i));
  910. std::unique_lock lock(mTracksMutex); // we may emplace a track
  911. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end())
  912. continue;
  913. PLOG_DEBUG << "New remote track, mid=\"" << remoteMedia->mid() << "\"";
  914. auto reciprocated = remoteMedia->reciprocate();
  915. #if !RTC_ENABLE_MEDIA
  916. if (!reciprocated.isRemoved()) {
  917. // No media support, mark as removed
  918. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  919. reciprocated.markRemoved();
  920. }
  921. #endif
  922. // Create incoming track
  923. auto track = std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  924. mTracks.emplace(std::make_pair(track->mid(), track));
  925. mTrackLines.emplace_back(track);
  926. triggerTrack(track); // The user may modify the track description
  927. auto handler = getMediaHandler();
  928. if (handler)
  929. handler->media(track->description());
  930. if (track->description().isRemoved())
  931. track->close();
  932. }
  933. }
  934. // Update the SSRC cache for existing tracks
  935. updateTrackSsrcCache(description);
  936. {
  937. // Set as remote description
  938. std::lock_guard lock(mRemoteDescriptionMutex);
  939. std::vector<Candidate> existingCandidates;
  940. if (mRemoteDescription)
  941. existingCandidates = mRemoteDescription->extractCandidates();
  942. mRemoteDescription.emplace(description);
  943. mRemoteDescription->addCandidates(std::move(existingCandidates));
  944. }
  945. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  946. if (description.hasApplication()) {
  947. auto sctpTransport = std::atomic_load(&mSctpTransport);
  948. if (!sctpTransport && dtlsTransport &&
  949. dtlsTransport->state() == Transport::State::Connected)
  950. initSctpTransport();
  951. } else {
  952. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  953. }
  954. // Reciprocated tracks might need to be open
  955. if (dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  956. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  957. }
  958. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  959. auto iceTransport = std::atomic_load(&mIceTransport);
  960. {
  961. // Set as remote candidate
  962. std::lock_guard lock(mRemoteDescriptionMutex);
  963. if (!mRemoteDescription)
  964. throw std::logic_error("Got a remote candidate without remote description");
  965. if (!iceTransport)
  966. throw std::logic_error("Got a remote candidate without ICE transport");
  967. candidate.hintMid(mRemoteDescription->bundleMid());
  968. if (mRemoteDescription->hasCandidate(candidate))
  969. return; // already in description, ignore
  970. candidate.resolve(Candidate::ResolveMode::Simple);
  971. mRemoteDescription->addCandidate(candidate);
  972. }
  973. if (candidate.isResolved()) {
  974. iceTransport->addRemoteCandidate(std::move(candidate));
  975. } else {
  976. // We might need a lookup, do it asynchronously
  977. // We don't use the thread pool because we have no control on the timeout
  978. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  979. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  980. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  981. utils::this_thread::set_name("RTC resolver");
  982. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  983. if (auto iceTransport = weakIceTransport.lock())
  984. iceTransport->addRemoteCandidate(std::move(candidate));
  985. });
  986. t.detach();
  987. }
  988. }
  989. }
  990. string PeerConnection::localBundleMid() const {
  991. std::lock_guard lock(mLocalDescriptionMutex);
  992. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  993. }
  994. bool PeerConnection::negotiationNeeded() const {
  995. auto description = localDescription();
  996. {
  997. std::shared_lock lock(mDataChannelsMutex);
  998. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty())
  999. if(!description || !description->hasApplication()) {
  1000. PLOG_DEBUG << "Negotiation needed for data channels";
  1001. return true;
  1002. }
  1003. }
  1004. {
  1005. std::shared_lock lock(mTracksMutex);
  1006. for(const auto &[mid, weakTrack] : mTracks)
  1007. if (auto track = weakTrack.lock())
  1008. if (!description || !description->hasMid(track->mid())) {
  1009. PLOG_DEBUG << "Negotiation needed to add track, mid=" << track->mid();
  1010. return true;
  1011. }
  1012. if(description) {
  1013. for(int i = 0; i < description->mediaCount(); ++i) {
  1014. if (std::holds_alternative<Description::Media *>(description->media(i))) {
  1015. auto media = std::get<Description::Media *>(description->media(i));
  1016. if (!media->isRemoved())
  1017. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1018. if (auto track = it->second.lock(); !track || track->isClosed()) {
  1019. PLOG_DEBUG << "Negotiation needed to remove track, mid=" << media->mid();
  1020. return true;
  1021. }
  1022. }
  1023. }
  1024. }
  1025. }
  1026. return false;
  1027. }
  1028. void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
  1029. std::unique_lock lock(mMediaHandlerMutex);
  1030. mMediaHandler = handler;
  1031. }
  1032. shared_ptr<MediaHandler> PeerConnection::getMediaHandler() const {
  1033. std::shared_lock lock(mMediaHandlerMutex);
  1034. return mMediaHandler;
  1035. }
  1036. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  1037. auto dataChannel = weakDataChannel.lock();
  1038. if (dataChannel) {
  1039. dataChannel->resetOpenCallback(); // might be set internally
  1040. mPendingDataChannels.push(std::move(dataChannel));
  1041. }
  1042. triggerPendingDataChannels();
  1043. }
  1044. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  1045. auto track = weakTrack.lock();
  1046. if (track) {
  1047. track->resetOpenCallback(); // might be set internally
  1048. mPendingTracks.push(std::move(track));
  1049. }
  1050. triggerPendingTracks();
  1051. }
  1052. void PeerConnection::triggerPendingDataChannels() {
  1053. while (dataChannelCallback) {
  1054. auto next = mPendingDataChannels.pop();
  1055. if (!next)
  1056. break;
  1057. auto impl = std::move(*next);
  1058. try {
  1059. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  1060. } catch (const std::exception &e) {
  1061. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  1062. }
  1063. impl->triggerOpen();
  1064. }
  1065. }
  1066. void PeerConnection::triggerPendingTracks() {
  1067. while (trackCallback) {
  1068. auto next = mPendingTracks.pop();
  1069. if (!next)
  1070. break;
  1071. auto impl = std::move(*next);
  1072. try {
  1073. trackCallback(std::make_shared<rtc::Track>(impl));
  1074. } catch (const std::exception &e) {
  1075. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  1076. }
  1077. // Do not trigger open immediately for tracks as it'll be done later
  1078. }
  1079. }
  1080. void PeerConnection::flushPendingDataChannels() {
  1081. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  1082. }
  1083. void PeerConnection::flushPendingTracks() {
  1084. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  1085. }
  1086. bool PeerConnection::changeState(State newState) {
  1087. State current;
  1088. do {
  1089. current = state.load();
  1090. if (current == State::Closed)
  1091. return false;
  1092. if (current == newState)
  1093. return false;
  1094. } while (!state.compare_exchange_weak(current, newState));
  1095. std::ostringstream s;
  1096. s << newState;
  1097. PLOG_INFO << "Changed state to " << s.str();
  1098. if (newState == State::Closed) {
  1099. auto callback = std::move(stateChangeCallback); // steal the callback
  1100. callback(State::Closed); // call it synchronously
  1101. } else {
  1102. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  1103. &stateChangeCallback, newState);
  1104. }
  1105. return true;
  1106. }
  1107. bool PeerConnection::changeIceState(IceState newState) {
  1108. if (iceState.exchange(newState) == newState)
  1109. return false;
  1110. std::ostringstream s;
  1111. s << newState;
  1112. PLOG_INFO << "Changed ICE state to " << s.str();
  1113. if (newState == IceState::Closed) {
  1114. auto callback = std::move(iceStateChangeCallback); // steal the callback
  1115. callback(IceState::Closed); // call it synchronously
  1116. } else {
  1117. mProcessor.enqueue(&PeerConnection::trigger<IceState>, shared_from_this(),
  1118. &iceStateChangeCallback, newState);
  1119. }
  1120. return true;
  1121. }
  1122. bool PeerConnection::changeGatheringState(GatheringState newState) {
  1123. if (gatheringState.exchange(newState) == newState)
  1124. return false;
  1125. std::ostringstream s;
  1126. s << newState;
  1127. PLOG_INFO << "Changed gathering state to " << s.str();
  1128. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  1129. &gatheringStateChangeCallback, newState);
  1130. return true;
  1131. }
  1132. bool PeerConnection::changeSignalingState(SignalingState newState) {
  1133. if (signalingState.exchange(newState) == newState)
  1134. return false;
  1135. std::ostringstream s;
  1136. s << newState;
  1137. PLOG_INFO << "Changed signaling state to " << s.str();
  1138. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  1139. &signalingStateChangeCallback, newState);
  1140. return true;
  1141. }
  1142. void PeerConnection::resetCallbacks() {
  1143. // Unregister all callbacks
  1144. dataChannelCallback = nullptr;
  1145. localDescriptionCallback = nullptr;
  1146. localCandidateCallback = nullptr;
  1147. stateChangeCallback = nullptr;
  1148. iceStateChangeCallback = nullptr;
  1149. gatheringStateChangeCallback = nullptr;
  1150. signalingStateChangeCallback = nullptr;
  1151. trackCallback = nullptr;
  1152. }
  1153. CertificateFingerprint PeerConnection::remoteFingerprint() {
  1154. std::lock_guard lock(mRemoteDescriptionMutex);
  1155. if (mRemoteFingerprint)
  1156. return {CertificateFingerprint{mRemoteFingerprintAlgorithm, *mRemoteFingerprint}};
  1157. else
  1158. return {};
  1159. }
  1160. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  1161. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  1162. // Setup SSRC -> Track mapping
  1163. for (int i = 0; i < description.mediaCount(); ++i)
  1164. std::visit( // ssrc -> track mapping
  1165. rtc::overloaded{
  1166. [&](Description::Application const *) { return; },
  1167. [&](Description::Media const *media) {
  1168. const auto ssrcs = media->getSSRCs();
  1169. // Note: We don't want to lock (or do any other lookups), if we
  1170. // already know there's no SSRCs to loop over.
  1171. if (ssrcs.size() <= 0) {
  1172. return;
  1173. }
  1174. std::shared_ptr<Track> track{nullptr};
  1175. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1176. if (auto track_for_mid = it->second.lock())
  1177. track = track_for_mid;
  1178. if (!track) {
  1179. // Unable to find track for MID
  1180. return;
  1181. }
  1182. for (auto ssrc : ssrcs) {
  1183. mTracksBySsrc.insert_or_assign(ssrc, track);
  1184. }
  1185. },
  1186. },
  1187. description.media(i));
  1188. }
  1189. } // namespace rtc::impl