Browse Source

Refactor the this + weak_this pattern in other places

Paul-Louis Ageneau 3 weeks ago
parent
commit
a0912c0d88
4 changed files with 38 additions and 43 deletions
  1. 1 0
      include/rtc/pacinghandler.hpp
  2. 5 5
      src/impl/track.cpp
  3. 6 7
      src/impl/wstransport.cpp
  4. 26 31
      src/pacinghandler.cpp

+ 1 - 0
include/rtc/pacinghandler.hpp

@@ -40,6 +40,7 @@ private:
 	std::queue<message_ptr> mRtpBuffer;
 
 	void schedule(const message_callback &send);
+	void run(const message_callback &send);
 };
 
 } // namespace rtc

+ 5 - 5
src/impl/track.cpp

@@ -73,7 +73,7 @@ void Track::close() {
 		triggerClosed();
 		setMediaHandler(nullptr);
 		resetCallbacks();
-	}		
+	}
 }
 
 message_variant Track::trackMessageToVariant(message_ptr message) {
@@ -144,9 +144,9 @@ void Track::incoming(message_ptr message) {
 	message_vector messages{std::move(message)};
 	if (auto handler = getMediaHandler()) {
 		try {
-			handler->incomingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
+			handler->incomingChain(messages, [weak_this = weak_from_this()](message_ptr m) {
 				if (auto locked = weak_this.lock()) {
-					transportSend(m);
+					locked->transportSend(m);
 				}
 			});
 		} catch (const std::exception &e) {
@@ -186,9 +186,9 @@ bool Track::outgoing(message_ptr message) {
 
 	if (handler) {
 		message_vector messages{std::move(message)};
-		handler->outgoingChain(messages, [this, weak_this = weak_from_this()](message_ptr m) {
+		handler->outgoingChain(messages, [weak_this = weak_from_this()](message_ptr m) {
 			if (auto locked = weak_this.lock()) {
-				transportSend(m);
+				locked->transportSend(m);
 			}
 		});
 

+ 6 - 7
src/impl/wstransport.cpp

@@ -102,13 +102,12 @@ void WsTransport::close() {
 		return;
 	}
 
-	ThreadPool::Instance().schedule(std::chrono::seconds(10),
-	                                [this, weak_this = weak_from_this()]() {
-		                                if (auto shared_this = weak_this.lock()) {
-			                                PLOG_DEBUG << "WebSocket close timeout";
-			                                changeState(State::Disconnected);
-		                                }
-	                                });
+	ThreadPool::Instance().schedule(std::chrono::seconds(10), [weak_this = weak_from_this()]() {
+		if (auto locked = weak_this.lock()) {
+			PLOG_DEBUG << "WebSocket close timeout";
+			locked->changeState(State::Disconnected);
+		}
+	});
 }
 
 void WsTransport::incoming(message_ptr message) {

+ 26 - 31
src/pacinghandler.cpp

@@ -18,41 +18,36 @@
 namespace rtc {
 
 PacingHandler::PacingHandler(double bitsPerSecond, std::chrono::milliseconds sendInterval)
-    : mBytesPerSecond(bitsPerSecond / 8), mBudget(0.), mSendInterval(sendInterval){};
+    : mBytesPerSecond(bitsPerSecond / 8), mBudget(0.), mSendInterval(sendInterval) {};
 
 void PacingHandler::schedule(const message_callback &send) {
-	if (mHaveScheduled.exchange(true)) {
-		return;
+	if (!mHaveScheduled.exchange(true))
+		impl::ThreadPool::Instance().schedule(mSendInterval,
+		                                      weak_bind(&PacingHandler::run, this, send));
+}
+
+void PacingHandler::run(const message_callback &send) {
+	const std::lock_guard<std::mutex> lock(mMutex);
+	mHaveScheduled.store(false);
+
+	// Update the budget and cap it
+	auto now = std::chrono::high_resolution_clock::now();
+	auto newBudget = std::chrono::duration<double>(now - mLastRun).count() * mBytesPerSecond;
+	auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
+	mBudget = std::min(mBudget + newBudget, maxBudget);
+	mLastRun = std::chrono::high_resolution_clock::now();
+
+	// Send packets while there is budget, allow a single partial packet over budget
+	while (!mRtpBuffer.empty() && mBudget > 0) {
+		auto size = int(mRtpBuffer.front()->size());
+		send(std::move(mRtpBuffer.front()));
+		mRtpBuffer.pop();
+		mBudget -= size;
 	}
 
-	impl::ThreadPool::Instance().schedule(mSendInterval, [this, weak_this = weak_from_this(),
-	                                                      send]() {
-		if (auto locked = weak_this.lock()) {
-			const std::lock_guard<std::mutex> lock(mMutex);
-			mHaveScheduled.store(false);
-
-			// Update the budget and cap it
-			auto newBudget =
-			    std::chrono::duration<double>(std::chrono::high_resolution_clock::now() - mLastRun)
-			        .count() *
-			    mBytesPerSecond;
-			auto maxBudget = std::chrono::duration<double>(mSendInterval).count() * mBytesPerSecond;
-			mBudget = std::min(mBudget + newBudget, maxBudget);
-			mLastRun = std::chrono::high_resolution_clock::now();
-
-			// Send packets while there is budget, allow a single partial packet over budget
-			while (!mRtpBuffer.empty() && mBudget > 0) {
-				auto size = int(mRtpBuffer.front()->size());
-				send(std::move(mRtpBuffer.front()));
-				mRtpBuffer.pop();
-				mBudget -= size;
-			}
-
-			if (!mRtpBuffer.empty()) {
-				schedule(send);
-			}
-		}
-	});
+	if (!mRtpBuffer.empty()) {
+		schedule(send);
+	}
 }
 
 void PacingHandler::outgoing(message_vector &messages, const message_callback &send) {