peerconnection.cpp 36 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "internals.hpp"
  24. #include "icetransport.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <iomanip>
  35. #include <set>
  36. #include <thread>
  37. using namespace std::placeholders;
  38. namespace rtc::impl {
  39. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  40. "Number of RTP packets truncated over past second");
  41. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  42. "Number of SRTP decryption errors over past second");
  43. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  44. "Number of SRTP encryption errors over past second");
  45. static LogCounter
  46. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  47. "Number of unknown RTCP packet types over past second");
  48. PeerConnection::PeerConnection(Configuration config_)
  49. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  50. mProcessor(std::make_unique<Processor>()) {
  51. PLOG_VERBOSE << "Creating PeerConnection";
  52. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  53. throw std::invalid_argument("Invalid port range");
  54. if (config.mtu) {
  55. if (*config.mtu < 576) // Min MTU for IPv4
  56. throw std::invalid_argument("Invalid MTU value");
  57. if (*config.mtu > 1500) { // Standard Ethernet
  58. PLOG_WARNING << "MTU set to " << *config.mtu;
  59. } else {
  60. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  61. }
  62. }
  63. }
  64. PeerConnection::~PeerConnection() {
  65. PLOG_VERBOSE << "Destroying PeerConnection";
  66. mProcessor->join();
  67. }
  68. void PeerConnection::close() {
  69. PLOG_VERBOSE << "Closing PeerConnection";
  70. negotiationNeeded = false;
  71. // Close data channels asynchronously
  72. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  73. closeTransports();
  74. }
  75. optional<Description> PeerConnection::localDescription() const {
  76. std::lock_guard lock(mLocalDescriptionMutex);
  77. return mLocalDescription;
  78. }
  79. optional<Description> PeerConnection::remoteDescription() const {
  80. std::lock_guard lock(mRemoteDescriptionMutex);
  81. return mRemoteDescription;
  82. }
  83. size_t PeerConnection::remoteMaxMessageSize() const {
  84. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  85. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  86. std::lock_guard lock(mRemoteDescriptionMutex);
  87. if (mRemoteDescription)
  88. if (auto *application = mRemoteDescription->application())
  89. if (auto max = application->maxMessageSize()) {
  90. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  91. // size value of zero, it indicates that the SCTP endpoint will handle messages
  92. // of any size, subject to memory capacity, etc.
  93. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  94. }
  95. return std::min(remoteMax, localMax);
  96. }
  97. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  98. try {
  99. if (auto transport = std::atomic_load(&mIceTransport))
  100. return transport;
  101. PLOG_VERBOSE << "Starting ICE transport";
  102. auto transport = std::make_shared<IceTransport>(
  103. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  104. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  105. auto shared_this = weak_this.lock();
  106. if (!shared_this)
  107. return;
  108. switch (transportState) {
  109. case IceTransport::State::Connecting:
  110. changeState(State::Connecting);
  111. break;
  112. case IceTransport::State::Failed:
  113. changeState(State::Failed);
  114. break;
  115. case IceTransport::State::Connected:
  116. initDtlsTransport();
  117. break;
  118. case IceTransport::State::Disconnected:
  119. changeState(State::Disconnected);
  120. break;
  121. default:
  122. // Ignore
  123. break;
  124. }
  125. },
  126. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  127. auto shared_this = weak_this.lock();
  128. if (!shared_this)
  129. return;
  130. switch (gatheringState) {
  131. case IceTransport::GatheringState::InProgress:
  132. changeGatheringState(GatheringState::InProgress);
  133. break;
  134. case IceTransport::GatheringState::Complete:
  135. endLocalCandidates();
  136. changeGatheringState(GatheringState::Complete);
  137. break;
  138. default:
  139. // Ignore
  140. break;
  141. }
  142. });
  143. std::atomic_store(&mIceTransport, transport);
  144. if (state.load() == State::Closed) {
  145. mIceTransport.reset();
  146. throw std::runtime_error("Connection is closed");
  147. }
  148. transport->start();
  149. return transport;
  150. } catch (const std::exception &e) {
  151. PLOG_ERROR << e.what();
  152. changeState(State::Failed);
  153. throw std::runtime_error("ICE transport initialization failed");
  154. }
  155. }
  156. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  157. try {
  158. if (auto transport = std::atomic_load(&mDtlsTransport))
  159. return transport;
  160. PLOG_VERBOSE << "Starting DTLS transport";
  161. auto lower = std::atomic_load(&mIceTransport);
  162. if(!lower)
  163. throw std::logic_error("No underlying ICE transport for DTLS transport");
  164. auto certificate = mCertificate.get();
  165. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  166. auto dtlsStateChangeCallback =
  167. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  168. auto shared_this = weak_this.lock();
  169. if (!shared_this)
  170. return;
  171. switch (transportState) {
  172. case DtlsTransport::State::Connected:
  173. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  174. initSctpTransport();
  175. else
  176. changeState(State::Connected);
  177. mProcessor->enqueue(&PeerConnection::openTracks, this);
  178. break;
  179. case DtlsTransport::State::Failed:
  180. changeState(State::Failed);
  181. break;
  182. case DtlsTransport::State::Disconnected:
  183. changeState(State::Disconnected);
  184. break;
  185. default:
  186. // Ignore
  187. break;
  188. }
  189. };
  190. shared_ptr<DtlsTransport> transport;
  191. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  192. #if RTC_ENABLE_MEDIA
  193. PLOG_INFO << "This connection requires media support";
  194. // DTLS-SRTP
  195. transport = std::make_shared<DtlsSrtpTransport>(
  196. lower, certificate, config.mtu, verifierCallback,
  197. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  198. #else
  199. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  200. #endif
  201. }
  202. if (!transport) {
  203. // DTLS only
  204. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  205. verifierCallback, dtlsStateChangeCallback);
  206. }
  207. std::atomic_store(&mDtlsTransport, transport);
  208. if (state.load() == State::Closed) {
  209. mDtlsTransport.reset();
  210. throw std::runtime_error("Connection is closed");
  211. }
  212. transport->start();
  213. return transport;
  214. } catch (const std::exception &e) {
  215. PLOG_ERROR << e.what();
  216. changeState(State::Failed);
  217. throw std::runtime_error("DTLS transport initialization failed");
  218. }
  219. }
  220. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  221. try {
  222. if (auto transport = std::atomic_load(&mSctpTransport))
  223. return transport;
  224. PLOG_VERBOSE << "Starting SCTP transport";
  225. auto lower = std::atomic_load(&mDtlsTransport);
  226. if(!lower)
  227. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  228. auto remote = remoteDescription();
  229. if (!remote || !remote->application())
  230. throw std::logic_error("Starting SCTP transport without application description");
  231. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  232. // This is the last occasion to ensure the stream numbers are coherent with the role
  233. shiftDataChannels();
  234. auto transport = std::make_shared<SctpTransport>(
  235. lower, config, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  236. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  237. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  238. auto shared_this = weak_this.lock();
  239. if (!shared_this)
  240. return;
  241. switch (transportState) {
  242. case SctpTransport::State::Connected:
  243. changeState(State::Connected);
  244. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  245. break;
  246. case SctpTransport::State::Failed:
  247. LOG_WARNING << "SCTP transport failed";
  248. changeState(State::Failed);
  249. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  250. break;
  251. case SctpTransport::State::Disconnected:
  252. changeState(State::Disconnected);
  253. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  254. break;
  255. default:
  256. // Ignore
  257. break;
  258. }
  259. });
  260. std::atomic_store(&mSctpTransport, transport);
  261. if (state.load() == State::Closed) {
  262. mSctpTransport.reset();
  263. throw std::runtime_error("Connection is closed");
  264. }
  265. transport->start();
  266. return transport;
  267. } catch (const std::exception &e) {
  268. PLOG_ERROR << e.what();
  269. changeState(State::Failed);
  270. throw std::runtime_error("SCTP transport initialization failed");
  271. }
  272. }
  273. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  274. return std::atomic_load(&mIceTransport);
  275. }
  276. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  277. return std::atomic_load(&mDtlsTransport);
  278. }
  279. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  280. return std::atomic_load(&mSctpTransport);
  281. }
  282. void PeerConnection::closeTransports() {
  283. PLOG_VERBOSE << "Closing transports";
  284. // Change state to sink state Closed
  285. if (!changeState(State::Closed))
  286. return; // already closed
  287. // Reset callbacks now that state is changed
  288. resetCallbacks();
  289. // Initiate transport stop on the processor after closing the data channels
  290. mProcessor->enqueue([this]() {
  291. // Pass the pointers to a thread
  292. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  293. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  294. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  295. ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
  296. if (sctp)
  297. sctp->stop();
  298. if (dtls)
  299. dtls->stop();
  300. if (ice)
  301. ice->stop();
  302. sctp.reset();
  303. dtls.reset();
  304. ice.reset();
  305. });
  306. });
  307. }
  308. void PeerConnection::endLocalCandidates() {
  309. std::lock_guard lock(mLocalDescriptionMutex);
  310. if (mLocalDescription)
  311. mLocalDescription->endCandidates();
  312. }
  313. void PeerConnection::rollbackLocalDescription() {
  314. PLOG_DEBUG << "Rolling back pending local description";
  315. std::unique_lock lock(mLocalDescriptionMutex);
  316. if (mCurrentLocalDescription) {
  317. std::vector<Candidate> existingCandidates;
  318. if (mLocalDescription)
  319. existingCandidates = mLocalDescription->extractCandidates();
  320. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  321. mLocalDescription->addCandidates(std::move(existingCandidates));
  322. mCurrentLocalDescription.reset();
  323. }
  324. }
  325. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  326. std::lock_guard lock(mRemoteDescriptionMutex);
  327. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  328. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  329. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  330. return true;
  331. }
  332. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  333. << expectedFingerprint.value_or("[none]") << "\"";
  334. return false;
  335. }
  336. void PeerConnection::forwardMessage(message_ptr message) {
  337. if (!message) {
  338. remoteCloseDataChannels();
  339. return;
  340. }
  341. uint16_t stream = uint16_t(message->stream);
  342. auto channel = findDataChannel(stream);
  343. if (!channel) {
  344. auto iceTransport = getIceTransport();
  345. auto sctpTransport = getSctpTransport();
  346. if (!iceTransport || !sctpTransport)
  347. return;
  348. const byte dataChannelOpenMessage{0x03};
  349. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  350. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  351. stream % 2 == remoteParity) {
  352. channel =
  353. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  354. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  355. weak_ptr<DataChannel>{channel});
  356. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  357. mDataChannels.emplace(stream, channel);
  358. } else {
  359. // Invalid, close the DataChannel
  360. sctpTransport->closeStream(message->stream);
  361. return;
  362. }
  363. }
  364. channel->incoming(message);
  365. }
  366. void PeerConnection::forwardMedia(message_ptr message) {
  367. if (!message)
  368. return;
  369. // Browsers like to compound their packets with a random SSRC.
  370. // we have to do this monstrosity to distribute the report blocks
  371. if (message->type == Message::Control) {
  372. std::set<uint32_t> ssrcs;
  373. size_t offset = 0;
  374. while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
  375. auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
  376. if (header->lengthInBytes() > message->size() - offset) {
  377. COUNTER_MEDIA_TRUNCATED++;
  378. break;
  379. }
  380. offset += header->lengthInBytes();
  381. if (header->payloadType() == 205 || header->payloadType() == 206) {
  382. auto rtcpfb = reinterpret_cast<RTCP_FB_HEADER *>(header);
  383. ssrcs.insert(rtcpfb->packetSenderSSRC());
  384. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  385. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  386. auto rtcpsr = reinterpret_cast<RTCP_SR *>(header);
  387. ssrcs.insert(rtcpsr->senderSSRC());
  388. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  389. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  390. } else if (header->payloadType() == 202) {
  391. auto sdes = reinterpret_cast<RTCP_SDES *>(header);
  392. if (!sdes->isValid()) {
  393. PLOG_WARNING << "RTCP SDES packet is invalid";
  394. continue;
  395. }
  396. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  397. auto chunk = sdes->getChunk(i);
  398. ssrcs.insert(chunk->ssrc());
  399. }
  400. } else {
  401. // PT=207 == Extended Report
  402. if (header->payloadType() != 207) {
  403. COUNTER_UNKNOWN_PACKET_TYPE++;
  404. }
  405. }
  406. }
  407. if (!ssrcs.empty()) {
  408. for (uint32_t ssrc : ssrcs) {
  409. if (auto mid = getMidFromSsrc(ssrc)) {
  410. std::shared_lock lock(mTracksMutex); // read-only
  411. if (auto it = mTracks.find(*mid); it != mTracks.end())
  412. if (auto track = it->second.lock())
  413. track->incoming(message);
  414. }
  415. }
  416. return;
  417. }
  418. }
  419. uint32_t ssrc = uint32_t(message->stream);
  420. if (auto mid = getMidFromSsrc(ssrc)) {
  421. std::shared_lock lock(mTracksMutex); // read-only
  422. if (auto it = mTracks.find(*mid); it != mTracks.end())
  423. if (auto track = it->second.lock())
  424. track->incoming(message);
  425. } else {
  426. /*
  427. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  428. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  429. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  430. * this ideal? No! Do I know how to fix it? No!
  431. */
  432. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  433. return;
  434. }
  435. }
  436. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  437. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  438. return it->second;
  439. {
  440. std::lock_guard lock(mRemoteDescriptionMutex);
  441. if (!mRemoteDescription)
  442. return nullopt;
  443. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  444. if (auto found =
  445. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  446. return std::nullopt;
  447. },
  448. [&](Description::Media *media) -> optional<string> {
  449. return media->hasSSRC(ssrc)
  450. ? std::make_optional(media->mid())
  451. : nullopt;
  452. }},
  453. mRemoteDescription->media(i))) {
  454. mMidFromSsrc.emplace(ssrc, *found);
  455. return *found;
  456. }
  457. }
  458. }
  459. {
  460. std::lock_guard lock(mLocalDescriptionMutex);
  461. if (!mLocalDescription)
  462. return nullopt;
  463. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  464. if (auto found =
  465. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  466. return std::nullopt;
  467. },
  468. [&](Description::Media *media) -> optional<string> {
  469. return media->hasSSRC(ssrc)
  470. ? std::make_optional(media->mid())
  471. : nullopt;
  472. }},
  473. mLocalDescription->media(i))) {
  474. mMidFromSsrc.emplace(ssrc, *found);
  475. return *found;
  476. }
  477. }
  478. }
  479. return nullopt;
  480. }
  481. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  482. if (auto channel = findDataChannel(stream))
  483. channel->triggerBufferedAmount(amount);
  484. }
  485. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  486. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  487. uint16_t stream;
  488. if (init.id) {
  489. stream = *init.id;
  490. if (stream == 65535)
  491. throw std::invalid_argument("Invalid DataChannel id");
  492. } else {
  493. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  494. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  495. // See https://tools.ietf.org/html/rfc5763#section-5
  496. // Therefore, we assume passive role if we are the offerer.
  497. auto iceTransport = getIceTransport();
  498. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  499. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  500. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  501. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  502. // the DTLS server, it MUST choose an odd one.
  503. // See https://tools.ietf.org/html/rfc8832#section-6
  504. stream = (role == Description::Role::Active) ? 0 : 1;
  505. while (mDataChannels.find(stream) != mDataChannels.end()) {
  506. if (stream >= 65535 - 2)
  507. throw std::runtime_error("Too many DataChannels");
  508. stream += 2;
  509. }
  510. }
  511. // If the DataChannel is user-negotiated, do not negociate it here
  512. auto channel =
  513. init.negotiated
  514. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  515. std::move(init.protocol), std::move(init.reliability))
  516. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  517. std::move(init.protocol),
  518. std::move(init.reliability));
  519. mDataChannels.emplace(std::make_pair(stream, channel));
  520. return channel;
  521. }
  522. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  523. std::shared_lock lock(mDataChannelsMutex); // read-only
  524. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  525. if (auto channel = it->second.lock())
  526. return channel;
  527. return nullptr;
  528. }
  529. void PeerConnection::shiftDataChannels() {
  530. auto iceTransport = std::atomic_load(&mIceTransport);
  531. auto sctpTransport = std::atomic_load(&mSctpTransport);
  532. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  533. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  534. decltype(mDataChannels) newDataChannels;
  535. auto it = mDataChannels.begin();
  536. while (it != mDataChannels.end()) {
  537. auto channel = it->second.lock();
  538. channel->shiftStream();
  539. newDataChannels.emplace(channel->stream(), channel);
  540. ++it;
  541. }
  542. std::swap(mDataChannels, newDataChannels);
  543. }
  544. }
  545. void PeerConnection::iterateDataChannels(
  546. std::function<void(shared_ptr<DataChannel> channel)> func) {
  547. // Iterate
  548. {
  549. std::shared_lock lock(mDataChannelsMutex); // read-only
  550. auto it = mDataChannels.begin();
  551. while (it != mDataChannels.end()) {
  552. auto channel = it->second.lock();
  553. if (channel && !channel->isClosed())
  554. func(channel);
  555. ++it;
  556. }
  557. }
  558. // Cleanup
  559. {
  560. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  561. auto it = mDataChannels.begin();
  562. while (it != mDataChannels.end()) {
  563. if (!it->second.lock()) {
  564. it = mDataChannels.erase(it);
  565. continue;
  566. }
  567. ++it;
  568. }
  569. }
  570. }
  571. void PeerConnection::openDataChannels() {
  572. if (auto transport = std::atomic_load(&mSctpTransport))
  573. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  574. }
  575. void PeerConnection::closeDataChannels() {
  576. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  577. }
  578. void PeerConnection::remoteCloseDataChannels() {
  579. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  580. }
  581. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  582. shared_ptr<Track> track;
  583. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  584. if (track = it->second.lock(); track)
  585. track->setDescription(std::move(description));
  586. if (!track) {
  587. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  588. mTracks.emplace(std::make_pair(track->mid(), track));
  589. mTrackLines.emplace_back(track);
  590. }
  591. return track;
  592. }
  593. void PeerConnection::incomingTrack(Description::Media description) {
  594. std::unique_lock lock(mTracksMutex); // we are going to emplace
  595. #if !RTC_ENABLE_MEDIA
  596. if (mTracks.empty()) {
  597. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  598. }
  599. #endif
  600. if (mTracks.find(description.mid()) == mTracks.end()) {
  601. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  602. mTracks.emplace(std::make_pair(track->mid(), track));
  603. mTrackLines.emplace_back(track);
  604. triggerTrack(track);
  605. }
  606. }
  607. void PeerConnection::openTracks() {
  608. #if RTC_ENABLE_MEDIA
  609. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  610. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  611. std::shared_lock lock(mTracksMutex); // read-only
  612. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  613. if (auto track = it->second.lock())
  614. if (!track->isOpen())
  615. track->open(srtpTransport);
  616. }
  617. #endif
  618. }
  619. void PeerConnection::validateRemoteDescription(const Description &description) {
  620. if (!description.iceUfrag())
  621. throw std::invalid_argument("Remote description has no ICE user fragment");
  622. if (!description.icePwd())
  623. throw std::invalid_argument("Remote description has no ICE password");
  624. if (!description.fingerprint())
  625. throw std::invalid_argument("Remote description has no valid fingerprint");
  626. if (description.mediaCount() == 0)
  627. throw std::invalid_argument("Remote description has no media line");
  628. int activeMediaCount = 0;
  629. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  630. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  631. [&](const Description::Media *media) {
  632. if (media->direction() != Description::Direction::Inactive)
  633. ++activeMediaCount;
  634. }},
  635. description.media(i));
  636. if (activeMediaCount == 0)
  637. throw std::invalid_argument("Remote description has no active media");
  638. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  639. if (*description.iceUfrag() == *local->iceUfrag() &&
  640. *description.icePwd() == *local->icePwd())
  641. throw std::logic_error("Got the local description as remote description");
  642. PLOG_VERBOSE << "Remote description looks valid";
  643. }
  644. void PeerConnection::processLocalDescription(Description description) {
  645. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  646. const size_t localMaxMessageSize =
  647. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  648. // Clean up the application entry the ICE transport might have added already (libnice)
  649. description.clearMedia();
  650. if (auto remote = remoteDescription()) {
  651. // Reciprocate remote description
  652. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  653. std::visit( // reciprocate each media
  654. rtc::overloaded{
  655. [&](Description::Application *remoteApp) {
  656. std::shared_lock lock(mDataChannelsMutex);
  657. if (!mDataChannels.empty()) {
  658. // Prefer local description
  659. Description::Application app(remoteApp->mid());
  660. app.setSctpPort(localSctpPort);
  661. app.setMaxMessageSize(localMaxMessageSize);
  662. PLOG_DEBUG << "Adding application to local description, mid=\""
  663. << app.mid() << "\"";
  664. description.addMedia(std::move(app));
  665. return;
  666. }
  667. auto reciprocated = remoteApp->reciprocate();
  668. reciprocated.hintSctpPort(localSctpPort);
  669. reciprocated.setMaxMessageSize(localMaxMessageSize);
  670. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  671. << reciprocated.mid() << "\"";
  672. description.addMedia(std::move(reciprocated));
  673. },
  674. [&](Description::Media *remoteMedia) {
  675. std::shared_lock lock(mTracksMutex);
  676. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  677. // Prefer local description
  678. if (auto track = it->second.lock()) {
  679. auto media = track->description();
  680. #if !RTC_ENABLE_MEDIA
  681. // No media support, mark as inactive
  682. media.setDirection(Description::Direction::Inactive);
  683. #endif
  684. PLOG_DEBUG
  685. << "Adding media to local description, mid=\"" << media.mid()
  686. << "\", active=" << std::boolalpha
  687. << (media.direction() != Description::Direction::Inactive);
  688. description.addMedia(std::move(media));
  689. } else {
  690. auto reciprocated = remoteMedia->reciprocate();
  691. reciprocated.setDirection(Description::Direction::Inactive);
  692. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  693. << reciprocated.mid() << "\"";
  694. description.addMedia(std::move(reciprocated));
  695. }
  696. return;
  697. }
  698. lock.unlock(); // we are going to call incomingTrack()
  699. auto reciprocated = remoteMedia->reciprocate();
  700. #if !RTC_ENABLE_MEDIA
  701. // No media support, mark as inactive
  702. reciprocated.setDirection(Description::Direction::Inactive);
  703. #endif
  704. incomingTrack(reciprocated);
  705. PLOG_DEBUG
  706. << "Reciprocating media in local description, mid=\""
  707. << reciprocated.mid() << "\", active=" << std::boolalpha
  708. << (reciprocated.direction() != Description::Direction::Inactive);
  709. description.addMedia(std::move(reciprocated));
  710. },
  711. },
  712. remote->media(i));
  713. }
  714. if (description.type() == Description::Type::Offer) {
  715. // This is an offer, add locally created data channels and tracks
  716. // Add application for data channels
  717. if (!description.hasApplication()) {
  718. std::shared_lock lock(mDataChannelsMutex);
  719. if (!mDataChannels.empty()) {
  720. unsigned int m = 0;
  721. while (description.hasMid(std::to_string(m)))
  722. ++m;
  723. Description::Application app(std::to_string(m));
  724. app.setSctpPort(localSctpPort);
  725. app.setMaxMessageSize(localMaxMessageSize);
  726. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  727. << "\"";
  728. description.addMedia(std::move(app));
  729. }
  730. }
  731. // Add media for local tracks
  732. std::shared_lock lock(mTracksMutex);
  733. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  734. if (auto track = it->lock()) {
  735. if (description.hasMid(track->mid()))
  736. continue;
  737. auto media = track->description();
  738. #if !RTC_ENABLE_MEDIA
  739. // No media support, mark as inactive
  740. media.setDirection(Description::Direction::Inactive);
  741. #endif
  742. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  743. << "\", active=" << std::boolalpha
  744. << (media.direction() != Description::Direction::Inactive);
  745. description.addMedia(std::move(media));
  746. }
  747. }
  748. }
  749. // Set local fingerprint (wait for certificate if necessary)
  750. description.setFingerprint(mCertificate.get()->fingerprint());
  751. {
  752. // Set as local description
  753. std::lock_guard lock(mLocalDescriptionMutex);
  754. std::vector<Candidate> existingCandidates;
  755. if (mLocalDescription) {
  756. existingCandidates = mLocalDescription->extractCandidates();
  757. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  758. }
  759. mLocalDescription.emplace(description);
  760. mLocalDescription->addCandidates(std::move(existingCandidates));
  761. }
  762. PLOG_VERBOSE << "Issuing local description: " << description;
  763. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  764. // Reciprocated tracks might need to be open
  765. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  766. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  767. mProcessor->enqueue(&PeerConnection::openTracks, this);
  768. }
  769. void PeerConnection::processLocalCandidate(Candidate candidate) {
  770. std::lock_guard lock(mLocalDescriptionMutex);
  771. if (!mLocalDescription)
  772. throw std::logic_error("Got a local candidate without local description");
  773. candidate.resolve(Candidate::ResolveMode::Simple);
  774. mLocalDescription->addCandidate(candidate);
  775. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  776. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  777. }
  778. void PeerConnection::processRemoteDescription(Description description) {
  779. {
  780. // Set as remote description
  781. std::lock_guard lock(mRemoteDescriptionMutex);
  782. std::vector<Candidate> existingCandidates;
  783. if (mRemoteDescription)
  784. existingCandidates = mRemoteDescription->extractCandidates();
  785. mRemoteDescription.emplace(description);
  786. mRemoteDescription->addCandidates(std::move(existingCandidates));
  787. }
  788. auto iceTransport = initIceTransport();
  789. iceTransport->setRemoteDescription(std::move(description));
  790. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  791. // numbers from odd to even.
  792. shiftDataChannels();
  793. if (description.hasApplication()) {
  794. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  795. auto sctpTransport = std::atomic_load(&mSctpTransport);
  796. if (!sctpTransport && dtlsTransport &&
  797. dtlsTransport->state() == Transport::State::Connected)
  798. initSctpTransport();
  799. }
  800. }
  801. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  802. auto iceTransport = std::atomic_load(&mIceTransport);
  803. {
  804. // Set as remote candidate
  805. std::lock_guard lock(mRemoteDescriptionMutex);
  806. if (!mRemoteDescription)
  807. throw std::logic_error("Got a remote candidate without remote description");
  808. if (!iceTransport)
  809. throw std::logic_error("Got a remote candidate without ICE transport");
  810. candidate.hintMid(mRemoteDescription->bundleMid());
  811. if (mRemoteDescription->hasCandidate(candidate))
  812. return; // already in description, ignore
  813. candidate.resolve(Candidate::ResolveMode::Simple);
  814. mRemoteDescription->addCandidate(candidate);
  815. }
  816. if (candidate.isResolved()) {
  817. iceTransport->addRemoteCandidate(std::move(candidate));
  818. } else {
  819. // We might need a lookup, do it asynchronously
  820. // We don't use the thread pool because we have no control on the timeout
  821. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  822. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  823. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  824. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  825. if (auto iceTransport = weakIceTransport.lock())
  826. iceTransport->addRemoteCandidate(std::move(candidate));
  827. });
  828. t.detach();
  829. }
  830. }
  831. }
  832. string PeerConnection::localBundleMid() const {
  833. std::lock_guard lock(mLocalDescriptionMutex);
  834. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  835. }
  836. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  837. auto dataChannel = weakDataChannel.lock();
  838. if (dataChannel) {
  839. dataChannel->resetOpenCallback(); // might be set internally
  840. mPendingDataChannels.push(std::move(dataChannel));
  841. }
  842. triggerPendingDataChannels();
  843. }
  844. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  845. auto track = weakTrack.lock();
  846. if (track) {
  847. track->resetOpenCallback(); // might be set internally
  848. mPendingTracks.push(std::move(track));
  849. }
  850. triggerPendingTracks();
  851. }
  852. void PeerConnection::triggerPendingDataChannels() {
  853. while (dataChannelCallback) {
  854. auto next = mPendingDataChannels.tryPop();
  855. if (!next)
  856. break;
  857. auto impl = std::move(*next);
  858. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  859. impl->triggerOpen();
  860. }
  861. }
  862. void PeerConnection::triggerPendingTracks() {
  863. while (trackCallback) {
  864. auto next = mPendingTracks.tryPop();
  865. if (!next)
  866. break;
  867. auto impl = std::move(*next);
  868. trackCallback(std::make_shared<rtc::Track>(impl));
  869. impl->triggerOpen();
  870. }
  871. }
  872. void PeerConnection::flushPendingDataChannels() {
  873. mProcessor->enqueue(std::bind(&PeerConnection::triggerPendingDataChannels, this));
  874. }
  875. void PeerConnection::flushPendingTracks() {
  876. mProcessor->enqueue(std::bind(&PeerConnection::triggerPendingTracks, this));
  877. }
  878. bool PeerConnection::changeState(State newState) {
  879. State current;
  880. do {
  881. current = state.load();
  882. if (current == State::Closed)
  883. return false;
  884. if (current == newState)
  885. return false;
  886. } while (!state.compare_exchange_weak(current, newState));
  887. std::ostringstream s;
  888. s << newState;
  889. PLOG_INFO << "Changed state to " << s.str();
  890. if (newState == State::Closed)
  891. // This is the last state change, so we may steal the callback
  892. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  893. else
  894. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  895. return true;
  896. }
  897. bool PeerConnection::changeGatheringState(GatheringState newState) {
  898. if (gatheringState.exchange(newState) == newState)
  899. return false;
  900. std::ostringstream s;
  901. s << newState;
  902. PLOG_INFO << "Changed gathering state to " << s.str();
  903. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  904. return true;
  905. }
  906. bool PeerConnection::changeSignalingState(SignalingState newState) {
  907. if (signalingState.exchange(newState) == newState)
  908. return false;
  909. std::ostringstream s;
  910. s << state;
  911. PLOG_INFO << "Changed signaling state to " << s.str();
  912. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  913. return true;
  914. }
  915. void PeerConnection::resetCallbacks() {
  916. // Unregister all callbacks
  917. dataChannelCallback = nullptr;
  918. localDescriptionCallback = nullptr;
  919. localCandidateCallback = nullptr;
  920. stateChangeCallback = nullptr;
  921. gatheringStateChangeCallback = nullptr;
  922. }
  923. } // namespace rtc::impl