threadPool.cpp 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #include "platform/threads/threadPool.h"
  23. #include "platform/threads/thread.h"
  24. #include "platform/platformCPUCount.h"
  25. #include "core/strings/stringFunctions.h"
  26. #include "core/util/tSingleton.h"
  27. //#define DEBUG_SPEW
  28. //=============================================================================
  29. // ThreadPool::Context.
  30. //=============================================================================
  31. ThreadPool::Context ThreadPool::Context::smRootContext( "ROOT", NULL, 1.0 );
  32. //--------------------------------------------------------------------------
  33. ThreadPool::Context::Context( const char* name, ThreadPool::Context* parent, F32 priorityBias )
  34. : mName( name ),
  35. mParent( parent ),
  36. mSibling( 0 ),
  37. mChildren( 0 ),
  38. mPriorityBias( priorityBias ),
  39. mAccumulatedPriorityBias( 0.0 )
  40. {
  41. if( parent )
  42. {
  43. mSibling = mParent->mChildren;
  44. mParent->mChildren = this;
  45. }
  46. }
  47. //--------------------------------------------------------------------------
  48. ThreadPool::Context::~Context()
  49. {
  50. if( mParent )
  51. for( Context* context = mParent->mChildren, *prev = 0; context != 0; prev = context, context = context->mSibling )
  52. if( context == this )
  53. {
  54. if( !prev )
  55. mParent->mChildren = this->mSibling;
  56. else
  57. prev->mSibling = this->mSibling;
  58. }
  59. }
  60. //--------------------------------------------------------------------------
  61. ThreadPool::Context* ThreadPool::Context::getChild( const char* name )
  62. {
  63. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  64. if( dStricmp( child->getName(), name ) == 0 )
  65. return child;
  66. return 0;
  67. }
  68. //--------------------------------------------------------------------------
  69. F32 ThreadPool::Context::getAccumulatedPriorityBias()
  70. {
  71. if( !mAccumulatedPriorityBias )
  72. updateAccumulatedPriorityBiases();
  73. return mAccumulatedPriorityBias;
  74. }
  75. //--------------------------------------------------------------------------
  76. void ThreadPool::Context::setPriorityBias( F32 value )
  77. {
  78. mPriorityBias = value;
  79. mAccumulatedPriorityBias = 0.0;
  80. }
  81. //--------------------------------------------------------------------------
  82. void ThreadPool::Context::updateAccumulatedPriorityBiases()
  83. {
  84. // Update our own priority bias.
  85. mAccumulatedPriorityBias = mPriorityBias;
  86. for( Context* context = getParent(); context != 0; context = context->getParent() )
  87. mAccumulatedPriorityBias *= context->getPriorityBias();
  88. // Update our children.
  89. for( Context* child = getChildren(); child != 0; child = child->getSibling() )
  90. child->updateAccumulatedPriorityBiases();
  91. }
  92. //=============================================================================
  93. // ThreadPool::WorkItem.
  94. //=============================================================================
  95. //--------------------------------------------------------------------------
  96. void ThreadPool::WorkItem::process()
  97. {
  98. execute();
  99. }
  100. //--------------------------------------------------------------------------
  101. bool ThreadPool::WorkItem::isCancellationRequested()
  102. {
  103. return false;
  104. }
  105. //--------------------------------------------------------------------------
  106. bool ThreadPool::WorkItem::cancellationPoint()
  107. {
  108. if( isCancellationRequested() )
  109. {
  110. onCancelled();
  111. return true;
  112. }
  113. else
  114. return false;
  115. }
  116. //--------------------------------------------------------------------------
  117. F32 ThreadPool::WorkItem::getPriority()
  118. {
  119. return 1.0;
  120. }
  121. //=============================================================================
  122. // ThreadPool::WorkItemWrapper.
  123. //=============================================================================
  124. /// Value wrapper for work items while placed on priority queue.
  125. /// Conforms to interface dictated by ThreadSafePriorityQueueWithUpdate.
  126. ///
  127. /// @see ThreadSafePriorityQueueWithUpdate
  128. /// @see ThreadPool::WorkItem
  129. ///
  130. struct ThreadPool::WorkItemWrapper : public ThreadSafeRef< WorkItem >
  131. {
  132. typedef ThreadSafeRef< WorkItem > Parent;
  133. WorkItemWrapper() {}
  134. WorkItemWrapper( WorkItem* item )
  135. : Parent( item ) {}
  136. bool isAlive();
  137. F32 getPriority();
  138. };
  139. inline bool ThreadPool::WorkItemWrapper::isAlive()
  140. {
  141. WorkItem* item = ptr();
  142. if( !item )
  143. return false;
  144. else if( item->isCancellationRequested() )
  145. {
  146. ( *this ) = 0;
  147. return false;
  148. }
  149. else
  150. return true;
  151. }
  152. inline F32 ThreadPool::WorkItemWrapper::getPriority()
  153. {
  154. WorkItem* item = ptr();
  155. AssertFatal( item != 0, "ThreadPool::WorkItemWrapper::getPriority - called on dead item" );
  156. // Compute a scaled priority value based on the item's context.
  157. return ( item->getContext()->getAccumulatedPriorityBias() * item->getPriority() );
  158. }
  159. //=============================================================================
  160. // ThreadPool::WorkerThread.
  161. //=============================================================================
  162. ///
  163. ///
  164. struct ThreadPool::WorkerThread : public Thread
  165. {
  166. WorkerThread( ThreadPool* pool, U32 index );
  167. WorkerThread* getNext();
  168. virtual void run( void* arg = 0 );
  169. private:
  170. U32 mIndex;
  171. ThreadPool* mPool;
  172. WorkerThread* mNext;
  173. };
  174. ThreadPool::WorkerThread::WorkerThread( ThreadPool* pool, U32 index )
  175. : mPool( pool ),
  176. mIndex( index )
  177. {
  178. // Link us to the pool's thread list.
  179. mNext = pool->mThreads;
  180. pool->mThreads = this;
  181. }
  182. inline ThreadPool::WorkerThread* ThreadPool::WorkerThread::getNext()
  183. {
  184. return mNext;
  185. }
  // Worker thread main loop. The worker repeatedly takes the next item from its
  // pool's work queue and processes it. When the queue is empty, it blocks on
  // the pool's semaphore until someone releases it (queueWorkItem() or
  // shutdown()). It exits once checkForStop() reports that a stop was requested.
  // The `arg` parameter is not used.
  //
  // Counter bookkeeping (all via dFetchAndAdd on the pool):
  //  - mNumThreadsReady: decremented before trying the queue. It is incremented
  //    again once the worker either got an item or woke from the semaphore.
  //    queueWorkItem() compares it with mNumThreads to decide whether to
  //    release the semaphore.
  //  - mNumThreadsAwake: decremented before the semaphore wait, incremented after.
  //  - mNumThreads: decremented when the worker exits.
  186. void ThreadPool::WorkerThread::run( void* arg )
  187. {
  188. #ifdef TORQUE_DEBUG
  189. {
  190. // Set the thread's name for debugging.
  191. char buffer[ 2048 ];
  192. dSprintf( buffer, sizeof( buffer ), "ThreadPool(%s) WorkerThread %i", mPool->mName.c_str(), mIndex );
  193. _setName( buffer );
  194. }
  195. #endif
  196. #if defined(TORQUE_OS_XENON)
  197. // On Xbox 360 you must explicitly assign software threads to hardware threads.
  198. // This will distribute job threads across the secondary CPUs leaving both
  199. // primary CPU cores available to the "main" thread. This will help prevent
  200. // more L2 thrashing of the main thread/core.
  // Round-robins over hardware threads 2..6.
  // NOTE(review): sCoreAssignment is a function-local static read and written
  // without synchronization; workers starting concurrently may race on it.
  201. static U32 sCoreAssignment = 2;
  202. XSetThreadProcessor( GetCurrentThread(), sCoreAssignment );
  203. sCoreAssignment = sCoreAssignment < 6 ? sCoreAssignment + 1 : 2;
  204. #endif
  205. while( 1 )
  206. {
  // Stop requested: report our exit to the pool and leave the loop.
  // NOTE(review): mNumThreadsReady / mNumThreadsAwake are not adjusted on this path.
  207. if( checkForStop() )
  208. {
  209. #ifdef DEBUG_SPEW
  210. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' exits", getId() );
  211. #endif
  212. dFetchAndAdd( mPool->mNumThreads, ( U32 ) -1 );
  213. return;
  214. }
  215. // Mark us as potentially blocking.
  216. dFetchAndAdd( mPool->mNumThreadsReady, ( U32 ) -1 );
  217. bool waitForSignal = false;
  218. {
  219. // Try to take an item from the queue. Do
  220. // this in a separate block, so we'll be
  221. // releasing the item after we have finished.
  222. WorkItemWrapper workItem;
  223. if( mPool->mWorkItemQueue.takeNext( workItem ) )
  224. {
  225. // Mark us as non-blocking as this loop definitely
  226. // won't wait on the semaphore.
  227. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  228. #ifdef DEBUG_SPEW
  // NOTE(review): *workItem is passed to a '0x%x' varargs specifier; verify what
  // ThreadSafeRef's operator* yields (a pointer would need %p on 64-bit builds).
  229. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
  230. #endif
  231. workItem->process();
  232. }
  233. else
  234. waitForSignal = true;
  235. }
  // Queue was empty: sleep on the semaphore. The work item wrapper has already
  // gone out of scope above, so we hold no item while blocked.
  236. if( waitForSignal )
  237. {
  238. dFetchAndAdd( mPool->mNumThreadsAwake, ( U32 ) -1 );
  239. #ifdef DEBUG_SPEW
  240. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' going to sleep", getId() );
  241. #endif
  242. mPool->mSemaphore.acquire();
  243. #ifdef DEBUG_SPEW
  244. Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' waking up", getId() );
  245. #endif
  // Awake and ready again; the next iteration re-checks the stop flag first.
  246. dFetchAndAdd( mPool->mNumThreadsAwake, 1 );
  247. dFetchAndAdd( mPool->mNumThreadsReady, 1 );
  248. }
  249. }
  250. }
  251. //=============================================================================
  252. // ThreadPool.
  253. //=============================================================================
  254. bool ThreadPool::smForceAllMainThread;
  255. U32 ThreadPool::smMainThreadTimeMS;
  256. ThreadPool::QueueType ThreadPool::smMainThreadQueue;
  257. //--------------------------------------------------------------------------
  258. ThreadPool::ThreadPool( const char* name, U32 numThreads )
  259. : mName( name ),
  260. mNumThreads( numThreads ),
  261. mNumThreadsAwake( 0 ),
  262. mThreads( 0 ),
  263. mSemaphore( 0 )
  264. {
  265. // Number of worker threads to create.
  266. if( !mNumThreads )
  267. {
  268. // Use platformCPUInfo directly as in the case of the global pool,
  269. // Platform::SystemInfo will not yet have been initialized.
  270. U32 numLogical;
  271. U32 numPhysical;
  272. U32 numCores;
  273. CPUInfo::CPUCount( numLogical, numCores, numPhysical );
  274. const U32 baseCount = getMax( numLogical, numCores );
  275. if( baseCount )
  276. mNumThreads = baseCount;
  277. else
  278. mNumThreads = 2;
  279. }
  280. #ifdef DEBUG_SPEW
  281. Platform::outputDebugString( "[ThreadPool] spawning %i threads", mNumThreads );
  282. #endif
  283. // Create the threads.
  284. mNumThreadsAwake = mNumThreads;
  285. mNumThreadsReady = mNumThreads;
  286. for( U32 i = 0; i < mNumThreads; i ++ )
  287. {
  288. WorkerThread* thread = new WorkerThread( this, i );
  289. thread->start();
  290. }
  291. }
  292. //--------------------------------------------------------------------------
  293. ThreadPool::~ThreadPool()
  294. {
  295. shutdown();
  296. }
  297. //--------------------------------------------------------------------------
  298. void ThreadPool::shutdown()
  299. {
  300. const U32 numThreads = mNumThreads;
  301. // Tell our worker threads to stop.
  302. for( WorkerThread* thread = mThreads; thread != 0; thread = thread->getNext() )
  303. thread->stop();
  304. // Release the semaphore as many times as there are threads.
  305. // Doing this separately guarantees we're not waking a thread
  306. // that hasn't been set its stop flag yet.
  307. for( U32 n = 0; n < numThreads; ++ n )
  308. mSemaphore.release();
  309. // Delete each worker thread. Wait until death as we're prone to
  310. // running into issues with decomposing work item lists otherwise.
  311. for( WorkerThread* thread = mThreads; thread != 0; )
  312. {
  313. WorkerThread* next = thread->getNext();
  314. thread->join();
  315. delete thread;
  316. thread = next;
  317. }
  318. mThreads = NULL;
  319. mNumThreads = 0;
  320. }
  321. //--------------------------------------------------------------------------
  322. void ThreadPool::queueWorkItem( WorkItem* item )
  323. {
  324. bool executeRightAway = ( getForceAllMainThread() );
  325. #ifdef DEBUG_SPEW
  326. Platform::outputDebugString( "[ThreadPool] %s work item '0x%x'",
  327. ( executeRightAway ? "executing" : "queuing" ),
  328. item );
  329. #endif
  330. if( executeRightAway )
  331. item->process();
  332. else
  333. {
  334. // Put the item in the queue.
  335. mWorkItemQueue.insert( item->getPriority(), item );
  336. // Wake up some thread, if we need to.
  337. // Use the ready count here as the wake count does
  338. // not correctly protect the critical section in the
  339. // thread's run function. This may lead us to release
  340. // the semaphore more often than necessary, but it avoids
  341. // a race condition.
  342. if( !dCompareAndSwap( mNumThreadsReady, mNumThreads, mNumThreads ) )
  343. mSemaphore.release();
  344. }
  345. }
  346. //--------------------------------------------------------------------------
  347. void ThreadPool::flushWorkItems( S32 timeOut )
  348. {
  349. AssertFatal( mNumThreads, "ThreadPool::flushWorkItems() - no worker threads in pool" );
  350. U32 endTime = 0;
  351. if( timeOut != -1 )
  352. endTime = Platform::getRealMilliseconds() + timeOut;
  353. // Spinlock until the queue is empty.
  354. while( !mWorkItemQueue.isEmpty() )
  355. {
  356. Platform::sleep( 25 );
  357. // Stop if we have exceeded our processing time budget.
  358. if( timeOut != -1
  359. && Platform::getRealMilliseconds() >= endTime )
  360. break;
  361. }
  362. }
  363. //--------------------------------------------------------------------------
  364. void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )
  365. {
  366. smMainThreadQueue.insert( item->getPriority(), item );
  367. }
  368. //--------------------------------------------------------------------------
  369. void ThreadPool::processMainThreadWorkItems()
  370. {
  371. AssertFatal( ThreadManager::isMainThread(),
  372. "ThreadPool::processMainThreadWorkItems - this function must only be called on the main thread" );
  373. U32 timeLimit = ( Platform::getRealMilliseconds() + getMainThreadThresholdTimeMS() );
  374. do
  375. {
  376. WorkItemWrapper item;
  377. if( !smMainThreadQueue.takeNext( item ) )
  378. break;
  379. else
  380. item->process();
  381. }
  382. while( Platform::getRealMilliseconds() < timeLimit );
  383. }