|
@@ -44,7 +44,8 @@ PeerConnection::PeerConnection() : PeerConnection(Configuration()) {}
|
|
|
|
|
|
PeerConnection::PeerConnection(const Configuration &config)
|
|
|
: mConfig(config), mCertificate(make_certificate()), mProcessor(std::make_unique<Processor>()),
|
|
|
- mState(State::New), mGatheringState(GatheringState::New) {
|
|
|
+ mState(State::New), mGatheringState(GatheringState::New),
|
|
|
+ mSignalingState(SignalingState::Stable), mNegotiationNeeded(false) {
|
|
|
PLOG_VERBOSE << "Creating PeerConnection";
|
|
|
|
|
|
if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
|
|
@@ -60,6 +61,8 @@ PeerConnection::~PeerConnection() {
|
|
|
void PeerConnection::close() {
|
|
|
PLOG_VERBOSE << "Closing PeerConnection";
|
|
|
|
|
|
+ mNegotiationNeeded = false;
|
|
|
+
|
|
|
// Close data channels asynchronously
|
|
|
mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
|
|
|
|
|
@@ -72,6 +75,8 @@ PeerConnection::State PeerConnection::state() const { return mState; }
|
|
|
|
|
|
PeerConnection::GatheringState PeerConnection::gatheringState() const { return mGatheringState; }
|
|
|
|
|
|
+PeerConnection::SignalingState PeerConnection::signalingState() const { return mSignalingState; }
|
|
|
+
|
|
|
std::optional<Description> PeerConnection::localDescription() const {
|
|
|
std::lock_guard lock(mLocalDescriptionMutex);
|
|
|
return mLocalDescription;
|
|
@@ -97,88 +102,178 @@ bool PeerConnection::hasMedia() const {
|
|
|
return local && local->hasAudioOrVideo();
|
|
|
}
|
|
|
|
|
|
-void PeerConnection::setLocalDescription() {
|
|
|
- PLOG_VERBOSE << "Setting local description";
|
|
|
+void PeerConnection::setLocalDescription(Description::Type type) {
|
|
|
+ PLOG_VERBOSE << "Setting local description, type=" << Description::typeToString(type);
|
|
|
+
|
|
|
+ SignalingState signalingState = mSignalingState.load();
|
|
|
+ if (type == Description::Type::Rollback) {
|
|
|
+ if (signalingState == SignalingState::HaveLocalOffer ||
|
|
|
+ signalingState == SignalingState::HaveLocalPranswer) {
|
|
|
+ PLOG_DEBUG << "Rolling back pending local description";
|
|
|
+
|
|
|
+ std::unique_lock lock(mLocalDescriptionMutex);
|
|
|
+ if (mCurrentLocalDescription) {
|
|
|
+ std::vector<Candidate> existingCandidates;
|
|
|
+ if (mLocalDescription)
|
|
|
+ existingCandidates = mLocalDescription->extractCandidates();
|
|
|
|
|
|
- if (std::atomic_load(&mIceTransport)) {
|
|
|
- PLOG_DEBUG << "Local description is already set, ignoring";
|
|
|
+ mLocalDescription.emplace(std::move(*mCurrentLocalDescription));
|
|
|
+ mLocalDescription->addCandidates(std::move(existingCandidates));
|
|
|
+ mCurrentLocalDescription.reset();
|
|
|
+ }
|
|
|
+ lock.unlock();
|
|
|
+
|
|
|
+ changeSignalingState(SignalingState::Stable);
|
|
|
+ }
|
|
|
return;
|
|
|
}
|
|
|
|
|
|
- // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
|
|
|
- // setup:actpass.
|
|
|
- // See https://tools.ietf.org/html/rfc5763#section-5
|
|
|
- auto iceTransport = initIceTransport(Description::Role::ActPass);
|
|
|
- Description localDescription = iceTransport->getLocalDescription(Description::Type::Offer);
|
|
|
- processLocalDescription(localDescription);
|
|
|
- iceTransport->gatherLocalCandidates();
|
|
|
-}
|
|
|
+ // Guess the description type if unspecified
|
|
|
+ if (type == Description::Type::Unspec) {
|
|
|
+ if (mSignalingState == SignalingState::HaveRemoteOffer)
|
|
|
+ type = Description::Type::Answer;
|
|
|
+ else
|
|
|
+ type = Description::Type::Offer;
|
|
|
+ }
|
|
|
|
|
|
-void PeerConnection::setRemoteDescription(Description description) {
|
|
|
- PLOG_VERBOSE << "Setting remote description: " << string(description);
|
|
|
+ // Only a local offer resets the negotiation needed flag
|
|
|
+ if (type == Description::Type::Offer && !mNegotiationNeeded.exchange(false)) {
|
|
|
+ PLOG_DEBUG << "No negotiation needed";
|
|
|
+ return;
|
|
|
+ }
|
|
|
|
|
|
- if (hasRemoteDescription())
|
|
|
- throw std::logic_error("Remote description is already set");
|
|
|
+ // Get the new signaling state
|
|
|
+ SignalingState newSignalingState;
|
|
|
+ switch (signalingState) {
|
|
|
+ case SignalingState::Stable:
|
|
|
+ if (type != Description::Type::Offer) {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected local desciption type " << type << " in signaling state "
|
|
|
+ << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
+ }
|
|
|
+ newSignalingState = SignalingState::HaveLocalOffer;
|
|
|
+ break;
|
|
|
|
|
|
- if (description.mediaCount() == 0)
|
|
|
- throw std::invalid_argument("Remote description has no media line");
|
|
|
+ case SignalingState::HaveRemoteOffer:
|
|
|
+ case SignalingState::HaveLocalPranswer:
|
|
|
+ if (type != Description::Type::Answer && type != Description::Type::Pranswer) {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected local description type " << type
|
|
|
+ << " description in signaling state " << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
+ }
|
|
|
+ newSignalingState = SignalingState::Stable;
|
|
|
+ break;
|
|
|
|
|
|
- int activeMediaCount = 0;
|
|
|
- for (int i = 0; i < description.mediaCount(); ++i)
|
|
|
- std::visit( // reciprocate each media
|
|
|
- rtc::overloaded{[&](Description::Application *) { ++activeMediaCount; },
|
|
|
- [&](Description::Media *media) {
|
|
|
- if (media->direction() != Description::Direction::Inactive)
|
|
|
- ++activeMediaCount;
|
|
|
- }},
|
|
|
- description.media(i));
|
|
|
+ default: {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected local description in signaling state " << signalingState << ", ignoring";
|
|
|
+ LOG_WARNING << oss.str();
|
|
|
+ return;
|
|
|
+ }
|
|
|
+ }
|
|
|
|
|
|
- if (activeMediaCount == 0)
|
|
|
- throw std::invalid_argument("Remote description has no active media");
|
|
|
+ auto iceTransport = std::atomic_load(&mIceTransport);
|
|
|
+ if (!iceTransport) {
|
|
|
+ // RFC 5763: The endpoint that is the offerer MUST use the setup attribute value of
|
|
|
+ // setup:actpass.
|
|
|
+ // See https://tools.ietf.org/html/rfc5763#section-5
|
|
|
+ iceTransport = initIceTransport(Description::Role::ActPass);
|
|
|
+ }
|
|
|
|
|
|
- if (!description.fingerprint())
|
|
|
- throw std::invalid_argument("Remote description has no fingerprint");
|
|
|
+ Description localDescription = iceTransport->getLocalDescription(type);
|
|
|
+ processLocalDescription(std::move(localDescription));
|
|
|
|
|
|
- description.hintType(hasLocalDescription() ? Description::Type::Answer
|
|
|
- : Description::Type::Offer);
|
|
|
+ changeSignalingState(newSignalingState);
|
|
|
|
|
|
- if (description.type() == Description::Type::Offer) {
|
|
|
- if (hasLocalDescription()) {
|
|
|
- PLOG_ERROR << "Got a remote offer description while an answer was expected";
|
|
|
- throw std::logic_error("Got an unexpected remote offer description");
|
|
|
+ if (mGatheringState == GatheringState::New)
|
|
|
+ iceTransport->gatherLocalCandidates();
|
|
|
+}
|
|
|
+
|
|
|
+void PeerConnection::setRemoteDescription(Description description) {
|
|
|
+ PLOG_VERBOSE << "Setting remote description: " << string(description);
|
|
|
+
|
|
|
+ if (description.type() == Description::Type::Rollback) {
|
|
|
+ // This is mostly useless because we accept any offer
|
|
|
+ PLOG_VERBOSE << "Rolling back pending remote description";
|
|
|
+ changeSignalingState(SignalingState::Stable);
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ validateRemoteDescription(description);
|
|
|
+
|
|
|
+ // Get the new signaling state
|
|
|
+ SignalingState signalingState = mSignalingState.load();
|
|
|
+ SignalingState newSignalingState;
|
|
|
+ switch (signalingState) {
|
|
|
+ case SignalingState::Stable:
|
|
|
+ description.hintType(Description::Type::Offer);
|
|
|
+ if (description.type() != Description::Type::Offer) {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected remote " << description.type() << " description in signaling state "
|
|
|
+ << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
}
|
|
|
- } else { // Answer
|
|
|
- if (auto local = localDescription()) {
|
|
|
- if (description.iceUfrag() == local->iceUfrag() &&
|
|
|
- description.icePwd() == local->icePwd())
|
|
|
- throw std::logic_error("Got the local description as remote description");
|
|
|
- } else {
|
|
|
- PLOG_ERROR << "Got a remote answer description while an offer was expected";
|
|
|
- throw std::logic_error("Got an unexpected remote answer description");
|
|
|
+ newSignalingState = SignalingState::HaveRemoteOffer;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case SignalingState::HaveLocalOffer:
|
|
|
+ description.hintType(Description::Type::Answer);
|
|
|
+ if (description.type() == Description::Type::Offer) {
|
|
|
+ // The ICE agent will automatically initiate a rollback when a peer that had previously
|
|
|
+ // created an offer receives an offer from the remote peer
|
|
|
+ setLocalDescription(Description::Type::Rollback);
|
|
|
+ newSignalingState = SignalingState::HaveRemoteOffer;
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ if (description.type() != Description::Type::Answer &&
|
|
|
+ description.type() != Description::Type::Pranswer) {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected remote " << description.type() << " description in signaling state "
|
|
|
+ << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
}
|
|
|
+ newSignalingState = SignalingState::Stable;
|
|
|
+ break;
|
|
|
+
|
|
|
+ case SignalingState::HaveRemotePranswer:
|
|
|
+ description.hintType(Description::Type::Answer);
|
|
|
+ if (description.type() != Description::Type::Answer &&
|
|
|
+ description.type() != Description::Type::Pranswer) {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected remote " << description.type() << " description in signaling state "
|
|
|
+ << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
+ }
|
|
|
+ newSignalingState = SignalingState::Stable;
|
|
|
+ break;
|
|
|
+
|
|
|
+ default: {
|
|
|
+ std::ostringstream oss;
|
|
|
+ oss << "Unexpected remote description in signaling state " << signalingState;
|
|
|
+ throw std::logic_error(oss.str());
|
|
|
+ }
|
|
|
}
|
|
|
|
|
|
// Candidates will be added at the end, extract them for now
|
|
|
auto remoteCandidates = description.extractCandidates();
|
|
|
+ auto type = description.type();
|
|
|
|
|
|
auto iceTransport = std::atomic_load(&mIceTransport);
|
|
|
if (!iceTransport)
|
|
|
iceTransport = initIceTransport(Description::Role::ActPass);
|
|
|
+
|
|
|
iceTransport->setRemoteDescription(description);
|
|
|
+ processRemoteDescription(std::move(description));
|
|
|
|
|
|
- {
|
|
|
- // Set as remote description
|
|
|
- std::lock_guard lock(mRemoteDescriptionMutex);
|
|
|
- mRemoteDescription.emplace(std::move(description));
|
|
|
- }
|
|
|
+ changeSignalingState(newSignalingState);
|
|
|
|
|
|
- if (description.type() == Description::Type::Offer) {
|
|
|
- // This is an offer and we are the answerer.
|
|
|
- Description localDescription = iceTransport->getLocalDescription(Description::Type::Answer);
|
|
|
- processLocalDescription(localDescription);
|
|
|
- iceTransport->gatherLocalCandidates();
|
|
|
+ if (type == Description::Type::Offer) {
|
|
|
+ // This is an offer, we need to answer
|
|
|
+ setLocalDescription(Description::Type::Answer);
|
|
|
} else {
|
|
|
- // This is an answer and we are the offerer.
|
|
|
+ // This is an answer
|
|
|
auto sctpTransport = std::atomic_load(&mSctpTransport);
|
|
|
if (!sctpTransport && iceTransport->role() == Description::Role::Active) {
|
|
|
// Since we assumed passive role during DataChannel creation, we need to shift the
|
|
@@ -203,27 +298,7 @@ void PeerConnection::setRemoteDescription(Description description) {
|
|
|
|
|
|
void PeerConnection::addRemoteCandidate(Candidate candidate) {
|
|
|
PLOG_VERBOSE << "Adding remote candidate: " << string(candidate);
|
|
|
-
|
|
|
- auto iceTransport = std::atomic_load(&mIceTransport);
|
|
|
- if (!mRemoteDescription || !iceTransport)
|
|
|
- throw std::logic_error("Remote candidate set without remote description");
|
|
|
-
|
|
|
- if (candidate.resolve(Candidate::ResolveMode::Simple)) {
|
|
|
- iceTransport->addRemoteCandidate(candidate);
|
|
|
- } else {
|
|
|
- // OK, we might need a lookup, do it asynchronously
|
|
|
- // We don't use the thread pool because we have no control on the timeout
|
|
|
- weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
|
|
- std::thread t([weakIceTransport, candidate]() mutable {
|
|
|
- if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
|
|
- if (auto iceTransport = weakIceTransport.lock())
|
|
|
- iceTransport->addRemoteCandidate(candidate);
|
|
|
- });
|
|
|
- t.detach();
|
|
|
- }
|
|
|
-
|
|
|
- std::lock_guard lock(mRemoteDescriptionMutex);
|
|
|
- mRemoteDescription->addCandidate(candidate);
|
|
|
+ processRemoteCandidate(std::move(candidate));
|
|
|
}
|
|
|
|
|
|
std::optional<string> PeerConnection::localAddress() const {
|
|
@@ -238,11 +313,6 @@ std::optional<string> PeerConnection::remoteAddress() const {
|
|
|
|
|
|
shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string protocol,
|
|
|
Reliability reliability) {
|
|
|
- if (auto local = localDescription(); local && !local->hasApplication()) {
|
|
|
- PLOG_ERROR << "The PeerConnection was negociated without DataChannel support.";
|
|
|
- throw std::runtime_error("No DataChannel support on the PeerConnection");
|
|
|
- }
|
|
|
-
|
|
|
// RFC 5763: The answerer MUST use either a setup attribute value of setup:active or
|
|
|
// setup:passive. [...] Thus, setup:active is RECOMMENDED.
|
|
|
// See https://tools.ietf.org/html/rfc5763#section-5
|
|
@@ -257,6 +327,11 @@ shared_ptr<DataChannel> PeerConnection::addDataChannel(string label, string prot
|
|
|
if (transport->state() == SctpTransport::State::Connected)
|
|
|
channel->open(transport);
|
|
|
|
|
|
+ // Renegotiation is needed iff the current local description does not have application
|
|
|
+ std::lock_guard lock(mLocalDescriptionMutex);
|
|
|
+ if (!mLocalDescription || !mLocalDescription->hasApplication())
|
|
|
+ mNegotiationNeeded = true;
|
|
|
+
|
|
|
return channel;
|
|
|
}
|
|
|
|
|
@@ -288,21 +363,30 @@ void PeerConnection::onGatheringStateChange(std::function<void(GatheringState st
|
|
|
mGatheringStateChangeCallback = callback;
|
|
|
}
|
|
|
|
|
|
-std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
|
|
|
- if (hasLocalDescription())
|
|
|
- throw std::logic_error("Tracks must be created before local description");
|
|
|
-
|
|
|
- if (auto it = mTracks.find(description.mid()); it != mTracks.end())
|
|
|
- if (auto track = it->second.lock())
|
|
|
- return track;
|
|
|
+void PeerConnection::onSignalingStateChange(std::function<void(SignalingState state)> callback) {
|
|
|
+ mSignalingStateChangeCallback = callback;
|
|
|
+}
|
|
|
|
|
|
+std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
|
|
|
#if !RTC_ENABLE_MEDIA
|
|
|
if (mTracks.empty()) {
|
|
|
PLOG_WARNING << "Tracks will be inative (not compiled with SRTP support)";
|
|
|
}
|
|
|
#endif
|
|
|
- auto track = std::make_shared<Track>(std::move(description));
|
|
|
- mTracks.emplace(std::make_pair(track->mid(), track));
|
|
|
+
|
|
|
+ std::shared_ptr<Track> track;
|
|
|
+ if (auto it = mTracks.find(description.mid()); it != mTracks.end())
|
|
|
+ if (track = it->second.lock(); track)
|
|
|
+ track->setDescription(std::move(description));
|
|
|
+
|
|
|
+ if (!track) {
|
|
|
+ track = std::make_shared<Track>(std::move(description));
|
|
|
+ mTracks.emplace(std::make_pair(track->mid(), track));
|
|
|
+ }
|
|
|
+
|
|
|
+ // Renegotiation is needed for the new or updated track
|
|
|
+ mNegotiationNeeded = true;
|
|
|
+
|
|
|
return track;
|
|
|
}
|
|
|
|
|
@@ -311,6 +395,7 @@ void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callbac
|
|
|
}
|
|
|
|
|
|
shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role) {
|
|
|
+ PLOG_VERBOSE << "Starting ICE transport";
|
|
|
try {
|
|
|
if (auto transport = std::atomic_load(&mIceTransport))
|
|
|
return transport;
|
|
@@ -373,6 +458,7 @@ shared_ptr<IceTransport> PeerConnection::initIceTransport(Description::Role role
|
|
|
}
|
|
|
|
|
|
shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|
|
+ PLOG_VERBOSE << "Starting DTLS transport";
|
|
|
try {
|
|
|
if (auto transport = std::atomic_load(&mDtlsTransport))
|
|
|
return transport;
|
|
@@ -388,12 +474,12 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|
|
|
|
|
switch (state) {
|
|
|
case DtlsTransport::State::Connected:
|
|
|
- if (auto local = localDescription(); local && local->hasApplication())
|
|
|
+ if (auto remote = remoteDescription(); remote && remote->hasApplication())
|
|
|
initSctpTransport();
|
|
|
else
|
|
|
changeState(State::Connected);
|
|
|
|
|
|
- openTracks();
|
|
|
+ mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
|
|
|
break;
|
|
|
case DtlsTransport::State::Failed:
|
|
|
changeState(State::Failed);
|
|
@@ -443,42 +529,43 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
|
|
|
}
|
|
|
|
|
|
shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
|
|
|
+ PLOG_VERBOSE << "Starting SCTP transport";
|
|
|
try {
|
|
|
if (auto transport = std::atomic_load(&mSctpTransport))
|
|
|
return transport;
|
|
|
|
|
|
auto remote = remoteDescription();
|
|
|
if (!remote || !remote->application())
|
|
|
- throw std::logic_error("Initializing SCTP transport without application description");
|
|
|
+ throw std::logic_error("Starting SCTP transport without application description");
|
|
|
|
|
|
uint16_t sctpPort = remote->application()->sctpPort().value_or(DEFAULT_SCTP_PORT);
|
|
|
auto lower = std::atomic_load(&mDtlsTransport);
|
|
|
auto transport = std::make_shared<SctpTransport>(
|
|
|
- lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
|
|
|
- weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
|
|
|
- [this, weak_this = weak_from_this()](SctpTransport::State state) {
|
|
|
- auto shared_this = weak_this.lock();
|
|
|
- if (!shared_this)
|
|
|
- return;
|
|
|
- switch (state) {
|
|
|
- case SctpTransport::State::Connected:
|
|
|
- changeState(State::Connected);
|
|
|
- mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
|
|
|
- break;
|
|
|
- case SctpTransport::State::Failed:
|
|
|
- LOG_WARNING << "SCTP transport failed";
|
|
|
- changeState(State::Failed);
|
|
|
- mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
|
|
|
- break;
|
|
|
- case SctpTransport::State::Disconnected:
|
|
|
- changeState(State::Disconnected);
|
|
|
- mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
|
|
|
- break;
|
|
|
- default:
|
|
|
- // Ignore
|
|
|
- break;
|
|
|
- }
|
|
|
- });
|
|
|
+ lower, sctpPort, weak_bind(&PeerConnection::forwardMessage, this, _1),
|
|
|
+ weak_bind(&PeerConnection::forwardBufferedAmount, this, _1, _2),
|
|
|
+ [this, weak_this = weak_from_this()](SctpTransport::State state) {
|
|
|
+ auto shared_this = weak_this.lock();
|
|
|
+ if (!shared_this)
|
|
|
+ return;
|
|
|
+ switch (state) {
|
|
|
+ case SctpTransport::State::Connected:
|
|
|
+ changeState(State::Connected);
|
|
|
+ mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
|
|
|
+ break;
|
|
|
+ case SctpTransport::State::Failed:
|
|
|
+ LOG_WARNING << "SCTP transport failed";
|
|
|
+ changeState(State::Failed);
|
|
|
+ mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
|
|
|
+ break;
|
|
|
+ case SctpTransport::State::Disconnected:
|
|
|
+ changeState(State::Disconnected);
|
|
|
+ mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ // Ignore
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ });
|
|
|
|
|
|
std::atomic_store(&mSctpTransport, transport);
|
|
|
if (mState == State::Closed) {
|
|
@@ -499,7 +586,8 @@ void PeerConnection::closeTransports() {
|
|
|
PLOG_VERBOSE << "Closing transports";
|
|
|
|
|
|
// Change state to sink state Closed
|
|
|
- changeState(State::Closed);
|
|
|
+ if (!changeState(State::Closed))
|
|
|
+ return; // already closed
|
|
|
|
|
|
// Reset callbacks now that state is changed
|
|
|
resetCallbacks();
|
|
@@ -723,40 +811,105 @@ void PeerConnection::openTracks() {
|
|
|
std::shared_lock lock(mTracksMutex); // read-only
|
|
|
for (auto it = mTracks.begin(); it != mTracks.end(); ++it)
|
|
|
if (auto track = it->second.lock())
|
|
|
- track->open(srtpTransport);
|
|
|
+ if (!track->isOpen())
|
|
|
+ track->open(srtpTransport);
|
|
|
}
|
|
|
#endif
|
|
|
}
|
|
|
|
|
|
+void PeerConnection::validateRemoteDescription(const Description &description) {
|
|
|
+ if (!description.iceUfrag())
|
|
|
+ throw std::invalid_argument("Remote description has no ICE user fragment");
|
|
|
+
|
|
|
+ if (!description.icePwd())
|
|
|
+ throw std::invalid_argument("Remote description has no ICE password");
|
|
|
+
|
|
|
+ if (!description.fingerprint())
|
|
|
+ throw std::invalid_argument("Remote description has no fingerprint");
|
|
|
+
|
|
|
+ if (description.mediaCount() == 0)
|
|
|
+ throw std::invalid_argument("Remote description has no media line");
|
|
|
|
|
|
-void PeerConnection::processLocalDescription(Description description) {
|
|
|
int activeMediaCount = 0;
|
|
|
+ for (int i = 0; i < description.mediaCount(); ++i)
|
|
|
+ std::visit(rtc::overloaded{[&](const Description::Application *) { ++activeMediaCount; },
|
|
|
+ [&](const Description::Media *media) {
|
|
|
+ if (media->direction() != Description::Direction::Inactive)
|
|
|
+ ++activeMediaCount;
|
|
|
+ }},
|
|
|
+ description.media(i));
|
|
|
|
|
|
- if (hasLocalDescription())
|
|
|
- throw std::logic_error("Local description is already set");
|
|
|
+ if (activeMediaCount == 0)
|
|
|
+ throw std::invalid_argument("Remote description has no active media");
|
|
|
|
|
|
+ if (auto local = localDescription(); local && local->iceUfrag() && local->icePwd())
|
|
|
+ if (*description.iceUfrag() == *local->iceUfrag() &&
|
|
|
+ *description.icePwd() == *local->icePwd())
|
|
|
+ throw std::logic_error("Got the local description as remote description");
|
|
|
+
|
|
|
+ PLOG_VERBOSE << "Remote description looks valid";
|
|
|
+}
|
|
|
+
|
|
|
+void PeerConnection::processLocalDescription(Description description) {
|
|
|
if (auto remote = remoteDescription()) {
|
|
|
// Reciprocate remote description
|
|
|
for (int i = 0; i < remote->mediaCount(); ++i)
|
|
|
std::visit( // reciprocate each media
|
|
|
rtc::overloaded{
|
|
|
- [&](Description::Application *app) {
|
|
|
- auto reciprocated = app->reciprocate();
|
|
|
+ [&](Description::Application *remoteApp) {
|
|
|
+ std::shared_lock lock(mDataChannelsMutex);
|
|
|
+ if (!mDataChannels.empty()) {
|
|
|
+ // Prefer local description
|
|
|
+ Description::Application app(remoteApp->mid());
|
|
|
+ app.setSctpPort(DEFAULT_SCTP_PORT);
|
|
|
+ app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
|
|
+
|
|
|
+ PLOG_DEBUG << "Adding application to local description, mid=\""
|
|
|
+ << app.mid() << "\"";
|
|
|
+
|
|
|
+ description.addMedia(std::move(app));
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ auto reciprocated = remoteApp->reciprocate();
|
|
|
reciprocated.hintSctpPort(DEFAULT_SCTP_PORT);
|
|
|
reciprocated.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
|
|
- ++activeMediaCount;
|
|
|
|
|
|
PLOG_DEBUG << "Reciprocating application in local description, mid=\""
|
|
|
<< reciprocated.mid() << "\"";
|
|
|
|
|
|
description.addMedia(std::move(reciprocated));
|
|
|
},
|
|
|
- [&](Description::Media *media) {
|
|
|
- auto reciprocated = media->reciprocate();
|
|
|
-#if RTC_ENABLE_MEDIA
|
|
|
- if (reciprocated.direction() != Description::Direction::Inactive)
|
|
|
- ++activeMediaCount;
|
|
|
-#else
|
|
|
+ [&](Description::Media *remoteMedia) {
|
|
|
+ std::shared_lock lock(mTracksMutex);
|
|
|
+ if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end()) {
|
|
|
+ // Prefer local description
|
|
|
+ if (auto track = it->second.lock()) {
|
|
|
+ auto media = track->description();
|
|
|
+#if !RTC_ENABLE_MEDIA
|
|
|
+ // No media support, mark as inactive
|
|
|
+ media.setDirection(Description::Direction::Inactive);
|
|
|
+#endif
|
|
|
+ PLOG_DEBUG
|
|
|
+ << "Adding media to local description, mid=\"" << media.mid()
|
|
|
+ << "\", active=" << std::boolalpha
|
|
|
+ << (media.direction() != Description::Direction::Inactive);
|
|
|
+
|
|
|
+ description.addMedia(std::move(media));
|
|
|
+ } else {
|
|
|
+ auto reciprocated = remoteMedia->reciprocate();
|
|
|
+ reciprocated.setDirection(Description::Direction::Inactive);
|
|
|
+
|
|
|
+ PLOG_DEBUG << "Adding inactive media to local description, mid=\""
|
|
|
+ << reciprocated.mid() << "\"";
|
|
|
+
|
|
|
+ description.addMedia(std::move(reciprocated));
|
|
|
+ }
|
|
|
+ return;
|
|
|
+ }
|
|
|
+
|
|
|
+ auto reciprocated = remoteMedia->reciprocate();
|
|
|
+#if !RTC_ENABLE_MEDIA
|
|
|
// No media support, mark as inactive
|
|
|
reciprocated.setDirection(Description::Direction::Inactive);
|
|
|
#endif
|
|
@@ -771,15 +924,17 @@ void PeerConnection::processLocalDescription(Description description) {
|
|
|
},
|
|
|
},
|
|
|
remote->media(i));
|
|
|
- } else {
|
|
|
+ }
|
|
|
+
|
|
|
+ if (description.type() == Description::Type::Offer) {
|
|
|
+ // This is an offer, add locally created data channels and tracks
|
|
|
// Add application for data channels
|
|
|
- {
|
|
|
+ if (!description.hasApplication()) {
|
|
|
std::shared_lock lock(mDataChannelsMutex);
|
|
|
if (!mDataChannels.empty()) {
|
|
|
Description::Application app("data");
|
|
|
app.setSctpPort(DEFAULT_SCTP_PORT);
|
|
|
app.setMaxMessageSize(LOCAL_MAX_MESSAGE_SIZE);
|
|
|
- ++activeMediaCount;
|
|
|
|
|
|
PLOG_DEBUG << "Adding application to local description, mid=\"" << app.mid()
|
|
|
<< "\"";
|
|
@@ -789,45 +944,52 @@ void PeerConnection::processLocalDescription(Description description) {
|
|
|
}
|
|
|
|
|
|
// Add media for local tracks
|
|
|
- {
|
|
|
- std::shared_lock lock(mTracksMutex);
|
|
|
- for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
|
|
|
- if (auto track = it->second.lock()) {
|
|
|
- auto media = track->description();
|
|
|
-#if RTC_ENABLE_MEDIA
|
|
|
- if (media.direction() != Description::Direction::Inactive)
|
|
|
- ++activeMediaCount;
|
|
|
-#else
|
|
|
- // No media support, mark as inactive
|
|
|
- media.setDirection(Description::Direction::Inactive);
|
|
|
+ std::shared_lock lock(mTracksMutex);
|
|
|
+ for (auto it = mTracks.begin(); it != mTracks.end(); ++it) {
|
|
|
+ if (description.hasMid(it->first))
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (auto track = it->second.lock()) {
|
|
|
+ auto media = track->description();
|
|
|
+#if !RTC_ENABLE_MEDIA
|
|
|
+ // No media support, mark as inactive
|
|
|
+ media.setDirection(Description::Direction::Inactive);
|
|
|
#endif
|
|
|
- PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
|
|
|
- << "\", active=" << std::boolalpha
|
|
|
- << (media.direction() != Description::Direction::Inactive);
|
|
|
+ PLOG_DEBUG << "Adding media to local description, mid=\"" << media.mid()
|
|
|
+ << "\", active=" << std::boolalpha
|
|
|
+ << (media.direction() != Description::Direction::Inactive);
|
|
|
|
|
|
- description.addMedia(std::move(media));
|
|
|
- }
|
|
|
+ description.addMedia(std::move(media));
|
|
|
}
|
|
|
}
|
|
|
}
|
|
|
|
|
|
- // There must be at least one active media to negociate
|
|
|
- if (activeMediaCount == 0)
|
|
|
- throw std::runtime_error("Nothing to negociate");
|
|
|
-
|
|
|
// Set local fingerprint (wait for certificate if necessary)
|
|
|
description.setFingerprint(mCertificate.get()->fingerprint());
|
|
|
|
|
|
{
|
|
|
// Set as local description
|
|
|
std::lock_guard lock(mLocalDescriptionMutex);
|
|
|
+
|
|
|
+ std::vector<Candidate> existingCandidates;
|
|
|
+ if (mLocalDescription) {
|
|
|
+ existingCandidates = mLocalDescription->extractCandidates();
|
|
|
+ mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
|
|
|
+ }
|
|
|
+
|
|
|
mLocalDescription.emplace(std::move(description));
|
|
|
+ mLocalDescription->addCandidates(std::move(existingCandidates));
|
|
|
}
|
|
|
|
|
|
mProcessor->enqueue([this, description = *mLocalDescription]() {
|
|
|
PLOG_VERBOSE << "Issuing local description: " << description;
|
|
|
mLocalDescriptionCallback(std::move(description));
|
|
|
});
|
|
|
+
|
|
|
+ // Reciprocated tracks might need to be open
|
|
|
+ if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
|
|
|
+ dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
|
|
|
+ mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
|
|
|
}
|
|
|
|
|
|
void PeerConnection::processLocalCandidate(Candidate candidate) {
|
|
@@ -844,6 +1006,56 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
|
|
|
});
|
|
|
}
|
|
|
|
|
|
+void PeerConnection::processRemoteDescription(Description description) {
|
|
|
+ {
|
|
|
+ // Set as remote description
|
|
|
+ std::lock_guard lock(mRemoteDescriptionMutex);
|
|
|
+
|
|
|
+ std::vector<Candidate> existingCandidates;
|
|
|
+ if (mRemoteDescription)
|
|
|
+ existingCandidates = mRemoteDescription->extractCandidates();
|
|
|
+
|
|
|
+ mRemoteDescription.emplace(std::move(description));
|
|
|
+ mRemoteDescription->addCandidates(std::move(existingCandidates));
|
|
|
+ }
|
|
|
+
|
|
|
+ if (description.hasApplication()) {
|
|
|
+ auto dtlsTransport = std::atomic_load(&mDtlsTransport);
|
|
|
+ auto sctpTransport = std::atomic_load(&mSctpTransport);
|
|
|
+ if (!sctpTransport && dtlsTransport &&
|
|
|
+ dtlsTransport->state() == Transport::State::Connected)
|
|
|
+ initSctpTransport();
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
+void PeerConnection::processRemoteCandidate(Candidate candidate) {
|
|
|
+ auto iceTransport = std::atomic_load(&mIceTransport);
|
|
|
+ if (!iceTransport)
|
|
|
+ throw std::logic_error("Remote candidate set without remote description");
|
|
|
+
|
|
|
+ if (candidate.resolve(Candidate::ResolveMode::Simple)) {
|
|
|
+ iceTransport->addRemoteCandidate(candidate);
|
|
|
+ } else {
|
|
|
+ // OK, we might need a lookup, do it asynchronously
|
|
|
+ // We don't use the thread pool because we have no control on the timeout
|
|
|
+ weak_ptr<IceTransport> weakIceTransport{iceTransport};
|
|
|
+ std::thread t([weakIceTransport, candidate]() mutable {
|
|
|
+ if (candidate.resolve(Candidate::ResolveMode::Lookup))
|
|
|
+ if (auto iceTransport = weakIceTransport.lock())
|
|
|
+ iceTransport->addRemoteCandidate(candidate);
|
|
|
+ });
|
|
|
+ t.detach();
|
|
|
+ }
|
|
|
+
|
|
|
+ {
|
|
|
+ std::lock_guard lock(mRemoteDescriptionMutex);
|
|
|
+ if (!mRemoteDescription)
|
|
|
+ throw std::logic_error("Got a remote candidate without remote description");
|
|
|
+
|
|
|
+ mRemoteDescription->addCandidate(candidate);
|
|
|
+ }
|
|
|
+}
|
|
|
+
|
|
|
void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
|
|
|
auto dataChannel = weakDataChannel.lock();
|
|
|
if (!dataChannel)
|
|
@@ -861,10 +1073,10 @@ bool PeerConnection::changeState(State state) {
|
|
|
State current;
|
|
|
do {
|
|
|
current = mState.load();
|
|
|
- if (current == state)
|
|
|
- return true;
|
|
|
if (current == State::Closed)
|
|
|
return false;
|
|
|
+ if (current == state)
|
|
|
+ return false;
|
|
|
|
|
|
} while (!mState.compare_exchange_weak(current, state));
|
|
|
|
|
@@ -882,12 +1094,24 @@ bool PeerConnection::changeState(State state) {
|
|
|
}
|
|
|
|
|
|
bool PeerConnection::changeGatheringState(GatheringState state) {
|
|
|
- if (mGatheringState.exchange(state) != state) {
|
|
|
- std::ostringstream s;
|
|
|
- s << state;
|
|
|
- PLOG_INFO << "Changed gathering state to " << s.str();
|
|
|
- mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
|
|
|
- }
|
|
|
+ if (mGatheringState.exchange(state) == state)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ std::ostringstream s;
|
|
|
+ s << state;
|
|
|
+ PLOG_INFO << "Changed gathering state to " << s.str();
|
|
|
+ mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
|
|
|
+ return true;
|
|
|
+}
|
|
|
+
|
|
|
+bool PeerConnection::changeSignalingState(SignalingState state) {
|
|
|
+ if (mSignalingState.exchange(state) == state)
|
|
|
+ return false;
|
|
|
+
|
|
|
+ std::ostringstream s;
|
|
|
+ s << state;
|
|
|
+ PLOG_INFO << "Changed signaling state to " << s.str();
|
|
|
+ mProcessor->enqueue([this, state] { mSignalingStateChangeCallback(state); });
|
|
|
return true;
|
|
|
}
|
|
|
|
|
@@ -930,15 +1154,14 @@ std::optional<std::chrono::milliseconds> PeerConnection::rtt() {
|
|
|
auto sctpTransport = std::atomic_load(&mSctpTransport);
|
|
|
if (sctpTransport)
|
|
|
return sctpTransport->rtt();
|
|
|
- PLOG_WARNING << "Could not load sctpTransport";
|
|
|
return std::nullopt;
|
|
|
}
|
|
|
|
|
|
} // namespace rtc
|
|
|
|
|
|
-std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &state) {
|
|
|
+std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::State state) {
|
|
|
using State = rtc::PeerConnection::State;
|
|
|
- std::string str;
|
|
|
+ const char *str;
|
|
|
switch (state) {
|
|
|
case State::New:
|
|
|
str = "new";
|
|
@@ -965,15 +1188,15 @@ std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::State &st
|
|
|
return out << str;
|
|
|
}
|
|
|
|
|
|
-std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::GatheringState &state) {
|
|
|
+std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::GatheringState state) {
|
|
|
using GatheringState = rtc::PeerConnection::GatheringState;
|
|
|
- std::string str;
|
|
|
+ const char *str;
|
|
|
switch (state) {
|
|
|
case GatheringState::New:
|
|
|
str = "new";
|
|
|
break;
|
|
|
case GatheringState::InProgress:
|
|
|
- str = "in_progress";
|
|
|
+ str = "in-progress";
|
|
|
break;
|
|
|
case GatheringState::Complete:
|
|
|
str = "complete";
|
|
@@ -984,3 +1207,29 @@ std::ostream &operator<<(std::ostream &out, const rtc::PeerConnection::Gathering
|
|
|
}
|
|
|
return out << str;
|
|
|
}
|
|
|
+
|
|
|
+std::ostream &operator<<(std::ostream &out, rtc::PeerConnection::SignalingState state) {
|
|
|
+ using SignalingState = rtc::PeerConnection::SignalingState;
|
|
|
+ const char *str;
|
|
|
+ switch (state) {
|
|
|
+ case SignalingState::Stable:
|
|
|
+ str = "stable";
|
|
|
+ break;
|
|
|
+ case SignalingState::HaveLocalOffer:
|
|
|
+ str = "have-local-offer";
|
|
|
+ break;
|
|
|
+ case SignalingState::HaveRemoteOffer:
|
|
|
+ str = "have-remote-offer";
|
|
|
+ break;
|
|
|
+ case SignalingState::HaveLocalPranswer:
|
|
|
+ str = "have-local-pranswer";
|
|
|
+ break;
|
|
|
+ case SignalingState::HaveRemotePranswer:
|
|
|
+ str = "have-remote-pranswer";
|
|
|
+ break;
|
|
|
+ default:
|
|
|
+ str = "unknown";
|
|
|
+ break;
|
|
|
+ }
|
|
|
+ return out << str;
|
|
|
+}
|