Browse Source

Refactored processor enqueueing

Paul-Louis Ageneau 4 years ago
parent
commit
91a854aa5b
2 changed files with 26 additions and 27 deletions
  1. 10 6
      include/rtc/include.hpp
  2. 16 21
      src/peerconnection.cpp

+ 10 - 6
include/rtc/include.hpp

@@ -102,12 +102,12 @@ private:
 	std::function<void()> function;
 };
 
-template <typename... P> class synchronized_callback {
+template <typename... Args> class synchronized_callback {
 public:
 	synchronized_callback() = default;
 	synchronized_callback(synchronized_callback &&cb) { *this = std::move(cb); }
 	synchronized_callback(const synchronized_callback &cb) { *this = cb; }
-	synchronized_callback(std::function<void(P...)> func) { *this = std::move(func); }
+	synchronized_callback(std::function<void(Args...)> func) { *this = std::move(func); }
 	~synchronized_callback() { *this = nullptr; }
 
 	synchronized_callback &operator=(synchronized_callback &&cb) {
@@ -123,16 +123,16 @@ public:
 		return *this;
 	}
 
-	synchronized_callback &operator=(std::function<void(P...)> func) {
+	synchronized_callback &operator=(std::function<void(Args...)> func) {
 		std::lock_guard lock(mutex);
 		callback = std::move(func);
 		return *this;
 	}
 
-	void operator()(P... args) const {
+	void operator()(Args... args) const {
 		std::lock_guard lock(mutex);
 		if (callback)
-			callback(args...);
+			callback(std::move(args)...);
 	}
 
 	operator bool() const {
@@ -140,8 +140,12 @@ public:
 		return callback ? true : false;
 	}
 
+	std::function<void(Args...)> wrap() const {
+		return [this](Args... args) { (*this)(std::move(args)...); };
+	}
+
 private:
-	std::function<void(P...)> callback;
+	std::function<void(Args...)> callback;
 	mutable std::recursive_mutex mutex;
 };
 } // namespace rtc

+ 16 - 21
src/peerconnection.cpp

@@ -77,7 +77,7 @@ void PeerConnection::close() {
 	mNegotiationNeeded = false;
 
 	// Close data channels asynchronously
-	mProcessor->enqueue(std::bind(&PeerConnection::closeDataChannels, this));
+	mProcessor->enqueue(&PeerConnection::closeDataChannels, this);
 
 	closeTransports();
 }
@@ -490,7 +490,7 @@ shared_ptr<DtlsTransport> PeerConnection::initDtlsTransport() {
 				else
 					changeState(State::Connected);
 
-				mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
+				mProcessor->enqueue(&PeerConnection::openTracks, this);
 				break;
 			case DtlsTransport::State::Failed:
 				changeState(State::Failed);
@@ -561,16 +561,16 @@ shared_ptr<SctpTransport> PeerConnection::initSctpTransport() {
 			    switch (state) {
 			    case SctpTransport::State::Connected:
 				    changeState(State::Connected);
-				    mProcessor->enqueue(std::bind(&PeerConnection::openDataChannels, this));
+				    mProcessor->enqueue(&PeerConnection::openDataChannels, this);
 				    break;
 			    case SctpTransport::State::Failed:
 				    LOG_WARNING << "SCTP transport failed";
 				    changeState(State::Failed);
-				    mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
+				    mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
 				    break;
 			    case SctpTransport::State::Disconnected:
 				    changeState(State::Disconnected);
-				    mProcessor->enqueue(std::bind(&PeerConnection::remoteCloseDataChannels, this));
+				    mProcessor->enqueue(&PeerConnection::remoteCloseDataChannels, this);
 				    break;
 			    default:
 				    // Ignore
@@ -1069,19 +1069,17 @@ void PeerConnection::processLocalDescription(Description description) {
 			mCurrentLocalDescription.emplace(std::move(*mLocalDescription));
 		}
 
-		mLocalDescription.emplace(std::move(description));
+		mLocalDescription.emplace(description);
 		mLocalDescription->addCandidates(std::move(existingCandidates));
 	}
 
-	mProcessor->enqueue([this, description = *mLocalDescription]() {
-		PLOG_VERBOSE << "Issuing local description: " << description;
-		mLocalDescriptionCallback(std::move(description));
-	});
+	PLOG_VERBOSE << "Issuing local description: " << description;
+	mProcessor->enqueue(mLocalDescriptionCallback.wrap(), std::move(description));
 
 	// Reciprocated tracks might need to be open
 	if (auto dtlsTransport = std::atomic_load(&mDtlsTransport);
 	    dtlsTransport && dtlsTransport->state() == Transport::State::Connected)
-		mProcessor->enqueue(std::bind(&PeerConnection::openTracks, this));
+		mProcessor->enqueue(&PeerConnection::openTracks, this);
 }
 
 void PeerConnection::processLocalCandidate(Candidate candidate) {
@@ -1092,10 +1090,8 @@ void PeerConnection::processLocalCandidate(Candidate candidate) {
 	candidate.resolve(Candidate::ResolveMode::Simple); // for proper SDP generation later
 	mLocalDescription->addCandidate(candidate);
 
-	mProcessor->enqueue([this, candidate = std::move(candidate)]() {
-		PLOG_VERBOSE << "Issuing local candidate: " << candidate;
-		mLocalCandidateCallback(std::move(candidate));
-	});
+	PLOG_VERBOSE << "Issuing local candidate: " << candidate;
+	mProcessor->enqueue(mLocalCandidateCallback.wrap(), std::move(candidate));
 }
 
 void PeerConnection::processRemoteDescription(Description description) {
@@ -1150,12 +1146,11 @@ void PeerConnection::triggerDataChannel(weak_ptr<DataChannel> weakDataChannel) {
 	if (!dataChannel)
 		return;
 
-	mProcessor->enqueue(
-	    [this, dataChannel = std::move(dataChannel)]() { mDataChannelCallback(dataChannel); });
+	mProcessor->enqueue(mDataChannelCallback.wrap(), std::move(dataChannel));
 }
 
 void PeerConnection::triggerTrack(std::shared_ptr<Track> track) {
-	mProcessor->enqueue([this, track = std::move(track)]() { mTrackCallback(track); });
+	mProcessor->enqueue(mTrackCallback.wrap(), std::move(track));
 }
 
 bool PeerConnection::changeState(State state) {
@@ -1177,7 +1172,7 @@ bool PeerConnection::changeState(State state) {
 		// This is the last state change, so we may steal the callback
 		mProcessor->enqueue([cb = std::move(mStateChangeCallback)]() { cb(State::Closed); });
 	else
-		mProcessor->enqueue([this, state]() { mStateChangeCallback(state); });
+		mProcessor->enqueue(mStateChangeCallback.wrap(), state);
 
 	return true;
 }
@@ -1189,7 +1184,7 @@ bool PeerConnection::changeGatheringState(GatheringState state) {
 	std::ostringstream s;
 	s << state;
 	PLOG_INFO << "Changed gathering state to " << s.str();
-	mProcessor->enqueue([this, state] { mGatheringStateChangeCallback(state); });
+	mProcessor->enqueue(mGatheringStateChangeCallback.wrap(), state);
 	return true;
 }
 
@@ -1200,7 +1195,7 @@ bool PeerConnection::changeSignalingState(SignalingState state) {
 	std::ostringstream s;
 	s << state;
 	PLOG_INFO << "Changed signaling state to " << s.str();
-	mProcessor->enqueue([this, state] { mSignalingStateChangeCallback(state); });
+	mProcessor->enqueue(mSignalingStateChangeCallback.wrap(), state);
 	return true;
 }