//----------------------------------------------------------------------------- // Copyright (c) 2012 GarageGames, LLC // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to // deal in the Software without restriction, including without limitation the // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or // sell copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS // IN THE SOFTWARE. //----------------------------------------------------------------------------- #ifndef _THREADSAFEPRIORITYQUEUE_H_ #define _THREADSAFEPRIORITYQUEUE_H_ #ifndef _PLATFORMINTRINSICS_H_ #include "platform/platformIntrinsics.h" #endif #ifndef _THREADSAFEREFCOUNT_H_ #include "platform/threads/threadSafeRefCount.h" #endif #ifndef _TYPETRAITS_H_ #include "platform/typetraits.h" #endif // Disable TMM's new operator grabbing. #include "platform/tmm_off.h" //#define DEBUG_SPEW /// @file /// Template code for an efficient thread-safe priority queue /// implementation. There are two alternative implementations to /// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate /// where the latter adds concurrent status updates of queue items to /// the former implementation. 
//--------------------------------------------------------------------------
//    ThreadSafePriorityQueue.
//--------------------------------------------------------------------------

/// Fast, lock-free priority queue implementation for concurrent access.
///
/// Equal priorities are allowed.  A newly inserted item is placed after
/// existing items of identical priority (the insert scan only stops at the
/// first node whose priority sorts strictly behind the new one).
///
/// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
/// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
/// Parts of the skiplist code are based on work by William Pugh.
///
/// @param T The item value type.  Must have a default constructor.
/// @param K The priority key type.  Must be comparable, have a default constructor,
///   and be a valid template parameter to TypeTraits.
/// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
///   the reverse if false.
/// @param MAX_LEVEL The number of levels a node can have at most.
/// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
///   the skiplist.  Multiplied by 100 and turned into int to conform to restrictions
///   on non-type template parameters.
///
/// @see TypeTraits

template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
struct ThreadSafePriorityQueue
{
   typedef T ValueType;
   typedef K KeyType;

   enum { MAX_LEVEL_CONST = MAX_LEVEL };

   ThreadSafePriorityQueue();

   bool isEmpty();
   void insert( KeyType priority, const T& value );

   // The default limit is the far end of the key range in sort direction,
   // i.e. by default any item in the queue qualifies.
   bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );

protected:
   struct Node;
   typedef ThreadSafeRef< Node > NodePtr;
   friend class ThreadSafeRefCount< Node >;
   friend struct DeleteSingle;

   /// A queue node.
   ///
   /// Nodes are reference-counted to coordinate memory management
   /// between the different threads.  Reclamation happens on the
   /// thread that releases the last reference.
   ///
   /// Reference-counting and deletion requests are kept separate.
   /// A given node is marked for deletion and will then have its references
   /// progressively disappear and eventually be reclaimed once the
   /// reference count drops to zero.
   ///
   /// Note that 'Next' references are released by the destructor which
   /// is only called when the reference count to the node itself drops to
   /// zero.  This is to avoid threads getting trapped in a node with no
   /// link out.
   struct Node : public ThreadSafeRefCount< Node >
   {
      typedef ThreadSafeRefCount< Node > Parent;

      Node( KeyType priority, const ValueType& value );
      ~Node();

      KeyType        getPriority()                 { return mPriority; }
      ValueType&     getValue()                    { return mValue; }
      U32            getLevel();
      NodePtr&       getNext( U32 level );

      bool           isMarkedForDeletion();
      bool           tryMarkForDeletion();

      /// Overwrite the stored value with a default-constructed one.
      void           clearValue()                  { mValue = ValueType(); }

      static U32     randomLevel();

      // Allocation goes through per-level free lists; 'level' selects the
      // number of next pointers (level + 1), -1 picks a random level.
      void*          operator new( size_t size, S32 level = -1 );
      void           operator delete( void* ptr );

   private:
      KeyType        mPriority;                    ///< Priority key.
      U32            mLevel;                       ///< Level count and deletion bit (highest).
      ValueType      mValue;                       ///< Payload stored with the node.
      Node*          mNext[ 1 ];                   ///< Variable-sized array of next pointers.

      /// Per-level list of reclaimed nodes, chained through mNext[ 0 ].
      struct FreeList
      {
         bool        mDestroyed;                   ///< Set by the destructor; the list must not be used afterwards.
         Node*       mNodes;                       ///< Head of the list of free nodes.

         ~FreeList();
      };

      static FreeList smFreeLists[ MAX_LEVEL ];    ///< One free list per node level.
   };

   NodePtr mHead;                      ///< Artificial head node.
   NodePtr mTail;                      ///< Artificial tail node.

   void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
   void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
   void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
   void insert( KeyType priority, const T& value, NodePtr& outResult );
   void helpDelete();
};

// Storage for the per-level free lists shared by all queues of one instantiation.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];

/// Construct an empty queue.
///
/// Internally, this creates an artificial head node and an artificial tail node,
/// both at maximum level.  With SORT_MIN_TO_MAX false, the head carries the maximal
/// and the tail the minimal key value; with SORT_MIN_TO_MAX true it is the reverse.
/// Every level of the head is linked directly to the tail.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
{
   NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 ) Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
   NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 ) Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );

   for( U32 level = 0; level < MAX_LEVEL; level ++ )
      mHead->getNext( level ) = mTail;
}

/// Return true if the queue does not currently contain an item.
///
/// This only checks whether the head's level 0 successor is the tail node.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
{
   return ( mHead->getNext( 0 ) == mTail );
}

/// Insert the given value into the queue at the place determined by the given priority.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
{
   // Public overload: forward to the internal insert and drop the
   // reference to the created node when 'result' goes out of scope.
   NodePtr result;
   insert( priority, value, result );
}

/// Insert the given value at the given priority and hand back the node created for it.
///
/// @param priority Priority key of the new item.
/// @param value Value to store in the new node.
/// @param outResult Set to reference the newly created node.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
{
   // Create a new node at a random level.

   outResult = NULL;
   NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
   U32 resultNodeLevel = outResult->getLevel();

   // Link up all the levels.  Do this bottom-up instead of
   // top-down (as would be the right way for a skiplist) so
   // that our list state always remains valid.  If going top-down,
   // we'll insert nodes with NULL pointers at their lower levels.

   U32 currentLevel = 0;
   do
   {
      // Retry until the node has been linked in at the current level.
      while( 1 )
      {
         NodePtr nextNode;
         NodePtr prevNode;

         // Find the insert position (between prevNode and nextNode) for this level.
         scanFromHead( prevNode, nextNode, currentLevel, priority );

         // Point the new node at its successor first, then try to swing the
         // predecessor's link from nextNode to the new node.
         // NOTE(review): TAG_FailIfSet presumably makes the swap fail when the
         // predecessor's link has been tagged (see helpDelete) -- confirm
         // against ThreadSafeRef.
         outResult->getNext( currentLevel ) = nextNode;
         if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
            break;
         else
            outResult->getNext( currentLevel ) = 0; // Linking failed; drop the successor reference and rescan.
      }

      currentLevel ++;
   }
   while(    currentLevel <= resultNodeLevel
          && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
}

/// Take the item with the highest priority from the queue.
///
/// @param outValue Reference to where the resulting value should be stored.
/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
/// @return true if there was a matching item in the queue.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
{
   // Iterate through to the first unmarked node.

   NodePtr prevNode = mHead;
   while( 1 )
   {
      // Fetch the next node on level 0 that is not marked for deletion
      // (or the tail).  prevNode is advanced past any marked nodes.
      NodePtr node;
      readNext( prevNode, node, 0 );

      if( node == mTail )
         return false; // End reached.

      // The candidate only qualifies if its priority lies on the near side
      // of the limit (inclusive) in the queue's sort direction.
      bool priorityThresholdReached = SORT_MIN_TO_MAX
         ? ( upToPriority >= node->getPriority() )
         : ( upToPriority <= node->getPriority() );

      if( !priorityThresholdReached )
         return false;
      else
      {
         // Try to mark the node for deletion.  Only if that succeeds, taking the
         // node was a success and we can return.  If it fails, spin and try again.

         if( node->tryMarkForDeletion() )
         {
            // Unlink marked nodes from the head of the queue.
            helpDelete();

            // Node is now off the list and will disappear as soon as
            // all references held by threads (including this one)
            // go out of scope.

            outValue = node->getValue();
            node->clearValue(); // Reset the node's copy of the value to a default-constructed one.

            return true;
         }
      }
   }
}

/// Update the given references to the next non-deleted node at the given level.
/// refPrev will be updated to reference the immediate predecessor of the next
/// node returned.  Note that this can be a node in deleted state.
///
/// @param refPrev Reference to a node of which the successor node should be
///   returned.  Updated to immediate predecessor of refNext on return.
/// @param refNext Reference to update to refer to next non-deleted node on
///   the given level.
/// @param level Skiplist level to operate on.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
{
   while( 1 )
   {
      refNext = refPrev->getNext( level );
      AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );

      // Stop at the first node that is not marked for deletion; the tail
      // always terminates the walk.
      if( !refNext->isMarkedForDeletion() || refNext == mTail )
         break;

      // Step over the marked node and keep looking.
      refPrev = refNext;
   }
}

/// Scan for the position at which to insert a node of the given priority.
/// Upon return, the position between refPrev and refNext is the one to insert at.
///
/// @param refPrev position at which to start scanning; updated to match insert position.
/// @param refNext updated to the first non-deleted node whose priority sorts strictly
///   behind the given priority, or to the tail node.
/// @param level Skiplist level to scan on.
/// @param priority Priority to find the insert position for.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
{
   while( 1 )
   {
      readNext( refPrev, refNext, level );

      // The comparison is strict, so nodes of equal priority are skipped
      // and the insert position ends up behind them.
      if( refNext == mTail
          || ( SORT_MIN_TO_MAX
               ? ( refNext->getPriority() > priority )
               : ( refNext->getPriority() < priority ) ) )
         break;

      refPrev = refNext;
   }
}

/// Scan for the insert position of the given priority, starting at the head node.
///
/// The scan starts on the topmost level and is repeated on each lower level,
/// continuing from the position found on the level above, until the requested
/// level has been scanned.
///
/// @param refPrev Updated to the predecessor of the insert position on the given level.
/// @param refNext Updated to the successor of the insert position on the given level.
/// @param level Lowest skiplist level to scan down to.
/// @param priority Priority to find the insert position for.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
{
   // Purge dead nodes at left end of queue so
   // we don't get stuck hitting the same node
   // in deletable state over and over again.
   helpDelete();

   // Signed so that the loop condition below still works for level 0.
   S32 currentLevel = MAX_LEVEL - 1;
   refPrev = mHead;
   do
   {
      scan( refPrev, refNext, currentLevel, priority );
      currentLevel --;
   }
   while( currentLevel >= S32( level ) );
}

/// Unlink nodes marked for deletion from the head of the queue on all levels.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
{
   // Clean out all the references from head.
   // Spin over a given reference on each level until head
   // clearly refers to a node in non-deletable state.  This
   // makes this code work cooperatively with other threads
   // doing takeNexts on prior or later nodes while also
   // guaranteeing that all next pointers to us will eventually
   // disappear.
   //
   // Note that this is *the only place* where we will be cleaning
   // out our lists.

   S32 level = MAX_LEVEL - 1;
   do
   {
      while( 1 )
      {
         NodePtr ptr = mHead->getNext( level );
         if( !ptr->isMarkedForDeletion() )
            break;
         else
         {
            // Tag the dead node's outgoing link, then try to make head skip
            // over the dead node.  The result of the swap is not checked; the
            // enclosing loop re-reads head's link either way.
            // NOTE(review): the tag presumably makes concurrent inserts that
            // use TAG_FailIfSet fail on this link -- confirm against ThreadSafeRef.
            NodePtr& next = ptr->getNext( level );
            next.setTag();
            mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
            AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
         }
      }

      level --;
   }
   while( level >= 0 );
}

/// Construct a node with the given priority and value.
///
/// The node's level is not passed in here; it is expected to have been
/// stored in mLevel by Node::operator new before the constructor runs.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
   : Parent( false ),
     mPriority( priority ),
     mValue( value )
{
   // Clear all (level + 1) next pointers of the variable-sized array.
   dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) );

   // Level is already set by the allocation routines.
}

/// Release the references to the successor nodes on all of the node's levels.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
{
   for( U32 level = 0; level < ( getLevel() + 1 ); level ++ )
      getNext( level ) = NULL;
}

/// Return the skip list level the node is at.
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel() { // Mask out the deletion request bit. return ( mLevel & 0x7FFFFFFF ); } /// Return the successor node at the given level. /// @param level The level of the desired successor node; must be within the node's level bounds. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level ) { return *reinterpret_cast< NodePtr* >( &mNext[ level ] ); } /// Return true if the node is marked to be deleted. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion() { return ( mLevel & 0x80000000 ); } /// Attempt to mark the node for deletion. If the mark bit has not yet been set /// and setting it on the current thread succeeds, returns true. /// /// @return true, if the marking succeeded. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion() { U32 oldVal = mLevel & 0x7FFFFFFF; U32 newVal = oldVal | 0x80000000; return ( dCompareAndSwap( mLevel, oldVal, newVal ) ); } /// Choose a random level. /// /// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL, /// but is not affected by the actual number of nodes in a queue. 
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel() { U32 level = 0; while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) ) level ++; return level; } /// Allocate a new node. /// The node comes with a reference count of one and its level already set. /// /// @param level The level to allocate the node at. If this is -1, a random level is chosen. /// @return a new node. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level ) { if( level == -1 ) level = randomLevel(); Node* node = 0; while( 1 ) { // Try to take a node from the freelist. If there's none, // allocate a new one. if( !smFreeLists[ level ].mDestroyed ) node = Node::safeRead( smFreeLists[ level ].mNodes ); if( !node ) { node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level ); dMemset( node, 0, sizeof( Node ) ); node->mLevel = level; node->addRef(); break; } else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) ) { node->clearLowestBit(); break; } else node->release(); // Other thread was quicker than us; release. } AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" ); AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" ); return node; } /// Reclaim a node. /// /// @param node The node to reclaim. Must refer to a Node instance. 
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
{
   //TODO: limit number of nodes kept

   Node* node = ( Node* ) ptr;
   U32 level = node->getLevel();
   node->mLevel = level; // Reset the node's deletion bit.

   // Keep retrying the push for as long as the freelist is still usable.
   while( !smFreeLists[ level ].mDestroyed )
   {
      // Put the node on the freelist.

      Node* freeList = smFreeLists[ level ].mNodes;
      node->mNext[ 0 ] = freeList;

      if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
      {
         // The freelist owns the node now; do not free it below.
         node = NULL;
         break;
      }
   }

   // The freelist is gone; hand the memory back directly.
   if( node )
      dFree( node );
}

/// Mark the freelist as destroyed and free all nodes that are still on it.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
{
   // Flag the list first so that operator new and operator delete stop using it.
   mDestroyed = true;
   while( mNodes )
   {
      //FIXME: could leak some bytes under unfortunate circumstances (this in
      //   combination with mDestroyed is a dependent write)

      // Pop the head node and free it; if the swap fails, retry with the new head.
      Node* next = mNodes;
      if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
         dFree( next );
   }
}

//--------------------------------------------------------------------------
//    ThreadSafePriorityQueueWithUpdate.
//--------------------------------------------------------------------------

/// Fast, lock-free priority queue implementation for concurrent access that
/// performs dynamic re-prioritization of items.
///
/// Within the bounds of the configured update interval, the takeNext
/// method is guaranteed to always return the item that has the highest priority
/// at the time the method is called rather than at the time items were inserted
/// into the queue.
///
/// Values placed on the queue must implement the following interface:
///
/// @code
/// template< typename K >
/// struct IThreadSafePriorityQueueItem
/// {
///    // Default constructor.
///    IThreadSafePriorityQueueItem();
///
///    // Return the current priority.
/// // This must run normally even if the item is already dead. /// K getPriority(); /// /// // Return true if the item is still meant to be waiting in the queue. /// bool isAlive(); /// }; /// @endcode template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 > struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > { typedef T ValueType; typedef K KeyType; enum { DEFAULT_UPDATE_INTERVAL = 256 }; ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL ); void insert( KeyType priority, const T& value ); bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) ); U32 getUpdateInterval() const; void setUpdateInterval( U32 value ); KeyType getTimeBasedPriorityBoost() const; void setTimeBasedPriorityBoost( KeyType value ); void updatePriorities(); protected: typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > Parent; typedef U32 TickType; typedef typename Parent::NodePtr NodePtr; U32 mUpdateInterval; KeyType mPriorityBoost; ///< If this is non-zero, priorities will be boosted by this amount each update. This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue. /// Work queue for node updates. ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS > mUpdateQueue; TickType getTick() { return Platform::getRealMilliseconds(); } }; template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval ) : mUpdateInterval( updateInterval ), mPriorityBoost( TypeTraits< KeyType >::ZERO ) { } /// Return the current update interval in milliseconds. 
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const { return mUpdateInterval; } /// Set update interval of queue to given value. /// /// Call this method on the main thread only. /// /// @param value Time between priority updates in milliseconds. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value ) { mUpdateInterval = value; } /// Return the delta to apply to priorities on each update. /// Set to zero to deactivate time-based priority adjustments. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS > K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const { return mPriorityBoost; } /// Set the delta for time-based priority adjustments to the given value. /// /// Call this method on the main thread only. /// /// @param value The new priority adjustment value. 
template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
{
   mPriorityBoost = value;
}

/// Insert the given value into the queue at the given priority and schedule
/// the new node for a priority update one update interval from now.
///
/// @param priority Priority key of the new item.
/// @param value Value to put on the queue.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
{
   NodePtr node;
   Parent::insert( priority, value, node );

   // Remember the node on the update queue, keyed by the time it is due.
   mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
}

/// Take the item with the highest priority from the queue, skipping over
/// items that are no longer alive.  Priorities of items that are due for
/// an update are refreshed first.
///
/// @param outValue Reference to where the resulting value should be stored.
/// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
/// @return true if a live matching item was found.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
{
   updatePriorities();

   // Keep taking items until a live one turns up or nothing matches anymore.
   bool result = false;
   do
   {
      result = Parent::takeNext( outValue, upToPriority );
   }
   while( result && !outValue.isAlive() );

   return result;
}

/// Process all nodes on the update queue that are due at the current time.
///
/// Nodes whose value is dead or which have already been taken off the main
/// queue are dropped.  Nodes whose priority has changed are reinserted at
/// their new priority and the outdated node is marked for deletion.  Nodes
/// whose priority is unchanged are rescheduled for another update.

template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
{
   TickType currentTime       = getTick();
   U32      numNodesUpdated   = 0;
   U32      numNodesDead      = 0;
   U32      numNodesChanged   = 0;

   // The update queue is sorted by due time, so this takes every node
   // whose due time is not later than currentTime.
   NodePtr node;
   while( mUpdateQueue.takeNext( node, currentTime ) )
   {
      numNodesUpdated ++;

      // Since we're updating nodes on the update queue only periodically,
      // their associated values or main queue nodes may have died in the
      // meantime.  If so, we just discard them here.

      if( node->getValue().isAlive()
          && !node->isMarkedForDeletion() )
      {
         // The boost is added to the item's current priority.
         KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
         if( newPriority != node->getPriority() )
         {
            // Node is outdated.  Reinsert with new priority and mark the
            // old node for deletion.

            // NOTE(review): the result of tryMarkForDeletion() is ignored.  If
            // another thread takes the old node between the check above and
            // this point, the value looks like it ends up both taken and
            // reinserted -- confirm that isAlive() covers this case.
            insert( newPriority, node->getValue() );
            node->tryMarkForDeletion();
            numNodesChanged ++;
         }
         else
         {
            // Node is still current.  Just move to end.

            mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
         }
      }
      else
         numNodesDead ++;
   }

   #ifdef DEBUG_SPEW
   if( numNodesUpdated )
      Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
                                   numNodesUpdated, numNodesChanged, numNodesDead );
   #endif
}

// Re-enable TMM if necessary.
#include "platform/tmm_on.h"

#undef DEBUG_SPEW

#endif // !_THREADSAFEPRIORITYQUEUE_H_