Browse Source

Merge pull request #600 from paullouisageneau/fix-processor-shared-ptr

Fix deadlock on deletion from state callback in C API
Paul-Louis Ageneau 3 năm trước cách đây
mục cha
commit
731f68a23d

+ 0 - 4
include/rtc/utils.hpp

@@ -96,10 +96,6 @@ public:
 		return callback ? true : false;
 	}
 
-	std::function<void(Args...)> wrap() const {
-		return [this](Args... args) { (*this)(std::move(args)...); };
-	}
-
 protected:
 	virtual void set(std::function<void(Args...)> func) { callback = std::move(func); }
 	virtual bool call(Args... args) const {

+ 30 - 24
src/impl/peerconnection.cpp

@@ -25,7 +25,6 @@
 #include "internals.hpp"
 #include "logcounter.hpp"
 #include "peerconnection.hpp"
-#include "processor.hpp"
 #include "rtp.hpp"
 #include "sctptransport.hpp"
 #include "threadpool.hpp"
@@ -54,8 +53,7 @@ static LogCounter
                                 "Number of unknown RTCP packet types over past second");
 
 PeerConnection::PeerConnection(Configuration config_)
-    : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)),
-      mProcessor(std::make_unique<Processor>()) {
+    : config(std::move(config_)), mCertificate(make_certificate(config.certificateType)) {
 	PLOG_VERBOSE << "Creating PeerConnection";
 
 	if (config.portRangeEnd && config.portRangeBegin > config.portRangeEnd)
@@ -75,7 +73,7 @@ PeerConnection::PeerConnection(Configuration config_)
 
 PeerConnection::~PeerConnection() {
 	PLOG_VERBOSE << "Destroying PeerConnection";
-	mProcessor->join();
+	mProcessor.join();
 }
 
 void PeerConnection::close() {
@@ -84,8 +82,8 @@ void PeerConnection::close() {
 	negotiationNeeded = false;
 
 	// Close data channels and tracks asynchronously
-	mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
-	mProcessor->enqueue(&PeerConnection::closeTracks, this);
+	mProcessor.enqueue(&PeerConnection::closeDataChannels, shared_from_this());
+	mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
 
 	closeTransports();
 }
@@ -222,15 +220,15 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
 				    else
 					    changeState(State::Connected);
 
-				    mProcessor->enqueue(&PeerConnection::openTracks, this);
+				    mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
 				    break;
 			    case DtlsTransport::State::Failed:
 				    changeState(State::Failed);
-				    mProcessor->enqueue(&PeerConnection::closeTracks, this);
+				    mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
 				    break;
 			    case DtlsTransport::State::Disconnected:
 				    changeState(State::Disconnected);
-				    mProcessor->enqueue(&PeerConnection::closeTracks, this);
+				    mProcessor.enqueue(&PeerConnection::closeTracks, shared_from_this());
 				    break;
 			    default:
 				    // Ignore
@@ -304,16 +302,18 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
 			    switch (transportState) {
 			    case SctpTransport::State::Connected:
 				    changeState(State::Connected);
-				    mProcessor->enqueue(&PeerConnection::openDataChannels, this);
+				    mProcessor.enqueue(&PeerConnection::openDataChannels, shared_from_this());
 				    break;
 			    case SctpTransport::State::Failed:
 				    LOG_WARNING << "SCTP transport failed";
 				    changeState(State::Failed);
-				    mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
+				    mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
+				                       shared_from_this());
 				    break;
 			    case SctpTransport::State::Disconnected:
 				    changeState(State::Disconnected);
-				    mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
+				    mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels,
+				                       shared_from_this());
 				    break;
 			    default:
 				    // Ignore
@@ -370,7 +370,7 @@ void PeerConnection::closeTransports() {
 			t->onStateChange(nullptr);
 
 	// Initiate transport stop on the processor after closing the data channels
-	mProcessor->enqueue([transports = std::move(transports)]() {
+	mProcessor.enqueue([self = shared_from_this(), transports = std::move(transports)]() {
 		ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
 			for (const auto &t : transports)
 				if (t)
@@ -956,12 +956,13 @@ void PeerConnection::processLocalDescription(Description description) {
 		mLocalDescription->addCandidates(std::move(existingCandidates));
 	}
 
-	mProcessor->enqueue(localDescriptionCallback.wrap(), std::move(description));
+	mProcessor.enqueue(&PeerConnection::trigger<Description>, shared_from_this(),
+	                   localDescriptionCallback, std::move(description));
 
 	// Reciprocated tracks might need to be open
 	if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
 	    dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
-		mProcessor->enqueue(&PeerConnection::openTracks, this);
+		mProcessor.enqueue(&PeerConnection::openTracks, shared_from_this());
 }
 
 void PeerConnection::processLocalCandidate(Candidate candidate) {
@@ -980,7 +981,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
 	candidate.resolve(Candidate::ResolveMode::Simple);
 	mLocalDescription->addCandidate(candidate);
 
-	mProcessor->enqueue(localCandidateCallback.wrap(), std::move(candidate));
+	mProcessor.enqueue(&PeerConnection::trigger<Candidate>, shared_from_this(),
+	                   localCandidateCallback, std::move(candidate));
 }
 
 void PeerConnection::processRemoteDescription(Description description) {
@@ -1013,7 +1015,7 @@ void PeerConnection::processRemoteDescription(Description description) {
 		    dtlsTransport->state() == Transport::State::Connected)
 			initSctpTransport();
 	} else {
-		mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
+		mProcessor.enqueue(&PeerConnection::remoteCloseDataChannels, shared_from_this());
 	}
 }
 
@@ -1102,11 +1104,11 @@ void PeerConnection::triggerPendingTracks() {
 }
 
 void PeerConnection::flushPendingDataChannels() {
-	mProcessor->enqueue(&PeerConnection::triggerPendingDataChannels, this);
+	mProcessor.enqueue(&PeerConnection::triggerPendingDataChannels, shared_from_this());
 }
 
 void PeerConnection::flushPendingTracks() {
-	mProcessor->enqueue(&PeerConnection::triggerPendingTracks, this);
+	mProcessor.enqueue(&PeerConnection::triggerPendingTracks, shared_from_this());
 }
 
 bool PeerConnection::changeState(State newState) {
@@ -1125,10 +1127,10 @@ bool PeerConnection::changeState(State newState) {
 	PLOG_INFO << "Changed state to " << s.str();
 
 	if (newState == State::Closed)
-		// This is the last state change, so we may steal the callback
-		mProcessor->enqueue([cb = std::move(stateChangeCallback)]() { cb(State::Closed); });
+		stateChangeCallback(State::Closed); // synchronous
 	else
-		mProcessor->enqueue(stateChangeCallback.wrap(), newState);
+		mProcessor.enqueue(&PeerConnection::trigger<State>, shared_from_this(), stateChangeCallback,
+		                   newState);
 
 	return true;
 }
@@ -1140,7 +1142,9 @@ bool PeerConnection::changeGatheringState(GatheringState newState) {
 	std::ostringstream s;
 	s << newState;
 	PLOG_INFO << "Changed gathering state to " << s.str();
-	mProcessor->enqueue(gatheringStateChangeCallback.wrap(), newState);
+	mProcessor.enqueue(&PeerConnection::trigger<GatheringState>, shared_from_this(),
+	                   gatheringStateChangeCallback, newState);
+
 	return true;
 }
 
@@ -1151,7 +1155,9 @@ bool PeerConnection::changeSignalingState(SignalingState newState) {
 	std::ostringstream s;
 	s << newState;
 	PLOG_INFO << "Changed signaling state to " << s.str();
-	mProcessor->enqueue(signalingStateChangeCallback.wrap(), newState);
+	mProcessor.enqueue(&PeerConnection::trigger<SignalingState>, shared_from_this(),
+	                   signalingStateChangeCallback, newState);
+
 	return true;
 }
 

+ 6 - 2
src/impl/peerconnection.hpp

@@ -23,6 +23,7 @@
 #include "datachannel.hpp"
 #include "dtlstransport.hpp"
 #include "icetransport.hpp"
+#include "processor.hpp"
 #include "sctptransport.hpp"
 #include "track.hpp"
 
@@ -101,7 +102,10 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 
 	void resetCallbacks();
 
-	void outgoingMedia(message_ptr message);
+	// Helper method for asynchronous callback invocation
+	template <typename... Args> void trigger(synchronized_callback<Args...> &cb, Args... args) {
+		cb(std::move(args...));
+	}
 
 	const Configuration config;
 	std::atomic<State> state = State::New;
@@ -121,8 +125,8 @@ struct PeerConnection : std::enable_shared_from_this<PeerConnection> {
 private:
 	const init_token mInitToken = Init::Instance().token();
 	const future_certificate_ptr mCertificate;
-	const unique_ptr<Processor> mProcessor;
 
+	Processor mProcessor;
 	optional<Description> mLocalDescription, mRemoteDescription;
 	optional<Description> mCurrentLocalDescription;
 	mutable std::mutex mLocalDescriptionMutex, mRemoteDescriptionMutex;

+ 3 - 3
src/impl/sctptransport.cpp

@@ -447,7 +447,7 @@ void SctpTransport::closeStream(unsigned int stream) {
 	mSendQueue.push(make_message(0, Message::Reset, to_uint16(stream)));
 
 	// This method must not call the buffered callback synchronously
-	mProcessor.enqueue(&SctpTransport::flush, this);
+	mProcessor.enqueue(&SctpTransport::flush, shared_from_this());
 }
 
 void SctpTransport::incoming(message_ptr message) {
@@ -702,12 +702,12 @@ void SctpTransport::handleUpcall() {
 
 	if (events & SCTP_EVENT_READ && mPendingRecvCount == 0) {
 		++mPendingRecvCount;
-		mProcessor.enqueue(&SctpTransport::doRecv, this);
+		mProcessor.enqueue(&SctpTransport::doRecv, shared_from_this());
 	}
 
 	if (events & SCTP_EVENT_WRITE && mPendingFlushCount == 0) {
 		++mPendingFlushCount;
-		mProcessor.enqueue(&SctpTransport::doFlush, this);
+		mProcessor.enqueue(&SctpTransport::doFlush, shared_from_this());
 	}
 }
 

+ 1 - 1
src/impl/sctptransport.hpp

@@ -35,7 +35,7 @@
 
 namespace rtc::impl {
 
-class SctpTransport final : public Transport {
+class SctpTransport final : public Transport, public std::enable_shared_from_this<SctpTransport> {
 public:
 	static void Init();
 	static void SetSettings(const SctpSettings &s);

+ 0 - 1
src/impl/websocket.cpp

@@ -48,7 +48,6 @@ WebSocket::WebSocket(optional<Configuration> optConfig, certificate_ptr certific
 
 WebSocket::~WebSocket() {
 	PLOG_VERBOSE << "Destroying WebSocket";
-	remoteClose();
 }
 
 void WebSocket::open(const string &url) {