Browse Source

Merge pull request #132 from paullouisageneau/prevent-user-deadlock

Close data channels  asynchronously for safety
Paul-Louis Ageneau 5 years ago
parent
commit
7090f2344b
3 changed files with 29 additions and 23 deletions
  1. 27 21
      src/peerconnection.cpp
  2. 1 1
      src/sctptransport.cpp
  3. 1 1
      src/websocket.cpp

+ 27 - 21
src/peerconnection.cpp

@@ -51,13 +51,17 @@ PeerConnection::PeerConnection(const Configuration &config)
 }
 
 PeerConnection::~PeerConnection() {
-	close();
 	PLOG_VERBOSE << "Destroying PeerConnection";
+	close();
+	mProcessor->join();
 }
 
 void PeerConnection::close() {
 	PLOG_VERBOSE << "Closing PeerConnection";
-	closeDataChannels();
+
+	// Close data channels asynchronously
+	mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
+
 	closeTransports();
 }
 
@@ -439,27 +443,29 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
 void PeerConnection::closeTransports() {
 	PLOG_VERBOSE << "Closing transports";
 
-	// Change state to sink state Closed to block init methods
+	// Change state to sink state Closed
 	changeState(State::Closed);
 
-	// Reset callbacks now that state is changed
-	resetCallbacks();
-
-	// Pass the references to a thread, allowing to terminate a transport from its own thread
-	auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
-	auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
-	auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
-	ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
-		if (sctp)
-			sctp->stop();
-		if (dtls)
-			dtls->stop();
-		if (ice)
-			ice->stop();
-
-		sctp.reset();
-		dtls.reset();
-		ice.reset();
+	// Reset callbacks after calls are processed (in particular state change to Closed)
+	mProcessor->enqueue(std::bind(&PeerConnection::resetCallbacks, this));
+
+	mProcessor->enqueue([this]() {
+		// Pass the pointers to a thread
+		auto sctp = std::atomic_exchange(&mSctpTransport, decltype(mSctpTransport)(nullptr));
+		auto dtls = std::atomic_exchange(&mDtlsTransport, decltype(mDtlsTransport)(nullptr));
+		auto ice = std::atomic_exchange(&mIceTransport, decltype(mIceTransport)(nullptr));
+		ThreadPool::Instance().enqueue([sctp, dtls, ice]() mutable {
+			if (sctp)
+				sctp->stop();
+			if (dtls)
+				dtls->stop();
+			if (ice)
+				ice->stop();
+
+			sctp.reset();
+			dtls.reset();
+			ice.reset();
+		});
 	});
 }
 

+ 1 - 1
src/sctptransport.cpp

@@ -451,7 +451,7 @@ void SctpTransport::sendReset(uint16_t streamId) {
 		mWrittenCondition.wait_for(lock, 1000ms,
 		                           [&]() { return mWritten || state() != State::Connected; });
 	} else if (errno == EINVAL) {
-		PLOG_VERBOSE << "SCTP stream " << streamId << " already reset";
+		PLOG_DEBUG << "SCTP stream " << streamId << " already reset";
 	} else {
 		PLOG_WARNING << "SCTP reset stream " << streamId << " failed, errno=" << errno;
 	}

+ 1 - 1
src/websocket.cpp

@@ -317,7 +317,7 @@ void WebSocket::closeTransports() {
 	// Reset callbacks now that state is changed
 	resetCallbacks();
 
-	// Pass the references to a thread, allowing to terminate a transport from its own thread
+	// Pass the pointers to a thread, allowing to terminate a transport from its own thread
 	auto ws = std::atomic_exchange(&mWsTransport, decltype(mWsTransport)(nullptr));
 	auto tls = std::atomic_exchange(&mTlsTransport, decltype(mTlsTransport)(nullptr));
 	auto tcp = std::atomic_exchange(&mTcpTransport, decltype(mTcpTransport)(nullptr));