Browse Source

Exposed ability to set the tolerance before the pool will begin erasing items.
Modified the way in which pooled items are tracked to remove a number of O(n) searches.
Modified the check for duplicate items into an assert so its caught in debug but not in release.

Alex Parlett 12 years ago
parent
commit
884be1c22d
2 changed files with 34 additions and 24 deletions
  1. 19 22
      Source/Engine/Core/WorkQueue.cpp
  2. 15 2
      Source/Engine/Core/WorkQueue.h

+ 19 - 22
Source/Engine/Core/WorkQueue.cpp

@@ -66,7 +66,8 @@ WorkQueue::WorkQueue(Context* context) :
     Object(context),
     Object(context),
     shutDown_(false),
     shutDown_(false),
     pausing_(false),
     pausing_(false),
-    paused_(false)
+    paused_(false),
+    tolerance_(10)
 {
 {
     SubscribeToEvent(E_BEGINFRAME, HANDLER(WorkQueue, HandleBeginFrame));
     SubscribeToEvent(E_BEGINFRAME, HANDLER(WorkQueue, HandleBeginFrame));
 }
 }
@@ -101,29 +102,25 @@ void WorkQueue::CreateThreads(unsigned numThreads)
 
 
 SharedPtr<WorkItem> WorkQueue::GetFreeItem()
 SharedPtr<WorkItem> WorkQueue::GetFreeItem()
 {
 {
-    // Check for the next usable item
-    HashMap<SharedPtr<WorkItem>, bool>::Iterator i = itemPool_.Begin();
-    for (; i != itemPool_.End(); i++)
+    if (poolItems_.Size() > 0)
     {
     {
-        // If available set it to in use and return it
-        if (i->second_)
-        {
-            i->second_ = false;
-            return i->first_;
-        }
+        SharedPtr<WorkItem> item = poolItems_.Front();
+        poolItems_.PopFront();
+        return item;
+    }
+    else
+    {
+        // No usable items found, create a new one set it as pooled and return it.
+        SharedPtr<WorkItem> item(new WorkItem());
+        item->pooled_ = true;
+        return item;
     }
     }
-
-    // No usable items found, create a new one add it to queue as in use and return it
-    SharedPtr<WorkItem> item(new WorkItem());
-    itemPool_[item] = false;
-    return item;
 }
 }
 
 
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
 {
 {
     // Check for duplicate items.
     // Check for duplicate items.
-    if (workItems_.Contains(item))
-        return;
+    assert(!workItems_.Contains(item));
 
 
     // Push to the main thread list to keep item alive
     // Push to the main thread list to keep item alive
     // Clear completed flag in case item is reused
     // Clear completed flag in case item is reused
@@ -292,7 +289,7 @@ void WorkQueue::PurgeCompleted()
             }
             }
 
 
             // Check if this was a pooled item and set it to usable
             // Check if this was a pooled item and set it to usable
-            if (itemPool_.Contains(*i))
+            if ((*i)->pooled_)
             {
             {
                 // Reset the values to their defaults. This should 
                 // Reset the values to their defaults. This should 
                 // be safe to do here as the completed event has 
                 // be safe to do here as the completed event has 
@@ -306,7 +303,7 @@ void WorkQueue::PurgeCompleted()
                 (*i)->sendEvent_ = false;
                 (*i)->sendEvent_ = false;
                 (*i)->completed_ = false;
                 (*i)->completed_ = false;
 
 
-                itemPool_[*i] = true;
+                poolItems_.Push(*i);
             }
             }
 
 
             i = workItems_.Erase(i);
             i = workItems_.Erase(i);
@@ -319,12 +316,12 @@ void WorkQueue::PurgeCompleted()
 void WorkQueue::PurgePool()
 void WorkQueue::PurgePool()
 {
 {
     static unsigned int lastSize = 0;
     static unsigned int lastSize = 0;
-    unsigned int currentSize = itemPool_.Size();
+    unsigned int currentSize = poolItems_.Size();
     int difference = lastSize - currentSize;
     int difference = lastSize - currentSize;
 
 
     // Difference tolerance, should be fairly significant to reduce the pool size.
     // Difference tolerance, should be fairly significant to reduce the pool size.
-    for (unsigned i = 0; itemPool_.Size() > 0 && difference > 10 && i < difference; i++)
-        itemPool_.Erase(itemPool_.Begin());
+    for (unsigned i = 0; poolItems_.Size() > 0 && difference > tolerance_ && i < difference; i++)
+        poolItems_.PopFront();
 
 
     lastSize = currentSize;
     lastSize = currentSize;
 }
 }

+ 15 - 2
Source/Engine/Core/WorkQueue.h

@@ -40,11 +40,15 @@ class WorkerThread;
 /// Work queue item.
 /// Work queue item.
 struct WorkItem : public RefCounted
 struct WorkItem : public RefCounted
 {
 {
+    friend class WorkQueue;
+
+public:
     // Construct
     // Construct
     WorkItem() :
     WorkItem() :
         priority_(M_MAX_UNSIGNED),
         priority_(M_MAX_UNSIGNED),
         sendEvent_(false),
         sendEvent_(false),
-        completed_(false)
+        completed_(false),
+        pooled_(false)
     {
     {
     }
     }
     
     
@@ -62,6 +66,9 @@ struct WorkItem : public RefCounted
     bool sendEvent_;
     bool sendEvent_;
     /// Completed flag.
     /// Completed flag.
     volatile bool completed_;
     volatile bool completed_;
+
+private:
+    bool pooled_;
 };
 };
 
 
 /// Work queue subsystem for multithreading.
 /// Work queue subsystem for multithreading.
@@ -89,11 +96,15 @@ public:
     void Resume();
     void Resume();
     /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
     /// Finish all queued work which has at least the specified priority. Main thread will also execute priority work. Pause worker threads if no more work remains.
     void Complete(unsigned priority);
     void Complete(unsigned priority);
+    /// Set the pool telerance before it starts deleting pool items.
+    void SetTolerance(int tolerance) { tolerance_ = tolerance; }
     
     
     /// Return number of worker threads.
     /// Return number of worker threads.
     unsigned GetNumThreads() const { return threads_.Size(); }
     unsigned GetNumThreads() const { return threads_.Size(); }
     /// Return whether all work with at least the specified priority is finished.
     /// Return whether all work with at least the specified priority is finished.
     bool IsCompleted(unsigned priority) const;
     bool IsCompleted(unsigned priority) const;
+    /// Return the pool tolerance.
+    int GetTolerance() const { return tolerance_; }
     
     
 private:
 private:
     /// Process work items until shut down. Called by the worker threads.
     /// Process work items until shut down. Called by the worker threads.
@@ -108,7 +119,7 @@ private:
     /// Worker threads.
     /// Worker threads.
     Vector<SharedPtr<WorkerThread> > threads_;
     Vector<SharedPtr<WorkerThread> > threads_;
     /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
     /// Work item pool for reuse to cut down on allocation. The bool is a flag for item pooling and whether it is available or not.
-    HashMap<SharedPtr<WorkItem>, bool> itemPool_;
+    List<SharedPtr<WorkItem> > poolItems_;
     /// Work item collection. Accessed only by the main thread.
     /// Work item collection. Accessed only by the main thread.
     List<SharedPtr<WorkItem> > workItems_;
     List<SharedPtr<WorkItem> > workItems_;
     /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
     /// Work item prioritized queue for worker threads. Pointers are guaranteed to be valid (point to workItems.)
@@ -121,6 +132,8 @@ private:
     volatile bool pausing_;
     volatile bool pausing_;
     /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
     /// Paused flag. Indicates the queue mutex being locked to prevent worker threads using up CPU time.
     bool paused_;
     bool paused_;
+    /// Tolerance for the shared pool before it begins to deallocate.
+    int tolerance_;
 };
 };
 
 
 }
 }