peerconnection.cpp 37 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <array>
  35. #include <iomanip>
  36. #include <set>
  37. #include <thread>
  38. using namespace std::placeholders;
  39. namespace rtc::impl {
  40. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  41. "Number of truncated RTP packets over past second");
  42. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  43. "Number of SRTP decryption errors over past second");
  44. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  45. "Number of SRTP encryption errors over past second");
  46. static LogCounter
  47. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  48. "Number of unknown RTCP packet types over past second");
  49. PeerConnection::PeerConnection(Configuration config_)
  50. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  51. mProcessor(std::make_unique<Processor>()) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor->join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels asynchronously
  73. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  74. closeTransports();
  75. }
  76. optional<Description> PeerConnection::localDescription() const {
  77. std::lock_guard lock(mLocalDescriptionMutex);
  78. return mLocalDescription;
  79. }
  80. optional<Description> PeerConnection::remoteDescription() const {
  81. std::lock_guard lock(mRemoteDescriptionMutex);
  82. return mRemoteDescription;
  83. }
  84. size_t PeerConnection::remoteMaxMessageSize() const {
  85. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  86. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  87. std::lock_guard lock(mRemoteDescriptionMutex);
  88. if (mRemoteDescription)
  89. if (auto *application = mRemoteDescription->application())
  90. if (auto max = application->maxMessageSize()) {
  91. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  92. // size value of zero, it indicates that the SCTP endpoint will handle messages
  93. // of any size, subject to memory capacity, etc.
  94. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  95. }
  96. return std::min(remoteMax, localMax);
  97. }
  98. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  99. template <typename T>
  100. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  101. std::atomic_store(member, transport);
  102. try {
  103. transport->start();
  104. } catch (...) {
  105. std::atomic_store(member, decltype(transport)(nullptr));
  106. transport->stop();
  107. throw;
  108. }
  109. if (pc->state.load() == PeerConnection::State::Closed) {
  110. std::atomic_store(member, decltype(transport)(nullptr));
  111. transport->stop();
  112. return nullptr;
  113. }
  114. return transport;
  115. }
  116. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  117. try {
  118. if (auto transport = std::atomic_load(&mIceTransport))
  119. return transport;
  120. PLOG_VERBOSE << "Starting ICE transport";
  121. auto transport = std::make_shared<IceTransport>(
  122. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  123. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  124. auto shared_this = weak_this.lock();
  125. if (!shared_this)
  126. return;
  127. switch (transportState) {
  128. case IceTransport::State::Connecting:
  129. changeState(State::Connecting);
  130. break;
  131. case IceTransport::State::Failed:
  132. changeState(State::Failed);
  133. break;
  134. case IceTransport::State::Connected:
  135. initDtlsTransport();
  136. break;
  137. case IceTransport::State::Disconnected:
  138. changeState(State::Disconnected);
  139. break;
  140. default:
  141. // Ignore
  142. break;
  143. }
  144. },
  145. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  146. auto shared_this = weak_this.lock();
  147. if (!shared_this)
  148. return;
  149. switch (gatheringState) {
  150. case IceTransport::GatheringState::InProgress:
  151. changeGatheringState(GatheringState::InProgress);
  152. break;
  153. case IceTransport::GatheringState::Complete:
  154. endLocalCandidates();
  155. changeGatheringState(GatheringState::Complete);
  156. break;
  157. default:
  158. // Ignore
  159. break;
  160. }
  161. });
  162. return emplaceTransport(this, &mIceTransport, std::move(transport));
  163. } catch (const std::exception &e) {
  164. PLOG_ERROR << e.what();
  165. changeState(State::Failed);
  166. throw std::runtime_error("ICE transport initialization failed");
  167. }
  168. }
  169. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  170. try {
  171. if (auto transport = std::atomic_load(&mDtlsTransport))
  172. return transport;
  173. PLOG_VERBOSE << "Starting DTLS transport";
  174. auto lower = std::atomic_load(&mIceTransport);
  175. if (!lower)
  176. throw std::logic_error("No underlying ICE transport for DTLS transport");
  177. auto certificate = mCertificate.get();
  178. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  179. auto dtlsStateChangeCallback =
  180. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  181. auto shared_this = weak_this.lock();
  182. if (!shared_this)
  183. return;
  184. switch (transportState) {
  185. case DtlsTransport::State::Connected:
  186. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  187. initSctpTransport();
  188. else
  189. changeState(State::Connected);
  190. mProcessor->enqueue(&PeerConnection::openTracks, this);
  191. break;
  192. case DtlsTransport::State::Failed:
  193. changeState(State::Failed);
  194. break;
  195. case DtlsTransport::State::Disconnected:
  196. changeState(State::Disconnected);
  197. break;
  198. default:
  199. // Ignore
  200. break;
  201. }
  202. };
  203. shared_ptr<DtlsTransport> transport;
  204. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  205. #if RTC_ENABLE_MEDIA
  206. PLOG_INFO << "This connection requires media support";
  207. // DTLS-SRTP
  208. transport = std::make_shared<DtlsSrtpTransport>(
  209. lower, certificate, config.mtu, verifierCallback,
  210. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  211. #else
  212. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  213. #endif
  214. }
  215. if (!transport) {
  216. // DTLS only
  217. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  218. verifierCallback, dtlsStateChangeCallback);
  219. }
  220. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  221. } catch (const std::exception &e) {
  222. PLOG_ERROR << e.what();
  223. changeState(State::Failed);
  224. throw std::runtime_error("DTLS transport initialization failed");
  225. }
  226. }
  227. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  228. try {
  229. if (auto transport = std::atomic_load(&mSctpTransport))
  230. return transport;
  231. PLOG_VERBOSE << "Starting SCTP transport";
  232. auto lower = std::atomic_load(&mDtlsTransport);
  233. if (!lower)
  234. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  235. auto local = localDescription();
  236. if (!local || !local->application())
  237. throw std::logic_error("Starting SCTP transport without local application description");
  238. auto remote = remoteDescription();
  239. if (!remote || !remote->application())
  240. throw std::logic_error(
  241. "Starting SCTP transport without remote application description");
  242. SctpTransport::Ports ports = {};
  243. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  244. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  245. // This is the last occasion to ensure the stream numbers are coherent with the role
  246. shiftDataChannels();
  247. auto transport = std::make_shared<SctpTransport>(
  248. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  249. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  250. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  251. auto shared_this = weak_this.lock();
  252. if (!shared_this)
  253. return;
  254. switch (transportState) {
  255. case SctpTransport::State::Connected:
  256. changeState(State::Connected);
  257. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  258. break;
  259. case SctpTransport::State::Failed:
  260. LOG_WARNING << "SCTP transport failed";
  261. changeState(State::Failed);
  262. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  263. break;
  264. case SctpTransport::State::Disconnected:
  265. changeState(State::Disconnected);
  266. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  267. break;
  268. default:
  269. // Ignore
  270. break;
  271. }
  272. });
  273. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  274. } catch (const std::exception &e) {
  275. PLOG_ERROR << e.what();
  276. changeState(State::Failed);
  277. throw std::runtime_error("SCTP transport initialization failed");
  278. }
  279. }
  280. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  281. return std::atomic_load(&mIceTransport);
  282. }
  283. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  284. return std::atomic_load(&mDtlsTransport);
  285. }
  286. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  287. return std::atomic_load(&mSctpTransport);
  288. }
  289. void PeerConnection::closeTransports() {
  290. PLOG_VERBOSE << "Closing transports";
  291. // Change state to sink state Closed
  292. if (!changeState(State::Closed))
  293. return; // already closed
  294. // Reset callbacks now that state is changed
  295. resetCallbacks();
  296. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  297. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  298. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  299. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  300. if (sctp) {
  301. sctp->onRecv(nullptr);
  302. sctp->onBufferedAmount(nullptr);
  303. }
  304. using array = std::array<shared_ptr<Transport>, 3>;
  305. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  306. for (const auto &t : transports)
  307. if (t)
  308. t->onStateChange(nullptr);
  309. // Initiate transport stop on the processor after closing the data channels
  310. mProcessor->enqueue([transports = std::move(transports)]() {
  311. ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
  312. for (const auto &t : transports)
  313. if (t)
  314. t->stop();
  315. for (auto &t : transports)
  316. t.reset();
  317. });
  318. });
  319. }
  320. void PeerConnection::endLocalCandidates() {
  321. std::lock_guard lock(mLocalDescriptionMutex);
  322. if (mLocalDescription)
  323. mLocalDescription->endCandidates();
  324. }
  325. void PeerConnection::rollbackLocalDescription() {
  326. PLOG_DEBUG << "Rolling back pending local description";
  327. std::unique_lock lock(mLocalDescriptionMutex);
  328. if (mCurrentLocalDescription) {
  329. std::vector<Candidate> existingCandidates;
  330. if (mLocalDescription)
  331. existingCandidates = mLocalDescription->extractCandidates();
  332. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  333. mLocalDescription->addCandidates(std::move(existingCandidates));
  334. mCurrentLocalDescription.reset();
  335. }
  336. }
  337. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  338. std::lock_guard lock(mRemoteDescriptionMutex);
  339. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  340. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  341. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  342. return true;
  343. }
  344. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  345. << expectedFingerprint.value_or("[none]") << "\"";
  346. return false;
  347. }
  348. void PeerConnection::forwardMessage(message_ptr message) {
  349. if (!message) {
  350. remoteCloseDataChannels();
  351. return;
  352. }
  353. uint16_t stream = uint16_t(message->stream);
  354. auto channel = findDataChannel(stream);
  355. if (!channel) {
  356. auto iceTransport = getIceTransport();
  357. auto sctpTransport = getSctpTransport();
  358. if (!iceTransport || !sctpTransport)
  359. return;
  360. // See https://www.rfc-editor.org/rfc/rfc8832.html
  361. const byte dataChannelOpenMessage{0x03};
  362. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  363. if (message->type == Message::Control) {
  364. if (message->size() == 0 || *message->data() != dataChannelOpenMessage)
  365. return; // ignore
  366. if (stream % 2 != remoteParity) {
  367. // The odd/even rule is violated, close the DataChannel
  368. sctpTransport->closeStream(message->stream);
  369. return;
  370. }
  371. channel =
  372. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  373. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  374. weak_ptr<DataChannel>{channel});
  375. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  376. mDataChannels.emplace(stream, channel);
  377. } else {
  378. // Invalid, close the DataChannel
  379. sctpTransport->closeStream(message->stream);
  380. return;
  381. }
  382. }
  383. channel->incoming(message);
  384. }
  385. void PeerConnection::forwardMedia(message_ptr message) {
  386. if (!message)
  387. return;
  388. // Browsers like to compound their packets with a random SSRC.
  389. // we have to do this monstrosity to distribute the report blocks
  390. if (message->type == Message::Control) {
  391. std::set<uint32_t> ssrcs;
  392. size_t offset = 0;
  393. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  394. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  395. if (header->lengthInBytes() > message->size() - offset) {
  396. COUNTER_MEDIA_TRUNCATED++;
  397. break;
  398. }
  399. offset += header->lengthInBytes();
  400. if (header->payloadType() == 205 || header->payloadType() == 206) {
  401. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  402. ssrcs.insert(rtcpfb->packetSenderSSRC());
  403. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  404. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  405. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  406. ssrcs.insert(rtcpsr->senderSSRC());
  407. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  408. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  409. } else if (header->payloadType() == 202) {
  410. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  411. if (!sdes->isValid()) {
  412. PLOG_WARNING << "RTCP SDES packet is invalid";
  413. continue;
  414. }
  415. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  416. auto chunk = sdes->getChunk(i);
  417. ssrcs.insert(chunk->ssrc());
  418. }
  419. } else {
  420. // PT=207 == Extended Report
  421. if (header->payloadType() != 207) {
  422. COUNTER_UNKNOWN_PACKET_TYPE++;
  423. }
  424. }
  425. }
  426. if (!ssrcs.empty()) {
  427. for (uint32_t ssrc : ssrcs) {
  428. if (auto mid = getMidFromSsrc(ssrc)) {
  429. std::shared_lock lock(mTracksMutex); // read-only
  430. if (auto it = mTracks.find(*mid); it != mTracks.end())
  431. if (auto track = it->second.lock())
  432. track->incoming(message);
  433. }
  434. }
  435. return;
  436. }
  437. }
  438. uint32_t ssrc = uint32_t(message->stream);
  439. if (auto mid = getMidFromSsrc(ssrc)) {
  440. std::shared_lock lock(mTracksMutex); // read-only
  441. if (auto it = mTracks.find(*mid); it != mTracks.end())
  442. if (auto track = it->second.lock())
  443. track->incoming(message);
  444. } else {
  445. /*
  446. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  447. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  448. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  449. * this ideal? No! Do I know how to fix it? No!
  450. */
  451. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  452. return;
  453. }
  454. }
  455. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  456. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  457. return it->second;
  458. {
  459. std::lock_guard lock(mRemoteDescriptionMutex);
  460. if (!mRemoteDescription)
  461. return nullopt;
  462. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  463. if (auto found =
  464. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  465. return std::nullopt;
  466. },
  467. [&](Description::Media *media) -> optional<string> {
  468. return media->hasSSRC(ssrc)
  469. ? std::make_optional(media->mid())
  470. : nullopt;
  471. }},
  472. mRemoteDescription->media(i))) {
  473. mMidFromSsrc.emplace(ssrc, *found);
  474. return *found;
  475. }
  476. }
  477. }
  478. {
  479. std::lock_guard lock(mLocalDescriptionMutex);
  480. if (!mLocalDescription)
  481. return nullopt;
  482. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  483. if (auto found =
  484. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  485. return std::nullopt;
  486. },
  487. [&](Description::Media *media) -> optional<string> {
  488. return media->hasSSRC(ssrc)
  489. ? std::make_optional(media->mid())
  490. : nullopt;
  491. }},
  492. mLocalDescription->media(i))) {
  493. mMidFromSsrc.emplace(ssrc, *found);
  494. return *found;
  495. }
  496. }
  497. }
  498. return nullopt;
  499. }
  500. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  501. if (auto channel = findDataChannel(stream))
  502. channel->triggerBufferedAmount(amount);
  503. }
  504. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  505. cleanupDataChannels();
  506. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  507. uint16_t stream;
  508. if (init.id) {
  509. stream = *init.id;
  510. if (stream == 65535)
  511. throw std::invalid_argument("Invalid DataChannel id");
  512. } else {
  513. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  514. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  515. // See https://www.rfc-editor.org/rfc/rfc5763.html#section-5
  516. // Therefore, we assume passive role if we are the offerer.
  517. auto iceTransport = getIceTransport();
  518. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  519. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  520. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  521. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  522. // the DTLS server, it MUST choose an odd one.
  523. // See https://www.rfc-editor.org/rfc/rfc8832.html#section-6
  524. stream = (role == Description::Role::Active) ? 0 : 1;
  525. while (mDataChannels.find(stream) != mDataChannels.end()) {
  526. if (stream >= 65535 - 2)
  527. throw std::runtime_error("Too many DataChannels");
  528. stream += 2;
  529. }
  530. }
  531. // If the DataChannel is user-negotiated, do not negotiate it here
  532. auto channel =
  533. init.negotiated
  534. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  535. std::move(init.protocol), std::move(init.reliability))
  536. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  537. std::move(init.protocol),
  538. std::move(init.reliability));
  539. mDataChannels.emplace(std::make_pair(stream, channel));
  540. return channel;
  541. }
  542. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  543. std::shared_lock lock(mDataChannelsMutex); // read-only
  544. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  545. if (auto channel = it->second.lock())
  546. return channel;
  547. return nullptr;
  548. }
  549. void PeerConnection::shiftDataChannels() {
  550. auto iceTransport = std::atomic_load(&mIceTransport);
  551. auto sctpTransport = std::atomic_load(&mSctpTransport);
  552. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  553. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  554. decltype(mDataChannels) newDataChannels;
  555. auto it = mDataChannels.begin();
  556. while (it != mDataChannels.end()) {
  557. auto channel = it->second.lock();
  558. channel->shiftStream();
  559. newDataChannels.emplace(channel->stream(), channel);
  560. ++it;
  561. }
  562. std::swap(mDataChannels, newDataChannels);
  563. }
  564. }
  565. void PeerConnection::iterateDataChannels(
  566. std::function<void(shared_ptr<DataChannel> channel)> func) {
  567. std::vector<shared_ptr<DataChannel>> locked;
  568. {
  569. std::shared_lock lock(mDataChannelsMutex); // read-only
  570. locked.reserve(mDataChannels.size());
  571. auto it = mDataChannels.begin();
  572. while (it != mDataChannels.end()) {
  573. auto channel = it->second.lock();
  574. if (channel && !channel->isClosed())
  575. locked.push_back(std::move(channel));
  576. ++it;
  577. }
  578. }
  579. for (auto &channel : locked)
  580. func(std::move(channel));
  581. }
  582. void PeerConnection::cleanupDataChannels() {
  583. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  584. auto it = mDataChannels.begin();
  585. while (it != mDataChannels.end()) {
  586. if (!it->second.lock()) {
  587. it = mDataChannels.erase(it);
  588. continue;
  589. }
  590. ++it;
  591. }
  592. }
  593. void PeerConnection::openDataChannels() {
  594. if (auto transport = std::atomic_load(&mSctpTransport))
  595. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  596. }
  597. void PeerConnection::closeDataChannels() {
  598. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  599. }
  600. void PeerConnection::remoteCloseDataChannels() {
  601. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  602. }
  603. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  604. shared_ptr<Track> track;
  605. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  606. if (track = it->second.lock(); track)
  607. track->setDescription(std::move(description));
  608. if (!track) {
  609. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  610. mTracks.emplace(std::make_pair(track->mid(), track));
  611. mTrackLines.emplace_back(track);
  612. }
  613. return track;
  614. }
  615. void PeerConnection::incomingTrack(Description::Media description) {
  616. std::unique_lock lock(mTracksMutex); // we are going to emplace
  617. #if !RTC_ENABLE_MEDIA
  618. if (mTracks.empty()) {
  619. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  620. }
  621. #endif
  622. if (mTracks.find(description.mid()) == mTracks.end()) {
  623. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  624. mTracks.emplace(std::make_pair(track->mid(), track));
  625. mTrackLines.emplace_back(track);
  626. triggerTrack(track);
  627. }
  628. }
  629. void PeerConnection::openTracks() {
  630. #if RTC_ENABLE_MEDIA
  631. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  632. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  633. std::shared_lock lock(mTracksMutex); // read-only
  634. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  635. if (auto track = it->second.lock())
  636. if (!track->isOpen())
  637. track->open(srtpTransport);
  638. }
  639. #endif
  640. }
  641. void PeerConnection::validateRemoteDescription(const Description &description) {
  642. if (!description.iceUfrag())
  643. throw std::invalid_argument("Remote description has no ICE user fragment");
  644. if (!description.icePwd())
  645. throw std::invalid_argument("Remote description has no ICE password");
  646. if (!description.fingerprint())
  647. throw std::invalid_argument("Remote description has no valid fingerprint");
  648. if (description.mediaCount() == 0)
  649. throw std::invalid_argument("Remote description has no media line");
  650. int activeMediaCount = 0;
  651. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  652. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  653. [&](const Description::Media *media) {
  654. if (media->direction() != Description::Direction::Inactive)
  655. ++activeMediaCount;
  656. }},
  657. description.media(i));
  658. if (activeMediaCount == 0)
  659. throw std::invalid_argument("Remote description has no active media");
  660. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  661. if (*description.iceUfrag() == *local->iceUfrag() &&
  662. *description.icePwd() == *local->icePwd())
  663. throw std::logic_error("Got the local description as remote description");
  664. PLOG_VERBOSE << "Remote description looks valid";
  665. }
  666. void PeerConnection::processLocalDescription(Description description) {
  667. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  668. const size_t localMaxMessageSize =
  669. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  670. // Clean up the application entry the ICE transport might have added already (libnice)
  671. description.clearMedia();
  672. if (auto remote = remoteDescription()) {
  673. // Reciprocate remote description
  674. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  675. std::visit( // reciprocate each media
  676. rtc::overloaded{
  677. [&](Description::Application *remoteApp) {
  678. std::shared_lock lock(mDataChannelsMutex);
  679. if (!mDataChannels.empty()) {
  680. // Prefer local description
  681. Description::Application app(remoteApp->mid());
  682. app.setSctpPort(localSctpPort);
  683. app.setMaxMessageSize(localMaxMessageSize);
  684. PLOG_DEBUG << "Adding application to local description, mid=\""
  685. << app.mid() << "\"";
  686. description.addMedia(std::move(app));
  687. return;
  688. }
  689. auto reciprocated = remoteApp->reciprocate();
  690. reciprocated.hintSctpPort(localSctpPort);
  691. reciprocated.setMaxMessageSize(localMaxMessageSize);
  692. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  693. << reciprocated.mid() << "\"";
  694. description.addMedia(std::move(reciprocated));
  695. },
  696. [&](Description::Media *remoteMedia) {
  697. std::shared_lock lock(mTracksMutex);
  698. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  699. // Prefer local description
  700. if (auto track = it->second.lock()) {
  701. auto media = track->description();
  702. #if !RTC_ENABLE_MEDIA
  703. // No media support, mark as inactive
  704. media.setDirection(Description::Direction::Inactive);
  705. #endif
  706. PLOG_DEBUG
  707. << "Adding media to local description, mid=\"" << media.mid()
  708. << "\", active=" << std::boolalpha
  709. << (media.direction() != Description::Direction::Inactive);
  710. description.addMedia(std::move(media));
  711. } else {
  712. auto reciprocated = remoteMedia->reciprocate();
  713. reciprocated.setDirection(Description::Direction::Inactive);
  714. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  715. << reciprocated.mid() << "\"";
  716. description.addMedia(std::move(reciprocated));
  717. }
  718. return;
  719. }
  720. lock.unlock(); // we are going to call incomingTrack()
  721. auto reciprocated = remoteMedia->reciprocate();
  722. #if !RTC_ENABLE_MEDIA
  723. // No media support, mark as inactive
  724. reciprocated.setDirection(Description::Direction::Inactive);
  725. #endif
  726. incomingTrack(reciprocated);
  727. PLOG_DEBUG
  728. << "Reciprocating media in local description, mid=\""
  729. << reciprocated.mid() << "\", active=" << std::boolalpha
  730. << (reciprocated.direction() != Description::Direction::Inactive);
  731. description.addMedia(std::move(reciprocated));
  732. },
  733. },
  734. remote->media(i));
  735. }
  736. if (description.type() == Description::Type::Offer) {
  737. // This is an offer, add locally created data channels and tracks
  738. // Add application for data channels
  739. if (!description.hasApplication()) {
  740. std::shared_lock lock(mDataChannelsMutex);
  741. if (!mDataChannels.empty()) {
  742. unsigned int m = 0;
  743. while (description.hasMid(std::to_string(m)))
  744. ++m;
  745. Description::Application app(std::to_string(m));
  746. app.setSctpPort(localSctpPort);
  747. app.setMaxMessageSize(localMaxMessageSize);
  748. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  749. << "\"";
  750. description.addMedia(std::move(app));
  751. }
  752. }
  753. // Add media for local tracks
  754. std::shared_lock lock(mTracksMutex);
  755. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  756. if (auto track = it->lock()) {
  757. if (description.hasMid(track->mid()))
  758. continue;
  759. auto media = track->description();
  760. #if !RTC_ENABLE_MEDIA
  761. // No media support, mark as inactive
  762. media.setDirection(Description::Direction::Inactive);
  763. #endif
  764. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  765. << "\", active=" << std::boolalpha
  766. << (media.direction() != Description::Direction::Inactive);
  767. description.addMedia(std::move(media));
  768. }
  769. }
  770. // There might be no media at this point if the user created a Track, deleted it,
  771. // then called setLocalDescription().
  772. if (description.mediaCount() == 0)
  773. throw std::runtime_error("No DataChannel or Track to negotiate");
  774. }
  775. // Set local fingerprint (wait for certificate if necessary)
  776. description.setFingerprint(mCertificate.get()->fingerprint());
  777. PLOG_VERBOSE << "Issuing local description: " << description;
  778. if (description.mediaCount() == 0)
  779. throw std::logic_error("Local description has no media line");
  780. {
  781. // Set as local description
  782. std::lock_guard lock(mLocalDescriptionMutex);
  783. std::vector<Candidate> existingCandidates;
  784. if (mLocalDescription) {
  785. existingCandidates = mLocalDescription->extractCandidates();
  786. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  787. }
  788. mLocalDescription.emplace(description);
  789. mLocalDescription->addCandidates(std::move(existingCandidates));
  790. }
  791. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  792. // Reciprocated tracks might need to be open
  793. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  794. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  795. mProcessor->enqueue(&PeerConnection::openTracks, this);
  796. }
  797. void PeerConnection::processLocalCandidate(Candidate candidate) {
  798. std::lock_guard lock(mLocalDescriptionMutex);
  799. if (!mLocalDescription)
  800. throw std::logic_error("Got a local candidate without local description");
  801. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  802. candidate.type() != Candidate::Type::Relayed) {
  803. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  804. return;
  805. }
  806. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  807. candidate.resolve(Candidate::ResolveMode::Simple);
  808. mLocalDescription->addCandidate(candidate);
  809. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  810. }
  811. void PeerConnection::processRemoteDescription(Description description) {
  812. {
  813. // Set as remote description
  814. std::lock_guard lock(mRemoteDescriptionMutex);
  815. std::vector<Candidate> existingCandidates;
  816. if (mRemoteDescription)
  817. existingCandidates = mRemoteDescription->extractCandidates();
  818. mRemoteDescription.emplace(description);
  819. mRemoteDescription->addCandidates(std::move(existingCandidates));
  820. }
  821. auto iceTransport = initIceTransport();
  822. if (!iceTransport)
  823. return; // closed
  824. iceTransport->setRemoteDescription(std::move(description));
  825. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  826. // numbers from odd to even.
  827. shiftDataChannels();
  828. if (description.hasApplication()) {
  829. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  830. auto sctpTransport = std::atomic_load(&mSctpTransport);
  831. if (!sctpTransport && dtlsTransport &&
  832. dtlsTransport->state() == Transport::State::Connected)
  833. initSctpTransport();
  834. }
  835. }
  836. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  837. auto iceTransport = std::atomic_load(&mIceTransport);
  838. {
  839. // Set as remote candidate
  840. std::lock_guard lock(mRemoteDescriptionMutex);
  841. if (!mRemoteDescription)
  842. throw std::logic_error("Got a remote candidate without remote description");
  843. if (!iceTransport)
  844. throw std::logic_error("Got a remote candidate without ICE transport");
  845. candidate.hintMid(mRemoteDescription->bundleMid());
  846. if (mRemoteDescription->hasCandidate(candidate))
  847. return; // already in description, ignore
  848. candidate.resolve(Candidate::ResolveMode::Simple);
  849. mRemoteDescription->addCandidate(candidate);
  850. }
  851. if (candidate.isResolved()) {
  852. iceTransport->addRemoteCandidate(std::move(candidate));
  853. } else {
  854. // We might need a lookup, do it asynchronously
  855. // We don't use the thread pool because we have no control on the timeout
  856. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  857. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  858. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  859. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  860. if (auto iceTransport = weakIceTransport.lock())
  861. iceTransport->addRemoteCandidate(std::move(candidate));
  862. });
  863. t.detach();
  864. }
  865. }
  866. }
  867. string PeerConnection::localBundleMid() const {
  868. std::lock_guard lock(mLocalDescriptionMutex);
  869. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  870. }
  871. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  872. auto dataChannel = weakDataChannel.lock();
  873. if (dataChannel) {
  874. dataChannel->resetOpenCallback(); // might be set internally
  875. mPendingDataChannels.push(std::move(dataChannel));
  876. }
  877. triggerPendingDataChannels();
  878. }
  879. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  880. auto track = weakTrack.lock();
  881. if (track) {
  882. track->resetOpenCallback(); // might be set internally
  883. mPendingTracks.push(std::move(track));
  884. }
  885. triggerPendingTracks();
  886. }
  887. void PeerConnection::triggerPendingDataChannels() {
  888. while (dataChannelCallback) {
  889. auto next = mPendingDataChannels.tryPop();
  890. if (!next)
  891. break;
  892. auto impl = std::move(*next);
  893. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  894. impl->triggerOpen();
  895. }
  896. }
  897. void PeerConnection::triggerPendingTracks() {
  898. while (trackCallback) {
  899. auto next = mPendingTracks.tryPop();
  900. if (!next)
  901. break;
  902. auto impl = std::move(*next);
  903. trackCallback(std::make_shared<rtc::Track>(impl));
  904. impl->triggerOpen();
  905. }
  906. }
  907. void PeerConnection::flushPendingDataChannels() {
  908. mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
  909. }
  910. void PeerConnection::flushPendingTracks() {
  911. mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
  912. }
  913. bool PeerConnection::changeState(State newState) {
  914. State current;
  915. do {
  916. current = state.load();
  917. if (current == State::Closed)
  918. return false;
  919. if (current == newState)
  920. return false;
  921. } while (!state.compare_exchange_weak(current, newState));
  922. std::ostringstream s;
  923. s << newState;
  924. PLOG_INFO << "Changed state to " << s.str();
  925. if (newState == State::Closed)
  926. // This is the last state change, so we may steal the callback
  927. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  928. else
  929. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  930. return true;
  931. }
  932. bool PeerConnection::changeGatheringState(GatheringState newState) {
  933. if (gatheringState.exchange(newState) == newState)
  934. return false;
  935. std::ostringstream s;
  936. s << newState;
  937. PLOG_INFO << "Changed gathering state to " << s.str();
  938. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  939. return true;
  940. }
  941. bool PeerConnection::changeSignalingState(SignalingState newState) {
  942. if (signalingState.exchange(newState) == newState)
  943. return false;
  944. std::ostringstream s;
  945. s << state;
  946. PLOG_INFO << "Changed signaling state to " << s.str();
  947. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  948. return true;
  949. }
  950. void PeerConnection::resetCallbacks() {
  951. // Unregister all callbacks
  952. dataChannelCallback = nullptr;
  953. localDescriptionCallback = nullptr;
  954. localCandidateCallback = nullptr;
  955. stateChangeCallback = nullptr;
  956. gatheringStateChangeCallback = nullptr;
  957. }
  958. } // namespace rtc::impl