123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556 |
- /**
- * Copyright (c) 2019 Paul-Louis Ageneau
- *
- * This library is free software; you can redistribute it and/or
- * modify it under the terms of the GNU Lesser General Public
- * License as published by the Free Software Foundation; either
- * version 2.1 of the License, or (at your option) any later version.
- *
- * This library is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
- * Lesser General Public License for more details.
- *
- * You should have received a copy of the GNU Lesser General Public
- * License along with this library; if not, write to the Free Software
- * Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 02110-1301 USA
- */
- #include "peerconnection.hpp"
- #include "certificate.hpp"
- #include "dtlstransport.hpp"
- #include "icetransport.hpp"
- #include "include.hpp"
- #include "sctptransport.hpp"
- #include <iostream>
- #ifdef _WIN32
- #include <winsock2.h>
- #endif
- namespace rtc {
- using namespace std::placeholders;
- using std::shared_ptr;
- using std::weak_ptr;
- PeerConnection::PeerConnection() : PeerConnection(Configuration()) {
- #ifdef _WIN32
- WSADATA wsaData;
- if (WSAStartup(MAKEWORD(2, 2), &wsaData))
- throw std::runtime_error("WSAStartup failed, error=" + std::to_string(WSAGetLastError()));
- #endif
- }
- PeerConnection::PeerConnection(const Configuration &config)
- : mConfig(config), mCertificate(make_certificate("libdatachannel")), mState(State::New) {}
- PeerConnection::~PeerConnection() {
- changeState(State::Destroying);
- close();
- mSctpTransport.reset();
- mDtlsTransport.reset();
- mIceTransport.reset();
- #ifdef _WIN32
- WSACleanup();
- #endif
- }
- void PeerConnection::close() {
- // Close DataChannels
- closeDataChannels();
- // Close Transports
- for (int i = 0; i < 2; ++i) { // Make sure a transport wasn't spawn behind our back
- if (auto transport = std::atomic_load(&mSctpTransport))
- transport->stop();
- if (auto transport = std::atomic_load(&mDtlsTransport))
- transport->stop();
- if (auto transport = std::atomic_load(&mIceTransport))
- transport->stop();
- }
- changeState(State::Closed);
- }
- const Configuration *PeerConnection::config() const { return &mConfig; }
- PeerConnection::State PeerConnection::state() const { return mState; }
- PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
- std::optional<Description> PeerConnection::localDescription() const {
- std::lock_guard lock(mLocalDescriptionMutex);
- return mLocalDescription;
- }
- std::optional<Description> PeerConnection::remoteDescription() const {
- std::lock_guard lock(mRemoteDescriptionMutex);
- return mRemoteDescription;
- }
- void PeerConnection::setRemoteDescription(Description description) {
- description.hintType(localDescription() ? Description::Type::Answer : Description::Type::Offer);
- auto remoteCandidates = description.extractCandidates();
- std::lock_guard lock(mRemoteDescriptionMutex);
- mRemoteDescription.emplace(std::move(description));
- auto iceTransport = std::atomic_load(&mIceTransport);
- if (!iceTransport)
- iceTransport = initIceTransport(Description::Role::ActPass);
- iceTransport->setRemoteDescription(*mRemoteDescription);
- if (mRemoteDescription->type() == Description::Type::Offer) {
- // This is an offer and we are the answerer.
- processLocalDescription(iceTransport->getLocalDescription(Description::Type::Answer));
- iceTransport->gatherLocalCandidates();
- } else {
- // This is an answer and we are the offerer.
- auto sctpTransport = std::atomic_load(&mSctpTransport);
- if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
- // Since we assumed passive role during DataChannel creation, we need to shift the
- // stream numbers by one to shift them from odd to even.
- std::unique_lock lock(mDataChannelsMutex);
- decltype(mDataChannels) newDataChannels;
- auto it = mDataChannels.begin();
- while (it != mDataChannels.end()) {
- auto channel = it->second.lock();
- if (channel->stream() % 2 == 1)
- channel->mStream -= 1;
- newDataChannels.emplace(channel->stream(), channel);
- ++it;
- }
- std::swap(mDataChannels, newDataChannels);
- }
- }
- for (const auto &candidate : remoteCandidates)
- addRemoteCandidate(candidate);
- }
- void PeerConnection::addRemoteCandidate(Candidate candidate) {
- std::lock_guard lock(mRemoteDescriptionMutex);
- auto iceTransport = std::atomic_load(&mIceTransport);
- if (!mRemoteDescription || !iceTransport)
- throw std::logic_error("Remote candidate set without remote description");
- mRemoteDescription->addCandidate(candidate);
- if (candidate.resolve(Candidate::ResolveMode::Simple)) {
- iceTransport->addRemoteCandidate(candidate);
- } else {
- // OK, we might need a lookup, do it asynchronously
- weak_ptr<IceTransport> weakIceTransport{iceTransport};
- std::thread t([weakIceTransport, candidate]() mutable {
- if (candidate.resolve(Candidate::ResolveMode::Lookup))
- if (auto iceTransport = weakIceTransport.lock())
- iceTransport->addRemoteCandidate(candidate);
- });
- t.detach();
- }
- }
- std::optional<string> PeerConnection::localAddress() const {
- auto iceTransport = std::atomic_load(&mIceTransport);
- return iceTransport ? iceTransport->getLocalAddress() : nullopt;
- }
- std::optional<string> PeerConnection::remoteAddress() const {
- auto iceTransport = std::atomic_load(&mIceTransport);
- return iceTransport ? iceTransport->getRemoteAddress() : nullopt;
- }
- shared_ptr<DataChannel> PeerConnection::createDataChannel(const string &label,
- const string &protocol,
- const Reliability &reliability) {
- // RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
- // setup:passive. [...] Thus, setup:active is RECOMMENDED.
- // See https://tools.ietf.org/html/rfc5763#section-5
- // Therefore, we assume passive role when we are the offerer.
- auto iceTransport = std::atomic_load(&mIceTransport);
- auto role = iceTransport ? iceTransport->role() : Description::Role::Passive;
- auto channel = emplaceDataChannel(role, label, protocol, reliability);
- if (!iceTransport) {
- // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
- // setup:actpass.
- // See https://tools.ietf.org/html/rfc5763#section-5
- iceTransport = initIceTransport(Description::Role::ActPass);
- processLocalDescription(iceTransport->getLocalDescription(Description::Type::Offer));
- iceTransport->gatherLocalCandidates();
- } else {
- if (auto transport = std::atomic_load(&mSctpTransport))
- if (transport->state() == SctpTransport::State::Connected)
- channel->open(transport);
- }
- return channel;
- }
- void PeerConnection::onDataChannel(
- std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
- mDataChannelCallback = callback;
- }
- void PeerConnection::onLocalDescription(
- std::function<void(const Description &description)> callback) {
- mLocalDescriptionCallback = callback;
- }
- void PeerConnection::onLocalCandidate(std::function<void(const Candidate &candidate)> callback) {
- mLocalCandidateCallback = callback;
- }
- void PeerConnection::onStateChange(std::function<void(State state)> callback) {
- mStateChangeCallback = callback;
- }
- void PeerConnection::onGatheringStateChange(std::function<void(GatheringState state)> callback) {
- mGatheringStateChangeCallback = callback;
- }
- shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
- try {
- std::lock_guard lock(mInitMutex);
- if (auto transport = std::atomic_load(&mIceTransport))
- return transport;
- auto transport = std::make_shared<IceTransport>(
- mConfig, role, std::bind(&PeerConnection::processLocalCandidate, this, _1),
- [this](IceTransport::State state) {
- switch (state) {
- case IceTransport::State::Connecting:
- changeState(State::Connecting);
- break;
- case IceTransport::State::Failed:
- changeState(State::Failed);
- break;
- case IceTransport::State::Connected:
- initDtlsTransport();
- break;
- case IceTransport::State::Disconnected:
- changeState(State::Disconnected);
- break;
- default:
- // Ignore
- break;
- }
- },
- [this](IceTransport::GatheringState state) {
- switch (state) {
- case IceTransport::GatheringState::InProgress:
- changeGatheringState(GatheringState::InProgress);
- break;
- case IceTransport::GatheringState::Complete:
- endLocalCandidates();
- changeGatheringState(GatheringState::Complete);
- break;
- default:
- // Ignore
- break;
- }
- });
- std::atomic_store(&mIceTransport, transport);
- return transport;
- } catch (const std::exception &e) {
- PLOG_ERROR << e.what();
- changeState(State::Failed);
- throw std::runtime_error("ICE transport initialization failed");
- }
- }
- shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
- try {
- std::lock_guard lock(mInitMutex);
- if (auto transport = std::atomic_load(&mDtlsTransport))
- return transport;
- auto lower = std::atomic_load(&mIceTransport);
- auto transport = std::make_shared<DtlsTransport>(
- lower, mCertificate, std::bind(&PeerConnection::checkFingerprint, this, _1),
- [this](DtlsTransport::State state) {
- switch (state) {
- case DtlsTransport::State::Connected:
- initSctpTransport();
- break;
- case DtlsTransport::State::Failed:
- changeState(State::Failed);
- break;
- case DtlsTransport::State::Disconnected:
- changeState(State::Disconnected);
- break;
- default:
- // Ignore
- break;
- }
- });
- std::atomic_store(&mDtlsTransport, transport);
- return transport;
- } catch (const std::exception &e) {
- PLOG_ERROR << e.what();
- changeState(State::Failed);
- throw std::runtime_error("DTLS transport initialization failed");
- }
- }
- shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
- try {
- std::lock_guard lock(mInitMutex);
- if (auto transport = std::atomic_load(&mSctpTransport))
- return transport;
- uint16_t sctpPort = remoteDescription()->sctpPort().value_or(DEFAULT_SCTP_PORT);
- auto lower = std::atomic_load(&mDtlsTransport);
- auto transport = std::make_shared<SctpTransport>(
- lower, sctpPort, std::bind(&PeerConnection::forwardMessage, this, _1),
- std::bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
- [this](SctpTransport::State state) {
- switch (state) {
- case SctpTransport::State::Connected:
- changeState(State::Connected);
- openDataChannels();
- break;
- case SctpTransport::State::Failed:
- remoteCloseDataChannels();
- changeState(State::Failed);
- break;
- case SctpTransport::State::Disconnected:
- remoteCloseDataChannels();
- changeState(State::Disconnected);
- break;
- default:
- // Ignore
- break;
- }
- });
- std::atomic_store(&mSctpTransport, transport);
- return transport;
- } catch (const std::exception &e) {
- PLOG_ERROR << e.what();
- changeState(State::Failed);
- throw std::runtime_error("SCTP transport initialization failed");
- }
- }
- void PeerConnection::endLocalCandidates() {
- std::lock_guard lock(mLocalDescriptionMutex);
- if (mLocalDescription)
- mLocalDescription->endCandidates();
- }
- bool PeerConnection::checkFingerprint(const std::string &fingerprint) const {
- std::lock_guard lock(mRemoteDescriptionMutex);
- if (auto expectedFingerprint =
- mRemoteDescription ? mRemoteDescription->fingerprint() : nullopt) {
- return *expectedFingerprint == fingerprint;
- }
- return false;
- }
- void PeerConnection::forwardMessage(message_ptr message) {
- if (!message) {
- remoteCloseDataChannels();
- return;
- }
- auto channel = findDataChannel(message->stream);
- auto iceTransport = std::atomic_load(&mIceTransport);
- auto sctpTransport = std::atomic_load(&mSctpTransport);
- if (!iceTransport || !sctpTransport)
- return;
- if (!channel) {
- const byte dataChannelOpenMessage{0x03};
- unsigned int remoteParity = (iceTransport->role() == Description::Role::Active) ? 1 : 0;
- if (message->type == Message::Control && *message->data() == dataChannelOpenMessage &&
- message->stream % 2 == remoteParity) {
- channel =
- std::make_shared<DataChannel>(shared_from_this(), sctpTransport, message->stream);
- channel->onOpen(std::bind(&PeerConnection::triggerDataChannel, this,
- weak_ptr<DataChannel>{channel}));
- mDataChannels.insert(std::make_pair(message->stream, channel));
- } else {
- // Invalid, close the DataChannel by resetting the stream
- sctpTransport->reset(message->stream);
- return;
- }
- }
- channel->incoming(message);
- }
- void PeerConnection::forwardBufferedAmount(uint16_t stream, size_t amount) {
- if (auto channel = findDataChannel(stream))
- channel->triggerBufferedAmount(amount);
- }
- shared_ptr<DataChannel> PeerConnection::emplaceDataChannel(Description::Role role,
- const string &label,
- const string &protocol,
- const Reliability &reliability) {
- // The active side must use streams with even identifiers, whereas the passive side must use
- // streams with odd identifiers.
- // See https://tools.ietf.org/html/draft-ietf-rtcweb-data-protocol-09#section-6
- std::unique_lock lock(mDataChannelsMutex);
- unsigned int stream = (role == Description::Role::Active) ? 0 : 1;
- while (mDataChannels.find(stream) != mDataChannels.end()) {
- stream += 2;
- if (stream >= 65535)
- throw std::runtime_error("Too many DataChannels");
- }
- auto channel =
- std::make_shared<DataChannel>(shared_from_this(), stream, label, protocol, reliability);
- mDataChannels.emplace(std::make_pair(stream, channel));
- return channel;
- }
- shared_ptr<DataChannel> PeerConnection::findDataChannel(uint16_t stream) {
- std::shared_lock lock(mDataChannelsMutex);
- shared_ptr<DataChannel> channel;
- if (auto it = mDataChannels.find(stream); it != mDataChannels.end()) {
- channel = it->second.lock();
- if (!channel || channel->isClosed()) {
- mDataChannels.erase(it);
- channel.reset();
- }
- }
- return channel;
- }
- void PeerConnection::iterateDataChannels(
- std::function<void(shared_ptr<DataChannel> channel)> func) {
- std::shared_lock lock(mDataChannelsMutex);
- auto it = mDataChannels.begin();
- while (it != mDataChannels.end()) {
- auto channel = it->second.lock();
- if (!channel || channel->isClosed()) {
- it = mDataChannels.erase(it);
- continue;
- }
- func(channel);
- ++it;
- }
- }
- void PeerConnection::openDataChannels() {
- if (auto transport = std::atomic_load(&mSctpTransport))
- iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->open(transport); });
- }
- void PeerConnection::closeDataChannels() {
- iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->close(); });
- }
- void PeerConnection::remoteCloseDataChannels() {
- iterateDataChannels([&](shared_ptr<DataChannel> channel) { channel->remoteClose(); });
- }
- void PeerConnection::processLocalDescription(Description description) {
- std::optional<uint16_t> remoteSctpPort;
- if (auto remote = remoteDescription())
- remoteSctpPort = remote->sctpPort();
- std::lock_guard lock(mLocalDescriptionMutex);
- mLocalDescription.emplace(std::move(description));
- mLocalDescription->setFingerprint(mCertificate->fingerprint());
- mLocalDescription->setSctpPort(remoteSctpPort.value_or(DEFAULT_SCTP_PORT));
- mLocalDescription->setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
- mLocalDescriptionCallback(*mLocalDescription);
- }
- void PeerConnection::processLocalCandidate(Candidate candidate) {
- std::lock_guard lock(mLocalDescriptionMutex);
- if (!mLocalDescription)
- throw std::logic_error("Got a local candidate without local description");
- mLocalDescription->addCandidate(candidate);
- mLocalCandidateCallback(candidate);
- }
- void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
- auto dataChannel = weakDataChannel.lock();
- if (!dataChannel)
- return;
- mDataChannelCallback(dataChannel);
- }
- void PeerConnection::changeState(State state) {
- State current;
- do {
- current = mState.load();
- if (current == state || current == State::Destroying)
- return;
- } while (!mState.compare_exchange_weak(current, state));
- if (state != State::Destroying)
- mStateChangeCallback(state);
- }
- void PeerConnection::changeGatheringState(GatheringState state) {
- if (mGatheringState.exchange(state) != state)
- mGatheringStateChangeCallback(state);
- }
- } // namespace rtc
- std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &state) {
- using State = rtc::PeerConnection::State;
- std::string str;
- switch (state) {
- case State::New:
- str = "new";
- break;
- case State::Connecting:
- str = "connecting";
- break;
- case State::Connected:
- str = "connected";
- break;
- case State::Disconnected:
- str = "disconnected";
- break;
- case State::Failed:
- str = "failed";
- break;
- case State::Closed:
- str = "closed";
- break;
- case State::Destroying:
- str = "destroying";
- break;
- default:
- str = "unknown";
- break;
- }
- return out << str;
- }
- std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::GatheringState &state) {
- using GatheringState = rtc::PeerConnection::GatheringState;
- std::string str;
- switch (state) {
- case GatheringState::New:
- str = "new";
- break;
- case GatheringState::InProgress:
- str = "in_progress";
- break;
- case GatheringState::Complete:
- str = "complete";
- break;
- default:
- str = "unknown";
- break;
- }
- return out << str;
- }
|