Browse Source

Add a method to wait for all pending items in a ThreadPool.

Daniel Buckmaster 10 years ago
parent
commit
0995520d6f

+ 26 - 1
Engine/source/platform/threads/threadPool.cpp

@@ -282,6 +282,8 @@ void ThreadPool::WorkerThread::run( void* arg )
             Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
 #endif
             workItem->process();
+
+            dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
          }
          else
             waitForSignal = true;
@@ -319,6 +321,7 @@ ThreadPool::ThreadPool( const char* name, U32 numThreads )
    : mName( name ),
      mNumThreads( numThreads ),
      mNumThreadsAwake( 0 ),
+     mNumPendingItems( 0 ),
      mThreads( 0 ),
      mSemaphore( 0 )
 {
@@ -410,7 +413,7 @@ void ThreadPool::queueWorkItem( WorkItem* item )
    else
    {
       // Put the item in the queue.
-
+      dFetchAndAdd( mNumPendingItems, 1 );
       mWorkItemQueue.insert( item->getPriority(), item );
 
       mSemaphore.release();
@@ -441,6 +444,28 @@ void ThreadPool::flushWorkItems( S32 timeOut )
    }
 }
 
+void ThreadPool::waitForAllItems( S32 timeOut )
+{
+   AssertFatal( mNumPendingItems, "ThreadPool::waitForAllItems() - no items pending" );
+
+   U32 endTime = 0;
+   if( timeOut != -1 )
+      endTime = Platform::getRealMilliseconds() + timeOut;
+
+   // Spinlock until there are no items that have not been processed.
+
+   while( dAtomicRead( mNumPendingItems ) )
+   {
+      Platform::sleep( 25 );
+
+      // Stop if we have exceeded our processing time budget.
+
+      if( timeOut != -1
+          && Platform::getRealMilliseconds() >= endTime )
+          break;
+   }
+}
+
 //--------------------------------------------------------------------------
 
 void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )

+ 15 - 0
Engine/source/platform/threads/threadPool.h

@@ -264,6 +264,9 @@ class ThreadPool
       
       /// Number of worker threads guaranteed to be non-blocking.
       U32 mNumThreadsReady;
+
+      /// Number of work items that have not yet completed execution.
+      U32 mNumPendingItems;
       
       /// Semaphore used to wake up threads, if necessary.
       Semaphore mSemaphore;
@@ -316,6 +319,18 @@ class ThreadPool
       ///   the queue to flush out.  -1 = infinite.
       void flushWorkItems( S32 timeOut = -1 );
 
+      /// If you're using a non-global thread pool to parallelise some work, you
+      /// may want to block until all the parallel work is complete. As with
+      /// flushWorkItems, this method may block indefinitely if new items keep
+      /// getting added to the pool before old ones finish.
+      ///
+      /// <em>This method will not wait for items queued on the main thread using
+      /// queueWorkItemOnMainThread!</em>
+      ///
+      /// @param timeOut Soft limit on the number of milliseconds to wait for
+      ///   all items to complete.  -1 = infinite.
+      void waitForAllItems( S32 timeOut = -1 );
+
       /// Add a work item to the main thread's work queue.
       ///
       /// The main thread's work queue will be processed each frame using