peerconnection.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <array>
  35. #include <iomanip>
  36. #include <set>
  37. #include <thread>
  38. using namespace std::placeholders;
  39. namespace rtc::impl {
  // File-local log counters. Each one is constructed with a plog severity
  // (warning) and the message text associated with it.
  // COUNTER_MEDIA_TRUNCATED is bumped by forwardMedia() when an RTCP header
  // announces more bytes than remain in the packet.
  40. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  41. "Number of truncated RTP packets over past second");
  // NOTE(review): the two SRTP counters below are not referenced in the part of
  // the file visible here; confirm they are still used before touching them.
  42. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  43. "Number of SRTP decryption errors over past second");
  44. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  45. "Number of SRTP encryption errors over past second");
  // Bumped by forwardMedia() for RTCP payload types it does not handle
  // (payload type 207 is deliberately not counted).
  46. static LogCounter
  47. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  48. "Number of unknown RTCP packet types over past second");
  49. PeerConnection::PeerConnection(Configuration config_)
  50. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  51. mProcessor(std::make_unique<Processor>()) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor->join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels asynchronously
  73. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  74. closeTransports();
  75. }
  76. optional<Description> PeerConnection::localDescription() const {
  77. std::lock_guard lock(mLocalDescriptionMutex);
  78. return mLocalDescription;
  79. }
  80. optional<Description> PeerConnection::remoteDescription() const {
  81. std::lock_guard lock(mRemoteDescriptionMutex);
  82. return mRemoteDescription;
  83. }
  84. size_t PeerConnection::remoteMaxMessageSize() const {
  85. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  86. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  87. std::lock_guard lock(mRemoteDescriptionMutex);
  88. if (mRemoteDescription)
  89. if (auto *application = mRemoteDescription->application())
  90. if (auto max = application->maxMessageSize()) {
  91. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  92. // size value of zero, it indicates that the SCTP endpoint will handle messages
  93. // of any size, subject to memory capacity, etc.
  94. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  95. }
  96. return std::min(remoteMax, localMax);
  97. }
  98. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  99. template <typename T>
  100. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  101. transport->start();
  102. std::atomic_store(member, transport);
  103. if (pc->state.load() == PeerConnection::State::Closed) {
  104. std::atomic_store(member, decltype(transport)(nullptr));
  105. transport->stop();
  106. return nullptr;
  107. }
  108. return transport;
  109. }
  110. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  111. try {
  112. if (auto transport = std::atomic_load(&mIceTransport))
  113. return transport;
  114. PLOG_VERBOSE << "Starting ICE transport";
  115. auto transport = std::make_shared<IceTransport>(
  116. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  117. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  118. auto shared_this = weak_this.lock();
  119. if (!shared_this)
  120. return;
  121. switch (transportState) {
  122. case IceTransport::State::Connecting:
  123. changeState(State::Connecting);
  124. break;
  125. case IceTransport::State::Failed:
  126. changeState(State::Failed);
  127. break;
  128. case IceTransport::State::Connected:
  129. initDtlsTransport();
  130. break;
  131. case IceTransport::State::Disconnected:
  132. changeState(State::Disconnected);
  133. break;
  134. default:
  135. // Ignore
  136. break;
  137. }
  138. },
  139. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  140. auto shared_this = weak_this.lock();
  141. if (!shared_this)
  142. return;
  143. switch (gatheringState) {
  144. case IceTransport::GatheringState::InProgress:
  145. changeGatheringState(GatheringState::InProgress);
  146. break;
  147. case IceTransport::GatheringState::Complete:
  148. endLocalCandidates();
  149. changeGatheringState(GatheringState::Complete);
  150. break;
  151. default:
  152. // Ignore
  153. break;
  154. }
  155. });
  156. return emplaceTransport(this, &mIceTransport, std::move(transport));
  157. } catch (const std::exception &e) {
  158. PLOG_ERROR << e.what();
  159. changeState(State::Failed);
  160. throw std::runtime_error("ICE transport initialization failed");
  161. }
  162. }
  163. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  164. try {
  165. if (auto transport = std::atomic_load(&mDtlsTransport))
  166. return transport;
  167. PLOG_VERBOSE << "Starting DTLS transport";
  168. auto lower = std::atomic_load(&mIceTransport);
  169. if (!lower)
  170. throw std::logic_error("No underlying ICE transport for DTLS transport");
  171. auto certificate = mCertificate.get();
  172. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  173. auto dtlsStateChangeCallback =
  174. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  175. auto shared_this = weak_this.lock();
  176. if (!shared_this)
  177. return;
  178. switch (transportState) {
  179. case DtlsTransport::State::Connected:
  180. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  181. initSctpTransport();
  182. else
  183. changeState(State::Connected);
  184. mProcessor->enqueue(&PeerConnection::openTracks, this);
  185. break;
  186. case DtlsTransport::State::Failed:
  187. changeState(State::Failed);
  188. break;
  189. case DtlsTransport::State::Disconnected:
  190. changeState(State::Disconnected);
  191. break;
  192. default:
  193. // Ignore
  194. break;
  195. }
  196. };
  197. shared_ptr<DtlsTransport> transport;
  198. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  199. #if RTC_ENABLE_MEDIA
  200. PLOG_INFO << "This connection requires media support";
  201. // DTLS-SRTP
  202. transport = std::make_shared<DtlsSrtpTransport>(
  203. lower, certificate, config.mtu, verifierCallback,
  204. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  205. #else
  206. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  207. #endif
  208. }
  209. if (!transport) {
  210. // DTLS only
  211. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  212. verifierCallback, dtlsStateChangeCallback);
  213. }
  214. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  215. } catch (const std::exception &e) {
  216. PLOG_ERROR << e.what();
  217. changeState(State::Failed);
  218. throw std::runtime_error("DTLS transport initialization failed");
  219. }
  220. }
  221. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  222. try {
  223. if (auto transport = std::atomic_load(&mSctpTransport))
  224. return transport;
  225. PLOG_VERBOSE << "Starting SCTP transport";
  226. auto lower = std::atomic_load(&mDtlsTransport);
  227. if (!lower)
  228. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  229. auto local = localDescription();
  230. if (!local || !local->application())
  231. throw std::logic_error("Starting SCTP transport without local application description");
  232. auto remote = remoteDescription();
  233. if (!remote || !remote->application())
  234. throw std::logic_error(
  235. "Starting SCTP transport without remote application description");
  236. SctpTransport::Ports ports = {};
  237. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  238. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  239. // This is the last occasion to ensure the stream numbers are coherent with the role
  240. shiftDataChannels();
  241. auto transport = std::make_shared<SctpTransport>(
  242. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  243. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  244. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  245. auto shared_this = weak_this.lock();
  246. if (!shared_this)
  247. return;
  248. switch (transportState) {
  249. case SctpTransport::State::Connected:
  250. changeState(State::Connected);
  251. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  252. break;
  253. case SctpTransport::State::Failed:
  254. LOG_WARNING << "SCTP transport failed";
  255. changeState(State::Failed);
  256. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  257. break;
  258. case SctpTransport::State::Disconnected:
  259. changeState(State::Disconnected);
  260. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  261. break;
  262. default:
  263. // Ignore
  264. break;
  265. }
  266. });
  267. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  268. } catch (const std::exception &e) {
  269. PLOG_ERROR << e.what();
  270. changeState(State::Failed);
  271. throw std::runtime_error("SCTP transport initialization failed");
  272. }
  273. }
  274. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  275. return std::atomic_load(&mIceTransport);
  276. }
  277. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  278. return std::atomic_load(&mDtlsTransport);
  279. }
  280. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  281. return std::atomic_load(&mSctpTransport);
  282. }
  283. void PeerConnection::closeTransports() {
  284. PLOG_VERBOSE << "Closing transports";
  285. // Change state to sink state Closed
  286. if (!changeState(State::Closed))
  287. return; // already closed
  288. // Reset callbacks now that state is changed
  289. resetCallbacks();
  290. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  291. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  292. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  293. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  294. if (sctp) {
  295. sctp->onRecv(nullptr);
  296. sctp->onBufferedAmount(nullptr);
  297. }
  298. using array = std::array<shared_ptr<Transport>, 3>;
  299. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  300. for (const auto &t : transports)
  301. if (t)
  302. t->onStateChange(nullptr);
  303. // Initiate transport stop on the processor after closing the data channels
  304. mProcessor->enqueue([transports = std::move(transports)]() {
  305. ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
  306. for (const auto &t : transports)
  307. if (t)
  308. t->stop();
  309. for (auto &t : transports)
  310. t.reset();
  311. });
  312. });
  313. }
  314. void PeerConnection::endLocalCandidates() {
  315. std::lock_guard lock(mLocalDescriptionMutex);
  316. if (mLocalDescription)
  317. mLocalDescription->endCandidates();
  318. }
  319. void PeerConnection::rollbackLocalDescription() {
  320. PLOG_DEBUG << "Rolling back pending local description";
  321. std::unique_lock lock(mLocalDescriptionMutex);
  322. if (mCurrentLocalDescription) {
  323. std::vector<Candidate> existingCandidates;
  324. if (mLocalDescription)
  325. existingCandidates = mLocalDescription->extractCandidates();
  326. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  327. mLocalDescription->addCandidates(std::move(existingCandidates));
  328. mCurrentLocalDescription.reset();
  329. }
  330. }
  331. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  332. std::lock_guard lock(mRemoteDescriptionMutex);
  333. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  334. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  335. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  336. return true;
  337. }
  338. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  339. << expectedFingerprint.value_or("[none]") << "\"";
  340. return false;
  341. }
  342. void PeerConnection::forwardMessage(message_ptr message) {
  343. if (!message) {
  344. remoteCloseDataChannels();
  345. return;
  346. }
  347. uint16_t stream = uint16_t(message->stream);
  348. auto channel = findDataChannel(stream);
  349. if (!channel) {
  350. auto iceTransport = getIceTransport();
  351. auto sctpTransport = getSctpTransport();
  352. if (!iceTransport || !sctpTransport)
  353. return;
  354. // See https://tools.ietf.org/html/rfc8832
  355. const byte dataChannelOpenMessage{0x03};
  356. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  357. if (message->type == Message::Control) {
  358. if (message->size() == 0 || *message->data() != dataChannelOpenMessage)
  359. return; // ignore
  360. if (stream % 2 != remoteParity) {
  361. // The odd/even rule is violated, close the DataChannel
  362. sctpTransport->closeStream(message->stream);
  363. return;
  364. }
  365. channel =
  366. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  367. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  368. weak_ptr<DataChannel>{channel});
  369. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  370. mDataChannels.emplace(stream, channel);
  371. } else {
  372. // Invalid, close the DataChannel
  373. sctpTransport->closeStream(message->stream);
  374. return;
  375. }
  376. }
  377. channel->incoming(message);
  378. }
  379. void PeerConnection::forwardMedia(message_ptr message) {
  380. if (!message)
  381. return;
  382. // Browsers like to compound their packets with a random SSRC.
  383. // we have to do this monstrosity to distribute the report blocks
  384. if (message->type == Message::Control) {
  385. std::set<uint32_t> ssrcs;
  386. size_t offset = 0;
  387. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  388. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  389. if (header->lengthInBytes() > message->size() - offset) {
  390. COUNTER_MEDIA_TRUNCATED++;
  391. break;
  392. }
  393. offset += header->lengthInBytes();
  394. if (header->payloadType() == 205 || header->payloadType() == 206) {
  395. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  396. ssrcs.insert(rtcpfb->packetSenderSSRC());
  397. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  398. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  399. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  400. ssrcs.insert(rtcpsr->senderSSRC());
  401. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  402. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  403. } else if (header->payloadType() == 202) {
  404. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  405. if (!sdes->isValid()) {
  406. PLOG_WARNING << "RTCP SDES packet is invalid";
  407. continue;
  408. }
  409. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  410. auto chunk = sdes->getChunk(i);
  411. ssrcs.insert(chunk->ssrc());
  412. }
  413. } else {
  414. // PT=207 == Extended Report
  415. if (header->payloadType() != 207) {
  416. COUNTER_UNKNOWN_PACKET_TYPE++;
  417. }
  418. }
  419. }
  420. if (!ssrcs.empty()) {
  421. for (uint32_t ssrc : ssrcs) {
  422. if (auto mid = getMidFromSsrc(ssrc)) {
  423. std::shared_lock lock(mTracksMutex); // read-only
  424. if (auto it = mTracks.find(*mid); it != mTracks.end())
  425. if (auto track = it->second.lock())
  426. track->incoming(message);
  427. }
  428. }
  429. return;
  430. }
  431. }
  432. uint32_t ssrc = uint32_t(message->stream);
  433. if (auto mid = getMidFromSsrc(ssrc)) {
  434. std::shared_lock lock(mTracksMutex); // read-only
  435. if (auto it = mTracks.find(*mid); it != mTracks.end())
  436. if (auto track = it->second.lock())
  437. track->incoming(message);
  438. } else {
  439. /*
  440. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  441. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  442. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  443. * this ideal? No! Do I know how to fix it? No!
  444. */
  445. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  446. return;
  447. }
  448. }
  449. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  450. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  451. return it->second;
  452. {
  453. std::lock_guard lock(mRemoteDescriptionMutex);
  454. if (!mRemoteDescription)
  455. return nullopt;
  456. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  457. if (auto found =
  458. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  459. return std::nullopt;
  460. },
  461. [&](Description::Media *media) -> optional<string> {
  462. return media->hasSSRC(ssrc)
  463. ? std::make_optional(media->mid())
  464. : nullopt;
  465. }},
  466. mRemoteDescription->media(i))) {
  467. mMidFromSsrc.emplace(ssrc, *found);
  468. return *found;
  469. }
  470. }
  471. }
  472. {
  473. std::lock_guard lock(mLocalDescriptionMutex);
  474. if (!mLocalDescription)
  475. return nullopt;
  476. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  477. if (auto found =
  478. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  479. return std::nullopt;
  480. },
  481. [&](Description::Media *media) -> optional<string> {
  482. return media->hasSSRC(ssrc)
  483. ? std::make_optional(media->mid())
  484. : nullopt;
  485. }},
  486. mLocalDescription->media(i))) {
  487. mMidFromSsrc.emplace(ssrc, *found);
  488. return *found;
  489. }
  490. }
  491. }
  492. return nullopt;
  493. }
  494. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  495. if (auto channel = findDataChannel(stream))
  496. channel->triggerBufferedAmount(amount);
  497. }
  498. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  499. cleanupDataChannels();
  500. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  501. uint16_t stream;
  502. if (init.id) {
  503. stream = *init.id;
  504. if (stream == 65535)
  505. throw std::invalid_argument("Invalid DataChannel id");
  506. } else {
  507. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  508. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  509. // See https://tools.ietf.org/html/rfc5763#section-5
  510. // Therefore, we assume passive role if we are the offerer.
  511. auto iceTransport = getIceTransport();
  512. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  513. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  514. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  515. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  516. // the DTLS server, it MUST choose an odd one.
  517. // See https://tools.ietf.org/html/rfc8832#section-6
  518. stream = (role == Description::Role::Active) ? 0 : 1;
  519. while (mDataChannels.find(stream) != mDataChannels.end()) {
  520. if (stream >= 65535 - 2)
  521. throw std::runtime_error("Too many DataChannels");
  522. stream += 2;
  523. }
  524. }
  525. // If the DataChannel is user-negotiated, do not negociate it here
  526. auto channel =
  527. init.negotiated
  528. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  529. std::move(init.protocol), std::move(init.reliability))
  530. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  531. std::move(init.protocol),
  532. std::move(init.reliability));
  533. mDataChannels.emplace(std::make_pair(stream, channel));
  534. return channel;
  535. }
  536. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  537. std::shared_lock lock(mDataChannelsMutex); // read-only
  538. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  539. if (auto channel = it->second.lock())
  540. return channel;
  541. return nullptr;
  542. }
  543. void PeerConnection::shiftDataChannels() {
  544. auto iceTransport = std::atomic_load(&mIceTransport);
  545. auto sctpTransport = std::atomic_load(&mSctpTransport);
  546. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  547. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  548. decltype(mDataChannels) newDataChannels;
  549. auto it = mDataChannels.begin();
  550. while (it != mDataChannels.end()) {
  551. auto channel = it->second.lock();
  552. channel->shiftStream();
  553. newDataChannels.emplace(channel->stream(), channel);
  554. ++it;
  555. }
  556. std::swap(mDataChannels, newDataChannels);
  557. }
  558. }
  559. void PeerConnection::iterateDataChannels(
  560. std::function<void(shared_ptr<DataChannel> channel)> func) {
  561. std::vector<shared_ptr<DataChannel>> locked;
  562. {
  563. std::shared_lock lock(mDataChannelsMutex); // read-only
  564. locked.reserve(mDataChannels.size());
  565. auto it = mDataChannels.begin();
  566. while (it != mDataChannels.end()) {
  567. auto channel = it->second.lock();
  568. if (channel && !channel->isClosed())
  569. locked.push_back(std::move(channel));
  570. ++it;
  571. }
  572. }
  573. for (auto &channel : locked)
  574. func(std::move(channel));
  575. }
  576. void PeerConnection::cleanupDataChannels() {
  577. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  578. auto it = mDataChannels.begin();
  579. while (it != mDataChannels.end()) {
  580. if (!it->second.lock()) {
  581. it = mDataChannels.erase(it);
  582. continue;
  583. }
  584. ++it;
  585. }
  586. }
  587. void PeerConnection::openDataChannels() {
  588. if (auto transport = std::atomic_load(&mSctpTransport))
  589. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  590. }
  591. void PeerConnection::closeDataChannels() {
  592. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  593. }
  594. void PeerConnection::remoteCloseDataChannels() {
  595. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  596. }
  597. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  598. shared_ptr<Track> track;
  599. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  600. if (track = it->second.lock(); track)
  601. track->setDescription(std::move(description));
  602. if (!track) {
  603. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  604. mTracks.emplace(std::make_pair(track->mid(), track));
  605. mTrackLines.emplace_back(track);
  606. }
  607. return track;
  608. }
  609. void PeerConnection::incomingTrack(Description::Media description) {
  610. std::unique_lock lock(mTracksMutex); // we are going to emplace
  611. #if !RTC_ENABLE_MEDIA
  612. if (mTracks.empty()) {
  613. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  614. }
  615. #endif
  616. if (mTracks.find(description.mid()) == mTracks.end()) {
  617. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  618. mTracks.emplace(std::make_pair(track->mid(), track));
  619. mTrackLines.emplace_back(track);
  620. triggerTrack(track);
  621. }
  622. }
  623. void PeerConnection::openTracks() {
  624. #if RTC_ENABLE_MEDIA
  625. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  626. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  627. std::shared_lock lock(mTracksMutex); // read-only
  628. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  629. if (auto track = it->second.lock())
  630. if (!track->isOpen())
  631. track->open(srtpTransport);
  632. }
  633. #endif
  634. }
  635. void PeerConnection::validateRemoteDescription(const Description &description) {
  636. if (!description.iceUfrag())
  637. throw std::invalid_argument("Remote description has no ICE user fragment");
  638. if (!description.icePwd())
  639. throw std::invalid_argument("Remote description has no ICE password");
  640. if (!description.fingerprint())
  641. throw std::invalid_argument("Remote description has no valid fingerprint");
  642. if (description.mediaCount() == 0)
  643. throw std::invalid_argument("Remote description has no media line");
  644. int activeMediaCount = 0;
  645. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  646. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  647. [&](const Description::Media *media) {
  648. if (media->direction() != Description::Direction::Inactive)
  649. ++activeMediaCount;
  650. }},
  651. description.media(i));
  652. if (activeMediaCount == 0)
  653. throw std::invalid_argument("Remote description has no active media");
  654. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  655. if (*description.iceUfrag() == *local->iceUfrag() &&
  656. *description.icePwd() == *local->icePwd())
  657. throw std::logic_error("Got the local description as remote description");
  658. PLOG_VERBOSE << "Remote description looks valid";
  659. }
  660. void PeerConnection::processLocalDescription(Description description) {
  661. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  662. const size_t localMaxMessageSize =
  663. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  664. // Clean up the application entry the ICE transport might have added already (libnice)
  665. description.clearMedia();
  666. if (auto remote = remoteDescription()) {
  667. // Reciprocate remote description
  668. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  669. std::visit( // reciprocate each media
  670. rtc::overloaded{
  671. [&](Description::Application *remoteApp) {
  672. std::shared_lock lock(mDataChannelsMutex);
  673. if (!mDataChannels.empty()) {
  674. // Prefer local description
  675. Description::Application app(remoteApp->mid());
  676. app.setSctpPort(localSctpPort);
  677. app.setMaxMessageSize(localMaxMessageSize);
  678. PLOG_DEBUG << "Adding application to local description, mid=\""
  679. << app.mid() << "\"";
  680. description.addMedia(std::move(app));
  681. return;
  682. }
  683. auto reciprocated = remoteApp->reciprocate();
  684. reciprocated.hintSctpPort(localSctpPort);
  685. reciprocated.setMaxMessageSize(localMaxMessageSize);
  686. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  687. << reciprocated.mid() << "\"";
  688. description.addMedia(std::move(reciprocated));
  689. },
  690. [&](Description::Media *remoteMedia) {
  691. std::shared_lock lock(mTracksMutex);
  692. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  693. // Prefer local description
  694. if (auto track = it->second.lock()) {
  695. auto media = track->description();
  696. #if !RTC_ENABLE_MEDIA
  697. // No media support, mark as inactive
  698. media.setDirection(Description::Direction::Inactive);
  699. #endif
  700. PLOG_DEBUG
  701. << "Adding media to local description, mid=\"" << media.mid()
  702. << "\", active=" << std::boolalpha
  703. << (media.direction() != Description::Direction::Inactive);
  704. description.addMedia(std::move(media));
  705. } else {
  706. auto reciprocated = remoteMedia->reciprocate();
  707. reciprocated.setDirection(Description::Direction::Inactive);
  708. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  709. << reciprocated.mid() << "\"";
  710. description.addMedia(std::move(reciprocated));
  711. }
  712. return;
  713. }
  714. lock.unlock(); // we are going to call incomingTrack()
  715. auto reciprocated = remoteMedia->reciprocate();
  716. #if !RTC_ENABLE_MEDIA
  717. // No media support, mark as inactive
  718. reciprocated.setDirection(Description::Direction::Inactive);
  719. #endif
  720. incomingTrack(reciprocated);
  721. PLOG_DEBUG
  722. << "Reciprocating media in local description, mid=\""
  723. << reciprocated.mid() << "\", active=" << std::boolalpha
  724. << (reciprocated.direction() != Description::Direction::Inactive);
  725. description.addMedia(std::move(reciprocated));
  726. },
  727. },
  728. remote->media(i));
  729. }
  730. if (description.type() == Description::Type::Offer) {
  731. // This is an offer, add locally created data channels and tracks
  732. // Add application for data channels
  733. if (!description.hasApplication()) {
  734. std::shared_lock lock(mDataChannelsMutex);
  735. if (!mDataChannels.empty()) {
  736. unsigned int m = 0;
  737. while (description.hasMid(std::to_string(m)))
  738. ++m;
  739. Description::Application app(std::to_string(m));
  740. app.setSctpPort(localSctpPort);
  741. app.setMaxMessageSize(localMaxMessageSize);
  742. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  743. << "\"";
  744. description.addMedia(std::move(app));
  745. }
  746. }
  747. // Add media for local tracks
  748. std::shared_lock lock(mTracksMutex);
  749. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  750. if (auto track = it->lock()) {
  751. if (description.hasMid(track->mid()))
  752. continue;
  753. auto media = track->description();
  754. #if !RTC_ENABLE_MEDIA
  755. // No media support, mark as inactive
  756. media.setDirection(Description::Direction::Inactive);
  757. #endif
  758. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  759. << "\", active=" << std::boolalpha
  760. << (media.direction() != Description::Direction::Inactive);
  761. description.addMedia(std::move(media));
  762. }
  763. }
  764. // There might be no media at this point if the user created a Track, deleted it,
  765. // then called setLocalDescription().
  766. if (description.mediaCount() == 0)
  767. throw std::runtime_error("No DataChannel or Track to negotiate");
  768. }
  769. // Set local fingerprint (wait for certificate if necessary)
  770. description.setFingerprint(mCertificate.get()->fingerprint());
  771. PLOG_VERBOSE << "Issuing local description: " << description;
  772. if (description.mediaCount() == 0)
  773. throw std::logic_error("Local description has no media line");
  774. {
  775. // Set as local description
  776. std::lock_guard lock(mLocalDescriptionMutex);
  777. std::vector<Candidate> existingCandidates;
  778. if (mLocalDescription) {
  779. existingCandidates = mLocalDescription->extractCandidates();
  780. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  781. }
  782. mLocalDescription.emplace(description);
  783. mLocalDescription->addCandidates(std::move(existingCandidates));
  784. }
  785. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  786. // Reciprocated tracks might need to be open
  787. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  788. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  789. mProcessor->enqueue(&PeerConnection::openTracks, this);
  790. }
  791. void PeerConnection::processLocalCandidate(Candidate candidate) {
  792. std::lock_guard lock(mLocalDescriptionMutex);
  793. if (!mLocalDescription)
  794. throw std::logic_error("Got a local candidate without local description");
  795. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  796. candidate.type() != Candidate::Type::Relayed) {
  797. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  798. return;
  799. }
  800. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  801. candidate.resolve(Candidate::ResolveMode::Simple);
  802. mLocalDescription->addCandidate(candidate);
  803. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  804. }
  805. void PeerConnection::processRemoteDescription(Description description) {
  806. {
  807. // Set as remote description
  808. std::lock_guard lock(mRemoteDescriptionMutex);
  809. std::vector<Candidate> existingCandidates;
  810. if (mRemoteDescription)
  811. existingCandidates = mRemoteDescription->extractCandidates();
  812. mRemoteDescription.emplace(description);
  813. mRemoteDescription->addCandidates(std::move(existingCandidates));
  814. }
  815. auto iceTransport = initIceTransport();
  816. if (!iceTransport)
  817. return; // closed
  818. iceTransport->setRemoteDescription(std::move(description));
  819. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  820. // numbers from odd to even.
  821. shiftDataChannels();
  822. if (description.hasApplication()) {
  823. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  824. auto sctpTransport = std::atomic_load(&mSctpTransport);
  825. if (!sctpTransport && dtlsTransport &&
  826. dtlsTransport->state() == Transport::State::Connected)
  827. initSctpTransport();
  828. }
  829. }
  830. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  831. auto iceTransport = std::atomic_load(&mIceTransport);
  832. {
  833. // Set as remote candidate
  834. std::lock_guard lock(mRemoteDescriptionMutex);
  835. if (!mRemoteDescription)
  836. throw std::logic_error("Got a remote candidate without remote description");
  837. if (!iceTransport)
  838. throw std::logic_error("Got a remote candidate without ICE transport");
  839. candidate.hintMid(mRemoteDescription->bundleMid());
  840. if (mRemoteDescription->hasCandidate(candidate))
  841. return; // already in description, ignore
  842. candidate.resolve(Candidate::ResolveMode::Simple);
  843. mRemoteDescription->addCandidate(candidate);
  844. }
  845. if (candidate.isResolved()) {
  846. iceTransport->addRemoteCandidate(std::move(candidate));
  847. } else {
  848. // We might need a lookup, do it asynchronously
  849. // We don't use the thread pool because we have no control on the timeout
  850. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  851. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  852. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  853. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  854. if (auto iceTransport = weakIceTransport.lock())
  855. iceTransport->addRemoteCandidate(std::move(candidate));
  856. });
  857. t.detach();
  858. }
  859. }
  860. }
  861. string PeerConnection::localBundleMid() const {
  862. std::lock_guard lock(mLocalDescriptionMutex);
  863. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  864. }
  865. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  866. auto dataChannel = weakDataChannel.lock();
  867. if (dataChannel) {
  868. dataChannel->resetOpenCallback(); // might be set internally
  869. mPendingDataChannels.push(std::move(dataChannel));
  870. }
  871. triggerPendingDataChannels();
  872. }
  873. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  874. auto track = weakTrack.lock();
  875. if (track) {
  876. track->resetOpenCallback(); // might be set internally
  877. mPendingTracks.push(std::move(track));
  878. }
  879. triggerPendingTracks();
  880. }
  881. void PeerConnection::triggerPendingDataChannels() {
  882. while (dataChannelCallback) {
  883. auto next = mPendingDataChannels.tryPop();
  884. if (!next)
  885. break;
  886. auto impl = std::move(*next);
  887. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  888. impl->triggerOpen();
  889. }
  890. }
  891. void PeerConnection::triggerPendingTracks() {
  892. while (trackCallback) {
  893. auto next = mPendingTracks.tryPop();
  894. if (!next)
  895. break;
  896. auto impl = std::move(*next);
  897. trackCallback(std::make_shared<rtc::Track>(impl));
  898. impl->triggerOpen();
  899. }
  900. }
  901. void PeerConnection::flushPendingDataChannels() {
  902. mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
  903. }
  904. void PeerConnection::flushPendingTracks() {
  905. mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
  906. }
  907. bool PeerConnection::changeState(State newState) {
  908. State current;
  909. do {
  910. current = state.load();
  911. if (current == State::Closed)
  912. return false;
  913. if (current == newState)
  914. return false;
  915. } while (!state.compare_exchange_weak(current, newState));
  916. std::ostringstream s;
  917. s << newState;
  918. PLOG_INFO << "Changed state to " << s.str();
  919. if (newState == State::Closed)
  920. // This is the last state change, so we may steal the callback
  921. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  922. else
  923. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  924. return true;
  925. }
  926. bool PeerConnection::changeGatheringState(GatheringState newState) {
  927. if (gatheringState.exchange(newState) == newState)
  928. return false;
  929. std::ostringstream s;
  930. s << newState;
  931. PLOG_INFO << "Changed gathering state to " << s.str();
  932. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  933. return true;
  934. }
  935. bool PeerConnection::changeSignalingState(SignalingState newState) {
  936. if (signalingState.exchange(newState) == newState)
  937. return false;
  938. std::ostringstream s;
  939. s << state;
  940. PLOG_INFO << "Changed signaling state to " << s.str();
  941. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  942. return true;
  943. }
  944. void PeerConnection::resetCallbacks() {
  945. // Unregister all callbacks
  946. dataChannelCallback = nullptr;
  947. localDescriptionCallback = nullptr;
  948. localCandidateCallback = nullptr;
  949. stateChangeCallback = nullptr;
  950. gatheringStateChangeCallback = nullptr;
  951. }
  952. } // namespace rtc::impl