12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802 |
- #include "LinearMath/btMinMax.h"
- #include "LinearMath/btAlignedObjectArray.h"
- #include "LinearMath/btThreads.h"
- #include "LinearMath/btQuickprof.h"
- #include <stdio.h>
- #include <algorithm>
- #if BT_THREADSAFE
- #include "btThreadSupportInterface.h"
- #if defined( _WIN32 )
- #define WIN32_LEAN_AND_MEAN
- #include <windows.h>
- #endif
// 64-bit unsigned integer used for microsecond timestamps in this file.
typedef unsigned long long btU64;

// Alignment (in bytes) used for job memory and for padding against false sharing.
static const int kCacheLineSize = 64;

// Hint to the CPU that the calling thread is inside a spin-wait loop.
//
// On Windows this expands to YieldProcessor(). On GCC/Clang targeting x86 or
// AArch64 the equivalent "pause"/"yield" instruction is emitted directly; the
// "memory" clobber also keeps the compiler from hoisting loads of the polled
// variables out of the caller's spin loop once this function is inlined.
// On any other platform this is a no-op.
void btSpinPause()
{
#if defined(_WIN32)
	YieldProcessor();
#elif defined(__GNUC__) && (defined(__i386__) || defined(__x86_64__))
	__asm__ __volatile__("pause" ::: "memory");
#elif defined(__GNUC__) && defined(__aarch64__)
	__asm__ __volatile__("yield" ::: "memory");
#endif
}
// Namespace-like wrapper for the state a worker thread publishes about itself
// (stored in ThreadLocalStorage::m_status).
struct WorkerThreadStatus
{
	enum Type
	{
		kInvalid = 0,         // default / never set
		kWaitingForWork = 1,  // awake, own queue drained, polling for new jobs
		kWorking = 2,         // currently executing jobs
		kSleeping = 3         // worker function has returned (or has not been started)
	};
};
- ATTRIBUTE_ALIGNED64(class) WorkerThreadDirectives
- {
- static const int kMaxThreadCount = BT_MAX_THREAD_COUNT;
- // directives for all worker threads packed into a single cacheline
- char m_threadDirs[kMaxThreadCount];
- public:
- enum Type
- {
- kInvalid,
- kGoToSleep, // go to sleep
- kStayAwakeButIdle, // wait for not checking job queue
- kScanForJobs, // actively scan job queue for jobs
- };
- WorkerThreadDirectives()
- {
- for ( int i = 0; i < kMaxThreadCount; ++i )
- {
- m_threadDirs[ i ] = 0;
- }
- }
- Type getDirective(int threadId)
- {
- btAssert(threadId < kMaxThreadCount);
- return static_cast<Type>(m_threadDirs[threadId]);
- }
- void setDirectiveByRange(int threadBegin, int threadEnd, Type dir)
- {
- btAssert( threadBegin < threadEnd );
- btAssert( threadEnd <= kMaxThreadCount );
- char dirChar = static_cast<char>(dir);
- for ( int i = threadBegin; i < threadEnd; ++i )
- {
- m_threadDirs[ i ] = dirChar;
- }
- }
- };
- class JobQueue;
- ATTRIBUTE_ALIGNED64(struct) ThreadLocalStorage
- {
- int m_threadId;
- WorkerThreadStatus::Type m_status;
- int m_numJobsFinished;
- btSpinMutex m_mutex;
- btScalar m_sumResult;
- WorkerThreadDirectives * m_directive;
- JobQueue* m_queue;
- btClock* m_clock;
- unsigned int m_cooldownTime;
- };
// Abstract unit of work stored in a JobQueue.
struct IJob
{
	// Runs the job. threadId is the id of the thread executing it
	// (0 when the main thread runs the job).
	virtual void executeJob(int threadId) = 0;
};
- class ParallelForJob : public IJob
- {
- const btIParallelForBody* m_body;
- int m_begin;
- int m_end;
- public:
- ParallelForJob( int iBegin, int iEnd, const btIParallelForBody& body )
- {
- m_body = &body;
- m_begin = iBegin;
- m_end = iEnd;
- }
- virtual void executeJob(int threadId) BT_OVERRIDE
- {
- BT_PROFILE( "executeJob" );
- // call the functor body to do the work
- m_body->forLoop( m_begin, m_end );
- }
- };
- class ParallelSumJob : public IJob
- {
- const btIParallelSumBody* m_body;
- ThreadLocalStorage* m_threadLocalStoreArray;
- int m_begin;
- int m_end;
- public:
- ParallelSumJob( int iBegin, int iEnd, const btIParallelSumBody& body, ThreadLocalStorage* tls )
- {
- m_body = &body;
- m_threadLocalStoreArray = tls;
- m_begin = iBegin;
- m_end = iEnd;
- }
- virtual void executeJob( int threadId ) BT_OVERRIDE
- {
- BT_PROFILE( "executeJob" );
- // call the functor body to do the work
- btScalar val = m_body->sumLoop( m_begin, m_end );
- #if BT_PARALLEL_SUM_DETERMINISTISM
- // by truncating bits of the result, we can make the parallelSum deterministic (at the expense of precision)
- const float TRUNC_SCALE = float(1<<19);
- val = floor(val*TRUNC_SCALE+0.5f)/TRUNC_SCALE; // truncate some bits
- #endif
- m_threadLocalStoreArray[threadId].m_sumResult += val;
- }
- };
- ATTRIBUTE_ALIGNED64(class) JobQueue
- {
- btThreadSupportInterface* m_threadSupport;
- btCriticalSection* m_queueLock;
- btSpinMutex m_mutex;
- btAlignedObjectArray<IJob*> m_jobQueue;
- char* m_jobMem;
- int m_jobMemSize;
- bool m_queueIsEmpty;
- int m_tailIndex;
- int m_headIndex;
- int m_allocSize;
- bool m_useSpinMutex;
- btAlignedObjectArray<JobQueue*> m_neighborContexts;
- char m_cachePadding[kCacheLineSize]; // prevent false sharing
- void freeJobMem()
- {
- if ( m_jobMem )
- {
- // free old
- btAlignedFree(m_jobMem);
- m_jobMem = NULL;
- }
- }
- void resizeJobMem(int newSize)
- {
- if (newSize > m_jobMemSize)
- {
- freeJobMem();
- m_jobMem = static_cast<char*>(btAlignedAlloc(newSize, kCacheLineSize));
- m_jobMemSize = newSize;
- }
- }
- public:
- JobQueue()
- {
- m_jobMem = NULL;
- m_jobMemSize = 0;
- m_threadSupport = NULL;
- m_queueLock = NULL;
- m_headIndex = 0;
- m_tailIndex = 0;
- m_useSpinMutex = false;
- }
- ~JobQueue()
- {
- exit();
- }
- void exit()
- {
- freeJobMem();
- if (m_queueLock && m_threadSupport)
- {
- m_threadSupport->deleteCriticalSection(m_queueLock);
- m_queueLock = NULL;
- m_threadSupport = 0;
- }
- }
- void init(btThreadSupportInterface* threadSup, btAlignedObjectArray<JobQueue>* contextArray)
- {
- m_threadSupport = threadSup;
- if (threadSup)
- {
- m_queueLock = m_threadSupport->createCriticalSection();
- }
- setupJobStealing(contextArray, contextArray->size());
- }
- void setupJobStealing(btAlignedObjectArray<JobQueue>* contextArray, int numActiveContexts)
- {
- btAlignedObjectArray<JobQueue>& contexts = *contextArray;
- int selfIndex = 0;
- for (int i = 0; i < contexts.size(); ++i)
- {
- if ( this == &contexts[ i ] )
- {
- selfIndex = i;
- break;
- }
- }
- int numNeighbors = btMin(2, contexts.size() - 1);
- int neighborOffsets[ ] = {-1, 1, -2, 2, -3, 3};
- int numOffsets = sizeof(neighborOffsets)/sizeof(neighborOffsets[0]);
- m_neighborContexts.reserve( numNeighbors );
- m_neighborContexts.resizeNoInitialize(0);
- for (int i = 0; i < numOffsets && m_neighborContexts.size() < numNeighbors; i++)
- {
- int neighborIndex = selfIndex + neighborOffsets[i];
- if ( neighborIndex >= 0 && neighborIndex < numActiveContexts)
- {
- m_neighborContexts.push_back( &contexts[ neighborIndex ] );
- }
- }
- }
- bool isQueueEmpty() const {return m_queueIsEmpty;}
- void lockQueue()
- {
- if ( m_useSpinMutex )
- {
- m_mutex.lock();
- }
- else
- {
- m_queueLock->lock();
- }
- }
- void unlockQueue()
- {
- if ( m_useSpinMutex )
- {
- m_mutex.unlock();
- }
- else
- {
- m_queueLock->unlock();
- }
- }
- void clearQueue(int jobCount, int jobSize)
- {
- lockQueue();
- m_headIndex = 0;
- m_tailIndex = 0;
- m_allocSize = 0;
- m_queueIsEmpty = true;
- int jobBufSize = jobSize * jobCount;
- // make sure we have enough memory allocated to store jobs
- if ( jobBufSize > m_jobMemSize )
- {
- resizeJobMem( jobBufSize );
- }
- // make sure job queue is big enough
- if ( jobCount > m_jobQueue.capacity() )
- {
- m_jobQueue.reserve( jobCount );
- }
- unlockQueue();
- m_jobQueue.resizeNoInitialize( 0 );
- }
- void* allocJobMem(int jobSize)
- {
- btAssert(m_jobMemSize >= (m_allocSize + jobSize));
- void* jobMem = &m_jobMem[m_allocSize];
- m_allocSize += jobSize;
- return jobMem;
- }
- void submitJob( IJob* job )
- {
- btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
- m_jobQueue.push_back( job );
- lockQueue();
- m_tailIndex++;
- m_queueIsEmpty = false;
- unlockQueue();
- }
- IJob* consumeJobFromOwnQueue()
- {
- if ( m_queueIsEmpty )
- {
- // lock free path. even if this is taken erroneously it isn't harmful
- return NULL;
- }
- IJob* job = NULL;
- lockQueue();
- if ( !m_queueIsEmpty )
- {
- job = m_jobQueue[ m_headIndex++ ];
- btAssert( reinterpret_cast<char*>( job ) >= &m_jobMem[ 0 ] && reinterpret_cast<char*>( job ) < &m_jobMem[ 0 ] + m_allocSize );
- if ( m_headIndex == m_tailIndex )
- {
- m_queueIsEmpty = true;
- }
- }
- unlockQueue();
- return job;
- }
- IJob* consumeJob()
- {
- if (IJob* job = consumeJobFromOwnQueue())
- {
- return job;
- }
- // own queue is empty, try to steal from neighbor
- for (int i = 0; i < m_neighborContexts.size(); ++i)
- {
- JobQueue* otherContext = m_neighborContexts[ i ];
- if ( IJob* job = otherContext->consumeJobFromOwnQueue() )
- {
- return job;
- }
- }
- return NULL;
- }
- };
- static void WorkerThreadFunc( void* userPtr )
- {
- BT_PROFILE( "WorkerThreadFunc" );
- ThreadLocalStorage* localStorage = (ThreadLocalStorage*) userPtr;
- JobQueue* jobQueue = localStorage->m_queue;
- bool shouldSleep = false;
- int threadId = localStorage->m_threadId;
- while (! shouldSleep)
- {
- // do work
- localStorage->m_mutex.lock();
- while ( IJob* job = jobQueue->consumeJob() )
- {
- localStorage->m_status = WorkerThreadStatus::kWorking;
- job->executeJob( threadId );
- localStorage->m_numJobsFinished++;
- }
- localStorage->m_status = WorkerThreadStatus::kWaitingForWork;
- localStorage->m_mutex.unlock();
- btU64 clockStart = localStorage->m_clock->getTimeMicroseconds();
- // while queue is empty,
- while (jobQueue->isQueueEmpty())
- {
- // todo: spin wait a bit to avoid hammering the empty queue
- btSpinPause();
- if ( localStorage->m_directive->getDirective(threadId) == WorkerThreadDirectives::kGoToSleep )
- {
- shouldSleep = true;
- break;
- }
- // if jobs are incoming,
- if ( localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs )
- {
- clockStart = localStorage->m_clock->getTimeMicroseconds(); // reset clock
- }
- else
- {
- for ( int i = 0; i < 50; ++i )
- {
- btSpinPause();
- btSpinPause();
- btSpinPause();
- btSpinPause();
- if (localStorage->m_directive->getDirective( threadId ) == WorkerThreadDirectives::kScanForJobs || !jobQueue->isQueueEmpty())
- {
- break;
- }
- }
- // if no jobs incoming and queue has been empty for the cooldown time, sleep
- btU64 timeElapsed = localStorage->m_clock->getTimeMicroseconds() - clockStart;
- if (timeElapsed > localStorage->m_cooldownTime)
- {
- shouldSleep = true;
- break;
- }
- }
- }
- }
- {
- BT_PROFILE("sleep");
- // go sleep
- localStorage->m_mutex.lock();
- localStorage->m_status = WorkerThreadStatus::kSleeping;
- localStorage->m_mutex.unlock();
- }
- }
- class btTaskSchedulerDefault : public btITaskScheduler
- {
- btThreadSupportInterface* m_threadSupport;
- WorkerThreadDirectives* m_workerDirective;
- btAlignedObjectArray<JobQueue> m_jobQueues;
- btAlignedObjectArray<JobQueue*> m_perThreadJobQueues;
- btAlignedObjectArray<ThreadLocalStorage> m_threadLocalStorage;
- btSpinMutex m_antiNestingLock; // prevent nested parallel-for
- btClock m_clock;
- int m_numThreads;
- int m_numWorkerThreads;
- int m_numActiveJobQueues;
- int m_maxNumThreads;
- int m_numJobs;
- static const int kFirstWorkerThreadId = 1;
- public:
- btTaskSchedulerDefault() : btITaskScheduler("ThreadSupport")
- {
- m_threadSupport = NULL;
- m_workerDirective = NULL;
- }
- virtual ~btTaskSchedulerDefault()
- {
- waitForWorkersToSleep();
- for ( int i = 0; i < m_jobQueues.size(); ++i )
- {
- m_jobQueues[i].exit();
- }
- if (m_threadSupport)
- {
- delete m_threadSupport;
- m_threadSupport = NULL;
- }
- if (m_workerDirective)
- {
- btAlignedFree(m_workerDirective);
- m_workerDirective = NULL;
- }
- }
- void init()
- {
- btThreadSupportInterface::ConstructionInfo constructionInfo( "TaskScheduler", WorkerThreadFunc );
- m_threadSupport = btThreadSupportInterface::create( constructionInfo );
- m_workerDirective = static_cast<WorkerThreadDirectives*>(btAlignedAlloc(sizeof(*m_workerDirective), 64));
- m_numWorkerThreads = m_threadSupport->getNumWorkerThreads();
- m_maxNumThreads = m_threadSupport->getNumWorkerThreads() + 1;
- m_numThreads = m_maxNumThreads;
- // ideal to have one job queue for each physical processor (except for the main thread which needs no queue)
- int numThreadsPerQueue = m_threadSupport->getLogicalToPhysicalCoreRatio();
- int numJobQueues = (numThreadsPerQueue == 1) ? (m_maxNumThreads-1) : (m_maxNumThreads / numThreadsPerQueue);
- m_jobQueues.resize(numJobQueues);
- m_numActiveJobQueues = numJobQueues;
- for ( int i = 0; i < m_jobQueues.size(); ++i )
- {
- m_jobQueues[i].init( m_threadSupport, &m_jobQueues );
- }
- m_perThreadJobQueues.resize(m_numThreads);
- for ( int i = 0; i < m_numThreads; i++ )
- {
- JobQueue* jq = NULL;
- // only worker threads get a job queue
- if (i > 0)
- {
- if (numThreadsPerQueue == 1)
- {
- // one queue per worker thread
- jq = &m_jobQueues[ i - kFirstWorkerThreadId ];
- }
- else
- {
- // 2 threads share each queue
- jq = &m_jobQueues[ i / numThreadsPerQueue ];
- }
- }
- m_perThreadJobQueues[i] = jq;
- }
- m_threadLocalStorage.resize(m_numThreads);
- for ( int i = 0; i < m_numThreads; i++ )
- {
- ThreadLocalStorage& storage = m_threadLocalStorage[i];
- storage.m_threadId = i;
- storage.m_directive = m_workerDirective;
- storage.m_status = WorkerThreadStatus::kSleeping;
- storage.m_cooldownTime = 100; // 100 microseconds, threads go to sleep after this long if they have nothing to do
- storage.m_clock = &m_clock;
- storage.m_queue = m_perThreadJobQueues[i];
- }
- setWorkerDirectives( WorkerThreadDirectives::kGoToSleep ); // no work for them yet
- setNumThreads( m_threadSupport->getCacheFriendlyNumThreads() );
- }
- void setWorkerDirectives(WorkerThreadDirectives::Type dir)
- {
- m_workerDirective->setDirectiveByRange(kFirstWorkerThreadId, m_numThreads, dir);
- }
- virtual int getMaxNumThreads() const BT_OVERRIDE
- {
- return m_maxNumThreads;
- }
- virtual int getNumThreads() const BT_OVERRIDE
- {
- return m_numThreads;
- }
- virtual void setNumThreads( int numThreads ) BT_OVERRIDE
- {
- m_numThreads = btMax( btMin(numThreads, int(m_maxNumThreads)), 1 );
- m_numWorkerThreads = m_numThreads - 1;
- m_numActiveJobQueues = 0;
- // if there is at least 1 worker,
- if ( m_numWorkerThreads > 0 )
- {
- // re-setup job stealing between queues to avoid attempting to steal from an inactive job queue
- JobQueue* lastActiveContext = m_perThreadJobQueues[ m_numThreads - 1 ];
- int iLastActiveContext = lastActiveContext - &m_jobQueues[0];
- m_numActiveJobQueues = iLastActiveContext + 1;
- for ( int i = 0; i < m_jobQueues.size(); ++i )
- {
- m_jobQueues[ i ].setupJobStealing( &m_jobQueues, m_numActiveJobQueues );
- }
- }
- m_workerDirective->setDirectiveByRange(m_numThreads, BT_MAX_THREAD_COUNT, WorkerThreadDirectives::kGoToSleep);
- }
- void waitJobs()
- {
- BT_PROFILE( "waitJobs" );
- // have the main thread work until the job queues are empty
- int numMainThreadJobsFinished = 0;
- for ( int i = 0; i < m_numActiveJobQueues; ++i )
- {
- while ( IJob* job = m_jobQueues[i].consumeJob() )
- {
- job->executeJob( 0 );
- numMainThreadJobsFinished++;
- }
- }
- // done with jobs for now, tell workers to rest (but not sleep)
- setWorkerDirectives( WorkerThreadDirectives::kStayAwakeButIdle );
- btU64 clockStart = m_clock.getTimeMicroseconds();
- // wait for workers to finish any jobs in progress
- while ( true )
- {
- int numWorkerJobsFinished = 0;
- for ( int iThread = kFirstWorkerThreadId; iThread < m_numThreads; ++iThread )
- {
- ThreadLocalStorage* storage = &m_threadLocalStorage[iThread];
- storage->m_mutex.lock();
- numWorkerJobsFinished += storage->m_numJobsFinished;
- storage->m_mutex.unlock();
- }
- if (numWorkerJobsFinished + numMainThreadJobsFinished == m_numJobs)
- {
- break;
- }
- btU64 timeElapsed = m_clock.getTimeMicroseconds() - clockStart;
- btAssert(timeElapsed < 1000);
- if (timeElapsed > 100000)
- {
- break;
- }
- btSpinPause();
- }
- }
- void wakeWorkers(int numWorkersToWake)
- {
- BT_PROFILE( "wakeWorkers" );
- btAssert( m_workerDirective->getDirective(1) == WorkerThreadDirectives::kScanForJobs );
- int numDesiredWorkers = btMin(numWorkersToWake, m_numWorkerThreads);
- int numActiveWorkers = 0;
- for ( int iWorker = 0; iWorker < m_numWorkerThreads; ++iWorker )
- {
- // note this count of active workers is not necessarily totally reliable, because a worker thread could be
- // just about to put itself to sleep. So we may on occasion fail to wake up all the workers. It should be rare.
- ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
- if (storage.m_status != WorkerThreadStatus::kSleeping)
- {
- numActiveWorkers++;
- }
- }
- for ( int iWorker = 0; iWorker < m_numWorkerThreads && numActiveWorkers < numDesiredWorkers; ++iWorker )
- {
- ThreadLocalStorage& storage = m_threadLocalStorage[ kFirstWorkerThreadId + iWorker ];
- if (storage.m_status == WorkerThreadStatus::kSleeping)
- {
- m_threadSupport->runTask( iWorker, &storage );
- numActiveWorkers++;
- }
- }
- }
- void waitForWorkersToSleep()
- {
- BT_PROFILE( "waitForWorkersToSleep" );
- setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
- m_threadSupport->waitForAllTasks();
- for ( int i = kFirstWorkerThreadId; i < m_numThreads; i++ )
- {
- ThreadLocalStorage& storage = m_threadLocalStorage[i];
- btAssert( storage.m_status == WorkerThreadStatus::kSleeping );
- }
- }
- virtual void sleepWorkerThreadsHint() BT_OVERRIDE
- {
- BT_PROFILE( "sleepWorkerThreadsHint" );
- // hint the task scheduler that we may not be using these threads for a little while
- setWorkerDirectives( WorkerThreadDirectives::kGoToSleep );
- }
- void prepareWorkerThreads()
- {
- for ( int i = kFirstWorkerThreadId; i < m_numThreads; ++i )
- {
- ThreadLocalStorage& storage = m_threadLocalStorage[i];
- storage.m_mutex.lock();
- storage.m_numJobsFinished = 0;
- storage.m_mutex.unlock();
- }
- setWorkerDirectives( WorkerThreadDirectives::kScanForJobs );
- }
- virtual void parallelFor( int iBegin, int iEnd, int grainSize, const btIParallelForBody& body ) BT_OVERRIDE
- {
- BT_PROFILE( "parallelFor_ThreadSupport" );
- btAssert( iEnd >= iBegin );
- btAssert( grainSize >= 1 );
- int iterationCount = iEnd - iBegin;
- if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
- {
- typedef ParallelForJob JobType;
- int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
- m_numJobs = jobCount;
- btAssert( jobCount >= 2 ); // need more than one job for multithreading
- int jobSize = sizeof( JobType );
- for (int i = 0; i < m_numActiveJobQueues; ++i)
- {
- m_jobQueues[i].clearQueue( jobCount, jobSize );
- }
- // prepare worker threads for incoming work
- prepareWorkerThreads();
- // submit all of the jobs
- int iJob = 0;
- int iThread = kFirstWorkerThreadId; // first worker thread
- for ( int i = iBegin; i < iEnd; i += grainSize )
- {
- btAssert( iJob < jobCount );
- int iE = btMin( i + grainSize, iEnd );
- JobQueue* jq = m_perThreadJobQueues[ iThread ];
- btAssert(jq);
- btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
- void* jobMem = jq->allocJobMem(jobSize);
- JobType* job = new ( jobMem ) ParallelForJob( i, iE, body ); // placement new
- jq->submitJob( job );
- iJob++;
- iThread++;
- if ( iThread >= m_numThreads )
- {
- iThread = kFirstWorkerThreadId; // first worker thread
- }
- }
- wakeWorkers( jobCount - 1 );
- // put the main thread to work on emptying the job queue and then wait for all workers to finish
- waitJobs();
- m_antiNestingLock.unlock();
- }
- else
- {
- BT_PROFILE( "parallelFor_mainThread" );
- // just run on main thread
- body.forLoop( iBegin, iEnd );
- }
- }
- virtual btScalar parallelSum( int iBegin, int iEnd, int grainSize, const btIParallelSumBody& body ) BT_OVERRIDE
- {
- BT_PROFILE( "parallelSum_ThreadSupport" );
- btAssert( iEnd >= iBegin );
- btAssert( grainSize >= 1 );
- int iterationCount = iEnd - iBegin;
- if ( iterationCount > grainSize && m_numWorkerThreads > 0 && m_antiNestingLock.tryLock() )
- {
- typedef ParallelSumJob JobType;
- int jobCount = ( iterationCount + grainSize - 1 ) / grainSize;
- m_numJobs = jobCount;
- btAssert( jobCount >= 2 ); // need more than one job for multithreading
- int jobSize = sizeof( JobType );
- for (int i = 0; i < m_numActiveJobQueues; ++i)
- {
- m_jobQueues[i].clearQueue( jobCount, jobSize );
- }
- // initialize summation
- for ( int iThread = 0; iThread < m_numThreads; ++iThread )
- {
- m_threadLocalStorage[iThread].m_sumResult = btScalar(0);
- }
- // prepare worker threads for incoming work
- prepareWorkerThreads();
- // submit all of the jobs
- int iJob = 0;
- int iThread = kFirstWorkerThreadId; // first worker thread
- for ( int i = iBegin; i < iEnd; i += grainSize )
- {
- btAssert( iJob < jobCount );
- int iE = btMin( i + grainSize, iEnd );
- JobQueue* jq = m_perThreadJobQueues[ iThread ];
- btAssert(jq);
- btAssert((jq - &m_jobQueues[0]) < m_numActiveJobQueues);
- void* jobMem = jq->allocJobMem(jobSize);
- JobType* job = new ( jobMem ) ParallelSumJob( i, iE, body, &m_threadLocalStorage[0] ); // placement new
- jq->submitJob( job );
- iJob++;
- iThread++;
- if ( iThread >= m_numThreads )
- {
- iThread = kFirstWorkerThreadId; // first worker thread
- }
- }
- wakeWorkers( jobCount - 1 );
- // put the main thread to work on emptying the job queue and then wait for all workers to finish
- waitJobs();
- // add up all the thread sums
- btScalar sum = btScalar(0);
- for ( int iThread = 0; iThread < m_numThreads; ++iThread )
- {
- sum += m_threadLocalStorage[ iThread ].m_sumResult;
- }
- m_antiNestingLock.unlock();
- return sum;
- }
- else
- {
- BT_PROFILE( "parallelSum_mainThread" );
- // just run on main thread
- return body.sumLoop( iBegin, iEnd );
- }
- }
- };
- btITaskScheduler* btCreateDefaultTaskScheduler()
- {
- btTaskSchedulerDefault* ts = new btTaskSchedulerDefault();
- ts->init();
- return ts;
- }
- #else // #if BT_THREADSAFE
- btITaskScheduler* btCreateDefaultTaskScheduler()
- {
- return NULL;
- }
- #endif // #else // #if BT_THREADSAFE
|