threadPool.cpp 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #include "platform/threads/threadPool.h"
  23. #include "platform/threads/thread.h"
  24. #include "platform/platformCPUCount.h"
  25. #include "core/strings/stringFunctions.h"
  26. #include "core/util/tSingleton.h"
  27. //#define DEBUG_SPEW
  28. //=============================================================================
  29. // ThreadPool::Context.
  30. //=============================================================================
  31. ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
  32. //--------------------------------------------------------------------------
  33. ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
  34. : mParent( parent ),
  35. mName( name ),
  36. mChildren( 0 ),
  37. mSibling( 0 ),
  38. mPriorityBias( priorityBias ),
  39. mAccumulatedPriorityBias( 0.0 )
  40. {
  41. if( parent )
  42. {
  43. mSibling = mParent->mChildren;
  44. mParent->mChildren = this;
  45. }
  46. }
  47. //--------------------------------------------------------------------------
  48. ThreadPool::Context::~Context()
  49. {
  50. if( mParent )
  51. for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
  52. if( context == this )
  53. {
  54. if( !prev )
  55. mParent->mChildren = this->mSibling;
  56. else
  57. prev->mSibling = this->mSibling;
  58. }
  59. }
  60. //--------------------------------------------------------------------------
  61. ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
  62. {
  63. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  64. if( dStricmp( child->getName(), name ) == 0 )
  65. return child;
  66. return 0;
  67. }
  68. //--------------------------------------------------------------------------
  69. F32 ThreadPool::Context::getAccumulatedPriorityBias()
  70. {
  71. if( !mAccumulatedPriorityBias )
  72. updateAccumulatedPriorityBiases();
  73. return mAccumulatedPriorityBias;
  74. }
  75. //--------------------------------------------------------------------------
  76. void ThreadPool::Context::setPriorityBias( F32 value )
  77. {
  78. mPriorityBias = value;
  79. mAccumulatedPriorityBias = 0.0;
  80. }
  81. //--------------------------------------------------------------------------
  82. void ThreadPool::Context::updateAccumulatedPriorityBiases()
  83. {
  84. // Update our own priority bias.
  85. mAccumulatedPriorityBias = mPriorityBias;
  86. for( Context* context = getParent(); context != 0; context = context->getParent() )
  87. mAccumulatedPriorityBias *= context->getPriorityBias();
  88. // Update our children.
  89. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  90. child->updateAccumulatedPriorityBiases();
  91. }
  92. //=============================================================================
  93. // ThreadPool::WorkItem.
  94. //=============================================================================
  95. //--------------------------------------------------------------------------
  96. void ThreadPool::WorkItem::process()
  97. {
  98. execute();
  99. mExecuted = true;
  100. }
  101. //--------------------------------------------------------------------------
  102. bool ThreadPool::WorkItem::isCancellationRequested()
  103. {
  104. return false;
  105. }
  106. //--------------------------------------------------------------------------
  107. bool ThreadPool::WorkItem::cancellationPoint()
  108. {
  109. if( isCancellationRequested() )
  110. {
  111. onCancelled();
  112. return true;
  113. }
  114. else
  115. return false;
  116. }
  117. //--------------------------------------------------------------------------
  118. F32 ThreadPool::WorkItem::getPriority()
  119. {
  120. return 1.0;
  121. }
  122. //=============================================================================
  123. // ThreadPool::WorkItemWrapper.
  124. //=============================================================================
  125. /// Value wrapper for work items while placed on priority queue.
  126. /// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
  127. ///
  128. /// @see ThreadSafePriorityQueueWithUpdate
  129. /// @see ThreadPool::WorkItem
  130. ///
  131. struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
  132. {
  133. typedef ThreadSafeRef< WorkItem > Parent;
  134. WorkItemWrapper() {}
  135. WorkItemWrapper( WorkItem* item )
  136. : Parent( item ) {}
  137. bool isAlive();
  138. F32 getPriority();
  139. };
  140. inline bool ThreadPool::WorkItemWrapper::isAlive()
  141. {
  142. WorkItem* item = ptr();
  143. if( !item )
  144. return false;
  145. else if( item->isCancellationRequested() )
  146. {
  147. ( *this ) = 0;
  148. return false;
  149. }
  150. else
  151. return true;
  152. }
  153. inline F32 ThreadPool::WorkItemWrapper::getPriority()
  154. {
  155. WorkItem* item = ptr();
  156. AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );
  157. // Compute a scaled priority value based on the item's context.
  158. return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
  159. }
  160. //=============================================================================
  161. // ThreadPool::WorkerThread.
  162. //=============================================================================
  163. ///
  164. ///
  165. struct ThreadPool::WorkerThread : public Thread
  166. {
  167. WorkerThread( ThreadPool* pool, U32 index );
  168. WorkerThread* getNext();
  169. virtual void run( void* arg = 0 );
  170. private:
  171. U32 mIndex;
  172. ThreadPool* mPool;
  173. WorkerThread* mNext;
  174. };
  175. ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
  176. : mIndex( index ),
  177. mPool( pool )
  178. {
  179. // Link us to the pool's thread list.
  180. mNext = pool->mThreads;
  181. pool->mThreads = this;
  182. }
  183. inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
  184. {
  185. return mNext;
  186. }
  187. void ThreadPool::WorkerThread::run( void* arg )
  188. {
  189. #ifdef TORQUE_DEBUG
  190. {
  191. // Set the thread's name for debugging.
  192. char buffer[ 2048 ];
  193. dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
  194. _setName( buffer );
  195. }
  196. #endif
  197. while( 1 )
  198. {
  199. if( checkForStop() )
  200. {
  201. #ifdef DEBUG_SPEW
  202. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
  203. #endif
  204. dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
  205. return;
  206. }
  207. // Mark us as potentially blocking.
  208. dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );
  209. bool waitForSignal = false;
  210. {
  211. // Try to take an item from the queue. Do
  212. // this in a separate block, so we'll be
  213. // releasing the item after we have finished.
  214. WorkItemWrapper workItem;
  215. if( mPool->mWorkItemQueue.takeNext( workItem ) )
  216. {
  217. // Mark us as non-blocking as this loop definitely
  218. // won't wait on the semaphore.
  219. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  220. #ifdef DEBUG_SPEW
  221. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
  222. #endif
  223. workItem->process();
  224. dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
  225. }
  226. else
  227. waitForSignal = true;
  228. }
  229. if( waitForSignal )
  230. {
  231. dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
  232. #ifdef DEBUG_SPEW
  233. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
  234. #endif
  235. mPool->mSemaphore.acquire();
  236. #ifdef DEBUG_SPEW
  237. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
  238. #endif
  239. dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
  240. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  241. }
  242. }
  243. }
  244. //=============================================================================
  245. // ThreadPool.
  246. //=============================================================================
  247. bool ThreadPool::smForceAllMainThread;
  248. U32 ThreadPool::smMainThreadTimeMS;
  249. ThreadPool::QueueType ThreadPool::smMainThreadQueue;
  250. //--------------------------------------------------------------------------
  251. ThreadPool::ThreadPool( const char* name, U32 numThreads )
  252. : mName( name ),
  253. mNumThreads( numThreads ),
  254. mNumThreadsAwake( 0 ),
  255. mNumPendingItems( 0 ),
  256. mSemaphore( 0 ),
  257. mThreads( 0 )
  258. {
  259. // Number of worker threads to create.
  260. if( !mNumThreads )
  261. {
  262. // Use platformCPUInfo directly as in the case of the global pool,
  263. // Platform::SystemInfo will not yet have been initialized.
  264. U32 numLogical = 0;
  265. U32 numCores = 0;
  266. CPUInfo::CPUCount( numLogical, numCores );
  267. const U32 baseCount = getMax( numLogical, numCores );
  268. mNumThreads = (baseCount > 0) ? baseCount : 2;
  269. }
  270. #ifdef DEBUG_SPEW
  271. Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
  272. #endif
  273. // Create the threads.
  274. mNumThreadsAwake = mNumThreads;
  275. mNumThreadsReady = mNumThreads;
  276. for( U32 i = 0; i < mNumThreads; i ++ )
  277. {
  278. WorkerThread* thread = new WorkerThread( this, i );
  279. thread->start();
  280. }
  281. }
  282. //--------------------------------------------------------------------------
  283. ThreadPool::~ThreadPool()
  284. {
  285. shutdown();
  286. }
  287. //--------------------------------------------------------------------------
  288. void ThreadPool::shutdown()
  289. {
  290. const U32 numThreads = mNumThreads;
  291. // Tell our worker threads to stop.
  292. for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
  293. thread->stop();
  294. // Release the semaphore as many times as there are threads.
  295. // Doing this separately guarantees we're not waking a thread
  296. // that hasn't been set its stop flag yet.
  297. for( U32 n = 0; n < numThreads; ++ n )
  298. mSemaphore.release();
  299. // Delete each worker thread. Wait until death as we're prone to
  300. // running into issues with decomposing work item lists otherwise.
  301. for( WorkerThread* thread = mThreads; thread != 0; )
  302. {
  303. WorkerThread* next = thread->getNext();
  304. thread->join();
  305. delete thread;
  306. thread = next;
  307. }
  308. mThreads = NULL;
  309. mNumThreads = 0;
  310. }
  311. //--------------------------------------------------------------------------
  312. void ThreadPool::queueWorkItem( WorkItem* item )
  313. {
  314. bool executeRightAway = ( getForceAllMainThread() );
  315. #ifdef DEBUG_SPEW
  316. Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
  317. ( executeRightAway ? "executing" : "queuing" ),
  318. item );
  319. #endif
  320. if( executeRightAway )
  321. item->process();
  322. else
  323. {
  324. // Put the item in the queue.
  325. dFetchAndAdd( mNumPendingItems, 1 );
  326. mWorkItemQueue.insert( item->getPriority(), item );
  327. mSemaphore.release();
  328. }
  329. }
  330. //--------------------------------------------------------------------------
  331. void ThreadPool::flushWorkItems( S32 timeOut )
  332. {
  333. AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );
  334. U32 endTime = 0;
  335. if( timeOut != -1 )
  336. endTime = Platform::getRealMilliseconds() + timeOut;
  337. // Spinlock until the queue is empty.
  338. while( !mWorkItemQueue.isEmpty() )
  339. {
  340. Platform::sleep( 25 );
  341. // Stop if we have exceeded our processing time budget.
  342. if( timeOut != -1
  343. && Platform::getRealMilliseconds() >= endTime )
  344. break;
  345. }
  346. }
  347. void ThreadPool::waitForAllItems( S32 timeOut )
  348. {
  349. U32 endTime = 0;
  350. if( timeOut != -1 )
  351. endTime = Platform::getRealMilliseconds() + timeOut;
  352. // Spinlock until there are no items that have not been processed.
  353. while( dAtomicRead( mNumPendingItems ) )
  354. {
  355. Platform::sleep( 25 );
  356. // Stop if we have exceeded our processing time budget.
  357. if( timeOut != -1
  358. && Platform::getRealMilliseconds() >= endTime )
  359. break;
  360. }
  361. }
  362. //--------------------------------------------------------------------------
  363. void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
  364. {
  365. smMainThreadQueue.insert( item->getPriority(), item );
  366. }
  367. //--------------------------------------------------------------------------
  368. void ThreadPool::processMainThreadWorkItems()
  369. {
  370. AssertFatal( ThreadManager::isMainThread(),
  371. "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
  372. U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
  373. do
  374. {
  375. WorkItemWrapper item;
  376. if( !smMainThreadQueue.takeNext( item ) )
  377. break;
  378. else
  379. item->process();
  380. }
  381. while( Platform::getRealMilliseconds() < timeLimit );
  382. }