peerconnection.cpp 37 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "icetransport.hpp"
  24. #include "internals.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <array>
  35. #include <iomanip>
  36. #include <set>
  37. #include <thread>
  38. using namespace std::placeholders;
  39. namespace rtc::impl {
  40. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  41. "Number of truncated RTP packets over past second");
  42. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  43. "Number of SRTP decryption errors over past second");
  44. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  45. "Number of SRTP encryption errors over past second");
  46. static LogCounter
  47. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  48. "Number of unknown RTCP packet types over past second");
  49. PeerConnection::PeerConnection(Configuration config_)
  50. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  51. mProcessor(std::make_unique<Processor>()) {
  52. PLOG_VERBOSE << "Creating PeerConnection";
  53. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  54. throw std::invalid_argument("Invalid port range");
  55. if (config.mtu) {
  56. if (*config.mtu < 576) // Min MTU for IPv4
  57. throw std::invalid_argument("Invalid MTU value");
  58. if (*config.mtu > 1500) { // Standard Ethernet
  59. PLOG_WARNING << "MTU set to " << *config.mtu;
  60. } else {
  61. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  62. }
  63. }
  64. }
  65. PeerConnection::~PeerConnection() {
  66. PLOG_VERBOSE << "Destroying PeerConnection";
  67. mProcessor->join();
  68. }
  69. void PeerConnection::close() {
  70. PLOG_VERBOSE << "Closing PeerConnection";
  71. negotiationNeeded = false;
  72. // Close data channels asynchronously
  73. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  74. closeTransports();
  75. }
  76. optional<Description> PeerConnection::localDescription() const {
  77. std::lock_guard lock(mLocalDescriptionMutex);
  78. return mLocalDescription;
  79. }
  80. optional<Description> PeerConnection::remoteDescription() const {
  81. std::lock_guard lock(mRemoteDescriptionMutex);
  82. return mRemoteDescription;
  83. }
  84. size_t PeerConnection::remoteMaxMessageSize() const {
  85. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  86. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  87. std::lock_guard lock(mRemoteDescriptionMutex);
  88. if (mRemoteDescription)
  89. if (auto *application = mRemoteDescription->application())
  90. if (auto max = application->maxMessageSize()) {
  91. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  92. // size value of zero, it indicates that the SCTP endpoint will handle messages
  93. // of any size, subject to memory capacity, etc.
  94. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  95. }
  96. return std::min(remoteMax, localMax);
  97. }
  98. // Helper for PeerConnection::initXTransport methods: start and emplace the transport
  99. template <typename T>
  100. shared_ptr<T> emplaceTransport(PeerConnection *pc, shared_ptr<T> *member, shared_ptr<T> transport) {
  101. transport->start();
  102. std::atomic_store(member, transport);
  103. if (pc->state.load() == PeerConnection::State::Closed) {
  104. std::atomic_store(member, decltype(transport)(nullptr));
  105. transport->stop();
  106. return nullptr;
  107. }
  108. return transport;
  109. }
  110. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  111. try {
  112. if (auto transport = std::atomic_load(&mIceTransport))
  113. return transport;
  114. PLOG_VERBOSE << "Starting ICE transport";
  115. auto transport = std::make_shared<IceTransport>(
  116. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  117. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  118. auto shared_this = weak_this.lock();
  119. if (!shared_this)
  120. return;
  121. switch (transportState) {
  122. case IceTransport::State::Connecting:
  123. changeState(State::Connecting);
  124. break;
  125. case IceTransport::State::Failed:
  126. changeState(State::Failed);
  127. break;
  128. case IceTransport::State::Connected:
  129. initDtlsTransport();
  130. break;
  131. case IceTransport::State::Disconnected:
  132. changeState(State::Disconnected);
  133. break;
  134. default:
  135. // Ignore
  136. break;
  137. }
  138. },
  139. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  140. auto shared_this = weak_this.lock();
  141. if (!shared_this)
  142. return;
  143. switch (gatheringState) {
  144. case IceTransport::GatheringState::InProgress:
  145. changeGatheringState(GatheringState::InProgress);
  146. break;
  147. case IceTransport::GatheringState::Complete:
  148. endLocalCandidates();
  149. changeGatheringState(GatheringState::Complete);
  150. break;
  151. default:
  152. // Ignore
  153. break;
  154. }
  155. });
  156. return emplaceTransport(this, &mIceTransport, std::move(transport));
  157. } catch (const std::exception &e) {
  158. PLOG_ERROR << e.what();
  159. changeState(State::Failed);
  160. throw std::runtime_error("ICE transport initialization failed");
  161. }
  162. }
  163. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  164. try {
  165. if (auto transport = std::atomic_load(&mDtlsTransport))
  166. return transport;
  167. PLOG_VERBOSE << "Starting DTLS transport";
  168. auto lower = std::atomic_load(&mIceTransport);
  169. if (!lower)
  170. throw std::logic_error("No underlying ICE transport for DTLS transport");
  171. auto certificate = mCertificate.get();
  172. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  173. auto dtlsStateChangeCallback =
  174. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  175. auto shared_this = weak_this.lock();
  176. if (!shared_this)
  177. return;
  178. switch (transportState) {
  179. case DtlsTransport::State::Connected:
  180. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  181. initSctpTransport();
  182. else
  183. changeState(State::Connected);
  184. mProcessor->enqueue(&PeerConnection::openTracks, this);
  185. break;
  186. case DtlsTransport::State::Failed:
  187. changeState(State::Failed);
  188. break;
  189. case DtlsTransport::State::Disconnected:
  190. changeState(State::Disconnected);
  191. break;
  192. default:
  193. // Ignore
  194. break;
  195. }
  196. };
  197. shared_ptr<DtlsTransport> transport;
  198. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  199. #if RTC_ENABLE_MEDIA
  200. PLOG_INFO << "This connection requires media support";
  201. // DTLS-SRTP
  202. transport = std::make_shared<DtlsSrtpTransport>(
  203. lower, certificate, config.mtu, verifierCallback,
  204. weak_bind(&PeerConnection::forwardMedia, this, _1), dtlsStateChangeCallback);
  205. #else
  206. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  207. #endif
  208. }
  209. if (!transport) {
  210. // DTLS only
  211. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  212. verifierCallback, dtlsStateChangeCallback);
  213. }
  214. return emplaceTransport(this, &mDtlsTransport, std::move(transport));
  215. } catch (const std::exception &e) {
  216. PLOG_ERROR << e.what();
  217. changeState(State::Failed);
  218. throw std::runtime_error("DTLS transport initialization failed");
  219. }
  220. }
  221. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  222. try {
  223. if (auto transport = std::atomic_load(&mSctpTransport))
  224. return transport;
  225. PLOG_VERBOSE << "Starting SCTP transport";
  226. auto lower = std::atomic_load(&mDtlsTransport);
  227. if (!lower)
  228. throw std::logic_error("No underlying DTLS transport for SCTP transport");
  229. auto local = localDescription();
  230. if (!local || !local->application())
  231. throw std::logic_error("Starting SCTP transport without local application description");
  232. auto remote = remoteDescription();
  233. if (!remote || !remote->application())
  234. throw std::logic_error("Starting SCTP transport without remote application description");
  235. SctpTransport::Ports ports = {};
  236. ports.local = local->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  237. ports.remote = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  238. // This is the last occasion to ensure the stream numbers are coherent with the role
  239. shiftDataChannels();
  240. auto transport = std::make_shared<SctpTransport>(
  241. lower, config, std::move(ports), weak_bind(&PeerConnection::forwardMessage, this, _1),
  242. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  243. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  244. auto shared_this = weak_this.lock();
  245. if (!shared_this)
  246. return;
  247. switch (transportState) {
  248. case SctpTransport::State::Connected:
  249. changeState(State::Connected);
  250. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  251. break;
  252. case SctpTransport::State::Failed:
  253. LOG_WARNING << "SCTP transport failed";
  254. changeState(State::Failed);
  255. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  256. break;
  257. case SctpTransport::State::Disconnected:
  258. changeState(State::Disconnected);
  259. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  260. break;
  261. default:
  262. // Ignore
  263. break;
  264. }
  265. });
  266. return emplaceTransport(this, &mSctpTransport, std::move(transport));
  267. } catch (const std::exception &e) {
  268. PLOG_ERROR << e.what();
  269. changeState(State::Failed);
  270. throw std::runtime_error("SCTP transport initialization failed");
  271. }
  272. }
  273. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  274. return std::atomic_load(&mIceTransport);
  275. }
  276. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  277. return std::atomic_load(&mDtlsTransport);
  278. }
  279. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  280. return std::atomic_load(&mSctpTransport);
  281. }
  282. void PeerConnection::closeTransports() {
  283. PLOG_VERBOSE << "Closing transports";
  284. // Change state to sink state Closed
  285. if (!changeState(State::Closed))
  286. return; // already closed
  287. // Reset callbacks now that state is changed
  288. resetCallbacks();
  289. // Pass the pointers to a thread, allowing to terminate a transport from its own thread
  290. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  291. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  292. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  293. if (sctp) {
  294. sctp->onRecv(nullptr);
  295. sctp->onBufferedAmount(nullptr);
  296. }
  297. using array = std::array<shared_ptr<Transport>, 3>;
  298. array transports{std::move(sctp), std::move(dtls), std::move(ice)};
  299. for (const auto &t : transports)
  300. if (t)
  301. t->onStateChange(nullptr);
  302. // Initiate transport stop on the processor after closing the data channels
  303. mProcessor->enqueue([transports = std::move(transports)]() {
  304. ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
  305. for (const auto &t : transports)
  306. if (t)
  307. t->stop();
  308. for (auto &t : transports)
  309. t.reset();
  310. });
  311. });
  312. }
  313. void PeerConnection::endLocalCandidates() {
  314. std::lock_guard lock(mLocalDescriptionMutex);
  315. if (mLocalDescription)
  316. mLocalDescription->endCandidates();
  317. }
  318. void PeerConnection::rollbackLocalDescription() {
  319. PLOG_DEBUG << "Rolling back pending local description";
  320. std::unique_lock lock(mLocalDescriptionMutex);
  321. if (mCurrentLocalDescription) {
  322. std::vector<Candidate> existingCandidates;
  323. if (mLocalDescription)
  324. existingCandidates = mLocalDescription->extractCandidates();
  325. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  326. mLocalDescription->addCandidates(std::move(existingCandidates));
  327. mCurrentLocalDescription.reset();
  328. }
  329. }
  330. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  331. std::lock_guard lock(mRemoteDescriptionMutex);
  332. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  333. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  334. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  335. return true;
  336. }
  337. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  338. << expectedFingerprint.value_or("[none]") << "\"";
  339. return false;
  340. }
  341. void PeerConnection::forwardMessage(message_ptr message) {
  342. if (!message) {
  343. remoteCloseDataChannels();
  344. return;
  345. }
  346. uint16_t stream = uint16_t(message->stream);
  347. auto channel = findDataChannel(stream);
  348. if (!channel) {
  349. auto iceTransport = getIceTransport();
  350. auto sctpTransport = getSctpTransport();
  351. if (!iceTransport || !sctpTransport)
  352. return;
  353. const byte dataChannelOpenMessage{0x03};
  354. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  355. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  356. stream % 2 == remoteParity) {
  357. channel =
  358. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  359. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  360. weak_ptr<DataChannel>{channel});
  361. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  362. mDataChannels.emplace(stream, channel);
  363. } else if(message->type != Message::Control) {
  364. // Invalid, close the DataChannel
  365. sctpTransport->closeStream(message->stream);
  366. return;
  367. }
  368. }
  369. channel->incoming(message);
  370. }
  371. void PeerConnection::forwardMedia(message_ptr message) {
  372. if (!message)
  373. return;
  374. // Browsers like to compound their packets with a random SSRC.
  375. // we have to do this monstrosity to distribute the report blocks
  376. if (message->type == Message::Control) {
  377. std::set<uint32_t> ssrcs;
  378. size_t offset = 0;
  379. while ((sizeof(RtcpHeader) + offset) <= message->size()) {
  380. auto header = reinterpret_cast<RtcpHeader *>(message->data() + offset);
  381. if (header->lengthInBytes() > message->size() - offset) {
  382. COUNTER_MEDIA_TRUNCATED++;
  383. break;
  384. }
  385. offset += header->lengthInBytes();
  386. if (header->payloadType() == 205 || header->payloadType() == 206) {
  387. auto rtcpfb = reinterpret_cast<RtcpFbHeader *>(header);
  388. ssrcs.insert(rtcpfb->packetSenderSSRC());
  389. ssrcs.insert(rtcpfb->mediaSourceSSRC());
  390. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  391. auto rtcpsr = reinterpret_cast<RtcpSr *>(header);
  392. ssrcs.insert(rtcpsr->senderSSRC());
  393. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  394. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  395. } else if (header->payloadType() == 202) {
  396. auto sdes = reinterpret_cast<RtcpSdes *>(header);
  397. if (!sdes->isValid()) {
  398. PLOG_WARNING << "RTCP SDES packet is invalid";
  399. continue;
  400. }
  401. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  402. auto chunk = sdes->getChunk(i);
  403. ssrcs.insert(chunk->ssrc());
  404. }
  405. } else {
  406. // PT=207 == Extended Report
  407. if (header->payloadType() != 207) {
  408. COUNTER_UNKNOWN_PACKET_TYPE++;
  409. }
  410. }
  411. }
  412. if (!ssrcs.empty()) {
  413. for (uint32_t ssrc : ssrcs) {
  414. if (auto mid = getMidFromSsrc(ssrc)) {
  415. std::shared_lock lock(mTracksMutex); // read-only
  416. if (auto it = mTracks.find(*mid); it != mTracks.end())
  417. if (auto track = it->second.lock())
  418. track->incoming(message);
  419. }
  420. }
  421. return;
  422. }
  423. }
  424. uint32_t ssrc = uint32_t(message->stream);
  425. if (auto mid = getMidFromSsrc(ssrc)) {
  426. std::shared_lock lock(mTracksMutex); // read-only
  427. if (auto it = mTracks.find(*mid); it != mTracks.end())
  428. if (auto track = it->second.lock())
  429. track->incoming(message);
  430. } else {
  431. /*
  432. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  433. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  434. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  435. * this ideal? No! Do I know how to fix it? No!
  436. */
  437. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  438. return;
  439. }
  440. }
  441. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  442. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  443. return it->second;
  444. {
  445. std::lock_guard lock(mRemoteDescriptionMutex);
  446. if (!mRemoteDescription)
  447. return nullopt;
  448. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  449. if (auto found =
  450. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  451. return std::nullopt;
  452. },
  453. [&](Description::Media *media) -> optional<string> {
  454. return media->hasSSRC(ssrc)
  455. ? std::make_optional(media->mid())
  456. : nullopt;
  457. }},
  458. mRemoteDescription->media(i))) {
  459. mMidFromSsrc.emplace(ssrc, *found);
  460. return *found;
  461. }
  462. }
  463. }
  464. {
  465. std::lock_guard lock(mLocalDescriptionMutex);
  466. if (!mLocalDescription)
  467. return nullopt;
  468. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  469. if (auto found =
  470. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  471. return std::nullopt;
  472. },
  473. [&](Description::Media *media) -> optional<string> {
  474. return media->hasSSRC(ssrc)
  475. ? std::make_optional(media->mid())
  476. : nullopt;
  477. }},
  478. mLocalDescription->media(i))) {
  479. mMidFromSsrc.emplace(ssrc, *found);
  480. return *found;
  481. }
  482. }
  483. }
  484. return nullopt;
  485. }
  486. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  487. if (auto channel = findDataChannel(stream))
  488. channel->triggerBufferedAmount(amount);
  489. }
  490. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(string label, DataChannelInit init) {
  491. cleanupDataChannels();
  492. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  493. uint16_t stream;
  494. if (init.id) {
  495. stream = *init.id;
  496. if (stream == 65535)
  497. throw std::invalid_argument("Invalid DataChannel id");
  498. } else {
  499. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  500. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  501. // See https://tools.ietf.org/html/rfc5763#section-5
  502. // Therefore, we assume passive role if we are the offerer.
  503. auto iceTransport = getIceTransport();
  504. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  505. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  506. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  507. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  508. // the DTLS server, it MUST choose an odd one.
  509. // See https://tools.ietf.org/html/rfc8832#section-6
  510. stream = (role == Description::Role::Active) ? 0 : 1;
  511. while (mDataChannels.find(stream) != mDataChannels.end()) {
  512. if (stream >= 65535 - 2)
  513. throw std::runtime_error("Too many DataChannels");
  514. stream += 2;
  515. }
  516. }
  517. // If the DataChannel is user-negotiated, do not negociate it here
  518. auto channel =
  519. init.negotiated
  520. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  521. std::move(init.protocol), std::move(init.reliability))
  522. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  523. std::move(init.protocol),
  524. std::move(init.reliability));
  525. mDataChannels.emplace(std::make_pair(stream, channel));
  526. return channel;
  527. }
  528. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  529. std::shared_lock lock(mDataChannelsMutex); // read-only
  530. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  531. if (auto channel = it->second.lock())
  532. return channel;
  533. return nullptr;
  534. }
  535. void PeerConnection::shiftDataChannels() {
  536. auto iceTransport = std::atomic_load(&mIceTransport);
  537. auto sctpTransport = std::atomic_load(&mSctpTransport);
  538. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  539. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  540. decltype(mDataChannels) newDataChannels;
  541. auto it = mDataChannels.begin();
  542. while (it != mDataChannels.end()) {
  543. auto channel = it->second.lock();
  544. channel->shiftStream();
  545. newDataChannels.emplace(channel->stream(), channel);
  546. ++it;
  547. }
  548. std::swap(mDataChannels, newDataChannels);
  549. }
  550. }
  551. void PeerConnection::iterateDataChannels(
  552. std::function<void(shared_ptr<DataChannel> channel)> func) {
  553. std::vector<shared_ptr<DataChannel>> locked;
  554. {
  555. std::shared_lock lock(mDataChannelsMutex); // read-only
  556. locked.reserve(mDataChannels.size());
  557. auto it = mDataChannels.begin();
  558. while (it != mDataChannels.end()) {
  559. auto channel = it->second.lock();
  560. if (channel && !channel->isClosed())
  561. locked.push_back(std::move(channel));
  562. ++it;
  563. }
  564. }
  565. for (auto &channel : locked)
  566. func(std::move(channel));
  567. }
  568. void PeerConnection::cleanupDataChannels() {
  569. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  570. auto it = mDataChannels.begin();
  571. while (it != mDataChannels.end()) {
  572. if (!it->second.lock()) {
  573. it = mDataChannels.erase(it);
  574. continue;
  575. }
  576. ++it;
  577. }
  578. }
  579. void PeerConnection::openDataChannels() {
  580. if (auto transport = std::atomic_load(&mSctpTransport))
  581. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  582. }
  583. void PeerConnection::closeDataChannels() {
  584. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  585. }
  586. void PeerConnection::remoteCloseDataChannels() {
  587. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  588. }
  589. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  590. shared_ptr<Track> track;
  591. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  592. if (track = it->second.lock(); track)
  593. track->setDescription(std::move(description));
  594. if (!track) {
  595. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  596. mTracks.emplace(std::make_pair(track->mid(), track));
  597. mTrackLines.emplace_back(track);
  598. }
  599. return track;
  600. }
  601. void PeerConnection::incomingTrack(Description::Media description) {
  602. std::unique_lock lock(mTracksMutex); // we are going to emplace
  603. #if !RTC_ENABLE_MEDIA
  604. if (mTracks.empty()) {
  605. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  606. }
  607. #endif
  608. if (mTracks.find(description.mid()) == mTracks.end()) {
  609. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  610. mTracks.emplace(std::make_pair(track->mid(), track));
  611. mTrackLines.emplace_back(track);
  612. triggerTrack(track);
  613. }
  614. }
  615. void PeerConnection::openTracks() {
  616. #if RTC_ENABLE_MEDIA
  617. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  618. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  619. std::shared_lock lock(mTracksMutex); // read-only
  620. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  621. if (auto track = it->second.lock())
  622. if (!track->isOpen())
  623. track->open(srtpTransport);
  624. }
  625. #endif
  626. }
  627. void PeerConnection::validateRemoteDescription(const Description &description) {
  628. if (!description.iceUfrag())
  629. throw std::invalid_argument("Remote description has no ICE user fragment");
  630. if (!description.icePwd())
  631. throw std::invalid_argument("Remote description has no ICE password");
  632. if (!description.fingerprint())
  633. throw std::invalid_argument("Remote description has no valid fingerprint");
  634. if (description.mediaCount() == 0)
  635. throw std::invalid_argument("Remote description has no media line");
  636. int activeMediaCount = 0;
  637. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  638. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  639. [&](const Description::Media *media) {
  640. if (media->direction() != Description::Direction::Inactive)
  641. ++activeMediaCount;
  642. }},
  643. description.media(i));
  644. if (activeMediaCount == 0)
  645. throw std::invalid_argument("Remote description has no active media");
  646. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  647. if (*description.iceUfrag() == *local->iceUfrag() &&
  648. *description.icePwd() == *local->icePwd())
  649. throw std::logic_error("Got the local description as remote description");
  650. PLOG_VERBOSE << "Remote description looks valid";
  651. }
  652. void PeerConnection::processLocalDescription(Description description) {
  653. const uint16_t localSctpPort = DEFAULT_SCTP_PORT;
  654. const size_t localMaxMessageSize =
  655. config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  656. // Clean up the application entry the ICE transport might have added already (libnice)
  657. description.clearMedia();
  658. if (auto remote = remoteDescription()) {
  659. // Reciprocate remote description
  660. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  661. std::visit( // reciprocate each media
  662. rtc::overloaded{
  663. [&](Description::Application *remoteApp) {
  664. std::shared_lock lock(mDataChannelsMutex);
  665. if (!mDataChannels.empty()) {
  666. // Prefer local description
  667. Description::Application app(remoteApp->mid());
  668. app.setSctpPort(localSctpPort);
  669. app.setMaxMessageSize(localMaxMessageSize);
  670. PLOG_DEBUG << "Adding application to local description, mid=\""
  671. << app.mid() << "\"";
  672. description.addMedia(std::move(app));
  673. return;
  674. }
  675. auto reciprocated = remoteApp->reciprocate();
  676. reciprocated.hintSctpPort(localSctpPort);
  677. reciprocated.setMaxMessageSize(localMaxMessageSize);
  678. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  679. << reciprocated.mid() << "\"";
  680. description.addMedia(std::move(reciprocated));
  681. },
  682. [&](Description::Media *remoteMedia) {
  683. std::shared_lock lock(mTracksMutex);
  684. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  685. // Prefer local description
  686. if (auto track = it->second.lock()) {
  687. auto media = track->description();
  688. #if !RTC_ENABLE_MEDIA
  689. // No media support, mark as inactive
  690. media.setDirection(Description::Direction::Inactive);
  691. #endif
  692. PLOG_DEBUG
  693. << "Adding media to local description, mid=\"" << media.mid()
  694. << "\", active=" << std::boolalpha
  695. << (media.direction() != Description::Direction::Inactive);
  696. description.addMedia(std::move(media));
  697. } else {
  698. auto reciprocated = remoteMedia->reciprocate();
  699. reciprocated.setDirection(Description::Direction::Inactive);
  700. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  701. << reciprocated.mid() << "\"";
  702. description.addMedia(std::move(reciprocated));
  703. }
  704. return;
  705. }
  706. lock.unlock(); // we are going to call incomingTrack()
  707. auto reciprocated = remoteMedia->reciprocate();
  708. #if !RTC_ENABLE_MEDIA
  709. // No media support, mark as inactive
  710. reciprocated.setDirection(Description::Direction::Inactive);
  711. #endif
  712. incomingTrack(reciprocated);
  713. PLOG_DEBUG
  714. << "Reciprocating media in local description, mid=\""
  715. << reciprocated.mid() << "\", active=" << std::boolalpha
  716. << (reciprocated.direction() != Description::Direction::Inactive);
  717. description.addMedia(std::move(reciprocated));
  718. },
  719. },
  720. remote->media(i));
  721. }
  722. if (description.type() == Description::Type::Offer) {
  723. // This is an offer, add locally created data channels and tracks
  724. // Add application for data channels
  725. if (!description.hasApplication()) {
  726. std::shared_lock lock(mDataChannelsMutex);
  727. if (!mDataChannels.empty()) {
  728. unsigned int m = 0;
  729. while (description.hasMid(std::to_string(m)))
  730. ++m;
  731. Description::Application app(std::to_string(m));
  732. app.setSctpPort(localSctpPort);
  733. app.setMaxMessageSize(localMaxMessageSize);
  734. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  735. << "\"";
  736. description.addMedia(std::move(app));
  737. }
  738. }
  739. // Add media for local tracks
  740. std::shared_lock lock(mTracksMutex);
  741. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  742. if (auto track = it->lock()) {
  743. if (description.hasMid(track->mid()))
  744. continue;
  745. auto media = track->description();
  746. #if !RTC_ENABLE_MEDIA
  747. // No media support, mark as inactive
  748. media.setDirection(Description::Direction::Inactive);
  749. #endif
  750. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  751. << "\", active=" << std::boolalpha
  752. << (media.direction() != Description::Direction::Inactive);
  753. description.addMedia(std::move(media));
  754. }
  755. }
  756. // There might be no media at this point if the user created a Track, deleted it,
  757. // then called setLocalDescription().
  758. if (description.mediaCount() == 0)
  759. throw std::runtime_error("No DataChannel or Track to negotiate");
  760. }
  761. // Set local fingerprint (wait for certificate if necessary)
  762. description.setFingerprint(mCertificate.get()->fingerprint());
  763. PLOG_VERBOSE << "Issuing local description: " << description;
  764. if (description.mediaCount() == 0)
  765. throw std::logic_error("Local description has no media line");
  766. {
  767. // Set as local description
  768. std::lock_guard lock(mLocalDescriptionMutex);
  769. std::vector<Candidate> existingCandidates;
  770. if (mLocalDescription) {
  771. existingCandidates = mLocalDescription->extractCandidates();
  772. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  773. }
  774. mLocalDescription.emplace(description);
  775. mLocalDescription->addCandidates(std::move(existingCandidates));
  776. }
  777. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  778. // Reciprocated tracks might need to be open
  779. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  780. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  781. mProcessor->enqueue(&PeerConnection::openTracks, this);
  782. }
  783. void PeerConnection::processLocalCandidate(Candidate candidate) {
  784. std::lock_guard lock(mLocalDescriptionMutex);
  785. if (!mLocalDescription)
  786. throw std::logic_error("Got a local candidate without local description");
  787. if (config.iceTransportPolicy == TransportPolicy::Relay &&
  788. candidate.type() != Candidate::Type::Relayed) {
  789. PLOG_VERBOSE << "Not issuing local candidate because of transport policy: " << candidate;
  790. return;
  791. }
  792. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  793. candidate.resolve(Candidate::ResolveMode::Simple);
  794. mLocalDescription->addCandidate(candidate);
  795. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  796. }
  797. void PeerConnection::processRemoteDescription(Description description) {
  798. {
  799. // Set as remote description
  800. std::lock_guard lock(mRemoteDescriptionMutex);
  801. std::vector<Candidate> existingCandidates;
  802. if (mRemoteDescription)
  803. existingCandidates = mRemoteDescription->extractCandidates();
  804. mRemoteDescription.emplace(description);
  805. mRemoteDescription->addCandidates(std::move(existingCandidates));
  806. }
  807. auto iceTransport = initIceTransport();
  808. if (!iceTransport)
  809. return; // closed
  810. iceTransport->setRemoteDescription(std::move(description));
  811. // Since we assumed passive role during DataChannel creation, we might need to shift the stream
  812. // numbers from odd to even.
  813. shiftDataChannels();
  814. if (description.hasApplication()) {
  815. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  816. auto sctpTransport = std::atomic_load(&mSctpTransport);
  817. if (!sctpTransport && dtlsTransport &&
  818. dtlsTransport->state() == Transport::State::Connected)
  819. initSctpTransport();
  820. }
  821. }
  822. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  823. auto iceTransport = std::atomic_load(&mIceTransport);
  824. {
  825. // Set as remote candidate
  826. std::lock_guard lock(mRemoteDescriptionMutex);
  827. if (!mRemoteDescription)
  828. throw std::logic_error("Got a remote candidate without remote description");
  829. if (!iceTransport)
  830. throw std::logic_error("Got a remote candidate without ICE transport");
  831. candidate.hintMid(mRemoteDescription->bundleMid());
  832. if (mRemoteDescription->hasCandidate(candidate))
  833. return; // already in description, ignore
  834. candidate.resolve(Candidate::ResolveMode::Simple);
  835. mRemoteDescription->addCandidate(candidate);
  836. }
  837. if (candidate.isResolved()) {
  838. iceTransport->addRemoteCandidate(std::move(candidate));
  839. } else {
  840. // We might need a lookup, do it asynchronously
  841. // We don't use the thread pool because we have no control on the timeout
  842. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  843. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  844. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  845. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  846. if (auto iceTransport = weakIceTransport.lock())
  847. iceTransport->addRemoteCandidate(std::move(candidate));
  848. });
  849. t.detach();
  850. }
  851. }
  852. }
  853. string PeerConnection::localBundleMid() const {
  854. std::lock_guard lock(mLocalDescriptionMutex);
  855. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  856. }
  857. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  858. auto dataChannel = weakDataChannel.lock();
  859. if (dataChannel) {
  860. dataChannel->resetOpenCallback(); // might be set internally
  861. mPendingDataChannels.push(std::move(dataChannel));
  862. }
  863. triggerPendingDataChannels();
  864. }
  865. void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
  866. auto track = weakTrack.lock();
  867. if (track) {
  868. track->resetOpenCallback(); // might be set internally
  869. mPendingTracks.push(std::move(track));
  870. }
  871. triggerPendingTracks();
  872. }
  873. void PeerConnection::triggerPendingDataChannels() {
  874. while (dataChannelCallback) {
  875. auto next = mPendingDataChannels.tryPop();
  876. if (!next)
  877. break;
  878. auto impl = std::move(*next);
  879. dataChannelCallback(std::make_shared<rtc::DataChannel>(impl));
  880. impl->triggerOpen();
  881. }
  882. }
  883. void PeerConnection::triggerPendingTracks() {
  884. while (trackCallback) {
  885. auto next = mPendingTracks.tryPop();
  886. if (!next)
  887. break;
  888. auto impl = std::move(*next);
  889. trackCallback(std::make_shared<rtc::Track>(impl));
  890. impl->triggerOpen();
  891. }
  892. }
  893. void PeerConnection::flushPendingDataChannels() {
  894. mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
  895. }
  896. void PeerConnection::flushPendingTracks() {
  897. mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
  898. }
  899. bool PeerConnection::changeState(State newState) {
  900. State current;
  901. do {
  902. current = state.load();
  903. if (current == State::Closed)
  904. return false;
  905. if (current == newState)
  906. return false;
  907. } while (!state.compare_exchange_weak(current, newState));
  908. std::ostringstream s;
  909. s << newState;
  910. PLOG_INFO << "Changed state to " << s.str();
  911. if (newState == State::Closed)
  912. // This is the last state change, so we may steal the callback
  913. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  914. else
  915. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  916. return true;
  917. }
  918. bool PeerConnection::changeGatheringState(GatheringState newState) {
  919. if (gatheringState.exchange(newState) == newState)
  920. return false;
  921. std::ostringstream s;
  922. s << newState;
  923. PLOG_INFO << "Changed gathering state to " << s.str();
  924. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  925. return true;
  926. }
  927. bool PeerConnection::changeSignalingState(SignalingState newState) {
  928. if (signalingState.exchange(newState) == newState)
  929. return false;
  930. std::ostringstream s;
  931. s << state;
  932. PLOG_INFO << "Changed signaling state to " << s.str();
  933. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  934. return true;
  935. }
  936. void PeerConnection::resetCallbacks() {
  937. // Unregister all callbacks
  938. dataChannelCallback = nullptr;
  939. localDescriptionCallback = nullptr;
  940. localCandidateCallback = nullptr;
  941. stateChangeCallback = nullptr;
  942. gatheringStateChangeCallback = nullptr;
  943. }
  944. } // namespace rtc::impl