threadpool.h 5.8 KB


  1. #ifndef GUL_THREAD_POOL_H
  2. #define GUL_THREAD_POOL_H
  3. #include <vector>
  4. #include <queue>
  5. #include <memory>
  6. #include <thread>
  7. #include <mutex>
  8. #include <condition_variable>
  9. #include <future>
  10. #include <functional>
  11. #include <stdexcept>
  12. #ifndef GUL_NAMESPACE
  13. #define GUL_NAMESPACE gul
  14. #endif
  15. namespace GUL_NAMESPACE
  16. {
  17. class thread_pool
  18. {
  19. public:
  20. thread_pool(size_t num_threads);
  21. thread_pool();
  22. template<class F, class... Args>
  23. std::future<typename std::result_of<F(Args...)>::type> push( F && f, Args &&... args);
  24. /**
  25. * @brief create_workers
  26. * @param num
  27. * Creates a new worker to work the threadpool
  28. */
  29. void create_workers(std::size_t num);
  30. /**
  31. * @brief remove_thread
  32. * Remove a worker from the thread pool
  33. */
  34. void remove_worker();
  35. /**
  36. * @brief clear_tasks
  37. *
  38. * Clears all the current tasks in the queue
  39. */
  40. void clear_tasks();
  41. /**
  42. * @brief num_tasks
  43. * @return
  44. *
  45. * Returns the number of tasks still in the queue
  46. */
  47. std::size_t num_tasks() const { return m_tasks.size(); }
  48. /**
  49. * @brief num_workers
  50. * @return
  51. *
  52. * Returns the number of workers in this thread pool
  53. */
  54. std::size_t num_workers() const { return m_worker_count; }
  55. /**
  56. * @brief executeTask
  57. *
  58. * Executes a single task on the queue
  59. */
  60. void executeTask();
  61. ~thread_pool();
  62. protected:
  63. /**
  64. * @brief add_thread
  65. * Add a new worker to the thread pool
  66. */
  67. void add_thread();
  68. // need to keep track of threads so we can join them
  69. std::vector< std::thread > workers;
  70. // the task queue
  71. std::queue< std::function<void()> > m_tasks;
  72. // synchronization
  73. std::mutex m_mutex;
  74. std::condition_variable m_cv;
  75. uint32_t m_worker_count=0; // number of currently active workers
  76. uint32_t m_thread_count=0;
  77. // bool stop;
  78. };
  79. inline void thread_pool::remove_worker()
  80. {
  81. --m_thread_count;
  82. }
  83. inline void thread_pool::create_workers(std::size_t num)
  84. {
  85. for(size_t i=0;i<num;++i)
  86. {
  87. add_thread();
  88. }
  89. if( m_tasks.size())
  90. m_cv.notify_all();
  91. }
  92. inline void thread_pool::executeTask()
  93. {
  94. std::function<void()> task;
  95. {
  96. std::unique_lock<std::mutex> lock(this->m_mutex);
  97. if( this->m_tasks.empty() )
  98. return;
  99. task = std::move(this->m_tasks.front());
  100. this->m_tasks.pop();
  101. }
  102. task();
  103. }
  104. inline void thread_pool::add_thread()
  105. {
  106. ++m_thread_count;
  107. ++m_worker_count;
  108. workers.emplace_back(
  109. [this]
  110. {
  111. for(;;)
  112. {
  113. std::function<void()> task;
  114. {
  115. std::unique_lock<std::mutex> lock(this->m_mutex);
  116. // Wait for someone to trigger the condition variable.
  117. // But do not wait if the task list is empty
  118. //this->m_cv.wait(lock, [this]{ return this->stop || !this->m_tasks.empty(); });
  119. this->m_cv.wait(lock, [this]{ return (m_thread_count < m_worker_count) || !this->m_tasks.empty(); });
  120. // ========== Start Safe Zone =========================
  121. //if( (this->stop && this->m_tasks.empty()) || (m_thread_count < m_worker_count) )
  122. if( m_thread_count < m_worker_count )
  123. {
  124. // We do not need this thread anymore, so we can exit.
  125. --m_worker_count;
  126. // std::cout << std::this_thread::get_id() << " shutting down" << std::endl;
  127. return;
  128. }
  129. task = std::move(this->m_tasks.front());
  130. this->m_tasks.pop();
  131. //std::cout << std::this_thread::get_id() << " Starting Task! " << m_tasks.size() << " tasks left" << std::endl;
  132. // ========== End Safe Zone =========================
  133. }
  134. task();
  135. }
  136. }
  137. );
  138. }
  139. // The constructor just launches some amount of workers
  140. inline thread_pool::thread_pool(size_t threads)
  141. : m_worker_count(0), m_thread_count(0)
  142. // : stop(false)
  143. {
  144. for(size_t i = 0;i<threads;++i)
  145. {
  146. add_thread();
  147. }
  148. }
  149. inline thread_pool::thread_pool() : m_worker_count(0), m_thread_count(0)
  150. {
  151. }
  152. inline void thread_pool::clear_tasks()
  153. {
  154. std::unique_lock<std::mutex> lock(m_mutex);
  155. while(m_tasks.size()) m_tasks.pop();
  156. }
  157. // add new work item to the pool
  158. template<class F, class... Args>
  159. #define RETURN_TYPE typename std::result_of<F(Args...)>::type
  160. std::future< RETURN_TYPE > thread_pool::push(F&& f, Args&&... args)
  161. {
  162. #undef RETURN_TYPE
  163. using return_type = typename std::result_of<F(Args...)>::type;
  164. auto task = std::make_shared< std::packaged_task<return_type()> >
  165. (
  166. std::bind(std::forward<F>(f), std::forward<Args>(args)...)
  167. );
  168. std::future<return_type> res = task->get_future();
  169. {
  170. std::unique_lock<std::mutex> lock(m_mutex);
  171. // don't allow enqueueing after stopping the pool
  172. //if(stop)
  173. // throw std::runtime_error("enqueue on stopped ThreadPool");
  174. m_tasks.emplace([task](){ (*task)(); });
  175. }
  176. m_cv.notify_one();
  177. return res;
  178. }
  179. // the destructor joins all threads
  180. inline thread_pool::~thread_pool()
  181. {
  182. {
  183. std::unique_lock<std::mutex> lock(m_mutex);
  184. m_thread_count = 0;
  185. }
  186. m_cv.notify_all();
  187. for(std::thread &worker: workers)
  188. {
  189. if( worker.joinable())
  190. worker.join();
  191. }
  192. }
  193. }
  194. #endif