Browse Source

Added pending queue for incoming DataChannels and Tracks

Paul-Louis Ageneau 4 years ago
parent
commit
b2d9332fcd
4 changed files with 38 additions and 12 deletions
  1. 6 3
      include/rtc/utils.hpp
  2. 24 7
      src/impl/peerconnection.cpp
  3. 6 2
      src/impl/peerconnection.hpp
  4. 2 0
      src/peerconnection.cpp

+ 6 - 3
include/rtc/utils.hpp

@@ -85,10 +85,13 @@ public:
 		return *this;
 	}
 
-	void operator()(Args... args) const {
+	bool operator()(Args... args) const {
 		std::lock_guard lock(mutex);
-		if (callback)
-			callback(std::move(args)...);
+		if (!callback)
+			return false;
+
+		callback(std::move(args)...);
+		return true;
 	}
 
 	operator bool() const {

+ 24 - 7
src/impl/peerconnection.cpp

@@ -730,7 +730,8 @@ void PeerConnection::validateRemoteDescription(const Description &description) {
 
 void PeerConnection::processLocalDescription(Description description) {
 	const size_t localSctpPort = DEFAULT_SCTP_PORT;
-	const size_t localMaxMessageSize = config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
+	const size_t localMaxMessageSize =
+	    config.maxMessageSize.value_or(DEFAULT_LOCAL_MAX_MESSAGE_SIZE);
 
 	// Clean up the application entry the ICE transport might have added already (libnice)
 	description.clearMedia();
@@ -959,15 +960,31 @@ string PeerConnection::localBundleMid() const {
 
 void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	auto dataChannel = weakDataChannel.lock();
-	if (!dataChannel)
-		return;
+	if (dataChannel)
+		mPendingDataChannels.push(std::move(dataChannel));
+
+	while (dataChannelCallback) {
+		auto next = mPendingDataChannels.tryPop();
+		if (!next)
+			break;
 
-	mProcessor->enqueue(dataChannelCallback.wrap(),
-	                    std::make_shared<rtc::DataChannel>(std::move(dataChannel)));
+		mProcessor->enqueue(dataChannelCallback.wrap(),
+		                    std::make_shared<rtc::DataChannel>(std::move(*next)));
+	}
 }
 
-void PeerConnection::triggerTrack(shared_ptr<Track> track) {
-	mProcessor->enqueue(trackCallback.wrap(), std::make_shared<rtc::Track>(std::move(track)));
+void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
+	auto track = weakTrack.lock();
+	if (track)
+		mPendingTracks.push(std::move(track));
+
+	while (trackCallback) {
+		auto next = mPendingTracks.tryPop();
+		if (!next)
+			break;
+
+		mProcessor->enqueue(trackCallback.wrap(), std::make_shared<rtc::Track>(std::move(*next)));
+	}
 }
 
 bool PeerConnection::changeState(State newState) {

+ 6 - 2
src/impl/peerconnection.hpp

@@ -85,8 +85,9 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 	void processRemoteCandidate(Candidate candidate);
 	string localBundleMid() const;
 
-	void triggerDataChannel(weak_ptr<DataChannel> weakDataChannel);
-	void triggerTrack(shared_ptr<Track> track);
+	void triggerDataChannel(weak_ptr<DataChannel> weakDataChannel = {});
+	void triggerTrack(weak_ptr<Track> weakTrack = {});
+
 	bool changeState(State newState);
 	bool changeGatheringState(GatheringState newState);
 	bool changeSignalingState(SignalingState newState);
@@ -127,6 +128,9 @@ private:
 	std::vector<weak_ptr<Track>> mTrackLines;                          // by SDP order
 	std::shared_mutex mDataChannelsMutex, mTracksMutex;
 
+	Queue<shared_ptr<DataChannel>> mPendingDataChannels;
+	Queue<shared_ptr<Track>> mPendingTracks;
+
 	std::unordered_map<uint32_t, string> mMidFromSsrc; // cache
 };
 

+ 2 - 0
src/peerconnection.cpp

@@ -278,6 +278,7 @@ shared_ptr<DataChannel> PeerConnection::createDataChannel(string label, DataChan
 void PeerConnection::onDataChannel(
     std::function<void(shared_ptr<DataChannel> dataChannel)> callback) {
 	impl()->dataChannelCallback = callback;
+	impl()->triggerDataChannel(); // trigger pending DataChannels
 }
 
 std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description) {
@@ -292,6 +293,7 @@ std::shared_ptr<Track> PeerConnection::addTrack(Description::Media description)
 
 void PeerConnection::onTrack(std::function<void(std::shared_ptr<Track>)> callback) {
 	impl()->trackCallback = callback;
+	impl()->triggerTrack(); // trigger pending tracks
 }
 
 void PeerConnection::onLocalDescription(std::function<void(Description description)> callback) {