peerconnection.cpp 39 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This Source Code Form is subject to the terms of the Mozilla Public
  6. * License, v. 2.0. If a copy of the MPL was not distributed with this
  7. * file, You can obtain one at https://mozilla.org/MPL/2.0/.
  8. */
  9. #include "peerconnection.hpp"
  10. #include "certificate.hpp"
  11. #include "common.hpp"
  12. #include "dtlstransport.hpp"
  13. #include "icetransport.hpp"
  14. #include "internals.hpp"
  15. #include "logcounter.hpp"
  16. #include "peerconnection.hpp"
  17. #include "processor.hpp"
  18. #include "rtp.hpp"
  19. #include "sctptransport.hpp"
  20. #if RTC_ENABLE_MEDIA
  21. #include "dtlssrtptransport.hpp"
  22. #endif
  23. #include <algorithm>
  24. #include <array>
  25. #include <iomanip>
  26. #include <set>
  27. #include <sstream>
  28. #include <thread>
  29. using namespace std::placeholders;
  30. namespace rtc::impl {
  31. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  32. "Number of truncated RTP packets over past second");
  33. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  34. "Number of SRTP decryption errors over past second");
  35. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  36. "Number of SRTP encryption errors over past second");
  37. static LogCounter
  38. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  39. "Number of unknown RTCP packet types over past second");
  40. PeerConnection::PeerConnection(Configuration config_)
  41. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
  42. PLOG_VERBOSE << "Creating PeerConnection";
  43. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  44. throw std::invalid_argument("Invalid port range");
  45. if (config.mtu) {
  46. if (*config.mtu < 576) // Min MTU for IPv4
  47. throw std::invalid_argument("Invalid MTU value");
  48. if (*config.mtu > 1500) { // Standard Ethernet
  49. PLOG_WARNING << "MTU set to " << *config.mtu;
  50. } else {
  51. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  52. }
  53. }
  54. }
  55. PeerConnection::~PeerConnection() {
  56. PLOG_VERBOSE << "Destroying PeerConnection";
  57. mProcessor.join();
  58. }
  59. void PeerConnection::close() {
  60. negotiationNeeded = false;
  61. if (!closing.exchange(true)) {
  62. PLOG_VERBOSE << "Closing PeerConnection";
  63. if (auto transport = std::atomic_load(&mSctpTransport))
  64. transport->stop();
  65. else
  66. remoteClose();
  67. }
  68. }
  69. void PeerConnection::remoteClose() {
  70. close();
  71. if (state.load() != State::Closed) {
  72. // Close data channels and tracks asynchronously
  73. mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
  74. mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
  75. closeTransports();
  76. }
  77. }
  78. optional<Description> PeerConnection::localDescription() const {
  79. std::lock_guard lock(mLocalDescriptionMutex);
  80. return mLocalDescription;
  81. }
  82. optional<Description> PeerConnection::remoteDescription() const {
  83. std::lock_guard lock(mRemoteDescriptionMutex);
  84. return mRemoteDescription;
  85. }
  86. size_t PeerConnection::remoteMaxMessageSize() const {
  87. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  88. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  89. std::lock_guard lock(mRemoteDescriptionMutex);
  90. if (mRemoteDescription)
  91. if (auto *application = mRemoteDescription->application())
  92. if (auto max = application->maxMessageSize()) {
  93. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  94. // size value of zero, it indicates that the SCTP endpoint will handle messages
  95. // of any size, subject to memory capacity, etc.
  96. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  97. }
  98. return std::min(remoteMax, localMax);
  99. }
  100. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  101. template <typename T>
  102. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  103. std::atomic_store(member, transport);
  104. try {
  105. transport->start();
  106. } catch (...) {
  107. std::atomic_store(member, decltype(transport)(nullptr));
  108. throw;
  109. }
  110. if (pc->closing.load() || pc->state.load() == PeerConnection::State::Closed) {
  111. std::atomic_store(member, decltype(transport)(nullptr));
  112. transport->stop();
  113. return nullptr;
  114. }
  115. return transport;
  116. }
  117. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  118. try {
  119. if (auto transport = std::atomic_load(&mIceTransport))
  120. return transport;
  121. PLOG_VERBOSE << "Starting ICE transport";
  122. auto transport = std::make_shared<IceTransport>(
  123. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  124. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  125. auto shared_this = weak_this.lock();
  126. if (!shared_this)
  127. return;
  128. switch (transportState) {
  129. case IceTransport::State::Connecting:
  130. changeState(State::Connecting);
  131. break;
  132. case IceTransport::State::Connected:
  133. initDtlsTransport();
  134. break;
  135. case IceTransport::State::Failed:
  136. changeState(State::Failed);
  137. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  138. break;
  139. case IceTransport::State::Disconnected:
  140. changeState(State::Disconnected);
  141. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  142. break;
  143. default:
  144. // Ignore
  145. break;
  146. }
  147. },
  148. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  149. auto shared_this = weak_this.lock();
  150. if (!shared_this)
  151. return;
  152. switch (gatheringState) {
  153. case IceTransport::GatheringState::InProgress:
  154. changeGatheringState(GatheringState::InProgress);
  155. break;
  156. case IceTransport::GatheringState::Complete:
  157. endLocalCandidates();
  158. changeGatheringState(GatheringState::Complete);
  159. break;
  160. default:
  161. // Ignore
  162. break;
  163. }
  164. });
  165. return emplaceTransport(this, &mIceTransport, std::move(transport));
  166. } catch (const std::exception &e) {
  167. PLOG_ERROR << e.what();
  168. changeState(State::Failed);
  169. throw std::runtime_error("ICE transport initialization failed");
  170. }
  171. }
  172. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  173. try {
  174. if (auto transport = std::atomic_load(&mDtlsTransport))
  175. return transport;
  176. PLOG_VERBOSE << "Starting DTLS transport";
  177. auto lower = std::atomic_load(&mIceTransport);
  178. if (!lower)
  179. throw std::logic_error("No underlying ICE transport for DTLS transport");
  180. auto certificate = mCertificate.get();
  181. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  182. auto dtlsStateChangeCallback =
  183. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  184. auto shared_this = weak_this.lock();
  185. if (!shared_this)
  186. return;
  187. switch (transportState) {
  188. case DtlsTransport::State::Connected:
  189. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  190. initSctpTransport();
  191. else
  192. changeState(State::Connected);
  193. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  194. break;
  195. case DtlsTransport::State::Failed:
  196. changeState(State::Failed);
  197. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  198. break;
  199. case DtlsTransport::State::Disconnected:
  200. changeState(State::Disconnected);
  201. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  202. break;
  203. default:
  204. // Ignore
  205. break;
  206. }
  207. };
  208. shared_ptr<DtlsTransport> transport;
  209. auto local = localDescription();
  210. if (config.forceMediaTransport || (local && local->hasAudioOrVideo())) {
  211. #if RTC_ENABLE_MEDIA
  212. PLOG_INFO << "This connection requires media support";
  213. // DTLS-SRTP
  214. transport = std::make_shared<DtlsSrtpTransport>(
  215. lower, certificate, config.mtu, verifierCallback,
  216. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  217. #else
  218. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  219. #endif
  220. }
  221. if (!transport) {
  222. // DTLS only
  223. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  224. verifierCallback, dtlsStateChangeCallback);
  225. }
  226. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  227. } catch (const std::exception &e) {
  228. PLOG_ERROR << e.what();
  229. changeState(State::Failed);
  230. throw std::runtime_error("DTLS transport initialization failed");
  231. }
  232. }
  233. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  234. try {
  235. if (auto transport = std::atomic_load(&mSctpTransport))
  236. return transport;
  237. PLOG_VERBOSE << "Starting SCTP transport";
  238. auto lower = std::atomic_load(&mDtlsTransport);
  239. if (!lower)
  240. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  241. auto local = localDescription();
  242. if (!local || !local->application())
  243. throw std::logic_error("Starting SCTP transport without local application description");
  244. auto remote = remoteDescription();
  245. if (!remote || !remote->application())
  246. throw std::logic_error(
  247. "Starting SCTP transport without remote application description");
  248. SctpTransport::Ports ports = {};
  249. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  250. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  251. auto transport = std::make_shared<SctpTransport>(
  252. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  253. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  254. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  255. auto shared_this = weak_this.lock();
  256. if (!shared_this)
  257. return;
  258. switch (transportState) {
  259. case SctpTransport::State::Connected:
  260. changeState(State::Connected);
  261. assignDataChannels();
  262. mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
  263. break;
  264. case SctpTransport::State::Failed:
  265. changeState(State::Failed);
  266. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  267. break;
  268. case SctpTransport::State::Disconnected:
  269. changeState(State::Disconnected);
  270. mProcessor.enqueue(&PeerConnection::remoteClose, shared_from_this());
  271. break;
  272. default:
  273. // Ignore
  274. break;
  275. }
  276. });
  277. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  278. } catch (const std::exception &e) {
  279. PLOG_ERROR << e.what();
  280. changeState(State::Failed);
  281. throw std::runtime_error("SCTP transport initialization failed");
  282. }
  283. }
  284. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  285. return std::atomic_load(&mIceTransport);
  286. }
  287. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  288. return std::atomic_load(&mDtlsTransport);
  289. }
  290. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  291. return std::atomic_load(&mSctpTransport);
  292. }
  293. void PeerConnection::closeTransports() {
  294. PLOG_VERBOSE << "Closing transports";
  295. // Change state to sink state Closed
  296. if (!changeState(State::Closed))
  297. return; // already closed
  298. // Reset callbacks now that state is changed
  299. resetCallbacks();
  300. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  301. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  302. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  303. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  304. if (sctp) {
  305. sctp->onRecv(nullptr);
  306. sctp->onBufferedAmount(nullptr);
  307. }
  308. using array = std::array<shared_ptr<Transport>, 3>;
  309. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  310. for (const auto &t : transports)
  311. if (t)
  312. t->onStateChange(nullptr);
  313. TearDownProcessor::Instance().enqueue(
  314. [transports = std::move(transports), token = Init::Instance().token()]() mutable {
  315. for (const auto &t : transports) {
  316. if (t) {
  317. t->stop();
  318. break;
  319. }
  320. }
  321. for (auto &t : transports)
  322. t.reset();
  323. });
  324. }
  325. void PeerConnection::endLocalCandidates() {
  326. std::lock_guard lock(mLocalDescriptionMutex);
  327. if (mLocalDescription)
  328. mLocalDescription->endCandidates();
  329. }
  330. void PeerConnection::rollbackLocalDescription() {
  331. PLOG_DEBUG << "Rolling back pending local description";
  332. std::unique_lock lock(mLocalDescriptionMutex);
  333. if (mCurrentLocalDescription) {
  334. std::vector<Candidate> existingCandidates;
  335. if (mLocalDescription)
  336. existingCandidates = mLocalDescription->extractCandidates();
  337. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  338. mLocalDescription->addCandidates(std::move(existingCandidates));
  339. mCurrentLocalDescription.reset();
  340. }
  341. }
  342. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  343. std::lock_guard lock(mRemoteDescriptionMutex);
  344. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  345. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  346. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  347. return true;
  348. }
  349. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  350. << expectedFingerprint.value_or("[none]") << "\"";
  351. return false;
  352. }
  353. void PeerConnection::forwardMessage(message_ptr message) {
  354. if (!message) {
  355. remoteCloseDataChannels();
  356. return;
  357. }
  358. const uint16_t stream = uint16_t(message->stream);
  359. auto channel = findDataChannel(stream);
  360. if (DataChannel::IsOpenMessage(message)) {
  361. auto iceTransport = getIceTransport();
  362. auto sctpTransport = getSctpTransport();
  363. if (!iceTransport || !sctpTransport)
  364. return;
  365. const uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  366. if (stream % 2 != remoteParity) {
  367. // The odd/even rule is violated, close the DataChannel
  368. PLOG_WARNING << "Got open message violating the odd/even rule on stream " << stream;
  369. sctpTransport->closeStream(message->stream);
  370. return;
  371. }
  372. if (channel && channel->isOpen()) {
  373. PLOG_WARNING << "Got open message on stream " << stream
  374. << " for an already open DataChannel, closing it first";
  375. channel->close();
  376. }
  377. channel = std::make_shared<IncomingDataChannel>(weak_from_this(), sctpTransport);
  378. channel->assignStream(stream);
  379. channel->openCallback =
  380. weak_bind(&PeerConnection::triggerDataChannel, this, weak_ptr<DataChannel>{channel});
  381. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  382. mDataChannels.emplace(stream, channel);
  383. }
  384. if (!channel) {
  385. if (message->type == Message::Reset)
  386. return; // ignore
  387. // Invalid, close the DataChannel
  388. PLOG_WARNING << "Got unexpected message on stream " << stream;
  389. if (auto sctpTransport = getSctpTransport())
  390. sctpTransport->closeStream(message->stream);
  391. return;
  392. }
  393. // Forward the message
  394. channel->incoming(message);
  395. }
  396. void PeerConnection::forwardMedia(message_ptr message) {
  397. if (!message)
  398. return;
  399. // Browsers like to compound their packets with a random SSRC.
  400. // we have to do this monstrosity to distribute the report blocks
  401. if (message->type == Message::Control) {
  402. std::set<uint32_t> ssrcs;
  403. size_t offset = 0;
  404. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  405. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  406. if (header->lengthInBytes() > message->size() - offset) {
  407. COUNTER_MEDIA_TRUNCATED++;
  408. break;
  409. }
  410. offset += header->lengthInBytes();
  411. if (header->payloadType() == 205 || header->payloadType() == 206) {
  412. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  413. ssrcs.insert(rtcpfb->packetSenderSSRC());
  414. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  415. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  416. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  417. ssrcs.insert(rtcpsr->senderSSRC());
  418. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  419. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  420. } else if (header->payloadType() == 202) {
  421. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  422. if (!sdes->isValid()) {
  423. PLOG_WARNING << "RTCP SDES packet is invalid";
  424. continue;
  425. }
  426. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  427. auto chunk = sdes->getChunk(i);
  428. ssrcs.insert(chunk->ssrc());
  429. }
  430. } else {
  431. // PT=203 == Goodbye
  432. // PT=204 == Application Specific
  433. // PT=207 == Extended Report
  434. if (header->payloadType() != 203 && header->payloadType() != 204 &&
  435. header->payloadType() != 207) {
  436. COUNTER_UNKNOWN_PACKET_TYPE++;
  437. }
  438. }
  439. }
  440. if (!ssrcs.empty()) {
  441. std::shared_lock lock(mTracksMutex); // read-only
  442. for (uint32_t ssrc : ssrcs) {
  443. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  444. if (auto track = it->second.lock())
  445. track->incoming(message);
  446. }
  447. }
  448. return;
  449. }
  450. }
  451. uint32_t ssrc = uint32_t(message->stream);
  452. std::shared_lock lock(mTracksMutex); // read-only
  453. if (auto it = mTracksBySsrc.find(ssrc); it != mTracksBySsrc.end()) {
  454. if (auto track = it->second.lock())
  455. track->incoming(message);
  456. } else {
  457. /*
  458. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  459. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  460. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  461. * this ideal? No! Do I know how to fix it? No!
  462. */
  463. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  464. return;
  465. }
  466. }
  467. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  468. if (auto channel = findDataChannel(stream))
  469. channel->triggerBufferedAmount(amount);
  470. }
  471. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  472. cleanupDataChannels();
  473. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  474. // If the DataChannel is user-negotiated, do not negotiate it in-band
  475. auto channel =
  476. init.negotiated
  477. ? std::make_shared<DataChannel>(weak_from_this(), std::move(label),
  478. std::move(init.protocol), std::move(init.reliability))
  479. : std::make_shared<OutgoingDataChannel>(weak_from_this(), std::move(label),
  480. std::move(init.protocol),
  481. std::move(init.reliability));
  482. // If the user supplied a stream id, use it, otherwise assign it later
  483. if (init.id) {
  484. uint16_t stream = *init.id;
  485. if (stream > maxDataChannelStream())
  486. throw std::invalid_argument("DataChannel stream id is too high");
  487. channel->assignStream(stream);
  488. mDataChannels.emplace(std::make_pair(stream, channel));
  489. } else {
  490. mUnassignedDataChannels.push_back(channel);
  491. }
  492. lock.unlock(); // we are going to call assignDataChannels()
  493. // If SCTP is connected, assign and open now
  494. auto sctpTransport = getSctpTransport();
  495. if (sctpTransport && sctpTransport->state() == SctpTransport::State::Connected) {
  496. assignDataChannels();
  497. channel->open(sctpTransport);
  498. }
  499. return channel;
  500. }
  501. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  502. std::shared_lock lock(mDataChannelsMutex); // read-only
  503. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  504. if (auto channel = it->second.lock())
  505. return channel;
  506. return nullptr;
  507. }
  508. uint16_t PeerConnection::maxDataChannelStream() const {
  509. auto sctpTransport = std::atomic_load(&mSctpTransport);
  510. return sctpTransport ? sctpTransport->maxStream() : (MAX_SCTP_STREAMS_COUNT - 1);
  511. }
  512. void PeerConnection::assignDataChannels() {
  513. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  514. auto iceTransport = getIceTransport();
  515. if (!iceTransport)
  516. throw std::logic_error("Attempted to assign DataChannels without ICE transport");
  517. const uint16_t maxStream = maxDataChannelStream();
  518. for (auto it = mUnassignedDataChannels.begin(); it != mUnassignedDataChannels.end(); ++it) {
  519. auto channel = it->lock();
  520. if (!channel)
  521. continue;
  522. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier
  523. // for which the corresponding incoming and outgoing streams are unused. If the side is
  524. // acting as the DTLS client, it MUST choose an even stream identifier; if the side is
  525. // acting as the DTLS server, it MUST choose an odd one. See
  526. // https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  527. uint16_t stream = (iceTransport->role() == Description::Role::Active) ? 0 : 1;
  528. while (true) {
  529. if (stream > maxStream)
  530. throw std::runtime_error("Too many DataChannels");
  531. auto it = mDataChannels.find(stream);
  532. if (it == mDataChannels.end() || !it->second.lock())
  533. break;
  534. stream += 2;
  535. }
  536. PLOG_DEBUG << "Assigning stream " << stream << " to DataChannel";
  537. channel->assignStream(stream);
  538. mDataChannels.emplace(std::make_pair(stream, channel));
  539. }
  540. mUnassignedDataChannels.clear();
  541. }
  542. void PeerConnection::iterateDataChannels(
  543. std::function<void(shared_ptr<DataChannel> channel)> func) {
  544. std::vector<shared_ptr<DataChannel>> locked;
  545. {
  546. std::shared_lock lock(mDataChannelsMutex); // read-only
  547. locked.reserve(mDataChannels.size());
  548. auto it = mDataChannels.begin();
  549. while (it != mDataChannels.end()) {
  550. auto channel = it->second.lock();
  551. if (channel && !channel->isClosed())
  552. locked.push_back(std::move(channel));
  553. ++it;
  554. }
  555. }
  556. for (auto &channel : locked) {
  557. try {
  558. func(std::move(channel));
  559. } catch (const std::exception &e) {
  560. PLOG_WARNING << e.what();
  561. }
  562. }
  563. }
  564. void PeerConnection::cleanupDataChannels() {
  565. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  566. auto it = mDataChannels.begin();
  567. while (it != mDataChannels.end()) {
  568. if (!it->second.lock()) {
  569. it = mDataChannels.erase(it);
  570. continue;
  571. }
  572. ++it;
  573. }
  574. }
  575. void PeerConnection::openDataChannels() {
  576. if (auto transport = std::atomic_load(&mSctpTransport))
  577. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  578. if (!channel->isOpen())
  579. channel->open(transport);
  580. });
  581. }
  582. void PeerConnection::closeDataChannels() {
  583. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  584. }
  585. void PeerConnection::remoteCloseDataChannels() {
  586. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  587. }
  588. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  589. #if !RTC_ENABLE_MEDIA
  590. // No media support, mark as removed
  591. PLOG_WARNING << "Tracks are disabled (not compiled with media support)";
  592. description.markRemoved();
  593. #endif
  594. shared_ptr<Track> track;
  595. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  596. if (track = it->second.lock(); track)
  597. track->setDescription(std::move(description));
  598. if (!track) {
  599. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  600. mTracks.emplace(std::make_pair(track->mid(), track));
  601. mTrackLines.emplace_back(track);
  602. }
  603. if (track->description().isRemoved())
  604. track->close();
  605. return track;
  606. }
  607. void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)> func) {
  608. std::shared_lock lock(mTracksMutex); // read-only
  609. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  610. auto track = it->lock();
  611. if (track && !track->isClosed()) {
  612. try {
  613. func(std::move(track));
  614. } catch (const std::exception &e) {
  615. PLOG_WARNING << e.what();
  616. }
  617. }
  618. }
  619. }
  620. void PeerConnection::openTracks() {
  621. #if RTC_ENABLE_MEDIA
  622. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  623. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  624. iterateTracks([&](const shared_ptr<Track> &track) {
  625. if (!track->isOpen()) {
  626. if (srtpTransport) {
  627. track->open(srtpTransport);
  628. } else {
  629. // A track was added during a latter renegotiation, whereas SRTP transport was
  630. // not initialized. This is an optimization to use the library with data
  631. // channels only. Set forceMediaTransport to true to initialize the transport
  632. // before dynamically adding tracks.
  633. auto errorMsg = "The connection has no media transport";
  634. PLOG_ERROR << errorMsg;
  635. track->triggerError(errorMsg);
  636. }
  637. }
  638. });
  639. }
  640. #endif
  641. }
  642. void PeerConnection::closeTracks() {
  643. std::shared_lock lock(mTracksMutex); // read-only
  644. iterateTracks([&](shared_ptr<Track> track) { track->close(); });
  645. }
  646. void PeerConnection::validateRemoteDescription(const Description &description) {
  647. if (!description.iceUfrag())
  648. throw std::invalid_argument("Remote description has no ICE user fragment");
  649. if (!description.icePwd())
  650. throw std::invalid_argument("Remote description has no ICE password");
  651. if (!description.fingerprint())
  652. throw std::invalid_argument("Remote description has no valid fingerprint");
  653. if (description.mediaCount() == 0)
  654. throw std::invalid_argument("Remote description has no media line");
  655. int activeMediaCount = 0;
  656. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  657. std::visit(rtc::overloaded{[&](const Description::Application *application) {
  658. if (!application->isRemoved())
  659. ++activeMediaCount;
  660. },
  661. [&](const Description::Media *media) {
  662. if (!media->isRemoved() ||
  663. media->direction() != Description::Direction::Inactive)
  664. ++activeMediaCount;
  665. }},
  666. description.media(i));
  667. if (activeMediaCount == 0)
  668. throw std::invalid_argument("Remote description has no active media");
  669. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  670. if (*description.iceUfrag() == *local->iceUfrag() &&
  671. *description.icePwd() == *local->icePwd())
  672. throw std::logic_error("Got the local description as remote description");
  673. PLOG_VERBOSE << "Remote description looks valid";
  674. }
  675. void PeerConnection::processLocalDescription(Description description) {
  676. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  677. const size_t localMaxMessageSize =
  678. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  679. // Clean up the application entry the ICE transport might have added already (libnice)
  680. description.clearMedia();
  681. if (auto remote = remoteDescription()) {
  682. // Reciprocate remote description
  683. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  684. std::visit( // reciprocate each media
  685. rtc::overloaded{
  686. [&](Description::Application *remoteApp) {
  687. std::shared_lock lock(mDataChannelsMutex);
  688. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  689. // Prefer local description
  690. Description::Application app(remoteApp->mid());
  691. app.setSctpPort(localSctpPort);
  692. app.setMaxMessageSize(localMaxMessageSize);
  693. PLOG_DEBUG << "Adding application to local description, mid=\""
  694. << app.mid() << "\"";
  695. description.addMedia(std::move(app));
  696. return;
  697. }
  698. auto reciprocated = remoteApp->reciprocate();
  699. reciprocated.hintSctpPort(localSctpPort);
  700. reciprocated.setMaxMessageSize(localMaxMessageSize);
  701. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  702. << reciprocated.mid() << "\"";
  703. description.addMedia(std::move(reciprocated));
  704. },
  705. [&](Description::Media *remoteMedia) {
  706. std::shared_lock lock(mTracksMutex);
  707. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  708. // Prefer local description
  709. if (auto track = it->second.lock()) {
  710. auto media = track->description();
  711. PLOG_DEBUG << "Adding media to local description, mid=\""
  712. << media.mid() << "\", removed=" << std::boolalpha
  713. << media.isRemoved();
  714. description.addMedia(std::move(media));
  715. } else {
  716. auto reciprocated = remoteMedia->reciprocate();
  717. reciprocated.markRemoved();
  718. PLOG_DEBUG << "Adding media to local description, mid=\""
  719. << reciprocated.mid()
  720. << "\", removed=true (track is destroyed)";
  721. description.addMedia(std::move(reciprocated));
  722. }
  723. return;
  724. }
  725. auto reciprocated = remoteMedia->reciprocate();
  726. #if !RTC_ENABLE_MEDIA
  727. if (!reciprocated.isRemoved()) {
  728. // No media support, mark as removed
  729. PLOG_WARNING << "Rejecting track (not compiled with media support)";
  730. reciprocated.markRemoved();
  731. }
  732. #endif
  733. PLOG_DEBUG << "Reciprocating media in local description, mid=\""
  734. << reciprocated.mid() << "\", removed=" << std::boolalpha
  735. << reciprocated.isRemoved();
  736. // Create incoming track
  737. auto track =
  738. std::make_shared<Track>(weak_from_this(), std::move(reciprocated));
  739. mTracks.emplace(std::make_pair(track->mid(), track));
  740. mTrackLines.emplace_back(track);
  741. triggerTrack(track); // The user may modify the track description
  742. if (track->description().isRemoved())
  743. track->close();
  744. description.addMedia(track->description());
  745. },
  746. },
  747. remote->media(i));
  748. // We need to update the SSRC cache for newly-created incoming tracks
  749. updateTrackSsrcCache(*remote);
  750. }
  751. if (description.type() == Description::Type::Offer) {
  752. // This is an offer, add locally created data channels and tracks
  753. // Add media for local tracks
  754. std::shared_lock lock(mTracksMutex);
  755. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  756. if (auto track = it->lock()) {
  757. if (description.hasMid(track->mid()))
  758. continue;
  759. auto media = track->description();
  760. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  761. << "\", removed=" << std::boolalpha << media.isRemoved();
  762. description.addMedia(std::move(media));
  763. }
  764. }
  765. // Add application for data channels
  766. if (!description.hasApplication()) {
  767. std::shared_lock lock(mDataChannelsMutex);
  768. if (!mDataChannels.empty() || !mUnassignedDataChannels.empty()) {
  769. // Prevents mid collision with remote or local tracks
  770. unsigned int m = 0;
  771. while (description.hasMid(std::to_string(m)))
  772. ++m;
  773. Description::Application app(std::to_string(m));
  774. app.setSctpPort(localSctpPort);
  775. app.setMaxMessageSize(localMaxMessageSize);
  776. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  777. << "\"";
  778. description.addMedia(std::move(app));
  779. }
  780. }
  781. // There might be no media at this point if the user created a Track, deleted it,
  782. // then called setLocalDescription().
  783. if (description.mediaCount() == 0)
  784. throw std::runtime_error("No DataChannel or Track to negotiate");
  785. }
  786. // Set local fingerprint (wait for certificate if necessary)
  787. description.setFingerprint(mCertificate.get()->fingerprint());
  788. PLOG_VERBOSE << "Issuing local description: " << description;
  789. if (description.mediaCount() == 0)
  790. throw std::logic_error("Local description has no media line");
  791. updateTrackSsrcCache(description);
  792. {
  793. // Set as local description
  794. std::lock_guard lock(mLocalDescriptionMutex);
  795. std::vector<Candidate> existingCandidates;
  796. if (mLocalDescription) {
  797. existingCandidates = mLocalDescription->extractCandidates();
  798. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  799. }
  800. mLocalDescription.emplace(description);
  801. mLocalDescription->addCandidates(std::move(existingCandidates));
  802. }
  803. mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
  804. &localDescriptionCallback, std::move(description));
  805. // Reciprocated tracks might need to be open
  806. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  807. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  808. mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
  809. }
  810. void PeerConnection::processLocalCandidate(Candidate candidate) {
  811. std::lock_guard lock(mLocalDescriptionMutex);
  812. if (!mLocalDescription)
  813. throw std::logic_error("Got a local candidate without local description");
  814. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  815. candidate.type() != Candidate::Type::Relayed) {
  816. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  817. return;
  818. }
  819. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  820. candidate.resolve(Candidate::ResolveMode::Simple);
  821. mLocalDescription->addCandidate(candidate);
  822. mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
  823. &localCandidateCallback, std::move(candidate));
  824. }
  825. void PeerConnection::processRemoteDescription(Description description) {
  826. // Update the SSRC cache for existing tracks
  827. updateTrackSsrcCache(description);
  828. {
  829. // Set as remote description
  830. std::lock_guard lock(mRemoteDescriptionMutex);
  831. std::vector<Candidate> existingCandidates;
  832. if (mRemoteDescription)
  833. existingCandidates = mRemoteDescription->extractCandidates();
  834. mRemoteDescription.emplace(description);
  835. mRemoteDescription->addCandidates(std::move(existingCandidates));
  836. }
  837. auto iceTransport = initIceTransport();
  838. if (!iceTransport)
  839. return; // closed
  840. iceTransport->setRemoteDescription(std::move(description));
  841. if (description.hasApplication()) {
  842. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  843. auto sctpTransport = std::atomic_load(&mSctpTransport);
  844. if (!sctpTransport && dtlsTransport &&
  845. dtlsTransport->state() == Transport::State::Connected)
  846. initSctpTransport();
  847. } else {
  848. mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
  849. }
  850. }
  851. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  852. auto iceTransport = std::atomic_load(&mIceTransport);
  853. {
  854. // Set as remote candidate
  855. std::lock_guard lock(mRemoteDescriptionMutex);
  856. if (!mRemoteDescription)
  857. throw std::logic_error("Got a remote candidate without remote description");
  858. if (!iceTransport)
  859. throw std::logic_error("Got a remote candidate without ICE transport");
  860. candidate.hintMid(mRemoteDescription->bundleMid());
  861. if (mRemoteDescription->hasCandidate(candidate))
  862. return; // already in description, ignore
  863. candidate.resolve(Candidate::ResolveMode::Simple);
  864. mRemoteDescription->addCandidate(candidate);
  865. }
  866. if (candidate.isResolved()) {
  867. iceTransport->addRemoteCandidate(std::move(candidate));
  868. } else {
  869. // We might need a lookup, do it asynchronously
  870. // We don't use the thread pool because we have no control on the timeout
  871. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  872. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  873. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  874. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  875. if (auto iceTransport = weakIceTransport.lock())
  876. iceTransport->addRemoteCandidate(std::move(candidate));
  877. });
  878. t.detach();
  879. }
  880. }
  881. }
  882. string PeerConnection::localBundleMid() const {
  883. std::lock_guard lock(mLocalDescriptionMutex);
  884. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  885. }
  886. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  887. auto dataChannel = weakDataChannel.lock();
  888. if (dataChannel) {
  889. dataChannel->resetOpenCallback(); // might be set internally
  890. mPendingDataChannels.push(std::move(dataChannel));
  891. }
  892. triggerPendingDataChannels();
  893. }
  894. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  895. auto track = weakTrack.lock();
  896. if (track) {
  897. track->resetOpenCallback(); // might be set internally
  898. mPendingTracks.push(std::move(track));
  899. }
  900. triggerPendingTracks();
  901. }
  902. void PeerConnection::triggerPendingDataChannels() {
  903. while (dataChannelCallback) {
  904. auto next = mPendingDataChannels.tryPop();
  905. if (!next)
  906. break;
  907. auto impl = std::move(*next);
  908. try {
  909. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  910. } catch (const std::exception &e) {
  911. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  912. }
  913. impl->triggerOpen();
  914. }
  915. }
  916. void PeerConnection::triggerPendingTracks() {
  917. while (trackCallback) {
  918. auto next = mPendingTracks.tryPop();
  919. if (!next)
  920. break;
  921. auto impl = std::move(*next);
  922. try {
  923. trackCallback(std::make_shared<rtc::Track>(impl));
  924. } catch (const std::exception &e) {
  925. PLOG_WARNING << "Uncaught exception in callback: " << e.what();
  926. }
  927. // Do not trigger open immediately for tracks as it'll be done later
  928. }
  929. }
  930. void PeerConnection::flushPendingDataChannels() {
  931. mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
  932. }
  933. void PeerConnection::flushPendingTracks() {
  934. mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
  935. }
  936. bool PeerConnection::changeState(State newState) {
  937. State current;
  938. do {
  939. current = state.load();
  940. if (current == State::Closed)
  941. return false;
  942. if (current == newState)
  943. return false;
  944. } while (!state.compare_exchange_weak(current, newState));
  945. std::ostringstream s;
  946. s << newState;
  947. PLOG_INFO << "Changed state to " << s.str();
  948. if (newState == State::Closed) {
  949. auto callback = std::move(stateChangeCallback); // steal the callback
  950. callback(State::Closed); // call it synchronously
  951. } else {
  952. mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(),
  953. &stateChangeCallback, newState);
  954. }
  955. return true;
  956. }
  957. bool PeerConnection::changeGatheringState(GatheringState newState) {
  958. if (gatheringState.exchange(newState) == newState)
  959. return false;
  960. std::ostringstream s;
  961. s << newState;
  962. PLOG_INFO << "Changed gathering state to " << s.str();
  963. mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
  964. &gatheringStateChangeCallback, newState);
  965. return true;
  966. }
  967. bool PeerConnection::changeSignalingState(SignalingState newState) {
  968. if (signalingState.exchange(newState) == newState)
  969. return false;
  970. std::ostringstream s;
  971. s << newState;
  972. PLOG_INFO << "Changed signaling state to " << s.str();
  973. mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
  974. &signalingStateChangeCallback, newState);
  975. return true;
  976. }
  977. void PeerConnection::resetCallbacks() {
  978. // Unregister all callbacks
  979. dataChannelCallback = nullptr;
  980. localDescriptionCallback = nullptr;
  981. localCandidateCallback = nullptr;
  982. stateChangeCallback = nullptr;
  983. gatheringStateChangeCallback = nullptr;
  984. signalingStateChangeCallback = nullptr;
  985. trackCallback = nullptr;
  986. }
  987. void PeerConnection::updateTrackSsrcCache(const Description &description) {
  988. std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
  989. // Setup SSRC -> Track mapping
  990. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  991. std::visit( // ssrc -> track mapping
  992. rtc::overloaded{
  993. [&](Description::Application const *) { return; },
  994. [&](Description::Media const *media) {
  995. const auto ssrcs = media->getSSRCs();
  996. // Note: We don't want to lock (or do any other lookups), if we
  997. // already know there's no SSRCs to loop over.
  998. if (ssrcs.size() <= 0) {
  999. return;
  1000. }
  1001. std::shared_ptr<Track> track{nullptr};
  1002. if (auto it = mTracks.find(media->mid()); it != mTracks.end())
  1003. if (auto track_for_mid = it->second.lock())
  1004. track = track_for_mid;
  1005. if (!track) {
  1006. // Unable to find track for MID
  1007. return;
  1008. }
  1009. for (auto ssrc : ssrcs) {
  1010. mTracksBySsrc.insert_or_assign(ssrc, track);
  1011. }
  1012. },
  1013. },
  1014. description.media(i));
  1015. }
  1016. } // namespace rtc::impl