threadPool.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #include "platform/threads/threadPool.h"
  23. #include "platform/threads/thread.h"
  24. #include "platform/platformCPUCount.h"
  25. #include "core/strings/stringFunctions.h"
  26. #include "core/util/tSingleton.h"
  27. //#define DEBUG_SPEW
  28. //=============================================================================
  29. // ThreadPool::Context.
  30. //=============================================================================
  31. ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
  32. //--------------------------------------------------------------------------
  33. ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
  34. : mName( name ),
  35. mParent( parent ),
  36. mSibling( 0 ),
  37. mChildren( 0 ),
  38. mPriorityBias( priorityBias ),
  39. mAccumulatedPriorityBias( 0.0 )
  40. {
  41. if( parent )
  42. {
  43. mSibling = mParent->mChildren;
  44. mParent->mChildren = this;
  45. }
  46. }
  47. //--------------------------------------------------------------------------
  48. ThreadPool::Context::~Context()
  49. {
  50. if( mParent )
  51. for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
  52. if( context == this )
  53. {
  54. if( !prev )
  55. mParent->mChildren = this->mSibling;
  56. else
  57. prev->mSibling = this->mSibling;
  58. }
  59. }
  60. //--------------------------------------------------------------------------
  61. ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
  62. {
  63. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  64. if( dStricmp( child->getName(), name ) == 0 )
  65. return child;
  66. return 0;
  67. }
  68. //--------------------------------------------------------------------------
  69. F32 ThreadPool::Context::getAccumulatedPriorityBias()
  70. {
  71. if( !mAccumulatedPriorityBias )
  72. updateAccumulatedPriorityBiases();
  73. return mAccumulatedPriorityBias;
  74. }
  75. //--------------------------------------------------------------------------
  76. void ThreadPool::Context::setPriorityBias( F32 value )
  77. {
  78. mPriorityBias = value;
  79. mAccumulatedPriorityBias = 0.0;
  80. }
  81. //--------------------------------------------------------------------------
  82. void ThreadPool::Context::updateAccumulatedPriorityBiases()
  83. {
  84. // Update our own priority bias.
  85. mAccumulatedPriorityBias = mPriorityBias;
  86. for( Context* context = getParent(); context != 0; context = context->getParent() )
  87. mAccumulatedPriorityBias *= context->getPriorityBias();
  88. // Update our children.
  89. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  90. child->updateAccumulatedPriorityBiases();
  91. }
  92. //=============================================================================
  93. // ThreadPool::WorkItem.
  94. //=============================================================================
  95. //--------------------------------------------------------------------------
  96. void ThreadPool::WorkItem::process()
  97. {
  98. execute();
  99. mExecuted = true;
  100. }
  101. //--------------------------------------------------------------------------
  102. bool ThreadPool::WorkItem::isCancellationRequested()
  103. {
  104. return false;
  105. }
  106. //--------------------------------------------------------------------------
  107. bool ThreadPool::WorkItem::cancellationPoint()
  108. {
  109. if( isCancellationRequested() )
  110. {
  111. onCancelled();
  112. return true;
  113. }
  114. else
  115. return false;
  116. }
  117. //--------------------------------------------------------------------------
  118. F32 ThreadPool::WorkItem::getPriority()
  119. {
  120. return 1.0;
  121. }
  122. //=============================================================================
  123. // ThreadPool::WorkItemWrapper.
  124. //=============================================================================
  125. /// Value wrapper for work items while placed on priority queue.
  126. /// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
  127. ///
  128. /// @see ThreadSafePriorityQueueWithUpdate
  129. /// @see ThreadPool::WorkItem
  130. ///
  131. struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
  132. {
  133. typedef ThreadSafeRef< WorkItem > Parent;
  134. WorkItemWrapper() {}
  135. WorkItemWrapper( WorkItem* item )
  136. : Parent( item ) {}
  137. bool isAlive();
  138. F32 getPriority();
  139. };
  140. inline bool ThreadPool::WorkItemWrapper::isAlive()
  141. {
  142. WorkItem* item = ptr();
  143. if( !item )
  144. return false;
  145. else if( item->isCancellationRequested() )
  146. {
  147. ( *this ) = 0;
  148. return false;
  149. }
  150. else
  151. return true;
  152. }
  153. inline F32 ThreadPool::WorkItemWrapper::getPriority()
  154. {
  155. WorkItem* item = ptr();
  156. AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );
  157. // Compute a scaled priority value based on the item's context.
  158. return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
  159. }
  160. //=============================================================================
  161. // ThreadPool::WorkerThread.
  162. //=============================================================================
  163. ///
  164. ///
  165. struct ThreadPool::WorkerThread : public Thread
  166. {
  167. WorkerThread( ThreadPool* pool, U32 index );
  168. WorkerThread* getNext();
  169. virtual void run( void* arg = 0 );
  170. private:
  171. U32 mIndex;
  172. ThreadPool* mPool;
  173. WorkerThread* mNext;
  174. };
  175. ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
  176. : mPool( pool ),
  177. mIndex( index )
  178. {
  179. // Link us to the pool's thread list.
  180. mNext = pool->mThreads;
  181. pool->mThreads = this;
  182. }
  183. inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
  184. {
  185. return mNext;
  186. }
  187. void ThreadPool::WorkerThread::run( void* arg )
  188. {
  189. #ifdef TORQUE_DEBUG
  190. {
  191. // Set the thread's name for debugging.
  192. char buffer[ 2048 ];
  193. dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
  194. _setName( buffer );
  195. }
  196. #endif
  197. #if defined(TORQUE_OS_XENON)
  198. // On Xbox 360 you must explicitly assign software threads to hardware threads.
  199. // This will distribute job threads across the secondary CPUs leaving both
  200. // primary CPU cores available to the "main" thread. This will help prevent
  201. // more L2 thrashing of the main thread/core.
  202. static U32 sCoreAssignment = 2;
  203. XSetThreadProcessor( GetCurrentThread(), sCoreAssignment );
  204. sCoreAssignment = sCoreAssignment < 6 ? sCoreAssignment + 1 : 2;
  205. #endif
  206. while( 1 )
  207. {
  208. if( checkForStop() )
  209. {
  210. #ifdef DEBUG_SPEW
  211. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
  212. #endif
  213. dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
  214. return;
  215. }
  216. // Mark us as potentially blocking.
  217. dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );
  218. bool waitForSignal = false;
  219. {
  220. // Try to take an item from the queue. Do
  221. // this in a separate block, so we'll be
  222. // releasing the item after we have finished.
  223. WorkItemWrapper workItem;
  224. if( mPool->mWorkItemQueue.takeNext( workItem ) )
  225. {
  226. // Mark us as non-blocking as this loop definitely
  227. // won't wait on the semaphore.
  228. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  229. #ifdef DEBUG_SPEW
  230. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
  231. #endif
  232. workItem->process();
  233. dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
  234. }
  235. else
  236. waitForSignal = true;
  237. }
  238. if( waitForSignal )
  239. {
  240. dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
  241. #ifdef DEBUG_SPEW
  242. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
  243. #endif
  244. mPool->mSemaphore.acquire();
  245. #ifdef DEBUG_SPEW
  246. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
  247. #endif
  248. dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
  249. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  250. }
  251. }
  252. }
  253. //=============================================================================
  254. // ThreadPool.
  255. //=============================================================================
  256. bool ThreadPool::smForceAllMainThread;
  257. U32 ThreadPool::smMainThreadTimeMS;
  258. ThreadPool::QueueType ThreadPool::smMainThreadQueue;
  259. //--------------------------------------------------------------------------
  260. ThreadPool::ThreadPool( const char* name, U32 numThreads )
  261. : mName( name ),
  262. mNumThreads( numThreads ),
  263. mNumThreadsAwake( 0 ),
  264. mNumPendingItems( 0 ),
  265. mThreads( 0 ),
  266. mSemaphore( 0 )
  267. {
  268. // Number of worker threads to create.
  269. if( !mNumThreads )
  270. {
  271. // Use platformCPUInfo directly as in the case of the global pool,
  272. // Platform::SystemInfo will not yet have been initialized.
  273. U32 numLogical = 0;
  274. U32 numPhysical = 0;
  275. U32 numCores = 0;
  276. CPUInfo::CPUCount( numLogical, numCores, numPhysical );
  277. const U32 baseCount = getMax( numLogical, numCores );
  278. mNumThreads = (baseCount > 0) ? baseCount : 2;
  279. }
  280. #ifdef DEBUG_SPEW
  281. Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
  282. #endif
  283. // Create the threads.
  284. mNumThreadsAwake = mNumThreads;
  285. mNumThreadsReady = mNumThreads;
  286. for( U32 i = 0; i < mNumThreads; i ++ )
  287. {
  288. WorkerThread* thread = new WorkerThread( this, i );
  289. thread->start();
  290. }
  291. }
  292. //--------------------------------------------------------------------------
  293. ThreadPool::~ThreadPool()
  294. {
  295. shutdown();
  296. }
  297. //--------------------------------------------------------------------------
  298. void ThreadPool::shutdown()
  299. {
  300. const U32 numThreads = mNumThreads;
  301. // Tell our worker threads to stop.
  302. for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
  303. thread->stop();
  304. // Release the semaphore as many times as there are threads.
  305. // Doing this separately guarantees we're not waking a thread
  306. // that hasn't been set its stop flag yet.
  307. for( U32 n = 0; n < numThreads; ++ n )
  308. mSemaphore.release();
  309. // Delete each worker thread. Wait until death as we're prone to
  310. // running into issues with decomposing work item lists otherwise.
  311. for( WorkerThread* thread = mThreads; thread != 0; )
  312. {
  313. WorkerThread* next = thread->getNext();
  314. thread->join();
  315. delete thread;
  316. thread = next;
  317. }
  318. mThreads = NULL;
  319. mNumThreads = 0;
  320. }
  321. //--------------------------------------------------------------------------
  322. void ThreadPool::queueWorkItem( WorkItem* item )
  323. {
  324. bool executeRightAway = ( getForceAllMainThread() );
  325. #ifdef DEBUG_SPEW
  326. Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
  327. ( executeRightAway ? "executing" : "queuing" ),
  328. item );
  329. #endif
  330. if( executeRightAway )
  331. item->process();
  332. else
  333. {
  334. // Put the item in the queue.
  335. dFetchAndAdd( mNumPendingItems, 1 );
  336. mWorkItemQueue.insert( item->getPriority(), item );
  337. mSemaphore.release();
  338. }
  339. }
  340. //--------------------------------------------------------------------------
  341. void ThreadPool::flushWorkItems( S32 timeOut )
  342. {
  343. AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );
  344. U32 endTime = 0;
  345. if( timeOut != -1 )
  346. endTime = Platform::getRealMilliseconds() + timeOut;
  347. // Spinlock until the queue is empty.
  348. while( !mWorkItemQueue.isEmpty() )
  349. {
  350. Platform::sleep( 25 );
  351. // Stop if we have exceeded our processing time budget.
  352. if( timeOut != -1
  353. && Platform::getRealMilliseconds() >= endTime )
  354. break;
  355. }
  356. }
  357. void ThreadPool::waitForAllItems( S32 timeOut )
  358. {
  359. U32 endTime = 0;
  360. if( timeOut != -1 )
  361. endTime = Platform::getRealMilliseconds() + timeOut;
  362. // Spinlock until there are no items that have not been processed.
  363. while( dAtomicRead( mNumPendingItems ) )
  364. {
  365. Platform::sleep( 25 );
  366. // Stop if we have exceeded our processing time budget.
  367. if( timeOut != -1
  368. && Platform::getRealMilliseconds() >= endTime )
  369. break;
  370. }
  371. }
  372. //--------------------------------------------------------------------------
  373. void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
  374. {
  375. smMainThreadQueue.insert( item->getPriority(), item );
  376. }
  377. //--------------------------------------------------------------------------
  378. void ThreadPool::processMainThreadWorkItems()
  379. {
  380. AssertFatal( ThreadManager::isMainThread(),
  381. "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
  382. U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
  383. do
  384. {
  385. WorkItemWrapper item;
  386. if( !smMainThreadQueue.takeNext( item ) )
  387. break;
  388. else
  389. item->process();
  390. }
  391. while( Platform::getRealMilliseconds() < timeLimit );
  392. }