| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528 |
- //
- // ThreadPool.cpp
- //
- // $Id: //poco/1.4/Foundation/src/ThreadPool.cpp#2 $
- //
- // Library: Foundation
- // Package: Threading
- // Module: ThreadPool
- //
- // Copyright (c) 2004-2006, Applied Informatics Software Engineering GmbH.
- // and Contributors.
- //
- // SPDX-License-Identifier: BSL-1.0
- //
- #include "Poco/ThreadPool.h"
- #include "Poco/Runnable.h"
- #include "Poco/Thread.h"
- #include "Poco/Event.h"
- #include "Poco/ThreadLocal.h"
- #include "Poco/ErrorHandler.h"
- #include <sstream>
- #include <ctime>
- #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
- #include "wce_time.h"
- #endif
- namespace Poco {
- class PooledThread: public Runnable
- {
- public:
- PooledThread(const std::string& name, int stackSize = POCO_THREAD_STACK_SIZE);
- ~PooledThread();
- void start();
- void start(Thread::Priority priority, Runnable& target);
- void start(Thread::Priority priority, Runnable& target, const std::string& name);
- bool idle();
- int idleTime();
- void join();
- void activate();
- void release();
- void run();
- private:
- volatile bool _idle;
- volatile std::time_t _idleTime;
- Runnable* _pTarget;
- std::string _name;
- Thread _thread;
- Event _targetReady;
- Event _targetCompleted;
- Event _started;
- FastMutex _mutex;
- };
- PooledThread::PooledThread(const std::string& name, int stackSize):
- _idle(true),
- _idleTime(0),
- _pTarget(0),
- _name(name),
- _thread(name),
- _targetCompleted(false)
- {
- poco_assert_dbg (stackSize >= 0);
- _thread.setStackSize(stackSize);
- #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
- _idleTime = wceex_time(NULL);
- #else
- _idleTime = std::time(NULL);
- #endif
- }
- PooledThread::~PooledThread()
- {
- }
- void PooledThread::start()
- {
- _thread.start(*this);
- _started.wait();
- }
- void PooledThread::start(Thread::Priority priority, Runnable& target)
- {
- FastMutex::ScopedLock lock(_mutex);
-
- poco_assert (_pTarget == 0);
- _pTarget = ⌖
- _thread.setPriority(priority);
- _targetReady.set();
- }
- void PooledThread::start(Thread::Priority priority, Runnable& target, const std::string& name)
- {
- FastMutex::ScopedLock lock(_mutex);
- std::string fullName(name);
- if (name.empty())
- {
- fullName = _name;
- }
- else
- {
- fullName.append(" (");
- fullName.append(_name);
- fullName.append(")");
- }
- _thread.setName(fullName);
- _thread.setPriority(priority);
-
- poco_assert (_pTarget == 0);
- _pTarget = ⌖
- _targetReady.set();
- }
- inline bool PooledThread::idle()
- {
- FastMutex::ScopedLock lock(_mutex);
- return _idle;
- }
- int PooledThread::idleTime()
- {
- FastMutex::ScopedLock lock(_mutex);
- #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
- return (int) (wceex_time(NULL) - _idleTime);
- #else
- return (int) (time(NULL) - _idleTime);
- #endif
- }
- void PooledThread::join()
- {
- _mutex.lock();
- Runnable* pTarget = _pTarget;
- _mutex.unlock();
- if (pTarget)
- _targetCompleted.wait();
- }
- void PooledThread::activate()
- {
- FastMutex::ScopedLock lock(_mutex);
-
- poco_assert (_idle);
- _idle = false;
- _targetCompleted.reset();
- }
- void PooledThread::release()
- {
- const long JOIN_TIMEOUT = 10000;
-
- _mutex.lock();
- _pTarget = 0;
- _mutex.unlock();
- // In case of a statically allocated thread pool (such
- // as the default thread pool), Windows may have already
- // terminated the thread before we got here.
- if (_thread.isRunning())
- _targetReady.set();
- if (_thread.tryJoin(JOIN_TIMEOUT))
- {
- delete this;
- }
- }
- void PooledThread::run()
- {
- _started.set();
- for (;;)
- {
- _targetReady.wait();
- _mutex.lock();
- if (_pTarget) // a NULL target means kill yourself
- {
- _mutex.unlock();
- try
- {
- _pTarget->run();
- }
- catch (Exception& exc)
- {
- ErrorHandler::handle(exc);
- }
- catch (std::exception& exc)
- {
- ErrorHandler::handle(exc);
- }
- catch (...)
- {
- ErrorHandler::handle();
- }
- FastMutex::ScopedLock lock(_mutex);
- _pTarget = 0;
- #if defined(_WIN32_WCE) && _WIN32_WCE < 0x800
- _idleTime = wceex_time(NULL);
- #else
- _idleTime = time(NULL);
- #endif
- _idle = true;
- _targetCompleted.set();
- ThreadLocalStorage::clear();
- _thread.setName(_name);
- _thread.setPriority(Thread::PRIO_NORMAL);
- }
- else
- {
- _mutex.unlock();
- break;
- }
- }
- }
- ThreadPool::ThreadPool(int minCapacity,
- int maxCapacity,
- int idleTime,
- int stackSize):
- _minCapacity(minCapacity),
- _maxCapacity(maxCapacity),
- _idleTime(idleTime),
- _serial(0),
- _age(0),
- _stackSize(stackSize)
- {
- poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
- for (int i = 0; i < _minCapacity; i++)
- {
- PooledThread* pThread = createThread();
- _threads.push_back(pThread);
- pThread->start();
- }
- }
- ThreadPool::ThreadPool(const std::string& name,
- int minCapacity,
- int maxCapacity,
- int idleTime,
- int stackSize):
- _name(name),
- _minCapacity(minCapacity),
- _maxCapacity(maxCapacity),
- _idleTime(idleTime),
- _serial(0),
- _age(0),
- _stackSize(stackSize)
- {
- poco_assert (minCapacity >= 1 && maxCapacity >= minCapacity && idleTime > 0);
- for (int i = 0; i < _minCapacity; i++)
- {
- PooledThread* pThread = createThread();
- _threads.push_back(pThread);
- pThread->start();
- }
- }
- ThreadPool::~ThreadPool()
- {
- try
- {
- stopAll();
- }
- catch (...)
- {
- poco_unexpected();
- }
- }
- void ThreadPool::addCapacity(int n)
- {
- FastMutex::ScopedLock lock(_mutex);
- poco_assert (_maxCapacity + n >= _minCapacity);
- _maxCapacity += n;
- housekeep();
- }
- int ThreadPool::capacity() const
- {
- FastMutex::ScopedLock lock(_mutex);
- return _maxCapacity;
- }
- int ThreadPool::available() const
- {
- FastMutex::ScopedLock lock(_mutex);
- int count = 0;
- for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
- {
- if ((*it)->idle()) ++count;
- }
- return (int) (count + _maxCapacity - _threads.size());
- }
- int ThreadPool::used() const
- {
- FastMutex::ScopedLock lock(_mutex);
- int count = 0;
- for (ThreadVec::const_iterator it = _threads.begin(); it != _threads.end(); ++it)
- {
- if (!(*it)->idle()) ++count;
- }
- return count;
- }
- int ThreadPool::allocated() const
- {
- FastMutex::ScopedLock lock(_mutex);
- return int(_threads.size());
- }
- void ThreadPool::start(Runnable& target)
- {
- getThread()->start(Thread::PRIO_NORMAL, target);
- }
- void ThreadPool::start(Runnable& target, const std::string& name)
- {
- getThread()->start(Thread::PRIO_NORMAL, target, name);
- }
- void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target)
- {
- getThread()->start(priority, target);
- }
- void ThreadPool::startWithPriority(Thread::Priority priority, Runnable& target, const std::string& name)
- {
- getThread()->start(priority, target, name);
- }
- void ThreadPool::stopAll()
- {
- FastMutex::ScopedLock lock(_mutex);
- for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
- {
- (*it)->release();
- }
- _threads.clear();
- }
- void ThreadPool::joinAll()
- {
- FastMutex::ScopedLock lock(_mutex);
- for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
- {
- (*it)->join();
- }
- housekeep();
- }
- void ThreadPool::collect()
- {
- FastMutex::ScopedLock lock(_mutex);
- housekeep();
- }
- void ThreadPool::housekeep()
- {
- _age = 0;
- if (_threads.size() <= _minCapacity)
- return;
- ThreadVec idleThreads;
- ThreadVec expiredThreads;
- ThreadVec activeThreads;
- idleThreads.reserve(_threads.size());
- activeThreads.reserve(_threads.size());
-
- for (ThreadVec::iterator it = _threads.begin(); it != _threads.end(); ++it)
- {
- if ((*it)->idle())
- {
- if ((*it)->idleTime() < _idleTime)
- idleThreads.push_back(*it);
- else
- expiredThreads.push_back(*it);
- }
- else activeThreads.push_back(*it);
- }
- int n = (int) activeThreads.size();
- int limit = (int) idleThreads.size() + n;
- if (limit < _minCapacity) limit = _minCapacity;
- idleThreads.insert(idleThreads.end(), expiredThreads.begin(), expiredThreads.end());
- _threads.clear();
- for (ThreadVec::iterator it = idleThreads.begin(); it != idleThreads.end(); ++it)
- {
- if (n < limit)
- {
- _threads.push_back(*it);
- ++n;
- }
- else (*it)->release();
- }
- _threads.insert(_threads.end(), activeThreads.begin(), activeThreads.end());
- }
- PooledThread* ThreadPool::getThread()
- {
- FastMutex::ScopedLock lock(_mutex);
- if (++_age == 32)
- housekeep();
- PooledThread* pThread = 0;
- for (ThreadVec::iterator it = _threads.begin(); !pThread && it != _threads.end(); ++it)
- {
- if ((*it)->idle())
- pThread = *it;
- }
- if (!pThread)
- {
- if (_threads.size() < _maxCapacity)
- {
- pThread = createThread();
- try
- {
- pThread->start();
- _threads.push_back(pThread);
- } catch (...)
- {
- delete pThread;
- throw;
- }
- }
- else
- throw NoThreadAvailableException();
- }
- pThread->activate();
- return pThread;
- }
- PooledThread* ThreadPool::createThread()
- {
- std::ostringstream name;
- name << _name << "[#" << ++_serial << "]";
- return new PooledThread(name.str(), _stackSize);
- }
- class ThreadPoolSingletonHolder
- {
- public:
- ThreadPoolSingletonHolder()
- {
- _pPool = 0;
- }
- ~ThreadPoolSingletonHolder()
- {
- delete _pPool;
- }
- ThreadPool* pool()
- {
- FastMutex::ScopedLock lock(_mutex);
-
- if (!_pPool)
- {
- _pPool = new ThreadPool("default");
- if (POCO_THREAD_STACK_SIZE > 0)
- _pPool->setStackSize(POCO_THREAD_STACK_SIZE);
- }
- return _pPool;
- }
-
- private:
- ThreadPool* _pPool;
- FastMutex _mutex;
- };
- namespace
- {
- static ThreadPoolSingletonHolder sh;
- }
- ThreadPool& ThreadPool::defaultPool()
- {
- return *sh.pool();
- }
- } // namespace Poco
|