Browse Source

Added Pool for WorkItem SharedPtrs to save allocation every frame. Its an optional construct to allow the user to create their own pools or not use the pool if they so desire.

Below is the profiling for the Octree and View changes to pooled allocation.

Before:
UpdateDrawables                        35    0.139    0.193    0.135     4.871
SortAndUpdateGeometry          35    0.080    0.135    0.077     2.802

After:
UpdateDrawables                        21    0.082    0.172    0.078     1.735
SortAndUpdateGeometry          21    0.054    0.069    0.052     1.144
Alex Parlett 12 years ago
parent
commit
e8abdfc366

+ 54 - 2
Source/Engine/Core/WorkQueue.cpp

@@ -99,6 +99,26 @@ void WorkQueue::CreateThreads(unsigned numThreads)
     }
 }
 
+SharedPtr<WorkItem> WorkQueue::GetFreeItem()
+{
+    // Check for the next usable item
+    HashMap<SharedPtr<WorkItem>, bool>::Iterator i = itemPool_.Begin();
+    for (; i != itemPool_.End(); i++)
+    {
+        // If available set it to in use and return it
+        if (i->second_)
+        {
+            i->second_ = false;
+            return i->first_;
+        }
+    }
+
+    // No usable items found, create a new one add it to queue as in use and return it
+    SharedPtr<WorkItem> item(new WorkItem());
+    itemPool_[item] = false;
+    return item;
+}
+
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 {
     // Check for duplicate items.
@@ -109,7 +129,7 @@ void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
     // Clear completed flag in case item is reused
     workItems_.Push(item);
     item->completed_ = false;
-    
+
     // Make sure worker threads' list is safe to modify
     if (threads_.Size() && !paused_)
         queueMutex_.Acquire();
@@ -270,7 +290,25 @@ void WorkQueue::PurgeCompleted()
                 eventData[P_ITEM] = static_cast<void*>(i->Get());
                 SendEvent(E_WORKITEMCOMPLETED, eventData);
             }
-            
+
+            // Check if this was a pooled item and set it to usable
+            if (itemPool_.Contains(*i))
+            {
+                // Reset the values to their defaults. This should 
+                // be safe to do here as the completed event has 
+                // already been handled and this is part of the 
+                // internal pool.
+                (*i)->start_ = NULL;
+                (*i)->end_ = NULL;
+                (*i)->aux_ = NULL;
+                (*i)->workFunction_ = NULL;
+                (*i)->priority_ = M_MAX_UNSIGNED;
+                (*i)->sendEvent_ = false;
+                (*i)->completed_ = false;
+
+                itemPool_[*i] = true;
+            }
+
             i = workItems_.Erase(i);
         }
         else
@@ -278,6 +316,19 @@ void WorkQueue::PurgeCompleted()
     }
 }
 
+void WorkQueue::PurgePool()
+{
+    static unsigned int lastSize = 0;
+    unsigned int currentSize = itemPool_.Size();
+    int difference = lastSize - currentSize;
+
+    // Difference tolerance, should be fairly significant to reduce the pool size.
+    for (unsigned i = 0; itemPool_.Size() > 0 && difference > 10 && i < difference; i++)
+        itemPool_.Erase(itemPool_.Begin());
+
+    lastSize = currentSize;
+}
+
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
 {
     // If no worker threads, complete low-priority work here
@@ -297,6 +348,7 @@ void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
     }
     
     PurgeCompleted();
+    PurgePool();
 }
 
 }

+ 125 - 119
Source/Engine/Core/WorkQueue.h

@@ -1,120 +1,126 @@
-//
-// Copyright (c) 2008-2014 the Urho3D project.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-
-#pragma once
-
-#include "List.h"
-#include "Mutex.h"
-#include "Object.h"
-
-namespace Urho3D
-{
-
-/// Work item completed event.
-EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
-{
-    PARAM(P_ITEM, Item);                        // WorkItem ptr
-}
-
-class WorkerThread;
-
-/// Work queue item.
-struct WorkItem : public RefCounted
-{
-    // Construct
-    WorkItem() :
-        priority_(M_MAX_UNSIGNED),
-        sendEvent_(false),
-        completed_(false)
-    {
-    }
-    
-    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
-    void (*workFunction_)(const WorkItem*, unsigned);
-    /// Data start pointer.
-    void* start_;
-    /// Data end pointer.
-    void* end_;
-    /// Auxiliary data pointer.
-    void* aux_;
-    /// Priority. Higher value = will be completed first.
-    unsigned priority_;
-    /// Whether to send event on completion.
-    bool sendEvent_;
-    /// Completed flag.
-    volatile bool completed_;
-};
-
-/// Work queue subsystem for multithreading.
-class URHO3D_API WorkQueue : public Object
-{
-    OBJECT(WorkQueue);
-    
-    friend class WorkerThread;
-    
-public:
-    /// Construct.
-    WorkQueue(Context* context);
-    /// Destruct.
-    ~WorkQueue();
-    
-    /// Create worker threads. Can only be called once.
-    void CreateThreads(unsigned numThreads);
-    /// Add a work item and resume worker threads.
+//
+// Copyright (c) 2008-2014 the Urho3D project.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+
+#pragma once
+
+#include "List.h"
+#include "Mutex.h"
+#include "Object.h"
+
+namespace Urho3D
+{
+
+/// Work item completed event.
+EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
+{
+    PARAM(P_ITEM, Item);                        // WorkItem ptr
+}
+
+class WorkerThread;
+
+/// Work queue item.
+struct WorkItem : public RefCounted
+{
+    // Construct
+    WorkItem() :
+        priority_(M_MAX_UNSIGNED),
+        sendEvent_(false),
+        completed_(false)
+    {
+    }
+    
+    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
+    void (*workFunction_)(const WorkItem*, unsigned);
+    /// Data start pointer.
+    void* start_;
+    /// Data end pointer.
+    void* end_;
+    /// Auxiliary data pointer.
+    void* aux_;
+    /// Priority. Higher value = will be completed first.
+    unsigned priority_;
+    /// Whether to send event on completion.
+    bool sendEvent_;
+    /// Completed flag.
+    volatile bool completed_;
+};
+
+/// Work queue subsystem for multithreading.
+class URHO3D_API WorkQueue : public Object
+{
+    OBJECT(WorkQueue);
+    
+    friend class WorkerThread;
+    
+public:
+    /// Construct.
+    WorkQueue(Context* context);
+    /// Destruct.
+    ~WorkQueue();
+    
+    /// Create worker threads. Can only be called once.
+    void CreateThreads(unsigned numThreads);
+    /// Get next free WorkItem Ptr.
+    SharedPtr<WorkItem> GetFreeItem();
+    /// Add a work item and resume worker threads.
     void AddWorkItem(SharedPtr<WorkItem> item);
-    /// Pause worker threads.
-    void Pause();
-    /// Resume worker threads.
-    void Resume();
-    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
-    void Complete(unsigned priority);
-    
-    /// Return number of worker threads.
-    unsigned GetNumThreads() const { return threads_.Size(); }
-    /// Return whether all work with at least the specified priority is finished.
-    bool IsCompleted(unsigned priority) const;
-    
-private:
-    /// Process work items until shut down. Called by the worker threads.
-    void ProcessItems(unsigned threadIndex);
-    /// Purge completed work items and send completion events as necessary.
-    void PurgeCompleted();
-    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
-    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
-    
-    /// Worker threads.
-    Vector<SharedPtr<WorkerThread> > threads_;
-    /// Work item collection. Accessed only by the main thread.
-    List<SharedPtr<WorkItem> > workItems_;
-    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
-    List<WorkItem*> queue_;
-    /// Worker queue mutex.
-    Mutex queueMutex_;
-    /// Shutting down flag.
-    volatile bool shutDown_;
-    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
-    volatile bool pausing_;
-    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
-    bool paused_;
-};
-
-}
+    /// Pause worker threads.
+    void Pause();
+    /// Resume worker threads.
+    void Resume();
+    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
+    void Complete(unsigned priority);
+    
+    /// Return number of worker threads.
+    unsigned GetNumThreads() const { return threads_.Size(); }
+    /// Return whether all work with at least the specified priority is finished.
+    bool IsCompleted(unsigned priority) const;
+    
+private:
+    /// Process work items until shut down. Called by the worker threads.
+    void ProcessItems(unsigned threadIndex);
+    /// Purge completed work items and send completion events as necessary.
+    void PurgeCompleted();
+    /// Purge the pool to reduce allocation where its unneeded.
+    void PurgePool();
+    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
+    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
+    
+    /// Worker threads.
+    Vector<SharedPtr<WorkerThread> > threads_;
+    /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
+    HashMap<SharedPtr<WorkItem>, bool> itemPool_;
+    /// Work item collection. Accessed only by the main thread.
+    List<SharedPtr<WorkItem> > workItems_;
+    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
+    List<WorkItem*> queue_;
+    /// Worker queue mutex.
+    Mutex queueMutex_;
+    /// Shutting down flag.
+    volatile bool shutDown_;
+    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
+    volatile bool pausing_;
+    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
+    bool paused_;
+};
+
+}

+ 2 - 2
Source/Engine/Graphics/Octree.cpp

@@ -410,7 +410,7 @@ void Octree::Update(const FrameInfo& frame)
         // Create a work item for each thread
         for (int i = 0; i < numWorkItems; ++i)
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = UpdateDrawablesWork;
             item->aux_ = const_cast<FrameInfo*>(&frame);
 
@@ -529,7 +529,7 @@ void Octree::Raycast(RayOctreeQuery& query) const
             PODVector<Drawable*>::Iterator start = rayQueryDrawables_.Begin();
             while (start != rayQueryDrawables_.End())
             {
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = RaycastDrawablesWork;
                 item->aux_ = const_cast<Octree*>(this);
 

+ 6 - 6
Source/Engine/Graphics/View.cpp

@@ -691,7 +691,7 @@ void View::GetDrawables()
         // Create a work item for each thread
         for (int i = 0; i < numWorkItems; ++i)
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = CheckVisibilityWork;
             item->aux_ = this;
 
@@ -764,7 +764,7 @@ void View::GetBatches()
         
         for (unsigned i = 0; i < lightQueryResults_.Size(); ++i)
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = ProcessLightWork;
             item->aux_ = this;
 
@@ -1059,7 +1059,7 @@ void View::UpdateGeometries()
             {
                 BatchQueue* passQueue = &batchQueues_[command.pass_];
                 
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = command.sortMode_ == SORT_FRONTTOBACK ? SortBatchQueueFrontToBackWork : SortBatchQueueBackToFrontWork;
                 item->start_ = &batchQueues_[command.pass_];
                 queue->AddWorkItem(item);
@@ -1068,14 +1068,14 @@ void View::UpdateGeometries()
         
         for (Vector<LightBatchQueue>::Iterator i = lightQueues_.Begin(); i != lightQueues_.End(); ++i)
         {
-            SharedPtr<WorkItem> lightItem(new WorkItem());
+            SharedPtr<WorkItem> lightItem = queue->GetFreeItem();
             lightItem->workFunction_ = SortLightQueueWork;
             lightItem->start_ = &(*i);
             queue->AddWorkItem(lightItem);
 
             if (i->shadowSplits_.Size())
             {
-                SharedPtr<WorkItem> shadowItem(new WorkItem());
+                SharedPtr<WorkItem> shadowItem = queue->GetFreeItem();
                 shadowItem->workFunction_ = SortShadowQueueWork;
                 shadowItem->start_ = &(*i);
                 queue->AddWorkItem(shadowItem);
@@ -1117,7 +1117,7 @@ void View::UpdateGeometries()
                 if (i < numWorkItems - 1 && end - start > drawablesPerItem)
                     end = start + drawablesPerItem;
                 
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = UpdateDrawableGeometriesWork;
                 item->aux_ = const_cast<FrameInfo*>(&frame_);
                 item->start_ = &(*start);