123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740 |
- //-----------------------------------------------------------------------------
- // Copyright (c) 2012 GarageGames, LLC
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to
- // deal in the Software without restriction, including without limitation the
- // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- // sell copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- // IN THE SOFTWARE.
- //-----------------------------------------------------------------------------
- #ifndef _THREADSAFEPRIORITYQUEUE_H_
- #define _THREADSAFEPRIORITYQUEUE_H_
- #ifndef _PLATFORMINTRINSICS_H_
- #include "platform/platformIntrinsics.h"
- #endif
- #ifndef _THREADSAFEREFCOUNT_H_
- #include "platform/threads/threadSafeRefCount.h"
- #endif
- #ifndef _TYPETRAITS_H_
- #include "platform/typetraits.h"
- #endif
- // Disable TMM's new operator grabbing.
- #include "platform/tmm_off.h"
- //#define DEBUG_SPEW
- /// @file
- /// Template code for an efficient thread-safe priority queue
- /// implementation. There are two alternative implementations to
- /// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate
- /// where the latter adds concurrent status updates of queue items to
- /// the former implementation.
- //--------------------------------------------------------------------------
- // ThreadSafePriorityQueue.
- //--------------------------------------------------------------------------
- /// Fast, lock-free priority queue implementation for concurrent access.
- ///
- /// Equal priorities are allowed and are placed <em>before</em> existing items of
- /// identical priority in the queue.
- ///
- /// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
- /// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
- /// Parts of the skiplist code is based on work by William Pugh.
- ///
- /// @param T The item value type. Must have a default constructor.
- /// @param K The priority key type. Must be comparable, have a default constructor,
- /// and be a valid template parameter to TypeTraits.
- /// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
- /// the reverse if false.
- /// @param MAX_LEVEL The number of levels a node can have at most.
- /// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
- /// the skiplist. Multiplied by 100 and turned into int to conform to restrictions
- /// on non-type template parameters.
- ///
- /// @see TypeTraits
- template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
- struct ThreadSafePriorityQueue
- {
- typedef T ValueType;
- typedef K KeyType;
- enum { MAX_LEVEL_CONST = MAX_LEVEL };
- ThreadSafePriorityQueue();
- bool isEmpty();
- void insert( KeyType priority, const T& value );
- bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
- protected:
- struct Node;
- typedef ThreadSafeRef< Node > NodePtr;
- friend class ThreadSafeRefCount< Node >;
- friend struct DeleteSingle;
-
- /// A queue node.
- ///
- /// Nodes are reference-counted to coordinate memory management
- /// between the different threads. Reclamation happens on the
- /// thread that releases the last reference.
- ///
- /// Reference-counting and deletion requests are kept separate.
- /// A given node is marked for deletion and will then have its references
- /// progressively disappear and eventually be reclaimed once the
- /// reference count drops to zero.
- ///
- /// Note that 'Next' references are released by the destructor which
- /// is only called when the reference count to the node itself drops to
- /// zero. This is to avoid threads getting trapped in a node with no
- /// link out.
- struct Node : public ThreadSafeRefCount< Node >
- {
- typedef ThreadSafeRefCount< Node > Parent;
- Node( KeyType priority, const ValueType& value );
- ~Node();
- KeyType getPriority() { return mPriority; }
- ValueType& getValue() { return mValue; }
- U32 getLevel();
- NodePtr& getNext( U32 level );
- bool isMarkedForDeletion();
- bool tryMarkForDeletion();
-
- void clearValue() { mValue = ValueType(); }
- static U32 randomLevel();
- void* operator new( size_t size, S32 level = -1 );
- void operator delete( void* ptr );
- private:
- KeyType mPriority; ///< Priority key.
- U32 mLevel; ///< Level count and deletion bit (highest).
- ValueType mValue;
- Node* mNext[ 1 ]; ///< Variable-sized array of next pointers.
- struct FreeList
- {
- bool mDestroyed;
- Node* mNodes;
- ~FreeList();
- };
- static FreeList smFreeLists[ MAX_LEVEL ];
- };
- NodePtr mHead; ///< Artificial head node.
- NodePtr mTail; ///< Artificial tail node.
- void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
- void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
- void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
- void insert( KeyType priority, const T& value, NodePtr& outResult );
- void helpDelete();
- };
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];
- /// Construct an empty queue.
- ///
- /// Internally, this creates a head node with maximal priority and a tail node with minimal priority,
- /// both at maximum level.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
- {
- NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 )
- Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
- NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 )
- Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );
- for( U32 level = 0; level < MAX_LEVEL; level ++ )
- mHead->getNext( level ) = mTail;
- }
- /// Return true if the queue does not currently contain an item.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
- {
- return ( mHead->getNext( 0 ) == mTail );
- }
- /// Insert the given value into the queue at the place determined by the given priority.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
- {
- NodePtr result;
- insert( priority, value, result );
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
- {
- // Create a new node at a random level.
- outResult = NULL;
- NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
- U32 resultNodeLevel = outResult->getLevel();
- // Link up all the levels. Do this bottom-up instead of
- // top-down (as would be the right way for a skiplist) so
- // that our list state always remains valid. If going top-down,
- // we'll insert nodes with NULL pointers at their lower levels.
- U32 currentLevel = 0;
- do
- {
- while( 1 )
- {
- NodePtr nextNode;
- NodePtr prevNode;
-
- scanFromHead( prevNode, nextNode, currentLevel, priority );
- outResult->getNext( currentLevel ) = nextNode;
- if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
- break;
- else
- outResult->getNext( currentLevel ) = 0;
- }
- currentLevel ++;
- }
- while( currentLevel <= resultNodeLevel
- && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
- }
- /// Take the item with the highest priority from the queue.
- ///
- /// @param outValue Reference to where the resulting value should be stored.
- /// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
- /// @return true if there was a matching item in the queue.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
- {
- // Iterate through to the first unmarked node.
- NodePtr prevNode = mHead;
- while( 1 )
- {
- NodePtr node;
- readNext( prevNode, node, 0 );
- if( node == mTail )
- return false; // End reached.
- bool priorityThresholdReached = SORT_MIN_TO_MAX
- ? ( upToPriority >= node->getPriority() )
- : ( upToPriority <= node->getPriority() );
- if( !priorityThresholdReached )
- return false;
- else
- {
- // Try to mark the node for deletion. Only if that succeeds, taking the
- // node was a success and we can return. If it fails, spin and try again.
- if( node->tryMarkForDeletion() )
- {
- helpDelete();
- // Node is now off the list and will disappear as soon as
- // all references held by threads (including this one)
- // go out of scope.
- outValue = node->getValue();
- node->clearValue();
- return true;
- }
- }
- }
- }
- /// Update the given references to the next non-deleted node at the given level.
- /// refPrev will be updated to reference the immediate predecessor of the next
- /// node returned. Note that this can be a node in deleted state.
- ///
- /// @param refPrev Reference to a node of which the successor node should be
- /// returned. Updated to immediate predecessor of refNext on return.
- /// @param refNext Reference to update to refer to next non-deleted node on
- /// the given level.
- /// @param level Skiplist level to operate on.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
- {
- while( 1 )
- {
- refNext = refPrev->getNext( level );
- AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );
- if( !refNext->isMarkedForDeletion() || refNext == mTail )
- break;
- refPrev = refNext;
- }
- }
- /// Scan for the position at which to insert a node of the given priority.
- /// Upon return, the position between refPrev and refNext is the one to insert at.
- ///
- /// @param refPrev position at which to start scanning; updated to match insert position.
- /// @param refNext
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
- {
- while( 1 )
- {
- readNext( refPrev, refNext, level );
- if( refNext == mTail
- || ( SORT_MIN_TO_MAX
- ? ( refNext->getPriority() > priority )
- : ( refNext->getPriority() < priority ) ) )
- break;
- refPrev = refNext;
- }
- }
- ///
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
- {
- // Purge dead nodes at left end of queue so
- // we don't get stuck hitting the same node
- // in deletable state over and over again.
- helpDelete();
- S32 currentLevel = MAX_LEVEL - 1;
- refPrev = mHead;
- do
- {
- scan( refPrev, refNext, currentLevel, priority );
- currentLevel --;
- }
- while( currentLevel >= S32( level ) );
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
- {
- // Clean out all the references from head.
- // Spin over a given reference on each level until head
- // clearly refers to a node in non-deletable state. This
- // makes this code work cooperatively with other threads
- // doing takeNexts on prior or later nodes while also
- // guaranteeing that all next pointers to us will eventually
- // disappear.
- //
- // Note that this is *the only place* where we will be cleaning
- // out our lists.
- S32 level = MAX_LEVEL - 1;
- do
- {
- while( 1 )
- {
- NodePtr ptr = mHead->getNext( level );
- if( !ptr->isMarkedForDeletion() )
- break;
- else
- {
- NodePtr& next = ptr->getNext( level );
- next.setTag();
- mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
- AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
- }
- }
- level --;
- }
- while( level >= 0 );
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
- : Parent( false ),
- mPriority( priority ),
- mValue( value )
- {
- dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) );
- // Level is already set by the allocation routines.
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
- {
- for( U32 level = 0; level < ( getLevel() + 1 ); level ++ )
- getNext( level ) = NULL;
- }
- /// Return the skip list level the node is at.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel()
- {
- // Mask out the deletion request bit.
- return ( mLevel & 0x7FFFFFFF );
- }
- /// Return the successor node at the given level.
- /// @param level The level of the desired successor node; must be within the node's level bounds.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level )
- {
- return *reinterpret_cast< NodePtr* >( &mNext[ level ] );
- }
- /// Return true if the node is marked to be deleted.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion()
- {
- return ( mLevel & 0x80000000 );
- }
- /// Attempt to mark the node for deletion. If the mark bit has not yet been set
- /// and setting it on the current thread succeeds, returns true.
- ///
- /// @return true, if the marking succeeded.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion()
- {
- U32 oldVal = mLevel & 0x7FFFFFFF;
- U32 newVal = oldVal | 0x80000000;
- return ( dCompareAndSwap( mLevel, oldVal, newVal ) );
- }
- /// Choose a random level.
- ///
- /// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL,
- /// but is not affected by the actual number of nodes in a queue.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel()
- {
- U32 level = 0;
- while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) )
- level ++;
- return level;
- }
- /// Allocate a new node.
- /// The node comes with a reference count of one and its level already set.
- ///
- /// @param level The level to allocate the node at. If this is -1, a random level is chosen.
- /// @return a new node.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level )
- {
- if( level == -1 )
- level = randomLevel();
- Node* node = 0;
- while( 1 )
- {
- // Try to take a node from the freelist. If there's none,
- // allocate a new one.
- if( !smFreeLists[ level ].mDestroyed )
- node = Node::safeRead( smFreeLists[ level ].mNodes );
- if( !node )
- {
- node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level );
- dMemset( node, 0, sizeof( Node ) );
- node->mLevel = level;
- node->addRef();
- break;
- }
- else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) )
- {
- node->clearLowestBit();
- break;
- }
- else
- node->release(); // Other thread was quicker than us; release.
- }
- AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
- AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
- return node;
- }
- /// Reclaim a node.
- ///
- /// @param node The node to reclaim. Must refer to a Node instance.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
- {
- //TODO: limit number of nodes kept
- Node* node = ( Node* ) ptr;
- U32 level = node->getLevel();
- node->mLevel = level; // Reset the node's deletion bit.
- while( !smFreeLists[ level ].mDestroyed )
- {
- // Put the node on the freelist.
- Node* freeList = smFreeLists[ level ].mNodes;
- node->mNext[ 0 ] = freeList;
-
- if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
- {
- node = NULL;
- break;
- }
- }
-
- if( node )
- dFree( node );
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
- {
- mDestroyed = true;
- while( mNodes )
- {
- //FIXME: could leak some bytes under unfortunate circumstances (this in
- // combination with mDestroyed is a dependent write)
- Node* next = mNodes;
- if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
- dFree( next );
- }
- }
- //--------------------------------------------------------------------------
- // ThreadSafePriorityQueueWithUpdate.
- //--------------------------------------------------------------------------
- /// Fast, lock-free priority queue implementation for concurrent access that
- /// performs dynamic re-prioritization of items.
- ///
- /// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext
- /// method is guaranteed to always return the item that has the highest priority
- /// at the time the method is called rather than at the time items were inserted
- /// into the queue.
- ///
- /// Values placed on the queue must implement the following interface:
- ///
- /// @code
- /// template< typename K >
- /// struct IThreadSafePriorityQueueItem
- /// {
- /// // Default constructor.
- /// IThreadSafePriorityQueueItem();
- ///
- /// // Return the current priority.
- /// // This must run normally even if the item is already dead.
- /// K getPriority();
- ///
- /// // Return true if the item is still meant to be waiting in the queue.
- /// bool isAlive();
- /// };
- /// @endcode
- template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
- struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >
- {
- typedef T ValueType;
- typedef K KeyType;
- enum { DEFAULT_UPDATE_INTERVAL = 256 };
- ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL );
- void insert( KeyType priority, const T& value );
- bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
- U32 getUpdateInterval() const;
- void setUpdateInterval( U32 value );
- KeyType getTimeBasedPriorityBoost() const;
- void setTimeBasedPriorityBoost( KeyType value );
- void updatePriorities();
- protected:
- typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > Parent;
- typedef U32 TickType;
- typedef typename Parent::NodePtr NodePtr;
- U32 mUpdateInterval;
- KeyType mPriorityBoost; ///< If this is non-zero, priorities will be boosted by this amount each update. This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue.
- /// Work queue for node updates.
- ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS > mUpdateQueue;
- TickType getTick() { return Platform::getRealMilliseconds(); }
- };
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval )
- : mUpdateInterval( updateInterval ),
- mPriorityBoost( TypeTraits< KeyType >::ZERO )
- {
- }
- /// Return the current update interval in milliseconds.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const
- {
- return mUpdateInterval;
- }
- /// Set update interval of queue to given value.
- ///
- /// <em>Call this method on the main thread only.</em>
- ///
- /// @param value Time between priority updates in milliseconds.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value )
- {
- mUpdateInterval = value;
- }
- /// Return the delta to apply to priorities on each update.
- /// Set to zero to deactivate time-based priority adjustments.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const
- {
- return mPriorityBoost;
- }
- /// Set the delta for time-based priority adjustments to the given value.
- ///
- /// <em>Call this method on the main thread only.</em>
- ///
- /// @param value The new priority adjustment value.
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
- {
- mPriorityBoost = value;
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
- {
- NodePtr node;
- Parent::insert( priority, value, node );
- mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
- }
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
- {
- updatePriorities();
- bool result = false;
- do
- {
- result = Parent::takeNext( outValue, upToPriority );
- }
- while( result && !outValue.isAlive() );
-
- return result;
- }
- ///
- template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
- void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
- {
- TickType currentTime = getTick();
- U32 numNodesUpdated = 0;
- U32 numNodesDead = 0;
- U32 numNodesChanged = 0;
- NodePtr node;
- while( mUpdateQueue.takeNext( node, currentTime ) )
- {
- numNodesUpdated ++;
- // Since we're updating nodes on the update queue only periodically,
- // their associated values or main queue nodes may have died in the
- // meantime. If so, we just discard them here.
- if( node->getValue().isAlive()
- && !node->isMarkedForDeletion() )
- {
- KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
- if( newPriority != node->getPriority() )
- {
- // Node is outdated. Reinsert with new priority and mark the
- // old node for deletion.
- insert( newPriority, node->getValue() );
- node->tryMarkForDeletion();
- numNodesChanged ++;
- }
- else
- {
- // Node is still current. Just move to end.
- mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
- }
- }
- else
- numNodesDead ++;
- }
- #ifdef DEBUG_SPEW
- if( numNodesUpdated )
- Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
- numNodesUpdated, numNodesChanged, numNodesDead );
- #endif
- }
- // Re-enable TMM if necessary.
- #include "platform/tmm_on.h"
- #undef DEBUG_SPEW
- #endif // !_THREADSAFEPRIORITYQUEUE_H_
|