peerconnection.cpp 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "dtlstransport.hpp"
  12. #include "icetransport.hpp"
  13. #include "internals.hpp"
  14. #include "logcounter.hpp"
  15. #include "peerconnection.hpp"
  16. #include "processor.hpp"
  17. #include "rtp.hpp"
  18. #include "sctptransport.hpp"
  19. #include "utils.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. const string PemBeginCertificateTag = "-----BEGIN CERTIFICATE-----";
  41. PeerConnection::PeerConnection(Configuration config_) : config(std::move(config_)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.certificatePemFile && config.keyPemFile) {
  44. std::promise<certificate_ptr> cert;
  45. cert.set_value(std::make_shared<Certificate>(
  46. config.certificatePemFile->find(PemBeginCertificateTag) != string::npos
  47. ? Certificate::FromString(*config.certificatePemFile, *config.keyPemFile)
  48. : Certificate::FromFile(*config.certificatePemFile, *config.keyPemFile,
  49. config.keyPemPass.value_or(""))));
  50. mCertificate = cert.get_future();
  51. } else if (!config.certificatePemFile && !config.keyPemFile) {
  52. mCertificate = make_certificate(config.certificateType);
  53. } else {
  54. throw std::invalid_argument(
  55. "Either none or both certificate and key PEM files must be specified");
  56. }
  57. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  58. throw std::invalid_argument("Invalid port range");
  59. if (config.mtu) {
  60. if (*config.mtu < 576) // Min MTU for IPv4
  61. throw std::invalid_argument("Invalid MTU value");
  62. if (*config.mtu > 1500) { // Standard Ethernet
  63. PLOG_WARNING << "MTU set to " << *config.mtu;
  64. } else {
  65. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  66. }
  67. }
  68. }
  69. PeerConnection::~PeerConnection() {
  70. PLOG_VERBOSE << "Destroying PeerConnection";
  71. mProcessor.join();
  72. }
  73. void PeerConnection::close() {
  74. if (!closing.exchange(true)) {
  75. PLOG_VERBOSE << "Closing PeerConnection";
  76. if (auto transport = std::atomic_load(&mSctpTransport))
  77. transport->stop();
  78. else
  79. remoteClose();
  80. }
  81. }
  82. void PeerConnection::remoteClose() {
  83. close();
  84. if (state.load() != State::Closed) {
  85. // Close data channels and tracks asynchronously
  86. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  87. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  88. closeTransports();
  89. }
  90. }
  91. optional<Description> PeerConnection::localDescription() const {
  92. std::lock_guard lock(mLocalDescriptionMutex);
  93. return mLocalDescription;
  94. }
  95. optional<Description> PeerConnection::remoteDescription() const {
  96. std::lock_guard lock(mRemoteDescriptionMutex);
  97. return mRemoteDescription;
  98. }
  99. size_t PeerConnection::remoteMaxMessageSize() const {
  100. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  101. size_t remoteMax = DEFAULT_REMOTE_MAX_MESSAGE_SIZE;
  102. std::lock_guard lock(mRemoteDescriptionMutex);
  103. if (mRemoteDescription)
  104. if (auto *application = mRemoteDescription->application())
  105. if (auto max = application->maxMessageSize()) {
  106. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  107. // size value of zero, it indicates that the SCTP endpoint will handle messages
  108. // of any size, subject to memory capacity, etc.
  109. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  110. }
  111. return std::min(remoteMax, localMax);
  112. }
  113. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  114. template <typename T>
  115. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  116. std::atomic_store(member, transport);
  117. try {
  118. transport->start();
  119. } catch (...) {
  120. std::atomic_store(member, decltype(transport)(nullptr));
  121. throw;
  122. }
  123. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  124. std::atomic_store(member, decltype(transport)(nullptr));
  125. transport->stop();
  126. return nullptr;
  127. }
  128. return transport;
  129. }
  130. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  131. try {
  132. if (auto transport = std::atomic_load(&mIceTransport))
  133. return transport;
  134. PLOG_VERBOSE << "Starting ICE transport";
  135. auto transport = std::make_shared<IceTransport>(
  136. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  137. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  138. auto shared_this = weak_this.lock();
  139. if (!shared_this)
  140. return;
  141. switch (transportState) {
  142. case IceTransport::State::Connecting:
  143. changeIceState(IceState::Checking);
  144. changeState(State::Connecting);
  145. break;
  146. case IceTransport::State::Connected:
  147. changeIceState(IceState::Connected);
  148. initDtlsTransport();
  149. break;
  150. case IceTransport::State::Completed:
  151. changeIceState(IceState::Completed);
  152. break;
  153. case IceTransport::State::Failed:
  154. changeIceState(IceState::Failed);
  155. changeState(State::Failed);
  156. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  157. break;
  158. case IceTransport::State::Disconnected:
  159. changeIceState(IceState::Disconnected);
  160. changeState(State::Disconnected);
  161. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  162. break;
  163. default:
  164. // Ignore
  165. break;
  166. }
  167. },
  168. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  169. auto shared_this = weak_this.lock();
  170. if (!shared_this)
  171. return;
  172. switch (gatheringState) {
  173. case IceTransport::GatheringState::InProgress:
  174. changeGatheringState(GatheringState::InProgress);
  175. break;
  176. case IceTransport::GatheringState::Complete:
  177. endLocalCandidates();
  178. changeGatheringState(GatheringState::Complete);
  179. break;
  180. default:
  181. // Ignore
  182. break;
  183. }
  184. });
  185. return emplaceTransport(this, &mIceTransport, std::move(transport));
  186. } catch (const std::exception &e) {
  187. PLOG_ERROR << e.what();
  188. changeState(State::Failed);
  189. throw std::runtime_error("ICE transport initialization failed");
  190. }
  191. }
  192. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  193. try {
  194. if (auto transport = std::atomic_load(&mDtlsTransport))
  195. return transport;
  196. PLOG_VERBOSE << "Starting DTLS transport";
  197. CertificateFingerprint::Algorithm fingerprintAlgorithm;
  198. {
  199. std::lock_guard lock(mRemoteDescriptionMutex);
  200. if (mRemoteDescription && mRemoteDescription->fingerprint()) {
  201. mRemoteFingerprintAlgorithm = mRemoteDescription->fingerprint()->algorithm;
  202. }
  203. fingerprintAlgorithm = mRemoteFingerprintAlgorithm;
  204. }
  205. auto lower = std::atomic_load(&mIceTransport);
  206. if (!lower)
  207. throw std::logic_error("No underlying ICE transport for DTLS transport");
  208. auto certificate = mCertificate.get();
  209. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  210. auto dtlsStateChangeCallback =
  211. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  212. auto shared_this = weak_this.lock();
  213. if (!shared_this)
  214. return;
  215. switch (transportState) {
  216. case DtlsTransport::State::Connected:
  217. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  218. initSctpTransport();
  219. else
  220. changeState(State::Connected);
  221. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  222. break;
  223. case DtlsTransport::State::Failed:
  224. changeState(State::Failed);
  225. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  226. break;
  227. case DtlsTransport::State::Disconnected:
  228. changeState(State::Disconnected);
  229. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  230. break;
  231. default:
  232. // Ignore
  233. break;
  234. }
  235. };
  236. shared_ptr<DtlsTransport> transport;
  237. auto local = localDescription();
  238. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  239. #if RTC_ENABLE_MEDIA
  240. PLOG_INFO << "This connection requires media support";
  241. // DTLS-SRTP
  242. transport = std::make_shared<DtlsSrtpTransport>(
  243. lower, certificate, config.mtu, fingerprintAlgorithm, verifierCallback,
  244. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  245. #else
  246. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  247. #endif
  248. }
  249. if (!transport) {
  250. // DTLS only
  251. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  252. fingerprintAlgorithm, verifierCallback,
  253. dtlsStateChangeCallback);
  254. }
  255. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  256. } catch (const std::exception &e) {
  257. PLOG_ERROR << e.what();
  258. changeState(State::Failed);
  259. throw std::runtime_error("DTLS transport initialization failed");
  260. }
  261. }
  262. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  263. try {
  264. if (auto transport = std::atomic_load(&mSctpTransport))
  265. return transport;
  266. PLOG_VERBOSE << "Starting SCTP transport";
  267. auto lower = std::atomic_load(&mDtlsTransport);
  268. if (!lower)
  269. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  270. auto local = localDescription();
  271. if (!local || !local->application())
  272. throw std::logic_error("Starting SCTP transport without local application description");
  273. auto remote = remoteDescription();
  274. if (!remote || !remote->application())
  275. throw std::logic_error(
  276. "Starting SCTP transport without remote application description");
  277. SctpTransport::Ports ports = {};
  278. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  279. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  280. auto transport = std::make_shared<SctpTransport>(
  281. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  282. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  283. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  284. auto shared_this = weak_this.lock();
  285. if (!shared_this)
  286. return;
  287. switch (transportState) {
  288. case SctpTransport::State::Connected:
  289. changeState(State::Connected);
  290. assignDataChannels();
  291. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  292. break;
  293. case SctpTransport::State::Failed:
  294. changeState(State::Failed);
  295. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  296. break;
  297. case SctpTransport::State::Disconnected:
  298. changeState(State::Disconnected);
  299. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  300. break;
  301. default:
  302. // Ignore
  303. break;
  304. }
  305. });
  306. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  307. } catch (const std::exception &e) {
  308. PLOG_ERROR << e.what();
  309. changeState(State::Failed);
  310. throw std::runtime_error("SCTP transport initialization failed");
  311. }
  312. }
  313. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  314. return std::atomic_load(&mIceTransport);
  315. }
  316. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  317. return std::atomic_load(&mDtlsTransport);
  318. }
  319. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  320. return std::atomic_load(&mSctpTransport);
  321. }
  322. void PeerConnection::closeTransports() {
  323. PLOG_VERBOSE << "Closing transports";
  324. // Change ICE state to sink state Closed
  325. changeIceState(IceState::Closed);
  326. // Change state to sink state Closed
  327. if (!changeState(State::Closed))
  328. return; // already closed
  329. // Reset interceptor and callbacks now that state is changed
  330. setMediaHandler(nullptr);
  331. resetCallbacks();
  332. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  333. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  334. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  335. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  336. if (sctp) {
  337. sctp->onRecv(nullptr);
  338. sctp->onBufferedAmount(nullptr);
  339. }
  340. using array = std::array<shared_ptr<Transport>, 3>;
  341. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  342. for (const auto &t : transports)
  343. if (t)
  344. t->onStateChange(nullptr);
  345. TearDownProcessor::Instance().enqueue(
  346. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  347. for (const auto &t : transports) {
  348. if (t) {
  349. t->stop();
  350. break;
  351. }
  352. }
  353. for (auto &t : transports)
  354. t.reset();
  355. });
  356. }
  357. void PeerConnection::endLocalCandidates() {
  358. std::lock_guard lock(mLocalDescriptionMutex);
  359. if (mLocalDescription)
  360. mLocalDescription->endCandidates();
  361. }
  362. void PeerConnection::rollbackLocalDescription() {
  363. PLOG_DEBUG << "Rolling back pending local description";
  364. std::unique_lock lock(mLocalDescriptionMutex);
  365. if (mCurrentLocalDescription) {
  366. std::vector<Candidate> existingCandidates;
  367. if (mLocalDescription)
  368. existingCandidates = mLocalDescription->extractCandidates();
  369. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  370. mLocalDescription->addCandidates(std::move(existingCandidates));
  371. mCurrentLocalDescription.reset();
  372. }
  373. }
  374. bool PeerConnection::checkFingerprint(const std::string &fingerprint) {
  375. std::lock_guard lock(mRemoteDescriptionMutex);
  376. mRemoteFingerprint = fingerprint;
  377. if (!mRemoteDescription || !mRemoteDescription->fingerprint()
  378. || mRemoteFingerprintAlgorithm != mRemoteDescription->fingerprint()->algorithm)
  379. return false;
  380. if (config.disableFingerprintVerification) {
  381. PLOG_VERBOSE << "Skipping fingerprint validation";
  382. return true;
  383. }
  384. auto expectedFingerprint = mRemoteDescription->fingerprint()->value;
  385. if (expectedFingerprint == fingerprint) {
  386. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  387. return true;
  388. }
  389. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  390. << expectedFingerprint << "\"";
  391. return false;
  392. }
  393. void PeerConnection::forwardMessage(message_ptr message) {
  394. if (!message) {
  395. remoteCloseDataChannels();
  396. return;
  397. }
  398. auto iceTransport = std::atomic_load(&mIceTransport);
  399. auto sctpTransport = std::atomic_load(&mSctpTransport);
  400. if (!iceTransport || !sctpTransport)
  401. return;
  402. const uint16_t stream = uint16_t(message->stream);
  403. auto [channel, found] = findDataChannel(stream);
  404. if (DataChannel::IsOpenMessage(message)) {
  405. if (found) {
  406. // The stream is already used, the receiver must close the DataChannel
  407. PLOG_WARNING << "Got open message on already used stream " << stream;
  408. if (channel && !channel->isClosed())
  409. channel->close();
  410. else
  411. sctpTransport->closeStream(message->stream);
  412. return;
  413. }
  414. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  415. if (stream % 2 != remoteParity) {
  416. // The odd/even rule is violated, the receiver must close the DataChannel
  417. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  418. sctpTransport->closeStream(message->stream);
  419. return;
  420. }
  421. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  422. channel->assignStream(stream);
  423. channel->openCallback =
  424. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  425. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  426. mDataChannels.emplace(stream, channel);
  427. } else if (!found) {
  428. if (message->type == Message::Reset)
  429. return; // ignore
  430. // Invalid, close the DataChannel
  431. PLOG_WARNING << "Got unexpected message on stream " << stream;
  432. sctpTransport->closeStream(message->stream);
  433. return;
  434. }
  435. if (message->type == Message::Reset) {
  436. // Incoming stream is reset, unregister it
  437. removeDataChannel(stream);
  438. }
  439. if (channel) {
  440. // Forward the message
  441. channel->incoming(message);
  442. } else {
  443. // DataChannel was destroyed, ignore
  444. PLOG_DEBUG << "Ignored message on stream " << stream << ", DataChannel is destroyed";
  445. }
  446. }
  447. void PeerConnection::forwardMedia([[maybe_unused]] message_ptr message) {
  448. #if RTC_ENABLE_MEDIA
  449. if (!message)
  450. return;
  451. // TODO: outgoing
  452. if (auto handler = getMediaHandler()) {
  453. message_vector messages{std::move(message)};
  454. try {
  455. handler->incomingChain(messages, [this](message_ptr message) {
  456. auto transport = std::atomic_load(&mDtlsTransport);
  457. if (auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport))
  458. srtpTransport->send(std::move(message));
  459. });
  460. } catch(const std::exception &e) {
  461. PLOG_WARNING << "Exception in global incoming media handler: " << e.what();
  462. return;
  463. }
  464. for (auto &m : messages)
  465. dispatchMedia(std::move(m));
  466. } else {
  467. dispatchMedia(std::move(message));
  468. }
  469. #endif
  470. }
  471. void PeerConnection::dispatchMedia([[maybe_unused]] message_ptr message) {
  472. #if RTC_ENABLE_MEDIA
  473. std::shared_lock lock(mTracksMutex); // read-only
  474. if (mTrackLines.size() == 1) {
  475. if (auto track = mTrackLines.front().lock())
  476. track->incoming(message);
  477. return;
  478. }
  479. // Browsers like to compound their packets with a random SSRC.
  480. // we have to do this monstrosity to distribute the report blocks
  481. if (message->type == Message::Control) {
  482. std::set<uint32_t> ssrcs;
  483. size_t offset = 0;
  484. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  485. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  486. if (header->lengthInBytes() > message->size() - offset) {
  487. COUNTER_MEDIA_TRUNCATED++;
  488. break;
  489. }
  490. offset += header->lengthInBytes();
  491. if (header->payloadType() == 205 || header->payloadType() == 206) {
  492. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  493. ssrcs.insert(rtcpfb->packetSenderSSRC());
  494. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  495. } else if (header->payloadType() == 200) {
  496. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  497. ssrcs.insert(rtcpsr->senderSSRC());
  498. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  499. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  500. } else if (header->payloadType() == 201) {
  501. auto rtcprr = reinterpret_cast<RtcpRr *>(header);
  502. ssrcs.insert(rtcprr->senderSSRC());
  503. for (int i = 0; i < rtcprr->header.reportCount(); ++i)
  504. ssrcs.insert(rtcprr->getReportBlock(i)->getSSRC());
  505. } else if (header->payloadType() == 202) {
  506. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  507. if (!sdes->isValid()) {
  508. PLOG_WARNING << "RTCP SDES packet is invalid";
  509. continue;
  510. }
  511. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  512. auto chunk = sdes->getChunk(i);
  513. ssrcs.insert(chunk->ssrc());
  514. }
  515. } else {
  516. // PT=203 == Goodbye
  517. // PT=204 == Application Specific
  518. // PT=207 == Extended Report
  519. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  520. header->payloadType() != 207) {
  521. COUNTER_UNKNOWN_PACKET_TYPE++;
  522. }
  523. }
  524. }
  525. if (!ssrcs.empty()) {
  526. for (uint32_t ssrc : ssrcs) {
  527. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  528. if (auto track = it->second.lock())
  529. track->incoming(message);
  530. }
  531. }
  532. return;
  533. }
  534. }
  535. uint32_t ssrc = uint32_t(message->stream);
  536. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  537. if (auto track = it->second.lock())
  538. track->incoming(message);
  539. } else {
  540. /*
  541. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  542. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  543. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  544. * this ideal? No! Do I know how to fix it? No!
  545. */
  546. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  547. return;
  548. }
  549. #endif
  550. }
  551. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  552. [[maybe_unused]] auto [channel, found] = findDataChannel(stream);
  553. if (channel)
  554. channel->triggerBufferedAmount(amount);
  555. }
  556. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  557. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  558. // If the DataChannel is user-negotiated, do not negotiate it in-band
  559. auto channel =
  560. init.negotiated
  561. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  562. std::move(init.protocol), std::move(init.reliability))
  563. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  564. std::move(init.protocol),
  565. std::move(init.reliability));
  566. // If the user supplied a stream id, use it, otherwise assign it later
  567. if (init.id) {
  568. uint16_t stream = *init.id;
  569. if (stream > maxDataChannelStream())
  570. throw std::invalid_argument("DataChannel stream id is too high");
  571. channel->assignStream(stream);
  572. mDataChannels.emplace(std::make_pair(stream, channel));
  573. } else {
  574. mUnassignedDataChannels.push_back(channel);
  575. }
  576. lock.unlock(); // we are going to call assignDataChannels()
  577. // If SCTP is connected, assign and open now
  578. auto sctpTransport = std::atomic_load(&mSctpTransport);
  579. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  580. assignDataChannels();
  581. channel->open(sctpTransport);
  582. }
  583. return channel;
  584. }
  585. std::pair<shared_ptr<DataChannel>, bool> PeerConnection::findDataChannel(uint16_t stream) {
  586. std::shared_lock lock(mDataChannelsMutex); // read-only
  587. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  588. return std::make_pair(it->second.lock(), true);
  589. else
  590. return std::make_pair(nullptr, false);
  591. }
  592. bool PeerConnection::removeDataChannel(uint16_t stream) {
  593. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  594. return mDataChannels.erase(stream) != 0;
  595. }
  596. uint16_t PeerConnection::maxDataChannelStream() const {
  597. auto sctpTransport = std::atomic_load(&mSctpTransport);
  598. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  599. }
  600. void PeerConnection::assignDataChannels() {
  601. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  602. auto iceTransport = std::atomic_load(&mIceTransport);
  603. if (!iceTransport)
  604. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  605. const uint16_t maxStream = maxDataChannelStream();
  606. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  607. auto channel = it->lock();
  608. if (!channel)
  609. continue;
  610. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  611. // for which the corresponding incoming and outgoing streams are unused. If the side is
  612. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  613. // acting as the DTLS server, it MUST choose an odd one. See
  614. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  615. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  616. while (true) {
  617. if (stream > maxStream)
  618. throw std::runtime_error("Too many DataChannels");
  619. if (mDataChannels.find(stream) == mDataChannels.end())
  620. break;
  621. stream += 2;
  622. }
  623. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  624. channel->assignStream(stream);
  625. mDataChannels.emplace(std::make_pair(stream, channel));
  626. }
  627. mUnassignedDataChannels.clear();
  628. }
  629. void PeerConnection::iterateDataChannels(
  630. std::function<void(shared_ptr<DataChannel> channel)> func) {
  631. std::vector<shared_ptr<DataChannel>> locked;
  632. {
  633. std::shared_lock lock(mDataChannelsMutex); // read-only
  634. locked.reserve(mDataChannels.size());
  635. for (auto it = mDataChannels.begin(); it != mDataChannels.end(); ++it) {
  636. auto channel = it->second.lock();
  637. if (channel && !channel->isClosed())
  638. locked.push_back(std::move(channel));
  639. }
  640. }
  641. for (auto &channel : locked) {
  642. try {
  643. func(std::move(channel));
  644. } catch (const std::exception &e) {
  645. PLOG_WARNING << e.what();
  646. }
  647. }
  648. }
  649. void PeerConnection::openDataChannels() {
  650. if (auto transport = std::atomic_load(&mSctpTransport))
  651. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  652. if (!channel->isOpen())
  653. channel->open(transport);
  654. });
  655. }
  656. void PeerConnection::closeDataChannels() {
  657. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  658. }
  659. void PeerConnection::remoteCloseDataChannels() {
  660. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  661. }
  662. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  663. std::unique_lock lock(mTracksMutex); // we are going to emplace
  664. #if !RTC_ENABLE_MEDIA
  665. // No media support, mark as removed
  666. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  667. description.markRemoved();
  668. #endif
  669. shared_ptr<Track> track;
  670. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  671. if (auto t = it->second.lock(); t && !t->isClosed())
  672. track = std::move(t);
  673. if (track) {
  674. track->setDescription(std::move(description));
  675. } else {
  676. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  677. mTracks.emplace(std::make_pair(track->mid(), track));
  678. mTrackLines.emplace_back(track);
  679. }
  680. auto handler = getMediaHandler();
  681. if (handler)
  682. handler->media(track->description());
  683. if (track->description().isRemoved())
  684. track->close();
  685. return track;
  686. }
  687. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  688. std::vector<shared_ptr<Track>> locked;
  689. {
  690. std::shared_lock lock(mTracksMutex); // read-only
  691. locked.reserve(mTrackLines.size());
  692. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  693. auto track = it->lock();
  694. if (track && !track->isClosed())
  695. locked.push_back(std::move(track));
  696. }
  697. }
  698. for (auto &track : locked) {
  699. try {
  700. func(std::move(track));
  701. } catch (const std::exception &e) {
  702. PLOG_WARNING << e.what();
  703. }
  704. }
  705. }
  706. void PeerConnection::iterateRemoteTracks(std::function<void(shared_ptr<Track> track)> func) {
  707. auto remote = remoteDescription();
  708. if(!remote)
  709. return;
  710. std::vector<shared_ptr<Track>> locked;
  711. {
  712. std::shared_lock lock(mTracksMutex); // read-only
  713. locked.reserve(remote->mediaCount());
  714. for(int i = 0; i < remote->mediaCount(); ++i) {
  715. auto media = remote->media(i);
  716. if (std::holds_alternative<Description::Media *>(media)) {
  717. auto remoteMedia = std::get<Description::Media *>(media);
  718. if (!remoteMedia->isRemoved())
  719. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end())
  720. if (auto track = it->second.lock())
  721. locked.push_back(std::move(track));
  722. }
  723. }
  724. }
  725. for (auto &track : locked) {
  726. try {
  727. func(std::move(track));
  728. } catch (const std::exception &e) {
  729. PLOG_WARNING << e.what();
  730. }
  731. }
  732. }
  733. void PeerConnection::openTracks() {
  734. #if RTC_ENABLE_MEDIA
  735. auto transport = std::atomic_load(&mDtlsTransport);
  736. if (!transport)
  737. return;
  738. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  739. iterateRemoteTracks([&](shared_ptr<Track> track) {
  740. if(!track->isOpen()) {
  741. if (srtpTransport) {
  742. track->open(srtpTransport);
  743. } else {
  744. // A track was added during a latter renegotiation, whereas SRTP transport was
  745. // not initialized. This is an optimization to use the library with data
  746. // channels only. Set forceMediaTransport to true to initialize the transport
  747. // before dynamically adding tracks.
  748. auto errorMsg = "The connection has no media transport";
  749. PLOG_ERROR << errorMsg;
  750. track->triggerError(errorMsg);
  751. }
  752. }
  753. });
  754. #endif
  755. }
  756. void PeerConnection::closeTracks() {
  757. std::shared_lock lock(mTracksMutex); // read-only
  758. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  759. }
  760. void PeerConnection::validateRemoteDescription(const Description &description) {
  761. if (!description.iceUfrag())
  762. throw std::invalid_argument("Remote description has no ICE user fragment");
  763. if (!description.icePwd())
  764. throw std::invalid_argument("Remote description has no ICE password");
  765. if (!description.fingerprint())
  766. throw std::invalid_argument("Remote description has no valid fingerprint");
  767. if (description.mediaCount() == 0)
  768. throw std::invalid_argument("Remote description has no media line");
  769. int activeMediaCount = 0;
  770. for (int i = 0; i < description.mediaCount(); ++i)
  771. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  772. if (!application->isRemoved())
  773. ++activeMediaCount;
  774. },
  775. [&](const Description::Media *media) {
  776. if (!media->isRemoved() ||
  777. media->direction() != Description::Direction::Inactive)
  778. ++activeMediaCount;
  779. }},
  780. description.media(i));
  781. if (activeMediaCount == 0)
  782. throw std::invalid_argument("Remote description has no active media");
  783. PLOG_VERBOSE << "Remote description looks valid";
  784. }
  785. void PeerConnection::populateLocalDescription(Description &description) const {
  786. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  787. const size_t localMaxMessageSize =
  788. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  789. // Clean up the application entry the ICE transport might have added already (libnice)
  790. description.clearMedia();
  791. if (auto remote = remoteDescription()) {
  792. // Reciprocate remote description
  793. for (int i = 0; i < remote->mediaCount(); ++i) {
  794. std::visit( // reciprocate each media
  795. rtc::overloaded{
  796. [&](Description::Application *remoteApp) {
  797. std::shared_lock lock(mDataChannelsMutex);
  798. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  799. // Prefer local description
  800. Description::Application app(remoteApp->mid());
  801. app.setSctpPort(localSctpPort);
  802. app.setMaxMessageSize(localMaxMessageSize);
  803. PLOG_DEBUG << "Adding application to local description, mid=\""
  804. << app.mid() << "\"";
  805. description.addMedia(std::move(app));
  806. } else {
  807. auto reciprocated = remoteApp->reciprocate();
  808. reciprocated.hintSctpPort(localSctpPort);
  809. reciprocated.setMaxMessageSize(localMaxMessageSize);
  810. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  811. << reciprocated.mid() << "\"";
  812. description.addMedia(std::move(reciprocated));
  813. }
  814. },
  815. [&](Description::Media *remoteMedia) {
  816. std::shared_lock lock(mTracksMutex);
  817. auto it = mTracks.find(remoteMedia->mid());
  818. auto track = it != mTracks.end() ? it->second.lock() : nullptr;
  819. if(track) {
  820. // Prefer local description
  821. auto media = track->description();
  822. PLOG_DEBUG << "Adding media to local description, mid=\""
  823. << media.mid() << "\", removed=" << std::boolalpha
  824. << media.isRemoved();
  825. description.addMedia(std::move(media));
  826. } else {
  827. auto reciprocated = remoteMedia->reciprocate();
  828. reciprocated.markRemoved();
  829. PLOG_DEBUG << "Adding media to local description, mid=\""
  830. << reciprocated.mid()
  831. << "\", removed=true (track is destroyed)";
  832. description.addMedia(std::move(reciprocated));
  833. }
  834. },
  835. },
  836. remote->media(i));
  837. }
  838. }
  839. if (description.type() == Description::Type::Offer) {
  840. // This is an offer, add locally created data channels and tracks
  841. // Add media for local tracks
  842. std::shared_lock lock(mTracksMutex);
  843. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  844. if (auto track = it->lock()) {
  845. if (description.hasMid(track->mid()))
  846. continue;
  847. auto media = track->description();
  848. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  849. << "\", removed=" << std::boolalpha << media.isRemoved();
  850. description.addMedia(std::move(media));
  851. }
  852. }
  853. // Add application for data channels
  854. if (!description.hasApplication()) {
  855. std::shared_lock lock(mDataChannelsMutex);
  856. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  857. // Prevents mid collision with remote or local tracks
  858. unsigned int m = 0;
  859. while (description.hasMid(std::to_string(m)))
  860. ++m;
  861. Description::Application app(std::to_string(m));
  862. app.setSctpPort(localSctpPort);
  863. app.setMaxMessageSize(localMaxMessageSize);
  864. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  865. << "\"";
  866. description.addMedia(std::move(app));
  867. }
  868. }
  869. }
  870. // Set local fingerprint (wait for certificate if necessary)
  871. description.setFingerprint(mCertificate.get()->fingerprint());
  872. }
  873. void PeerConnection::processLocalDescription(Description description) {
  874. PLOG_VERBOSE << "Issuing local description: " << description;
  875. if (description.mediaCount() == 0)
  876. throw std::logic_error("Local description has no media line");
  877. // Update the SSRC cache
  878. updateTrackSsrcCache(description);
  879. {
  880. // Set as local description
  881. std::lock_guard lock(mLocalDescriptionMutex);
  882. std::vector<Candidate> existingCandidates;
  883. if (mLocalDescription) {
  884. existingCandidates = mLocalDescription->extractCandidates();
  885. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  886. }
  887. mLocalDescription.emplace(description);
  888. mLocalDescription->addCandidates(std::move(existingCandidates));
  889. }
  890. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  891. &localDescriptionCallback, std::move(description));
  892. }
  893. void PeerConnection::processLocalCandidate(Candidate candidate) {
  894. std::lock_guard lock(mLocalDescriptionMutex);
  895. if (!mLocalDescription)
  896. throw std::logic_error("Got a local candidate without local description");
  897. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  898. candidate.type() != Candidate::Type::Relayed) {
  899. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  900. return;
  901. }
  902. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  903. candidate.resolve(Candidate::ResolveMode::Simple);
  904. mLocalDescription->addCandidate(candidate);
  905. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  906. &localCandidateCallback, std::move(candidate));
  907. }
  908. void PeerConnection::processRemoteDescription(Description description) {
  909. // Create tracks from remote description
  910. for (int i = 0; i < description.mediaCount(); ++i) {
  911. auto media = description.media(i);
  912. if (std::holds_alternative<Description::Media *>(media)) {
  913. auto remoteMedia = std::get<Description::Media *>(media);
  914. std::unique_lock lock(mTracksMutex); // we may emplace a track
  915. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end())
  916. continue;
  917. PLOG_DEBUG << "New remote track, mid=\"" << remoteMedia->mid() << "\"";
  918. auto reciprocated = remoteMedia->reciprocate();
  919. #if !RTC_ENABLE_MEDIA
  920. if (!reciprocated.isRemoved()) {
  921. // No media support, mark as removed
  922. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  923. reciprocated.markRemoved();
  924. }
  925. #endif
  926. // Create incoming track
  927. auto track = std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  928. mTracks.emplace(std::make_pair(track->mid(), track));
  929. mTrackLines.emplace_back(track);
  930. triggerTrack(track); // The user may modify the track description
  931. auto handler = getMediaHandler();
  932. if (handler)
  933. handler->media(track->description());
  934. if (track->description().isRemoved())
  935. track->close();
  936. }
  937. }
  938. // Update the SSRC cache for existing tracks
  939. updateTrackSsrcCache(description);
  940. {
  941. // Set as remote description
  942. std::lock_guard lock(mRemoteDescriptionMutex);
  943. std::vector<Candidate> existingCandidates;
  944. if (mRemoteDescription)
  945. existingCandidates = mRemoteDescription->extractCandidates();
  946. mRemoteDescription.emplace(description);
  947. mRemoteDescription->addCandidates(std::move(existingCandidates));
  948. }
  949. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  950. if (description.hasApplication()) {
  951. auto sctpTransport = std::atomic_load(&mSctpTransport);
  952. if (!sctpTransport && dtlsTransport &&
  953. dtlsTransport->state() == Transport::State::Connected)
  954. initSctpTransport();
  955. } else {
  956. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  957. }
  958. // Reciprocated tracks might need to be open
  959. if (dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  960. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  961. }
  962. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  963. auto iceTransport = std::atomic_load(&mIceTransport);
  964. {
  965. // Set as remote candidate
  966. std::lock_guard lock(mRemoteDescriptionMutex);
  967. if (!mRemoteDescription)
  968. throw std::logic_error("Got a remote candidate without remote description");
  969. if (!iceTransport)
  970. throw std::logic_error("Got a remote candidate without ICE transport");
  971. candidate.hintMid(mRemoteDescription->bundleMid());
  972. if (mRemoteDescription->hasCandidate(candidate))
  973. return; // already in description, ignore
  974. candidate.resolve(Candidate::ResolveMode::Simple);
  975. mRemoteDescription->addCandidate(candidate);
  976. }
  977. if (candidate.isResolved()) {
  978. iceTransport->addRemoteCandidate(std::move(candidate));
  979. } else {
  980. // We might need a lookup, do it asynchronously
  981. // We don't use the thread pool because we have no control on the timeout
  982. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  983. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  984. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  985. utils::this_thread::set_name("RTC resolver");
  986. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  987. if (auto iceTransport = weakIceTransport.lock())
  988. iceTransport->addRemoteCandidate(std::move(candidate));
  989. });
  990. t.detach();
  991. }
  992. }
  993. }
  994. string PeerConnection::localBundleMid() const {
  995. std::lock_guard lock(mLocalDescriptionMutex);
  996. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  997. }
  998. bool PeerConnection::negotiationNeeded() const {
  999. auto description = localDescription();
  1000. {
  1001. std::shared_lock lock(mDataChannelsMutex);
  1002. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty())
  1003. if(!description || !description->hasApplication()) {
  1004. PLOG_DEBUG << "Negotiation needed for data channels";
  1005. return true;
  1006. }
  1007. }
  1008. {
  1009. std::shared_lock lock(mTracksMutex);
  1010. for(const auto &[mid, weakTrack] : mTracks)
  1011. if (auto track = weakTrack.lock())
  1012. if (!description || !description->hasMid(track->mid())) {
  1013. PLOG_DEBUG << "Negotiation needed to add track, mid=" << track->mid();
  1014. return true;
  1015. }
  1016. if(description) {
  1017. for(int i = 0; i < description->mediaCount(); ++i) {
  1018. if (std::holds_alternative<Description::Media *>(description->media(i))) {
  1019. auto media = std::get<Description::Media *>(description->media(i));
  1020. if (!media->isRemoved())
  1021. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1022. if (auto track = it->second.lock(); !track || track->isClosed()) {
  1023. PLOG_DEBUG << "Negotiation needed to remove track, mid=" << media->mid();
  1024. return true;
  1025. }
  1026. }
  1027. }
  1028. }
  1029. }
  1030. return false;
  1031. }
  1032. void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
  1033. std::unique_lock lock(mMediaHandlerMutex);
  1034. mMediaHandler = handler;
  1035. }
  1036. shared_ptr<MediaHandler> PeerConnection::getMediaHandler() const {
  1037. std::shared_lock lock(mMediaHandlerMutex);
  1038. return mMediaHandler;
  1039. }
  1040. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  1041. auto dataChannel = weakDataChannel.lock();
  1042. if (dataChannel) {
  1043. dataChannel->resetOpenCallback(); // might be set internally
  1044. mPendingDataChannels.push(std::move(dataChannel));
  1045. }
  1046. triggerPendingDataChannels();
  1047. }
  1048. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  1049. auto track = weakTrack.lock();
  1050. if (track) {
  1051. track->resetOpenCallback(); // might be set internally
  1052. mPendingTracks.push(std::move(track));
  1053. }
  1054. triggerPendingTracks();
  1055. }
  1056. void PeerConnection::triggerPendingDataChannels() {
  1057. while (dataChannelCallback) {
  1058. auto next = mPendingDataChannels.pop();
  1059. if (!next)
  1060. break;
  1061. auto impl = std::move(*next);
  1062. try {
  1063. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  1064. } catch (const std::exception &e) {
  1065. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  1066. }
  1067. impl->triggerOpen();
  1068. }
  1069. }
  1070. void PeerConnection::triggerPendingTracks() {
  1071. while (trackCallback) {
  1072. auto next = mPendingTracks.pop();
  1073. if (!next)
  1074. break;
  1075. auto impl = std::move(*next);
  1076. try {
  1077. trackCallback(std::make_shared<rtc::Track>(impl));
  1078. } catch (const std::exception &e) {
  1079. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  1080. }
  1081. // Do not trigger open immediately for tracks as it'll be done later
  1082. }
  1083. }
  1084. void PeerConnection::flushPendingDataChannels() {
  1085. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  1086. }
  1087. void PeerConnection::flushPendingTracks() {
  1088. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  1089. }
  1090. bool PeerConnection::changeState(State newState) {
  1091. State current;
  1092. do {
  1093. current = state.load();
  1094. if (current == State::Closed)
  1095. return false;
  1096. if (current == newState)
  1097. return false;
  1098. } while (!state.compare_exchange_weak(current, newState));
  1099. std::ostringstream s;
  1100. s << newState;
  1101. PLOG_INFO << "Changed state to " << s.str();
  1102. if (newState == State::Closed) {
  1103. auto callback = std::move(stateChangeCallback); // steal the callback
  1104. callback(State::Closed); // call it synchronously
  1105. } else {
  1106. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  1107. &stateChangeCallback, newState);
  1108. }
  1109. return true;
  1110. }
  1111. bool PeerConnection::changeIceState(IceState newState) {
  1112. if (iceState.exchange(newState) == newState)
  1113. return false;
  1114. std::ostringstream s;
  1115. s << newState;
  1116. PLOG_INFO << "Changed ICE state to " << s.str();
  1117. if (newState == IceState::Closed) {
  1118. auto callback = std::move(iceStateChangeCallback); // steal the callback
  1119. callback(IceState::Closed); // call it synchronously
  1120. } else {
  1121. mProcessor.enqueue(&PeerConnection::trigger<IceState>, shared_from_this(),
  1122. &iceStateChangeCallback, newState);
  1123. }
  1124. return true;
  1125. }
  1126. bool PeerConnection::changeGatheringState(GatheringState newState) {
  1127. if (gatheringState.exchange(newState) == newState)
  1128. return false;
  1129. std::ostringstream s;
  1130. s << newState;
  1131. PLOG_INFO << "Changed gathering state to " << s.str();
  1132. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  1133. &gatheringStateChangeCallback, newState);
  1134. return true;
  1135. }
  1136. bool PeerConnection::changeSignalingState(SignalingState newState) {
  1137. if (signalingState.exchange(newState) == newState)
  1138. return false;
  1139. std::ostringstream s;
  1140. s << newState;
  1141. PLOG_INFO << "Changed signaling state to " << s.str();
  1142. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  1143. &signalingStateChangeCallback, newState);
  1144. return true;
  1145. }
  1146. void PeerConnection::resetCallbacks() {
  1147. // Unregister all callbacks
  1148. dataChannelCallback = nullptr;
  1149. localDescriptionCallback = nullptr;
  1150. localCandidateCallback = nullptr;
  1151. stateChangeCallback = nullptr;
  1152. iceStateChangeCallback = nullptr;
  1153. gatheringStateChangeCallback = nullptr;
  1154. signalingStateChangeCallback = nullptr;
  1155. trackCallback = nullptr;
  1156. }
  1157. CertificateFingerprint PeerConnection::remoteFingerprint() {
  1158. std::lock_guard lock(mRemoteDescriptionMutex);
  1159. if (mRemoteFingerprint)
  1160. return {CertificateFingerprint{mRemoteFingerprintAlgorithm, *mRemoteFingerprint}};
  1161. else
  1162. return {};
  1163. }
  1164. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  1165. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  1166. // Setup SSRC -> Track mapping
  1167. for (int i = 0; i < description.mediaCount(); ++i)
  1168. std::visit( // ssrc -> track mapping
  1169. rtc::overloaded{
  1170. [&](Description::Application const *) { return; },
  1171. [&](Description::Media const *media) {
  1172. const auto ssrcs = media->getSSRCs();
  1173. // Note: We don't want to lock (or do any other lookups), if we
  1174. // already know there's no SSRCs to loop over.
  1175. if (ssrcs.size() <= 0) {
  1176. return;
  1177. }
  1178. std::shared_ptr<Track> track{nullptr};
  1179. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1180. if (auto track_for_mid = it->second.lock())
  1181. track = track_for_mid;
  1182. if (!track) {
  1183. // Unable to find track for MID
  1184. return;
  1185. }
  1186. for (auto ssrc : ssrcs) {
  1187. mTracksBySsrc.insert_or_assign(ssrc, track);
  1188. }
  1189. },
  1190. },
  1191. description.media(i));
  1192. }
  1193. } // namespace rtc::impl