peerconnection.cpp 34 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "common.hpp"
  22. #include "dtlstransport.hpp"
  23. #include "globals.hpp"
  24. #include "icetransport.hpp"
  25. #include "logcounter.hpp"
  26. #include "peerconnection.hpp"
  27. #include "processor.hpp"
  28. #include "rtp.hpp"
  29. #include "sctptransport.hpp"
  30. #include "threadpool.hpp"
  31. #if RTC_ENABLE_MEDIA
  32. #include "dtlssrtptransport.hpp"
  33. #endif
  34. #include <iomanip>
  35. #include <set>
  36. #include <thread>
  37. using namespace std::placeholders;
  38. namespace rtc::impl {
  39. static LogCounter COUNTER_MEDIA_TRUNCATED(plog::warning,
  40. "Number of RTP packets truncated over past second");
  41. static LogCounter COUNTER_SRTP_DECRYPT_ERROR(plog::warning,
  42. "Number of SRTP decryption errors over past second");
  43. static LogCounter COUNTER_SRTP_ENCRYPT_ERROR(plog::warning,
  44. "Number of SRTP encryption errors over past second");
  45. static LogCounter
  46. COUNTER_UNKNOWN_PACKET_TYPE(plog::warning,
  47. "Number of unknown RTCP packet types over past second");
  48. PeerConnection::PeerConnection(Configuration config_)
  49. : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
  50. mProcessor(std::make_unique<Processor>()) {
  51. PLOG_VERBOSE << "Creating PeerConnection";
  52. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  53. throw std::invalid_argument("Invalid port range");
  54. if (config.mtu) {
  55. if (*config.mtu < 576) // Min MTU for IPv4
  56. throw std::invalid_argument("Invalid MTU value");
  57. if (*config.mtu > 1500) { // Standard Ethernet
  58. PLOG_WARNING << "MTU set to " << *config.mtu;
  59. } else {
  60. PLOG_VERBOSE << "MTU set to " << *config.mtu;
  61. }
  62. }
  63. }
  64. PeerConnection::~PeerConnection() {
  65. PLOG_VERBOSE << "Destroying PeerConnection";
  66. mProcessor->join();
  67. }
  68. void PeerConnection::close() {
  69. PLOG_VERBOSE << "Closing PeerConnection";
  70. negotiationNeeded = false;
  71. // Close data channels asynchronously
  72. mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
  73. closeTransports();
  74. }
  75. optional<Description> PeerConnection::localDescription() const {
  76. std::lock_guard lock(mLocalDescriptionMutex);
  77. return mLocalDescription;
  78. }
  79. optional<Description> PeerConnection::remoteDescription() const {
  80. std::lock_guard lock(mRemoteDescriptionMutex);
  81. return mRemoteDescription;
  82. }
  83. size_t PeerConnection::remoteMaxMessageSize() const {
  84. const size_t localMax = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  85. size_t remoteMax = DEFAULT_MAX_MESSAGE_SIZE;
  86. std::lock_guard lock(mRemoteDescriptionMutex);
  87. if (mRemoteDescription)
  88. if (auto *application = mRemoteDescription->application())
  89. if (auto max = application->maxMessageSize()) {
  90. // RFC 8841: If the SDP "max-message-size" attribute contains a maximum message
  91. // size value of zero, it indicates that the SCTP endpoint will handle messages
  92. // of any size, subject to memory capacity, etc.
  93. remoteMax = *max > 0 ? *max : std::numeric_limits<size_t>::max();
  94. }
  95. return std::min(remoteMax, localMax);
  96. }
  97. shared_ptr<IceTransport> PeerConnection::initIceTransport() {
  98. try {
  99. if (auto transport = std::atomic_load(&mIceTransport))
  100. return transport;
  101. PLOG_VERBOSE << "Starting ICE transport";
  102. auto transport = std::make_shared<IceTransport>(
  103. config, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  104. [this, weak_this = weak_from_this()](IceTransport::State transportState) {
  105. auto shared_this = weak_this.lock();
  106. if (!shared_this)
  107. return;
  108. switch (transportState) {
  109. case IceTransport::State::Connecting:
  110. changeState(State::Connecting);
  111. break;
  112. case IceTransport::State::Failed:
  113. changeState(State::Failed);
  114. break;
  115. case IceTransport::State::Connected:
  116. initDtlsTransport();
  117. break;
  118. case IceTransport::State::Disconnected:
  119. changeState(State::Disconnected);
  120. break;
  121. default:
  122. // Ignore
  123. break;
  124. }
  125. },
  126. [this, weak_this = weak_from_this()](IceTransport::GatheringState gatheringState) {
  127. auto shared_this = weak_this.lock();
  128. if (!shared_this)
  129. return;
  130. switch (gatheringState) {
  131. case IceTransport::GatheringState::InProgress:
  132. changeGatheringState(GatheringState::InProgress);
  133. break;
  134. case IceTransport::GatheringState::Complete:
  135. endLocalCandidates();
  136. changeGatheringState(GatheringState::Complete);
  137. break;
  138. default:
  139. // Ignore
  140. break;
  141. }
  142. });
  143. std::atomic_store(&mIceTransport, transport);
  144. if (state.load() == State::Closed) {
  145. mIceTransport.reset();
  146. throw std::runtime_error("Connection is closed");
  147. }
  148. transport->start();
  149. return transport;
  150. } catch (const std::exception &e) {
  151. PLOG_ERROR << e.what();
  152. changeState(State::Failed);
  153. throw std::runtime_error("ICE transport initialization failed");
  154. }
  155. }
  156. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  157. try {
  158. if (auto transport = std::atomic_load(&mDtlsTransport))
  159. return transport;
  160. PLOG_VERBOSE << "Starting DTLS transport";
  161. auto certificate = mCertificate.get();
  162. auto lower = std::atomic_load(&mIceTransport);
  163. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  164. auto stateChangeCallback =
  165. [this, weak_this = weak_from_this()](DtlsTransport::State transportState) {
  166. auto shared_this = weak_this.lock();
  167. if (!shared_this)
  168. return;
  169. switch (transportState) {
  170. case DtlsTransport::State::Connected:
  171. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  172. initSctpTransport();
  173. else
  174. changeState(State::Connected);
  175. mProcessor->enqueue(&PeerConnection::openTracks, this);
  176. break;
  177. case DtlsTransport::State::Failed:
  178. changeState(State::Failed);
  179. break;
  180. case DtlsTransport::State::Disconnected:
  181. changeState(State::Disconnected);
  182. break;
  183. default:
  184. // Ignore
  185. break;
  186. }
  187. };
  188. shared_ptr<DtlsTransport> transport;
  189. if (auto local = localDescription(); local && local->hasAudioOrVideo()) {
  190. #if RTC_ENABLE_MEDIA
  191. PLOG_INFO << "This connection requires media support";
  192. // DTLS-SRTP
  193. transport = std::make_shared<DtlsSrtpTransport>(
  194. lower, certificate, config.mtu, verifierCallback,
  195. weak_bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
  196. #else
  197. PLOG_WARNING << "Ignoring media support (not compiled with media support)";
  198. #endif
  199. }
  200. if (!transport) {
  201. // DTLS only
  202. transport = std::make_shared<DtlsTransport>(lower, certificate, config.mtu,
  203. verifierCallback, stateChangeCallback);
  204. }
  205. std::atomic_store(&mDtlsTransport, transport);
  206. if (state.load() == State::Closed) {
  207. mDtlsTransport.reset();
  208. throw std::runtime_error("Connection is closed");
  209. }
  210. transport->start();
  211. return transport;
  212. } catch (const std::exception &e) {
  213. PLOG_ERROR << e.what();
  214. changeState(State::Failed);
  215. throw std::runtime_error("DTLS transport initialization failed");
  216. }
  217. }
  218. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  219. try {
  220. if (auto transport = std::atomic_load(&mSctpTransport))
  221. return transport;
  222. PLOG_VERBOSE << "Starting SCTP transport";
  223. auto remote = remoteDescription();
  224. if (!remote || !remote->application())
  225. throw std::logic_error("Starting SCTP transport without application description");
  226. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  227. auto lower = std::atomic_load(&mDtlsTransport);
  228. auto transport = std::make_shared<SctpTransport>(
  229. lower, config, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  230. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  231. [this, weak_this = weak_from_this()](SctpTransport::State transportState) {
  232. auto shared_this = weak_this.lock();
  233. if (!shared_this)
  234. return;
  235. switch (transportState) {
  236. case SctpTransport::State::Connected:
  237. changeState(State::Connected);
  238. mProcessor->enqueue(&PeerConnection::openDataChannels, this);
  239. break;
  240. case SctpTransport::State::Failed:
  241. LOG_WARNING << "SCTP transport failed";
  242. changeState(State::Failed);
  243. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  244. break;
  245. case SctpTransport::State::Disconnected:
  246. changeState(State::Disconnected);
  247. mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
  248. break;
  249. default:
  250. // Ignore
  251. break;
  252. }
  253. });
  254. std::atomic_store(&mSctpTransport, transport);
  255. if (state.load() == State::Closed) {
  256. mSctpTransport.reset();
  257. throw std::runtime_error("Connection is closed");
  258. }
  259. transport->start();
  260. return transport;
  261. } catch (const std::exception &e) {
  262. PLOG_ERROR << e.what();
  263. changeState(State::Failed);
  264. throw std::runtime_error("SCTP transport initialization failed");
  265. }
  266. }
  267. shared_ptr<IceTransport> PeerConnection::getIceTransport() const {
  268. return std::atomic_load(&mIceTransport);
  269. }
  270. shared_ptr<DtlsTransport> PeerConnection::getDtlsTransport() const {
  271. return std::atomic_load(&mDtlsTransport);
  272. }
  273. shared_ptr<SctpTransport> PeerConnection::getSctpTransport() const {
  274. return std::atomic_load(&mSctpTransport);
  275. }
  276. void PeerConnection::closeTransports() {
  277. PLOG_VERBOSE << "Closing transports";
  278. // Change state to sink state Closed
  279. if (!changeState(State::Closed))
  280. return; // already closed
  281. // Reset callbacks now that state is changed
  282. resetCallbacks();
  283. // Initiate transport stop on the processor after closing the data channels
  284. mProcessor->enqueue([this]() {
  285. // Pass the pointers to a thread
  286. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  287. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  288. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  289. ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
  290. if (sctp)
  291. sctp->stop();
  292. if (dtls)
  293. dtls->stop();
  294. if (ice)
  295. ice->stop();
  296. sctp.reset();
  297. dtls.reset();
  298. ice.reset();
  299. });
  300. });
  301. }
  302. void PeerConnection::endLocalCandidates() {
  303. std::lock_guard lock(mLocalDescriptionMutex);
  304. if (mLocalDescription)
  305. mLocalDescription->endCandidates();
  306. }
  307. void PeerConnection::rollbackLocalDescription() {
  308. PLOG_DEBUG << "Rolling back pending local description";
  309. std::unique_lock lock(mLocalDescriptionMutex);
  310. if (mCurrentLocalDescription) {
  311. std::vector<Candidate> existingCandidates;
  312. if (mLocalDescription)
  313. existingCandidates = mLocalDescription->extractCandidates();
  314. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  315. mLocalDescription->addCandidates(std::move(existingCandidates));
  316. mCurrentLocalDescription.reset();
  317. }
  318. }
  319. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  320. std::lock_guard lock(mRemoteDescriptionMutex);
  321. auto expectedFingerprint = mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt;
  322. if (expectedFingerprint && *expectedFingerprint == fingerprint) {
  323. PLOG_VERBOSE << "Valid fingerprint \"" << fingerprint << "\"";
  324. return true;
  325. }
  326. PLOG_ERROR << "Invalid fingerprint \"" << fingerprint << "\", expected \""
  327. << expectedFingerprint.value_or("[none]") << "\"";
  328. return false;
  329. }
  330. void PeerConnection::forwardMessage(message_ptr message) {
  331. if (!message) {
  332. remoteCloseDataChannels();
  333. return;
  334. }
  335. uint16_t stream = uint16_t(message->stream);
  336. auto channel = findDataChannel(stream);
  337. if (!channel) {
  338. auto iceTransport = getIceTransport();
  339. auto sctpTransport = getSctpTransport();
  340. if (!iceTransport || !sctpTransport)
  341. return;
  342. const byte dataChannelOpenMessage{0x03};
  343. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  344. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  345. stream % 2 == remoteParity) {
  346. channel =
  347. std::make_shared<NegotiatedDataChannel>(weak_from_this(), sctpTransport, stream);
  348. channel->openCallback = weak_bind(&PeerConnection::triggerDataChannel, this,
  349. weak_ptr<DataChannel>{channel});
  350. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  351. mDataChannels.emplace(stream, channel);
  352. } else {
  353. // Invalid, close the DataChannel
  354. sctpTransport->closeStream(message->stream);
  355. return;
  356. }
  357. }
  358. channel->incoming(message);
  359. }
  360. void PeerConnection::forwardMedia(message_ptr message) {
  361. if (!message)
  362. return;
  363. // Browsers like to compound their packets with a random SSRC.
  364. // we have to do this monstrosity to distribute the report blocks
  365. if (message->type == Message::Control) {
  366. std::set<uint32_t> ssrcs;
  367. size_t offset = 0;
  368. while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
  369. auto header = reinterpret_cast<rtc::RTCP_HEADER *>(message->data() + offset);
  370. if (header->lengthInBytes() > message->size() - offset) {
  371. COUNTER_MEDIA_TRUNCATED++;
  372. break;
  373. }
  374. offset += header->lengthInBytes();
  375. if (header->payloadType() == 205 || header->payloadType() == 206) {
  376. auto rtcpfb = reinterpret_cast<RTCP_FB_HEADER *>(header);
  377. ssrcs.insert(rtcpfb->getPacketSenderSSRC());
  378. ssrcs.insert(rtcpfb->getMediaSourceSSRC());
  379. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  380. auto rtcpsr = reinterpret_cast<RTCP_SR *>(header);
  381. ssrcs.insert(rtcpsr->senderSSRC());
  382. for (int i = 0; i < rtcpsr->header.reportCount(); ++i)
  383. ssrcs.insert(rtcpsr->getReportBlock(i)->getSSRC());
  384. } else if (header->payloadType() == 202) {
  385. auto sdes = reinterpret_cast<RTCP_SDES *>(header);
  386. if (!sdes->isValid()) {
  387. PLOG_WARNING << "RTCP SDES packet is invalid";
  388. continue;
  389. }
  390. for (unsigned int i = 0; i < sdes->chunksCount(); i++) {
  391. auto chunk = sdes->getChunk(i);
  392. ssrcs.insert(chunk->ssrc());
  393. }
  394. } else {
  395. // PT=207 == Extended Report
  396. if (header->payloadType() != 207) {
  397. COUNTER_UNKNOWN_PACKET_TYPE++;
  398. }
  399. }
  400. }
  401. if (!ssrcs.empty()) {
  402. for (uint32_t ssrc : ssrcs) {
  403. if (auto mid = getMidFromSsrc(ssrc)) {
  404. std::shared_lock lock(mTracksMutex); // read-only
  405. if (auto it = mTracks.find(*mid); it != mTracks.end())
  406. if (auto track = it->second.lock())
  407. track->incoming(message);
  408. }
  409. }
  410. return;
  411. }
  412. }
  413. uint32_t ssrc = uint32_t(message->stream);
  414. if (auto mid = getMidFromSsrc(ssrc)) {
  415. std::shared_lock lock(mTracksMutex); // read-only
  416. if (auto it = mTracks.find(*mid); it != mTracks.end())
  417. if (auto track = it->second.lock())
  418. track->incoming(message);
  419. } else {
  420. /*
  421. * TODO: So the problem is that when stop sending streams, we stop getting report blocks for
  422. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  423. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  424. * this ideal? No! Do I know how to fix it? No!
  425. */
  426. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  427. return;
  428. }
  429. }
  430. optional<std::string> PeerConnection::getMidFromSsrc(uint32_t ssrc) {
  431. if (auto it = mMidFromSsrc.find(ssrc); it != mMidFromSsrc.end())
  432. return it->second;
  433. {
  434. std::lock_guard lock(mRemoteDescriptionMutex);
  435. if (!mRemoteDescription)
  436. return nullopt;
  437. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  438. if (auto found =
  439. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  440. return std::nullopt;
  441. },
  442. [&](Description::Media *media) -> optional<string> {
  443. return media->hasSSRC(ssrc)
  444. ? std::make_optional(media->mid())
  445. : nullopt;
  446. }},
  447. mRemoteDescription->media(i))) {
  448. mMidFromSsrc.emplace(ssrc, *found);
  449. return *found;
  450. }
  451. }
  452. }
  453. {
  454. std::lock_guard lock(mLocalDescriptionMutex);
  455. if (!mLocalDescription)
  456. return nullopt;
  457. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  458. if (auto found =
  459. std::visit(rtc::overloaded{[&](Description::Application *) -> optional<string> {
  460. return std::nullopt;
  461. },
  462. [&](Description::Media *media) -> optional<string> {
  463. return media->hasSSRC(ssrc)
  464. ? std::make_optional(media->mid())
  465. : nullopt;
  466. }},
  467. mLocalDescription->media(i))) {
  468. mMidFromSsrc.emplace(ssrc, *found);
  469. return *found;
  470. }
  471. }
  472. }
  473. return nullopt;
  474. }
  475. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  476. if (auto channel = findDataChannel(stream))
  477. channel->triggerBufferedAmount(amount);
  478. }
  479. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
  480. DataChannelInit init) {
  481. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  482. uint16_t stream;
  483. if (init.id) {
  484. stream = *init.id;
  485. if (stream == 65535)
  486. throw std::invalid_argument("Invalid DataChannel id");
  487. } else {
  488. // RFC 8832: The peer that initiates opening a data channel selects a stream identifier for
  489. // which the corresponding incoming and outgoing streams are unused. If the side is acting
  490. // as the DTLS client, it MUST choose an even stream identifier; if the side is acting as
  491. // the DTLS server, it MUST choose an odd one.
  492. // See https://tools.ietf.org/html/rfc8832#section-6
  493. stream = (role == Description::Role::Active) ? 0 : 1;
  494. while (mDataChannels.find(stream) != mDataChannels.end()) {
  495. if (stream >= 65535 - 2)
  496. throw std::runtime_error("Too many DataChannels");
  497. stream += 2;
  498. }
  499. }
  500. // If the DataChannel is user-negotiated, do not negociate it here
  501. auto channel =
  502. init.negotiated
  503. ? std::make_shared<DataChannel>(weak_from_this(), stream, std::move(label),
  504. std::move(init.protocol), std::move(init.reliability))
  505. : std::make_shared<NegotiatedDataChannel>(weak_from_this(), stream, std::move(label),
  506. std::move(init.protocol),
  507. std::move(init.reliability));
  508. mDataChannels.emplace(std::make_pair(stream, channel));
  509. return channel;
  510. }
  511. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  512. std::shared_lock lock(mDataChannelsMutex); // read-only
  513. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  514. if (auto channel = it->second.lock())
  515. return channel;
  516. return nullptr;
  517. }
  518. void PeerConnection::shiftDataChannels() {
  519. auto iceTransport = std::atomic_load(&mIceTransport);
  520. auto sctpTransport = std::atomic_load(&mSctpTransport);
  521. if (!sctpTransport && iceTransport && iceTransport->role() == Description::Role::Active) {
  522. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  523. decltype(mDataChannels) newDataChannels;
  524. auto it = mDataChannels.begin();
  525. while (it != mDataChannels.end()) {
  526. auto channel = it->second.lock();
  527. channel->shiftStream();
  528. newDataChannels.emplace(channel->stream(), channel);
  529. ++it;
  530. }
  531. std::swap(mDataChannels, newDataChannels);
  532. }
  533. }
  534. void PeerConnection::iterateDataChannels(
  535. std::function<void(shared_ptr<DataChannel> channel)> func) {
  536. // Iterate
  537. {
  538. std::shared_lock lock(mDataChannelsMutex); // read-only
  539. auto it = mDataChannels.begin();
  540. while (it != mDataChannels.end()) {
  541. auto channel = it->second.lock();
  542. if (channel && !channel->isClosed())
  543. func(channel);
  544. ++it;
  545. }
  546. }
  547. // Cleanup
  548. {
  549. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  550. auto it = mDataChannels.begin();
  551. while (it != mDataChannels.end()) {
  552. if (!it->second.lock()) {
  553. it = mDataChannels.erase(it);
  554. continue;
  555. }
  556. ++it;
  557. }
  558. }
  559. }
  560. void PeerConnection::openDataChannels() {
  561. if (auto transport = std::atomic_load(&mSctpTransport))
  562. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  563. }
  564. void PeerConnection::closeDataChannels() {
  565. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  566. }
  567. void PeerConnection::remoteCloseDataChannels() {
  568. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  569. }
  570. shared_ptr<Track> PeerConnection::emplaceTrack(Description::Media description) {
  571. shared_ptr<Track> track;
  572. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  573. if (track = it->second.lock(); track)
  574. track->setDescription(std::move(description));
  575. if (!track) {
  576. track = std::make_shared<Track>(weak_from_this(), std::move(description));
  577. mTracks.emplace(std::make_pair(track->mid(), track));
  578. mTrackLines.emplace_back(track);
  579. }
  580. return track;
  581. }
  582. void PeerConnection::incomingTrack(Description::Media description) {
  583. std::unique_lock lock(mTracksMutex); // we are going to emplace
  584. #if !RTC_ENABLE_MEDIA
  585. if (mTracks.empty()) {
  586. PLOG_WARNING << "Tracks will be inative (not compiled with media support)";
  587. }
  588. #endif
  589. if (mTracks.find(description.mid()) == mTracks.end()) {
  590. auto track = std::make_shared<Track>(weak_from_this(), std::move(description));
  591. mTracks.emplace(std::make_pair(track->mid(), track));
  592. mTrackLines.emplace_back(track);
  593. triggerTrack(track);
  594. }
  595. }
  596. void PeerConnection::openTracks() {
  597. #if RTC_ENABLE_MEDIA
  598. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  599. auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
  600. std::shared_lock lock(mTracksMutex); // read-only
  601. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  602. if (auto track = it->second.lock())
  603. if (!track->isOpen())
  604. track->open(srtpTransport);
  605. }
  606. #endif
  607. }
  608. void PeerConnection::validateRemoteDescription(const Description &description) {
  609. if (!description.iceUfrag())
  610. throw std::invalid_argument("Remote description has no ICE user fragment");
  611. if (!description.icePwd())
  612. throw std::invalid_argument("Remote description has no ICE password");
  613. if (!description.fingerprint())
  614. throw std::invalid_argument("Remote description has no valid fingerprint");
  615. if (description.mediaCount() == 0)
  616. throw std::invalid_argument("Remote description has no media line");
  617. int activeMediaCount = 0;
  618. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  619. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  620. [&](const Description::Media *media) {
  621. if (media->direction() != Description::Direction::Inactive)
  622. ++activeMediaCount;
  623. }},
  624. description.media(i));
  625. if (activeMediaCount == 0)
  626. throw std::invalid_argument("Remote description has no active media");
  627. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  628. if (*description.iceUfrag() == *local->iceUfrag() &&
  629. *description.icePwd() == *local->icePwd())
  630. throw std::logic_error("Got the local description as remote description");
  631. PLOG_VERBOSE << "Remote description looks valid";
  632. }
  633. void PeerConnection::processLocalDescription(Description description) {
  634. const size_t localSctpPort = DEFAULT_SCTP_PORT;
  635. const size_t localMaxMessageSize = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
  636. // Clean up the application entry the ICE transport might have added already (libnice)
  637. description.clearMedia();
  638. if (auto remote = remoteDescription()) {
  639. // Reciprocate remote description
  640. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  641. std::visit( // reciprocate each media
  642. rtc::overloaded{
  643. [&](Description::Application *remoteApp) {
  644. std::shared_lock lock(mDataChannelsMutex);
  645. if (!mDataChannels.empty()) {
  646. // Prefer local description
  647. Description::Application app(remoteApp->mid());
  648. app.setSctpPort(localSctpPort);
  649. app.setMaxMessageSize(localMaxMessageSize);
  650. PLOG_DEBUG << "Adding application to local description, mid=\""
  651. << app.mid() << "\"";
  652. description.addMedia(std::move(app));
  653. return;
  654. }
  655. auto reciprocated = remoteApp->reciprocate();
  656. reciprocated.hintSctpPort(localSctpPort);
  657. reciprocated.setMaxMessageSize(localMaxMessageSize);
  658. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  659. << reciprocated.mid() << "\"";
  660. description.addMedia(std::move(reciprocated));
  661. },
  662. [&](Description::Media *remoteMedia) {
  663. std::shared_lock lock(mTracksMutex);
  664. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  665. // Prefer local description
  666. if (auto track = it->second.lock()) {
  667. auto media = track->description();
  668. #if !RTC_ENABLE_MEDIA
  669. // No media support, mark as inactive
  670. media.setDirection(Description::Direction::Inactive);
  671. #endif
  672. PLOG_DEBUG
  673. << "Adding media to local description, mid=\"" << media.mid()
  674. << "\", active=" << std::boolalpha
  675. << (media.direction() != Description::Direction::Inactive);
  676. description.addMedia(std::move(media));
  677. } else {
  678. auto reciprocated = remoteMedia->reciprocate();
  679. reciprocated.setDirection(Description::Direction::Inactive);
  680. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  681. << reciprocated.mid() << "\"";
  682. description.addMedia(std::move(reciprocated));
  683. }
  684. return;
  685. }
  686. lock.unlock(); // we are going to call incomingTrack()
  687. auto reciprocated = remoteMedia->reciprocate();
  688. #if !RTC_ENABLE_MEDIA
  689. // No media support, mark as inactive
  690. reciprocated.setDirection(Description::Direction::Inactive);
  691. #endif
  692. incomingTrack(reciprocated);
  693. PLOG_DEBUG
  694. << "Reciprocating media in local description, mid=\""
  695. << reciprocated.mid() << "\", active=" << std::boolalpha
  696. << (reciprocated.direction() != Description::Direction::Inactive);
  697. description.addMedia(std::move(reciprocated));
  698. },
  699. },
  700. remote->media(i));
  701. }
  702. if (description.type() == Description::Type::Offer) {
  703. // This is an offer, add locally created data channels and tracks
  704. // Add application for data channels
  705. if (!description.hasApplication()) {
  706. std::shared_lock lock(mDataChannelsMutex);
  707. if (!mDataChannels.empty()) {
  708. unsigned int m = 0;
  709. while (description.hasMid(std::to_string(m)))
  710. ++m;
  711. Description::Application app(std::to_string(m));
  712. app.setSctpPort(localSctpPort);
  713. app.setMaxMessageSize(localMaxMessageSize);
  714. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  715. << "\"";
  716. description.addMedia(std::move(app));
  717. }
  718. }
  719. // Add media for local tracks
  720. std::shared_lock lock(mTracksMutex);
  721. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  722. if (auto track = it->lock()) {
  723. if (description.hasMid(track->mid()))
  724. continue;
  725. auto media = track->description();
  726. #if !RTC_ENABLE_MEDIA
  727. // No media support, mark as inactive
  728. media.setDirection(Description::Direction::Inactive);
  729. #endif
  730. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  731. << "\", active=" << std::boolalpha
  732. << (media.direction() != Description::Direction::Inactive);
  733. description.addMedia(std::move(media));
  734. }
  735. }
  736. }
  737. // Set local fingerprint (wait for certificate if necessary)
  738. description.setFingerprint(mCertificate.get()->fingerprint());
  739. {
  740. // Set as local description
  741. std::lock_guard lock(mLocalDescriptionMutex);
  742. std::vector<Candidate> existingCandidates;
  743. if (mLocalDescription) {
  744. existingCandidates = mLocalDescription->extractCandidates();
  745. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  746. }
  747. mLocalDescription.emplace(description);
  748. mLocalDescription->addCandidates(std::move(existingCandidates));
  749. }
  750. PLOG_VERBOSE << "Issuing local description: " << description;
  751. mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
  752. // Reciprocated tracks might need to be open
  753. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  754. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  755. mProcessor->enqueue(&PeerConnection::openTracks, this);
  756. }
  757. void PeerConnection::processLocalCandidate(Candidate candidate) {
  758. std::lock_guard lock(mLocalDescriptionMutex);
  759. if (!mLocalDescription)
  760. throw std::logic_error("Got a local candidate without local description");
  761. candidate.resolve(Candidate::ResolveMode::Simple);
  762. mLocalDescription->addCandidate(candidate);
  763. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  764. mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
  765. }
  766. void PeerConnection::processRemoteDescription(Description description) {
  767. {
  768. // Set as remote description
  769. std::lock_guard lock(mRemoteDescriptionMutex);
  770. std::vector<Candidate> existingCandidates;
  771. if (mRemoteDescription)
  772. existingCandidates = mRemoteDescription->extractCandidates();
  773. mRemoteDescription.emplace(description);
  774. mRemoteDescription->addCandidates(std::move(existingCandidates));
  775. }
  776. auto iceTransport = initIceTransport();
  777. iceTransport->setRemoteDescription(std::move(description));
  778. if (description.hasApplication()) {
  779. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  780. auto sctpTransport = std::atomic_load(&mSctpTransport);
  781. if (!sctpTransport && dtlsTransport &&
  782. dtlsTransport->state() == Transport::State::Connected)
  783. initSctpTransport();
  784. }
  785. }
  786. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  787. auto iceTransport = std::atomic_load(&mIceTransport);
  788. {
  789. // Set as remote candidate
  790. std::lock_guard lock(mRemoteDescriptionMutex);
  791. if (!mRemoteDescription)
  792. throw std::logic_error("Got a remote candidate without remote description");
  793. if (!iceTransport)
  794. throw std::logic_error("Got a remote candidate without ICE transport");
  795. candidate.hintMid(mRemoteDescription->bundleMid());
  796. if (mRemoteDescription->hasCandidate(candidate))
  797. return; // already in description, ignore
  798. candidate.resolve(Candidate::ResolveMode::Simple);
  799. mRemoteDescription->addCandidate(candidate);
  800. }
  801. if (candidate.isResolved()) {
  802. iceTransport->addRemoteCandidate(std::move(candidate));
  803. } else {
  804. // We might need a lookup, do it asynchronously
  805. // We don't use the thread pool because we have no control on the timeout
  806. if ((iceTransport = std::atomic_load(&mIceTransport))) {
  807. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  808. std::thread t([weakIceTransport, candidate = std::move(candidate)]() mutable {
  809. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  810. if (auto iceTransport = weakIceTransport.lock())
  811. iceTransport->addRemoteCandidate(std::move(candidate));
  812. });
  813. t.detach();
  814. }
  815. }
  816. }
  817. string PeerConnection::localBundleMid() const {
  818. std::lock_guard lock(mLocalDescriptionMutex);
  819. return mLocalDescription ? mLocalDescription->bundleMid() : "0";
  820. }
  821. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  822. auto dataChannel = weakDataChannel.lock();
  823. if (!dataChannel)
  824. return;
  825. mProcessor->enqueue(dataChannelCallback.wrap(),
  826. std::make_shared<rtc::DataChannel>(std::move(dataChannel)));
  827. }
  828. void PeerConnection::triggerTrack(shared_ptr<Track> track) {
  829. mProcessor->enqueue(trackCallback.wrap(), std::make_shared<rtc::Track>(std::move(track)));
  830. }
  831. bool PeerConnection::changeState(State newState) {
  832. State current;
  833. do {
  834. current = state.load();
  835. if (current == State::Closed)
  836. return false;
  837. if (current == newState)
  838. return false;
  839. } while (!state.compare_exchange_weak(current, newState));
  840. std::ostringstream s;
  841. s << newState;
  842. PLOG_INFO << "Changed state to " << s.str();
  843. if (newState == State::Closed)
  844. // This is the last state change, so we may steal the callback
  845. mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
  846. else
  847. mProcessor->enqueue(stateChangeCallback.wrap(), newState);
  848. return true;
  849. }
  850. bool PeerConnection::changeGatheringState(GatheringState newState) {
  851. if (gatheringState.exchange(newState) == newState)
  852. return false;
  853. std::ostringstream s;
  854. s << newState;
  855. PLOG_INFO << "Changed gathering state to " << s.str();
  856. mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
  857. return true;
  858. }
  859. bool PeerConnection::changeSignalingState(SignalingState newState) {
  860. if (signalingState.exchange(newState) == newState)
  861. return false;
  862. std::ostringstream s;
  863. s << state;
  864. PLOG_INFO << "Changed signaling state to " << s.str();
  865. mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
  866. return true;
  867. }
  868. void PeerConnection::resetCallbacks() {
  869. // Unregister all callbacks
  870. dataChannelCallback = nullptr;
  871. localDescriptionCallback = nullptr;
  872. localCandidateCallback = nullptr;
  873. stateChangeCallback = nullptr;
  874. gatheringStateChangeCallback = nullptr;
  875. }
  876. } // namespace rtc::impl