Browse Source

Merge pull request #1237 from paullouisageneau/fix-renegotiationneeded

Properly implement the renegotiation needed mechanism
Paul-Louis Ageneau 11 months ago
parent
commit
1691c95a57

+ 12 - 0
DOC.md

@@ -371,6 +371,18 @@ Return value: the maximun length of strings copied in buffers (including the ter
 
 If `local`, `remote`, or both, are `NULL`, the corresponding candidate is not copied, but the maximum length is still returned.
 
+#### rtcIsNegotiationNeeded
+```
+bool rtcIsNegotiationNeeded(int pc);
+```
+
+Return true if negotiation needs to be started or restarted, for instance to signal new tracks. If so, the user may call `rtcSetLocalDescription()` to start it.
+
+Arguments:
+- `pc`: the Peer Connection identifier
+
+Return value: true if negotiation is needed
+
 #### rtcGetMaxDataChannelStream
 ```
 int rtcGetMaxDataChannelStream(int pc);

+ 3 - 3
include/rtc/description.hpp

@@ -281,9 +281,9 @@ public:
 	int addAudio(string mid = "audio", Direction dir = Direction::SendOnly);
 	void clearMedia();
 
-	variant<Media *, Application *> media(unsigned int index);
-	variant<const Media *, const Application *> media(unsigned int index) const;
-	unsigned int mediaCount() const;
+	variant<Media *, Application *> media(int index);
+	variant<const Media *, const Application *> media(int index) const;
+	int mediaCount() const;
 
 	const Application *application() const;
 	Application *application();

+ 1 - 0
include/rtc/peerconnection.hpp

@@ -86,6 +86,7 @@ public:
 	IceState iceState() const;
 	GatheringState gatheringState() const;
 	SignalingState signalingState() const;
+	bool negotiationNeeded() const;
 	bool hasMedia() const;
 	optional<Description> localDescription() const;
 	optional<Description> remoteDescription() const;

+ 2 - 0
include/rtc/rtc.h

@@ -227,6 +227,8 @@ RTC_C_EXPORT int rtcGetRemoteAddress(int pc, char *buffer, int size);
 RTC_C_EXPORT int rtcGetSelectedCandidatePair(int pc, char *local, int localSize, char *remote,
                                              int remoteSize);
 
+RTC_C_EXPORT bool rtcIsNegotiationNeeded(int pc);
+
 RTC_C_EXPORT int rtcGetMaxDataChannelStream(int pc);
 RTC_C_EXPORT int rtcGetRemoteMaxMessageSize(int pc);
 

+ 12 - 0
pages/content/pages/reference.md

@@ -374,6 +374,18 @@ Return value: the maximun length of strings copied in buffers (including the ter
 
 If `local`, `remote`, or both, are `NULL`, the corresponding candidate is not copied, but the maximum length is still returned.
 
+#### rtcIsNegotiationNeeded
+```
+bool rtcIsNegotiationNeeded(int pc);
+```
+
+Return true if negotiation needs to be started or restarted, for instance to signal new tracks. If so, the user may call `rtcSetLocalDescription()` to start it.
+
+Arguments:
+- `pc`: the Peer Connection identifier
+
+Return value: true if negotiation is needed
+
 #### rtcGetMaxDataChannelStream
 ```
 int rtcGetMaxDataChannelStream(int pc);

+ 7 - 2
src/capi.cpp

@@ -674,6 +674,11 @@ int rtcGetSelectedCandidatePair(int pc, char *local, int localSize, char *remote
 	});
 }
 
+bool rtcIsNegotiationNeeded(int pc) {
+	return wrap([&] { return getPeerConnection(pc)->negotiationNeeded() ? 0 : 1; }) == 0 ? true
+	                                                                                     : false;
+}
+
 int rtcGetMaxDataChannelStream(int pc) {
 	return wrap([&] {
 		auto peerConnection = getPeerConnection(pc);
@@ -1440,7 +1445,7 @@ int rtcGetSsrcsForType(const char *mediaType, const char *sdp, uint32_t *buffer,
 		auto oldSDP = string(sdp);
 		auto description = Description(oldSDP, "unspec");
 		auto mediaCount = description.mediaCount();
-		for (unsigned int i = 0; i < mediaCount; i++) {
+		for (int i = 0; i < mediaCount; i++) {
 			if (std::holds_alternative<Description::Media *>(description.media(i))) {
 				auto media = std::get<Description::Media *>(description.media(i));
 				auto currentMediaType = lowercased(media->type());
@@ -1461,7 +1466,7 @@ int rtcSetSsrcForType(const char *mediaType, const char *sdp, char *buffer, cons
 		auto prevSDP = string(sdp);
 		auto description = Description(prevSDP, "unspec");
 		auto mediaCount = description.mediaCount();
-		for (unsigned int i = 0; i < mediaCount; i++) {
+		for (int i = 0; i < mediaCount; i++) {
 			if (std::holds_alternative<Description::Media *>(description.media(i))) {
 				auto media = std::get<Description::Media *>(description.media(i));
 				auto currentMediaType = lowercased(media->type());

+ 5 - 6
src/description.cpp

@@ -493,8 +493,8 @@ void Description::clearMedia() {
 	mApplication.reset();
 }
 
-variant<Description::Media *, Description::Application *> Description::media(unsigned int index) {
-	if (index >= mEntries.size())
+variant<Description::Media *, Description::Application *> Description::media(int index) {
+	if (index < 0 || index >= int(mEntries.size()))
 		throw std::out_of_range("Media index out of range");
 
 	const auto &entry = mEntries[index];
@@ -514,9 +514,8 @@ variant<Description::Media *, Description::Application *> Description::media(uns
 	}
 }
 
-variant<const Description::Media *, const Description::Application *>
-Description::media(unsigned int index) const {
-	if (index >= mEntries.size())
+variant<const Description::Media *, const Description::Application *> Description::media(int index) const {
+	if (index < 0 || index >= int(mEntries.size()))
 		throw std::out_of_range("Media index out of range");
 
 	const auto &entry = mEntries[index];
@@ -536,7 +535,7 @@ Description::media(unsigned int index) const {
 	}
 }
 
-unsigned int Description::mediaCount() const { return unsigned(mEntries.size()); }
+int Description::mediaCount() const { return int(mEntries.size()); }
 
 Description::Entry::Entry(const string &mline, string mid, Direction dir)
     : mMid(std::move(mid)), mDirection(dir) {

+ 97 - 25
src/impl/peerconnection.cpp

@@ -85,7 +85,6 @@ PeerConnection::~PeerConnection() {
 }
 
 void PeerConnection::close() {
-	negotiationNeeded = false;
 	if (!closing.exchange(true)) {
 		PLOG_VERBOSE << "Closing PeerConnection";
 		if (auto transport = std::atomic_load(&mSctpTransport))
@@ -829,27 +828,58 @@ void PeerConnection::iterateTracks(std::function<void(shared_ptr<Track> track)>
 	}
 }
 
+void PeerConnection::iterateRemoteTracks(std::function<void(shared_ptr<Track> track)> func) {
+	auto remote = remoteDescription();
+	if(!remote)
+		return;
+
+	std::vector<shared_ptr<Track>> locked;
+	{
+		std::shared_lock lock(mTracksMutex); // read-only
+		locked.reserve(remote->mediaCount());
+		for(int i = 0; i < remote->mediaCount(); ++i) {
+			if (std::holds_alternative<Description::Media *>(remote->media(i))) {
+				auto remoteMedia = std::get<Description::Media *>(remote->media(i));
+				if (!remoteMedia->isRemoved())
+					if (auto it = mTracks.find(remoteMedia->mid()); it != mTracks.end())
+						if (auto track = it->second.lock())
+							locked.push_back(std::move(track));
+			}
+		}
+	}
+
+	for (auto &track : locked) {
+		try {
+			func(std::move(track));
+		} catch (const std::exception &e) {
+			PLOG_WARNING << e.what();
+		}
+	}
+}
+
+
 void PeerConnection::openTracks() {
 #if RTC_ENABLE_MEDIA
-	if (auto transport = std::atomic_load(&mDtlsTransport)) {
-		auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
-
-		iterateTracks([&](const shared_ptr<Track> &track) {
-			if (!track->isOpen()) {
-				if (srtpTransport) {
-					track->open(srtpTransport);
-				} else {
-					// A track was added during a latter renegotiation, whereas SRTP transport was
-					// not initialized. This is an optimization to use the library with data
-					// channels only. Set forceMediaTransport to true to initialize the transport
-					// before dynamically adding tracks.
-					auto errorMsg = "The connection has no media transport";
-					PLOG_ERROR << errorMsg;
-					track->triggerError(errorMsg);
-				}
+	auto transport = std::atomic_load(&mDtlsTransport);
+	if (!transport)
+		return;
+
+	auto srtpTransport = std::dynamic_pointer_cast<DtlsSrtpTransport>(transport);
+	iterateRemoteTracks([&](shared_ptr<Track> track) {
+		if(!track->isOpen()) {
+			if (srtpTransport) {
+				track->open(srtpTransport);
+			} else {
+				// A track was added during a latter renegotiation, whereas SRTP transport was
+				// not initialized. This is an optimization to use the library with data
+				// channels only. Set forceMediaTransport to true to initialize the transport
+				// before dynamically adding tracks.
+				auto errorMsg = "The connection has no media transport";
+				PLOG_ERROR << errorMsg;
+				track->triggerError(errorMsg);
 			}
-		});
-	}
+		}
+	});
 #endif
 }
 
@@ -872,7 +902,7 @@ void PeerConnection::validateRemoteDescription(const Description &description) {
 		throw std::invalid_argument("Remote description has no media line");
 
 	int activeMediaCount = 0;
-	for (unsigned int i = 0; i < description.mediaCount(); ++i)
+	for (int i = 0; i < description.mediaCount(); ++i)
 		std::visit(rtc::overloaded{[&](const Description::Application *application) {
 			                           if (!application->isRemoved())
 				                           ++activeMediaCount;
@@ -900,7 +930,7 @@ void PeerConnection::processLocalDescription(Description description) {
 
 	if (auto remote = remoteDescription()) {
 		// Reciprocate remote description
-		for (unsigned int i = 0; i < remote->mediaCount(); ++i)
+		for (int i = 0; i < remote->mediaCount(); ++i)
 			std::visit( // reciprocate each media
 			    rtc::overloaded{
 			        [&](Description::Application *remoteApp) {
@@ -1027,8 +1057,7 @@ void PeerConnection::processLocalDescription(Description description) {
 			}
 		}
 
-		// There might be no media at this point if the user created a Track, deleted it,
-		// then called setLocalDescription().
+		// There might be no media at this point, for instance if the user deleted tracks
 		if (description.mediaCount() == 0)
 			throw std::runtime_error("No DataChannel or Track to negotiate");
 	}
@@ -1102,8 +1131,8 @@ void PeerConnection::processRemoteDescription(Description description) {
 		mRemoteDescription->addCandidates(std::move(existingCandidates));
 	}
 
+	auto dtlsTransport = std::atomic_load(&mDtlsTransport);
 	if (description.hasApplication()) {
-		auto dtlsTransport = std::atomic_load(&mDtlsTransport);
 		auto sctpTransport = std::atomic_load(&mSctpTransport);
 		if (!sctpTransport && dtlsTransport &&
 		    dtlsTransport->state() == Transport::State::Connected)
@@ -1111,6 +1140,10 @@ void PeerConnection::processRemoteDescription(Description description) {
 	} else {
 		mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
 	}
+
+	if (dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
+		mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
+
 }
 
 void PeerConnection::processRemoteCandidate(Candidate candidate) {
@@ -1156,6 +1189,45 @@ string PeerConnection::localBundleMid() const {
 	return mLocalDescription ? mLocalDescription->bundleMid() : "0";
 }
 
+bool PeerConnection::negotiationNeeded() const {
+	auto description = localDescription();
+
+	{
+		std::shared_lock lock(mDataChannelsMutex);
+		if (!mDataChannels.empty() || !mUnassignedDataChannels.empty())
+			if(!description || !description->hasApplication()) {
+				PLOG_DEBUG << "Negotiation needed for data channels";
+				return true;
+			}
+	}
+
+	{
+		std::shared_lock lock(mTracksMutex);
+		for(const auto &[mid, weakTrack] : mTracks)
+			if (auto track = weakTrack.lock())
+				if (!description || !description->hasMid(track->mid())) {
+					PLOG_DEBUG << "Negotiation needed to add track, mid=" << track->mid();
+					return true;
+				}
+
+		if(description) {
+			for(int i = 0; i < description->mediaCount(); ++i) {
+				if (std::holds_alternative<Description::Media *>(description->media(i))) {
+					auto media = std::get<Description::Media *>(description->media(i));
+					if (!media->isRemoved())
+						if (auto it = mTracks.find(media->mid()); it != mTracks.end())
+							if (auto track = it->second.lock(); !track || track->isClosed()) {
+								PLOG_DEBUG << "Negotiation needed to remove track, mid=" << track->mid();
+								return true;
+							}
+				}
+			}
+		}
+	}
+
+	return false;
+}
+
 void PeerConnection::setMediaHandler(shared_ptr<MediaHandler> handler) {
 	std::unique_lock lock(mMediaHandlerMutex);
 	mMediaHandler = handler;
@@ -1321,7 +1393,7 @@ void PeerConnection::updateTrackSsrcCache(const Description &description) {
 	std::unique_lock lock(mTracksMutex); // for safely writing to mTracksBySsrc
 
 	// Setup SSRC -> Track mapping
-	for (unsigned int i = 0; i < description.mediaCount(); ++i)
+	for (int i = 0; i < description.mediaCount(); ++i)
 		std::visit( // ssrc -> track mapping
 		    rtc::overloaded{
 		        [&](Description::Application const *) { return; },

+ 5 - 3
src/impl/peerconnection.hpp

@@ -70,6 +70,7 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 
 	shared_ptr<Track> emplaceTrack(Description::Media description);
 	void iterateTracks(std::function<void(shared_ptr<Track> track)> func);
+	void iterateRemoteTracks(std::function<void(shared_ptr<Track> track)> func);
 	void openTracks();
 	void closeTracks();
 
@@ -80,6 +81,8 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 	void processRemoteCandidate(Candidate candidate);
 	string localBundleMid() const;
 
+	bool negotiationNeeded() const;
+
 	void setMediaHandler(shared_ptr<MediaHandler> handler);
 	shared_ptr<MediaHandler> getMediaHandler();
 
@@ -115,7 +118,6 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 	std::atomic<IceState> iceState = IceState::New;
 	std::atomic<GatheringState> gatheringState = GatheringState::New;
 	std::atomic<SignalingState> signalingState = SignalingState::Stable;
-	std::atomic<bool> negotiationNeeded = false;
 	std::atomic<bool> closing = false;
 	std::mutex signalingMutex;
 
@@ -154,12 +156,12 @@ private:
 
 	std::unordered_map<uint16_t, weak_ptr<DataChannel>> mDataChannels; // by stream ID
 	std::vector<weak_ptr<DataChannel>> mUnassignedDataChannels;
-	std::shared_mutex mDataChannelsMutex;
+	mutable std::shared_mutex mDataChannelsMutex;
 
 	std::unordered_map<string, weak_ptr<Track>> mTracks;         // by mid
 	std::unordered_map<uint32_t, weak_ptr<Track>> mTracksBySsrc; // by SSRC
 	std::vector<weak_ptr<Track>> mTrackLines;                    // by SDP order
-	std::shared_mutex mTracksMutex;
+	mutable std::shared_mutex mTracksMutex;
 
 	Queue<shared_ptr<DataChannel>> mPendingDataChannels;
 	Queue<shared_ptr<Track>> mPendingTracks;

+ 33 - 23
src/peerconnection.cpp

@@ -61,6 +61,10 @@ PeerConnection::SignalingState PeerConnection::signalingState() const {
 	return impl()->signalingState;
 }
 
+bool PeerConnection::negotiationNeeded() const {
+	return impl()->negotiationNeeded();
+}
+
 optional<Description> PeerConnection::localDescription() const {
 	return impl()->localDescription();
 }
@@ -98,12 +102,6 @@ void PeerConnection::setLocalDescription(Description::Type type, LocalDescriptio
 			type = Description::Type::Offer;
 	}
 
-	// Only a local offer resets the negotiation needed flag
-	if (type == Description::Type::Offer && !impl()->negotiationNeeded.exchange(false)) {
-		PLOG_DEBUG << "No negotiation needed";
-		return;
-	}
-
 	// Get the new signaling state
 	SignalingState newSignalingState;
 	switch (signalingState) {
@@ -151,6 +149,12 @@ void PeerConnection::setLocalDescription(Description::Type type, LocalDescriptio
 	impl()->changeSignalingState(newSignalingState);
 	signalingLock.unlock();
 
+	if (!impl()->config.disableAutoNegotiation && newSignalingState == SignalingState::Stable) {
+		// We might need to make a new offer
+		if (impl()->negotiationNeeded())
+			setLocalDescription(Description::Type::Offer);
+	}
+
 	if (impl()->gatheringState == GatheringState::New && !impl()->config.disableAutoGathering) {
 		iceTransport->gatherLocalCandidates(impl()->localBundleMid());
 	}
@@ -239,7 +243,6 @@ void PeerConnection::setRemoteDescription(Description description) {
 
 	// Candidates will be added at the end, extract them for now
 	auto remoteCandidates = description.extractCandidates();
-	auto type = description.type();
 
 	auto iceTransport = impl()->initIceTransport();
 	if (!iceTransport)
@@ -251,14 +254,26 @@ void PeerConnection::setRemoteDescription(Description description) {
 	impl()->changeSignalingState(newSignalingState);
 	signalingLock.unlock();
 
-	if (type == Description::Type::Offer) {
-		// This is an offer, we need to answer
-		if (!impl()->config.disableAutoNegotiation)
-			setLocalDescription(Description::Type::Answer);
-	}
-
 	for (const auto &candidate : remoteCandidates)
 		addRemoteCandidate(candidate);
+
+	if (!impl()->config.disableAutoNegotiation) {
+		switch (newSignalingState) {
+		case SignalingState::Stable:
+			// We might need to make a new offer
+			if (impl()->negotiationNeeded())
+				setLocalDescription(Description::Type::Offer);
+			break;
+
+		case SignalingState::HaveRemoteOffer:
+			// We need to answer
+			setLocalDescription(Description::Type::Answer);
+			break;
+
+		default:
+			break;
+		}
+	}
 }
 
 void PeerConnection::addRemoteCandidate(Candidate candidate) {
@@ -289,13 +304,11 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, DataChan
 	auto channelImpl = impl()->emplaceDataChannel(std::move(label), std::move(init));
 	auto channel = std::make_shared<DataChannel>(channelImpl);
 
-	// Renegotiation is needed iff the current local description does not have application
-	auto local = impl()->localDescription();
-	if (!local || !local->hasApplication())
-		impl()->negotiationNeeded = true;
-
-	if (!impl()->config.disableAutoNegotiation)
-		setLocalDescription();
+	if (!impl()->config.disableAutoNegotiation && impl()->signalingState.load() == SignalingState::Stable) {
+		// We might need to make a new offer
+		if (impl()->negotiationNeeded())
+			setLocalDescription(Description::Type::Offer);
+	}
 
 	return channel;
 }
@@ -310,9 +323,6 @@ std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description)
 	auto trackImpl = impl()->emplaceTrack(std::move(description));
 	auto track = std::make_shared<Track>(trackImpl);
 
-	// Renegotiation is needed for the new or updated track
-	impl()->negotiationNeeded = true;
-
 	return track;
 }