peerconnection.cpp 45 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344134513461347134813491350135113521353135413551356135713581359136013611362136313641365136613671368136913701371137213731374137513761377137813791380138113821383138413851386138713881389139013911392139313941395139613971398139914001401140214031404140514061407140814091410
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. * Copyright (c) 2020 Filip Klembara (in2core)
  4. *
  5. * This library is free software; you can redistribute it and/or
  6. * modify it under the terms of the GNU Lesser General Public
  7. * License as published by the Free Software Foundation; either
  8. * version 2.1 of the License, or (at your option) any later version.
  9. *
  10. * This library is distributed in the hope that it will be useful,
  11. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  12. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  13. * Lesser General Public License for more details.
  14. *
  15. * You should have received a copy of the GNU Lesser General Public
  16. * License along with this library; if not, write to the Free Software
  17. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  18. */
  19. #include "peerconnection.hpp"
  20. #include "certificate.hpp"
  21. #include "include.hpp"
  22. #include "processor.hpp"
  23. #include "threadpool.hpp"
  24. #include "dtlstransport.hpp"
  25. #include "icetransport.hpp"
  26. #include "sctptransport.hpp"
  27. #if RTC_ENABLE_MEDIA
  28. #include "dtlssrtptransport.hpp"
  29. #endif
  30. #include <iomanip>
  31. #include <thread>
  32. #if __clang__
  namespace {

  /// Local stand-in for std::reinterpret_pointer_cast: the returned pointer shares
  /// ownership with `ptr` and stores its raw pointer reinterpreted as `To *`.
  template <typename To, typename From>
  inline std::shared_ptr<To> reinterpret_pointer_cast(std::shared_ptr<From> const &ptr) noexcept {
      // Aliasing constructor: same control block as ptr, different stored pointer
      To *const reinterpreted = reinterpret_cast<To *>(ptr.get());
      return std::shared_ptr<To>(ptr, reinterpreted);
  }

  } // namespace
  39. #else
  40. using std::reinterpret_pointer_cast;
  41. #endif
  42. namespace rtc {
  43. using namespace std::placeholders;
  44. using std::shared_ptr;
  45. using std::weak_ptr;
  46. PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
  47. PeerConnection::PeerConnection(const Configuration &config)
  48. : mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
  49. mState(State::New), mGatheringState(GatheringState::New),
  50. mSignalingState(SignalingState::Stable), mNegotiationNeeded(false) {
  51. PLOG_VERBOSE << "Creating PeerConnection";
  52. if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
  53. throw std::invalid_argument("Invalid port range");
  54. }
  55. PeerConnection::~PeerConnection() {
  56. PLOG_VERBOSE << "Destroying PeerConnection";
  57. close();
  58. mProcessor->join();
  59. }
  60. void PeerConnection::close() {
  61. PLOG_VERBOSE << "Closing PeerConnection";
  62. mNegotiationNeeded = false;
  63. // Close data channels asynchronously
  64. mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
  65. closeTransports();
  66. }
  67. const Configuration *PeerConnection::config() const { return &mConfig; }
  68. PeerConnection::State PeerConnection::state() const { return mState; }
  69. PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
  70. PeerConnection::SignalingState PeerConnection::signalingState() const { return mSignalingState; }
  71. std::optional<Description> PeerConnection::localDescription() const {
  72. std::lock_guard lock(mLocalDescriptionMutex);
  73. return mLocalDescription;
  74. }
  75. std::optional<Description> PeerConnection::remoteDescription() const {
  76. std::lock_guard lock(mRemoteDescriptionMutex);
  77. return mRemoteDescription;
  78. }
  79. bool PeerConnection::hasLocalDescription() const {
  80. std::lock_guard lock(mLocalDescriptionMutex);
  81. return bool(mLocalDescription);
  82. }
  83. bool PeerConnection::hasRemoteDescription() const {
  84. std::lock_guard lock(mRemoteDescriptionMutex);
  85. return bool(mRemoteDescription);
  86. }
  87. bool PeerConnection::hasMedia() const {
  88. auto local = localDescription();
  89. return local && local->hasAudioOrVideo();
  90. }
  91. void PeerConnection::setLocalDescription(Description::Type type) {
  92. PLOG_VERBOSE << "Setting local description, type=" << Description::typeToString(type);
  93. SignalingState signalingState = mSignalingState.load();
  94. if (type == Description::Type::Rollback) {
  95. if (signalingState == SignalingState::HaveLocalOffer ||
  96. signalingState == SignalingState::HaveLocalPranswer) {
  97. PLOG_DEBUG << "Rolling back pending local description";
  98. std::unique_lock lock(mLocalDescriptionMutex);
  99. if (mCurrentLocalDescription) {
  100. std::vector<Candidate> existingCandidates;
  101. if (mLocalDescription)
  102. existingCandidates = mLocalDescription->extractCandidates();
  103. mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
  104. mLocalDescription->addCandidates(std::move(existingCandidates));
  105. mCurrentLocalDescription.reset();
  106. }
  107. lock.unlock();
  108. changeSignalingState(SignalingState::Stable);
  109. }
  110. return;
  111. }
  112. // Guess the description type if unspecified
  113. if (type == Description::Type::Unspec) {
  114. if (mSignalingState == SignalingState::HaveRemoteOffer)
  115. type = Description::Type::Answer;
  116. else
  117. type = Description::Type::Offer;
  118. }
  119. // Only a local offer resets the negotiation needed flag
  120. if (type == Description::Type::Offer && !mNegotiationNeeded.exchange(false)) {
  121. PLOG_DEBUG << "No negotiation needed";
  122. return;
  123. }
  124. // Get the new signaling state
  125. SignalingState newSignalingState;
  126. switch (signalingState) {
  127. case SignalingState::Stable:
  128. if (type != Description::Type::Offer) {
  129. std::ostringstream oss;
  130. oss << "Unexpected local desciption type " << type << " in signaling state "
  131. << signalingState;
  132. throw std::logic_error(oss.str());
  133. }
  134. newSignalingState = SignalingState::HaveLocalOffer;
  135. break;
  136. case SignalingState::HaveRemoteOffer:
  137. case SignalingState::HaveLocalPranswer:
  138. if (type != Description::Type::Answer && type != Description::Type::Pranswer) {
  139. std::ostringstream oss;
  140. oss << "Unexpected local description type " << type
  141. << " description in signaling state " << signalingState;
  142. throw std::logic_error(oss.str());
  143. }
  144. newSignalingState = SignalingState::Stable;
  145. break;
  146. default: {
  147. std::ostringstream oss;
  148. oss << "Unexpected local description in signaling state " << signalingState << ", ignoring";
  149. LOG_WARNING << oss.str();
  150. return;
  151. }
  152. }
  153. auto iceTransport = std::atomic_load(&mIceTransport);
  154. if (!iceTransport) {
  155. // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
  156. // setup:actpass.
  157. // See https://tools.ietf.org/html/rfc5763#section-5
  158. iceTransport = initIceTransport(Description::Role::ActPass);
  159. }
  160. Description localDescription = iceTransport->getLocalDescription(type);
  161. processLocalDescription(std::move(localDescription));
  162. changeSignalingState(newSignalingState);
  163. if (mGatheringState == GatheringState::New)
  164. iceTransport->gatherLocalCandidates();
  165. }
  166. void PeerConnection::setRemoteDescription(Description description) {
  167. PLOG_VERBOSE << "Setting remote description: " << string(description);
  168. if (description.type() == Description::Type::Rollback) {
  169. // This is mostly useless because we accept any offer
  170. PLOG_VERBOSE << "Rolling back pending remote description";
  171. changeSignalingState(SignalingState::Stable);
  172. return;
  173. }
  174. validateRemoteDescription(description);
  175. // Get the new signaling state
  176. SignalingState signalingState = mSignalingState.load();
  177. SignalingState newSignalingState;
  178. switch (signalingState) {
  179. case SignalingState::Stable:
  180. description.hintType(Description::Type::Offer);
  181. if (description.type() != Description::Type::Offer) {
  182. std::ostringstream oss;
  183. oss << "Unexpected remote " << description.type() << " description in signaling state "
  184. << signalingState;
  185. throw std::logic_error(oss.str());
  186. }
  187. newSignalingState = SignalingState::HaveRemoteOffer;
  188. break;
  189. case SignalingState::HaveLocalOffer:
  190. description.hintType(Description::Type::Answer);
  191. if (description.type() == Description::Type::Offer) {
  192. // The ICE agent will automatically initiate a rollback when a peer that had previously
  193. // created an offer receives an offer from the remote peer
  194. setLocalDescription(Description::Type::Rollback);
  195. newSignalingState = SignalingState::HaveRemoteOffer;
  196. break;
  197. }
  198. if (description.type() != Description::Type::Answer &&
  199. description.type() != Description::Type::Pranswer) {
  200. std::ostringstream oss;
  201. oss << "Unexpected remote " << description.type() << " description in signaling state "
  202. << signalingState;
  203. throw std::logic_error(oss.str());
  204. }
  205. newSignalingState = SignalingState::Stable;
  206. break;
  207. case SignalingState::HaveRemotePranswer:
  208. description.hintType(Description::Type::Answer);
  209. if (description.type() != Description::Type::Answer &&
  210. description.type() != Description::Type::Pranswer) {
  211. std::ostringstream oss;
  212. oss << "Unexpected remote " << description.type() << " description in signaling state "
  213. << signalingState;
  214. throw std::logic_error(oss.str());
  215. }
  216. newSignalingState = SignalingState::Stable;
  217. break;
  218. default: {
  219. std::ostringstream oss;
  220. oss << "Unexpected remote description in signaling state " << signalingState;
  221. throw std::logic_error(oss.str());
  222. }
  223. }
  224. // Candidates will be added at the end, extract them for now
  225. auto remoteCandidates = description.extractCandidates();
  226. auto type = description.type();
  227. auto iceTransport = std::atomic_load(&mIceTransport);
  228. if (!iceTransport)
  229. iceTransport = initIceTransport(Description::Role::ActPass);
  230. iceTransport->setRemoteDescription(description);
  231. processRemoteDescription(std::move(description));
  232. changeSignalingState(newSignalingState);
  233. if (type == Description::Type::Offer) {
  234. // This is an offer, we need to answer
  235. setLocalDescription(Description::Type::Answer);
  236. } else {
  237. // This is an answer
  238. auto sctpTransport = std::atomic_load(&mSctpTransport);
  239. if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
  240. // Since we assumed passive role during DataChannel creation, we need to shift the
  241. // stream numbers by one to shift them from odd to even.
  242. std::unique_lock lock(mDataChannelsMutex); // we are going to swap the container
  243. decltype(mDataChannels) newDataChannels;
  244. auto it = mDataChannels.begin();
  245. while (it != mDataChannels.end()) {
  246. auto channel = it->second.lock();
  247. if (channel->stream() % 2 == 1)
  248. channel->mStream -= 1;
  249. newDataChannels.emplace(channel->stream(), channel);
  250. ++it;
  251. }
  252. std::swap(mDataChannels, newDataChannels);
  253. }
  254. }
  255. for (const auto &candidate : remoteCandidates)
  256. addRemoteCandidate(candidate);
  257. }
  258. void PeerConnection::addRemoteCandidate(Candidate candidate) {
  259. PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
  260. processRemoteCandidate(std::move(candidate));
  261. }
  262. std::optional<string> PeerConnection::localAddress() const {
  263. auto iceTransport = std::atomic_load(&mIceTransport);
  264. return iceTransport ? iceTransport->getLocalAddress() : nullopt;
  265. }
  266. std::optional<string> PeerConnection::remoteAddress() const {
  267. auto iceTransport = std::atomic_load(&mIceTransport);
  268. return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
  269. }
  270. shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, DataChannelInit init) {
  271. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  272. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  273. // See https://tools.ietf.org/html/rfc5763#section-5
  274. // Therefore, we assume passive role when we are the offerer.
  275. auto iceTransport = std::atomic_load(&mIceTransport);
  276. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  277. auto channel = emplaceDataChannel(role, std::move(label), std::move(init));
  278. if (auto transport = std::atomic_load(&mSctpTransport))
  279. if (transport->state() == SctpTransport::State::Connected)
  280. channel->open(transport);
  281. // Renegotiation is needed iff the current local description does not have application
  282. std::lock_guard lock(mLocalDescriptionMutex);
  283. if (!mLocalDescription || !mLocalDescription->hasApplication())
  284. mNegotiationNeeded = true;
  285. return channel;
  286. }
  287. shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, DataChannelInit init) {
  288. auto channel = addDataChannel(std::move(label), std::move(init));
  289. setLocalDescription();
  290. return channel;
  291. }
  292. void PeerConnection::onDataChannel(
  293. std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
  294. mDataChannelCallback = callback;
  295. }
  296. void PeerConnection::onLocalDescription(std::function<void(Description description)> callback) {
  297. mLocalDescriptionCallback = callback;
  298. }
  299. void PeerConnection::onLocalCandidate(std::function<void(Candidate candidate)> callback) {
  300. mLocalCandidateCallback = callback;
  301. }
  302. void PeerConnection::onStateChange(std::function<void(State state)> callback) {
  303. mStateChangeCallback = callback;
  304. }
  305. void PeerConnection::onGatheringStateChange(std::function<void(GatheringState state)> callback) {
  306. mGatheringStateChangeCallback = callback;
  307. }
  308. void PeerConnection::onSignalingStateChange(std::function<void(SignalingState state)> callback) {
  309. mSignalingStateChangeCallback = callback;
  310. }
  311. std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
  312. #if !RTC_ENABLE_MEDIA
  313. if (mTracks.empty()) {
  314. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  315. }
  316. #endif
  317. std::shared_ptr<Track> track;
  318. if (auto it = mTracks.find(description.mid()); it != mTracks.end())
  319. if (track = it->second.lock(); track)
  320. track->setDescription(std::move(description));
  321. if (!track) {
  322. track = std::make_shared<Track>(std::move(description));
  323. mTracks.emplace(std::make_pair(track->mid(), track));
  324. mTrackLines.emplace_back(track);
  325. }
  326. // Renegotiation is needed for the new or updated track
  327. mNegotiationNeeded = true;
  328. return track;
  329. }
  330. void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
  331. mTrackCallback = callback;
  332. }
  333. shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
  334. PLOG_VERBOSE << "Starting ICE transport";
  335. try {
  336. if (auto transport = std::atomic_load(&mIceTransport))
  337. return transport;
  338. auto transport = std::make_shared<IceTransport>(
  339. mConfig, role, weak_bind(&PeerConnection::processLocalCandidate, this, _1),
  340. [this, weak_this = weak_from_this()](IceTransport::State state) {
  341. auto shared_this = weak_this.lock();
  342. if (!shared_this)
  343. return;
  344. switch (state) {
  345. case IceTransport::State::Connecting:
  346. changeState(State::Connecting);
  347. break;
  348. case IceTransport::State::Failed:
  349. changeState(State::Failed);
  350. break;
  351. case IceTransport::State::Connected:
  352. initDtlsTransport();
  353. break;
  354. case IceTransport::State::Disconnected:
  355. changeState(State::Disconnected);
  356. break;
  357. default:
  358. // Ignore
  359. break;
  360. }
  361. },
  362. [this, weak_this = weak_from_this()](IceTransport::GatheringState state) {
  363. auto shared_this = weak_this.lock();
  364. if (!shared_this)
  365. return;
  366. switch (state) {
  367. case IceTransport::GatheringState::InProgress:
  368. changeGatheringState(GatheringState::InProgress);
  369. break;
  370. case IceTransport::GatheringState::Complete:
  371. endLocalCandidates();
  372. changeGatheringState(GatheringState::Complete);
  373. break;
  374. default:
  375. // Ignore
  376. break;
  377. }
  378. });
  379. std::atomic_store(&mIceTransport, transport);
  380. if (mState == State::Closed) {
  381. mIceTransport.reset();
  382. throw std::runtime_error("Connection is closed");
  383. }
  384. transport->start();
  385. return transport;
  386. } catch (const std::exception &e) {
  387. PLOG_ERROR << e.what();
  388. changeState(State::Failed);
  389. throw std::runtime_error("ICE transport initialization failed");
  390. }
  391. }
  392. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  393. PLOG_VERBOSE << "Starting DTLS transport";
  394. try {
  395. if (auto transport = std::atomic_load(&mDtlsTransport))
  396. return transport;
  397. auto certificate = mCertificate.get();
  398. auto lower = std::atomic_load(&mIceTransport);
  399. auto verifierCallback = weak_bind(&PeerConnection::checkFingerprint, this, _1);
  400. auto stateChangeCallback = [this,
  401. weak_this = weak_from_this()](DtlsTransport::State state) {
  402. auto shared_this = weak_this.lock();
  403. if (!shared_this)
  404. return;
  405. switch (state) {
  406. case DtlsTransport::State::Connected:
  407. if (auto remote = remoteDescription(); remote && remote->hasApplication())
  408. initSctpTransport();
  409. else
  410. changeState(State::Connected);
  411. mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
  412. break;
  413. case DtlsTransport::State::Failed:
  414. changeState(State::Failed);
  415. break;
  416. case DtlsTransport::State::Disconnected:
  417. changeState(State::Disconnected);
  418. break;
  419. default:
  420. // Ignore
  421. break;
  422. }
  423. };
  424. shared_ptr<DtlsTransport> transport;
  425. if (hasMedia()) {
  426. #if RTC_ENABLE_MEDIA
  427. PLOG_INFO << "This connection requires media support";
  428. // DTLS-SRTP
  429. transport = std::make_shared<DtlsSrtpTransport>(
  430. lower, certificate, verifierCallback,
  431. std::bind(&PeerConnection::forwardMedia, this, _1), stateChangeCallback);
  432. #else
  433. PLOG_WARNING << "Ignoring media support (not compiled with SRTP support)";
  434. #endif
  435. }
  436. if (!transport) {
  437. // DTLS only
  438. transport = std::make_shared<DtlsTransport>(lower, certificate, verifierCallback,
  439. stateChangeCallback);
  440. }
  441. std::atomic_store(&mDtlsTransport, transport);
  442. if (mState == State::Closed) {
  443. mDtlsTransport.reset();
  444. throw std::runtime_error("Connection is closed");
  445. }
  446. transport->start();
  447. return transport;
  448. } catch (const std::exception &e) {
  449. PLOG_ERROR << e.what();
  450. changeState(State::Failed);
  451. throw std::runtime_error("DTLS transport initialization failed");
  452. }
  453. }
  454. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  455. PLOG_VERBOSE << "Starting SCTP transport";
  456. try {
  457. if (auto transport = std::atomic_load(&mSctpTransport))
  458. return transport;
  459. auto remote = remoteDescription();
  460. if (!remote || !remote->application())
  461. throw std::logic_error("Starting SCTP transport without application description");
  462. uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  463. auto lower = std::atomic_load(&mDtlsTransport);
  464. auto transport = std::make_shared<SctpTransport>(
  465. lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
  466. weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  467. [this, weak_this = weak_from_this()](SctpTransport::State state) {
  468. auto shared_this = weak_this.lock();
  469. if (!shared_this)
  470. return;
  471. switch (state) {
  472. case SctpTransport::State::Connected:
  473. changeState(State::Connected);
  474. mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
  475. break;
  476. case SctpTransport::State::Failed:
  477. LOG_WARNING << "SCTP transport failed";
  478. changeState(State::Failed);
  479. mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
  480. break;
  481. case SctpTransport::State::Disconnected:
  482. changeState(State::Disconnected);
  483. mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
  484. break;
  485. default:
  486. // Ignore
  487. break;
  488. }
  489. });
  490. std::atomic_store(&mSctpTransport, transport);
  491. if (mState == State::Closed) {
  492. mSctpTransport.reset();
  493. throw std::runtime_error("Connection is closed");
  494. }
  495. transport->start();
  496. return transport;
  497. } catch (const std::exception &e) {
  498. PLOG_ERROR << e.what();
  499. changeState(State::Failed);
  500. throw std::runtime_error("SCTP transport initialization failed");
  501. }
  502. }
  503. void PeerConnection::closeTransports() {
  504. PLOG_VERBOSE << "Closing transports";
  505. // Change state to sink state Closed
  506. if (!changeState(State::Closed))
  507. return; // already closed
  508. // Reset callbacks now that state is changed
  509. resetCallbacks();
  510. // Initiate transport stop on the processor after closing the data channels
  511. mProcessor->enqueue([this]() {
  512. // Pass the pointers to a thread
  513. auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
  514. auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
  515. auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
  516. ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
  517. if (sctp)
  518. sctp->stop();
  519. if (dtls)
  520. dtls->stop();
  521. if (ice)
  522. ice->stop();
  523. sctp.reset();
  524. dtls.reset();
  525. ice.reset();
  526. });
  527. });
  528. }
  529. void PeerConnection::endLocalCandidates() {
  530. std::lock_guard lock(mLocalDescriptionMutex);
  531. if (mLocalDescription)
  532. mLocalDescription->endCandidates();
  533. }
  534. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  535. std::lock_guard lock(mRemoteDescriptionMutex);
  536. if (auto expectedFingerprint =
  537. mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
  538. return *expectedFingerprint == fingerprint;
  539. }
  540. return false;
  541. }
  542. void PeerConnection::forwardMessage(message_ptr message) {
  543. if (!message) {
  544. remoteCloseDataChannels();
  545. return;
  546. }
  547. uint16_t stream = uint16_t(message->stream);
  548. auto channel = findDataChannel(stream);
  549. if (!channel) {
  550. auto iceTransport = std::atomic_load(&mIceTransport);
  551. auto sctpTransport = std::atomic_load(&mSctpTransport);
  552. if (!iceTransport || !sctpTransport)
  553. return;
  554. const byte dataChannelOpenMessage{0x03};
  555. uint16_t remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  556. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  557. stream % 2 == remoteParity) {
  558. channel = std::make_shared<NegociatedDataChannel>(shared_from_this(), sctpTransport,
  559. message->stream);
  560. channel->onOpen(weak_bind(&PeerConnection::triggerDataChannel, this,
  561. weak_ptr<DataChannel>{channel}));
  562. mDataChannels.emplace(message->stream, channel);
  563. } else {
  564. // Invalid, close the DataChannel
  565. sctpTransport->closeStream(message->stream);
  566. return;
  567. }
  568. }
  569. channel->incoming(message);
  570. }
  571. void PeerConnection::forwardMedia(message_ptr message) {
  572. if (!message)
  573. return;
  574. // Browsers like to compound their packets with a random SSRC.
  575. // we have to do this monstrosity to distribute the report blocks
  576. std::optional<unsigned int> mediaLine;
  577. if (message->type == Message::Control) {
  578. size_t offset = 0;
  579. std::vector<SSRC> ssrcsFound;
  580. bool hasFound = false;
  581. while ((sizeof(rtc::RTCP_HEADER) + offset) <= message->size()) {
  582. auto header = (rtc::RTCP_HEADER *)(message->data() + offset);
  583. if (header->lengthInBytes() > message->size() - offset) {
  584. PLOG_WARNING << "Packet was truncated";
  585. break;
  586. }
  587. offset += header->lengthInBytes();
  588. if (header->payloadType() == 205 || header->payloadType() == 206) {
  589. auto rtcpfb = (RTCP_FB_HEADER *)header;
  590. auto ssrc = rtcpfb->getPacketSenderSSRC();
  591. if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
  592. mediaLine = getMLineFromSSRC(ssrc);
  593. if (mediaLine.has_value()) {
  594. hasFound = true;
  595. std::shared_lock lock(mTracksMutex); // read-only
  596. if (auto track = mTrackLines[*mediaLine].lock()) {
  597. track->incoming(message);
  598. }
  599. ssrcsFound.emplace_back(ssrc);
  600. }
  601. }
  602. ssrc = rtcpfb->getMediaSourceSSRC();
  603. if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
  604. mediaLine = getMLineFromSSRC(ssrc);
  605. if (mediaLine.has_value()) {
  606. hasFound = true;
  607. std::shared_lock lock(mTracksMutex); // read-only
  608. if (auto track = mTrackLines[*mediaLine].lock()) {
  609. track->incoming(message);
  610. }
  611. ssrcsFound.emplace_back(ssrc);
  612. }
  613. }
  614. } else if (header->payloadType() == 200 || header->payloadType() == 201) {
  615. auto rtcpsr = (RTCP_SR *)header;
  616. auto ssrc = rtcpsr->senderSSRC();
  617. if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
  618. mediaLine = getMLineFromSSRC(ssrc);
  619. if (mediaLine.has_value()) {
  620. hasFound = true;
  621. std::shared_lock lock(mTracksMutex); // read-only
  622. if (auto track = mTrackLines[*mediaLine].lock()) {
  623. track->incoming(message);
  624. }
  625. ssrcsFound.emplace_back(ssrc);
  626. }
  627. }
  628. for (int i = 0; i < rtcpsr->header.reportCount(); i++) {
  629. auto block = rtcpsr->getReportBlock(i);
  630. ssrc = block->getSSRC();
  631. if (std::find(ssrcsFound.begin(), ssrcsFound.end(), ssrc) == ssrcsFound.end()) {
  632. mediaLine = getMLineFromSSRC(ssrc);
  633. if (mediaLine.has_value()) {
  634. hasFound = true;
  635. std::shared_lock lock(mTracksMutex); // read-only
  636. if (auto track = mTrackLines[*mediaLine].lock()) {
  637. track->incoming(message);
  638. }
  639. ssrcsFound.emplace_back(ssrc);
  640. }
  641. }
  642. }
  643. } else {
  644. // PT=202 == SDES
  645. // PT=207 == Extended Report
  646. if (header->payloadType() != 202 && header->payloadType() != 207) {
  647. PLOG_WARNING << "Unknown packet type: " << (int)header->version() << " "
  648. << header->payloadType() << "";
  649. }
  650. }
  651. }
  652. if (hasFound)
  653. return;
  654. }
  655. unsigned int ssrc = message->stream;
  656. mediaLine = getMLineFromSSRC(ssrc);
  657. if (!mediaLine) {
  658. /* TODO
  659. * So the problem is that when stop sending streams, we stop getting report blocks for
  660. * those streams Therefore when we get compound RTCP packets, they are empty, and we can't
  661. * forward them. Therefore, it is expected that we don't know where to forward packets. Is
  662. * this ideal? No! Do I know how to fix it? No!
  663. */
  664. // PLOG_WARNING << "Track not found for SSRC " << ssrc << ", dropping";
  665. return;
  666. }
  667. std::shared_lock lock(mTracksMutex); // read-only
  668. if (auto track = mTrackLines[*mediaLine].lock()) {
  669. track->incoming(message);
  670. }
  671. }
  672. std::optional<unsigned int> PeerConnection::getMLineFromSSRC(SSRC ssrc) {
  673. if (auto it = mMLineFromSssrc.find(ssrc); it != mMLineFromSssrc.end()) {
  674. return it->second;
  675. } else {
  676. {
  677. std::lock_guard lock(mRemoteDescriptionMutex);
  678. if (!mRemoteDescription)
  679. return nullopt;
  680. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  681. if (std::visit(
  682. rtc::overloaded{[&](Description::Application *) -> bool { return false; },
  683. [&](Description::Media *media) -> bool {
  684. return media->hasSSRC(ssrc);
  685. }},
  686. mRemoteDescription->media(i))) {
  687. mMLineFromSssrc.emplace(ssrc, i);
  688. return i;
  689. }
  690. }
  691. }
  692. {
  693. std::lock_guard lock(mLocalDescriptionMutex);
  694. if (!mLocalDescription)
  695. return nullopt;
  696. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  697. if (std::visit(
  698. rtc::overloaded{[&](Description::Application *) -> bool { return false; },
  699. [&](Description::Media *media) -> bool {
  700. return media->hasSSRC(ssrc);
  701. }},
  702. mLocalDescription->media(i))) {
  703. mMLineFromSssrc.emplace(ssrc, i);
  704. return i;
  705. }
  706. }
  707. }
  708. }
  709. return std::nullopt;
  710. }
  711. std::optional<std::string> PeerConnection::getMidFromSSRC(SSRC ssrc) {
  712. if (auto it = mMidFromSssrc.find(ssrc); it != mMidFromSssrc.end()) {
  713. return it->second;
  714. } else {
  715. {
  716. std::lock_guard lock(mRemoteDescriptionMutex);
  717. if (!mRemoteDescription)
  718. return nullopt;
  719. for (unsigned int i = 0; i < mRemoteDescription->mediaCount(); ++i) {
  720. if (auto found = std::visit(
  721. rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
  722. return std::nullopt;
  723. },
  724. [&](Description::Media *media) -> std::optional<string> {
  725. return media->hasSSRC(ssrc)
  726. ? std::make_optional(media->mid())
  727. : nullopt;
  728. }},
  729. mRemoteDescription->media(i))) {
  730. mMidFromSssrc.emplace(ssrc, *found);
  731. return *found;
  732. }
  733. }
  734. }
  735. {
  736. std::lock_guard lock(mLocalDescriptionMutex);
  737. if (!mLocalDescription)
  738. return nullopt;
  739. for (unsigned int i = 0; i < mLocalDescription->mediaCount(); ++i) {
  740. if (auto found = std::visit(
  741. rtc::overloaded{[&](Description::Application *) -> std::optional<string> {
  742. return std::nullopt;
  743. },
  744. [&](Description::Media *media) -> std::optional<string> {
  745. return media->hasSSRC(ssrc)
  746. ? std::make_optional(media->mid())
  747. : nullopt;
  748. }},
  749. mLocalDescription->media(i))) {
  750. mMidFromSssrc.emplace(ssrc, *found);
  751. return *found;
  752. }
  753. }
  754. }
  755. }
  756. return nullopt;
  757. }
  758. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  759. if (auto channel = findDataChannel(stream))
  760. channel->triggerBufferedAmount(amount);
  761. }
  762. shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role, string label,
  763. DataChannelInit init) {
  764. std::unique_lock lock(mDataChannelsMutex); // we are going to emplace
  765. uint16_t stream;
  766. if (init.id) {
  767. stream = *init.id;
  768. if (stream == 65535)
  769. throw std::invalid_argument("Invalid DataChannel id");
  770. } else {
  771. // The active side must use streams with even identifiers, whereas the passive side must use
  772. // streams with odd identifiers.
  773. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
  774. stream = (role == Description::Role::Active) ? 0 : 1;
  775. while (mDataChannels.find(stream) != mDataChannels.end()) {
  776. if (stream >= 65535 - 2)
  777. throw std::runtime_error("Too many DataChannels");
  778. stream += 2;
  779. }
  780. }
  781. // If the DataChannel is user-negotiated, do not negociate it here
  782. auto channel =
  783. init.negotiated
  784. ? std::make_shared<DataChannel>(shared_from_this(), stream, std::move(label),
  785. std::move(init.protocol), std::move(init.reliability))
  786. : std::make_shared<NegociatedDataChannel>(shared_from_this(), stream, std::move(label),
  787. std::move(init.protocol),
  788. std::move(init.reliability));
  789. mDataChannels.emplace(std::make_pair(stream, channel));
  790. return channel;
  791. }
  792. shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
  793. std::shared_lock lock(mDataChannelsMutex); // read-only
  794. if (auto it = mDataChannels.find(stream); it != mDataChannels.end())
  795. if (auto channel = it->second.lock())
  796. return channel;
  797. return nullptr;
  798. }
  799. void PeerConnection::iterateDataChannels(
  800. std::function<void(shared_ptr<DataChannel> channel)> func) {
  801. // Iterate
  802. {
  803. std::shared_lock lock(mDataChannelsMutex); // read-only
  804. auto it = mDataChannels.begin();
  805. while (it != mDataChannels.end()) {
  806. auto channel = it->second.lock();
  807. if (channel && !channel->isClosed())
  808. func(channel);
  809. ++it;
  810. }
  811. }
  812. // Cleanup
  813. {
  814. std::unique_lock lock(mDataChannelsMutex); // we are going to erase
  815. auto it = mDataChannels.begin();
  816. while (it != mDataChannels.end()) {
  817. if (!it->second.lock()) {
  818. it = mDataChannels.erase(it);
  819. continue;
  820. }
  821. ++it;
  822. }
  823. }
  824. }
  825. void PeerConnection::openDataChannels() {
  826. if (auto transport = std::atomic_load(&mSctpTransport))
  827. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  828. }
  829. void PeerConnection::closeDataChannels() {
  830. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  831. }
  832. void PeerConnection::remoteCloseDataChannels() {
  833. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
  834. }
  835. void PeerConnection::incomingTrack(Description::Media description) {
  836. std::unique_lock lock(mTracksMutex); // we are going to emplace
  837. #if !RTC_ENABLE_MEDIA
  838. if (mTracks.empty()) {
  839. PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
  840. }
  841. #endif
  842. if (mTracks.find(description.mid()) == mTracks.end()) {
  843. auto track = std::make_shared<Track>(std::move(description));
  844. mTracks.emplace(std::make_pair(track->mid(), track));
  845. mTrackLines.emplace_back(track);
  846. triggerTrack(track);
  847. }
  848. }
  849. void PeerConnection::openTracks() {
  850. #if RTC_ENABLE_MEDIA
  851. if (auto transport = std::atomic_load(&mDtlsTransport)) {
  852. auto srtpTransport = reinterpret_pointer_cast<DtlsSrtpTransport>(transport);
  853. std::shared_lock lock(mTracksMutex); // read-only
  854. for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
  855. if (auto track = it->second.lock())
  856. if (!track->isOpen())
  857. track->open(srtpTransport);
  858. }
  859. #endif
  860. }
  861. void PeerConnection::validateRemoteDescription(const Description &description) {
  862. if (!description.iceUfrag())
  863. throw std::invalid_argument("Remote description has no ICE user fragment");
  864. if (!description.icePwd())
  865. throw std::invalid_argument("Remote description has no ICE password");
  866. if (!description.fingerprint())
  867. throw std::invalid_argument("Remote description has no fingerprint");
  868. if (description.mediaCount() == 0)
  869. throw std::invalid_argument("Remote description has no media line");
  870. int activeMediaCount = 0;
  871. for (unsigned int i = 0; i < description.mediaCount(); ++i)
  872. std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
  873. [&](const Description::Media *media) {
  874. if (media->direction() != Description::Direction::Inactive)
  875. ++activeMediaCount;
  876. }},
  877. description.media(i));
  878. if (activeMediaCount == 0)
  879. throw std::invalid_argument("Remote description has no active media");
  880. if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
  881. if (*description.iceUfrag() == *local->iceUfrag() &&
  882. *description.icePwd() == *local->icePwd())
  883. throw std::logic_error("Got the local description as remote description");
  884. PLOG_VERBOSE << "Remote description looks valid";
  885. }
  886. void PeerConnection::processLocalDescription(Description description) {
  887. if (auto remote = remoteDescription()) {
  888. // Reciprocate remote description
  889. for (unsigned int i = 0; i < remote->mediaCount(); ++i)
  890. std::visit( // reciprocate each media
  891. rtc::overloaded{
  892. [&](Description::Application *remoteApp) {
  893. std::shared_lock lock(mDataChannelsMutex);
  894. if (!mDataChannels.empty()) {
  895. // Prefer local description
  896. Description::Application app(remoteApp->mid());
  897. app.setSctpPort(DEFAULT_SCTP_PORT);
  898. app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  899. PLOG_DEBUG << "Adding application to local description, mid=\""
  900. << app.mid() << "\"";
  901. description.addMedia(std::move(app));
  902. return;
  903. }
  904. auto reciprocated = remoteApp->reciprocate();
  905. reciprocated.hintSctpPort(DEFAULT_SCTP_PORT);
  906. reciprocated.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  907. PLOG_DEBUG << "Reciprocating application in local description, mid=\""
  908. << reciprocated.mid() << "\"";
  909. description.addMedia(std::move(reciprocated));
  910. },
  911. [&](Description::Media *remoteMedia) {
  912. std::shared_lock lock(mTracksMutex);
  913. if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
  914. // Prefer local description
  915. if (auto track = it->second.lock()) {
  916. auto media = track->description();
  917. #if !RTC_ENABLE_MEDIA
  918. // No media support, mark as inactive
  919. media.setDirection(Description::Direction::Inactive);
  920. #endif
  921. PLOG_DEBUG
  922. << "Adding media to local description, mid=\"" << media.mid()
  923. << "\", active=" << std::boolalpha
  924. << (media.direction() != Description::Direction::Inactive);
  925. description.addMedia(std::move(media));
  926. } else {
  927. auto reciprocated = remoteMedia->reciprocate();
  928. reciprocated.setDirection(Description::Direction::Inactive);
  929. PLOG_DEBUG << "Adding inactive media to local description, mid=\""
  930. << reciprocated.mid() << "\"";
  931. description.addMedia(std::move(reciprocated));
  932. }
  933. return;
  934. }
  935. lock.unlock(); // we are going to call incomingTrack()
  936. auto reciprocated = remoteMedia->reciprocate();
  937. #if !RTC_ENABLE_MEDIA
  938. // No media support, mark as inactive
  939. reciprocated.setDirection(Description::Direction::Inactive);
  940. #endif
  941. incomingTrack(reciprocated);
  942. PLOG_DEBUG
  943. << "Reciprocating media in local description, mid=\""
  944. << reciprocated.mid() << "\", active=" << std::boolalpha
  945. << (reciprocated.direction() != Description::Direction::Inactive);
  946. description.addMedia(std::move(reciprocated));
  947. },
  948. },
  949. remote->media(i));
  950. }
  951. if (description.type() == Description::Type::Offer) {
  952. // This is an offer, add locally created data channels and tracks
  953. // Add application for data channels
  954. if (!description.hasApplication()) {
  955. std::shared_lock lock(mDataChannelsMutex);
  956. if (!mDataChannels.empty()) {
  957. Description::Application app("data");
  958. app.setSctpPort(DEFAULT_SCTP_PORT);
  959. app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  960. PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
  961. << "\"";
  962. description.addMedia(std::move(app));
  963. }
  964. }
  965. // Add media for local tracks
  966. std::shared_lock lock(mTracksMutex);
  967. for (auto it = mTrackLines.begin(); it != mTrackLines.end(); ++it) {
  968. if (auto track = it->lock()) {
  969. if (description.hasMid(track->mid()))
  970. continue;
  971. auto media = track->description();
  972. #if !RTC_ENABLE_MEDIA
  973. // No media support, mark as inactive
  974. media.setDirection(Description::Direction::Inactive);
  975. #endif
  976. PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
  977. << "\", active=" << std::boolalpha
  978. << (media.direction() != Description::Direction::Inactive);
  979. description.addMedia(std::move(media));
  980. }
  981. }
  982. }
  983. // Set local fingerprint (wait for certificate if necessary)
  984. description.setFingerprint(mCertificate.get()->fingerprint());
  985. {
  986. // Set as local description
  987. std::lock_guard lock(mLocalDescriptionMutex);
  988. std::vector<Candidate> existingCandidates;
  989. if (mLocalDescription) {
  990. existingCandidates = mLocalDescription->extractCandidates();
  991. mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
  992. }
  993. mLocalDescription.emplace(std::move(description));
  994. mLocalDescription->addCandidates(std::move(existingCandidates));
  995. }
  996. mProcessor->enqueue([this, description = *mLocalDescription]() {
  997. PLOG_VERBOSE << "Issuing local description: " << description;
  998. mLocalDescriptionCallback(std::move(description));
  999. });
  1000. // Reciprocated tracks might need to be open
  1001. if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  1002. dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
  1003. mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
  1004. }
  1005. void PeerConnection::processLocalCandidate(Candidate candidate) {
  1006. std::lock_guard lock(mLocalDescriptionMutex);
  1007. if (!mLocalDescription)
  1008. throw std::logic_error("Got a local candidate without local description");
  1009. candidate.resolve(Candidate::ResolveMode::Simple); // for proper SDP generation later
  1010. mLocalDescription->addCandidate(candidate);
  1011. mProcessor->enqueue([this, candidate = std::move(candidate)]() {
  1012. PLOG_VERBOSE << "Issuing local candidate: " << candidate;
  1013. mLocalCandidateCallback(std::move(candidate));
  1014. });
  1015. }
  1016. void PeerConnection::processRemoteDescription(Description description) {
  1017. {
  1018. // Set as remote description
  1019. std::lock_guard lock(mRemoteDescriptionMutex);
  1020. std::vector<Candidate> existingCandidates;
  1021. if (mRemoteDescription)
  1022. existingCandidates = mRemoteDescription->extractCandidates();
  1023. mRemoteDescription.emplace(std::move(description));
  1024. mRemoteDescription->addCandidates(std::move(existingCandidates));
  1025. }
  1026. if (description.hasApplication()) {
  1027. auto dtlsTransport = std::atomic_load(&mDtlsTransport);
  1028. auto sctpTransport = std::atomic_load(&mSctpTransport);
  1029. if (!sctpTransport && dtlsTransport &&
  1030. dtlsTransport->state() == Transport::State::Connected)
  1031. initSctpTransport();
  1032. }
  1033. }
  1034. void PeerConnection::processRemoteCandidate(Candidate candidate) {
  1035. std::lock_guard lock(mRemoteDescriptionMutex);
  1036. auto iceTransport = std::atomic_load(&mIceTransport);
  1037. if (!mRemoteDescription || !iceTransport)
  1038. throw std::logic_error("Got a remote candidate without remote description");
  1039. candidate.hintMid(mRemoteDescription->bundleMid());
  1040. if (candidate.resolve(Candidate::ResolveMode::Simple)) {
  1041. iceTransport->addRemoteCandidate(candidate);
  1042. } else {
  1043. // OK, we might need a lookup, do it asynchronously
  1044. // We don't use the thread pool because we have no control on the timeout
  1045. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  1046. std::thread t([weakIceTransport, candidate]() mutable {
  1047. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  1048. if (auto iceTransport = weakIceTransport.lock())
  1049. iceTransport->addRemoteCandidate(candidate);
  1050. });
  1051. t.detach();
  1052. }
  1053. mRemoteDescription->addCandidate(std::move(candidate));
  1054. }
  1055. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  1056. auto dataChannel = weakDataChannel.lock();
  1057. if (!dataChannel)
  1058. return;
  1059. mProcessor->enqueue(
  1060. [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
  1061. }
  1062. void PeerConnection::triggerTrack(std::shared_ptr<Track> track) {
  1063. mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
  1064. }
  1065. bool PeerConnection::changeState(State state) {
  1066. State current;
  1067. do {
  1068. current = mState.load();
  1069. if (current == State::Closed)
  1070. return false;
  1071. if (current == state)
  1072. return false;
  1073. } while (!mState.compare_exchange_weak(current, state));
  1074. std::ostringstream s;
  1075. s << state;
  1076. PLOG_INFO << "Changed state to " << s.str();
  1077. if (state == State::Closed)
  1078. // This is the last state change, so we may steal the callback
  1079. mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
  1080. else
  1081. mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
  1082. return true;
  1083. }
  1084. bool PeerConnection::changeGatheringState(GatheringState state) {
  1085. if (mGatheringState.exchange(state) == state)
  1086. return false;
  1087. std::ostringstream s;
  1088. s << state;
  1089. PLOG_INFO << "Changed gathering state to " << s.str();
  1090. mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
  1091. return true;
  1092. }
  1093. bool PeerConnection::changeSignalingState(SignalingState state) {
  1094. if (mSignalingState.exchange(state) == state)
  1095. return false;
  1096. std::ostringstream s;
  1097. s << state;
  1098. PLOG_INFO << "Changed signaling state to " << s.str();
  1099. mProcessor->enqueue([this, state] { mSignalingStateChangeCallback(state); });
  1100. return true;
  1101. }
  1102. void PeerConnection::resetCallbacks() {
  1103. // Unregister all callbacks
  1104. mDataChannelCallback = nullptr;
  1105. mLocalDescriptionCallback = nullptr;
  1106. mLocalCandidateCallback = nullptr;
  1107. mStateChangeCallback = nullptr;
  1108. mGatheringStateChangeCallback = nullptr;
  1109. }
  1110. bool PeerConnection::getSelectedCandidatePair([[maybe_unused]] Candidate *local,
  1111. [[maybe_unused]] Candidate *remote) {
  1112. auto iceTransport = std::atomic_load(&mIceTransport);
  1113. return iceTransport ? iceTransport->getSelectedCandidatePair(local, remote) : false;
  1114. }
  1115. void PeerConnection::clearStats() {
  1116. auto sctpTransport = std::atomic_load(&mSctpTransport);
  1117. if (sctpTransport)
  1118. return sctpTransport->clearStats();
  1119. }
  1120. size_t PeerConnection::bytesSent() {
  1121. auto sctpTransport = std::atomic_load(&mSctpTransport);
  1122. if (sctpTransport)
  1123. return sctpTransport->bytesSent();
  1124. return 0;
  1125. }
  1126. size_t PeerConnection::bytesReceived() {
  1127. auto sctpTransport = std::atomic_load(&mSctpTransport);
  1128. if (sctpTransport)
  1129. return sctpTransport->bytesReceived();
  1130. return 0;
  1131. }
  1132. std::optional<std::chrono::milliseconds> PeerConnection::rtt() {
  1133. auto sctpTransport = std::atomic_load(&mSctpTransport);
  1134. if (sctpTransport)
  1135. return sctpTransport->rtt();
  1136. return std::nullopt;
  1137. }
  1138. } // namespace rtc
  1139. std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::State state) {
  1140. using State = rtc::PeerConnection::State;
  1141. const char *str;
  1142. switch (state) {
  1143. case State::New:
  1144. str = "new";
  1145. break;
  1146. case State::Connecting:
  1147. str = "connecting";
  1148. break;
  1149. case State::Connected:
  1150. str = "connected";
  1151. break;
  1152. case State::Disconnected:
  1153. str = "disconnected";
  1154. break;
  1155. case State::Failed:
  1156. str = "failed";
  1157. break;
  1158. case State::Closed:
  1159. str = "closed";
  1160. break;
  1161. default:
  1162. str = "unknown";
  1163. break;
  1164. }
  1165. return out << str;
  1166. }
  1167. std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::GatheringState state) {
  1168. using GatheringState = rtc::PeerConnection::GatheringState;
  1169. const char *str;
  1170. switch (state) {
  1171. case GatheringState::New:
  1172. str = "new";
  1173. break;
  1174. case GatheringState::InProgress:
  1175. str = "in-progress";
  1176. break;
  1177. case GatheringState::Complete:
  1178. str = "complete";
  1179. break;
  1180. default:
  1181. str = "unknown";
  1182. break;
  1183. }
  1184. return out << str;
  1185. }
  1186. std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::SignalingState state) {
  1187. using SignalingState = rtc::PeerConnection::SignalingState;
  1188. const char *str;
  1189. switch (state) {
  1190. case SignalingState::Stable:
  1191. str = "stable";
  1192. break;
  1193. case SignalingState::HaveLocalOffer:
  1194. str = "have-local-offer";
  1195. break;
  1196. case SignalingState::HaveRemoteOffer:
  1197. str = "have-remote-offer";
  1198. break;
  1199. case SignalingState::HaveLocalPranswer:
  1200. str = "have-local-pranswer";
  1201. break;
  1202. case SignalingState::HaveRemotePranswer:
  1203. str = "have-remote-pranswer";
  1204. break;
  1205. default:
  1206. str = "unknown";
  1207. break;
  1208. }
  1209. return out << str;
  1210. }