peerconnection.cpp 40 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #if RTC_ENABLE_MEDIA
  31. #include "dtlssrtptransport.hpp"
  32. #endif
  33. #include <algorithm>
  34. #include <array>
  35. #include <iomanip>
  36. #include <set>
  37. #include <sstream>
  38. #include <thread>
  39. using namespace std::placeholders;
  40. namespace rtc::impl {
  41. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  42. "Number of truncated RTP packets over past second");
  43. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  44. "Number of SRTP decryption errors over past second");
  45. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  46. "Number of SRTP encryption errors over past second");
  47. static LogCounter
  48. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  49. "Number of unknown RTCP packet types over past second");
  50. PeerConnection::PeerConnection(Configuration config_)
  51. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor.join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels and tracks asynchronously
  73. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  74. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  75. closeTransports();
  76. }
  77. optional<Description> PeerConnection::localDescription() const {
  78. std::lock_guard lock(mLocalDescriptionMutex);
  79. return mLocalDescription;
  80. }
  81. optional<Description> PeerConnection::remoteDescription() const {
  82. std::lock_guard lock(mRemoteDescriptionMutex);
  83. return mRemoteDescription;
  84. }
  85. size_t PeerConnection::remoteMaxMessageSize() const {
  86. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  87. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  88. std::lock_guard lock(mRemoteDescriptionMutex);
  89. if (mRemoteDescription)
  90. if (auto *application = mRemoteDescription->application())
  91. if (auto max = application->maxMessageSize()) {
  92. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  93. // size value of zero, it indicates that the SCTP endpoint will handle messages
  94. // of any size, subject to memory capacity, etc.
  95. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  96. }
  97. return std::min(remoteMax, localMax);
  98. }
  99. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  100. template <typename T>
  101. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  102. std::atomic_store(member, transport);
  103. try {
  104. transport->start();
  105. } catch (...) {
  106. std::atomic_store(member, decltype(transport)(nullptr));
  107. transport->stop();
  108. throw;
  109. }
  110. if (pc->state.load() == PeerConnection::State::Closed) {
  111. std::atomic_store(member, decltype(transport)(nullptr));
  112. transport->stop();
  113. return nullptr;
  114. }
  115. return transport;
  116. }
  117. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  118. try {
  119. if (auto transport = std::atomic_load(&mIceTransport))
  120. return transport;
  121. PLOG_VERBOSE << "Starting ICE transport";
  122. auto transport = std::make_shared<IceTransport>(
  123. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  124. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  125. auto shared_this = weak_this.lock();
  126. if (!shared_this)
  127. return;
  128. switch (transportState) {
  129. case IceTransport::State::Connecting:
  130. changeState(State::Connecting);
  131. break;
  132. case IceTransport::State::Failed:
  133. changeState(State::Failed);
  134. break;
  135. case IceTransport::State::Connected:
  136. initDtlsTransport();
  137. break;
  138. case IceTransport::State::Disconnected:
  139. changeState(State::Disconnected);
  140. break;
  141. default:
  142. // Ignore
  143. break;
  144. }
  145. },
  146. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  147. auto shared_this = weak_this.lock();
  148. if (!shared_this)
  149. return;
  150. switch (gatheringState) {
  151. case IceTransport::GatheringState::InProgress:
  152. changeGatheringState(GatheringState::InProgress);
  153. break;
  154. case IceTransport::GatheringState::Complete:
  155. endLocalCandidates();
  156. changeGatheringState(GatheringState::Complete);
  157. break;
  158. default:
  159. // Ignore
  160. break;
  161. }
  162. });
  163. return emplaceTransport(this, &mIceTransport, std::move(transport));
  164. } catch (const std::exception &e) {
  165. PLOG_ERROR << e.what();
  166. changeState(State::Failed);
  167. throw std::runtime_error("ICE transport initialization failed");
  168. }
  169. }
  170. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  171. try {
  172. if (auto transport = std::atomic_load(&mDtlsTransport))
  173. return transport;
  174. PLOG_VERBOSE << "Starting DTLS transport";
  175. auto lower = std::atomic_load(&mIceTransport);
  176. if (!lower)
  177. throw std::logic_error("No underlying ICE transport for DTLS transport");
  178. auto certificate = mCertificate.get();
  179. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  180. auto dtlsStateChangeCallback =
  181. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  182. auto shared_this = weak_this.lock();
  183. if (!shared_this)
  184. return;
  185. switch (transportState) {
  186. case DtlsTransport::State::Connected:
  187. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  188. initSctpTransport();
  189. else
  190. changeState(State::Connected);
  191. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  192. break;
  193. case DtlsTransport::State::Failed:
  194. changeState(State::Failed);
  195. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  196. break;
  197. case DtlsTransport::State::Disconnected:
  198. changeState(State::Disconnected);
  199. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  200. break;
  201. default:
  202. // Ignore
  203. break;
  204. }
  205. };
  206. shared_ptr<DtlsTransport> transport;
  207. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  208. #if RTC_ENABLE_MEDIA
  209. PLOG_INFO << "This connection requires media support";
  210. // DTLS-SRTP
  211. transport = std::make_shared<DtlsSrtpTransport>(
  212. lower, certificate, config.mtu, verifierCallback,
  213. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  214. #else
  215. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  216. #endif
  217. }
  218. if (!transport) {
  219. // DTLS only
  220. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  221. verifierCallback, dtlsStateChangeCallback);
  222. }
  223. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  224. } catch (const std::exception &e) {
  225. PLOG_ERROR << e.what();
  226. changeState(State::Failed);
  227. throw std::runtime_error("DTLS transport initialization failed");
  228. }
  229. }
  230. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  231. try {
  232. if (auto transport = std::atomic_load(&mSctpTransport))
  233. return transport;
  234. PLOG_VERBOSE << "Starting SCTP transport";
  235. auto lower = std::atomic_load(&mDtlsTransport);
  236. if (!lower)
  237. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  238. auto local = localDescription();
  239. if (!local || !local->application())
  240. throw std::logic_error("Starting SCTP transport without local application description");
  241. auto remote = remoteDescription();
  242. if (!remote || !remote->application())
  243. throw std::logic_error(
  244. "Starting SCTP transport without remote application description");
  245. SctpTransport::Ports ports = {};
  246. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  247. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  248. // This is the last occasion to ensure the stream numbers are coherent with the role
  249. shiftDataChannels();
  250. auto transport = std::make_shared<SctpTransport>(
  251. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  252. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  253. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  254. auto shared_this = weak_this.lock();
  255. if (!shared_this)
  256. return;
  257. switch (transportState) {
  258. case SctpTransport::State::Connected:
  259. changeState(State::Connected);
  260. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  261. break;
  262. case SctpTransport::State::Failed:
  263. LOG_WARNING << "SCTP transport failed";
  264. changeState(State::Failed);
  265. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
  266. shared_from_this());
  267. break;
  268. case SctpTransport::State::Disconnected:
  269. changeState(State::Disconnected);
  270. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
  271. shared_from_this());
  272. break;
  273. default:
  274. // Ignore
  275. break;
  276. }
  277. });
  278. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  279. } catch (const std::exception &e) {
  280. PLOG_ERROR << e.what();
  281. changeState(State::Failed);
  282. throw std::runtime_error("SCTP transport initialization failed");
  283. }
  284. }
  285. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  286. return std::atomic_load(&mIceTransport);
  287. }
  288. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  289. return std::atomic_load(&mDtlsTransport);
  290. }
  291. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  292. return std::atomic_load(&mSctpTransport);
  293. }
  294. void PeerConnection::closeTransports() {
  295. PLOG_VERBOSE << "Closing transports";
  296. // Change state to sink state Closed
  297. if (!changeState(State::Closed))
  298. return; // already closed
  299. // Reset callbacks now that state is changed
  300. resetCallbacks();
  301. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  302. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  303. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  304. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  305. if (sctp) {
  306. sctp->onRecv(nullptr);
  307. sctp->onBufferedAmount(nullptr);
  308. }
  309. using array = std::array<shared_ptr<Transport>, 3>;
  310. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  311. for (const auto &t : transports)
  312. if (t)
  313. t->onStateChange(nullptr);
  314. // Initiate transport stop on the processor after closing the data channels
  315. mProcessor.enqueue([self = shared_from_this(), transports = std::move(transports)]() {
  316. TearDownProcessor::Instance().enqueue(
  317. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  318. for (const auto &t : transports)
  319. if (t)
  320. t->stop();
  321. for (auto &t : transports)
  322. t.reset();
  323. });
  324. });
  325. }
  326. void PeerConnection::endLocalCandidates() {
  327. std::lock_guard lock(mLocalDescriptionMutex);
  328. if (mLocalDescription)
  329. mLocalDescription->endCandidates();
  330. }
  331. void PeerConnection::rollbackLocalDescription() {
  332. PLOG_DEBUG << "Rolling back pending local description";
  333. std::unique_lock lock(mLocalDescriptionMutex);
  334. if (mCurrentLocalDescription) {
  335. std::vector<Candidate> existingCandidates;
  336. if (mLocalDescription)
  337. existingCandidates = mLocalDescription->extractCandidates();
  338. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  339. mLocalDescription->addCandidates(std::move(existingCandidates));
  340. mCurrentLocalDescription.reset();
  341. }
  342. }
  343. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  344. std::lock_guard lock(mRemoteDescriptionMutex);
  345. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  346. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  347. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  348. return true;
  349. }
  350. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  351. << expectedFingerprint.value_or("[none]") << "\"";
  352. return false;
  353. }
  354. void PeerConnection::forwardMessage(message_ptr message) {
  355. if (!message) {
  356. remoteCloseDataChannels();
  357. return;
  358. }
  359. const uint16_t stream = uint16_t(message->stream);
  360. auto channel = findDataChannel(stream);
  361. if (DataChannel::IsOpenMessage(message)) {
  362. auto iceTransport = getIceTransport();
  363. auto sctpTransport = getSctpTransport();
  364. if (!iceTransport || !sctpTransport)
  365. return;
  366. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  367. if (stream % 2 != remoteParity) {
  368. // The odd/even rule is violated, close the DataChannel
  369. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  370. sctpTransport->closeStream(message->stream);
  371. return;
  372. }
  373. if (channel && channel->isOpen()) {
  374. PLOG_WARNING << "Got open message on stream " << stream
  375. << " for an already open DataChannel, closing it first";
  376. channel->close();
  377. }
  378. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport, stream);
  379. channel->openCallback =
  380. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  381. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  382. mDataChannels.emplace(stream, channel);
  383. }
  384. if (!channel) {
  385. if (message->type == Message::Control) // ignore control messages like Close
  386. return;
  387. // Invalid, close the DataChannel
  388. PLOG_WARNING << "Got unexpected message on stream " << stream;
  389. if (auto sctpTransport = getSctpTransport())
  390. sctpTransport->closeStream(message->stream);
  391. return;
  392. }
  393. // Forward the message
  394. channel->incoming(message);
  395. }
  396. void PeerConnection::forwardMedia(message_ptr message) {
  397. if (!message)
  398. return;
  399. // Browsers like to compound their packets with a random SSRC.
  400. // we have to do this monstrosity to distribute the report blocks
  401. if (message->type == Message::Control) {
  402. std::set<uint32_t> ssrcs;
  403. size_t offset = 0;
  404. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  405. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  406. if (header->lengthInBytes() > message->size() - offset) {
  407. COUNTER_MEDIA_TRUNCATED++;
  408. break;
  409. }
  410. offset += header->lengthInBytes();
  411. if (header->payloadType() == 205 || header->payloadType() == 206) {
  412. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  413. ssrcs.insert(rtcpfb->packetSenderSSRC());
  414. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  415. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  416. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  417. ssrcs.insert(rtcpsr->senderSSRC());
  418. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  419. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  420. } else if (header->payloadType() == 202) {
  421. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  422. if (!sdes->isValid()) {
  423. PLOG_WARNING << "RTCP SDES packet is invalid";
  424. continue;
  425. }
  426. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  427. auto chunk = sdes->getChunk(i);
  428. ssrcs.insert(chunk->ssrc());
  429. }
  430. } else {
  431. // PT=207 == Extended Report
  432. if (header->payloadType() != 207) {
  433. COUNTER_UNKNOWN_PACKET_TYPE++;
  434. }
  435. }
  436. }
  437. if (!ssrcs.empty()) {
  438. for (uint32_t ssrc : ssrcs) {
  439. if (auto mid = getMidFromSsrc(ssrc)) {
  440. std::shared_lock lock(mTracksMutex); // read-only
  441. if (auto it = mTracks.find(*mid); it != mTracks.end())
  442. if (auto track = it->second.lock())
  443. track->incoming(message);
  444. }
  445. }
  446. return;
  447. }
  448. }
  449. uint32_t ssrc = uint32_t(message->stream);
  450. if (auto mid = getMidFromSsrc(ssrc)) {
  451. std::shared_lock lock(mTracksMutex); // read-only
  452. if (auto it = mTracks.find(*mid); it != mTracks.end())
  453. if (auto track = it->second.lock())
  454. track->incoming(message);
  455. } else {
  456. /*
  457. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  458. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  459. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  460. * this ideal? No! Do I know how to fix it? No!
  461. */
  462. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  463. return;
  464. }
  465. }
  466. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  467. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  468. return it->second;
  469. {
  470. std::lock_guard lock(mRemoteDescriptionMutex);
  471. if (!mRemoteDescription)
  472. return nullopt;
  473. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  474. if (auto found =
  475. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  476. return std::nullopt;
  477. },
  478. [&](Description::Media *media) -> optional<string> {
  479. return media->hasSSRC(ssrc)
  480. ? std::make_optional(media->mid())
  481. : nullopt;
  482. }},
  483. mRemoteDescription->media(i))) {
  484. mMidFromSsrc.emplace(ssrc, *found);
  485. return *found;
  486. }
  487. }
  488. }
  489. {
  490. std::lock_guard lock(mLocalDescriptionMutex);
  491. if (!mLocalDescription)
  492. return nullopt;
  493. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  494. if (auto found =
  495. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  496. return std::nullopt;
  497. },
  498. [&](Description::Media *media) -> optional<string> {
  499. return media->hasSSRC(ssrc)
  500. ? std::make_optional(media->mid())
  501. : nullopt;
  502. }},
  503. mLocalDescription->media(i))) {
  504. mMidFromSsrc.emplace(ssrc, *found);
  505. return *found;
  506. }
  507. }
  508. }
  509. return nullopt;
  510. }
  511. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  512. if (auto channel = findDataChannel(stream))
  513. channel->triggerBufferedAmount(amount);
  514. }
  515. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  516. cleanupDataChannels();
  517. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  518. const uint16_t maxStream = maxDataChannelStream();
  519. uint16_t stream;
  520. if (init.id) {
  521. stream = *init.id;
  522. if (stream > maxStream)
  523. throw std::invalid_argument("DataChannel stream id is too high");
  524. } else {
  525. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  526. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  527. // See https://www.rfc-editor.org/rfc/rfc5763.html#section-5
  528. // Therefore, we assume passive role if we are the offerer.
  529. auto iceTransport = getIceTransport();
  530. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  531. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  532. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  533. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  534. // the DTLS server, it MUST choose an odd one.
  535. // See https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  536. stream = (role == Description::Role::Active) ? 0 : 1;
  537. while (true) {
  538. if (stream > maxStream)
  539. throw std::runtime_error("Too many DataChannels");
  540. auto it = mDataChannels.find(stream);
  541. if (it == mDataChannels.end() || !it->second.lock())
  542. break;
  543. stream += 2;
  544. }
  545. }
  546. // If the DataChannel is user-negotiated, do not negotiate it in-band
  547. auto channel =
  548. init.negotiated
  549. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  550. std::move(init.protocol), std::move(init.reliability))
  551. : std::make_shared<OutgoingDataChannel>(weak_from_this(), stream, std::move(label),
  552. std::move(init.protocol),
  553. std::move(init.reliability));
  554. mDataChannels.emplace(std::make_pair(stream, channel));
  555. return channel;
  556. }
  557. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  558. std::shared_lock lock(mDataChannelsMutex); // read-only
  559. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  560. if (auto channel = it->second.lock())
  561. return channel;
  562. return nullptr;
  563. }
  564. uint16_t PeerConnection::maxDataChannelStream() const {
  565. auto sctpTransport = std::atomic_load(&mSctpTransport);
  566. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  567. }
  568. void PeerConnection::shiftDataChannels() {
  569. auto iceTransport = std::atomic_load(&mIceTransport);
  570. auto sctpTransport = std::atomic_load(&mSctpTransport);
  571. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  572. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  573. decltype(mDataChannels) newDataChannels;
  574. auto it = mDataChannels.begin();
  575. while (it != mDataChannels.end()) {
  576. auto channel = it->second.lock();
  577. channel->shiftStream();
  578. newDataChannels.emplace(channel->stream(), channel);
  579. ++it;
  580. }
  581. std::swap(mDataChannels, newDataChannels);
  582. }
  583. }
  584. void PeerConnection::iterateDataChannels(
  585. std::function<void(shared_ptr<DataChannel> channel)> func) {
  586. std::vector<shared_ptr<DataChannel>> locked;
  587. {
  588. std::shared_lock lock(mDataChannelsMutex); // read-only
  589. locked.reserve(mDataChannels.size());
  590. auto it = mDataChannels.begin();
  591. while (it != mDataChannels.end()) {
  592. auto channel = it->second.lock();
  593. if (channel && !channel->isClosed())
  594. locked.push_back(std::move(channel));
  595. ++it;
  596. }
  597. }
  598. for (auto &channel : locked)
  599. func(std::move(channel));
  600. }
  601. void PeerConnection::cleanupDataChannels() {
  602. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  603. auto it = mDataChannels.begin();
  604. while (it != mDataChannels.end()) {
  605. if (!it->second.lock()) {
  606. it = mDataChannels.erase(it);
  607. continue;
  608. }
  609. ++it;
  610. }
  611. }
  612. void PeerConnection::openDataChannels() {
  613. if (auto transport = std::atomic_load(&mSctpTransport))
  614. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  615. // Check again as the maximum might have been negotiated lower
  616. if (channel->stream() <= transport->maxStream()) {
  617. channel->open(transport);
  618. } else {
  619. channel->triggerError("DataChannel stream id is too high");
  620. channel->remoteClose();
  621. }
  622. });
  623. }
  624. void PeerConnection::closeDataChannels() {
  625. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  626. }
  627. void PeerConnection::remoteCloseDataChannels() {
  628. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  629. }
  630. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  631. #if !RTC_ENABLE_MEDIA
  632. // No media support, mark as removed
  633. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  634. description.markRemoved();
  635. #endif
  636. shared_ptr<Track> track;
  637. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  638. if (track = it->second.lock(); track)
  639. track->setDescription(std::move(description));
  640. if (!track) {
  641. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  642. mTracks.emplace(std::make_pair(track->mid(), track));
  643. mTrackLines.emplace_back(track);
  644. }
  645. if (description.isRemoved())
  646. track->close();
  647. return track;
  648. }
  649. void PeerConnection::incomingTrack(Description::Media description) {
  650. std::unique_lock lock(mTracksMutex); // we are going to emplace
  651. shared_ptr<Track> track;
  652. if (auto it = mTracks.find(description.mid()); it != mTracks.end()) {
  653. if (track = it->second.lock(); track)
  654. track->setDescription(std::move(description));
  655. } else {
  656. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  657. mTracks.emplace(std::make_pair(track->mid(), track));
  658. mTrackLines.emplace_back(track);
  659. triggerTrack(track);
  660. }
  661. if (track && description.isRemoved())
  662. track->close();
  663. }
  664. void PeerConnection::openTracks() {
  665. #if RTC_ENABLE_MEDIA
  666. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  667. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  668. std::shared_lock lock(mTracksMutex); // read-only
  669. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it)
  670. if (auto track = it->lock())
  671. if (!track->isOpen())
  672. track->open(srtpTransport);
  673. }
  674. #endif
  675. }
  676. void PeerConnection::closeTracks() {
  677. std::shared_lock lock(mTracksMutex); // read-only
  678. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it)
  679. if (auto track = it->lock())
  680. track->close();
  681. }
  682. void PeerConnection::validateRemoteDescription(const Description &description) {
  683. if (!description.iceUfrag())
  684. throw std::invalid_argument("Remote description has no ICE user fragment");
  685. if (!description.icePwd())
  686. throw std::invalid_argument("Remote description has no ICE password");
  687. if (!description.fingerprint())
  688. throw std::invalid_argument("Remote description has no valid fingerprint");
  689. if (description.mediaCount() == 0)
  690. throw std::invalid_argument("Remote description has no media line");
  691. int activeMediaCount = 0;
  692. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  693. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  694. if (!application->isRemoved())
  695. ++activeMediaCount;
  696. },
  697. [&](const Description::Media *media) {
  698. if (!media->isRemoved() ||
  699. media->direction() != Description::Direction::Inactive)
  700. ++activeMediaCount;
  701. }},
  702. description.media(i));
  703. if (activeMediaCount == 0)
  704. throw std::invalid_argument("Remote description has no active media");
  705. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  706. if (*description.iceUfrag() == *local->iceUfrag() &&
  707. *description.icePwd() == *local->icePwd())
  708. throw std::logic_error("Got the local description as remote description");
  709. PLOG_VERBOSE << "Remote description looks valid";
  710. }
  711. void PeerConnection::processLocalDescription(Description description) {
  712. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  713. const size_t localMaxMessageSize =
  714. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  715. // Clean up the application entry the ICE transport might have added already (libnice)
  716. description.clearMedia();
  717. if (auto remote = remoteDescription()) {
  718. // Reciprocate remote description
  719. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  720. std::visit( // reciprocate each media
  721. rtc::overloaded{
  722. [&](Description::Application *remoteApp) {
  723. std::shared_lock lock(mDataChannelsMutex);
  724. if (!mDataChannels.empty()) {
  725. // Prefer local description
  726. Description::Application app(remoteApp->mid());
  727. app.setSctpPort(localSctpPort);
  728. app.setMaxMessageSize(localMaxMessageSize);
  729. PLOG_DEBUG << "Adding application to local description, mid=\""
  730. << app.mid() << "\"";
  731. description.addMedia(std::move(app));
  732. return;
  733. }
  734. auto reciprocated = remoteApp->reciprocate();
  735. reciprocated.hintSctpPort(localSctpPort);
  736. reciprocated.setMaxMessageSize(localMaxMessageSize);
  737. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  738. << reciprocated.mid() << "\"";
  739. description.addMedia(std::move(reciprocated));
  740. },
  741. [&](Description::Media *remoteMedia) {
  742. std::shared_lock lock(mTracksMutex);
  743. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  744. // Prefer local description
  745. if (auto track = it->second.lock()) {
  746. auto media = track->description();
  747. PLOG_DEBUG << "Adding media to local description, mid=\""
  748. << media.mid() << "\", removed=" << std::boolalpha
  749. << media.isRemoved();
  750. description.addMedia(std::move(media));
  751. } else {
  752. auto reciprocated = remoteMedia->reciprocate();
  753. reciprocated.markRemoved();
  754. PLOG_DEBUG << "Adding media to local description, mid=\""
  755. << reciprocated.mid()
  756. << "\", removed=true (track is destroyed)";
  757. description.addMedia(std::move(reciprocated));
  758. }
  759. return;
  760. }
  761. lock.unlock(); // we are going to call incomingTrack()
  762. auto reciprocated = remoteMedia->reciprocate();
  763. #if !RTC_ENABLE_MEDIA
  764. if (!reciprocated.isRemoved()) {
  765. // No media support, mark as removed
  766. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  767. reciprocated.markRemoved();
  768. }
  769. #endif
  770. incomingTrack(reciprocated);
  771. PLOG_DEBUG << "Reciprocating media in local description, mid=\""
  772. << reciprocated.mid() << "\", removed=" << std::boolalpha
  773. << reciprocated.isRemoved();
  774. description.addMedia(std::move(reciprocated));
  775. },
  776. },
  777. remote->media(i));
  778. }
  779. if (description.type() == Description::Type::Offer) {
  780. // This is an offer, add locally created data channels and tracks
  781. // Add media for local tracks
  782. std::shared_lock lock(mTracksMutex);
  783. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  784. if (auto track = it->lock()) {
  785. if (description.hasMid(track->mid()))
  786. continue;
  787. auto media = track->description();
  788. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  789. << "\", removed=" << std::boolalpha << media.isRemoved();
  790. description.addMedia(std::move(media));
  791. }
  792. }
  793. // Add application for data channels
  794. if (!description.hasApplication()) {
  795. std::shared_lock lock(mDataChannelsMutex);
  796. if (!mDataChannels.empty()) {
  797. // Prevents mid collision with remote or local tracks
  798. unsigned int m = 0;
  799. while (description.hasMid(std::to_string(m)))
  800. ++m;
  801. Description::Application app(std::to_string(m));
  802. app.setSctpPort(localSctpPort);
  803. app.setMaxMessageSize(localMaxMessageSize);
  804. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  805. << "\"";
  806. description.addMedia(std::move(app));
  807. }
  808. }
  809. // There might be no media at this point if the user created a Track, deleted it,
  810. // then called setLocalDescription().
  811. if (description.mediaCount() == 0)
  812. throw std::runtime_error("No DataChannel or Track to negotiate");
  813. }
  814. // Set local fingerprint (wait for certificate if necessary)
  815. description.setFingerprint(mCertificate.get()->fingerprint());
  816. PLOG_VERBOSE << "Issuing local description: " << description;
  817. if (description.mediaCount() == 0)
  818. throw std::logic_error("Local description has no media line");
  819. {
  820. // Set as local description
  821. std::lock_guard lock(mLocalDescriptionMutex);
  822. std::vector<Candidate> existingCandidates;
  823. if (mLocalDescription) {
  824. existingCandidates = mLocalDescription->extractCandidates();
  825. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  826. }
  827. mLocalDescription.emplace(description);
  828. mLocalDescription->addCandidates(std::move(existingCandidates));
  829. }
  830. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  831. &localDescriptionCallback, std::move(description));
  832. // Reciprocated tracks might need to be open
  833. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  834. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  835. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  836. }
  837. void PeerConnection::processLocalCandidate(Candidate candidate) {
  838. std::lock_guard lock(mLocalDescriptionMutex);
  839. if (!mLocalDescription)
  840. throw std::logic_error("Got a local candidate without local description");
  841. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  842. candidate.type() != Candidate::Type::Relayed) {
  843. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  844. return;
  845. }
  846. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  847. candidate.resolve(Candidate::ResolveMode::Simple);
  848. mLocalDescription->addCandidate(candidate);
  849. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  850. &localCandidateCallback, std::move(candidate));
  851. }
  852. void PeerConnection::processRemoteDescription(Description description) {
  853. {
  854. // Set as remote description
  855. std::lock_guard lock(mRemoteDescriptionMutex);
  856. std::vector<Candidate> existingCandidates;
  857. if (mRemoteDescription)
  858. existingCandidates = mRemoteDescription->extractCandidates();
  859. mRemoteDescription.emplace(description);
  860. mRemoteDescription->addCandidates(std::move(existingCandidates));
  861. }
  862. auto iceTransport = initIceTransport();
  863. if (!iceTransport)
  864. return; // closed
  865. iceTransport->setRemoteDescription(std::move(description));
  866. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  867. // numbers from odd to even.
  868. shiftDataChannels();
  869. if (description.hasApplication()) {
  870. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  871. auto sctpTransport = std::atomic_load(&mSctpTransport);
  872. if (!sctpTransport && dtlsTransport &&
  873. dtlsTransport->state() == Transport::State::Connected)
  874. initSctpTransport();
  875. } else {
  876. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  877. }
  878. }
  879. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  880. auto iceTransport = std::atomic_load(&mIceTransport);
  881. {
  882. // Set as remote candidate
  883. std::lock_guard lock(mRemoteDescriptionMutex);
  884. if (!mRemoteDescription)
  885. throw std::logic_error("Got a remote candidate without remote description");
  886. if (!iceTransport)
  887. throw std::logic_error("Got a remote candidate without ICE transport");
  888. candidate.hintMid(mRemoteDescription->bundleMid());
  889. if (mRemoteDescription->hasCandidate(candidate))
  890. return; // already in description, ignore
  891. candidate.resolve(Candidate::ResolveMode::Simple);
  892. mRemoteDescription->addCandidate(candidate);
  893. }
  894. if (candidate.isResolved()) {
  895. iceTransport->addRemoteCandidate(std::move(candidate));
  896. } else {
  897. // We might need a lookup, do it asynchronously
  898. // We don't use the thread pool because we have no control on the timeout
  899. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  900. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  901. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  902. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  903. if (auto iceTransport = weakIceTransport.lock())
  904. iceTransport->addRemoteCandidate(std::move(candidate));
  905. });
  906. t.detach();
  907. }
  908. }
  909. }
  910. string PeerConnection::localBundleMid() const {
  911. std::lock_guard lock(mLocalDescriptionMutex);
  912. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  913. }
  914. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  915. auto dataChannel = weakDataChannel.lock();
  916. if (dataChannel) {
  917. dataChannel->resetOpenCallback(); // might be set internally
  918. mPendingDataChannels.push(std::move(dataChannel));
  919. }
  920. triggerPendingDataChannels();
  921. }
  922. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  923. auto track = weakTrack.lock();
  924. if (track) {
  925. track->resetOpenCallback(); // might be set internally
  926. mPendingTracks.push(std::move(track));
  927. }
  928. triggerPendingTracks();
  929. }
  930. void PeerConnection::triggerPendingDataChannels() {
  931. while (dataChannelCallback) {
  932. auto next = mPendingDataChannels.tryPop();
  933. if (!next)
  934. break;
  935. auto impl = std::move(*next);
  936. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  937. impl->triggerOpen();
  938. }
  939. }
  940. void PeerConnection::triggerPendingTracks() {
  941. while (trackCallback) {
  942. auto next = mPendingTracks.tryPop();
  943. if (!next)
  944. break;
  945. auto impl = std::move(*next);
  946. trackCallback(std::make_shared<rtc::Track>(impl));
  947. // Do not trigger open immediately for tracks as it'll be done later
  948. }
  949. }
  950. void PeerConnection::flushPendingDataChannels() {
  951. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  952. }
  953. void PeerConnection::flushPendingTracks() {
  954. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  955. }
  956. bool PeerConnection::changeState(State newState) {
  957. State current;
  958. do {
  959. current = state.load();
  960. if (current == State::Closed)
  961. return false;
  962. if (current == newState)
  963. return false;
  964. } while (!state.compare_exchange_weak(current, newState));
  965. std::ostringstream s;
  966. s << newState;
  967. PLOG_INFO << "Changed state to " << s.str();
  968. if (newState == State::Closed) {
  969. auto callback = std::move(stateChangeCallback); // steal the callback
  970. callback(State::Closed); // call it synchronously
  971. } else {
  972. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  973. &stateChangeCallback, newState);
  974. }
  975. return true;
  976. }
  977. bool PeerConnection::changeGatheringState(GatheringState newState) {
  978. if (gatheringState.exchange(newState) == newState)
  979. return false;
  980. std::ostringstream s;
  981. s << newState;
  982. PLOG_INFO << "Changed gathering state to " << s.str();
  983. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  984. &gatheringStateChangeCallback, newState);
  985. return true;
  986. }
  987. bool PeerConnection::changeSignalingState(SignalingState newState) {
  988. if (signalingState.exchange(newState) == newState)
  989. return false;
  990. std::ostringstream s;
  991. s << newState;
  992. PLOG_INFO << "Changed signaling state to " << s.str();
  993. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  994. &signalingStateChangeCallback, newState);
  995. return true;
  996. }
  997. void PeerConnection::resetCallbacks() {
  998. // Unregister all callbacks
  999. dataChannelCallback = nullptr;
  1000. localDescriptionCallback = nullptr;
  1001. localCandidateCallback = nullptr;
  1002. stateChangeCallback = nullptr;
  1003. gatheringStateChangeCallback = nullptr;
  1004. signalingStateChangeCallback = nullptr;
  1005. trackCallback = nullptr;
  1006. }
  1007. } // namespace rtc::impl