peerconnection.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460
  1. /**
  2. * Copyright (c) 2019 Paul-Louis Ageneau
  3. *
  4. * This library is free software; you can redistribute it and/or
  5. * modify it under the terms of the GNU Lesser General Public
  6. * License as published by the Free Software Foundation; either
  7. * version 2.1 of the License, or (at your option) any later version.
  8. *
  9. * This library is distributed in the hope that it will be useful,
  10. * but WITHOUT ANY WARRANTY; without even the implied warranty of
  11. * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
  12. * Lesser General Public License for more details.
  13. *
  14. * You should have received a copy of the GNU Lesser General Public
  15. * License along with this library; if not, write to the Free Software
  16. * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
  17. */
  18. #include "peerconnection.hpp"
  19. #include "certificate.hpp"
  20. #include "dtlstransport.hpp"
  21. #include "icetransport.hpp"
  22. #include "include.hpp"
  23. #include "sctptransport.hpp"
  24. #include <iostream>
  25. namespace rtc {
  26. using namespace std::placeholders;
  27. using std::shared_ptr;
  28. using std::weak_ptr;
  29. PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
  30. PeerConnection::PeerConnection(const Configuration &config)
  31. : mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New) {}
  32. PeerConnection::~PeerConnection() {
  33. if (auto transport = std::atomic_load(&mIceTransport))
  34. transport->stop();
  35. if (auto transport = std::atomic_load(&mDtlsTransport))
  36. transport->stop();
  37. if (auto transport = std::atomic_load(&mSctpTransport))
  38. transport->stop();
  39. mSctpTransport.reset();
  40. mDtlsTransport.reset();
  41. mIceTransport.reset();
  42. }
  43. const Configuration *PeerConnection::config() const { return &mConfig; }
  44. PeerConnection::State PeerConnection::state() const { return mState; }
  45. PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
  46. std::optional<Description> PeerConnection::localDescription() const {
  47. std::lock_guard lock(mLocalDescriptionMutex);
  48. return mLocalDescription;
  49. }
  50. std::optional<Description> PeerConnection::remoteDescription() const {
  51. std::lock_guard lock(mRemoteDescriptionMutex);
  52. return mRemoteDescription;
  53. }
  54. void PeerConnection::setRemoteDescription(Description description) {
  55. std::lock_guard lock(mRemoteDescriptionMutex);
  56. auto remoteCandidates = description.extractCandidates();
  57. mRemoteDescription.emplace(std::move(description));
  58. auto iceTransport = std::atomic_load(&mIceTransport);
  59. if (!iceTransport)
  60. iceTransport = initIceTransport(Description::Role::ActPass);
  61. iceTransport->setRemoteDescription(*mRemoteDescription);
  62. if (mRemoteDescription->type() == Description::Type::Offer) {
  63. // This is an offer and we are the answerer.
  64. processLocalDescription(iceTransport->getLocalDescription(Description::Type::Answer));
  65. iceTransport->gatherLocalCandidates();
  66. } else {
  67. // This is an answer and we are the offerer.
  68. auto sctpTransport = std::atomic_load(&mSctpTransport);
  69. if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
  70. // Since we assumed passive role during DataChannel creation, we need to shift the
  71. // stream numbers by one to shift them from odd to even.
  72. decltype(mDataChannels) newDataChannels;
  73. iterateDataChannels([&](shared_ptr<DataChannel> channel) {
  74. if (channel->stream() % 2 == 1)
  75. channel->mStream -= 1;
  76. newDataChannels.emplace(channel->stream(), channel);
  77. });
  78. std::swap(mDataChannels, newDataChannels);
  79. }
  80. }
  81. for (const auto &candidate : remoteCandidates)
  82. addRemoteCandidate(candidate);
  83. }
  84. void PeerConnection::addRemoteCandidate(Candidate candidate) {
  85. std::lock_guard lock(mRemoteDescriptionMutex);
  86. auto iceTransport = std::atomic_load(&mIceTransport);
  87. if (!mRemoteDescription || !iceTransport)
  88. throw std::logic_error("Remote candidate set without remote description");
  89. mRemoteDescription->addCandidate(candidate);
  90. if (candidate.resolve(Candidate::ResolveMode::Simple)) {
  91. iceTransport->addRemoteCandidate(candidate);
  92. } else {
  93. // OK, we might need a lookup, do it asynchronously
  94. weak_ptr<IceTransport> weakIceTransport{iceTransport};
  95. std::thread t([weakIceTransport, candidate]() mutable {
  96. if (candidate.resolve(Candidate::ResolveMode::Lookup))
  97. if (auto iceTransport = weakIceTransport.lock())
  98. iceTransport->addRemoteCandidate(candidate);
  99. });
  100. t.detach();
  101. }
  102. }
  103. std::optional<string> PeerConnection::localAddress() const {
  104. auto iceTransport = std::atomic_load(&mIceTransport);
  105. return iceTransport ? iceTransport->getLocalAddress() : nullopt;
  106. }
  107. std::optional<string> PeerConnection::remoteAddress() const {
  108. auto iceTransport = std::atomic_load(&mIceTransport);
  109. return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
  110. }
  111. shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
  112. const string &protocol,
  113. const Reliability &reliability) {
  114. // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
  115. // setup:passive. [...] Thus, setup:active is RECOMMENDED.
  116. // See https://tools.ietf.org/html/rfc5763#section-5
  117. // Therefore, we assume passive role when we are the offerer.
  118. auto iceTransport = std::atomic_load(&mIceTransport);
  119. auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
  120. // The active side must use streams with even identifiers, whereas the passive side must use
  121. // streams with odd identifiers.
  122. // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
  123. unsigned int stream = (role == Description::Role::Active) ? 0 : 1;
  124. while (mDataChannels.find(stream) != mDataChannels.end()) {
  125. stream += 2;
  126. if (stream >= 65535)
  127. throw std::runtime_error("Too many DataChannels");
  128. }
  129. auto channel =
  130. std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
  131. mDataChannels.insert(std::make_pair(stream, channel));
  132. if (!iceTransport) {
  133. // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
  134. // setup:actpass.
  135. // See https://tools.ietf.org/html/rfc5763#section-5
  136. iceTransport = initIceTransport(Description::Role::ActPass);
  137. processLocalDescription(iceTransport->getLocalDescription(Description::Type::Offer));
  138. iceTransport->gatherLocalCandidates();
  139. } else {
  140. if (auto transport = std::atomic_load(&mSctpTransport))
  141. if (transport->state() == SctpTransport::State::Connected)
  142. channel->open(transport);
  143. }
  144. return channel;
  145. }
  146. void PeerConnection::onDataChannel(
  147. std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
  148. mDataChannelCallback = callback;
  149. }
  150. void PeerConnection::onLocalDescription(
  151. std::function<void(const Description &description)> callback) {
  152. mLocalDescriptionCallback = callback;
  153. }
  154. void PeerConnection::onLocalCandidate(std::function<void(const Candidate &candidate)> callback) {
  155. mLocalCandidateCallback = callback;
  156. }
  157. void PeerConnection::onStateChange(std::function<void(State state)> callback) {
  158. mStateChangeCallback = callback;
  159. }
  160. void PeerConnection::onGatheringStateChange(std::function<void(GatheringState state)> callback) {
  161. mGatheringStateChangeCallback = callback;
  162. }
  163. shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
  164. auto transport = std::make_shared<IceTransport>(
  165. mConfig, role, std::bind(&PeerConnection::processLocalCandidate, this, _1),
  166. [this](IceTransport::State state) {
  167. switch (state) {
  168. case IceTransport::State::Connecting:
  169. changeState(State::Connecting);
  170. break;
  171. case IceTransport::State::Failed:
  172. changeState(State::Failed);
  173. break;
  174. case IceTransport::State::Connected:
  175. initDtlsTransport();
  176. break;
  177. default:
  178. // Ignore
  179. break;
  180. }
  181. },
  182. [this](IceTransport::GatheringState state) {
  183. switch (state) {
  184. case IceTransport::GatheringState::InProgress:
  185. changeGatheringState(GatheringState::InProgress);
  186. break;
  187. case IceTransport::GatheringState::Complete:
  188. if (mLocalDescription)
  189. mLocalDescription->endCandidates();
  190. changeGatheringState(GatheringState::Complete);
  191. break;
  192. default:
  193. // Ignore
  194. break;
  195. }
  196. });
  197. std::atomic_store(&mIceTransport, transport);
  198. return transport;
  199. }
  200. shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
  201. auto lower = std::atomic_load(&mIceTransport);
  202. auto transport = std::make_shared<DtlsTransport>(
  203. lower, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
  204. [this](DtlsTransport::State state) {
  205. switch (state) {
  206. case DtlsTransport::State::Connected:
  207. initSctpTransport();
  208. break;
  209. case DtlsTransport::State::Failed:
  210. changeState(State::Failed);
  211. break;
  212. default:
  213. // Ignore
  214. break;
  215. }
  216. });
  217. std::atomic_store(&mDtlsTransport, transport);
  218. return transport;
  219. }
  220. shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
  221. uint16_t sctpPort = remoteDescription()->sctpPort().value_or(DEFAULT_SCTP_PORT);
  222. auto lower = std::atomic_load(&mDtlsTransport);
  223. auto transport = std::make_shared<SctpTransport>(
  224. lower, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
  225. std::bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
  226. [this](SctpTransport::State state) {
  227. switch (state) {
  228. case SctpTransport::State::Connected:
  229. changeState(State::Connected);
  230. openDataChannels();
  231. break;
  232. case SctpTransport::State::Failed:
  233. changeState(State::Failed);
  234. break;
  235. case SctpTransport::State::Disconnected:
  236. changeState(State::Disconnected);
  237. break;
  238. default:
  239. // Ignore
  240. break;
  241. }
  242. });
  243. std::atomic_store(&mSctpTransport, transport);
  244. return transport;
  245. }
  246. bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
  247. std::lock_guard lock(mRemoteDescriptionMutex);
  248. if (auto expectedFingerprint =
  249. mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
  250. return *expectedFingerprint == fingerprint;
  251. }
  252. return false;
  253. }
  254. void PeerConnection::forwardMessage(message_ptr message) {
  255. if (!message) {
  256. closeDataChannels();
  257. return;
  258. }
  259. shared_ptr<DataChannel> channel;
  260. if (auto it = mDataChannels.find(message->stream); it != mDataChannels.end()) {
  261. channel = it->second.lock();
  262. if (!channel || channel->isClosed()) {
  263. mDataChannels.erase(it);
  264. channel = nullptr;
  265. }
  266. }
  267. auto iceTransport = std::atomic_load(&mIceTransport);
  268. auto sctpTransport = std::atomic_load(&mSctpTransport);
  269. if (!iceTransport || !sctpTransport)
  270. return;
  271. if (!channel) {
  272. const byte dataChannelOpenMessage{0x03};
  273. unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
  274. if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
  275. message->stream % 2 == remoteParity) {
  276. channel =
  277. std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
  278. channel->onOpen(std::bind(&PeerConnection::triggerDataChannel, this,
  279. weak_ptr<DataChannel>{channel}));
  280. mDataChannels.insert(std::make_pair(message->stream, channel));
  281. } else {
  282. // Invalid, close the DataChannel by resetting the stream
  283. sctpTransport->reset(message->stream);
  284. return;
  285. }
  286. }
  287. channel->incoming(message);
  288. }
  289. void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
  290. shared_ptr<DataChannel> channel;
  291. if (auto it = mDataChannels.find(stream); it != mDataChannels.end()) {
  292. channel = it->second.lock();
  293. if (!channel || channel->isClosed()) {
  294. mDataChannels.erase(it);
  295. channel = nullptr;
  296. }
  297. }
  298. if (channel)
  299. channel->triggerBufferedAmount(amount);
  300. }
  301. void PeerConnection::iterateDataChannels(
  302. std::function<void(shared_ptr<DataChannel> channel)> func) {
  303. auto it = mDataChannels.begin();
  304. while (it != mDataChannels.end()) {
  305. auto channel = it->second.lock();
  306. if (!channel || channel->isClosed()) {
  307. it = mDataChannels.erase(it);
  308. continue;
  309. }
  310. func(channel);
  311. ++it;
  312. }
  313. }
  314. void PeerConnection::openDataChannels() {
  315. if (auto transport = std::atomic_load(&mSctpTransport))
  316. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
  317. }
  318. void PeerConnection::closeDataChannels() {
  319. iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
  320. }
  321. void PeerConnection::processLocalDescription(Description description) {
  322. std::optional<uint16_t> remoteSctpPort;
  323. if (auto remote = remoteDescription())
  324. remoteSctpPort = remote->sctpPort();
  325. std::lock_guard lock(mLocalDescriptionMutex);
  326. mLocalDescription.emplace(std::move(description));
  327. mLocalDescription->setFingerprint(mCertificate->fingerprint());
  328. mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
  329. mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
  330. mLocalDescriptionCallback(*mLocalDescription);
  331. }
  332. void PeerConnection::processLocalCandidate(Candidate candidate) {
  333. std::lock_guard lock(mLocalDescriptionMutex);
  334. if (!mLocalDescription)
  335. throw std::logic_error("Got a local candidate without local description");
  336. mLocalDescription->addCandidate(candidate);
  337. mLocalCandidateCallback(candidate);
  338. }
  339. void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
  340. auto dataChannel = weakDataChannel.lock();
  341. if (!dataChannel)
  342. return;
  343. mDataChannelCallback(dataChannel);
  344. }
  345. void PeerConnection::changeState(State state) {
  346. if (mState.exchange(state) != state)
  347. mStateChangeCallback(state);
  348. }
  349. void PeerConnection::changeGatheringState(GatheringState state) {
  350. if (mGatheringState.exchange(state) != state)
  351. mGatheringStateChangeCallback(state);
  352. }
  353. } // namespace rtc
  354. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &state) {
  355. using State = rtc::PeerConnection::State;
  356. std::string str;
  357. switch (state) {
  358. case State::New:
  359. str = "new";
  360. break;
  361. case State::Connecting:
  362. str = "connecting";
  363. break;
  364. case State::Connected:
  365. str = "connected";
  366. break;
  367. case State::Disconnected:
  368. str = "disconnected";
  369. break;
  370. case State::Failed:
  371. str = "failed";
  372. break;
  373. default:
  374. str = "unknown";
  375. break;
  376. }
  377. return out << str;
  378. }
  379. std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::GatheringState &state) {
  380. using GatheringState = rtc::PeerConnection::GatheringState;
  381. std::string str;
  382. switch (state) {
  383. case GatheringState::New:
  384. str = "new";
  385. break;
  386. case GatheringState::InProgress:
  387. str = "in_progress";
  388. break;
  389. case GatheringState::Complete:
  390. str = "complete";
  391. break;
  392. default:
  393. str = "unknown";
  394. break;
  395. }
  396. return out << str;
  397. }