瀏覽代碼

Merge remote-tracking branch 'remotes/alexparlett/feature/pooled-work-queue'

Lasse Öörni 12 年之前
父節點
當前提交
e4008732d1
共有 4 個文件被更改,包括 187 次插入129 次删除
  1. 54 2
      Source/Engine/Core/WorkQueue.cpp
  2. 125 119
      Source/Engine/Core/WorkQueue.h
  3. 2 2
      Source/Engine/Graphics/Octree.cpp
  4. 6 6
      Source/Engine/Graphics/View.cpp

+ 54 - 2
Source/Engine/Core/WorkQueue.cpp

@@ -99,6 +99,26 @@ void WorkQueue::CreateThreads(unsigned numThreads)
     }
     }
 }
 }
 
 
+SharedPtr<WorkItem> WorkQueue::GetFreeItem()
+{
+    // Check for the next usable item
+    HashMap<SharedPtr<WorkItem>, bool>::Iterator i = itemPool_.Begin();
+    for (; i != itemPool_.End(); i++)
+    {
+        // If available set it to in use and return it
+        if (i->second_)
+        {
+            i->second_ = false;
+            return i->first_;
+        }
+    }
+
+    // No usable items found, create a new one add it to queue as in use and return it
+    SharedPtr<WorkItem> item(new WorkItem());
+    itemPool_[item] = false;
+    return item;
+}
+
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 {
 {
     // Check for duplicate items.
     // Check for duplicate items.
@@ -109,7 +129,7 @@ void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
     // Clear completed flag in case item is reused
     // Clear completed flag in case item is reused
     workItems_.Push(item);
     workItems_.Push(item);
     item->completed_ = false;
     item->completed_ = false;
-    
+
     // Make sure worker threads' list is safe to modify
     // Make sure worker threads' list is safe to modify
     if (threads_.Size() && !paused_)
     if (threads_.Size() && !paused_)
         queueMutex_.Acquire();
         queueMutex_.Acquire();
@@ -270,7 +290,25 @@ void WorkQueue::PurgeCompleted()
                 eventData[P_ITEM] = static_cast<void*>(i->Get());
                 eventData[P_ITEM] = static_cast<void*>(i->Get());
                 SendEvent(E_WORKITEMCOMPLETED, eventData);
                 SendEvent(E_WORKITEMCOMPLETED, eventData);
             }
             }
-            
+
+            // Check if this was a pooled item and set it to usable
+            if (itemPool_.Contains(*i))
+            {
+                // Reset the values to their defaults. This should 
+                // be safe to do here as the completed event has 
+                // already been handled and this is part of the 
+                // internal pool.
+                (*i)->start_ = NULL;
+                (*i)->end_ = NULL;
+                (*i)->aux_ = NULL;
+                (*i)->workFunction_ = NULL;
+                (*i)->priority_ = M_MAX_UNSIGNED;
+                (*i)->sendEvent_ = false;
+                (*i)->completed_ = false;
+
+                itemPool_[*i] = true;
+            }
+
             i = workItems_.Erase(i);
             i = workItems_.Erase(i);
         }
         }
         else
         else
@@ -278,6 +316,19 @@ void WorkQueue::PurgeCompleted()
     }
     }
 }
 }
 
 
+void WorkQueue::PurgePool()
+{
+    static unsigned int lastSize = 0;
+    unsigned int currentSize = itemPool_.Size();
+    int difference = lastSize - currentSize;
+
+    // Difference tolerance, should be fairly significant to reduce the pool size.
+    for (unsigned i = 0; itemPool_.Size() > 0 && difference > 10 && i < difference; i++)
+        itemPool_.Erase(itemPool_.Begin());
+
+    lastSize = currentSize;
+}
+
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
 {
 {
     // If no worker threads, complete low-priority work here
     // If no worker threads, complete low-priority work here
@@ -297,6 +348,7 @@ void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
     }
     }
     
     
     PurgeCompleted();
     PurgeCompleted();
+    PurgePool();
 }
 }
 
 
 }
 }

+ 125 - 119
Source/Engine/Core/WorkQueue.h

@@ -1,120 +1,126 @@
-//
-// Copyright (c) 2008-2014 the Urho3D project.
-//
-// Permission is hereby granted, free of charge, to any person obtaining a copy
-// of this software and associated documentation files (the "Software"), to deal
-// in the Software without restriction, including without limitation the rights
-// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
-// copies of the Software, and to permit persons to whom the Software is
-// furnished to do so, subject to the following conditions:
-//
-// The above copyright notice and this permission notice shall be included in
-// all copies or substantial portions of the Software.
-//
-// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
-// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
-// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
-// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
-// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
-// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
-// THE SOFTWARE.
-//
-
-#pragma once
-
-#include "List.h"
-#include "Mutex.h"
-#include "Object.h"
-
-namespace Urho3D
-{
-
-/// Work item completed event.
-EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
-{
-    PARAM(P_ITEM, Item);                        // WorkItem ptr
-}
-
-class WorkerThread;
-
-/// Work queue item.
-struct WorkItem : public RefCounted
-{
-    // Construct
-    WorkItem() :
-        priority_(M_MAX_UNSIGNED),
-        sendEvent_(false),
-        completed_(false)
-    {
-    }
-    
-    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
-    void (*workFunction_)(const WorkItem*, unsigned);
-    /// Data start pointer.
-    void* start_;
-    /// Data end pointer.
-    void* end_;
-    /// Auxiliary data pointer.
-    void* aux_;
-    /// Priority. Higher value = will be completed first.
-    unsigned priority_;
-    /// Whether to send event on completion.
-    bool sendEvent_;
-    /// Completed flag.
-    volatile bool completed_;
-};
-
-/// Work queue subsystem for multithreading.
-class URHO3D_API WorkQueue : public Object
-{
-    OBJECT(WorkQueue);
-    
-    friend class WorkerThread;
-    
-public:
-    /// Construct.
-    WorkQueue(Context* context);
-    /// Destruct.
-    ~WorkQueue();
-    
-    /// Create worker threads. Can only be called once.
-    void CreateThreads(unsigned numThreads);
-    /// Add a work item and resume worker threads.
+//
+// Copyright (c) 2008-2014 the Urho3D project.
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+
+#pragma once
+
+#include "List.h"
+#include "Mutex.h"
+#include "Object.h"
+
+namespace Urho3D
+{
+
+/// Work item completed event.
+EVENT(E_WORKITEMCOMPLETED, WorkItemCompleted)
+{
+    PARAM(P_ITEM, Item);                        // WorkItem ptr
+}
+
+class WorkerThread;
+
+/// Work queue item.
+struct WorkItem : public RefCounted
+{
+    // Construct
+    WorkItem() :
+        priority_(M_MAX_UNSIGNED),
+        sendEvent_(false),
+        completed_(false)
+    {
+    }
+    
+    /// Work function. Called with the work item and thread index (0 = main thread) as parameters.
+    void (*workFunction_)(const WorkItem*, unsigned);
+    /// Data start pointer.
+    void* start_;
+    /// Data end pointer.
+    void* end_;
+    /// Auxiliary data pointer.
+    void* aux_;
+    /// Priority. Higher value = will be completed first.
+    unsigned priority_;
+    /// Whether to send event on completion.
+    bool sendEvent_;
+    /// Completed flag.
+    volatile bool completed_;
+};
+
+/// Work queue subsystem for multithreading.
+class URHO3D_API WorkQueue : public Object
+{
+    OBJECT(WorkQueue);
+    
+    friend class WorkerThread;
+    
+public:
+    /// Construct.
+    WorkQueue(Context* context);
+    /// Destruct.
+    ~WorkQueue();
+    
+    /// Create worker threads. Can only be called once.
+    void CreateThreads(unsigned numThreads);
+    /// Get next free WorkItem Ptr.
+    SharedPtr<WorkItem> GetFreeItem();
+    /// Add a work item and resume worker threads.
     void AddWorkItem(SharedPtr<WorkItem> item);
     void AddWorkItem(SharedPtr<WorkItem> item);
-    /// Pause worker threads.
-    void Pause();
-    /// Resume worker threads.
-    void Resume();
-    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
-    void Complete(unsigned priority);
-    
-    /// Return number of worker threads.
-    unsigned GetNumThreads() const { return threads_.Size(); }
-    /// Return whether all work with at least the specified priority is finished.
-    bool IsCompleted(unsigned priority) const;
-    
-private:
-    /// Process work items until shut down. Called by the worker threads.
-    void ProcessItems(unsigned threadIndex);
-    /// Purge completed work items and send completion events as necessary.
-    void PurgeCompleted();
-    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
-    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
-    
-    /// Worker threads.
-    Vector<SharedPtr<WorkerThread> > threads_;
-    /// Work item collection. Accessed only by the main thread.
-    List<SharedPtr<WorkItem> > workItems_;
-    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
-    List<WorkItem*> queue_;
-    /// Worker queue mutex.
-    Mutex queueMutex_;
-    /// Shutting down flag.
-    volatile bool shutDown_;
-    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
-    volatile bool pausing_;
-    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
-    bool paused_;
-};
-
-}
+    /// Pause worker threads.
+    void Pause();
+    /// Resume worker threads.
+    void Resume();
+    /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
+    void Complete(unsigned priority);
+    
+    /// Return number of worker threads.
+    unsigned GetNumThreads() const { return threads_.Size(); }
+    /// Return whether all work with at least the specified priority is finished.
+    bool IsCompleted(unsigned priority) const;
+    
+private:
+    /// Process work items until shut down. Called by the worker threads.
+    void ProcessItems(unsigned threadIndex);
+    /// Purge completed work items and send completion events as necessary.
+    void PurgeCompleted();
+    /// Purge the pool to reduce allocation where its unneeded.
+    void PurgePool();
+    /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
+    void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
+    
+    /// Worker threads.
+    Vector<SharedPtr<WorkerThread> > threads_;
+    /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
+    HashMap<SharedPtr<WorkItem>, bool> itemPool_;
+    /// Work item collection. Accessed only by the main thread.
+    List<SharedPtr<WorkItem> > workItems_;
+    /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
+    List<WorkItem*> queue_;
+    /// Worker queue mutex.
+    Mutex queueMutex_;
+    /// Shutting down flag.
+    volatile bool shutDown_;
+    /// Pausing flag. Indicates the worker threads should not contend for the queue mutex.
+    volatile bool pausing_;
+    /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
+    bool paused_;
+};
+
+}

+ 2 - 2
Source/Engine/Graphics/Octree.cpp

@@ -410,7 +410,7 @@ void Octree::Update(const FrameInfo& frame)
         // Create a work item for each thread
         // Create a work item for each thread
         for (int i = 0; i < numWorkItems; ++i)
         for (int i = 0; i < numWorkItems; ++i)
         {
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = UpdateDrawablesWork;
             item->workFunction_ = UpdateDrawablesWork;
             item->aux_ = const_cast<FrameInfo*>(&frame);
             item->aux_ = const_cast<FrameInfo*>(&frame);
 
 
@@ -529,7 +529,7 @@ void Octree::Raycast(RayOctreeQuery& query) const
             PODVector<Drawable*>::Iterator start = rayQueryDrawables_.Begin();
             PODVector<Drawable*>::Iterator start = rayQueryDrawables_.Begin();
             while (start != rayQueryDrawables_.End())
             while (start != rayQueryDrawables_.End())
             {
             {
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = RaycastDrawablesWork;
                 item->workFunction_ = RaycastDrawablesWork;
                 item->aux_ = const_cast<Octree*>(this);
                 item->aux_ = const_cast<Octree*>(this);
 
 

+ 6 - 6
Source/Engine/Graphics/View.cpp

@@ -691,7 +691,7 @@ void View::GetDrawables()
         // Create a work item for each thread
         // Create a work item for each thread
         for (int i = 0; i < numWorkItems; ++i)
         for (int i = 0; i < numWorkItems; ++i)
         {
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = CheckVisibilityWork;
             item->workFunction_ = CheckVisibilityWork;
             item->aux_ = this;
             item->aux_ = this;
 
 
@@ -764,7 +764,7 @@ void View::GetBatches()
         
         
         for (unsigned i = 0; i < lightQueryResults_.Size(); ++i)
         for (unsigned i = 0; i < lightQueryResults_.Size(); ++i)
         {
         {
-            SharedPtr<WorkItem> item(new WorkItem());
+            SharedPtr<WorkItem> item = queue->GetFreeItem();
             item->workFunction_ = ProcessLightWork;
             item->workFunction_ = ProcessLightWork;
             item->aux_ = this;
             item->aux_ = this;
 
 
@@ -1059,7 +1059,7 @@ void View::UpdateGeometries()
             {
             {
                 BatchQueue* passQueue = &batchQueues_[command.pass_];
                 BatchQueue* passQueue = &batchQueues_[command.pass_];
                 
                 
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = command.sortMode_ == SORT_FRONTTOBACK ? SortBatchQueueFrontToBackWork : SortBatchQueueBackToFrontWork;
                 item->workFunction_ = command.sortMode_ == SORT_FRONTTOBACK ? SortBatchQueueFrontToBackWork : SortBatchQueueBackToFrontWork;
                 item->start_ = &batchQueues_[command.pass_];
                 item->start_ = &batchQueues_[command.pass_];
                 queue->AddWorkItem(item);
                 queue->AddWorkItem(item);
@@ -1068,14 +1068,14 @@ void View::UpdateGeometries()
         
         
         for (Vector<LightBatchQueue>::Iterator i = lightQueues_.Begin(); i != lightQueues_.End(); ++i)
         for (Vector<LightBatchQueue>::Iterator i = lightQueues_.Begin(); i != lightQueues_.End(); ++i)
         {
         {
-            SharedPtr<WorkItem> lightItem(new WorkItem());
+            SharedPtr<WorkItem> lightItem = queue->GetFreeItem();
             lightItem->workFunction_ = SortLightQueueWork;
             lightItem->workFunction_ = SortLightQueueWork;
             lightItem->start_ = &(*i);
             lightItem->start_ = &(*i);
             queue->AddWorkItem(lightItem);
             queue->AddWorkItem(lightItem);
 
 
             if (i->shadowSplits_.Size())
             if (i->shadowSplits_.Size())
             {
             {
-                SharedPtr<WorkItem> shadowItem(new WorkItem());
+                SharedPtr<WorkItem> shadowItem = queue->GetFreeItem();
                 shadowItem->workFunction_ = SortShadowQueueWork;
                 shadowItem->workFunction_ = SortShadowQueueWork;
                 shadowItem->start_ = &(*i);
                 shadowItem->start_ = &(*i);
                 queue->AddWorkItem(shadowItem);
                 queue->AddWorkItem(shadowItem);
@@ -1117,7 +1117,7 @@ void View::UpdateGeometries()
                 if (i < numWorkItems - 1 && end - start > drawablesPerItem)
                 if (i < numWorkItems - 1 && end - start > drawablesPerItem)
                     end = start + drawablesPerItem;
                     end = start + drawablesPerItem;
                 
                 
-                SharedPtr<WorkItem> item(new WorkItem());
+                SharedPtr<WorkItem> item = queue->GetFreeItem();
                 item->workFunction_ = UpdateDrawableGeometriesWork;
                 item->workFunction_ = UpdateDrawableGeometriesWork;
                 item->aux_ = const_cast<FrameInfo*>(&frame_);
                 item->aux_ = const_cast<FrameInfo*>(&frame_);
                 item->start_ = &(*start);
                 item->start_ = &(*start);