ThreadPool.cpp 9.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528
  1. //
  2. // ThreadPool.cpp
  3. //
  4. // $Id: //poco/1.4/Foundation/src/ThreadPool.cpp#2 $
  5. //
  6. // Library: Foundation
  7. // Package: Threading
  8. // Module: ThreadPool
  9. //
  10. // Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
  11. // and Contributors.
  12. //
  13. // SPDX-License-Identifier: BSL-1.0
  14. //
  15. #include "Poco/ThreadPool.h"
  16. #include "Poco/Runnable.h"
  17. #include "Poco/Thread.h"
  18. #include "Poco/Event.h"
  19. #include "Poco/ThreadLocal.h"
  20. #include "Poco/ErrorHandler.h"
  21. #include <sstream>
  22. #include <ctime>
  23. #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
  24. #include "wce_time.h"
  25. #endif
  26. namespace Poco {
  27. class PooledThread: public Runnable
  28. {
  29. public:
  30. PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
  31. ~PooledThread();
  32. void start();
  33. void start(Thread::Priority priority, Runnable& target);
  34. void start(Thread::Priority priority, Runnable& target, const std::string& name);
  35. bool idle();
  36. int idleTime();
  37. void join();
  38. void activate();
  39. void release();
  40. void run();
  41. private:
  42. volatile bool _idle;
  43. volatile std::time_t _idleTime;
  44. Runnable* _pTarget;
  45. std::string _name;
  46. Thread _thread;
  47. Event _targetReady;
  48. Event _targetCompleted;
  49. Event _started;
  50. FastMutex _mutex;
  51. };
  52. PooledThread::PooledThread(const std::string& name, int stackSize):
  53. _idle(true),
  54. _idleTime(0),
  55. _pTarget(0),
  56. _name(name),
  57. _thread(name),
  58. _targetCompleted(false)
  59. {
  60. poco_assert_dbg (stackSize >= 0);
  61. _thread.setStackSize(stackSize);
  62. #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
  63. _idleTime = wceex_time(NULL);
  64. #else
  65. _idleTime = std::time(NULL);
  66. #endif
  67. }
  68. PooledThread::~PooledThread()
  69. {
  70. }
  71. void PooledThread::start()
  72. {
  73. _thread.start(*this);
  74. _started.wait();
  75. }
  76. void PooledThread::start(Thread::Priority priority, Runnable& target)
  77. {
  78. FastMutex::ScopedLock lock(_mutex);
  79. poco_assert (_pTarget == 0);
  80. _pTarget = &target;
  81. _thread.setPriority(priority);
  82. _targetReady.set();
  83. }
  84. void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
  85. {
  86. FastMutex::ScopedLock lock(_mutex);
  87. std::string fullName(name);
  88. if (name.empty())
  89. {
  90. fullName = _name;
  91. }
  92. else
  93. {
  94. fullName.append(" (");
  95. fullName.append(_name);
  96. fullName.append(")");
  97. }
  98. _thread.setName(fullName);
  99. _thread.setPriority(priority);
  100. poco_assert (_pTarget == 0);
  101. _pTarget = &target;
  102. _targetReady.set();
  103. }
  104. inline bool PooledThread::idle()
  105. {
  106. FastMutex::ScopedLock lock(_mutex);
  107. return _idle;
  108. }
  109. int PooledThread::idleTime()
  110. {
  111. FastMutex::ScopedLock lock(_mutex);
  112. #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
  113. return (int) (wceex_time(NULL) - _idleTime);
  114. #else
  115. return (int) (time(NULL) - _idleTime);
  116. #endif
  117. }
  118. void PooledThread::join()
  119. {
  120. _mutex.lock();
  121. Runnable* pTarget = _pTarget;
  122. _mutex.unlock();
  123. if (pTarget)
  124. _targetCompleted.wait();
  125. }
  126. void PooledThread::activate()
  127. {
  128. FastMutex::ScopedLock lock(_mutex);
  129. poco_assert (_idle);
  130. _idle = false;
  131. _targetCompleted.reset();
  132. }
  133. void PooledThread::release()
  134. {
  135. const long JOIN_TIMEOUT = 10000;
  136. _mutex.lock();
  137. _pTarget = 0;
  138. _mutex.unlock();
  139. // In case of a statically allocated thread pool (such
  140. // as the default thread pool), Windows may have already
  141. // terminated the thread before we got here.
  142. if (_thread.isRunning())
  143. _targetReady.set();
  144. if (_thread.tryJoin(JOIN_TIMEOUT))
  145. {
  146. delete this;
  147. }
  148. }
  149. void PooledThread::run()
  150. {
  151. _started.set();
  152. for (;;)
  153. {
  154. _targetReady.wait();
  155. _mutex.lock();
  156. if (_pTarget) // a NULL target means kill yourself
  157. {
  158. _mutex.unlock();
  159. try
  160. {
  161. _pTarget->run();
  162. }
  163. catch (Exception& exc)
  164. {
  165. ErrorHandler::handle(exc);
  166. }
  167. catch (std::exception& exc)
  168. {
  169. ErrorHandler::handle(exc);
  170. }
  171. catch (...)
  172. {
  173. ErrorHandler::handle();
  174. }
  175. FastMutex::ScopedLock lock(_mutex);
  176. _pTarget = 0;
  177. #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
  178. _idleTime = wceex_time(NULL);
  179. #else
  180. _idleTime = time(NULL);
  181. #endif
  182. _idle = true;
  183. _targetCompleted.set();
  184. ThreadLocalStorage::clear();
  185. _thread.setName(_name);
  186. _thread.setPriority(Thread::PRIO_NORMAL);
  187. }
  188. else
  189. {
  190. _mutex.unlock();
  191. break;
  192. }
  193. }
  194. }
  195. ThreadPool::ThreadPool(int minCapacity,
  196. int maxCapacity,
  197. int idleTime,
  198. int stackSize):
  199. _minCapacity(minCapacity),
  200. _maxCapacity(maxCapacity),
  201. _idleTime(idleTime),
  202. _serial(0),
  203. _age(0),
  204. _stackSize(stackSize)
  205. {
  206. poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
  207. for (int i = 0; i < _minCapacity; i++)
  208. {
  209. PooledThread* pThread = createThread();
  210. _threads.push_back(pThread);
  211. pThread->start();
  212. }
  213. }
  214. ThreadPool::ThreadPool(const std::string& name,
  215. int minCapacity,
  216. int maxCapacity,
  217. int idleTime,
  218. int stackSize):
  219. _name(name),
  220. _minCapacity(minCapacity),
  221. _maxCapacity(maxCapacity),
  222. _idleTime(idleTime),
  223. _serial(0),
  224. _age(0),
  225. _stackSize(stackSize)
  226. {
  227. poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
  228. for (int i = 0; i < _minCapacity; i++)
  229. {
  230. PooledThread* pThread = createThread();
  231. _threads.push_back(pThread);
  232. pThread->start();
  233. }
  234. }
  235. ThreadPool::~ThreadPool()
  236. {
  237. try
  238. {
  239. stopAll();
  240. }
  241. catch (...)
  242. {
  243. poco_unexpected();
  244. }
  245. }
  246. void ThreadPool::addCapacity(int n)
  247. {
  248. FastMutex::ScopedLock lock(_mutex);
  249. poco_assert (_maxCapacity + n >= _minCapacity);
  250. _maxCapacity += n;
  251. housekeep();
  252. }
  253. int ThreadPool::capacity() const
  254. {
  255. FastMutex::ScopedLock lock(_mutex);
  256. return _maxCapacity;
  257. }
  258. int ThreadPool::available() const
  259. {
  260. FastMutex::ScopedLock lock(_mutex);
  261. int count = 0;
  262. for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
  263. {
  264. if ((*it)->idle()) ++count;
  265. }
  266. return (int) (count + _maxCapacity - _threads.size());
  267. }
  268. int ThreadPool::used() const
  269. {
  270. FastMutex::ScopedLock lock(_mutex);
  271. int count = 0;
  272. for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
  273. {
  274. if (!(*it)->idle()) ++count;
  275. }
  276. return count;
  277. }
  278. int ThreadPool::allocated() const
  279. {
  280. FastMutex::ScopedLock lock(_mutex);
  281. return int(_threads.size());
  282. }
  283. void ThreadPool::start(Runnable& target)
  284. {
  285. getThread()->start(Thread::PRIO_NORMAL, target);
  286. }
  287. void ThreadPool::start(Runnable& target, const std::string& name)
  288. {
  289. getThread()->start(Thread::PRIO_NORMAL, target, name);
  290. }
  291. void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
  292. {
  293. getThread()->start(priority, target);
  294. }
  295. void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
  296. {
  297. getThread()->start(priority, target, name);
  298. }
  299. void ThreadPool::stopAll()
  300. {
  301. FastMutex::ScopedLock lock(_mutex);
  302. for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
  303. {
  304. (*it)->release();
  305. }
  306. _threads.clear();
  307. }
  308. void ThreadPool::joinAll()
  309. {
  310. FastMutex::ScopedLock lock(_mutex);
  311. for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
  312. {
  313. (*it)->join();
  314. }
  315. housekeep();
  316. }
  317. void ThreadPool::collect()
  318. {
  319. FastMutex::ScopedLock lock(_mutex);
  320. housekeep();
  321. }
  322. void ThreadPool::housekeep()
  323. {
  324. _age = 0;
  325. if (_threads.size() <= _minCapacity)
  326. return;
  327. ThreadVec idleThreads;
  328. ThreadVec expiredThreads;
  329. ThreadVec activeThreads;
  330. idleThreads.reserve(_threads.size());
  331. activeThreads.reserve(_threads.size());
  332. for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
  333. {
  334. if ((*it)->idle())
  335. {
  336. if ((*it)->idleTime() < _idleTime)
  337. idleThreads.push_back(*it);
  338. else
  339. expiredThreads.push_back(*it);
  340. }
  341. else activeThreads.push_back(*it);
  342. }
  343. int n = (int) activeThreads.size();
  344. int limit = (int) idleThreads.size() + n;
  345. if (limit < _minCapacity) limit = _minCapacity;
  346. idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
  347. _threads.clear();
  348. for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)
  349. {
  350. if (n < limit)
  351. {
  352. _threads.push_back(*it);
  353. ++n;
  354. }
  355. else (*it)->release();
  356. }
  357. _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());
  358. }
  359. PooledThread* ThreadPool::getThread()
  360. {
  361. FastMutex::ScopedLock lock(_mutex);
  362. if (++_age == 32)
  363. housekeep();
  364. PooledThread* pThread = 0;
  365. for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)
  366. {
  367. if ((*it)->idle())
  368. pThread = *it;
  369. }
  370. if (!pThread)
  371. {
  372. if (_threads.size() < _maxCapacity)
  373. {
  374. pThread = createThread();
  375. try
  376. {
  377. pThread->start();
  378. _threads.push_back(pThread);
  379. } catch (...)
  380. {
  381. delete pThread;
  382. throw;
  383. }
  384. }
  385. else
  386. throw NoThreadAvailableException();
  387. }
  388. pThread->activate();
  389. return pThread;
  390. }
  391. PooledThread* ThreadPool::createThread()
  392. {
  393. std::ostringstream name;
  394. name << _name << "[#" << ++_serial << "]";
  395. return new PooledThread(name.str(), _stackSize);
  396. }
  397. class ThreadPoolSingletonHolder
  398. {
  399. public:
  400. ThreadPoolSingletonHolder()
  401. {
  402. _pPool = 0;
  403. }
  404. ~ThreadPoolSingletonHolder()
  405. {
  406. delete _pPool;
  407. }
  408. ThreadPool* pool()
  409. {
  410. FastMutex::ScopedLock lock(_mutex);
  411. if (!_pPool)
  412. {
  413. _pPool = new ThreadPool("default");
  414. if (POCO_THREAD_STACK_SIZE > 0)
  415. _pPool->setStackSize(POCO_THREAD_STACK_SIZE);
  416. }
  417. return _pPool;
  418. }
  419. private:
  420. ThreadPool* _pPool;
  421. FastMutex _mutex;
  422. };
  423. namespace
  424. {
  425. static ThreadPoolSingletonHolder sh;
  426. }
  427. ThreadPool& ThreadPool::defaultPool()
  428. {
  429. return *sh.pool();
  430. }
  431. } // namespace Poco