threadSafePriorityQueue.h 27 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _THREADSAFEPRIORITYQUEUE_H_
  23. #define _THREADSAFEPRIORITYQUEUE_H_
  24. #ifndef _PLATFORMINTRINSICS_H_
  25. #include "platform/platformIntrinsics.h"
  26. #endif
  27. #ifndef _THREADSAFEREFCOUNT_H_
  28. #include "platform/threads/threadSafeRefCount.h"
  29. #endif
  30. #ifndef _TYPETRAITS_H_
  31. #include "platform/typetraits.h"
  32. #endif
  33. // Disable TMM's new operator grabbing.
  34. #include "platform/tmm_off.h"
  35. //#define DEBUG_SPEW
  36. /// @file
  37. /// Template code for an efficient thread-safe priority queue
  38. /// implementation. There are two alternative implementations to
  39. /// choose from: ThreadSafePriorityQueue and ThreadSafePriorityQueueWithUpdate
  40. /// where the latter adds concurrent status updates of queue items to
  41. /// the former implementation.
  42. //--------------------------------------------------------------------------
  43. // ThreadSafePriorityQueue.
  44. //--------------------------------------------------------------------------
  45. /// Fast, lock-free priority queue implementation for concurrent access.
  46. ///
  47. /// Equal priorities are allowed and are placed <em>before</em> existing items of
  48. /// identical priority in the queue.
  49. ///
  50. /// Based on (but with significant deviations from) "Fast and Lock-Free Concurrent
  51. /// Priority Queues for Multi-Thread Systems" by Hakan Sundell and Philippas Tsigas.
  52. /// Parts of the skiplist code is based on work by William Pugh.
  53. ///
  54. /// @param T The item value type. Must have a default constructor.
  55. /// @param K The priority key type. Must be comparable, have a default constructor,
  56. /// and be a valid template parameter to TypeTraits.
  57. /// @param SORT_MIN_TO_MAX If true, the queue sorts from minimum to maximum priority or
  58. /// the reverse if false.
  59. /// @param MAX_LEVEL The number of levels a node can have at most.
  60. /// @param PROBABILISTIC_BIAS The probabilistic level distribution factor for
  61. /// the skiplist. Multiplied by 100 and turned into int to conform to restrictions
  62. /// on non-type template parameters.
  63. ///
  64. /// @see TypeTraits
  65. template< typename T, typename K = F32, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
  66. struct ThreadSafePriorityQueue
  67. {
  68. typedef T ValueType;
  69. typedef K KeyType;
  70. enum { MAX_LEVEL_CONST = MAX_LEVEL };
  71. ThreadSafePriorityQueue();
  72. bool isEmpty();
  73. void insert( KeyType priority, const T& value );
  74. bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
  75. protected:
  76. struct Node;
  77. typedef ThreadSafeRef< Node > NodePtr;
  78. friend class ThreadSafeRefCount< Node >;
  79. friend struct DeleteSingle;
  80. /// A queue node.
  81. ///
  82. /// Nodes are reference-counted to coordinate memory management
  83. /// between the different threads. Reclamation happens on the
  84. /// thread that releases the last reference.
  85. ///
  86. /// Reference-counting and deletion requests are kept separate.
  87. /// A given node is marked for deletion and will then have its references
  88. /// progressively disappear and eventually be reclaimed once the
  89. /// reference count drops to zero.
  90. ///
  91. /// Note that 'Next' references are released by the destructor which
  92. /// is only called when the reference count to the node itself drops to
  93. /// zero. This is to avoid threads getting trapped in a node with no
  94. /// link out.
  95. struct Node : public ThreadSafeRefCount< Node >
  96. {
  97. typedef ThreadSafeRefCount< Node > Parent;
  98. Node( KeyType priority, const ValueType& value );
  99. ~Node();
  100. KeyType getPriority() { return mPriority; }
  101. ValueType& getValue() { return mValue; }
  102. U32 getLevel();
  103. NodePtr& getNext( U32 level );
  104. bool isMarkedForDeletion();
  105. bool tryMarkForDeletion();
  106. void clearValue() { mValue = ValueType(); }
  107. static U32 randomLevel();
  108. void* operator new( size_t size, S32 level = -1 );
  109. void operator delete( void* ptr );
  110. private:
  111. KeyType mPriority; ///< Priority key.
  112. U32 mLevel; ///< Level count and deletion bit (highest).
  113. ValueType mValue;
  114. Node* mNext[ 1 ]; ///< Variable-sized array of next pointers.
  115. struct FreeList
  116. {
  117. bool mDestroyed;
  118. Node* mNodes;
  119. ~FreeList();
  120. };
  121. static FreeList smFreeLists[ MAX_LEVEL ];
  122. };
  123. NodePtr mHead; ///< Artificial head node.
  124. NodePtr mTail; ///< Artificial tail node.
  125. void readNext( NodePtr& refPrev, NodePtr& refNext, U32 level );
  126. void scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
  127. void scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority );
  128. void insert( KeyType priority, const T& value, NodePtr& outResult );
  129. void helpDelete();
  130. };
  131. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  132. typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::smFreeLists[ MAX_LEVEL ];
  133. /// Construct an empty queue.
  134. ///
  135. /// Internally, this creates a head node with maximal priority and a tail node with minimal priority,
  136. /// both at maximum level.
  137. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  138. ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueue()
  139. {
  140. NodePtr::unsafeWrite( mHead, new ( MAX_LEVEL - 1 )
  141. Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MIN : TypeTraits< KeyType >::MAX, ValueType() ) );
  142. NodePtr::unsafeWrite( mTail, new ( MAX_LEVEL - 1 )
  143. Node( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN, ValueType() ) );
  144. for( U32 level = 0; level < MAX_LEVEL; level ++ )
  145. mHead->getNext( level ) = mTail;
  146. }
  147. /// Return true if the queue does not currently contain an item.
  148. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  149. bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::isEmpty()
  150. {
  151. return ( mHead->getNext( 0 ) == mTail );
  152. }
  153. /// Insert the given value into the queue at the place determined by the given priority.
  154. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  155. inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
  156. {
  157. NodePtr result;
  158. insert( priority, value, result );
  159. }
  160. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  161. void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value, NodePtr& outResult )
  162. {
  163. // Create a new node at a random level.
  164. outResult = NULL;
  165. NodePtr::unsafeWrite( outResult, new Node( priority, value ) );
  166. U32 resultNodeLevel = outResult->getLevel();
  167. // Link up all the levels. Do this bottom-up instead of
  168. // top-down (as would be the right way for a skiplist) so
  169. // that our list state always remains valid. If going top-down,
  170. // we'll insert nodes with NULL pointers at their lower levels.
  171. U32 currentLevel = 0;
  172. do
  173. {
  174. while( 1 )
  175. {
  176. NodePtr nextNode;
  177. NodePtr prevNode;
  178. scanFromHead( prevNode, nextNode, currentLevel, priority );
  179. outResult->getNext( currentLevel ) = nextNode;
  180. if( prevNode->getNext( currentLevel ).trySetFromTo( nextNode, outResult, NodePtr::TAG_FailIfSet ) )
  181. break;
  182. else
  183. outResult->getNext( currentLevel ) = 0;
  184. }
  185. currentLevel ++;
  186. }
  187. while( currentLevel <= resultNodeLevel
  188. && !outResult->isMarkedForDeletion() ); // No point linking up remaining levels if another thread already took this node.
  189. }
  190. /// Take the item with the highest priority from the queue.
  191. ///
  192. /// @param outValue Reference to where the resulting value should be stored.
  193. /// @param upToPriority Priority limit (inclusive) up to which items are taken from the queue.
  194. /// @return true if there was a matching item in the queue.
  195. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  196. bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
  197. {
  198. // Iterate through to the first unmarked node.
  199. NodePtr prevNode = mHead;
  200. while( 1 )
  201. {
  202. NodePtr node;
  203. readNext( prevNode, node, 0 );
  204. if( node == mTail )
  205. return false; // End reached.
  206. bool priorityThresholdReached = SORT_MIN_TO_MAX
  207. ? ( upToPriority >= node->getPriority() )
  208. : ( upToPriority <= node->getPriority() );
  209. if( !priorityThresholdReached )
  210. return false;
  211. else
  212. {
  213. // Try to mark the node for deletion. Only if that succeeds, taking the
  214. // node was a success and we can return. If it fails, spin and try again.
  215. if( node->tryMarkForDeletion() )
  216. {
  217. helpDelete();
  218. // Node is now off the list and will disappear as soon as
  219. // all references held by threads (including this one)
  220. // go out of scope.
  221. outValue = node->getValue();
  222. node->clearValue();
  223. return true;
  224. }
  225. }
  226. }
  227. }
  228. /// Update the given references to the next non-deleted node at the given level.
  229. /// refPrev will be updated to reference the immediate predecessor of the next
  230. /// node returned. Note that this can be a node in deleted state.
  231. ///
  232. /// @param refPrev Reference to a node of which the successor node should be
  233. /// returned. Updated to immediate predecessor of refNext on return.
  234. /// @param refNext Reference to update to refer to next non-deleted node on
  235. /// the given level.
  236. /// @param level Skiplist level to operate on.
  237. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  238. inline void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::readNext( NodePtr& refPrev, NodePtr& refNext, U32 level )
  239. {
  240. while( 1 )
  241. {
  242. refNext = refPrev->getNext( level );
  243. AssertFatal( refNext != NULL, "ThreadSafePriorityQueue::readNext() - next is NULL" );
  244. if( !refNext->isMarkedForDeletion() || refNext == mTail )
  245. break;
  246. refPrev = refNext;
  247. }
  248. }
  249. /// Scan for the position at which to insert a node of the given priority.
  250. /// Upon return, the position between refPrev and refNext is the one to insert at.
  251. ///
  252. /// @param refPrev position at which to start scanning; updated to match insert position.
  253. /// @param refNext
  254. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  255. void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scan( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
  256. {
  257. while( 1 )
  258. {
  259. readNext( refPrev, refNext, level );
  260. if( refNext == mTail
  261. || ( SORT_MIN_TO_MAX
  262. ? ( refNext->getPriority() > priority )
  263. : ( refNext->getPriority() < priority ) ) )
  264. break;
  265. refPrev = refNext;
  266. }
  267. }
  268. ///
  269. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  270. void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::scanFromHead( NodePtr& refPrev, NodePtr& refNext, U32 level, KeyType priority )
  271. {
  272. // Purge dead nodes at left end of queue so
  273. // we don't get stuck hitting the same node
  274. // in deletable state over and over again.
  275. helpDelete();
  276. S32 currentLevel = MAX_LEVEL - 1;
  277. refPrev = mHead;
  278. do
  279. {
  280. scan( refPrev, refNext, currentLevel, priority );
  281. currentLevel --;
  282. }
  283. while( currentLevel >= S32( level ) );
  284. }
  285. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  286. void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::helpDelete()
  287. {
  288. // Clean out all the references from head.
  289. // Spin over a given reference on each level until head
  290. // clearly refers to a node in non-deletable state. This
  291. // makes this code work cooperatively with other threads
  292. // doing takeNexts on prior or later nodes while also
  293. // guaranteeing that all next pointers to us will eventually
  294. // disappear.
  295. //
  296. // Note that this is *the only place* where we will be cleaning
  297. // out our lists.
  298. S32 level = MAX_LEVEL - 1;
  299. do
  300. {
  301. while( 1 )
  302. {
  303. NodePtr ptr = mHead->getNext( level );
  304. if( !ptr->isMarkedForDeletion() )
  305. break;
  306. else
  307. {
  308. NodePtr& next = ptr->getNext( level );
  309. next.setTag();
  310. mHead->getNext( level ).trySetFromTo( ptr, next, NodePtr::TAG_Unset );
  311. AssertFatal( next->getRefCount() >= 2, "ThreadSafePriorityQueue::helpDelete() - invalid refcount" );
  312. }
  313. }
  314. level --;
  315. }
  316. while( level >= 0 );
  317. }
  318. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  319. inline ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::Node( KeyType priority, const ValueType& value )
  320. : Parent( false ),
  321. mPriority( priority ),
  322. mValue( value )
  323. {
  324. dMemset( mNext, 0, sizeof( Node* ) * ( getLevel() + 1 ) );
  325. // Level is already set by the allocation routines.
  326. }
  327. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  328. ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::~Node()
  329. {
  330. for( U32 level = 0; level < ( getLevel() + 1 ); level ++ )
  331. getNext( level ) = NULL;
  332. }
  333. /// Return the skip list level the node is at.
  334. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  335. inline U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getLevel()
  336. {
  337. // Mask out the deletion request bit.
  338. return ( mLevel & 0x7FFFFFFF );
  339. }
  340. /// Return the successor node at the given level.
  341. /// @param level The level of the desired successor node; must be within the node's level bounds.
  342. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  343. inline typename ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::NodePtr& ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::getNext( U32 level )
  344. {
  345. return *reinterpret_cast< NodePtr* >( &mNext[ level ] );
  346. }
  347. /// Return true if the node is marked to be deleted.
  348. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  349. inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::isMarkedForDeletion()
  350. {
  351. return ( mLevel & 0x80000000 );
  352. }
  353. /// Attempt to mark the node for deletion. If the mark bit has not yet been set
  354. /// and setting it on the current thread succeeds, returns true.
  355. ///
  356. /// @return true, if the marking succeeded.
  357. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  358. inline bool ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::tryMarkForDeletion()
  359. {
  360. U32 oldVal = mLevel & 0x7FFFFFFF;
  361. U32 newVal = oldVal | 0x80000000;
  362. return ( dCompareAndSwap( mLevel, oldVal, newVal ) );
  363. }
  364. /// Choose a random level.
  365. ///
  366. /// The chosen level depends on the given PROBABILISTIC_BIAS and MAX_LEVEL,
  367. /// but is not affected by the actual number of nodes in a queue.
  368. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  369. U32 ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::randomLevel()
  370. {
  371. U32 level = 0;
  372. while( Platform::getRandom() < ( ( ( F32 ) PROBABILISTIC_BIAS ) / 100 ) && level < ( MAX_LEVEL - 1 ) )
  373. level ++;
  374. return level;
  375. }
  376. /// Allocate a new node.
  377. /// The node comes with a reference count of one and its level already set.
  378. ///
  379. /// @param level The level to allocate the node at. If this is -1, a random level is chosen.
  380. /// @return a new node.
  381. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  382. void* ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator new( size_t size, S32 level )
  383. {
  384. if( level == -1 )
  385. level = randomLevel();
  386. Node* node = 0;
  387. while( 1 )
  388. {
  389. // Try to take a node from the freelist. If there's none,
  390. // allocate a new one.
  391. if( !smFreeLists[ level ].mDestroyed )
  392. node = Node::safeRead( smFreeLists[ level ].mNodes );
  393. if( !node )
  394. {
  395. node = ( Node* ) dMalloc( sizeof( Node ) + sizeof( Node* ) * level );
  396. dMemset( node, 0, sizeof( Node ) );
  397. node->mLevel = level;
  398. node->addRef();
  399. break;
  400. }
  401. else if( dCompareAndSwap( smFreeLists[ level ].mNodes, node, node->mNext[ 0 ] ) )
  402. {
  403. node->clearLowestBit();
  404. break;
  405. }
  406. else
  407. node->release(); // Other thread was quicker than us; release.
  408. }
  409. AssertFatal( node->getRefCount() != 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
  410. AssertFatal( ( node->getRefCount() % 2 ) == 0, "ThreadSafePriorityQueue::new Node() - invalid refcount" );
  411. return node;
  412. }
  413. /// Reclaim a node.
  414. ///
  415. /// @param node The node to reclaim. Must refer to a Node instance.
  416. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  417. void ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::operator delete( void* ptr )
  418. {
  419. //TODO: limit number of nodes kept
  420. Node* node = ( Node* ) ptr;
  421. U32 level = node->getLevel();
  422. node->mLevel = level; // Reset the node's deletion bit.
  423. while( !smFreeLists[ level ].mDestroyed )
  424. {
  425. // Put the node on the freelist.
  426. Node* freeList = smFreeLists[ level ].mNodes;
  427. node->mNext[ 0 ] = freeList;
  428. if( dCompareAndSwap( smFreeLists[ level ].mNodes, freeList, node ) )
  429. {
  430. node = NULL;
  431. break;
  432. }
  433. }
  434. if( node )
  435. dFree( node );
  436. }
  437. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  438. ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::Node::FreeList::~FreeList()
  439. {
  440. mDestroyed = true;
  441. while( mNodes )
  442. {
  443. //FIXME: could leak some bytes under unfortunate circumstances (this in
  444. // combination with mDestroyed is a dependent write)
  445. Node* next = mNodes;
  446. if( dCompareAndSwap( mNodes, next, next->mNext[ 0 ] ) )
  447. dFree( next );
  448. }
  449. }
  450. //--------------------------------------------------------------------------
  451. // ThreadSafePriorityQueueWithUpdate.
  452. //--------------------------------------------------------------------------
  453. /// Fast, lock-free priority queue implementation for concurrent access that
  454. /// performs dynamic re-prioritization of items.
  455. ///
  456. /// Within the bounds of a set update interval UPDATE_INTERVAL, the takeNext
  457. /// method is guaranteed to always return the item that has the highest priority
  458. /// at the time the method is called rather than at the time items were inserted
  459. /// into the queue.
  460. ///
  461. /// Values placed on the queue must implement the following interface:
  462. ///
  463. /// @code
  464. /// template&lt; typename K >
  465. /// struct IThreadSafePriorityQueueItem
  466. /// {
  467. /// // Default constructor.
  468. /// IThreadSafePriorityQueueItem();
  469. ///
  470. /// // Return the current priority.
  471. /// // This must run normally even if the item is already dead.
  472. /// K getPriority();
  473. ///
  474. /// // Return true if the item is still meant to be waiting in the queue.
  475. /// bool isAlive();
  476. /// };
  477. /// @endcode
  478. template< typename T, typename K, bool SORT_MIN_TO_MAX = false, U32 MAX_LEVEL = 4, U32 PROBABILISTIC_BIAS = 50 >
  479. struct ThreadSafePriorityQueueWithUpdate : public ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >
  480. {
  481. typedef T ValueType;
  482. typedef K KeyType;
  483. enum { DEFAULT_UPDATE_INTERVAL = 256 };
  484. ThreadSafePriorityQueueWithUpdate( U32 updateInterval = DEFAULT_UPDATE_INTERVAL );
  485. void insert( KeyType priority, const T& value );
  486. bool takeNext( T& outValue, KeyType upToPriority = ( SORT_MIN_TO_MAX ? TypeTraits< KeyType >::MAX : TypeTraits< KeyType >::MIN ) );
  487. U32 getUpdateInterval() const;
  488. void setUpdateInterval( U32 value );
  489. KeyType getTimeBasedPriorityBoost() const;
  490. void setTimeBasedPriorityBoost( KeyType value );
  491. void updatePriorities();
  492. protected:
  493. typedef ThreadSafePriorityQueue< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS > Parent;
  494. typedef U32 TickType;
  495. typedef typename Parent::NodePtr NodePtr;
  496. U32 mUpdateInterval;
  497. KeyType mPriorityBoost; ///< If this is non-zero, priorities will be boosted by this amount each update. This can be used to prevent constant high-priority inserts to starve low-priority items already in the queue.
  498. /// Work queue for node updates.
  499. ThreadSafePriorityQueue< NodePtr, TickType, true, MAX_LEVEL, PROBABILISTIC_BIAS > mUpdateQueue;
  500. TickType getTick() { return Platform::getRealMilliseconds(); }
  501. };
  502. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  503. ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::ThreadSafePriorityQueueWithUpdate( U32 updateInterval )
  504. : mUpdateInterval( updateInterval ),
  505. mPriorityBoost( TypeTraits< KeyType >::ZERO )
  506. {
  507. }
  508. /// Return the current update interval in milliseconds.
  509. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  510. U32 ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getUpdateInterval() const
  511. {
  512. return mUpdateInterval;
  513. }
  514. /// Set update interval of queue to given value.
  515. ///
  516. /// <em>Call this method on the main thread only.</em>
  517. ///
  518. /// @param value Time between priority updates in milliseconds.
  519. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  520. void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setUpdateInterval( U32 value )
  521. {
  522. mUpdateInterval = value;
  523. }
  524. /// Return the delta to apply to priorities on each update.
  525. /// Set to zero to deactivate time-based priority adjustments.
  526. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  527. K ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::getTimeBasedPriorityBoost() const
  528. {
  529. return mPriorityBoost;
  530. }
  531. /// Set the delta for time-based priority adjustments to the given value.
  532. ///
  533. /// <em>Call this method on the main thread only.</em>
  534. ///
  535. /// @param value The new priority adjustment value.
  536. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  537. void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::setTimeBasedPriorityBoost( KeyType value )
  538. {
  539. mPriorityBoost = value;
  540. }
  541. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  542. void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::insert( KeyType priority, const ValueType& value )
  543. {
  544. NodePtr node;
  545. Parent::insert( priority, value, node );
  546. mUpdateQueue.insert( getTick() + getUpdateInterval(), node );
  547. }
  548. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  549. bool ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::takeNext( T& outValue, KeyType upToPriority )
  550. {
  551. updatePriorities();
  552. bool result = false;
  553. do
  554. {
  555. result = Parent::takeNext( outValue, upToPriority );
  556. }
  557. while( result && !outValue.isAlive() );
  558. return result;
  559. }
  560. ///
  561. template< typename T, typename K, bool SORT_MIN_TO_MAX, U32 MAX_LEVEL, U32 PROBABILISTIC_BIAS >
  562. void ThreadSafePriorityQueueWithUpdate< T, K, SORT_MIN_TO_MAX, MAX_LEVEL, PROBABILISTIC_BIAS >::updatePriorities()
  563. {
  564. TickType currentTime = getTick();
  565. U32 numNodesUpdated = 0;
  566. U32 numNodesDead = 0;
  567. U32 numNodesChanged = 0;
  568. NodePtr node;
  569. while( mUpdateQueue.takeNext( node, currentTime ) )
  570. {
  571. numNodesUpdated ++;
  572. // Since we're updating nodes on the update queue only periodically,
  573. // their associated values or main queue nodes may have died in the
  574. // meantime. If so, we just discard them here.
  575. if( node->getValue().isAlive()
  576. && !node->isMarkedForDeletion() )
  577. {
  578. KeyType newPriority = node->getValue().getPriority() + getTimeBasedPriorityBoost();
  579. if( newPriority != node->getPriority() )
  580. {
  581. // Node is outdated. Reinsert with new priority and mark the
  582. // old node for deletion.
  583. insert( newPriority, node->getValue() );
  584. node->tryMarkForDeletion();
  585. numNodesChanged ++;
  586. }
  587. else
  588. {
  589. // Node is still current. Just move to end.
  590. mUpdateQueue.insert( currentTime + getUpdateInterval(), node );
  591. }
  592. }
  593. else
  594. numNodesDead ++;
  595. }
  596. #ifdef DEBUG_SPEW
  597. if( numNodesUpdated )
  598. Platform::outputDebugString( "[ThreadSafePriorityQueueWithUpdate] updated %i nodes (%i changed, %i dead)",
  599. numNodesUpdated, numNodesChanged, numNodesDead );
  600. #endif
  601. }
  602. // Re-enable TMM if necessary.
  603. #include "platform/tmm_on.h"
  604. #undef DEBUG_SPEW
  605. #endif // !_THREADSAFEPRIORITYQUEUE_H_