Browse Source

Enforce proper callback order between open and message

Paul-Louis Ageneau 4 years ago
parent
commit
d86de619dc
4 changed files with 35 additions and 16 deletions
  1. 1 4
      src/channel.cpp
  2. 25 8
      src/impl/channel.cpp
  3. 6 1
      src/impl/channel.hpp
  4. 3 3
      src/impl/peerconnection.cpp

+ 1 - 4
src/channel.cpp

@@ -43,10 +43,7 @@ void Channel::onError(std::function<void(string error)> callback) {
 
 
 void Channel::onMessage(std::function<void(message_variant data)> callback) {
 void Channel::onMessage(std::function<void(message_variant data)> callback) {
 	impl()->messageCallback = callback;
 	impl()->messageCallback = callback;
-
-	// Pass pending messages
-	while (auto message = receive())
-		impl()->messageCallback(*message);
+	impl()->flushPendingMessages();
 }
 }
 
 
 void Channel::onMessage(std::function<void(binary data)> binaryCallback,
 void Channel::onMessage(std::function<void(binary data)> binaryCallback,

+ 25 - 8
src/impl/channel.cpp

@@ -20,22 +20,21 @@
 
 
 namespace rtc::impl {
 namespace rtc::impl {
 
 
-void Channel::triggerOpen() { openCallback(); }
+void Channel::triggerOpen() {
+	mOpenTriggered = true;
+	openCallback();
+	flushPendingMessages();
+}
 
 
 void Channel::triggerClosed() { closedCallback(); }
 void Channel::triggerClosed() { closedCallback(); }
 
 
-void Channel::triggerError(string error) { errorCallback(error); }
+void Channel::triggerError(string error) { errorCallback(std::move(error)); }
 
 
 void Channel::triggerAvailable(size_t count) {
 void Channel::triggerAvailable(size_t count) {
 	if (count == 1)
 	if (count == 1)
 		availableCallback();
 		availableCallback();
 
 
-	while (messageCallback && count--) {
-		auto message = receive();
-		if (!message)
-			break;
-		messageCallback(*message);
-	}
+	flushPendingMessages();
 }
 }
 
 
 void Channel::triggerBufferedAmount(size_t amount) {
 void Channel::triggerBufferedAmount(size_t amount) {
@@ -45,6 +44,24 @@ void Channel::triggerBufferedAmount(size_t amount) {
 		bufferedAmountLowCallback();
 		bufferedAmountLowCallback();
 }
 }
 
 
+void Channel::flushPendingMessages() {
+	if (!mOpenTriggered)
+		return;
+
+	while (messageCallback) {
+		auto next = receive();
+		if (!next)
+			break;
+
+		messageCallback(*next);
+	}
+}
+
+void Channel::resetOpenCallback() {
+	mOpenTriggered = false;
+	openCallback = nullptr;
+}
+
 void Channel::resetCallbacks() {
 void Channel::resetCallbacks() {
 	openCallback = nullptr;
 	openCallback = nullptr;
 	closedCallback = nullptr;
 	closedCallback = nullptr;

+ 6 - 1
src/impl/channel.hpp

@@ -38,7 +38,9 @@ struct Channel {
 	virtual void triggerAvailable(size_t count);
 	virtual void triggerAvailable(size_t count);
 	virtual void triggerBufferedAmount(size_t amount);
 	virtual void triggerBufferedAmount(size_t amount);
 
 
-	virtual void resetCallbacks();
+	void flushPendingMessages();
+	void resetOpenCallback();
+	void resetCallbacks();
 
 
 	synchronized_callback<> openCallback;
 	synchronized_callback<> openCallback;
 	synchronized_callback<> closedCallback;
 	synchronized_callback<> closedCallback;
@@ -49,6 +51,9 @@ struct Channel {
 
 
 	std::atomic<size_t> bufferedAmount = 0;
 	std::atomic<size_t> bufferedAmount = 0;
 	std::atomic<size_t> bufferedAmountLowThreshold = 0;
 	std::atomic<size_t> bufferedAmountLowThreshold = 0;
+
+private:
+	std::atomic<bool> mOpenTriggered = false;
 };
 };
 
 
 } // namespace rtc::impl
 } // namespace rtc::impl

+ 3 - 3
src/impl/peerconnection.cpp

@@ -974,7 +974,7 @@ string PeerConnection::localBundleMid() const {
 void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	auto dataChannel = weakDataChannel.lock();
 	auto dataChannel = weakDataChannel.lock();
 	if (dataChannel) {
 	if (dataChannel) {
-		dataChannel->openCallback = nullptr; // might be set internally
+		dataChannel->resetOpenCallback(); // might be set internally
 		mPendingDataChannels.push(std::move(dataChannel));
 		mPendingDataChannels.push(std::move(dataChannel));
 	}
 	}
 	triggerPendingDataChannels();
 	triggerPendingDataChannels();
@@ -983,7 +983,7 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
 void PeerConnection::triggerTrack(weak_ptr<Track> weakTrack) {
 	auto track = weakTrack.lock();
 	auto track = weakTrack.lock();
 	if (track) {
 	if (track) {
-		track->openCallback = nullptr; // might be set internally
+		track->resetOpenCallback(); // might be set internally
 		mPendingTracks.push(std::move(track));
 		mPendingTracks.push(std::move(track));
 	}
 	}
 	triggerPendingTracks();
 	triggerPendingTracks();
@@ -1008,7 +1008,7 @@ void PeerConnection::triggerPendingTracks() {
 			break;
 			break;
 
 
 		auto impl = std::move(*next);
 		auto impl = std::move(*next);
-		trackCallback(std::make_shared<rtc::Track>(std::move(impl)));
+		trackCallback(std::make_shared<rtc::Track>(impl));
 		impl->triggerOpen();
 		impl->triggerOpen();
 	}
 	}
 }
 }