peerconnection.cpp 29 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "peerconnection.hpp"
  19. #include "certificate.hpp"
  20. #include "include.hpp"
  21. #include "processor.hpp"
  22. #include "threadpool.hpp"
  23. #include "dtlstransport.hpp"
  24. #include "icetransport.hpp"
  25. #include "sctptransport.hpp"
  26. #if RTC_ENABLE_MEDIA
  27. #include "dtlssrtptransport.hpp"
  28. #endif
  29. #include <iomanip>
  30. #include <thread>
  31. namespace rtc {
  32. using namespace std::placeholders;
  33. using std::shared_ptr;
  34. using std::weak_ptr;
  35. PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
  36. PeerConnection::PeerConnection(const Configuration &config)
  37. : mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
  38. mState(State::New), mGatheringState(GatheringState::New) {
  39. PLOG_VERBOSE << "Creating PeerConnection";
  40. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  41. throw std::invalid_argument("Invalid port range");
  42. }
  43. PeerConnection::~PeerConnection() {
  44. PLOG_VERBOSE << "Destroying PeerConnection";
  45. close();
  46. mProcessor->join();
  47. }
  48. void PeerConnection::close() {
  49. PLOG_VERBOSE << "Closing PeerConnection";
  50. // Close data channels asynchronously
  51. mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
  52. closeTransports();
  53. }
  54. const Configuration *PeerConnection::config() const { return &mConfig; }
  55. PeerConnection::State PeerConnection::state() const { return mState; }
  56. PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
  57. std::optional<Description> PeerConnection::localDescription() const {
  58. std::lock_guard lock(mLocalDescriptionMutex);
  59. return mLocalDescription;
  60. }
  61. std::optional<Description> PeerConnection::remoteDescription() const {
  62. std::lock_guard lock(mRemoteDescriptionMutex);
  63. return mRemoteDescription;
  64. }
  65. void PeerConnection::setLocalDescription() {
  66. PLOG_VERBOSE << "Setting local description";
  67. if (std::atomic_load(&mIceTransport)) {
  68. PLOG_DEBUG << "Local description is already set";
  69. }
  70. // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
  71. // setup:actpass.
  72. // See https://tools.ietf.org/html/rfc5763#section-5
  73. auto iceTransport = initIceTransport(Description::Role::ActPass);
  74. Description localDescription = iceTransport->getLocalDescription(Description::Type::Offer);
  75. processLocalDescription(localDescription);
  76. iceTransport->gatherLocalCandidates();
  77. }
  78. void PeerConnection::setRemoteDescription(Description description) {
  79. PLOG_VERBOSE << "Setting remote description: " << string(description);
  80. if (description.mediaCount() == 0)
  81. throw std::invalid_argument("Remote description has no media line");
  82. int activeMediaCount = 0;
  83. for (int i = 0; i < description.mediaCount(); ++i)
  84. std::visit( // reciprocate each media
  85. rtc::overloaded{[&](Description::Application *) { ++activeMediaCount; },
  86. [&](Description::Media *media) {
  87. if (media->direction() != Description::Direction::Inactive)
  88. ++activeMediaCount;
  89. }},
  90. description.media(i));
  91. if (activeMediaCount == 0)
  92. throw std::invalid_argument("Remote description has no active media");
  93. if (!description.fingerprint())
  94. throw std::invalid_argument("Remote description has no fingerprint");
  95. description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
  96. auto type = description.type();
  97. auto remoteCandidates = description.extractCandidates(); // Candidates will be added at the end
  98. auto iceTransport = std::atomic_load(&mIceTransport);
  99. if (!iceTransport)
  100. iceTransport = initIceTransport(Description::Role::ActPass);
  101. iceTransport->setRemoteDescription(description);
  102. {
  103. std::lock_guard lock(mRemoteDescriptionMutex);
  104. mRemoteDescription.emplace(std::move(description));
  105. }
  106. if (type == Description::Type::Offer) {
  107. // This is an offer and we are the answerer.
  108. Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
  109. processLocalDescription(localDescription);
  110. iceTransport->gatherLocalCandidates();
  111. } else {
  112. // This is an answer and we are the offerer.
  113. auto sctpTransport = std::atomic_load(&mSctpTransport);
  114. if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
  115. // Since we assumed passive role during DataChannel creation, we need to shift the
  116. // stream numbers by one to shift them from odd to even.
  117. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  118. decltype(mDataChannels) newDataChannels;
  119. auto it = mDataChannels.begin();
  120. while (it != mDataChannels.end()) {
  121. auto channel = it->second.lock();
  122. if (channel->stream() % 2 == 1)
  123. channel->mStream -= 1;
  124. newDataChannels.emplace(channel->stream(), channel);
  125. ++it;
  126. }
  127. std::swap(mDataChannels, newDataChannels);
  128. }
  129. }
  130. for (const auto &candidate : remoteCandidates)
  131. addRemoteCandidate(candidate);
  132. }
  133. void PeerConnection::addRemoteCandidate(Candidate candidate) {
  134. PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
  135. auto iceTransport = std::atomic_load(&mIceTransport);
  136. if (!mRemoteDescription || !iceTransport)
  137. throw std::logic_error("Remote candidate set without remote description");
  138. if (candidate.resolve(Candidate::ResolveMode::Simple)) {
  139. iceTransport->addRemoteCandidate(candidate);
  140. } else {
  141. // OK, we might need a lookup, do it asynchronously
  142. // We don't use the thread pool because we have no control on the timeout
  143. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  144. std::thread t([weakIceTransport, candidate]() mutable {
  145. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  146. if (auto iceTransport = weakIceTransport.lock())
  147. iceTransport->addRemoteCandidate(candidate);
  148. });
  149. t.detach();
  150. }
  151. std::lock_guard lock(mRemoteDescriptionMutex);
  152. mRemoteDescription->addCandidate(candidate);
  153. }
  154. std::optional<string> PeerConnection::localAddress() const {
  155. auto iceTransport = std::atomic_load(&mIceTransport);
  156. return iceTransport ? iceTransport->getLocalAddress() : nullopt;
  157. }
  158. std::optional<string> PeerConnection::remoteAddress() const {
  159. auto iceTransport = std::atomic_load(&mIceTransport);
  160. return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
  161. }
  162. shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string protocol,
  163. Reliability reliability) {
  164. if (auto local = localDescription(); local && !local->hasApplication()) {
  165. PLOG_ERROR << "The PeerConnection was negociated without DataChannel support.";
  166. throw std::runtime_error("No DataChannel support on the PeerConnection");
  167. }
  168. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  169. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  170. // See https://tools.ietf.org/html/rfc5763#section-5
  171. // Therefore, we assume passive role when we are the offerer.
  172. auto iceTransport = std::atomic_load(&mIceTransport);
  173. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  174. auto channel =
  175. emplaceDataChannel(role, std::move(label), std::move(protocol), std::move(reliability));
  176. if (auto transport = std::atomic_load(&mSctpTransport))
  177. if (transport->state() == SctpTransport::State::Connected)
  178. channel->open(transport);
  179. return channel;
  180. }
  181. shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, string protocol,
  182. Reliability reliability) {
  183. auto channel = addDataChannel(label, protocol, reliability);
  184. setLocalDescription();
  185. return channel;
  186. }
  187. void PeerConnection::onDataChannel(
  188. std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
  189. mDataChannelCallback = callback;
  190. }
  191. void PeerConnection::onLocalDescription(std::function<void(Description description)> callback) {
  192. mLocalDescriptionCallback = callback;
  193. }
  194. void PeerConnection::onLocalCandidate(std::function<void(Candidate candidate)> callback) {
  195. mLocalCandidateCallback = callback;
  196. }
  197. void PeerConnection::onStateChange(std::function<void(State state)> callback) {
  198. mStateChangeCallback = callback;
  199. }
  200. void PeerConnection::onGatheringStateChange(std::function<void(GatheringState state)> callback) {
  201. mGatheringStateChangeCallback = callback;
  202. }
  203. bool PeerConnection::hasMedia() const {
  204. auto local = localDescription();
  205. return local && local->hasAudioOrVideo();
  206. }
  207. std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
  208. if (localDescription())
  209. throw std::logic_error("Tracks must be created before local description");
  210. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  211. if (auto track = it->second.lock())
  212. return track;
  213. #if !RTC_ENABLE_MEDIA
  214. if (mTracks.empty()) {
  215. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  216. }
  217. #endif
  218. auto track = std::make_shared<Track>(std::move(description));
  219. mTracks.emplace(std::make_pair(track->mid(), track));
  220. return track;
  221. }
  222. void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
  223. mTrackCallback = callback;
  224. }
  225. shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
  226. try {
  227. if (auto transport = std::atomic_load(&mIceTransport))
  228. return transport;
  229. auto transport = std::make_shared<IceTransport>(
  230. mConfig, role, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  231. [this, weak_this = weak_from_this()](IceTransport::State state) {
  232. auto shared_this = weak_this.lock();
  233. if (!shared_this)
  234. return;
  235. switch (state) {
  236. case IceTransport::State::Connecting:
  237. changeState(State::Connecting);
  238. break;
  239. case IceTransport::State::Failed:
  240. changeState(State::Failed);
  241. break;
  242. case IceTransport::State::Connected:
  243. initDtlsTransport();
  244. break;
  245. case IceTransport::State::Disconnected:
  246. changeState(State::Disconnected);
  247. break;
  248. default:
  249. // Ignore
  250. break;
  251. }
  252. },
  253. [this, weak_this = weak_from_this()](IceTransport::GatheringState state) {
  254. auto shared_this = weak_this.lock();
  255. if (!shared_this)
  256. return;
  257. switch (state) {
  258. case IceTransport::GatheringState::InProgress:
  259. changeGatheringState(GatheringState::InProgress);
  260. break;
  261. case IceTransport::GatheringState::Complete:
  262. endLocalCandidates();
  263. changeGatheringState(GatheringState::Complete);
  264. break;
  265. default:
  266. // Ignore
  267. break;
  268. }
  269. });
  270. std::atomic_store(&mIceTransport, transport);
  271. if (mState == State::Closed) {
  272. mIceTransport.reset();
  273. throw std::runtime_error("Connection is closed");
  274. }
  275. transport->start();
  276. return transport;
  277. } catch (const std::exception &e) {
  278. PLOG_ERROR << e.what();
  279. changeState(State::Failed);
  280. throw std::runtime_error("ICE transport initialization failed");
  281. }
  282. }
  283. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  284. try {
  285. if (auto transport = std::atomic_load(&mDtlsTransport))
  286. return transport;
  287. auto certificate = mCertificate.get();
  288. auto lower = std::atomic_load(&mIceTransport);
  289. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  290. auto stateChangeCallback = [this,
  291. weak_this = weak_from_this()](DtlsTransport::State state) {
  292. auto shared_this = weak_this.lock();
  293. if (!shared_this)
  294. return;
  295. switch (state) {
  296. case DtlsTransport::State::Connected:
  297. if (auto local = localDescription(); local && local->hasApplication())
  298. initSctpTransport();
  299. else
  300. changeState(State::Connected);
  301. openTracks();
  302. break;
  303. case DtlsTransport::State::Failed:
  304. changeState(State::Failed);
  305. break;
  306. case DtlsTransport::State::Disconnected:
  307. changeState(State::Disconnected);
  308. break;
  309. default:
  310. // Ignore
  311. break;
  312. }
  313. };
  314. shared_ptr<DtlsTransport> transport;
  315. if (hasMedia()) {
  316. #if RTC_ENABLE_MEDIA
  317. PLOG_INFO << "This connection requires media support";
  318. // DTLS-SRTP
  319. transport = std::make_shared<DtlsSrtpTransport>(
  320. lower, certificate, verifierCallback,
  321. std::bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
  322. #else
  323. PLOG_WARNING << "Ignoring media support (not compiled with SRTP support)";
  324. #endif
  325. }
  326. if (!transport) {
  327. // DTLS only
  328. transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback,
  329. stateChangeCallback);
  330. }
  331. std::atomic_store(&mDtlsTransport, transport);
  332. if (mState == State::Closed) {
  333. mDtlsTransport.reset();
  334. throw std::runtime_error("Connection is closed");
  335. }
  336. transport->start();
  337. return transport;
  338. } catch (const std::exception &e) {
  339. PLOG_ERROR << e.what();
  340. changeState(State::Failed);
  341. throw std::runtime_error("DTLS transport initialization failed");
  342. }
  343. }
  344. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  345. try {
  346. if (auto transport = std::atomic_load(&mSctpTransport))
  347. return transport;
  348. auto remote = remoteDescription();
  349. if (!remote || !remote->application())
  350. throw std::logic_error("Initializing SCTP transport without application description");
  351. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  352. auto lower = std::atomic_load(&mDtlsTransport);
  353. auto transport = std::make_shared<SctpTransport>(
  354. lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  355. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  356. [this, weak_this = weak_from_this()](SctpTransport::State state) {
  357. auto shared_this = weak_this.lock();
  358. if (!shared_this)
  359. return;
  360. switch (state) {
  361. case SctpTransport::State::Connected:
  362. changeState(State::Connected);
  363. openDataChannels();
  364. break;
  365. case SctpTransport::State::Failed:
  366. LOG_WARNING << "SCTP transport failed";
  367. remoteCloseDataChannels();
  368. changeState(State::Failed);
  369. break;
  370. case SctpTransport::State::Disconnected:
  371. remoteCloseDataChannels();
  372. changeState(State::Disconnected);
  373. break;
  374. default:
  375. // Ignore
  376. break;
  377. }
  378. });
  379. std::atomic_store(&mSctpTransport, transport);
  380. if (mState == State::Closed) {
  381. mSctpTransport.reset();
  382. throw std::runtime_error("Connection is closed");
  383. }
  384. transport->start();
  385. return transport;
  386. } catch (const std::exception &e) {
  387. PLOG_ERROR << e.what();
  388. changeState(State::Failed);
  389. throw std::runtime_error("SCTP transport initialization failed");
  390. }
  391. }
  392. void PeerConnection::closeTransports() {
  393. PLOG_VERBOSE << "Closing transports";
  394. // Change state to sink state Closed
  395. changeState(State::Closed);
  396. // Reset callbacks now that state is changed
  397. resetCallbacks();
  398. // Initiate transport stop on the processor after closing the data channels
  399. mProcessor->enqueue([this]() {
  400. // Pass the pointers to a thread
  401. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  402. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  403. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  404. ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
  405. if (sctp)
  406. sctp->stop();
  407. if (dtls)
  408. dtls->stop();
  409. if (ice)
  410. ice->stop();
  411. sctp.reset();
  412. dtls.reset();
  413. ice.reset();
  414. });
  415. });
  416. }
  417. void PeerConnection::endLocalCandidates() {
  418. std::lock_guard lock(mLocalDescriptionMutex);
  419. if (mLocalDescription)
  420. mLocalDescription->endCandidates();
  421. }
  422. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  423. std::lock_guard lock(mRemoteDescriptionMutex);
  424. if (auto expectedFingerprint =
  425. mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
  426. return *expectedFingerprint == fingerprint;
  427. }
  428. return false;
  429. }
  430. void PeerConnection::forwardMessage(message_ptr message) {
  431. if (!message) {
  432. remoteCloseDataChannels();
  433. return;
  434. }
  435. auto channel = findDataChannel(uint16_t(message->stream));
  436. auto iceTransport = std::atomic_load(&mIceTransport);
  437. auto sctpTransport = std::atomic_load(&mSctpTransport);
  438. if (!iceTransport || !sctpTransport)
  439. return;
  440. if (!channel) {
  441. const byte dataChannelOpenMessage{0x03};
  442. unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  443. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  444. message->stream % 2 == remoteParity) {
  445. channel =
  446. std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
  447. channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
  448. weak_ptr<DataChannel>{channel}));
  449. mDataChannels.insert(std::make_pair(message->stream, channel));
  450. } else {
  451. // Invalid, close the DataChannel
  452. sctpTransport->closeStream(message->stream);
  453. return;
  454. }
  455. }
  456. channel->incoming(message);
  457. }
  458. void PeerConnection::forwardMedia(message_ptr message) {
  459. if (!message)
  460. return;
  461. if (message->type == Message::Type::Control) {
  462. std::shared_lock lock(mTracksMutex); // read-only
  463. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  464. if (auto track = it->second.lock())
  465. return track->incoming(message);
  466. PLOG_WARNING << "No track available to receive control, dropping";
  467. return;
  468. }
  469. unsigned int payloadType = message->stream;
  470. std::optional<string> mid;
  471. if (auto it = mMidFromPayloadType.find(payloadType); it != mMidFromPayloadType.end()) {
  472. mid = it->second;
  473. } else {
  474. std::lock_guard lock(mLocalDescriptionMutex);
  475. if (!mLocalDescription)
  476. return;
  477. for (int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  478. if (auto found = std::visit(
  479. rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
  480. return std::nullopt;
  481. },
  482. [&](Description::Media *media) -> std::optional<string> {
  483. return media->hasPayloadType(payloadType)
  484. ? std::make_optional(media->mid())
  485. : nullopt;
  486. }},
  487. mLocalDescription->media(i))) {
  488. mMidFromPayloadType.emplace(payloadType, *found);
  489. mid = *found;
  490. break;
  491. }
  492. }
  493. }
  494. if (!mid) {
  495. PLOG_WARNING << "Track not found for payload type " << payloadType << ", dropping";
  496. return;
  497. }
  498. std::shared_lock lock(mTracksMutex); // read-only
  499. if (auto it = mTracks.find(*mid); it != mTracks.end())
  500. if (auto track = it->second.lock())
  501. track->incoming(message);
  502. }
  503. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  504. if (auto channel = findDataChannel(stream))
  505. channel->triggerBufferedAmount(amount);
  506. }
  507. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
  508. string protocol,
  509. Reliability reliability) {
  510. // The active side must use streams with even identifiers, whereas the passive side must use
  511. // streams with odd identifiers.
  512. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
  513. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  514. unsigned int stream = (role == Description::Role::Active) ? 0 : 1;
  515. while (mDataChannels.find(stream) != mDataChannels.end()) {
  516. stream += 2;
  517. if (stream >= 65535)
  518. throw std::runtime_error("Too many DataChannels");
  519. }
  520. auto channel = std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
  521. std::move(protocol), std::move(reliability));
  522. mDataChannels.emplace(std::make_pair(stream, channel));
  523. return channel;
  524. }
  525. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  526. std::shared_lock lock(mDataChannelsMutex); // read-only
  527. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  528. if (auto channel = it->second.lock())
  529. return channel;
  530. return nullptr;
  531. }
  532. void PeerConnection::iterateDataChannels(
  533. std::function<void(shared_ptr<DataChannel> channel)> func) {
  534. // Iterate
  535. {
  536. std::shared_lock lock(mDataChannelsMutex); // read-only
  537. auto it = mDataChannels.begin();
  538. while (it != mDataChannels.end()) {
  539. auto channel = it->second.lock();
  540. if (channel && !channel->isClosed())
  541. func(channel);
  542. ++it;
  543. }
  544. }
  545. // Cleanup
  546. {
  547. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  548. auto it = mDataChannels.begin();
  549. while (it != mDataChannels.end()) {
  550. if (!it->second.lock()) {
  551. it = mDataChannels.erase(it);
  552. continue;
  553. }
  554. ++it;
  555. }
  556. }
  557. }
  558. void PeerConnection::openDataChannels() {
  559. if (auto transport = std::atomic_load(&mSctpTransport))
  560. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  561. }
  562. void PeerConnection::closeDataChannels() {
  563. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  564. }
  565. void PeerConnection::remoteCloseDataChannels() {
  566. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  567. }
  568. void PeerConnection::incomingTrack(Description::Media description) {
  569. std::unique_lock lock(mTracksMutex); // we are going to emplace
  570. #if !RTC_ENABLE_MEDIA
  571. if (mTracks.empty()) {
  572. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  573. }
  574. #endif
  575. if (mTracks.find(description.mid()) == mTracks.end()) {
  576. auto track = std::make_shared<Track>(std::move(description));
  577. mTracks.emplace(std::make_pair(track->mid(), track));
  578. triggerTrack(std::move(track));
  579. }
  580. }
  581. void PeerConnection::openTracks() {
  582. #if RTC_ENABLE_MEDIA
  583. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  584. auto srtpTransport = std::reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
  585. std::shared_lock lock(mTracksMutex); // read-only
  586. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  587. if (auto track = it->second.lock())
  588. track->open(srtpTransport);
  589. }
  590. #endif
  591. }
  592. void PeerConnection::processLocalDescription(Description description) {
  593. int activeMediaCount = 0;
  594. if (auto remote = remoteDescription()) {
  595. // Reciprocate remote description
  596. for (int i = 0; i < remote->mediaCount(); ++i)
  597. std::visit( // reciprocate each media
  598. rtc::overloaded{
  599. [&](Description::Application *app) {
  600. auto reciprocated = app->reciprocate();
  601. reciprocated.hintSctpPort(DEFAULT_SCTP_PORT);
  602. reciprocated.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  603. ++activeMediaCount;
  604. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  605. << reciprocated.mid() << "\"";
  606. description.addMedia(std::move(reciprocated));
  607. },
  608. [&](Description::Media *media) {
  609. auto reciprocated = media->reciprocate();
  610. #if RTC_ENABLE_MEDIA
  611. if (reciprocated.direction() != Description::Direction::Inactive)
  612. ++activeMediaCount;
  613. #else
  614. // No media support, mark as inactive
  615. reciprocated.setDirection(Description::Direction::Inactive);
  616. #endif
  617. incomingTrack(reciprocated);
  618. PLOG_DEBUG
  619. << "Reciprocating media in local description, mid=\""
  620. << reciprocated.mid() << "\", active=" << std::boolalpha
  621. << (reciprocated.direction() != Description::Direction::Inactive);
  622. description.addMedia(std::move(reciprocated));
  623. },
  624. },
  625. remote->media(i));
  626. } else {
  627. // Add application for data channels
  628. {
  629. std::shared_lock lock(mDataChannelsMutex);
  630. if (!mDataChannels.empty()) {
  631. Description::Application app("data");
  632. app.setSctpPort(DEFAULT_SCTP_PORT);
  633. app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  634. ++activeMediaCount;
  635. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  636. << "\"";
  637. description.addMedia(std::move(app));
  638. }
  639. }
  640. // Add media for local tracks
  641. {
  642. std::shared_lock lock(mTracksMutex);
  643. for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
  644. if (auto track = it->second.lock()) {
  645. auto media = track->description();
  646. #if RTC_ENABLE_MEDIA
  647. if (media.direction() != Description::Direction::Inactive)
  648. ++activeMediaCount;
  649. #else
  650. // No media support, mark as inactive
  651. media.setDirection(Description::Direction::Inactive);
  652. #endif
  653. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  654. << "\", active=" << std::boolalpha
  655. << (media.direction() != Description::Direction::Inactive);
  656. description.addMedia(std::move(media));
  657. }
  658. }
  659. }
  660. }
  661. // There must be at least one active media to negociate
  662. if (activeMediaCount == 0)
  663. throw std::runtime_error("Nothing to negociate");
  664. // Set local fingerprint (wait for certificate if necessary)
  665. description.setFingerprint(mCertificate.get()->fingerprint());
  666. std::lock_guard lock(mLocalDescriptionMutex);
  667. mLocalDescription.emplace(std::move(description));
  668. mProcessor->enqueue([this, description = *mLocalDescription]() {
  669. PLOG_VERBOSE << "Issuing local description: " << description;
  670. mLocalDescriptionCallback(std::move(description));
  671. });
  672. }
  673. void PeerConnection::processLocalCandidate(Candidate candidate) {
  674. std::lock_guard lock(mLocalDescriptionMutex);
  675. if (!mLocalDescription)
  676. throw std::logic_error("Got a local candidate without local description");
  677. mLocalDescription->addCandidate(candidate);
  678. mProcessor->enqueue([this, candidate = std::move(candidate)]() {
  679. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  680. mLocalCandidateCallback(std::move(candidate));
  681. });
  682. }
  683. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  684. auto dataChannel = weakDataChannel.lock();
  685. if (!dataChannel)
  686. return;
  687. mProcessor->enqueue(
  688. [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
  689. }
  690. void PeerConnection::triggerTrack(std::shared_ptr<Track> track) {
  691. mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
  692. }
  693. bool PeerConnection::changeState(State state) {
  694. State current;
  695. do {
  696. current = mState.load();
  697. if (current == state)
  698. return true;
  699. if (current == State::Closed)
  700. return false;
  701. } while (!mState.compare_exchange_weak(current, state));
  702. if (state == State::Closed)
  703. // This is the last state change, so we may steal the callback
  704. mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
  705. else
  706. mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
  707. return true;
  708. }
  709. bool PeerConnection::changeGatheringState(GatheringState state) {
  710. if (mGatheringState.exchange(state) != state)
  711. mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
  712. return true;
  713. }
  714. void PeerConnection::resetCallbacks() {
  715. // Unregister all callbacks
  716. mDataChannelCallback = nullptr;
  717. mLocalDescriptionCallback = nullptr;
  718. mLocalCandidateCallback = nullptr;
  719. mStateChangeCallback = nullptr;
  720. mGatheringStateChangeCallback = nullptr;
  721. }
  722. bool PeerConnection::getSelectedCandidatePair([[maybe_unused]] CandidateInfo *local,
  723. [[maybe_unused]] CandidateInfo *remote) {
  724. #if USE_NICE
  725. auto iceTransport = std::atomic_load(&mIceTransport);
  726. return iceTransport->getSelectedCandidatePair(local, remote);
  727. #else
  728. PLOG_WARNING << "getSelectedCandidatePair() is only implemented with libnice as ICE backend";
  729. return false;
  730. #endif
  731. }
  732. void PeerConnection::clearStats() {
  733. auto sctpTransport = std::atomic_load(&mSctpTransport);
  734. if (sctpTransport)
  735. return sctpTransport->clearStats();
  736. }
  737. size_t PeerConnection::bytesSent() {
  738. auto sctpTransport = std::atomic_load(&mSctpTransport);
  739. if (sctpTransport)
  740. return sctpTransport->bytesSent();
  741. return 0;
  742. }
  743. size_t PeerConnection::bytesReceived() {
  744. auto sctpTransport = std::atomic_load(&mSctpTransport);
  745. if (sctpTransport)
  746. return sctpTransport->bytesReceived();
  747. return 0;
  748. }
  749. std::optional<std::chrono::milliseconds> PeerConnection::rtt() {
  750. auto sctpTransport = std::atomic_load(&mSctpTransport);
  751. if (sctpTransport)
  752. return sctpTransport->rtt();
  753. PLOG_WARNING << "Could not load sctpTransport";
  754. return std::nullopt;
  755. }
  756. } // namespace rtc
  757. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &state) {
  758. using State = rtc::PeerConnection::State;
  759. std::string str;
  760. switch (state) {
  761. case State::New:
  762. str = "new";
  763. break;
  764. case State::Connecting:
  765. str = "connecting";
  766. break;
  767. case State::Connected:
  768. str = "connected";
  769. break;
  770. case State::Disconnected:
  771. str = "disconnected";
  772. break;
  773. case State::Failed:
  774. str = "failed";
  775. break;
  776. case State::Closed:
  777. str = "closed";
  778. break;
  779. default:
  780. str = "unknown";
  781. break;
  782. }
  783. return out << str;
  784. }
  785. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::GatheringState &state) {
  786. using GatheringState = rtc::PeerConnection::GatheringState;
  787. std::string str;
  788. switch (state) {
  789. case GatheringState::New:
  790. str = "new";
  791. break;
  792. case GatheringState::InProgress:
  793. str = "in_progress";
  794. break;
  795. case GatheringState::Complete:
  796. str = "complete";
  797. break;
  798. default:
  799. str = "unknown";
  800. break;
  801. }
  802. return out << str;
  803. }