peerconnection.cpp 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <iomanip>
  35. #include <set>
  36. #include <thread>
  37. #include <array>
  38. using namespace std::placeholders;
  39. namespace rtc::impl {
  40. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  41. "Number of RTP packets truncated over past second");
  42. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  43. "Number of SRTP decryption errors over past second");
  44. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  45. "Number of SRTP encryption errors over past second");
  46. static LogCounter
  47. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  48. "Number of unknown RTCP packet types over past second");
  49. PeerConnection::PeerConnection(Configuration config_)
  50. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  51. mProcessor(std::make_unique<Processor>()) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor->join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels asynchronously
  73. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  74. closeTransports();
  75. }
  76. optional<Description> PeerConnection::localDescription() const {
  77. std::lock_guard lock(mLocalDescriptionMutex);
  78. return mLocalDescription;
  79. }
  80. optional<Description> PeerConnection::remoteDescription() const {
  81. std::lock_guard lock(mRemoteDescriptionMutex);
  82. return mRemoteDescription;
  83. }
  84. size_t PeerConnection::remoteMaxMessageSize() const {
  85. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  86. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  87. std::lock_guard lock(mRemoteDescriptionMutex);
  88. if (mRemoteDescription)
  89. if (auto *application = mRemoteDescription->application())
  90. if (auto max = application->maxMessageSize()) {
  91. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  92. // size value of zero, it indicates that the SCTP endpoint will handle messages
  93. // of any size, subject to memory capacity, etc.
  94. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  95. }
  96. return std::min(remoteMax, localMax);
  97. }
  98. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  99. template <typename T>
  100. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  101. transport->start();
  102. std::atomic_store(member, transport);
  103. if (pc->state.load() == PeerConnection::State::Closed) {
  104. std::atomic_store(member, decltype(transport)(nullptr));
  105. transport->stop();
  106. return nullptr;
  107. }
  108. return transport;
  109. }
  110. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  111. try {
  112. if (auto transport = std::atomic_load(&mIceTransport))
  113. return transport;
  114. PLOG_VERBOSE << "Starting ICE transport";
  115. auto transport = std::make_shared<IceTransport>(
  116. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  117. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  118. auto shared_this = weak_this.lock();
  119. if (!shared_this)
  120. return;
  121. switch (transportState) {
  122. case IceTransport::State::Connecting:
  123. changeState(State::Connecting);
  124. break;
  125. case IceTransport::State::Failed:
  126. changeState(State::Failed);
  127. break;
  128. case IceTransport::State::Connected:
  129. initDtlsTransport();
  130. break;
  131. case IceTransport::State::Disconnected:
  132. changeState(State::Disconnected);
  133. break;
  134. default:
  135. // Ignore
  136. break;
  137. }
  138. },
  139. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  140. auto shared_this = weak_this.lock();
  141. if (!shared_this)
  142. return;
  143. switch (gatheringState) {
  144. case IceTransport::GatheringState::InProgress:
  145. changeGatheringState(GatheringState::InProgress);
  146. break;
  147. case IceTransport::GatheringState::Complete:
  148. endLocalCandidates();
  149. changeGatheringState(GatheringState::Complete);
  150. break;
  151. default:
  152. // Ignore
  153. break;
  154. }
  155. });
  156. return emplaceTransport(this, &mIceTransport, std::move(transport));
  157. } catch (const std::exception &e) {
  158. PLOG_ERROR << e.what();
  159. changeState(State::Failed);
  160. throw std::runtime_error("ICE transport initialization failed");
  161. }
  162. }
  163. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  164. try {
  165. if (auto transport = std::atomic_load(&mDtlsTransport))
  166. return transport;
  167. PLOG_VERBOSE << "Starting DTLS transport";
  168. auto lower = std::atomic_load(&mIceTransport);
  169. if (!lower)
  170. throw std::logic_error("No underlying ICE transport for DTLS transport");
  171. auto certificate = mCertificate.get();
  172. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  173. auto dtlsStateChangeCallback =
  174. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  175. auto shared_this = weak_this.lock();
  176. if (!shared_this)
  177. return;
  178. switch (transportState) {
  179. case DtlsTransport::State::Connected:
  180. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  181. initSctpTransport();
  182. else
  183. changeState(State::Connected);
  184. mProcessor->enqueue(&PeerConnection::openTracks, this);
  185. break;
  186. case DtlsTransport::State::Failed:
  187. changeState(State::Failed);
  188. break;
  189. case DtlsTransport::State::Disconnected:
  190. changeState(State::Disconnected);
  191. break;
  192. default:
  193. // Ignore
  194. break;
  195. }
  196. };
  197. shared_ptr<DtlsTransport> transport;
  198. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  199. #if RTC_ENABLE_MEDIA
  200. PLOG_INFO << "This connection requires media support";
  201. // DTLS-SRTP
  202. transport = std::make_shared<DtlsSrtpTransport>(
  203. lower, certificate, config.mtu, verifierCallback,
  204. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  205. #else
  206. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  207. #endif
  208. }
  209. if (!transport) {
  210. // DTLS only
  211. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  212. verifierCallback, dtlsStateChangeCallback);
  213. }
  214. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  215. } catch (const std::exception &e) {
  216. PLOG_ERROR << e.what();
  217. changeState(State::Failed);
  218. throw std::runtime_error("DTLS transport initialization failed");
  219. }
  220. }
  221. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  222. try {
  223. if (auto transport = std::atomic_load(&mSctpTransport))
  224. return transport;
  225. PLOG_VERBOSE << "Starting SCTP transport";
  226. auto lower = std::atomic_load(&mDtlsTransport);
  227. if (!lower)
  228. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  229. auto remote = remoteDescription();
  230. if (!remote || !remote->application())
  231. throw std::logic_error("Starting SCTP transport without application description");
  232. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  233. // This is the last occasion to ensure the stream numbers are coherent with the role
  234. shiftDataChannels();
  235. auto transport = std::make_shared<SctpTransport>(
  236. lower, config, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  237. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  238. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  239. auto shared_this = weak_this.lock();
  240. if (!shared_this)
  241. return;
  242. switch (transportState) {
  243. case SctpTransport::State::Connected:
  244. changeState(State::Connected);
  245. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  246. break;
  247. case SctpTransport::State::Failed:
  248. LOG_WARNING << "SCTP transport failed";
  249. changeState(State::Failed);
  250. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  251. break;
  252. case SctpTransport::State::Disconnected:
  253. changeState(State::Disconnected);
  254. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  255. break;
  256. default:
  257. // Ignore
  258. break;
  259. }
  260. });
  261. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  262. } catch (const std::exception &e) {
  263. PLOG_ERROR << e.what();
  264. changeState(State::Failed);
  265. throw std::runtime_error("SCTP transport initialization failed");
  266. }
  267. }
  268. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  269. return std::atomic_load(&mIceTransport);
  270. }
  271. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  272. return std::atomic_load(&mDtlsTransport);
  273. }
  274. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  275. return std::atomic_load(&mSctpTransport);
  276. }
  277. void PeerConnection::closeTransports() {
  278. PLOG_VERBOSE << "Closing transports";
  279. // Change state to sink state Closed
  280. if (!changeState(State::Closed))
  281. return; // already closed
  282. // Reset callbacks now that state is changed
  283. resetCallbacks();
  284. // Initiate transport stop on the processor after closing the data channels
  285. mProcessor->enqueue([this]() {
  286. // Pass the pointers to a thread
  287. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  288. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  289. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  290. if (sctp) {
  291. sctp->onRecv(nullptr);
  292. sctp->onBufferedAmount(nullptr);
  293. }
  294. using array = std::array<shared_ptr<Transport>, 3>;
  295. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  296. for (const auto &t : transports)
  297. if (t)
  298. t->onStateChange(nullptr);
  299. ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
  300. for (const auto &t : transports)
  301. if (t)
  302. t->stop();
  303. for (auto &t : transports)
  304. t.reset();
  305. });
  306. });
  307. }
  308. void PeerConnection::endLocalCandidates() {
  309. std::lock_guard lock(mLocalDescriptionMutex);
  310. if (mLocalDescription)
  311. mLocalDescription->endCandidates();
  312. }
  313. void PeerConnection::rollbackLocalDescription() {
  314. PLOG_DEBUG << "Rolling back pending local description";
  315. std::unique_lock lock(mLocalDescriptionMutex);
  316. if (mCurrentLocalDescription) {
  317. std::vector<Candidate> existingCandidates;
  318. if (mLocalDescription)
  319. existingCandidates = mLocalDescription->extractCandidates();
  320. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  321. mLocalDescription->addCandidates(std::move(existingCandidates));
  322. mCurrentLocalDescription.reset();
  323. }
  324. }
  325. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  326. std::lock_guard lock(mRemoteDescriptionMutex);
  327. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  328. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  329. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  330. return true;
  331. }
  332. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  333. << expectedFingerprint.value_or("[none]") << "\"";
  334. return false;
  335. }
  336. void PeerConnection::forwardMessage(message_ptr message) {
  337. if (!message) {
  338. remoteCloseDataChannels();
  339. return;
  340. }
  341. uint16_t stream = uint16_t(message->stream);
  342. auto channel = findDataChannel(stream);
  343. if (!channel) {
  344. auto iceTransport = getIceTransport();
  345. auto sctpTransport = getSctpTransport();
  346. if (!iceTransport || !sctpTransport)
  347. return;
  348. const byte dataChannelOpenMessage{0x03};
  349. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  350. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  351. stream % 2 == remoteParity) {
  352. channel =
  353. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  354. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  355. weak_ptr<DataChannel>{channel});
  356. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  357. mDataChannels.emplace(stream, channel);
  358. } else {
  359. // Invalid, close the DataChannel
  360. sctpTransport->closeStream(message->stream);
  361. return;
  362. }
  363. }
  364. channel->incoming(message);
  365. }
  366. void PeerConnection::forwardMedia(message_ptr message) {
  367. if (!message)
  368. return;
  369. // Browsers like to compound their packets with a random SSRC.
  370. // we have to do this monstrosity to distribute the report blocks
  371. if (message->type == Message::Control) {
  372. std::set<uint32_t> ssrcs;
  373. size_t offset = 0;
  374. while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
  375. auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
  376. if (header->lengthInBytes() > message->size() - offset) {
  377. COUNTER_MEDIA_TRUNCATED++;
  378. break;
  379. }
  380. offset += header->lengthInBytes();
  381. if (header->payloadType() == 205 || header->payloadType() == 206) {
  382. auto rtcpfb = reinterpret_cast<RTCP_FB_HEADER *>(header);
  383. ssrcs.insert(rtcpfb->packetSenderSSRC());
  384. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  385. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  386. auto rtcpsr = reinterpret_cast<RTCP_SR *>(header);
  387. ssrcs.insert(rtcpsr->senderSSRC());
  388. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  389. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  390. } else if (header->payloadType() == 202) {
  391. auto sdes = reinterpret_cast<RTCP_SDES *>(header);
  392. if (!sdes->isValid()) {
  393. PLOG_WARNING << "RTCP SDES packet is invalid";
  394. continue;
  395. }
  396. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  397. auto chunk = sdes->getChunk(i);
  398. ssrcs.insert(chunk->ssrc());
  399. }
  400. } else {
  401. // PT=207 == Extended Report
  402. if (header->payloadType() != 207) {
  403. COUNTER_UNKNOWN_PACKET_TYPE++;
  404. }
  405. }
  406. }
  407. if (!ssrcs.empty()) {
  408. for (uint32_t ssrc : ssrcs) {
  409. if (auto mid = getMidFromSsrc(ssrc)) {
  410. std::shared_lock lock(mTracksMutex); // read-only
  411. if (auto it = mTracks.find(*mid); it != mTracks.end())
  412. if (auto track = it->second.lock())
  413. track->incoming(message);
  414. }
  415. }
  416. return;
  417. }
  418. }
  419. uint32_t ssrc = uint32_t(message->stream);
  420. if (auto mid = getMidFromSsrc(ssrc)) {
  421. std::shared_lock lock(mTracksMutex); // read-only
  422. if (auto it = mTracks.find(*mid); it != mTracks.end())
  423. if (auto track = it->second.lock())
  424. track->incoming(message);
  425. } else {
  426. /*
  427. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  428. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  429. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  430. * this ideal? No! Do I know how to fix it? No!
  431. */
  432. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  433. return;
  434. }
  435. }
  436. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  437. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  438. return it->second;
  439. {
  440. std::lock_guard lock(mRemoteDescriptionMutex);
  441. if (!mRemoteDescription)
  442. return nullopt;
  443. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  444. if (auto found =
  445. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  446. return std::nullopt;
  447. },
  448. [&](Description::Media *media) -> optional<string> {
  449. return media->hasSSRC(ssrc)
  450. ? std::make_optional(media->mid())
  451. : nullopt;
  452. }},
  453. mRemoteDescription->media(i))) {
  454. mMidFromSsrc.emplace(ssrc, *found);
  455. return *found;
  456. }
  457. }
  458. }
  459. {
  460. std::lock_guard lock(mLocalDescriptionMutex);
  461. if (!mLocalDescription)
  462. return nullopt;
  463. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  464. if (auto found =
  465. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  466. return std::nullopt;
  467. },
  468. [&](Description::Media *media) -> optional<string> {
  469. return media->hasSSRC(ssrc)
  470. ? std::make_optional(media->mid())
  471. : nullopt;
  472. }},
  473. mLocalDescription->media(i))) {
  474. mMidFromSsrc.emplace(ssrc, *found);
  475. return *found;
  476. }
  477. }
  478. }
  479. return nullopt;
  480. }
  481. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  482. if (auto channel = findDataChannel(stream))
  483. channel->triggerBufferedAmount(amount);
  484. }
  485. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  486. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  487. uint16_t stream;
  488. if (init.id) {
  489. stream = *init.id;
  490. if (stream == 65535)
  491. throw std::invalid_argument("Invalid DataChannel id");
  492. } else {
  493. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  494. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  495. // See https://tools.ietf.org/html/rfc5763#section-5
  496. // Therefore, we assume passive role if we are the offerer.
  497. auto iceTransport = getIceTransport();
  498. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  499. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  500. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  501. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  502. // the DTLS server, it MUST choose an odd one.
  503. // See https://tools.ietf.org/html/rfc8832#section-6
  504. stream = (role == Description::Role::Active) ? 0 : 1;
  505. while (mDataChannels.find(stream) != mDataChannels.end()) {
  506. if (stream >= 65535 - 2)
  507. throw std::runtime_error("Too many DataChannels");
  508. stream += 2;
  509. }
  510. }
  511. // If the DataChannel is user-negotiated, do not negociate it here
  512. auto channel =
  513. init.negotiated
  514. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  515. std::move(init.protocol), std::move(init.reliability))
  516. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  517. std::move(init.protocol),
  518. std::move(init.reliability));
  519. mDataChannels.emplace(std::make_pair(stream, channel));
  520. return channel;
  521. }
  522. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  523. std::shared_lock lock(mDataChannelsMutex); // read-only
  524. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  525. if (auto channel = it->second.lock())
  526. return channel;
  527. return nullptr;
  528. }
  529. void PeerConnection::shiftDataChannels() {
  530. auto iceTransport = std::atomic_load(&mIceTransport);
  531. auto sctpTransport = std::atomic_load(&mSctpTransport);
  532. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  533. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  534. decltype(mDataChannels) newDataChannels;
  535. auto it = mDataChannels.begin();
  536. while (it != mDataChannels.end()) {
  537. auto channel = it->second.lock();
  538. channel->shiftStream();
  539. newDataChannels.emplace(channel->stream(), channel);
  540. ++it;
  541. }
  542. std::swap(mDataChannels, newDataChannels);
  543. }
  544. }
  545. void PeerConnection::iterateDataChannels(
  546. std::function<void(shared_ptr<DataChannel> channel)> func) {
  547. // Iterate
  548. {
  549. std::shared_lock lock(mDataChannelsMutex); // read-only
  550. auto it = mDataChannels.begin();
  551. while (it != mDataChannels.end()) {
  552. auto channel = it->second.lock();
  553. if (channel && !channel->isClosed())
  554. func(channel);
  555. ++it;
  556. }
  557. }
  558. // Cleanup
  559. {
  560. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  561. auto it = mDataChannels.begin();
  562. while (it != mDataChannels.end()) {
  563. if (!it->second.lock()) {
  564. it = mDataChannels.erase(it);
  565. continue;
  566. }
  567. ++it;
  568. }
  569. }
  570. }
  571. void PeerConnection::openDataChannels() {
  572. if (auto transport = std::atomic_load(&mSctpTransport))
  573. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  574. }
  575. void PeerConnection::closeDataChannels() {
  576. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  577. }
  578. void PeerConnection::remoteCloseDataChannels() {
  579. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  580. }
  581. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  582. shared_ptr<Track> track;
  583. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  584. if (track = it->second.lock(); track)
  585. track->setDescription(std::move(description));
  586. if (!track) {
  587. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  588. mTracks.emplace(std::make_pair(track->mid(), track));
  589. mTrackLines.emplace_back(track);
  590. }
  591. return track;
  592. }
  593. void PeerConnection::incomingTrack(Description::Media description) {
  594. std::unique_lock lock(mTracksMutex); // we are going to emplace
  595. #if !RTC_ENABLE_MEDIA
  596. if (mTracks.empty()) {
  597. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  598. }
  599. #endif
  600. if (mTracks.find(description.mid()) == mTracks.end()) {
  601. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  602. mTracks.emplace(std::make_pair(track->mid(), track));
  603. mTrackLines.emplace_back(track);
  604. triggerTrack(track);
  605. }
  606. }
  607. void PeerConnection::openTracks() {
  608. #if RTC_ENABLE_MEDIA
  609. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  610. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  611. std::shared_lock lock(mTracksMutex); // read-only
  612. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  613. if (auto track = it->second.lock())
  614. if (!track->isOpen())
  615. track->open(srtpTransport);
  616. }
  617. #endif
  618. }
  619. void PeerConnection::validateRemoteDescription(const Description &description) {
  620. if (!description.iceUfrag())
  621. throw std::invalid_argument("Remote description has no ICE user fragment");
  622. if (!description.icePwd())
  623. throw std::invalid_argument("Remote description has no ICE password");
  624. if (!description.fingerprint())
  625. throw std::invalid_argument("Remote description has no valid fingerprint");
  626. if (description.mediaCount() == 0)
  627. throw std::invalid_argument("Remote description has no media line");
  628. int activeMediaCount = 0;
  629. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  630. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  631. [&](const Description::Media *media) {
  632. if (media->direction() != Description::Direction::Inactive)
  633. ++activeMediaCount;
  634. }},
  635. description.media(i));
  636. if (activeMediaCount == 0)
  637. throw std::invalid_argument("Remote description has no active media");
  638. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  639. if (*description.iceUfrag() == *local->iceUfrag() &&
  640. *description.icePwd() == *local->icePwd())
  641. throw std::logic_error("Got the local description as remote description");
  642. PLOG_VERBOSE << "Remote description looks valid";
  643. }
  644. void PeerConnection::processLocalDescription(Description description) {
  645. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  646. const size_t localMaxMessageSize =
  647. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  648. // Clean up the application entry the ICE transport might have added already (libnice)
  649. description.clearMedia();
  650. if (auto remote = remoteDescription()) {
  651. // Reciprocate remote description
  652. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  653. std::visit( // reciprocate each media
  654. rtc::overloaded{
  655. [&](Description::Application *remoteApp) {
  656. std::shared_lock lock(mDataChannelsMutex);
  657. if (!mDataChannels.empty()) {
  658. // Prefer local description
  659. Description::Application app(remoteApp->mid());
  660. app.setSctpPort(localSctpPort);
  661. app.setMaxMessageSize(localMaxMessageSize);
  662. PLOG_DEBUG << "Adding application to local description, mid=\""
  663. << app.mid() << "\"";
  664. description.addMedia(std::move(app));
  665. return;
  666. }
  667. auto reciprocated = remoteApp->reciprocate();
  668. reciprocated.hintSctpPort(localSctpPort);
  669. reciprocated.setMaxMessageSize(localMaxMessageSize);
  670. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  671. << reciprocated.mid() << "\"";
  672. description.addMedia(std::move(reciprocated));
  673. },
  674. [&](Description::Media *remoteMedia) {
  675. std::shared_lock lock(mTracksMutex);
  676. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  677. // Prefer local description
  678. if (auto track = it->second.lock()) {
  679. auto media = track->description();
  680. #if !RTC_ENABLE_MEDIA
  681. // No media support, mark as inactive
  682. media.setDirection(Description::Direction::Inactive);
  683. #endif
  684. PLOG_DEBUG
  685. << "Adding media to local description, mid=\"" << media.mid()
  686. << "\", active=" << std::boolalpha
  687. << (media.direction() != Description::Direction::Inactive);
  688. description.addMedia(std::move(media));
  689. } else {
  690. auto reciprocated = remoteMedia->reciprocate();
  691. reciprocated.setDirection(Description::Direction::Inactive);
  692. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  693. << reciprocated.mid() << "\"";
  694. description.addMedia(std::move(reciprocated));
  695. }
  696. return;
  697. }
  698. lock.unlock(); // we are going to call incomingTrack()
  699. auto reciprocated = remoteMedia->reciprocate();
  700. #if !RTC_ENABLE_MEDIA
  701. // No media support, mark as inactive
  702. reciprocated.setDirection(Description::Direction::Inactive);
  703. #endif
  704. incomingTrack(reciprocated);
  705. PLOG_DEBUG
  706. << "Reciprocating media in local description, mid=\""
  707. << reciprocated.mid() << "\", active=" << std::boolalpha
  708. << (reciprocated.direction() != Description::Direction::Inactive);
  709. description.addMedia(std::move(reciprocated));
  710. },
  711. },
  712. remote->media(i));
  713. }
  714. if (description.type() == Description::Type::Offer) {
  715. // This is an offer, add locally created data channels and tracks
  716. // Add application for data channels
  717. if (!description.hasApplication()) {
  718. std::shared_lock lock(mDataChannelsMutex);
  719. if (!mDataChannels.empty()) {
  720. unsigned int m = 0;
  721. while (description.hasMid(std::to_string(m)))
  722. ++m;
  723. Description::Application app(std::to_string(m));
  724. app.setSctpPort(localSctpPort);
  725. app.setMaxMessageSize(localMaxMessageSize);
  726. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  727. << "\"";
  728. description.addMedia(std::move(app));
  729. }
  730. }
  731. // Add media for local tracks
  732. std::shared_lock lock(mTracksMutex);
  733. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  734. if (auto track = it->lock()) {
  735. if (description.hasMid(track->mid()))
  736. continue;
  737. auto media = track->description();
  738. #if !RTC_ENABLE_MEDIA
  739. // No media support, mark as inactive
  740. media.setDirection(Description::Direction::Inactive);
  741. #endif
  742. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  743. << "\", active=" << std::boolalpha
  744. << (media.direction() != Description::Direction::Inactive);
  745. description.addMedia(std::move(media));
  746. }
  747. }
  748. // There might be no media at this point if the user created a Track, deleted it,
  749. // then called setLocalDescription().
  750. if (description.mediaCount() == 0)
  751. throw std::runtime_error("No DataChannel or Track to negotiate");
  752. }
  753. // Set local fingerprint (wait for certificate if necessary)
  754. description.setFingerprint(mCertificate.get()->fingerprint());
  755. PLOG_VERBOSE << "Issuing local description: " << description;
  756. if (description.mediaCount() == 0)
  757. throw std::logic_error("Local description has no media line");
  758. {
  759. // Set as local description
  760. std::lock_guard lock(mLocalDescriptionMutex);
  761. std::vector<Candidate> existingCandidates;
  762. if (mLocalDescription) {
  763. existingCandidates = mLocalDescription->extractCandidates();
  764. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  765. }
  766. mLocalDescription.emplace(description);
  767. mLocalDescription->addCandidates(std::move(existingCandidates));
  768. }
  769. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  770. // Reciprocated tracks might need to be open
  771. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  772. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  773. mProcessor->enqueue(&PeerConnection::openTracks, this);
  774. }
  775. void PeerConnection::processLocalCandidate(Candidate candidate) {
  776. std::lock_guard lock(mLocalDescriptionMutex);
  777. if (!mLocalDescription)
  778. throw std::logic_error("Got a local candidate without local description");
  779. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  780. candidate.type() != Candidate::Type::Relayed) {
  781. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  782. return;
  783. }
  784. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  785. candidate.resolve(Candidate::ResolveMode::Simple);
  786. mLocalDescription->addCandidate(candidate);
  787. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  788. }
  789. void PeerConnection::processRemoteDescription(Description description) {
  790. {
  791. // Set as remote description
  792. std::lock_guard lock(mRemoteDescriptionMutex);
  793. std::vector<Candidate> existingCandidates;
  794. if (mRemoteDescription)
  795. existingCandidates = mRemoteDescription->extractCandidates();
  796. mRemoteDescription.emplace(description);
  797. mRemoteDescription->addCandidates(std::move(existingCandidates));
  798. }
  799. auto iceTransport = initIceTransport();
  800. if (!iceTransport)
  801. return; // closed
  802. iceTransport->setRemoteDescription(std::move(description));
  803. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  804. // numbers from odd to even.
  805. shiftDataChannels();
  806. if (description.hasApplication()) {
  807. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  808. auto sctpTransport = std::atomic_load(&mSctpTransport);
  809. if (!sctpTransport && dtlsTransport &&
  810. dtlsTransport->state() == Transport::State::Connected)
  811. initSctpTransport();
  812. }
  813. }
  814. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  815. auto iceTransport = std::atomic_load(&mIceTransport);
  816. {
  817. // Set as remote candidate
  818. std::lock_guard lock(mRemoteDescriptionMutex);
  819. if (!mRemoteDescription)
  820. throw std::logic_error("Got a remote candidate without remote description");
  821. if (!iceTransport)
  822. throw std::logic_error("Got a remote candidate without ICE transport");
  823. candidate.hintMid(mRemoteDescription->bundleMid());
  824. if (mRemoteDescription->hasCandidate(candidate))
  825. return; // already in description, ignore
  826. candidate.resolve(Candidate::ResolveMode::Simple);
  827. mRemoteDescription->addCandidate(candidate);
  828. }
  829. if (candidate.isResolved()) {
  830. iceTransport->addRemoteCandidate(std::move(candidate));
  831. } else {
  832. // We might need a lookup, do it asynchronously
  833. // We don't use the thread pool because we have no control on the timeout
  834. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  835. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  836. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  837. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  838. if (auto iceTransport = weakIceTransport.lock())
  839. iceTransport->addRemoteCandidate(std::move(candidate));
  840. });
  841. t.detach();
  842. }
  843. }
  844. }
  845. string PeerConnection::localBundleMid() const {
  846. std::lock_guard lock(mLocalDescriptionMutex);
  847. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  848. }
  849. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  850. auto dataChannel = weakDataChannel.lock();
  851. if (dataChannel) {
  852. dataChannel->resetOpenCallback(); // might be set internally
  853. mPendingDataChannels.push(std::move(dataChannel));
  854. }
  855. triggerPendingDataChannels();
  856. }
  857. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  858. auto track = weakTrack.lock();
  859. if (track) {
  860. track->resetOpenCallback(); // might be set internally
  861. mPendingTracks.push(std::move(track));
  862. }
  863. triggerPendingTracks();
  864. }
  865. void PeerConnection::triggerPendingDataChannels() {
  866. while (dataChannelCallback) {
  867. auto next = mPendingDataChannels.tryPop();
  868. if (!next)
  869. break;
  870. auto impl = std::move(*next);
  871. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  872. impl->triggerOpen();
  873. }
  874. }
  875. void PeerConnection::triggerPendingTracks() {
  876. while (trackCallback) {
  877. auto next = mPendingTracks.tryPop();
  878. if (!next)
  879. break;
  880. auto impl = std::move(*next);
  881. trackCallback(std::make_shared<rtc::Track>(impl));
  882. impl->triggerOpen();
  883. }
  884. }
  885. void PeerConnection::flushPendingDataChannels() {
  886. mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
  887. }
  888. void PeerConnection::flushPendingTracks() {
  889. mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
  890. }
  891. bool PeerConnection::changeState(State newState) {
  892. State current;
  893. do {
  894. current = state.load();
  895. if (current == State::Closed)
  896. return false;
  897. if (current == newState)
  898. return false;
  899. } while (!state.compare_exchange_weak(current, newState));
  900. std::ostringstream s;
  901. s << newState;
  902. PLOG_INFO << "Changed state to " << s.str();
  903. if (newState == State::Closed)
  904. // This is the last state change, so we may steal the callback
  905. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  906. else
  907. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  908. return true;
  909. }
  910. bool PeerConnection::changeGatheringState(GatheringState newState) {
  911. if (gatheringState.exchange(newState) == newState)
  912. return false;
  913. std::ostringstream s;
  914. s << newState;
  915. PLOG_INFO << "Changed gathering state to " << s.str();
  916. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  917. return true;
  918. }
  919. bool PeerConnection::changeSignalingState(SignalingState newState) {
  920. if (signalingState.exchange(newState) == newState)
  921. return false;
  922. std::ostringstream s;
  923. s << state;
  924. PLOG_INFO << "Changed signaling state to " << s.str();
  925. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  926. return true;
  927. }
  928. void PeerConnection::resetCallbacks() {
  929. // Unregister all callbacks
  930. dataChannelCallback = nullptr;
  931. localDescriptionCallback = nullptr;
  932. localCandidateCallback = nullptr;
  933. stateChangeCallback = nullptr;
  934. gatheringStateChangeCallback = nullptr;
  935. }
  936. } // namespace rtc::impl