peerconnection.cpp 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "peerconnection.hpp"
  19. #include "certificate.hpp"
  20. #include "include.hpp"
  21. #include "processor.hpp"
  22. #include "threadpool.hpp"
  23. #include "dtlstransport.hpp"
  24. #include "icetransport.hpp"
  25. #include "sctptransport.hpp"
  26. #if RTC_ENABLE_MEDIA
  27. #include "dtlssrtptransport.hpp"
  28. #endif
  29. #include <iomanip>
  30. #include <thread>
  31. namespace rtc {
  32. using namespace std::placeholders;
  33. using std::shared_ptr;
  34. using std::weak_ptr;
  35. PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
  36. PeerConnection::PeerConnection(const Configuration &config)
  37. : mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
  38. mState(State::New), mGatheringState(GatheringState::New) {
  39. PLOG_VERBOSE << "Creating PeerConnection";
  40. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  41. throw std::invalid_argument("Invalid port range");
  42. }
  43. PeerConnection::~PeerConnection() {
  44. PLOG_VERBOSE << "Destroying PeerConnection";
  45. close();
  46. mProcessor->join();
  47. }
  48. void PeerConnection::close() {
  49. PLOG_VERBOSE << "Closing PeerConnection";
  50. // Close data channels asynchronously
  51. mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
  52. closeTransports();
  53. }
  54. const Configuration *PeerConnection::config() const { return &mConfig; }
  55. PeerConnection::State PeerConnection::state() const { return mState; }
  56. PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
  57. std::optional<Description> PeerConnection::localDescription() const {
  58. std::lock_guard lock(mLocalDescriptionMutex);
  59. return mLocalDescription;
  60. }
  61. std::optional<Description> PeerConnection::remoteDescription() const {
  62. std::lock_guard lock(mRemoteDescriptionMutex);
  63. return mRemoteDescription;
  64. }
  65. void PeerConnection::setLocalDescription() {
  66. PLOG_VERBOSE << "Setting local description";
  67. if (!std::atomic_load(&mIceTransport)) {
  68. // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
  69. // setup:actpass.
  70. // See https://tools.ietf.org/html/rfc5763#section-5
  71. auto iceTransport = initIceTransport(Description::Role::ActPass);
  72. Description localDescription = iceTransport->getLocalDescription(Description::Type::Offer);
  73. processLocalDescription(localDescription);
  74. iceTransport->gatherLocalCandidates();
  75. } else {
  76. auto localDescription = std::atomic_load(&mIceTransport)->getLocalDescription(Description::Type::Offer);
  77. processLocalDescription(localDescription);
  78. }
  79. }
  80. void PeerConnection::setRemoteDescription(Description description) {
  81. PLOG_VERBOSE << "Setting remote description: " << string(description);
  82. if (description.mediaCount() == 0)
  83. throw std::invalid_argument("Remote description has no media line");
  84. int activeMediaCount = 0;
  85. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  86. std::visit( // reciprocate each media
  87. rtc::overloaded{[&](Description::Application *) { ++activeMediaCount; },
  88. [&](Description::Media *media) {
  89. if (media->direction() != Description::Direction::Inactive)
  90. ++activeMediaCount;
  91. }},
  92. description.media(i));
  93. if (activeMediaCount == 0)
  94. throw std::invalid_argument("Remote description has no active media");
  95. if (!description.fingerprint())
  96. throw std::invalid_argument("Remote description has no fingerprint");
  97. description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
  98. auto type = description.type();
  99. auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
  100. auto iceTransport = std::atomic_load(&mIceTransport);
  101. if (!iceTransport)
  102. iceTransport = initIceTransport(Description::Role::ActPass);
  103. iceTransport->setRemoteDescription(description);
  104. {
  105. std::lock_guard lock(mRemoteDescriptionMutex);
  106. mRemoteDescription.emplace(std::move(description));
  107. }
  108. if (type == Description::Type::Offer) {
  109. // This is an offer and we are the answerer.
  110. Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
  111. processLocalDescription(localDescription);
  112. iceTransport->gatherLocalCandidates();
  113. } else {
  114. // This is an answer and we are the offerer.
  115. auto sctpTransport = std::atomic_load(&mSctpTransport);
  116. if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
  117. // Since we assumed passive role during DataChannel creation, we need to shift the
  118. // stream numbers by one to shift them from odd to even.
  119. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  120. decltype(mDataChannels) newDataChannels;
  121. auto it = mDataChannels.begin();
  122. while (it != mDataChannels.end()) {
  123. auto channel = it->second.lock();
  124. if (channel->stream() % 2 == 1)
  125. channel->mStream -= 1;
  126. newDataChannels.emplace(channel->stream(), channel);
  127. ++it;
  128. }
  129. std::swap(mDataChannels, newDataChannels);
  130. }
  131. }
  132. for (const auto &candidate : remoteCandidates)
  133. addRemoteCandidate(candidate);
  134. if (auto transport = std::atomic_load(&mDtlsTransport); transport && transport->state() == rtc::DtlsTransport::State::Connected) {
  135. openTracks();
  136. }
  137. }
  138. void PeerConnection::addRemoteCandidate(Candidate candidate) {
  139. PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
  140. auto iceTransport = std::atomic_load(&mIceTransport);
  141. if (!mRemoteDescription || !iceTransport)
  142. throw std::logic_error("Remote candidate set without remote description");
  143. if (candidate.resolve(Candidate::ResolveMode::Simple)) {
  144. iceTransport->addRemoteCandidate(candidate);
  145. } else {
  146. // OK, we might need a lookup, do it asynchronously
  147. // We don't use the thread pool because we have no control on the timeout
  148. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  149. std::thread t([weakIceTransport, candidate]() mutable {
  150. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  151. if (auto iceTransport = weakIceTransport.lock())
  152. iceTransport->addRemoteCandidate(candidate);
  153. });
  154. t.detach();
  155. }
  156. std::lock_guard lock(mRemoteDescriptionMutex);
  157. mRemoteDescription->addCandidate(candidate);
  158. }
  159. std::optional<string> PeerConnection::localAddress() const {
  160. auto iceTransport = std::atomic_load(&mIceTransport);
  161. return iceTransport ? iceTransport->getLocalAddress() : nullopt;
  162. }
  163. std::optional<string> PeerConnection::remoteAddress() const {
  164. auto iceTransport = std::atomic_load(&mIceTransport);
  165. return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
  166. }
  167. shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string protocol,
  168. Reliability reliability) {
  169. if (auto local = localDescription(); local && !local->hasApplication()) {
  170. PLOG_ERROR << "The PeerConnection was negociated without DataChannel support.";
  171. throw std::runtime_error("No DataChannel support on the PeerConnection");
  172. }
  173. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  174. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  175. // See https://tools.ietf.org/html/rfc5763#section-5
  176. // Therefore, we assume passive role when we are the offerer.
  177. auto iceTransport = std::atomic_load(&mIceTransport);
  178. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  179. auto channel =
  180. emplaceDataChannel(role, std::move(label), std::move(protocol), std::move(reliability));
  181. if (auto transport = std::atomic_load(&mSctpTransport))
  182. if (transport->state() == SctpTransport::State::Connected)
  183. channel->open(transport);
  184. return channel;
  185. }
  186. shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, string protocol,
  187. Reliability reliability) {
  188. auto channel = addDataChannel(label, protocol, reliability);
  189. setLocalDescription();
  190. return channel;
  191. }
  192. void PeerConnection::onDataChannel(
  193. std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
  194. mDataChannelCallback = callback;
  195. }
  196. void PeerConnection::onLocalDescription(std::function<void(Description description)> callback) {
  197. mLocalDescriptionCallback = callback;
  198. }
  199. void PeerConnection::onLocalCandidate(std::function<void(Candidate candidate)> callback) {
  200. mLocalCandidateCallback = callback;
  201. }
  202. void PeerConnection::onStateChange(std::function<void(State state)> callback) {
  203. mStateChangeCallback = callback;
  204. }
  205. void PeerConnection::onGatheringStateChange(std::function<void(GatheringState state)> callback) {
  206. mGatheringStateChangeCallback = callback;
  207. }
  208. bool PeerConnection::hasMedia() const {
  209. auto local = localDescription();
  210. return local && local->hasAudioOrVideo();
  211. }
  212. std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
  213. // if (localDescription())
  214. // throw std::logic_error("Tracks must be created before local description");
  215. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  216. if (auto track = it->second.lock())
  217. return track;
  218. #if !RTC_ENABLE_MEDIA
  219. if (mTracks.empty()) {
  220. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  221. }
  222. #endif
  223. auto track = std::make_shared<Track>(std::move(description));
  224. mTracks.emplace(std::make_pair(track->mid(), track));
  225. mTrackLines.emplace_back(track);
  226. return track;
  227. }
  228. void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
  229. mTrackCallback = callback;
  230. }
  231. shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
  232. try {
  233. if (auto transport = std::atomic_load(&mIceTransport))
  234. return transport;
  235. auto transport = std::make_shared<IceTransport>(
  236. mConfig, role, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  237. [this, weak_this = weak_from_this()](IceTransport::State state) {
  238. auto shared_this = weak_this.lock();
  239. if (!shared_this)
  240. return;
  241. switch (state) {
  242. case IceTransport::State::Connecting:
  243. changeState(State::Connecting);
  244. break;
  245. case IceTransport::State::Failed:
  246. changeState(State::Failed);
  247. break;
  248. case IceTransport::State::Connected:
  249. initDtlsTransport();
  250. break;
  251. case IceTransport::State::Disconnected:
  252. changeState(State::Disconnected);
  253. break;
  254. default:
  255. // Ignore
  256. break;
  257. }
  258. },
  259. [this, weak_this = weak_from_this()](IceTransport::GatheringState state) {
  260. auto shared_this = weak_this.lock();
  261. if (!shared_this)
  262. return;
  263. switch (state) {
  264. case IceTransport::GatheringState::InProgress:
  265. changeGatheringState(GatheringState::InProgress);
  266. break;
  267. case IceTransport::GatheringState::Complete:
  268. endLocalCandidates();
  269. changeGatheringState(GatheringState::Complete);
  270. break;
  271. default:
  272. // Ignore
  273. break;
  274. }
  275. });
  276. std::atomic_store(&mIceTransport, transport);
  277. if (mState == State::Closed) {
  278. mIceTransport.reset();
  279. throw std::runtime_error("Connection is closed");
  280. }
  281. transport->start();
  282. return transport;
  283. } catch (const std::exception &e) {
  284. PLOG_ERROR << e.what();
  285. changeState(State::Failed);
  286. throw std::runtime_error("ICE transport initialization failed");
  287. }
  288. }
  289. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  290. try {
  291. if (auto transport = std::atomic_load(&mDtlsTransport))
  292. return transport;
  293. auto certificate = mCertificate.get();
  294. auto lower = std::atomic_load(&mIceTransport);
  295. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  296. auto stateChangeCallback = [this,
  297. weak_this = weak_from_this()](DtlsTransport::State state) {
  298. auto shared_this = weak_this.lock();
  299. if (!shared_this)
  300. return;
  301. switch (state) {
  302. case DtlsTransport::State::Connected:
  303. if (auto local = localDescription(); local && local->hasApplication())
  304. initSctpTransport();
  305. else
  306. changeState(State::Connected);
  307. openTracks();
  308. break;
  309. case DtlsTransport::State::Failed:
  310. changeState(State::Failed);
  311. break;
  312. case DtlsTransport::State::Disconnected:
  313. changeState(State::Disconnected);
  314. break;
  315. default:
  316. // Ignore
  317. break;
  318. }
  319. };
  320. shared_ptr<DtlsTransport> transport;
  321. if (hasMedia()) {
  322. #if RTC_ENABLE_MEDIA
  323. PLOG_INFO << "This connection requires media support";
  324. // DTLS-SRTP
  325. transport = std::make_shared<DtlsSrtpTransport>(
  326. lower, certificate, verifierCallback,
  327. std::bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
  328. #else
  329. PLOG_WARNING << "Ignoring media support (not compiled with SRTP support)";
  330. #endif
  331. }
  332. if (!transport) {
  333. // DTLS only
  334. transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback,
  335. stateChangeCallback);
  336. }
  337. std::atomic_store(&mDtlsTransport, transport);
  338. if (mState == State::Closed) {
  339. mDtlsTransport.reset();
  340. throw std::runtime_error("Connection is closed");
  341. }
  342. transport->start();
  343. return transport;
  344. } catch (const std::exception &e) {
  345. PLOG_ERROR << e.what();
  346. changeState(State::Failed);
  347. throw std::runtime_error("DTLS transport initialization failed");
  348. }
  349. }
  350. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  351. try {
  352. if (auto transport = std::atomic_load(&mSctpTransport))
  353. return transport;
  354. auto remote = remoteDescription();
  355. if (!remote || !remote->application())
  356. throw std::logic_error("Initializing SCTP transport without application description");
  357. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  358. auto lower = std::atomic_load(&mDtlsTransport);
  359. auto transport = std::make_shared<SctpTransport>(
  360. lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  361. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  362. [this, weak_this = weak_from_this()](SctpTransport::State state) {
  363. auto shared_this = weak_this.lock();
  364. if (!shared_this)
  365. return;
  366. switch (state) {
  367. case SctpTransport::State::Connected:
  368. changeState(State::Connected);
  369. openDataChannels();
  370. break;
  371. case SctpTransport::State::Failed:
  372. LOG_WARNING << "SCTP transport failed";
  373. remoteCloseDataChannels();
  374. changeState(State::Failed);
  375. break;
  376. case SctpTransport::State::Disconnected:
  377. remoteCloseDataChannels();
  378. changeState(State::Disconnected);
  379. break;
  380. default:
  381. // Ignore
  382. break;
  383. }
  384. });
  385. std::atomic_store(&mSctpTransport, transport);
  386. if (mState == State::Closed) {
  387. mSctpTransport.reset();
  388. throw std::runtime_error("Connection is closed");
  389. }
  390. transport->start();
  391. return transport;
  392. } catch (const std::exception &e) {
  393. PLOG_ERROR << e.what();
  394. changeState(State::Failed);
  395. throw std::runtime_error("SCTP transport initialization failed");
  396. }
  397. }
  398. void PeerConnection::closeTransports() {
  399. PLOG_VERBOSE << "Closing transports";
  400. // Change state to sink state Closed
  401. changeState(State::Closed);
  402. // Reset callbacks now that state is changed
  403. resetCallbacks();
  404. // Initiate transport stop on the processor after closing the data channels
  405. mProcessor->enqueue([this]() {
  406. // Pass the pointers to a thread
  407. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  408. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  409. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  410. ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
  411. if (sctp)
  412. sctp->stop();
  413. if (dtls)
  414. dtls->stop();
  415. if (ice)
  416. ice->stop();
  417. sctp.reset();
  418. dtls.reset();
  419. ice.reset();
  420. });
  421. });
  422. }
  423. void PeerConnection::endLocalCandidates() {
  424. std::lock_guard lock(mLocalDescriptionMutex);
  425. if (mLocalDescription)
  426. mLocalDescription->endCandidates();
  427. }
  428. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  429. std::lock_guard lock(mRemoteDescriptionMutex);
  430. if (auto expectedFingerprint =
  431. mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
  432. return *expectedFingerprint == fingerprint;
  433. }
  434. return false;
  435. }
  436. void PeerConnection::forwardMessage(message_ptr message) {
  437. if (!message) {
  438. remoteCloseDataChannels();
  439. return;
  440. }
  441. auto channel = findDataChannel(uint16_t(message->stream));
  442. auto iceTransport = std::atomic_load(&mIceTransport);
  443. auto sctpTransport = std::atomic_load(&mSctpTransport);
  444. if (!iceTransport || !sctpTransport)
  445. return;
  446. if (!channel) {
  447. const byte dataChannelOpenMessage{0x03};
  448. unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  449. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  450. message->stream % 2 == remoteParity) {
  451. channel =
  452. std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
  453. channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
  454. weak_ptr<DataChannel>{channel}));
  455. mDataChannels.insert(std::make_pair(message->stream, channel));
  456. } else {
  457. // Invalid, close the DataChannel
  458. sctpTransport->closeStream(message->stream);
  459. return;
  460. }
  461. }
  462. channel->incoming(message);
  463. }
  464. void PeerConnection::forwardMedia(message_ptr message) {
  465. if (!message)
  466. return;
  467. unsigned int ssrc = message->stream;
  468. std::optional<string> mid;
  469. if (auto it = mMidFromSssrc.find(ssrc); it != mMidFromSssrc.end()) {
  470. mid = it->second;
  471. } else {
  472. std::lock_guard lock(mLocalDescriptionMutex);
  473. if (!mLocalDescription)
  474. return;
  475. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  476. if (auto found = std::visit(
  477. rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
  478. return std::nullopt;
  479. },
  480. [&](Description::Media *media) -> std::optional<string> {
  481. return media->hasSSRC(ssrc)
  482. ? std::make_optional(media->mid())
  483. : nullopt;
  484. }},
  485. mRemoteDescription->media(i))) {
  486. mMidFromSssrc.emplace(ssrc, *found);
  487. mid = *found;
  488. break;
  489. }else
  490. PLOG_WARNING << "Unknown SSRC " << ssrc;
  491. }
  492. }
  493. if (!mid) {
  494. PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  495. return;
  496. }
  497. std::shared_lock lock(mTracksMutex); // read-only
  498. if (auto it = mTracks.find(*mid); it != mTracks.end())
  499. if (auto track = it->second.lock())
  500. track->incoming(message);
  501. }
  502. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  503. if (auto channel = findDataChannel(stream))
  504. channel->triggerBufferedAmount(amount);
  505. }
  506. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
  507. string protocol,
  508. Reliability reliability) {
  509. // The active side must use streams with even identifiers, whereas the passive side must use
  510. // streams with odd identifiers.
  511. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
  512. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  513. unsigned int stream = (role == Description::Role::Active) ? 0 : 1;
  514. while (mDataChannels.find(stream) != mDataChannels.end()) {
  515. stream += 2;
  516. if (stream >= 65535)
  517. throw std::runtime_error("Too many DataChannels");
  518. }
  519. auto channel = std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
  520. std::move(protocol), std::move(reliability));
  521. mDataChannels.emplace(std::make_pair(stream, channel));
  522. return channel;
  523. }
  524. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  525. std::shared_lock lock(mDataChannelsMutex); // read-only
  526. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  527. if (auto channel = it->second.lock())
  528. return channel;
  529. return nullptr;
  530. }
  531. void PeerConnection::iterateDataChannels(
  532. std::function<void(shared_ptr<DataChannel> channel)> func) {
  533. // Iterate
  534. {
  535. std::shared_lock lock(mDataChannelsMutex); // read-only
  536. auto it = mDataChannels.begin();
  537. while (it != mDataChannels.end()) {
  538. auto channel = it->second.lock();
  539. if (channel && !channel->isClosed())
  540. func(channel);
  541. ++it;
  542. }
  543. }
  544. // Cleanup
  545. {
  546. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  547. auto it = mDataChannels.begin();
  548. while (it != mDataChannels.end()) {
  549. if (!it->second.lock()) {
  550. it = mDataChannels.erase(it);
  551. continue;
  552. }
  553. ++it;
  554. }
  555. }
  556. }
  557. void PeerConnection::openDataChannels() {
  558. if (auto transport = std::atomic_load(&mSctpTransport))
  559. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  560. }
  561. void PeerConnection::closeDataChannels() {
  562. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  563. }
  564. void PeerConnection::remoteCloseDataChannels() {
  565. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  566. }
  567. void PeerConnection::incomingTrack(Description::Media description) {
  568. std::unique_lock lock(mTracksMutex); // we are going to emplace
  569. #if !RTC_ENABLE_MEDIA
  570. if (mTracks.empty()) {
  571. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  572. }
  573. #endif
  574. if (mTracks.find(description.mid()) == mTracks.end()) {
  575. auto track = std::make_shared<Track>(std::move(description));
  576. mTracks.emplace(std::make_pair(track->mid(), track));
  577. triggerTrack(std::move(track));
  578. }
  579. }
  580. void PeerConnection::openTracks() {
  581. #if RTC_ENABLE_MEDIA
  582. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  583. auto srtpTransport = std::reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
  584. std::shared_lock lock(mTracksMutex); // read-only
  585. // for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  586. if (mTrackLines.size() == remoteDescription()->mediaCount()) {
  587. for (unsigned int i = 0; i < mTrackLines.size(); i++) {
  588. if (auto track = mTrackLines[i].lock()) {
  589. if (!track->isOpen()) {
  590. // if (track->description().direction() == rtc::Description::Direction::RecvOnly || track->description().direction() == rtc::Description::Direction::SendRecv)
  591. // srtpTransport->addInboundSSRC(0);
  592. // if (track->description().direction() == rtc::Description::Direction::SendOnly || track->description().direction() == rtc::Description::Direction::SendRecv)
  593. for (auto ssrc : track->description().getSSRCs()) {
  594. PLOG_DEBUG << "Adding " << ssrc << " to list";
  595. srtpTransport->addSSRC(ssrc);
  596. }
  597. for (auto ssrc : std::get<rtc::Description::Media *>(remoteDescription()->media(i))->getSSRCs()) {
  598. PLOG_DEBUG << "Adding " << ssrc << " to list";
  599. srtpTransport->addSSRC(ssrc);
  600. }
  601. track->open(srtpTransport);
  602. }
  603. }
  604. }
  605. }
  606. }
  607. #endif
  608. }
  609. void PeerConnection::processLocalDescription(Description description) {
  610. int activeMediaCount = 0;
  611. auto remote = remoteDescription();
  612. if (remote && remote->type() == Description::Type::Offer) {
  613. // Reciprocate remote description
  614. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  615. std::visit( // reciprocate each media
  616. rtc::overloaded{
  617. [&](Description::Application *app) {
  618. auto reciprocated = app->reciprocate();
  619. reciprocated.hintSctpPort(DEFAULT_SCTP_PORT);
  620. reciprocated.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  621. ++activeMediaCount;
  622. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  623. << reciprocated.mid() << "\"";
  624. description.addMedia(std::move(reciprocated));
  625. },
  626. [&](Description::Media *media) {
  627. auto reciprocated = media->reciprocate();
  628. #if RTC_ENABLE_MEDIA
  629. if (reciprocated.direction() != Description::Direction::Inactive)
  630. ++activeMediaCount;
  631. #else
  632. // No media support, mark as inactive
  633. reciprocated.setDirection(Description::Direction::Inactive);
  634. #endif
  635. incomingTrack(reciprocated);
  636. PLOG_DEBUG
  637. << "Reciprocating media in local description, mid=\""
  638. << reciprocated.mid() << "\", active=" << std::boolalpha
  639. << (reciprocated.direction() != Description::Direction::Inactive);
  640. description.addMedia(std::move(reciprocated));
  641. },
  642. },
  643. remote->media(i));
  644. } else {
  645. // Add application for data channels
  646. {
  647. std::shared_lock lock(mDataChannelsMutex);
  648. if (!mDataChannels.empty()) {
  649. Description::Application app("data");
  650. app.setSctpPort(DEFAULT_SCTP_PORT);
  651. app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  652. ++activeMediaCount;
  653. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  654. << "\"";
  655. description.addMedia(std::move(app));
  656. }
  657. }
  658. // Add media for local tracks
  659. {
  660. std::shared_lock lock(mTracksMutex);
  661. // for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
  662. for (auto ptr : mTrackLines) {
  663. if (auto track = ptr.lock()) {
  664. auto media = track->description();
  665. #if RTC_ENABLE_MEDIA
  666. if (media.direction() != Description::Direction::Inactive)
  667. ++activeMediaCount;
  668. #else
  669. // No media support, mark as inactive
  670. media.setDirection(Description::Direction::Inactive);
  671. #endif
  672. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  673. << "\", active=" << std::boolalpha
  674. << (media.direction() != Description::Direction::Inactive);
  675. description.addMedia(std::move(media));
  676. }
  677. }
  678. }
  679. }
  680. // There must be at least one active media to negociate
  681. if (activeMediaCount == 0)
  682. throw std::runtime_error("Nothing to negociate");
  683. // Set local fingerprint (wait for certificate if necessary)
  684. description.setFingerprint(mCertificate.get()->fingerprint());
  685. std::lock_guard lock(mLocalDescriptionMutex);
  686. mLocalDescription.emplace(std::move(description));
  687. mProcessor->enqueue([this, description = *mLocalDescription]() {
  688. PLOG_VERBOSE << "Issuing local description: " << description;
  689. mLocalDescriptionCallback(std::move(description));
  690. });
  691. }
  692. void PeerConnection::processLocalCandidate(Candidate candidate) {
  693. std::lock_guard lock(mLocalDescriptionMutex);
  694. if (!mLocalDescription)
  695. throw std::logic_error("Got a local candidate without local description");
  696. mLocalDescription->addCandidate(candidate);
  697. mProcessor->enqueue([this, candidate = std::move(candidate)]() {
  698. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  699. mLocalCandidateCallback(std::move(candidate));
  700. });
  701. }
  702. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  703. auto dataChannel = weakDataChannel.lock();
  704. if (!dataChannel)
  705. return;
  706. mProcessor->enqueue(
  707. [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
  708. }
  709. void PeerConnection::triggerTrack(std::shared_ptr<Track> track) {
  710. mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
  711. }
  712. bool PeerConnection::changeState(State state) {
  713. State current;
  714. do {
  715. current = mState.load();
  716. if (current == state)
  717. return true;
  718. if (current == State::Closed)
  719. return false;
  720. } while (!mState.compare_exchange_weak(current, state));
  721. if (state == State::Closed)
  722. // This is the last state change, so we may steal the callback
  723. mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
  724. else
  725. mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
  726. return true;
  727. }
  728. bool PeerConnection::changeGatheringState(GatheringState state) {
  729. if (mGatheringState.exchange(state) != state)
  730. mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
  731. return true;
  732. }
  733. void PeerConnection::resetCallbacks() {
  734. // Unregister all callbacks
  735. mDataChannelCallback = nullptr;
  736. mLocalDescriptionCallback = nullptr;
  737. mLocalCandidateCallback = nullptr;
  738. mStateChangeCallback = nullptr;
  739. mGatheringStateChangeCallback = nullptr;
  740. }
  741. bool PeerConnection::getSelectedCandidatePair([[maybe_unused]] CandidateInfo *local,
  742. [[maybe_unused]] CandidateInfo *remote) {
  743. #if USE_NICE
  744. auto iceTransport = std::atomic_load(&mIceTransport);
  745. return iceTransport->getSelectedCandidatePair(local, remote);
  746. #else
  747. PLOG_WARNING << "getSelectedCandidatePair() is only implemented with libnice as ICE backend";
  748. return false;
  749. #endif
  750. }
  751. void PeerConnection::clearStats() {
  752. auto sctpTransport = std::atomic_load(&mSctpTransport);
  753. if (sctpTransport)
  754. return sctpTransport->clearStats();
  755. }
  756. size_t PeerConnection::bytesSent() {
  757. auto sctpTransport = std::atomic_load(&mSctpTransport);
  758. if (sctpTransport)
  759. return sctpTransport->bytesSent();
  760. return 0;
  761. }
  762. size_t PeerConnection::bytesReceived() {
  763. auto sctpTransport = std::atomic_load(&mSctpTransport);
  764. if (sctpTransport)
  765. return sctpTransport->bytesReceived();
  766. return 0;
  767. }
  768. std::optional<std::chrono::milliseconds> PeerConnection::rtt() {
  769. auto sctpTransport = std::atomic_load(&mSctpTransport);
  770. if (sctpTransport)
  771. return sctpTransport->rtt();
  772. PLOG_WARNING << "Could not load sctpTransport";
  773. return std::nullopt;
  774. }
  775. } // namespace rtc
  776. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &state) {
  777. using State = rtc::PeerConnection::State;
  778. std::string str;
  779. switch (state) {
  780. case State::New:
  781. str = "new";
  782. break;
  783. case State::Connecting:
  784. str = "connecting";
  785. break;
  786. case State::Connected:
  787. str = "connected";
  788. break;
  789. case State::Disconnected:
  790. str = "disconnected";
  791. break;
  792. case State::Failed:
  793. str = "failed";
  794. break;
  795. case State::Closed:
  796. str = "closed";
  797. break;
  798. default:
  799. str = "unknown";
  800. break;
  801. }
  802. return out << str;
  803. }
  804. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::GatheringState &state) {
  805. using GatheringState = rtc::PeerConnection::GatheringState;
  806. std::string str;
  807. switch (state) {
  808. case GatheringState::New:
  809. str = "new";
  810. break;
  811. case GatheringState::InProgress:
  812. str = "in_progress";
  813. break;
  814. case GatheringState::Complete:
  815. str = "complete";
  816. break;
  817. default:
  818. str = "unknown";
  819. break;
  820. }
  821. return out << str;
  822. }