Browse Source

Allow threaded work items with varying priorities. Default is immediate (maximum) priority as used by Octree & View. Optionally send events when threaded work items complete.

Lasse Öörni 13 years ago
parent
commit
945085d975
4 changed files with 157 additions and 55 deletions
  1. 118 40
      Engine/Core/WorkQueue.cpp
  2. 34 10
      Engine/Core/WorkQueue.h
  3. 2 2
      Engine/Graphics/Octree.cpp
  4. 3 3
      Engine/Graphics/View.cpp

+ 118 - 40
Engine/Core/WorkQueue.cpp

@@ -21,7 +21,9 @@
 //
 
 #include "Precompiled.h"
+#include "CoreEvents.h"
 #include "ProcessUtils.h"
+#include "Profiler.h"
 #include "Thread.h"
 #include "Timer.h"
 #include "WorkQueue.h"
@@ -29,6 +31,8 @@
 namespace Urho3D
 {
 
+const unsigned MAX_NONTHREADED_WORK_USEC = 1000;
+
 /// Worker thread managed by the work queue.
 class WorkerThread : public Thread, public RefCounted
 {
@@ -62,11 +66,11 @@ OBJECTTYPESTATIC(WorkQueue);
 
 WorkQueue::WorkQueue(Context* context) :
     Object(context),
-    numActive_(0),
     shutDown_(false),
     pausing_(false),
     paused_(false)
 {
+    SubscribeToEvent(E_BEGINFRAME, HANDLER(WorkQueue, HandleBeginFrame));
 }
 
 WorkQueue::~WorkQueue()
@@ -99,23 +103,36 @@ void WorkQueue::CreateThreads(unsigned numThreads)
 
 void WorkQueue::AddWorkItem(const WorkItem& item)
 {
-    if (threads_.Size())
+    // Push to the main thread list to keep item alive
+    // Clear completed flag in case item is reused
+    workItems_.Push(item);
+    WorkItem* itemPtr = &workItems_.Back();
+    itemPtr->completed_ = false;
+    
+    // Make sure worker threads' list is safe to modify
+    if (threads_.Size() && !paused_)
+        queueMutex_.Acquire();
+    
+    // Find position for new item
+    if (queue_.Empty())
+        queue_.Push(itemPtr);
+    else
     {
-        if (paused_)
-        {
-            queue_.Push(item);
-            queueMutex_.Release();
-            paused_ = false;
-        }
-        else
+        for (List<WorkItem*>::Iterator i = queue_.Begin(); i != queue_.End(); ++i)
         {
-            queueMutex_.Acquire();
-            queue_.Push(item);
-            queueMutex_.Release();
+            if ((*i)->priority_ <= itemPtr->priority_)
+            {
+                queue_.Insert(i, itemPtr);
+                break;
+            }
         }
     }
-    else
-        item.workFunction_(&item, 0);
+    
+    if (threads_.Size())
+    {
+        queueMutex_.Release();
+        paused_ = false;
+    }
 }
 
 void WorkQueue::Pause()
@@ -141,43 +158,64 @@ void WorkQueue::Resume()
 }
 
 
-void WorkQueue::Complete()
+void WorkQueue::Complete(unsigned priority)
 {
     if (threads_.Size())
     {
         Resume();
         
-        // Take work items in the main thread until queue empty
+        // Take work items also in the main thread until queue empty or no high-priority items anymore
         while (!queue_.Empty())
         {
             queueMutex_.Acquire();
-            if (!queue_.Empty())
+            if (!queue_.Empty() && queue_.Front()->priority_ >= priority)
             {
-                WorkItem item = queue_.Front();
+                WorkItem* item = queue_.Front();
                 queue_.PopFront();
                 queueMutex_.Release();
-                item.workFunction_(&item, 0);
+                item->workFunction_(item, 0);
+                item->completed_ = true;
             }
             else
+            {
                 queueMutex_.Release();
+                break;
+            }
         }
         
-        // Wait for all work to finish
-        while (!IsCompleted())
+        // Wait for threaded work to complete
+        while (!IsCompleted(priority))
         {
         }
         
-        // Pause worker threads by leaving the mutex locked
-        Pause();
+        // If no work at all remaining, pause worker threads by leaving the mutex locked
+        if (queue_.Empty())
+            Pause();
     }
+    else
+    {
+        // No worker threads: ensure all high-priority items are completed in the main thread
+        while (!queue_.Empty() && queue_.Front()->priority_ >= priority)
+        {
+            WorkItem* item = queue_.Front();
+            queue_.PopFront();
+            item->workFunction_(item, 0);
+            item->completed_ = true;
+        }
+    }
+    
+    PurgeCompleted();
 }
 
-bool WorkQueue::IsCompleted() const
+bool WorkQueue::IsCompleted(unsigned priority) const
 {
-    if (threads_.Size())
-        return !numActive_ && queue_.Empty();
-    else
-        return true;
+    for (List<WorkItem>::ConstIterator i = workItems_.Begin(); i != workItems_.End(); ++i)
+    {
+        if (i->priority_ >= priority && !i->completed_)
+            return false;
+    }
+    
+    return true;
 }
 
 void WorkQueue::ProcessItems(unsigned threadIndex)
@@ -196,23 +234,18 @@ void WorkQueue::ProcessItems(unsigned threadIndex)
             queueMutex_.Acquire();
             if (!queue_.Empty())
             {
-                if (!wasActive)
-                {
-                    ++numActive_;
-                    wasActive = true;
-                }
-                WorkItem item = queue_.Front();
+                wasActive = true;
+                
+                WorkItem* item = queue_.Front();
                 queue_.PopFront();
                 queueMutex_.Release();
-                item.workFunction_(&item, threadIndex);
+                item->workFunction_(item, threadIndex);
+                item->completed_ = true;
             }
             else
             {
-                if (wasActive)
-                {
-                    --numActive_;
-                    wasActive = false;
-                }
+                wasActive = false;
+                
                 queueMutex_.Release();
                 Time::Sleep(0);
             }
@@ -220,4 +253,49 @@ void WorkQueue::ProcessItems(unsigned threadIndex)
     }
 }
 
+void WorkQueue::PurgeCompleted()
+{
+    using namespace WorkItemCompleted;
+    
+    VariantMap eventData;
+    
+    // Purge completed work items and send completion events.
+    for (List<WorkItem>::Iterator i = workItems_.Begin(); i != workItems_.End();)
+    {
+        if (i->completed_)
+        {
+            if (i->sendEvent_)
+            {
+                eventData[P_ITEM] = (void*)(&(*i));
+                SendEvent(E_WORKITEMCOMPLETED, eventData);
+            }
+            
+            i = workItems_.Erase(i);
+        }
+        else
+            ++i;
+    }
+}
+
+void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
+{
+    // If no worker threads, complete low-priority work here
+    if (threads_.Empty() && !queue_.Empty())
+    {
+        PROFILE(CompleteWorkNonthreaded);
+        
+        HiresTimer timer;
+        
+        while (!queue_.Empty() && timer.GetUSec(false) < MAX_NONTHREADED_WORK_USEC)
+        {
+            WorkItem* item = queue_.Front();
+            queue_.PopFront();
+            item->workFunction_(item, 0);
+            item->completed_ = true;
+        }
+    }
+    
+    PurgeCompleted();
+}
+
 }

+ 34 - 10
Engine/Core/WorkQueue.h

@@ -29,11 +29,25 @@
 namespace Urho3D
 {
 
+/// Work item completed event.
+EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
+{
+    PARAM(P_ITEM, Item);                        // WorkItem ptr
+}
+
 class WorkerThread;
 
 /// Work queue item.
 struct WorkItem
 {
+    // Construct
+    WorkItem() :
+        priority_(M_MAX_UNSIGNED),
+        sendEvent_(false),
+        completed_(false)
+    {
+    }
+    
     /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
     void (*workFunction_)(const WorkItem*, unsigned);
     /// Data start pointer.
@@ -42,6 +56,12 @@ struct WorkItem
     void* end_;
     /// Auxiliary data pointer.
     void* aux_;
+    /// Priority. Higher value = will be completed first.
+    unsigned priority_;
+    /// Whether to send event on completion.
+    bool sendEvent_;
+    /// Completed flag.
+    volatile bool completed_;
 };
 
 /// Work queue subsystem for multithreading.
@@ -59,32 +79,36 @@ public:
     
     /// Create worker threads. Can only be called once.
     void CreateThreads(unsigned numThreads);
-    /// Add a work item and resume work. If no threads, will process it immediately.
+    /// Add a work item and resume work.
     void AddWorkItem(const WorkItem& item);
     /// Pause work.
     void Pause();
     /// Resume work.
     void Resume();
-    /// Finish all queued work, then pause.
-    void Complete();
+    /// Finish all queued work which has at least the specified priority, then pause. Main thread will also execute priority work.
+    void Complete(unsigned priority);
     
     /// Return number of worker threads.
     unsigned GetNumThreads() const { return threads_.Size(); }
-    /// Return whether all work is completed.
-    bool IsCompleted() const;
+    /// Return whether all work with at least the specified priority is finished.
+    bool IsCompleted(unsigned priority) const;
     
 private:
     /// Process work items until shut down. Called by the worker threads.
     void ProcessItems(unsigned threadIndex);
+    /// Purge completed work items and send completion events as necessary.
+    void PurgeCompleted();
+    /// Handle frame start event. Purges completed work from the main thread queue, and performs work if no threads at all.
+    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
     
     /// Worker threads.
     Vector<SharedPtr<WorkerThread> > threads_;
-    /// Work item queue.
-    List<WorkItem> queue_;
-    /// Queue mutex.
+    /// Work item collection. Accessed only by the main thread.
+    List<WorkItem> workItems_;
+    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
+    List<WorkItem*> queue_;
+    /// Worker queue mutex.
     Mutex queueMutex_;
-    /// Number of threads working on an item.
-    volatile unsigned numActive_;
     /// Shutting down flag.
     volatile bool shutDown_;
     /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.

+ 2 - 2
Engine/Graphics/Octree.cpp

@@ -458,7 +458,7 @@ void Octree::Raycast(RayOctreeQuery& query) const
             }
             
             // Merge per-thread results
-            queue->Complete();
+            queue->Complete(M_MAX_UNSIGNED);
             for (unsigned i = 0; i < rayQueryResults_.Size(); ++i)
                 query.result_.Insert(query.result_.End(), rayQueryResults_[i].Begin(), rayQueryResults_[i].End());
         }
@@ -568,7 +568,7 @@ void Octree::UpdateDrawables(const FrameInfo& frame)
         start = end;
     }
     
-    queue->Complete();
+    queue->Complete(M_MAX_UNSIGNED);
     scene->EndThreadedUpdate();
     drawableUpdates_.Clear();
 }

+ 3 - 3
Engine/Graphics/View.cpp

@@ -628,7 +628,7 @@ void View::GetDrawables()
             start = end;
         }
         
-        queue->Complete();
+        queue->Complete(M_MAX_UNSIGNED);
     }
     
     // Sort into geometries & lights, and build visible scene bounding boxes in world and view space
@@ -716,7 +716,7 @@ void View::GetBatches()
         }
         
         // Ensure all lights have been processed before proceeding
-        queue->Complete();
+        queue->Complete(M_MAX_UNSIGNED);
     }
     
     // Build light queues and lit batches
@@ -1065,7 +1065,7 @@ void View::UpdateGeometries()
     }
     
     // Finally ensure all threaded work has completed
-    queue->Complete();
+    queue->Complete(M_MAX_UNSIGNED);
 }
 
 void View::GetLitBatches(Drawable* drawable, LightBatchQueue& lightQueue, BatchQueue* alphaQueue, bool useLitBase)