Browse Source

Introduced SCTP transport processor

Paul-Louis Ageneau 4 years ago
parent
commit
1181fdc599
5 changed files with 28 additions and 12 deletions
  1. 5 5
      src/processor.cpp
  2. 4 3
      src/processor.hpp
  3. 11 1
      src/sctptransport.cpp
  4. 6 1
      src/sctptransport.hpp
  5. 2 2
      src/transport.hpp

+ 5 - 5
src/processor.cpp

@@ -20,6 +20,8 @@
 
 
 namespace rtc {
 namespace rtc {
 
 
+Processor::Processor(size_t limit) : mTasks(limit) {}
+
 Processor::~Processor() { join(); }
 Processor::~Processor() { join(); }
 
 
 void Processor::join() {
 void Processor::join() {
@@ -29,15 +31,13 @@ void Processor::join() {
 
 
 void Processor::schedule() {
 void Processor::schedule() {
 	std::unique_lock lock(mMutex);
 	std::unique_lock lock(mMutex);
-	if (mTasks.empty()) {
+	if (auto next = mTasks.tryPop()) {
+		ThreadPool::Instance().enqueue(std::move(*next));
+	} else {
 		// No more tasks
 		// No more tasks
 		mPending = false;
 		mPending = false;
 		mCondition.notify_all();
 		mCondition.notify_all();
-		return;
 	}
 	}
-
-	ThreadPool::Instance().enqueue(std::move(mTasks.front()));
-	mTasks.pop();
 }
 }
 
 
 } // namespace rtc
 } // namespace rtc

+ 4 - 3
src/processor.hpp

@@ -22,6 +22,7 @@
 #include "include.hpp"
 #include "include.hpp"
 #include "init.hpp"
 #include "init.hpp"
 #include "threadpool.hpp"
 #include "threadpool.hpp"
+#include "queue.hpp"
 
 
 #include <condition_variable>
 #include <condition_variable>
 #include <future>
 #include <future>
@@ -34,7 +35,7 @@ namespace rtc {
 // Processed tasks in order by delegating them to the thread pool
 // Processed tasks in order by delegating them to the thread pool
 class Processor final {
 class Processor final {
 public:
 public:
-	Processor() = default;
+	Processor(size_t limit = 0);
 	~Processor();
 	~Processor();
 
 
 	Processor(const Processor &) = delete;
 	Processor(const Processor &) = delete;
@@ -52,7 +53,7 @@ protected:
 	// Keep an init token
 	// Keep an init token
 	const init_token mInitToken = Init::Token();
 	const init_token mInitToken = Init::Token();
 
 
-	std::queue<std::function<void()>> mTasks;
+	Queue<std::function<void()>> mTasks;
 	bool mPending = false; // true iff a task is pending in the thread pool
 	bool mPending = false; // true iff a task is pending in the thread pool
 
 
 	mutable std::mutex mMutex;
 	mutable std::mutex mMutex;
@@ -71,7 +72,7 @@ template <class F, class... Args> void Processor::enqueue(F &&f, Args &&... args
 		ThreadPool::Instance().enqueue(std::move(task));
 		ThreadPool::Instance().enqueue(std::move(task));
 		mPending = true;
 		mPending = true;
 	} else {
 	} else {
-		mTasks.emplace(std::move(task));
+		mTasks.push(std::move(task));
 	}
 	}
 }
 }
 
 

+ 11 - 1
src/sctptransport.cpp

@@ -88,7 +88,7 @@ void SctpTransport::Cleanup() {
 SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
 SctpTransport::SctpTransport(std::shared_ptr<Transport> lower, uint16_t port,
                              message_callback recvCallback, amount_callback bufferedAmountCallback,
                              message_callback recvCallback, amount_callback bufferedAmountCallback,
                              state_callback stateChangeCallback)
                              state_callback stateChangeCallback)
-    : Transport(lower, std::move(stateChangeCallback)), mPort(port),
+    : Transport(lower, std::move(stateChangeCallback)), mPort(port), mProcessor(RECV_QUEUE_LIMIT),
       mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
       mSendQueue(0, message_size_func), mBufferedAmountCallback(std::move(bufferedAmountCallback)) {
 	onRecv(recvCallback);
 	onRecv(recvCallback);
 
 
@@ -230,6 +230,16 @@ void SctpTransport::close() {
 	}
 	}
 }
 }
 
 
+void SctpTransport::recv(message_ptr message) {
+	// Delegate to processor to release SCTP thread
+	mProcessor.enqueue([this, message = std::move(message)]() { Transport::recv(message); });
+}
+
+void SctpTransport::changeState(State state) {
+	// Delegate to processor to release SCTP thread
+	mProcessor.enqueue([this, state]() { Transport::changeState(state); });
+}
+
 void SctpTransport::connect() {
 void SctpTransport::connect() {
 	if (!mSock)
 	if (!mSock)
 		throw std::logic_error("Attempted SCTP connect with closed socket");
 		throw std::logic_error("Attempted SCTP connect with closed socket");

+ 6 - 1
src/sctptransport.hpp

@@ -21,6 +21,7 @@
 
 
 #include "include.hpp"
 #include "include.hpp"
 #include "peerconnection.hpp"
 #include "peerconnection.hpp"
+#include "processor.hpp"
 #include "queue.hpp"
 #include "queue.hpp"
 #include "transport.hpp"
 #include "transport.hpp"
 
 
@@ -35,7 +36,7 @@
 
 
 namespace rtc {
 namespace rtc {
 
 
-class SctpTransport : public Transport {
+class SctpTransport final : public Transport {
 public:
 public:
 	static void Init();
 	static void Init();
 	static void Cleanup();
 	static void Cleanup();
@@ -71,6 +72,9 @@ private:
 		PPID_BINARY_EMPTY = 57
 		PPID_BINARY_EMPTY = 57
 	};
 	};
 
 
+	void recv(message_ptr message) override;
+	void changeState(State state) override;
+
 	void connect();
 	void connect();
 	void shutdown();
 	void shutdown();
 	void close();
 	void close();
@@ -93,6 +97,7 @@ private:
 	const uint16_t mPort;
 	const uint16_t mPort;
 	struct socket *mSock;
 	struct socket *mSock;
 
 
+	Processor mProcessor;
 	std::mutex mSendMutex;
 	std::mutex mSendMutex;
 	Queue<message_ptr> mSendQueue;
 	Queue<message_ptr> mSendQueue;
 	std::map<uint16_t, size_t> mBufferedAmount;
 	std::map<uint16_t, size_t> mBufferedAmount;

+ 2 - 2
src/transport.hpp

@@ -67,14 +67,14 @@ public:
 	virtual bool send(message_ptr message) { return outgoing(message); }
 	virtual bool send(message_ptr message) { return outgoing(message); }
 
 
 protected:
 protected:
-	void recv(message_ptr message) {
+	virtual void recv(message_ptr message) {
 		try {
 		try {
 			mRecvCallback(message);
 			mRecvCallback(message);
 		} catch (const std::exception &e) {
 		} catch (const std::exception &e) {
 			PLOG_WARNING << e.what();
 			PLOG_WARNING << e.what();
 		}
 		}
 	}
 	}
-	void changeState(State state) {
+	virtual void changeState(State state) {
 		try {
 		try {
 			if (mState.exchange(state) != state)
 			if (mState.exchange(state) != state)
 				mStateChangeCallback(state);
 				mStateChangeCallback(state);