Browse Source

Only signal work items that have specified or higher priority in WorkQueue::Complete(), to prevent lower priority work item events triggering when they shouldn't (in the middle of render update.) Set explicit max priority in Octree & View items, but leave default priority to the lowest for user items. Removed a static variable from WorkQueue::PurgePool().

Lasse Öörni 12 years ago
parent
commit
e44e3f7652

+ 21 - 11
Source/Engine/Core/WorkQueue.cpp

@@ -22,6 +22,7 @@
 
 #include "Precompiled.h"
 #include "CoreEvents.h"
+#include "Log.h"
 #include "ProcessUtils.h"
 #include "Profiler.h"
 #include "Thread.h"
@@ -67,7 +68,8 @@ WorkQueue::WorkQueue(Context* context) :
     shutDown_(false),
     pausing_(false),
     paused_(false),
-    tolerance_(10)
+    tolerance_(10),
+    lastSize_(0)
 {
     SubscribeToEvent(E_BEGINFRAME, HANDLER(WorkQueue, HandleBeginFrame));
 }
@@ -119,9 +121,15 @@ SharedPtr<WorkItem> WorkQueue::GetFreeItem()
 
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 {
+    if (!item)
+    {
+        LOGERROR("Null work item submitted to the work queue");
+        return;
+    }
+    
     // Check for duplicate items.
     assert(!workItems_.Contains(item));
-
+    
     // Push to the main thread list to keep item alive
     // Clear completed flag in case item is reused
     workItems_.Push(item);
@@ -222,7 +230,7 @@ void WorkQueue::Complete(unsigned priority)
         }
     }
     
-    PurgeCompleted();
+    PurgeCompleted(priority);
 }
 
 bool WorkQueue::IsCompleted(unsigned priority) const
@@ -271,12 +279,14 @@ void WorkQueue::ProcessItems(unsigned threadIndex)
     }
 }
 
-void WorkQueue::PurgeCompleted()
+void WorkQueue::PurgeCompleted(unsigned priority)
 {
-    // Purge completed work items and send completion events.
+    // Purge completed work items and send completion events. Do not signal items lower than priority threshold,
+    // as those may be user submitted and lead to eg. scene manipulation that could happen in the middle of the
+    // render update, which is not allowed
     for (List<SharedPtr<WorkItem> >::Iterator i = workItems_.Begin(); i != workItems_.End();)
     {
-        if ((*i)->completed_)
+        if ((*i)->completed_ && (*i)->priority_ >= priority)
         {
             if ((*i)->sendEvent_)
             {
@@ -314,15 +324,14 @@ void WorkQueue::PurgeCompleted()
 
 void WorkQueue::PurgePool()
 {
-    static unsigned int lastSize = 0;
     unsigned int currentSize = poolItems_.Size();
-    int difference = lastSize - currentSize;
+    int difference = lastSize_ - currentSize;
 
     // Difference tolerance, should be fairly significant to reduce the pool size.
-    for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < difference; i++)
+    for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < (unsigned)difference; i++)
         poolItems_.PopFront();
 
-    lastSize = currentSize;
+    lastSize_ = currentSize;
 }
 
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
@@ -343,7 +352,8 @@ void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
         }
     }
     
-    PurgeCompleted();
+    // Complete and signal items down to the lowest priority
+    PurgeCompleted(0);
     PurgePool();
 }
 

+ 140 - 138
Source/Engine/Core/WorkQueue.h

@@ -1,139 +1,141 @@
-//
-// Copyright (c) 2008-2014 the Urho3D project.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-
-#pragma once
-
-#include "List.h"
-#include "Mutex.h"
-#include "Object.h"
-
-namespace Urho3D
-{
-
-/// Work item completed event.
-EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
-{
-    PARAM(P_ITEM, Item);                        // WorkItem ptr
-}
-
-class WorkerThread;
-
-/// Work queue item.
-struct WorkItem : public RefCounted
-{
-    friend class WorkQueue;
-
-public:
-    // Construct
-    WorkItem() :
-        priority_(M_MAX_UNSIGNED),
-        sendEvent_(false),
-        completed_(false),
-        pooled_(false)
-    {
-    }
-    
-    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
-    void (*workFunction_)(const WorkItem*, unsigned);
-    /// Data start pointer.
-    void* start_;
-    /// Data end pointer.
-    void* end_;
-    /// Auxiliary data pointer.
-    void* aux_;
-    /// Priority. Higher value = will be completed first.
-    unsigned priority_;
-    /// Whether to send event on completion.
-    bool sendEvent_;
-    /// Completed flag.
-    volatile bool completed_;
-
-private:
-    bool pooled_;
-};
-
-/// Work queue subsystem for multithreading.
-class URHO3D_API WorkQueue : public Object
-{
-    OBJECT(WorkQueue);
-    
-    friend class WorkerThread;
-    
-public:
-    /// Construct.
-    WorkQueue(Context* context);
-    /// Destruct.
-    ~WorkQueue();
-    
-    /// Create worker threads. Can only be called once.
-    void CreateThreads(unsigned numThreads);
-    /// Get next free WorkItem Ptr.
-    SharedPtr<WorkItem> GetFreeItem();
-    /// Add a work item and resume worker threads.
+//
+// Copyright (c) 2008-2014 the Urho3D project.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+
+#pragma once
+
+#include "List.h"
+#include "Mutex.h"
+#include "Object.h"
+
+namespace Urho3D
+{
+
+/// Work item completed event.
+EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
+{
+    PARAM(P_ITEM, Item);                        // WorkItem ptr
+}
+
+class WorkerThread;
+
+/// Work queue item.
+struct WorkItem : public RefCounted
+{
+    friend class WorkQueue;
+
+public:
+    // Construct
+    WorkItem() :
+        priority_(0),
+        sendEvent_(false),
+        completed_(false),
+        pooled_(false)
+    {
+    }
+    
+    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
+    void (*workFunction_)(const WorkItem*, unsigned);
+    /// Data start pointer.
+    void* start_;
+    /// Data end pointer.
+    void* end_;
+    /// Auxiliary data pointer.
+    void* aux_;
+    /// Priority. Higher value = will be completed first.
+    unsigned priority_;
+    /// Whether to send event on completion.
+    bool sendEvent_;
+    /// Completed flag.
+    volatile bool completed_;
+
+private:
+    bool pooled_;
+};
+
+/// Work queue subsystem for multithreading.
+class URHO3D_API WorkQueue : public Object
+{
+    OBJECT(WorkQueue);
+    
+    friend class WorkerThread;
+    
+public:
+    /// Construct.
+    WorkQueue(Context* context);
+    /// Destruct.
+    ~WorkQueue();
+    
+    /// Create worker threads. Can only be called once.
+    void CreateThreads(unsigned numThreads);
+    /// Get pointer to an usable WorkItem from the item pool. Allocate one if no more free items.
+    SharedPtr<WorkItem> GetFreeItem();
+    /// Add a work item and resume worker threads.
     void AddWorkItem(SharedPtr<WorkItem> item);
-    /// Pause worker threads.
-    void Pause();
-    /// Resume worker threads.
-    void Resume();
-    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
-    void Complete(unsigned priority);
-    /// Set the pool telerance before it starts deleting pool items.
-    void SetTolerance(int tolerance) { tolerance_ = tolerance; }
-    
-    /// Return number of worker threads.
-    unsigned GetNumThreads() const { return threads_.Size(); }
-    /// Return whether all work with at least the specified priority is finished.
-    bool IsCompleted(unsigned priority) const;
-    /// Return the pool tolerance.
-    int GetTolerance() const { return tolerance_; }
-    
-private:
-    /// Process work items until shut down. Called by the worker threads.
-    void ProcessItems(unsigned threadIndex);
-    /// Purge completed work items and send completion events as necessary.
-    void PurgeCompleted();
-    /// Purge the pool to reduce allocation where its unneeded.
-    void PurgePool();
-    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
-    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
-    
-    /// Worker threads.
-    Vector<SharedPtr<WorkerThread> > threads_;
-    /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
-    List<SharedPtr<WorkItem> > poolItems_;
-    /// Work item collection. Accessed only by the main thread.
-    List<SharedPtr<WorkItem> > workItems_;
-    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
-    List<WorkItem*> queue_;
-    /// Worker queue mutex.
-    Mutex queueMutex_;
-    /// Shutting down flag.
-    volatile bool shutDown_;
-    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
-    volatile bool pausing_;
-    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
-    bool paused_;
-    /// Tolerance for the shared pool before it begins to deallocate.
-    int tolerance_;
-};
-
-}
+    /// Pause worker threads.
+    void Pause();
+    /// Resume worker threads.
+    void Resume();
+    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
+    void Complete(unsigned priority);
+    /// Set the pool telerance before it starts deleting pool items.
+    void SetTolerance(int tolerance) { tolerance_ = tolerance; }
+    
+    /// Return number of worker threads.
+    unsigned GetNumThreads() const { return threads_.Size(); }
+    /// Return whether all work with at least the specified priority is finished.
+    bool IsCompleted(unsigned priority) const;
+    /// Return the pool tolerance.
+    int GetTolerance() const { return tolerance_; }
+    
+private:
+    /// Process work items until shut down. Called by the worker threads.
+    void ProcessItems(unsigned threadIndex);
+    /// Purge completed work items which have at least the specified priority, and send completion events as necessary.
+    void PurgeCompleted(unsigned priority);
+    /// Purge the pool to reduce allocation where its unneeded.
+    void PurgePool();
+    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
+    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
+    
+    /// Worker threads.
+    Vector<SharedPtr<WorkerThread> > threads_;
+    /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
+    List<SharedPtr<WorkItem> > poolItems_;
+    /// Work item collection. Accessed only by the main thread.
+    List<SharedPtr<WorkItem> > workItems_;
+    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
+    List<WorkItem*> queue_;
+    /// Worker queue mutex.
+    Mutex queueMutex_;
+    /// Shutting down flag.
+    volatile bool shutDown_;
+    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
+    volatile bool pausing_;
+    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
+    bool paused_;
+    /// Tolerance for the shared pool before it begins to deallocate.
+    int tolerance_;
+    /// Last size of the shared pool.
+    unsigned lastSize_;
+};
+
+}

+ 2 - 0
Source/Engine/Graphics/Octree.cpp

@@ -411,6 +411,7 @@ void Octree::Update(const FrameInfo& frame)
         for (int i = 0; i < numWorkItems; ++i)
         {
             SharedPtr<WorkItem> item = queue->GetFreeItem();
+            item->priority_ = M_MAX_UNSIGNED;
             item->workFunction_ = UpdateDrawablesWork;
             item->aux_ = const_cast<FrameInfo*>(&frame);
 
@@ -530,6 +531,7 @@ void Octree::Raycast(RayOctreeQuery& query) const
             while (start != rayQueryDrawables_.End())
             {
                 SharedPtr<WorkItem> item = queue->GetFreeItem();
+                item->priority_ = M_MAX_UNSIGNED;
                 item->workFunction_ = RaycastDrawablesWork;
                 item->aux_ = const_cast<Octree*>(this);
 

+ 6 - 0
Source/Engine/Graphics/View.cpp

@@ -692,6 +692,7 @@ void View::GetDrawables()
         for (int i = 0; i < numWorkItems; ++i)
         {
             SharedPtr<WorkItem> item = queue->GetFreeItem();
+            item->priority_ = M_MAX_UNSIGNED;
             item->workFunction_ = CheckVisibilityWork;
             item->aux_ = this;
 
@@ -765,6 +766,7 @@ void View::GetBatches()
         for (unsigned i = 0; i < lightQueryResults_.Size(); ++i)
         {
             SharedPtr<WorkItem> item = queue->GetFreeItem();
+            item->priority_ = M_MAX_UNSIGNED;
             item->workFunction_ = ProcessLightWork;
             item->aux_ = this;
 
@@ -1060,6 +1062,7 @@ void View::UpdateGeometries()
                 BatchQueue* passQueue = &batchQueues_[command.pass_];
                 
                 SharedPtr<WorkItem> item = queue->GetFreeItem();
+                item->priority_ = M_MAX_UNSIGNED;
                 item->workFunction_ = command.sortMode_ == SORT_FRONTTOBACK ? SortBatchQueueFrontToBackWork : SortBatchQueueBackToFrontWork;
                 item->start_ = &batchQueues_[command.pass_];
                 queue->AddWorkItem(item);
@@ -1069,6 +1072,7 @@ void View::UpdateGeometries()
         for (Vector<LightBatchQueue>::Iterator i = lightQueues_.Begin(); i != lightQueues_.End(); ++i)
         {
             SharedPtr<WorkItem> lightItem = queue->GetFreeItem();
+            lightItem->priority_ = M_MAX_UNSIGNED;
             lightItem->workFunction_ = SortLightQueueWork;
             lightItem->start_ = &(*i);
             queue->AddWorkItem(lightItem);
@@ -1076,6 +1080,7 @@ void View::UpdateGeometries()
             if (i->shadowSplits_.Size())
             {
                 SharedPtr<WorkItem> shadowItem = queue->GetFreeItem();
+                shadowItem->priority_ = M_MAX_UNSIGNED;
                 shadowItem->workFunction_ = SortShadowQueueWork;
                 shadowItem->start_ = &(*i);
                 queue->AddWorkItem(shadowItem);
@@ -1118,6 +1123,7 @@ void View::UpdateGeometries()
                     end = start + drawablesPerItem;
                 
                 SharedPtr<WorkItem> item = queue->GetFreeItem();
+                item->priority_ = M_MAX_UNSIGNED;
                 item->workFunction_ = UpdateDrawableGeometriesWork;
                 item->aux_ = const_cast<FrameInfo*>(&frame_);
                 item->start_ = &(*start);