2
0

peerconnection.cpp 37 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <iomanip>
  35. #include <set>
  36. #include <thread>
  37. #include <array>
  38. using namespace std::placeholders;
  39. namespace rtc::impl {
  40. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  41. "Number of RTP packets truncated over past second");
  42. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  43. "Number of SRTP decryption errors over past second");
  44. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  45. "Number of SRTP encryption errors over past second");
  46. static LogCounter
  47. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  48. "Number of unknown RTCP packet types over past second");
  49. PeerConnection::PeerConnection(Configuration config_)
  50. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  51. mProcessor(std::make_unique<Processor>()) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor->join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels asynchronously
  73. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  74. closeTransports();
  75. }
  76. optional<Description> PeerConnection::localDescription() const {
  77. std::lock_guard lock(mLocalDescriptionMutex);
  78. return mLocalDescription;
  79. }
  80. optional<Description> PeerConnection::remoteDescription() const {
  81. std::lock_guard lock(mRemoteDescriptionMutex);
  82. return mRemoteDescription;
  83. }
  84. size_t PeerConnection::remoteMaxMessageSize() const {
  85. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  86. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  87. std::lock_guard lock(mRemoteDescriptionMutex);
  88. if (mRemoteDescription)
  89. if (auto *application = mRemoteDescription->application())
  90. if (auto max = application->maxMessageSize()) {
  91. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  92. // size value of zero, it indicates that the SCTP endpoint will handle messages
  93. // of any size, subject to memory capacity, etc.
  94. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  95. }
  96. return std::min(remoteMax, localMax);
  97. }
  98. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  99. template <typename T>
  100. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  101. transport->start();
  102. std::atomic_store(member, transport);
  103. if (pc->state.load() == PeerConnection::State::Closed) {
  104. std::atomic_store(member, decltype(transport)(nullptr));
  105. transport->stop();
  106. return nullptr;
  107. }
  108. return transport;
  109. }
  110. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  111. try {
  112. if (auto transport = std::atomic_load(&mIceTransport))
  113. return transport;
  114. PLOG_VERBOSE << "Starting ICE transport";
  115. auto transport = std::make_shared<IceTransport>(
  116. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  117. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  118. auto shared_this = weak_this.lock();
  119. if (!shared_this)
  120. return;
  121. switch (transportState) {
  122. case IceTransport::State::Connecting:
  123. changeState(State::Connecting);
  124. break;
  125. case IceTransport::State::Failed:
  126. changeState(State::Failed);
  127. break;
  128. case IceTransport::State::Connected:
  129. initDtlsTransport();
  130. break;
  131. case IceTransport::State::Disconnected:
  132. changeState(State::Disconnected);
  133. break;
  134. default:
  135. // Ignore
  136. break;
  137. }
  138. },
  139. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  140. auto shared_this = weak_this.lock();
  141. if (!shared_this)
  142. return;
  143. switch (gatheringState) {
  144. case IceTransport::GatheringState::InProgress:
  145. changeGatheringState(GatheringState::InProgress);
  146. break;
  147. case IceTransport::GatheringState::Complete:
  148. endLocalCandidates();
  149. changeGatheringState(GatheringState::Complete);
  150. break;
  151. default:
  152. // Ignore
  153. break;
  154. }
  155. });
  156. return emplaceTransport(this, &mIceTransport, std::move(transport));
  157. } catch (const std::exception &e) {
  158. PLOG_ERROR << e.what();
  159. changeState(State::Failed);
  160. throw std::runtime_error("ICE transport initialization failed");
  161. }
  162. }
  163. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  164. try {
  165. if (auto transport = std::atomic_load(&mDtlsTransport))
  166. return transport;
  167. PLOG_VERBOSE << "Starting DTLS transport";
  168. auto lower = std::atomic_load(&mIceTransport);
  169. if (!lower)
  170. throw std::logic_error("No underlying ICE transport for DTLS transport");
  171. auto certificate = mCertificate.get();
  172. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  173. auto dtlsStateChangeCallback =
  174. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  175. auto shared_this = weak_this.lock();
  176. if (!shared_this)
  177. return;
  178. switch (transportState) {
  179. case DtlsTransport::State::Connected:
  180. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  181. initSctpTransport();
  182. else
  183. changeState(State::Connected);
  184. mProcessor->enqueue(&PeerConnection::openTracks, this);
  185. break;
  186. case DtlsTransport::State::Failed:
  187. changeState(State::Failed);
  188. break;
  189. case DtlsTransport::State::Disconnected:
  190. changeState(State::Disconnected);
  191. break;
  192. default:
  193. // Ignore
  194. break;
  195. }
  196. };
  197. shared_ptr<DtlsTransport> transport;
  198. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  199. #if RTC_ENABLE_MEDIA
  200. PLOG_INFO << "This connection requires media support";
  201. // DTLS-SRTP
  202. transport = std::make_shared<DtlsSrtpTransport>(
  203. lower, certificate, config.mtu, verifierCallback,
  204. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  205. #else
  206. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  207. #endif
  208. }
  209. if (!transport) {
  210. // DTLS only
  211. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  212. verifierCallback, dtlsStateChangeCallback);
  213. }
  214. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  215. } catch (const std::exception &e) {
  216. PLOG_ERROR << e.what();
  217. changeState(State::Failed);
  218. throw std::runtime_error("DTLS transport initialization failed");
  219. }
  220. }
  221. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  222. try {
  223. if (auto transport = std::atomic_load(&mSctpTransport))
  224. return transport;
  225. PLOG_VERBOSE << "Starting SCTP transport";
  226. auto lower = std::atomic_load(&mDtlsTransport);
  227. if (!lower)
  228. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  229. auto remote = remoteDescription();
  230. if (!remote || !remote->application())
  231. throw std::logic_error("Starting SCTP transport without application description");
  232. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  233. // This is the last occasion to ensure the stream numbers are coherent with the role
  234. shiftDataChannels();
  235. auto transport = std::make_shared<SctpTransport>(
  236. lower, config, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  237. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  238. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  239. auto shared_this = weak_this.lock();
  240. if (!shared_this)
  241. return;
  242. switch (transportState) {
  243. case SctpTransport::State::Connected:
  244. changeState(State::Connected);
  245. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  246. break;
  247. case SctpTransport::State::Failed:
  248. LOG_WARNING << "SCTP transport failed";
  249. changeState(State::Failed);
  250. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  251. break;
  252. case SctpTransport::State::Disconnected:
  253. changeState(State::Disconnected);
  254. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  255. break;
  256. default:
  257. // Ignore
  258. break;
  259. }
  260. });
  261. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  262. } catch (const std::exception &e) {
  263. PLOG_ERROR << e.what();
  264. changeState(State::Failed);
  265. throw std::runtime_error("SCTP transport initialization failed");
  266. }
  267. }
  268. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  269. return std::atomic_load(&mIceTransport);
  270. }
  271. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  272. return std::atomic_load(&mDtlsTransport);
  273. }
  274. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  275. return std::atomic_load(&mSctpTransport);
  276. }
  277. void PeerConnection::closeTransports() {
  278. PLOG_VERBOSE << "Closing transports";
  279. // Change state to sink state Closed
  280. if (!changeState(State::Closed))
  281. return; // already closed
  282. // Reset callbacks now that state is changed
  283. resetCallbacks();
  284. // Initiate transport stop on the processor after closing the data channels
  285. mProcessor->enqueue([this]() {
  286. // Pass the pointers to a thread
  287. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  288. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  289. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  290. if (sctp) {
  291. sctp->onRecv(nullptr);
  292. sctp->onBufferedAmount(nullptr);
  293. }
  294. using array = std::array<shared_ptr<Transport>, 3>;
  295. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  296. for (const auto &t : transports)
  297. if (t)
  298. t->onStateChange(nullptr);
  299. ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
  300. for (const auto &t : transports)
  301. if (t)
  302. t->stop();
  303. for (auto &t : transports)
  304. t.reset();
  305. });
  306. });
  307. }
  308. void PeerConnection::endLocalCandidates() {
  309. std::lock_guard lock(mLocalDescriptionMutex);
  310. if (mLocalDescription)
  311. mLocalDescription->endCandidates();
  312. }
  313. void PeerConnection::rollbackLocalDescription() {
  314. PLOG_DEBUG << "Rolling back pending local description";
  315. std::unique_lock lock(mLocalDescriptionMutex);
  316. if (mCurrentLocalDescription) {
  317. std::vector<Candidate> existingCandidates;
  318. if (mLocalDescription)
  319. existingCandidates = mLocalDescription->extractCandidates();
  320. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  321. mLocalDescription->addCandidates(std::move(existingCandidates));
  322. mCurrentLocalDescription.reset();
  323. }
  324. }
  325. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  326. std::lock_guard lock(mRemoteDescriptionMutex);
  327. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  328. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  329. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  330. return true;
  331. }
  332. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  333. << expectedFingerprint.value_or("[none]") << "\"";
  334. return false;
  335. }
  336. void PeerConnection::forwardMessage(message_ptr message) {
  337. if (!message) {
  338. remoteCloseDataChannels();
  339. return;
  340. }
  341. uint16_t stream = uint16_t(message->stream);
  342. auto channel = findDataChannel(stream);
  343. if (!channel) {
  344. auto iceTransport = getIceTransport();
  345. auto sctpTransport = getSctpTransport();
  346. if (!iceTransport || !sctpTransport)
  347. return;
  348. const byte dataChannelOpenMessage{0x03};
  349. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  350. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  351. stream % 2 == remoteParity) {
  352. channel =
  353. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  354. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  355. weak_ptr<DataChannel>{channel});
  356. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  357. mDataChannels.emplace(stream, channel);
  358. } else {
  359. // Invalid, close the DataChannel
  360. sctpTransport->closeStream(message->stream);
  361. return;
  362. }
  363. }
  364. channel->incoming(message);
  365. }
  366. void PeerConnection::forwardMedia(message_ptr message) {
  367. if (!message)
  368. return;
  369. // Browsers like to compound their packets with a random SSRC.
  370. // we have to do this monstrosity to distribute the report blocks
  371. if (message->type == Message::Control) {
  372. std::set<uint32_t> ssrcs;
  373. size_t offset = 0;
  374. while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
  375. auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
  376. if (header->lengthInBytes() > message->size() - offset) {
  377. COUNTER_MEDIA_TRUNCATED++;
  378. break;
  379. }
  380. offset += header->lengthInBytes();
  381. if (header->payloadType() == 205 || header->payloadType() == 206) {
  382. auto rtcpfb = reinterpret_cast<RTCP_FB_HEADER *>(header);
  383. ssrcs.insert(rtcpfb->packetSenderSSRC());
  384. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  385. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  386. auto rtcpsr = reinterpret_cast<RTCP_SR *>(header);
  387. ssrcs.insert(rtcpsr->senderSSRC());
  388. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  389. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  390. } else if (header->payloadType() == 202) {
  391. auto sdes = reinterpret_cast<RTCP_SDES *>(header);
  392. if (!sdes->isValid()) {
  393. PLOG_WARNING << "RTCP SDES packet is invalid";
  394. continue;
  395. }
  396. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  397. auto chunk = sdes->getChunk(i);
  398. ssrcs.insert(chunk->ssrc());
  399. }
  400. } else {
  401. // PT=207 == Extended Report
  402. if (header->payloadType() != 207) {
  403. COUNTER_UNKNOWN_PACKET_TYPE++;
  404. }
  405. }
  406. }
  407. if (!ssrcs.empty()) {
  408. for (uint32_t ssrc : ssrcs) {
  409. if (auto mid = getMidFromSsrc(ssrc)) {
  410. std::shared_lock lock(mTracksMutex); // read-only
  411. if (auto it = mTracks.find(*mid); it != mTracks.end())
  412. if (auto track = it->second.lock())
  413. track->incoming(message);
  414. }
  415. }
  416. return;
  417. }
  418. }
  419. uint32_t ssrc = uint32_t(message->stream);
  420. if (auto mid = getMidFromSsrc(ssrc)) {
  421. std::shared_lock lock(mTracksMutex); // read-only
  422. if (auto it = mTracks.find(*mid); it != mTracks.end())
  423. if (auto track = it->second.lock())
  424. track->incoming(message);
  425. } else {
  426. /*
  427. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  428. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  429. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  430. * this ideal? No! Do I know how to fix it? No!
  431. */
  432. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  433. return;
  434. }
  435. }
  436. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  437. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  438. return it->second;
  439. {
  440. std::lock_guard lock(mRemoteDescriptionMutex);
  441. if (!mRemoteDescription)
  442. return nullopt;
  443. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  444. if (auto found =
  445. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  446. return std::nullopt;
  447. },
  448. [&](Description::Media *media) -> optional<string> {
  449. return media->hasSSRC(ssrc)
  450. ? std::make_optional(media->mid())
  451. : nullopt;
  452. }},
  453. mRemoteDescription->media(i))) {
  454. mMidFromSsrc.emplace(ssrc, *found);
  455. return *found;
  456. }
  457. }
  458. }
  459. {
  460. std::lock_guard lock(mLocalDescriptionMutex);
  461. if (!mLocalDescription)
  462. return nullopt;
  463. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  464. if (auto found =
  465. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  466. return std::nullopt;
  467. },
  468. [&](Description::Media *media) -> optional<string> {
  469. return media->hasSSRC(ssrc)
  470. ? std::make_optional(media->mid())
  471. : nullopt;
  472. }},
  473. mLocalDescription->media(i))) {
  474. mMidFromSsrc.emplace(ssrc, *found);
  475. return *found;
  476. }
  477. }
  478. }
  479. return nullopt;
  480. }
  481. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  482. if (auto channel = findDataChannel(stream))
  483. channel->triggerBufferedAmount(amount);
  484. }
  485. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  486. cleanupDataChannels();
  487. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  488. uint16_t stream;
  489. if (init.id) {
  490. stream = *init.id;
  491. if (stream == 65535)
  492. throw std::invalid_argument("Invalid DataChannel id");
  493. } else {
  494. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  495. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  496. // See https://tools.ietf.org/html/rfc5763#section-5
  497. // Therefore, we assume passive role if we are the offerer.
  498. auto iceTransport = getIceTransport();
  499. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  500. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  501. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  502. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  503. // the DTLS server, it MUST choose an odd one.
  504. // See https://tools.ietf.org/html/rfc8832#section-6
  505. stream = (role == Description::Role::Active) ? 0 : 1;
  506. while (mDataChannels.find(stream) != mDataChannels.end()) {
  507. if (stream >= 65535 - 2)
  508. throw std::runtime_error("Too many DataChannels");
  509. stream += 2;
  510. }
  511. }
  512. // If the DataChannel is user-negotiated, do not negociate it here
  513. auto channel =
  514. init.negotiated
  515. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  516. std::move(init.protocol), std::move(init.reliability))
  517. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  518. std::move(init.protocol),
  519. std::move(init.reliability));
  520. mDataChannels.emplace(std::make_pair(stream, channel));
  521. return channel;
  522. }
  523. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  524. std::shared_lock lock(mDataChannelsMutex); // read-only
  525. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  526. if (auto channel = it->second.lock())
  527. return channel;
  528. return nullptr;
  529. }
  530. void PeerConnection::shiftDataChannels() {
  531. auto iceTransport = std::atomic_load(&mIceTransport);
  532. auto sctpTransport = std::atomic_load(&mSctpTransport);
  533. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  534. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  535. decltype(mDataChannels) newDataChannels;
  536. auto it = mDataChannels.begin();
  537. while (it != mDataChannels.end()) {
  538. auto channel = it->second.lock();
  539. channel->shiftStream();
  540. newDataChannels.emplace(channel->stream(), channel);
  541. ++it;
  542. }
  543. std::swap(mDataChannels, newDataChannels);
  544. }
  545. }
  546. void PeerConnection::iterateDataChannels(std::function<void(shared_ptr<DataChannel> channel)> func) {
  547. std::vector<shared_ptr<DataChannel>> locked;
  548. {
  549. std::shared_lock lock(mDataChannelsMutex); // read-only
  550. locked.reserve(mDataChannels.size());
  551. auto it = mDataChannels.begin();
  552. while (it != mDataChannels.end()) {
  553. auto channel = it->second.lock();
  554. if (channel && !channel->isClosed())
  555. locked.push_back(std::move(channel));
  556. ++it;
  557. }
  558. }
  559. for(auto &channel : locked)
  560. func(std::move(channel));
  561. }
  562. void PeerConnection::cleanupDataChannels() {
  563. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  564. auto it = mDataChannels.begin();
  565. while (it != mDataChannels.end()) {
  566. if (!it->second.lock()) {
  567. it = mDataChannels.erase(it);
  568. continue;
  569. }
  570. ++it;
  571. }
  572. }
  573. void PeerConnection::openDataChannels() {
  574. if (auto transport = std::atomic_load(&mSctpTransport))
  575. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  576. }
  577. void PeerConnection::closeDataChannels() {
  578. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  579. }
  580. void PeerConnection::remoteCloseDataChannels() {
  581. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  582. }
  583. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  584. shared_ptr<Track> track;
  585. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  586. if (track = it->second.lock(); track)
  587. track->setDescription(std::move(description));
  588. if (!track) {
  589. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  590. mTracks.emplace(std::make_pair(track->mid(), track));
  591. mTrackLines.emplace_back(track);
  592. }
  593. return track;
  594. }
  595. void PeerConnection::incomingTrack(Description::Media description) {
  596. std::unique_lock lock(mTracksMutex); // we are going to emplace
  597. #if !RTC_ENABLE_MEDIA
  598. if (mTracks.empty()) {
  599. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  600. }
  601. #endif
  602. if (mTracks.find(description.mid()) == mTracks.end()) {
  603. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  604. mTracks.emplace(std::make_pair(track->mid(), track));
  605. mTrackLines.emplace_back(track);
  606. triggerTrack(track);
  607. }
  608. }
  609. void PeerConnection::openTracks() {
  610. #if RTC_ENABLE_MEDIA
  611. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  612. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  613. std::shared_lock lock(mTracksMutex); // read-only
  614. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  615. if (auto track = it->second.lock())
  616. if (!track->isOpen())
  617. track->open(srtpTransport);
  618. }
  619. #endif
  620. }
  621. void PeerConnection::validateRemoteDescription(const Description &description) {
  622. if (!description.iceUfrag())
  623. throw std::invalid_argument("Remote description has no ICE user fragment");
  624. if (!description.icePwd())
  625. throw std::invalid_argument("Remote description has no ICE password");
  626. if (!description.fingerprint())
  627. throw std::invalid_argument("Remote description has no valid fingerprint");
  628. if (description.mediaCount() == 0)
  629. throw std::invalid_argument("Remote description has no media line");
  630. int activeMediaCount = 0;
  631. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  632. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  633. [&](const Description::Media *media) {
  634. if (media->direction() != Description::Direction::Inactive)
  635. ++activeMediaCount;
  636. }},
  637. description.media(i));
  638. if (activeMediaCount == 0)
  639. throw std::invalid_argument("Remote description has no active media");
  640. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  641. if (*description.iceUfrag() == *local->iceUfrag() &&
  642. *description.icePwd() == *local->icePwd())
  643. throw std::logic_error("Got the local description as remote description");
  644. PLOG_VERBOSE << "Remote description looks valid";
  645. }
  646. void PeerConnection::processLocalDescription(Description description) {
  647. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  648. const size_t localMaxMessageSize =
  649. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  650. // Clean up the application entry the ICE transport might have added already (libnice)
  651. description.clearMedia();
  652. if (auto remote = remoteDescription()) {
  653. // Reciprocate remote description
  654. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  655. std::visit( // reciprocate each media
  656. rtc::overloaded{
  657. [&](Description::Application *remoteApp) {
  658. std::shared_lock lock(mDataChannelsMutex);
  659. if (!mDataChannels.empty()) {
  660. // Prefer local description
  661. Description::Application app(remoteApp->mid());
  662. app.setSctpPort(localSctpPort);
  663. app.setMaxMessageSize(localMaxMessageSize);
  664. PLOG_DEBUG << "Adding application to local description, mid=\""
  665. << app.mid() << "\"";
  666. description.addMedia(std::move(app));
  667. return;
  668. }
  669. auto reciprocated = remoteApp->reciprocate();
  670. reciprocated.hintSctpPort(localSctpPort);
  671. reciprocated.setMaxMessageSize(localMaxMessageSize);
  672. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  673. << reciprocated.mid() << "\"";
  674. description.addMedia(std::move(reciprocated));
  675. },
  676. [&](Description::Media *remoteMedia) {
  677. std::shared_lock lock(mTracksMutex);
  678. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  679. // Prefer local description
  680. if (auto track = it->second.lock()) {
  681. auto media = track->description();
  682. #if !RTC_ENABLE_MEDIA
  683. // No media support, mark as inactive
  684. media.setDirection(Description::Direction::Inactive);
  685. #endif
  686. PLOG_DEBUG
  687. << "Adding media to local description, mid=\"" << media.mid()
  688. << "\", active=" << std::boolalpha
  689. << (media.direction() != Description::Direction::Inactive);
  690. description.addMedia(std::move(media));
  691. } else {
  692. auto reciprocated = remoteMedia->reciprocate();
  693. reciprocated.setDirection(Description::Direction::Inactive);
  694. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  695. << reciprocated.mid() << "\"";
  696. description.addMedia(std::move(reciprocated));
  697. }
  698. return;
  699. }
  700. lock.unlock(); // we are going to call incomingTrack()
  701. auto reciprocated = remoteMedia->reciprocate();
  702. #if !RTC_ENABLE_MEDIA
  703. // No media support, mark as inactive
  704. reciprocated.setDirection(Description::Direction::Inactive);
  705. #endif
  706. incomingTrack(reciprocated);
  707. PLOG_DEBUG
  708. << "Reciprocating media in local description, mid=\""
  709. << reciprocated.mid() << "\", active=" << std::boolalpha
  710. << (reciprocated.direction() != Description::Direction::Inactive);
  711. description.addMedia(std::move(reciprocated));
  712. },
  713. },
  714. remote->media(i));
  715. }
  716. if (description.type() == Description::Type::Offer) {
  717. // This is an offer, add locally created data channels and tracks
  718. // Add application for data channels
  719. if (!description.hasApplication()) {
  720. std::shared_lock lock(mDataChannelsMutex);
  721. if (!mDataChannels.empty()) {
  722. unsigned int m = 0;
  723. while (description.hasMid(std::to_string(m)))
  724. ++m;
  725. Description::Application app(std::to_string(m));
  726. app.setSctpPort(localSctpPort);
  727. app.setMaxMessageSize(localMaxMessageSize);
  728. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  729. << "\"";
  730. description.addMedia(std::move(app));
  731. }
  732. }
  733. // Add media for local tracks
  734. std::shared_lock lock(mTracksMutex);
  735. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  736. if (auto track = it->lock()) {
  737. if (description.hasMid(track->mid()))
  738. continue;
  739. auto media = track->description();
  740. #if !RTC_ENABLE_MEDIA
  741. // No media support, mark as inactive
  742. media.setDirection(Description::Direction::Inactive);
  743. #endif
  744. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  745. << "\", active=" << std::boolalpha
  746. << (media.direction() != Description::Direction::Inactive);
  747. description.addMedia(std::move(media));
  748. }
  749. }
  750. // There might be no media at this point if the user created a Track, deleted it,
  751. // then called setLocalDescription().
  752. if (description.mediaCount() == 0)
  753. throw std::runtime_error("No DataChannel or Track to negotiate");
  754. }
  755. // Set local fingerprint (wait for certificate if necessary)
  756. description.setFingerprint(mCertificate.get()->fingerprint());
  757. PLOG_VERBOSE << "Issuing local description: " << description;
  758. if (description.mediaCount() == 0)
  759. throw std::logic_error("Local description has no media line");
  760. {
  761. // Set as local description
  762. std::lock_guard lock(mLocalDescriptionMutex);
  763. std::vector<Candidate> existingCandidates;
  764. if (mLocalDescription) {
  765. existingCandidates = mLocalDescription->extractCandidates();
  766. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  767. }
  768. mLocalDescription.emplace(description);
  769. mLocalDescription->addCandidates(std::move(existingCandidates));
  770. }
  771. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  772. // Reciprocated tracks might need to be open
  773. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  774. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  775. mProcessor->enqueue(&PeerConnection::openTracks, this);
  776. }
  777. void PeerConnection::processLocalCandidate(Candidate candidate) {
  778. std::lock_guard lock(mLocalDescriptionMutex);
  779. if (!mLocalDescription)
  780. throw std::logic_error("Got a local candidate without local description");
  781. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  782. candidate.type() != Candidate::Type::Relayed) {
  783. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  784. return;
  785. }
  786. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  787. candidate.resolve(Candidate::ResolveMode::Simple);
  788. mLocalDescription->addCandidate(candidate);
  789. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  790. }
  791. void PeerConnection::processRemoteDescription(Description description) {
  792. {
  793. // Set as remote description
  794. std::lock_guard lock(mRemoteDescriptionMutex);
  795. std::vector<Candidate> existingCandidates;
  796. if (mRemoteDescription)
  797. existingCandidates = mRemoteDescription->extractCandidates();
  798. mRemoteDescription.emplace(description);
  799. mRemoteDescription->addCandidates(std::move(existingCandidates));
  800. }
  801. auto iceTransport = initIceTransport();
  802. if (!iceTransport)
  803. return; // closed
  804. iceTransport->setRemoteDescription(std::move(description));
  805. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  806. // numbers from odd to even.
  807. shiftDataChannels();
  808. if (description.hasApplication()) {
  809. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  810. auto sctpTransport = std::atomic_load(&mSctpTransport);
  811. if (!sctpTransport && dtlsTransport &&
  812. dtlsTransport->state() == Transport::State::Connected)
  813. initSctpTransport();
  814. }
  815. }
  816. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  817. auto iceTransport = std::atomic_load(&mIceTransport);
  818. {
  819. // Set as remote candidate
  820. std::lock_guard lock(mRemoteDescriptionMutex);
  821. if (!mRemoteDescription)
  822. throw std::logic_error("Got a remote candidate without remote description");
  823. if (!iceTransport)
  824. throw std::logic_error("Got a remote candidate without ICE transport");
  825. candidate.hintMid(mRemoteDescription->bundleMid());
  826. if (mRemoteDescription->hasCandidate(candidate))
  827. return; // already in description, ignore
  828. candidate.resolve(Candidate::ResolveMode::Simple);
  829. mRemoteDescription->addCandidate(candidate);
  830. }
  831. if (candidate.isResolved()) {
  832. iceTransport->addRemoteCandidate(std::move(candidate));
  833. } else {
  834. // We might need a lookup, do it asynchronously
  835. // We don't use the thread pool because we have no control on the timeout
  836. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  837. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  838. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  839. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  840. if (auto iceTransport = weakIceTransport.lock())
  841. iceTransport->addRemoteCandidate(std::move(candidate));
  842. });
  843. t.detach();
  844. }
  845. }
  846. }
  847. string PeerConnection::localBundleMid() const {
  848. std::lock_guard lock(mLocalDescriptionMutex);
  849. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  850. }
  851. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  852. auto dataChannel = weakDataChannel.lock();
  853. if (dataChannel) {
  854. dataChannel->resetOpenCallback(); // might be set internally
  855. mPendingDataChannels.push(std::move(dataChannel));
  856. }
  857. triggerPendingDataChannels();
  858. }
  859. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  860. auto track = weakTrack.lock();
  861. if (track) {
  862. track->resetOpenCallback(); // might be set internally
  863. mPendingTracks.push(std::move(track));
  864. }
  865. triggerPendingTracks();
  866. }
  867. void PeerConnection::triggerPendingDataChannels() {
  868. while (dataChannelCallback) {
  869. auto next = mPendingDataChannels.tryPop();
  870. if (!next)
  871. break;
  872. auto impl = std::move(*next);
  873. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  874. impl->triggerOpen();
  875. }
  876. }
  877. void PeerConnection::triggerPendingTracks() {
  878. while (trackCallback) {
  879. auto next = mPendingTracks.tryPop();
  880. if (!next)
  881. break;
  882. auto impl = std::move(*next);
  883. trackCallback(std::make_shared<rtc::Track>(impl));
  884. impl->triggerOpen();
  885. }
  886. }
  887. void PeerConnection::flushPendingDataChannels() {
  888. mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
  889. }
  890. void PeerConnection::flushPendingTracks() {
  891. mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
  892. }
  893. bool PeerConnection::changeState(State newState) {
  894. State current;
  895. do {
  896. current = state.load();
  897. if (current == State::Closed)
  898. return false;
  899. if (current == newState)
  900. return false;
  901. } while (!state.compare_exchange_weak(current, newState));
  902. std::ostringstream s;
  903. s << newState;
  904. PLOG_INFO << "Changed state to " << s.str();
  905. if (newState == State::Closed)
  906. // This is the last state change, so we may steal the callback
  907. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  908. else
  909. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  910. return true;
  911. }
  912. bool PeerConnection::changeGatheringState(GatheringState newState) {
  913. if (gatheringState.exchange(newState) == newState)
  914. return false;
  915. std::ostringstream s;
  916. s << newState;
  917. PLOG_INFO << "Changed gathering state to " << s.str();
  918. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  919. return true;
  920. }
  921. bool PeerConnection::changeSignalingState(SignalingState newState) {
  922. if (signalingState.exchange(newState) == newState)
  923. return false;
  924. std::ostringstream s;
  925. s << state;
  926. PLOG_INFO << "Changed signaling state to " << s.str();
  927. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  928. return true;
  929. }
  930. void PeerConnection::resetCallbacks() {
  931. // Unregister all callbacks
  932. dataChannelCallback = nullptr;
  933. localDescriptionCallback = nullptr;
  934. localCandidateCallback = nullptr;
  935. stateChangeCallback = nullptr;
  936. gatheringStateChangeCallback = nullptr;
  937. }
  938. } // namespace rtc::impl