2
0
Эх сурвалжийг харах

Add WorkQueue functions to remove items (single or multiple) before being assigned to worker threads. Closes #690.

Lasse Öörni 10 жил өмнө
parent
commit
3ca9b7d499

+ 71 - 18
Source/Urho3D/Core/WorkQueue.cpp

@@ -159,6 +159,55 @@ void WorkQueue::AddWorkItem(SharedPtr<WorkItem> item)
     }
     }
 }
 }
 
 
+bool WorkQueue::RemoveWorkItem(SharedPtr<WorkItem> item)
+{
+    if (!item)
+        return false;
+
+    MutexLock lock(queueMutex_);
+    
+    // Can only remove successfully if the item was not yet taken by threads for execution
+    List<WorkItem*>::Iterator i = queue_.Find(item.Get());
+    if (i != queue_.End())
+    {
+        List<SharedPtr<WorkItem> >::Iterator j = workItems_.Find(item);
+        if (j != workItems_.End())
+        {
+            queue_.Erase(i);
+            ReturnToPool(item);
+            workItems_.Erase(j);
+            return true;
+        }
+    }
+
+    return false;
+}
+
+unsigned WorkQueue::RemoveWorkItems(const Vector<SharedPtr<WorkItem> >& items)
+{
+    MutexLock lock(queueMutex_);
+    unsigned removed = 0;
+
+    for (Vector<SharedPtr<WorkItem> >::ConstIterator i = items.Begin(); i != items.End(); ++i)
+    {
+        List<WorkItem*>::Iterator j = queue_.Find(i->Get());
+        if (j != queue_.End())
+        {
+            List<SharedPtr<WorkItem> >::Iterator k = workItems_.Find(*i);
+            if (k != workItems_.End())
+            {
+                queue_.Erase(j);
+                ReturnToPool(*k);
+                workItems_.Erase(k);
+                ++removed;
+            }
+        }
+    }
+
+    return removed;
+}
+
+
 void WorkQueue::Pause()
 void WorkQueue::Pause()
 {
 {
     if (!paused_)
     if (!paused_)
@@ -295,24 +344,7 @@ void WorkQueue::PurgeCompleted(unsigned priority)
                 SendEvent(E_WORKITEMCOMPLETED, eventData);
                 SendEvent(E_WORKITEMCOMPLETED, eventData);
             }
             }
 
 
-            // Check if this was a pooled item and set it to usable
-            if ((*i)->pooled_)
-            {
-                // Reset the values to their defaults. This should 
-                // be safe to do here as the completed event has 
-                // already been handled and this is part of the 
-                // internal pool.
-                (*i)->start_ = NULL;
-                (*i)->end_ = NULL;
-                (*i)->aux_ = NULL;
-                (*i)->workFunction_ = NULL;
-                (*i)->priority_ = M_MAX_UNSIGNED;
-                (*i)->sendEvent_ = false;
-                (*i)->completed_ = false;
-
-                poolItems_.Push(*i);
-            }
-
+            ReturnToPool(*i);
             i = workItems_.Erase(i);
             i = workItems_.Erase(i);
         }
         }
         else
         else
@@ -332,6 +364,27 @@ void WorkQueue::PurgePool()
     lastSize_ = currentSize;
     lastSize_ = currentSize;
 }
 }
 
 
+void WorkQueue::ReturnToPool(SharedPtr<WorkItem>& item)
+{
+    // Check if this was a pooled item and set it to usable
+    if (item->pooled_)
+    {
+        // Reset the values to their defaults. This should 
+        // be safe to do here as the completed event has
+        // already been handled and this is part of the
+        // internal pool.
+        item->start_ = 0;
+        item->end_ = 0;
+        item->aux_ = 0;
+        item->workFunction_ = 0;
+        item->priority_ = M_MAX_UNSIGNED;
+        item->sendEvent_ = false;
+        item->completed_ = false;
+
+        poolItems_.Push(item);
+    }
+}
+
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
 void WorkQueue::HandleBeginFrame(StringHash eventType, VariantMap& eventData)
 {
 {
     // If no worker threads, complete low-priority work here
     // If no worker threads, complete low-priority work here

+ 6 - 0
Source/Urho3D/Core/WorkQueue.h

@@ -90,6 +90,10 @@ public:
     SharedPtr<WorkItem> GetFreeItem();
     SharedPtr<WorkItem> GetFreeItem();
     /// Add a work item and resume worker threads.
     /// Add a work item and resume worker threads.
     void AddWorkItem(SharedPtr<WorkItem> item);
     void AddWorkItem(SharedPtr<WorkItem> item);
+    /// Remove a work item before it has started executing. Return true if successfully removed.
+    bool RemoveWorkItem(SharedPtr<WorkItem> item);
+    /// Remove a number of work items before they have started executing. Return the number of items successfully removed.
+    unsigned RemoveWorkItems(const Vector<SharedPtr<WorkItem> >& items);
     /// Pause worker threads.
     /// Pause worker threads.
     void Pause();
     void Pause();
     /// Resume worker threads.
     /// Resume worker threads.
@@ -117,6 +121,8 @@ private:
     void PurgeCompleted(unsigned priority);
     void PurgeCompleted(unsigned priority);
     /// Purge the pool to reduce allocation where its unneeded.
     /// Purge the pool to reduce allocation where its unneeded.
     void PurgePool();
     void PurgePool();
+    /// Return a work item to the pool.
+    void ReturnToPool(SharedPtr<WorkItem>& item);
     /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
     /// Handle frame start event. Purge completed work from the main thread queue, and perform work if no threads at all.
     void HandleBeginFrame(StringHash eventType, VariantMap& eventData);
     void HandleBeginFrame(StringHash eventType, VariantMap& eventData);