Browse Source

Added TaskQueue interface

yhirose 6 years ago
parent
commit
47312e6df9
1 changed files with 40 additions and 24 deletions
  1. 40 24
      httplib.h

+ 40 - 24
httplib.h

@@ -265,6 +265,42 @@ private:
   std::string buffer;
   std::string buffer;
 };
 };
 
 
+class ThreadsTaskQueue {
+public:
+  ThreadsTaskQueue() : running_threads_(0) {}
+  ~ThreadsTaskQueue() {}
+
+  void enque(std::function<void(void)> fn) {
+    std::thread([=]() {
+      {
+        std::lock_guard<std::mutex> guard(running_threads_mutex_);
+        running_threads_++;
+      }
+
+      fn();
+
+      {
+        std::lock_guard<std::mutex> guard(running_threads_mutex_);
+        running_threads_--;
+      }
+    }).detach();
+  }
+
+  void shutdown() {
+    for (;;) {
+      std::this_thread::sleep_for(std::chrono::milliseconds(10));
+      std::lock_guard<std::mutex> guard(running_threads_mutex_);
+      if (!running_threads_) { break; }
+    }
+  }
+
+private:
+  std::mutex running_threads_mutex_;
+  int running_threads_;
+};
+
+typedef ThreadsTaskQueue TaskQueue;
+
 class Server {
 class Server {
 public:
 public:
   typedef std::function<void(const Request &, Response &)> Handler;
   typedef std::function<void(const Request &, Response &)> Handler;
@@ -338,9 +374,7 @@ private:
   Handler error_handler_;
   Handler error_handler_;
   Logger logger_;
   Logger logger_;
 
 
-  // TODO: Use thread pool...
-  std::mutex running_threads_mutex_;
-  int running_threads_;
+  TaskQueue task_queue_;
 };
 };
 
 
 class Client {
 class Client {
@@ -1890,7 +1924,7 @@ inline const std::string &BufferStream::get_buffer() const { return buffer; }
 inline Server::Server()
 inline Server::Server()
     : keep_alive_max_count_(CPPHTTPLIB_KEEPALIVE_MAX_COUNT),
     : keep_alive_max_count_(CPPHTTPLIB_KEEPALIVE_MAX_COUNT),
       payload_max_length_(CPPHTTPLIB_PAYLOAD_MAX_LENGTH), is_running_(false),
       payload_max_length_(CPPHTTPLIB_PAYLOAD_MAX_LENGTH), is_running_(false),
-      svr_sock_(INVALID_SOCKET), running_threads_(0) {
+      svr_sock_(INVALID_SOCKET) {
 #ifndef _WIN32
 #ifndef _WIN32
   signal(SIGPIPE, SIG_IGN);
   signal(SIGPIPE, SIG_IGN);
 #endif
 #endif
@@ -2219,28 +2253,10 @@ inline bool Server::listen_internal() {
       break;
       break;
     }
     }
 
 
-    // TODO: Use thread pool...
-    std::thread([=]() {
-      {
-        std::lock_guard<std::mutex> guard(running_threads_mutex_);
-        running_threads_++;
-      }
-
-      read_and_close_socket(sock);
-
-      {
-        std::lock_guard<std::mutex> guard(running_threads_mutex_);
-        running_threads_--;
-      }
-    }).detach();
+    task_queue_.enque([=]() { read_and_close_socket(sock); });
   }
   }
 
 
-  // TODO: Use thread pool...
-  for (;;) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(10));
-    std::lock_guard<std::mutex> guard(running_threads_mutex_);
-    if (!running_threads_) { break; }
-  }
+  task_queue_.shutdown();
 
 
   is_running_ = false;
   is_running_ = false;