Browse Source

Added global processor to tear down connections sequentially

Paul-Louis Ageneau 3 years ago
parent
commit
c1027f4a87

+ 10 - 9
src/impl/peerconnection.cpp

@@ -25,9 +25,9 @@
 #include "internals.hpp"
 #include "logcounter.hpp"
 #include "peerconnection.hpp"
+#include "processor.hpp"
 #include "rtp.hpp"
 #include "sctptransport.hpp"
-#include "threadpool.hpp"
 
 #if RTC_ENABLE_MEDIA
 #include "dtlssrtptransport.hpp"
@@ -371,14 +371,15 @@ void PeerConnection::closeTransports() {
 
 	// Initiate transport stop on the processor after closing the data channels
 	mProcessor.enqueue([self = shared_from_this(), transports = std::move(transports)]() {
-		ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
-			for (const auto &t : transports)
-				if (t)
-					t->stop();
-
-			for (auto &t : transports)
-				t.reset();
-		});
+		TearDownProcessor::Instance().enqueue(
+		    [transports = std::move(transports), token = Init::Instance().token()]() mutable {
+			    for (const auto &t : transports)
+				    if (t)
+					    t->stop();
+
+			    for (auto &t : transports)
+				    t.reset();
+		    });
 	});
 }
 

+ 1 - 0
src/impl/peerconnection.hpp

@@ -23,6 +23,7 @@
 #include "datachannel.hpp"
 #include "dtlstransport.hpp"
 #include "icetransport.hpp"
+#include "init.hpp"
 #include "processor.hpp"
 #include "sctptransport.hpp"
 #include "track.hpp"

+ 9 - 0
src/impl/processor.cpp

@@ -40,4 +40,13 @@ void Processor::schedule() {
 	}
 }
 
+TearDownProcessor &TearDownProcessor::Instance() {
+	static TearDownProcessor *instance = new TearDownProcessor;
+	return *instance;
+}
+
+TearDownProcessor::TearDownProcessor() {}
+
+TearDownProcessor::~TearDownProcessor() {}
+
 } // namespace rtc::impl

+ 12 - 7
src/impl/processor.hpp

@@ -20,7 +20,6 @@
 #define RTC_IMPL_PROCESSOR_H
 
 #include "common.hpp"
-#include "init.hpp"
 #include "queue.hpp"
 #include "threadpool.hpp"
 
@@ -33,10 +32,10 @@
 namespace rtc::impl {
 
 // Processes tasks in order by delegating them to the thread pool
-class Processor final {
+class Processor {
 public:
 	Processor(size_t limit = 0);
-	~Processor();
+	virtual ~Processor();
 
 	Processor(const Processor &) = delete;
 	Processor &operator=(const Processor &) = delete;
@@ -47,12 +46,9 @@ public:
 
 	template <class F, class... Args> void enqueue(F &&f, Args &&...args);
 
-protected:
+private:
 	void schedule();
 
-	// Keep an init token
-	const init_token mInitToken = Init::Instance().token();
-
 	Queue<std::function<void()>> mTasks;
 	bool mPending = false; // true iff a task is pending in the thread pool
 
@@ -60,6 +56,15 @@ protected:
 	std::condition_variable mCondition;
 };
 
+class TearDownProcessor final : public Processor {
+public:
+	static TearDownProcessor &Instance();
+
+private:
+	TearDownProcessor();
+	~TearDownProcessor();
+};
+
 template <class F, class... Args> void Processor::enqueue(F &&f, Args &&...args) {
 	std::unique_lock lock(mMutex);
 	auto bound = std::bind(std::forward<F>(f), std::forward<Args>(args)...);

+ 9 - 8
src/impl/websocket.cpp

@@ -21,7 +21,7 @@
 #include "websocket.hpp"
 #include "common.hpp"
 #include "internals.hpp"
-#include "threadpool.hpp"
+#include "processor.hpp"
 #include "utils.hpp"
 
 #include "tcptransport.hpp"
@@ -408,14 +408,15 @@ void WebSocket::closeTransports() {
 		if (t)
 			t->onStateChange(nullptr);
 
-	ThreadPool::Instance().enqueue([transports = std::move(transports)]() mutable {
-		for (const auto &t : transports)
-			if (t)
-				t->stop();
+	TearDownProcessor::Instance().enqueue(
+	    [transports = std::move(transports), token = Init::Instance().token()]() mutable {
+		    for (const auto &t : transports)
+			    if (t)
+				    t->stop();
 
-		for (auto &t : transports)
-			t.reset();
-	});
+		    for (auto &t : transports)
+			    t.reset();
+	    });
 
 	triggerClosed();
 }