Browse Source

Merge pull request #1179 from paullouisageneau/fix-recv-queue-limit

Reduce per-channel receive queue limit to 1024
Paul-Louis Ageneau 1 year ago
parent
commit
778c024e50
2 changed files with 6 additions and 8 deletions
  1. 1 1
      src/impl/internals.hpp
  2. 5 7
      src/impl/queue.hpp

+ 1 - 1
src/impl/internals.hpp

@@ -43,7 +43,7 @@ const size_t DEFAULT_REMOTE_MAX_MESSAGE_SIZE = 65536;     // Remote max message
 
 const size_t DEFAULT_WS_MAX_MESSAGE_SIZE = 256 * 1024;   // Default max message size for WebSockets
 
-const size_t RECV_QUEUE_LIMIT = 1024 * 1024; // Max per-channel queue size
+const size_t RECV_QUEUE_LIMIT = 1024; // Max per-channel queue size (messages)
 
 const int MIN_THREADPOOL_SIZE = 4; // Minimum number of threads in the global thread pool (>= 2)
 

+ 5 - 7
src/impl/queue.hpp

@@ -23,7 +23,8 @@ template <typename T> class Queue {
 public:
 	using amount_function = std::function<size_t(const T &element)>;
 
-	Queue(size_t limit = 0, amount_function func = nullptr);
+	Queue(size_t limit = 0, // elements (0 means no limit)
+	      amount_function func = nullptr);
 	~Queue();
 
 	void stop();
@@ -50,10 +51,7 @@ private:
 
 template <typename T>
 Queue<T>::Queue(size_t limit, amount_function func) : mLimit(limit), mAmount(0) {
-	mAmountFunction = func ? func : [](const T &element) -> size_t {
-		static_cast<void>(element);
-		return 1;
-	};
+	mAmountFunction = func ? func : []([[maybe_unused]] const T &element) -> size_t { return 1; };
 }
 
 template <typename T> Queue<T>::~Queue() { stop(); }
@@ -76,7 +74,7 @@ template <typename T> bool Queue<T>::empty() const {
 
 template <typename T> bool Queue<T>::full() const {
 	std::lock_guard lock(mMutex);
-	return mQueue.size() >= mLimit;
+	return mLimit > 0 && mQueue.size() >= mLimit;
 }
 
 template <typename T> size_t Queue<T>::size() const {
@@ -91,7 +89,7 @@ template <typename T> size_t Queue<T>::amount() const {
 
 template <typename T> void Queue<T>::push(T element) {
 	std::unique_lock lock(mMutex);
-	mPushCondition.wait(lock, [this]() { return !mLimit || mQueue.size() < mLimit || mStopping; });
+	mPushCondition.wait(lock, [this]() { return mLimit == 0 || mQueue.size() < mLimit || mStopping; });
 	if (mStopping)
 		return;