Browse Source

ThreadPool: optional limit for jobs queue (#1741)

For very busy servers, the internal jobs queue where accepted
sockets are enqueued can grow without limit.
This is a problem for two reasons:
 - queueing too much work causes the server to respond with huge latency,
   resulting in repetead timeouts on the clients; it is definitely
   better to reject the connection early, so that the client
   receives the backpressure signal as soon as the queue is
   becoming too large
 - the jobs list can eventually cause an out of memory condition
vmaffione 2 years ago
parent
commit
374d058de7
3 changed files with 118 additions and 10 deletions
  1. 15 2
      README.md
  2. 14 4
      httplib.h
  3. 89 4
      test/test.cc

+ 15 - 2
README.md

@@ -433,6 +433,17 @@ If you want to set the thread count at runtime, there is no convenient way... Bu
 svr.new_task_queue = [] { return new ThreadPool(12); };
 ```
 
+You can also provide an optional parameter to limit the maximum number
+of pending requests, i.e. requests `accept()`ed by the listener but
+still waiting to be serviced by worker threads.
+
+```cpp
+svr.new_task_queue = [] { return new ThreadPool(/*num_threads=*/12, /*max_queued_requests=*/18); };
+```
+
+Default limit is 0 (unlimited). Once the limit is reached, the listener
+will shutdown the client connection.
+
 ### Override the default thread pool with yours
 
 You can supply your own thread pool implementation according to your need.
@@ -444,8 +455,10 @@ public:
     pool_.start_with_thread_count(n);
   }
 
-  virtual void enqueue(std::function<void()> fn) override {
-    pool_.enqueue(fn);
+  virtual bool enqueue(std::function<void()> fn) override {
+    /* Return true if the task was actually enqueued, or false
+     * if the caller must drop the corresponding connection. */
+    return pool_.enqueue(fn);
   }
 
   virtual void shutdown() override {

+ 14 - 4
httplib.h

@@ -653,7 +653,7 @@ public:
   TaskQueue() = default;
   virtual ~TaskQueue() = default;
 
-  virtual void enqueue(std::function<void()> fn) = 0;
+  virtual bool enqueue(std::function<void()> fn) = 0;
   virtual void shutdown() = 0;
 
   virtual void on_idle() {}
@@ -661,7 +661,8 @@ public:
 
 class ThreadPool : public TaskQueue {
 public:
-  explicit ThreadPool(size_t n) : shutdown_(false) {
+  explicit ThreadPool(size_t n, size_t mqr = 0)
+      : shutdown_(false), max_queued_requests_(mqr) {
     while (n) {
       threads_.emplace_back(worker(*this));
       n--;
@@ -671,13 +672,17 @@ public:
   ThreadPool(const ThreadPool &) = delete;
   ~ThreadPool() override = default;
 
-  void enqueue(std::function<void()> fn) override {
+  bool enqueue(std::function<void()> fn) override {
     {
       std::unique_lock<std::mutex> lock(mutex_);
+      if (max_queued_requests_ > 0 && jobs_.size() >= max_queued_requests_) {
+        return false;
+      }
       jobs_.push_back(std::move(fn));
     }
 
     cond_.notify_one();
+    return true;
   }
 
   void shutdown() override {
@@ -727,6 +732,7 @@ private:
   std::list<std::function<void()>> jobs_;
 
   bool shutdown_;
+  size_t max_queued_requests_ = 0;
 
   std::condition_variable cond_;
   std::mutex mutex_;
@@ -6319,7 +6325,11 @@ inline bool Server::listen_internal() {
 #endif
       }
 
-      task_queue->enqueue([this, sock]() { process_and_close_socket(sock); });
+      if (!task_queue->enqueue(
+              [this, sock]() { process_and_close_socket(sock); })) {
+        detail::shutdown_socket(sock);
+        detail::close_socket(sock);
+      }
     }
 
     task_queue->shutdown();

+ 89 - 4
test/test.cc

@@ -6511,18 +6511,103 @@ TEST(SocketStream, is_writable_INET) {
 #endif // #ifndef _WIN32
 
 TEST(TaskQueueTest, IncreaseAtomicInteger) {
-  static constexpr unsigned int number_of_task{1000000};
+  static constexpr unsigned int number_of_tasks{1000000};
   std::atomic_uint count{0};
   std::unique_ptr<TaskQueue> task_queue{
       new ThreadPool{CPPHTTPLIB_THREAD_POOL_COUNT}};
 
-  for (unsigned int i = 0; i < number_of_task; ++i) {
-    task_queue->enqueue(
+  for (unsigned int i = 0; i < number_of_tasks; ++i) {
+    auto queued = task_queue->enqueue(
         [&count] { count.fetch_add(1, std::memory_order_relaxed); });
+    EXPECT_TRUE(queued);
+  }
+
+  EXPECT_NO_THROW(task_queue->shutdown());
+  EXPECT_EQ(number_of_tasks, count.load());
+}
+
+TEST(TaskQueueTest, IncreaseAtomicIntegerWithQueueLimit) {
+  static constexpr unsigned int number_of_tasks{1000000};
+  static constexpr unsigned int qlimit{2};
+  unsigned int queued_count{0};
+  std::atomic_uint count{0};
+  std::unique_ptr<TaskQueue> task_queue{
+      new ThreadPool{/*num_threads=*/1, qlimit}};
+
+  for (unsigned int i = 0; i < number_of_tasks; ++i) {
+    if (task_queue->enqueue(
+            [&count] { count.fetch_add(1, std::memory_order_relaxed); })) {
+      queued_count++;
+    }
+  }
+
+  EXPECT_NO_THROW(task_queue->shutdown());
+  EXPECT_EQ(queued_count, count.load());
+  EXPECT_TRUE(queued_count <= number_of_tasks);
+  EXPECT_TRUE(queued_count >= qlimit);
+}
+
+TEST(TaskQueueTest, MaxQueuedRequests) {
+  static constexpr unsigned int qlimit{3};
+  std::unique_ptr<TaskQueue> task_queue{new ThreadPool{1, qlimit}};
+  std::condition_variable sem_cv;
+  std::mutex sem_mtx;
+  int credits = 0;
+  bool queued;
+
+  /* Fill up the queue with tasks that will block until we give them credits to
+   * complete. */
+  for (unsigned int n = 0; n <= qlimit;) {
+    queued = task_queue->enqueue([&sem_mtx, &sem_cv, &credits] {
+      std::unique_lock<std::mutex> lock(sem_mtx);
+      while (credits <= 0) {
+        sem_cv.wait(lock);
+      }
+      /* Consume the credit and signal the test code if they are all gone. */
+      if (--credits == 0) { sem_cv.notify_one(); }
+    });
+
+    if (n < qlimit) {
+      /* The first qlimit enqueues must succeed. */
+      EXPECT_TRUE(queued);
+    } else {
+      /* The last one will succeed only when the worker thread
+       * starts and dequeues the first blocking task. Although
+       * not necessary for the correctness of this test, we sleep for
+       * a short while to avoid busy waiting. */
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+    }
+    if (queued) { n++; }
+  }
+
+  /* Further enqueues must fail since the queue is full. */
+  for (auto i = 0; i < 4; i++) {
+    queued = task_queue->enqueue([] {});
+    EXPECT_FALSE(queued);
+  }
+
+  /* Give the credits to allow the previous tasks to complete. */
+  {
+    std::unique_lock<std::mutex> lock(sem_mtx);
+    credits += qlimit + 1;
+  }
+  sem_cv.notify_all();
+
+  /* Wait for all the credits to be consumed. */
+  {
+    std::unique_lock<std::mutex> lock(sem_mtx);
+    while (credits > 0) {
+      sem_cv.wait(lock);
+    }
+  }
+
+  /* Check that we are able again to enqueue at least qlimit tasks. */
+  for (unsigned int i = 0; i < qlimit; i++) {
+    queued = task_queue->enqueue([] {});
+    EXPECT_TRUE(queued);
   }
 
   EXPECT_NO_THROW(task_queue->shutdown());
-  EXPECT_EQ(number_of_task, count.load());
 }
 
 TEST(RedirectTest, RedirectToUrlWithQueryParameters) {