Browse Source

Safer callback reset strategy for PeerConnection

Paul-Louis Ageneau 5 years ago
parent
commit
e76d933de2
2 changed files with 17 additions and 3 deletions
  1. 8 0
      include/rtc/include.hpp
  2. 9 3
      src/peerconnection.cpp

+ 8 - 0
include/rtc/include.hpp

@@ -84,9 +84,17 @@ template <typename F, typename T, typename... Args> auto weak_bind(F &&f, T *t,
 template <typename... P> class synchronized_callback {
 public:
 	synchronized_callback() = default;
+	synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); }
 	synchronized_callback(std::function<void(P...)> func) { *this = std::move(func); }
 	~synchronized_callback() { *this = nullptr; }
 
+	synchronized_callback &operator=(synchronized_callback &&cb) {
+		std::scoped_lock lock(mutex, cb.mutex);
+		callback = std::move(cb.callback);
+		cb.callback = nullptr;
+		return *this;
+	}
+
 	synchronized_callback &operator=(std::function<void(P...)> func) {
 		std::lock_guard lock(mutex);
 		callback = std::move(func);

+ 9 - 3
src/peerconnection.cpp

@@ -446,9 +446,10 @@ void PeerConnection::closeTransports() {
 	// Change state to sink state Closed
 	changeState(State::Closed);
 
-	// Reset callbacks after calls are processed (in particular state change to Closed)
-	mProcessor->enqueue(std::bind(&PeerConnection::resetCallbacks, this));
+	// Reset callbacks now that state is changed
+	resetCallbacks();
 
+	// Initiate transport stop on the processor after closing the data channels
 	mProcessor->enqueue([this]() {
 		// Pass the pointers to a thread
 		auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
@@ -654,7 +655,12 @@ bool PeerConnection::changeState(State state) {
 
 	} while (!mState.compare_exchange_weak(current, state));
 
-	mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
+	if (state == State::Closed)
+		// This is the last state change, so we may steal the callback
+		mProcessor->enqueue([this, cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
+	else
+		mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
+
 	return true;
 }