Просмотр исходного кода

Merge pull request #1346 from eightyeight/fix-threadpool-tests

Areloch 10 лет назад
Родитель
Сommit
31afbedfb6

+ 48 - 2
Engine/source/platform/threads/test/threadPoolTest.cpp

@@ -44,6 +44,20 @@ public:
          mResults[mIndex] = mIndex;
       }
    };
+
+   // A worker that delays for some time. We'll use this to test the ThreadPool's
+   // synchronous and asynchronous operations.
+   struct DelayItem : public ThreadPool::WorkItem
+   {
+      U32 ms;
+      DelayItem(U32 _ms) : ms(_ms) {}
+
+   protected:
+      virtual void execute()
+      {
+         Platform::sleep(ms);
+      }
+   };
 };
 
 TEST_FIX(ThreadPool, BasicAPI)
@@ -63,8 +77,7 @@ TEST_FIX(ThreadPool, BasicAPI)
       pool->queueWorkItem(item);
    }
 
-   // Wait for all items to complete.
-   pool->flushWorkItems();
+   pool->waitForAllItems();
 
    // Verify.
    for (U32 i = 0; i < numItems; i++)
@@ -72,4 +85,37 @@ TEST_FIX(ThreadPool, BasicAPI)
    results.clear();
 }
 
+TEST_FIX(ThreadPool, Asynchronous)
+{
+   const U32 delay = 500; //ms
+
+   // Launch a single delaying work item.
+   ThreadPool* pool = &ThreadPool::GLOBAL();
+   ThreadSafeRef<DelayItem> item(new DelayItem(delay));
+   pool->queueWorkItem(item);
+
+   // The thread should not yet be finished.
+   EXPECT_EQ(false, item->hasExecuted());
+
+   // Wait til the item should have completed.
+   Platform::sleep(delay * 2);
+
+   EXPECT_EQ(true, item->hasExecuted());
+}
+
+TEST_FIX(ThreadPool, Synchronous)
+{
+   const U32 delay = 500; //ms
+
+   // Launch a single delaying work item.
+   ThreadPool* pool = &ThreadPool::GLOBAL();
+   ThreadSafeRef<DelayItem> item(new DelayItem(delay));
+   pool->queueWorkItem(item);
+
+   // Wait for the item to complete.
+   pool->waitForAllItems();
+
+   EXPECT_EQ(true, item->hasExecuted());
+}
+
 #endif

+ 25 - 1
Engine/source/platform/threads/threadPool.cpp

@@ -120,6 +120,7 @@ void ThreadPool::Context::updateAccumulatedPriorityBiases()
 void ThreadPool::WorkItem::process()
 {
    execute();
+   mExecuted = true;
 }
 
 //--------------------------------------------------------------------------
@@ -281,6 +282,8 @@ void ThreadPool::WorkerThread::run( void* arg )
             Platform::outputDebugString( "[ThreadPool::WorkerThread] thread '%i' takes item '0x%x'", getId(), *workItem );
 #endif
             workItem->process();
+
+            dFetchAndAdd( mPool->mNumPendingItems, ( U32 ) -1 );
          }
          else
             waitForSignal = true;
@@ -318,6 +321,7 @@ ThreadPool::ThreadPool( const char* name, U32 numThreads )
    : mName( name ),
      mNumThreads( numThreads ),
      mNumThreadsAwake( 0 ),
+     mNumPendingItems( 0 ),
      mThreads( 0 ),
      mSemaphore( 0 )
 {
@@ -409,7 +413,7 @@ void ThreadPool::queueWorkItem( WorkItem* item )
    else
    {
       // Put the item in the queue.
-
+      dFetchAndAdd( mNumPendingItems, 1 );
       mWorkItemQueue.insert( item->getPriority(), item );
 
       mSemaphore.release();
@@ -440,6 +444,26 @@ void ThreadPool::flushWorkItems( S32 timeOut )
    }
 }
 
+void ThreadPool::waitForAllItems( S32 timeOut )
+{
+   U32 endTime = 0;
+   if( timeOut != -1 )
+      endTime = Platform::getRealMilliseconds() + timeOut;
+
+   // Spinlock until there are no items that have not been processed.
+
+   while( dAtomicRead( mNumPendingItems ) )
+   {
+      Platform::sleep( 25 );
+
+      // Stop if we have exceeded our processing time budget.
+
+      if( timeOut != -1
+          && Platform::getRealMilliseconds() >= endTime )
+          break;
+   }
+}
+
 //--------------------------------------------------------------------------
 
 void ThreadPool::queueWorkItemOnMainThread( WorkItem* item )

+ 26 - 1
Engine/source/platform/threads/threadPool.h

@@ -194,6 +194,9 @@ class ThreadPool
             /// This is the primary function to implement by subclasses.
             virtual void execute() = 0;
 
+            /// This flag is set after the execute() method has completed.
+            bool mExecuted;
+
          public:
          
             /// Construct a new work item.
@@ -201,7 +204,8 @@ class ThreadPool
             /// @param context The work context in which the item should be placed.
             ///    If NULL, the root context will be used.
             WorkItem( Context* context = 0 )
-               : mContext( context ? context : Context::ROOT_CONTEXT() )
+               : mContext( context ? context : Context::ROOT_CONTEXT() ),
+                 mExecuted( false )
             {
             }
             
@@ -229,6 +233,12 @@ class ThreadPool
             /// Return the item's base priority value.
             /// @return item priority; defaults to 1.0.
             virtual F32 getPriority();
+
+            /// Has this work item been executed already?
+            bool hasExecuted() const
+            {
+               return mExecuted;
+            }
       };
 
       typedef ThreadSafeRef< WorkItem > WorkItemPtr;
@@ -254,6 +264,9 @@ class ThreadPool
       
       /// Number of worker threads guaranteed to be non-blocking.
       U32 mNumThreadsReady;
+
+      /// Number of work items that have not yet completed execution.
+      U32 mNumPendingItems;
       
       /// Semaphore used to wake up threads, if necessary.
       Semaphore mSemaphore;
@@ -306,6 +319,18 @@ class ThreadPool
       ///   the queue to flush out.  -1 = infinite.
       void flushWorkItems( S32 timeOut = -1 );
 
+      /// If you're using a non-global thread pool to parallelise some work, you
+      /// may want to block until all the parallel work is complete. As with
+      /// flushWorkItems, this method may block indefinitely if new items keep
+      /// getting added to the pool before old ones finish.
+      ///
+      /// <em>This method will not wait for items queued on the main thread using
+      /// queueWorkItemOnMainThread!</em>
+      ///
+      /// @param timeOut Soft limit on the number of milliseconds to wait for
+      ///   all items to complete.  -1 = infinite.
+      void waitForAllItems( S32 timeOut = -1 );
+
       /// Add a work item to the main thread's work queue.
       ///
       /// The main thread's work queue will be processed each frame using

+ 1 - 0
Tools/CMake/torque3d.cmake

@@ -200,6 +200,7 @@ if( NOT TORQUE_DEDICATED )
 endif()
 addPath("${srcDir}/platform/test")
 addPath("${srcDir}/platform/threads")
+addPath("${srcDir}/platform/threads/test")
 addPath("${srcDir}/platform/async")
 addPath("${srcDir}/platform/async/test")
 addPath("${srcDir}/platform/input")