// btTaskScheduler.cpp
#include "LinearMath/btMinMax.h"
#include "LinearMath/btAlignedObjectArray.h"
#include "LinearMath/btThreads.h"
#include "LinearMath/btQuickprof.h"
#include <stdio.h>
#include <algorithm>

#if BT_THREADSAFE

#include "btThreadSupportInterface.h"

#if defined( _WIN32 )
#define WIN32_LEAN_AND_MEAN
#include <windows.h>
#endif

typedef unsigned long long btU64;
static const int kCacheLineSize = 64;

void btSpinPause()
{
#if defined( _WIN32 )
    YieldProcessor();
#endif
}

struct WorkerThreadStatus
{
    enum Type
    {
        kInvalid,
        kWaitingForWork,
        kWorking,
        kSleeping,
    };
};

ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
{
    static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;

    // directives for all worker threads packed into a single cacheline
    char m_threadDirs[kMaxThreadCount];

public:
    enum Type
    {
        kInvalid,
        kGoToSleep,         // go to sleep
        kStayAwakeButIdle,  // stay awake, but don't check the job queue
        kScanForJobs,       // actively scan job queue for jobs
    };

    WorkerThreadDirectives()
    {
        for ( int i = 0; i < kMaxThreadCount; ++i )
        {
            m_threadDirs[ i ] = 0;
        }
    }

    Type getDirective(int threadId)
    {
        btAssert(threadId < kMaxThreadCount);
        return static_cast<Type>(m_threadDirs[threadId]);
    }

    void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
    {
        btAssert( threadBegin < threadEnd );
        btAssert( threadEnd <= kMaxThreadCount );
        char dirChar = static_cast<char>(dir);
        for ( int i = threadBegin; i < threadEnd; ++i )
        {
            m_threadDirs[ i ] = dirChar;
        }
    }
};

class JobQueue;

ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
{
    int m_threadId;
    WorkerThreadStatus::Type m_status;
    int m_numJobsFinished;
    btSpinMutex m_mutex;
    btScalar m_sumResult;
    WorkerThreadDirectives* m_directive;
    JobQueue* m_queue;
    btClock* m_clock;
    unsigned int m_cooldownTime;
};

struct IJob
{
    virtual void executeJob(int threadId) = 0;
};

class ParallelForJob : public IJob
{
    const btIParallelForBody* m_body;
    int m_begin;
    int m_end;

public:
    ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
    {
        m_body = &body;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob(int threadId) BT_OVERRIDE
    {
        BT_PROFILE( "executeJob" );
        // call the functor body to do the work
        m_body->forLoop( m_begin, m_end );
    }
};

class ParallelSumJob : public IJob
{
    const btIParallelSumBody* m_body;
    ThreadLocalStorage* m_threadLocalStoreArray;
    int m_begin;
    int m_end;

public:
    ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
    {
        m_body = &body;
        m_threadLocalStoreArray = tls;
        m_begin = iBegin;
        m_end = iEnd;
    }
    virtual void executeJob( int threadId ) BT_OVERRIDE
    {
        BT_PROFILE( "executeJob" );
        // call the functor body to do the work
        btScalar val = m_body->sumLoop( m_begin, m_end );
#if BT_PARALLEL_SUM_DETERMINISTISM
        // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
        const float TRUNC_SCALE = float(1<<19);
        val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE;  // truncate some bits
#endif
        m_threadLocalStoreArray[threadId].m_sumResult += val;
    }
};
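
// Note on BT_PARALLEL_SUM_DETERMINISTISM above: floor( val*TRUNC_SCALE + 0.5f ) / TRUNC_SCALE
// rounds each job's partial sum to the nearest multiple of 1/2^19 (about 1.9e-6). Small
// floating-point differences that come from the work being split across threads differently
// from run to run are rounded away before the per-thread totals are combined.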
ATTRIBUTE_ALIGNED64(class) JobQueue
{
    btThreadSupportInterface* m_threadSupport;
    btCriticalSection* m_queueLock;
    btSpinMutex m_mutex;

    btAlignedObjectArray<IJob*> m_jobQueue;
    char* m_jobMem;
    int m_jobMemSize;
    bool m_queueIsEmpty;
    int m_tailIndex;
    int m_headIndex;
    int m_allocSize;
    bool m_useSpinMutex;
    btAlignedObjectArray<JobQueue*> m_neighborContexts;
    char m_cachePadding[kCacheLineSize];  // prevent false sharing

    void freeJobMem()
    {
        if ( m_jobMem )
        {
            // free old
            btAlignedFree(m_jobMem);
            m_jobMem = NULL;
        }
    }

    void resizeJobMem(int newSize)
    {
        if (newSize > m_jobMemSize)
        {
            freeJobMem();
            m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
            m_jobMemSize = newSize;
        }
    }

public:
    JobQueue()
    {
        m_jobMem = NULL;
        m_jobMemSize = 0;
        m_threadSupport = NULL;
        m_queueLock = NULL;
        m_headIndex = 0;
        m_tailIndex = 0;
        m_useSpinMutex = false;
        // also initialize the queue state so isQueueEmpty() is safe to call
        // before the first clearQueue()
        m_queueIsEmpty = true;
        m_allocSize = 0;
    }
    ~JobQueue()
    {
        exit();
    }

    void exit()
    {
        freeJobMem();
        if (m_queueLock && m_threadSupport)
        {
            m_threadSupport->deleteCriticalSection(m_queueLock);
            m_queueLock = NULL;
            m_threadSupport = 0;
        }
    }

    void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
    {
        m_threadSupport = threadSup;
        if (threadSup)
        {
            m_queueLock = m_threadSupport->createCriticalSection();
        }
        setupJobStealing(contextArray, contextArray->size());
    }

    void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
    {
        btAlignedObjectArray<JobQueue>& contexts = *contextArray;
        int selfIndex = 0;
        for (int i = 0; i < contexts.size(); ++i)
        {
            if ( this == &contexts[ i ] )
            {
                selfIndex = i;
                break;
            }
        }
        int numNeighbors = btMin(2, contexts.size() - 1);
        int neighborOffsets[] = {-1, 1, -2, 2, -3, 3};
        int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]);
        m_neighborContexts.reserve( numNeighbors );
        m_neighborContexts.resizeNoInitialize(0);
        for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
        {
            int neighborIndex = selfIndex + neighborOffsets[i];
            if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
            {
                m_neighborContexts.push_back( &contexts[ neighborIndex ] );
            }
        }
    }
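
    // Example of the resulting steal graph (illustrative): with 4 active queues,
    // queue 2 takes its first two in-range offsets from {-1, 1, -2, 2, -3, 3},
    // giving it neighbors 1 and 3; queue 0, at the boundary, ends up with
    // neighbors 1 and 2.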
    bool isQueueEmpty() const { return m_queueIsEmpty; }

    void lockQueue()
    {
        if ( m_useSpinMutex )
        {
            m_mutex.lock();
        }
        else
        {
            m_queueLock->lock();
        }
    }

    void unlockQueue()
    {
        if ( m_useSpinMutex )
        {
            m_mutex.unlock();
        }
        else
        {
            m_queueLock->unlock();
        }
    }

    void clearQueue(int jobCount, int jobSize)
    {
        lockQueue();
        m_headIndex = 0;
        m_tailIndex = 0;
        m_allocSize = 0;
        m_queueIsEmpty = true;
        int jobBufSize = jobSize * jobCount;
        // make sure we have enough memory allocated to store jobs
        if ( jobBufSize > m_jobMemSize )
        {
            resizeJobMem( jobBufSize );
        }
        // make sure job queue is big enough
        if ( jobCount > m_jobQueue.capacity() )
        {
            m_jobQueue.reserve( jobCount );
        }
        unlockQueue();
        m_jobQueue.resizeNoInitialize( 0 );
    }

    void* allocJobMem(int jobSize)
    {
        btAssert(m_jobMemSize >= (m_allocSize + jobSize));
        void* jobMem = &m_jobMem[m_allocSize];
        m_allocSize += jobSize;
        return jobMem;
    }

    void submitJob( IJob* job )
    {
        btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
        m_jobQueue.push_back( job );
        lockQueue();
        m_tailIndex++;
        m_queueIsEmpty = false;
        unlockQueue();
    }

    IJob* consumeJobFromOwnQueue()
    {
        if ( m_queueIsEmpty )
        {
            // lock free path. even if this is taken erroneously it isn't harmful
            return NULL;
        }
        IJob* job = NULL;
        lockQueue();
        if ( !m_queueIsEmpty )
        {
            job = m_jobQueue[ m_headIndex++ ];
            btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
            if ( m_headIndex == m_tailIndex )
            {
                m_queueIsEmpty = true;
            }
        }
        unlockQueue();
        return job;
    }

    IJob* consumeJob()
    {
        if (IJob* job = consumeJobFromOwnQueue())
        {
            return job;
        }
        // own queue is empty, try to steal from a neighbor
        for (int i = 0; i < m_neighborContexts.size(); ++i)
        {
            JobQueue* otherContext = m_neighborContexts[ i ];
            if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
            {
                return job;
            }
        }
        return NULL;
    }
};
static void WorkerThreadFunc( void* userPtr )
{
    BT_PROFILE( "WorkerThreadFunc" );
    ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
    JobQueue* jobQueue = localStorage->m_queue;

    bool shouldSleep = false;
    int threadId = localStorage->m_threadId;
    while (! shouldSleep)
    {
        // do work
        localStorage->m_mutex.lock();
        while ( IJob* job = jobQueue->consumeJob() )
        {
            localStorage->m_status = WorkerThreadStatus::kWorking;
            job->executeJob( threadId );
            localStorage->m_numJobsFinished++;
        }
        localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
        localStorage->m_mutex.unlock();
        btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
        // while queue is empty,
        while (jobQueue->isQueueEmpty())
        {
            // todo: spin wait a bit to avoid hammering the empty queue
            btSpinPause();
            if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
            {
                shouldSleep = true;
                break;
            }
            // if jobs are incoming,
            if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
            {
                clockStart = localStorage->m_clock->getTimeMicroseconds();  // reset clock
            }
            else
            {
                for ( int i = 0; i < 50; ++i )
                {
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    btSpinPause();
                    if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
                    {
                        break;
                    }
                }
                // if no jobs are incoming and the queue has been empty for the cooldown time, sleep
                btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
                if (timeElapsed > localStorage->m_cooldownTime)
                {
                    shouldSleep = true;
                    break;
                }
            }
        }
    }
    {
        BT_PROFILE("sleep");
        // go sleep
        localStorage->m_mutex.lock();
        localStorage->m_status = WorkerThreadStatus::kSleeping;
        localStorage->m_mutex.unlock();
    }
}
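
// Summary of the worker state machine driven by WorkerThreadDirectives:
//   kScanForJobs      -- consume jobs from the queue, stealing from neighbors when empty
//   kStayAwakeButIdle -- spin without scanning; once the queue has been empty for
//                        m_cooldownTime microseconds, the worker puts itself to sleep
//   kGoToSleep        -- exit the work loop and sleep until runTask() wakes the thread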
class btTaskSchedulerDefault : public btITaskScheduler
{
    btThreadSupportInterface* m_threadSupport;
    WorkerThreadDirectives* m_workerDirective;
    btAlignedObjectArray<JobQueue> m_jobQueues;
    btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
    btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
    btSpinMutex m_antiNestingLock;  // prevent nested parallel-for
    btClock m_clock;
    int m_numThreads;
    int m_numWorkerThreads;
    int m_numActiveJobQueues;
    int m_maxNumThreads;
    int m_numJobs;
    static const int kFirstWorkerThreadId = 1;

public:
    btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
    {
        m_threadSupport = NULL;
        m_workerDirective = NULL;
    }

    virtual ~btTaskSchedulerDefault()
    {
        waitForWorkersToSleep();
        for ( int i = 0; i < m_jobQueues.size(); ++i )
        {
            m_jobQueues[i].exit();
        }
        if (m_threadSupport)
        {
            delete m_threadSupport;
            m_threadSupport = NULL;
        }
        if (m_workerDirective)
        {
            btAlignedFree(m_workerDirective);
            m_workerDirective = NULL;
        }
    }

    void init()
    {
        btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
        m_threadSupport = btThreadSupportInterface::create( constructionInfo );
        m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
        m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
        m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
        m_numThreads = m_maxNumThreads;
        // ideally there is one job queue for each physical processor (except for the main thread, which needs no queue)
        int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
        int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads - 1) : (m_maxNumThreads / numThreadsPerQueue);
        m_jobQueues.resize(numJobQueues);
        m_numActiveJobQueues = numJobQueues;
        for ( int i = 0; i < m_jobQueues.size(); ++i )
        {
            m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
        }
        m_perThreadJobQueues.resize(m_numThreads);
        for ( int i = 0; i < m_numThreads; i++ )
        {
            JobQueue* jq = NULL;
            // only worker threads get a job queue
            if (i > 0)
            {
                if (numThreadsPerQueue == 1)
                {
                    // one queue per worker thread
                    jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
                }
                else
                {
                    // 2 threads share each queue
                    jq = &m_jobQueues[ i / numThreadsPerQueue ];
                }
            }
            m_perThreadJobQueues[i] = jq;
        }
        m_threadLocalStorage.resize(m_numThreads);
        for ( int i = 0; i < m_numThreads; i++ )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_threadId = i;
            storage.m_directive = m_workerDirective;
            storage.m_status = WorkerThreadStatus::kSleeping;
            storage.m_cooldownTime = 100;  // 100 microseconds, threads go to sleep after this long if they have nothing to do
            storage.m_clock = &m_clock;
            storage.m_queue = m_perThreadJobQueues[i];
        }
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );  // no work for them yet
        setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
    }

    void setWorkerDirectives(WorkerThreadDirectives::Type dir)
    {
        m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
    }

    virtual int getMaxNumThreads() const BT_OVERRIDE
    {
        return m_maxNumThreads;
    }

    virtual int getNumThreads() const BT_OVERRIDE
    {
        return m_numThreads;
    }

    virtual void setNumThreads( int numThreads ) BT_OVERRIDE
    {
        m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
        m_numWorkerThreads = m_numThreads - 1;
        m_numActiveJobQueues = 0;
        // if there is at least 1 worker,
        if ( m_numWorkerThreads > 0 )
        {
            // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
            JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
            int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
            m_numActiveJobQueues = iLastActiveContext + 1;
            for ( int i = 0; i < m_jobQueues.size(); ++i )
            {
                m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
            }
        }
        m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
    }

    void waitJobs()
    {
        BT_PROFILE( "waitJobs" );
        // have the main thread work until the job queues are empty
        int numMainThreadJobsFinished = 0;
        for ( int i = 0; i < m_numActiveJobQueues; ++i )
        {
            while ( IJob* job = m_jobQueues[i].consumeJob() )
            {
                job->executeJob( 0 );
                numMainThreadJobsFinished++;
            }
        }
        // done with jobs for now, tell workers to rest (but not sleep)
        setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
        btU64 clockStart = m_clock.getTimeMicroseconds();
        // wait for workers to finish any jobs in progress
        while ( true )
        {
            int numWorkerJobsFinished = 0;
            for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
            {
                ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
                storage->m_mutex.lock();
                numWorkerJobsFinished += storage->m_numJobsFinished;
                storage->m_mutex.unlock();
            }
            if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
            {
                break;
            }
            btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
            btAssert(timeElapsed < 1000);
            if (timeElapsed > 100000)
            {
                break;
            }
            btSpinPause();
        }
    }

    void wakeWorkers(int numWorkersToWake)
    {
        BT_PROFILE( "wakeWorkers" );
        btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
        int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
        int numActiveWorkers = 0;
        for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
        {
            // note this count of active workers is not necessarily totally reliable, because a worker thread could be
            // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
            if (storage.m_status != WorkerThreadStatus::kSleeping)
            {
                numActiveWorkers++;
            }
        }
        for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
            if (storage.m_status == WorkerThreadStatus::kSleeping)
            {
                m_threadSupport->runTask( iWorker, &storage );
                numActiveWorkers++;
            }
        }
    }

    void waitForWorkersToSleep()
    {
        BT_PROFILE( "waitForWorkersToSleep" );
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
        m_threadSupport->waitForAllTasks();
        for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
        }
    }

    virtual void sleepWorkerThreadsHint() BT_OVERRIDE
    {
        BT_PROFILE( "sleepWorkerThreadsHint" );
        // hint the task scheduler that we may not be using these threads for a little while
        setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
    }

    void prepareWorkerThreads()
    {
        for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
        {
            ThreadLocalStorage& storage = m_threadLocalStorage[i];
            storage.m_mutex.lock();
            storage.m_numJobsFinished = 0;
            storage.m_mutex.unlock();
        }
        setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
    }

    virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
    {
        BT_PROFILE( "parallelFor_ThreadSupport" );
        btAssert( iEnd >= iBegin );
        btAssert( grainSize >= 1 );
        int iterationCount = iEnd - iBegin;
        if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
        {
            typedef ParallelForJob JobType;
            int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
            m_numJobs = jobCount;
            btAssert( jobCount >= 2 );  // need more than one job for multithreading
            int jobSize = sizeof( JobType );
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue( jobCount, jobSize );
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for ( int i = iBegin; i < iEnd; i += grainSize )
            {
                btAssert( iJob < jobCount );
                int iE = btMin( i + grainSize, iEnd );
                JobQueue* jq = m_perThreadJobQueues[ iThread ];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new ( jobMem ) ParallelForJob( i, iE, body );  // placement new
                jq->submitJob( job );
                iJob++;
                iThread++;
                if ( iThread >= m_numThreads )
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers( jobCount - 1 );
            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            m_antiNestingLock.unlock();
        }
        else
        {
            BT_PROFILE( "parallelFor_mainThread" );
            // just run on main thread
            body.forLoop( iBegin, iEnd );
        }
    }
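
    // Job-splitting example (illustrative): parallelFor( 0, 1000, 64, body ) gives
    // jobCount = (1000 + 63) / 64 = 16; the first 15 jobs cover 64 iterations each
    // and the last job covers the remaining range [960, 1000).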
    virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
    {
        BT_PROFILE( "parallelSum_ThreadSupport" );
        btAssert( iEnd >= iBegin );
        btAssert( grainSize >= 1 );
        int iterationCount = iEnd - iBegin;
        if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
        {
            typedef ParallelSumJob JobType;
            int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
            m_numJobs = jobCount;
            btAssert( jobCount >= 2 );  // need more than one job for multithreading
            int jobSize = sizeof( JobType );
            for (int i = 0; i < m_numActiveJobQueues; ++i)
            {
                m_jobQueues[i].clearQueue( jobCount, jobSize );
            }
            // initialize summation
            for ( int iThread = 0; iThread < m_numThreads; ++iThread )
            {
                m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
            }
            // prepare worker threads for incoming work
            prepareWorkerThreads();
            // submit all of the jobs
            int iJob = 0;
            int iThread = kFirstWorkerThreadId;  // first worker thread
            for ( int i = iBegin; i < iEnd; i += grainSize )
            {
                btAssert( iJob < jobCount );
                int iE = btMin( i + grainSize, iEnd );
                JobQueue* jq = m_perThreadJobQueues[ iThread ];
                btAssert(jq);
                btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
                void* jobMem = jq->allocJobMem(jobSize);
                JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] );  // placement new
                jq->submitJob( job );
                iJob++;
                iThread++;
                if ( iThread >= m_numThreads )
                {
                    iThread = kFirstWorkerThreadId;  // first worker thread
                }
            }
            wakeWorkers( jobCount - 1 );
            // put the main thread to work on emptying the job queue and then wait for all workers to finish
            waitJobs();
            // add up all the thread sums
            btScalar sum = btScalar(0);
            for ( int iThread = 0; iThread < m_numThreads; ++iThread )
            {
                sum += m_threadLocalStorage[ iThread ].m_sumResult;
            }
            m_antiNestingLock.unlock();
            return sum;
        }
        else
        {
            BT_PROFILE( "parallelSum_mainThread" );
            // just run on main thread
            return body.sumLoop( iBegin, iEnd );
        }
    }
};

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
    ts->init();
    return ts;
}
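
// Example usage (a minimal sketch; relies on the public API declared in
// LinearMath/btThreads.h -- btSetTaskScheduler() and btParallelFor()).
// ScaleBody, myFloatArray and arrayLen are hypothetical names:
//
//   struct ScaleBody : public btIParallelForBody
//   {
//       float* m_data;
//       virtual void forLoop( int iBegin, int iEnd ) const BT_OVERRIDE
//       {
//           for ( int i = iBegin; i < iEnd; ++i )
//           {
//               m_data[i] *= 2.0f;  // scale each element in this job's range
//           }
//       }
//   };
//
//   btITaskScheduler* scheduler = btCreateDefaultTaskScheduler();
//   btSetTaskScheduler( scheduler );          // install as the global scheduler
//   ScaleBody body;
//   body.m_data = myFloatArray;
//   btParallelFor( 0, arrayLen, 64, body );   // grainSize of 64 iterations per job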
#else  // #if BT_THREADSAFE

btITaskScheduler* btCreateDefaultTaskScheduler()
{
    return NULL;
}

#endif  // #else // #if BT_THREADSAFE