threadPool.h 15 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _THREADPOOL_H_
  23. #define _THREADPOOL_H_
  24. #ifndef _THREADSAFEREFCOUNT_H_
  25. #include "platform/threads/threadSafeRefCount.h"
  26. #endif
  27. #ifndef _THREADSAFEPRIORITYQUEUE_H_
  28. #include "platform/threads/threadSafePriorityQueue.h"
  29. #endif
  30. #ifndef _PLATFORM_THREAD_SEMAPHORE_H_
  31. #include "platform/threads/semaphore.h"
  32. #endif
  33. #ifndef _TSINGLETON_H_
  34. #include "core/util/tSingleton.h"
  35. #endif
  36. /// @file
  37. /// Interface for an asynchronous work manager.
  38. /// Asynchronous work manager.
  39. ///
  40. /// Thread pooling allows to submit work items for background execution.
  41. /// Each work item will be placed on a queue and, based on a total priority
  42. /// ordering, executed when it has the highest priority and a worker thread
  43. /// becomes available.
  44. ///
  45. /// @note The global pool maintains the invariant that only the main thread
  46. /// may submit items in order to be able to flush the item queue reliably
  47. /// from the main thread itself. If other threads were issuing items to
  48. /// the queue, the queue may never empty out and the main thread will
  49. /// deadlock.
  50. ///
  51. /// Flushing is the simplest method to guarantee that no asynchronous
  52. /// operation is pending in a specific case (deletion of the target object
  53. /// being the most common case). However, when possible, avoid this
  54. /// situation and design your work items to operate independently,
  55. /// e.g. by having only a single point of access to data that may have
  56. /// disappeared in the meantime and putting a check around that single
  57. /// access so that the item will silently die when its target object has
  58. /// disappeared.
  59. ///
  60. /// The cleanest safe solution to this is to create a separate concurrently
  61. /// reference-counted structure that holds all interfacing state and
  62. /// functionality shared between a work item and its issueing code. This way
  63. /// the host object can safely disappear with the interfacing structure
  64. /// automatically being released once the last concurrent work item has been
  65. /// processed or discarded.
  66. ///
  67. class ThreadPool
  68. {
  69. public:
  70. /// A ThreadPool context defines a logical context in which WorkItems are
  71. /// being executed. Their primary use is for biasing priorities of
  72. /// WorkItems.
  73. ///
  74. /// Contexts are arranged in a tree hierarchy. Each parent node's priority
  75. /// bias scales all the priority biases underneath it.
  76. ///
  77. /// Note that instances of this class are meant to be instantiated
  78. /// globally only.
  79. ///
  80. class Context
  81. {
  82. protected:
  83. /// Superordinate context; scales this context's priority bias.
  84. Context* mParent;
  85. /// First child.
  86. Context* mChildren;
  87. /// Next sibling in child chain.
  88. Context* mSibling;
  89. /// Name of this context. Should be unique in parent namespace.
  90. const char* mName;
  91. /// Priority scale factor of this context.
  92. F32 mPriorityBias;
  93. /// Accumulated scale factor.
  94. F32 mAccumulatedPriorityBias;
  95. /// The root context; does not modify priorities. All contexts should be direct or indirect children of this one.
  96. static Context smRootContext;
  97. /// Recursively update cached accumulated priority biases.
  98. void updateAccumulatedPriorityBiases();
  99. public:
  100. Context( const char* name, Context* parent, F32 priorityBias );
  101. ~Context();
  102. /// Return the name of the worker threading context.
  103. const char* getName() const
  104. {
  105. return mName;
  106. }
  107. /// Return the context's own work item priority bias.
  108. F32 getPriorityBias() const
  109. {
  110. return mPriorityBias;
  111. }
  112. /// Return the superordinate node to the current context.
  113. Context* getParent() const
  114. {
  115. return mParent;
  116. }
  117. /// Return the next sibling to the current context.
  118. Context* getSibling() const
  119. {
  120. return mSibling;
  121. }
  122. /// Return the first child context.
  123. Context* getChildren() const
  124. {
  125. return mChildren;
  126. }
  127. /// Return the root context.
  128. static Context* ROOT_CONTEXT()
  129. {
  130. return &smRootContext;
  131. }
  132. ///
  133. F32 getAccumulatedPriorityBias();
  134. ///
  135. Context* getChild( const char* name );
  136. ///
  137. void setPriorityBias( F32 value );
  138. };
  139. /// An action to execute on a worker thread from the pool.
  140. ///
  141. /// Work items are concurrently reference-counted and will be
  142. /// automatically released once the last reference disappears.
  143. ///
  144. class WorkItem : public ThreadSafeRefCount< WorkItem >
  145. {
  146. public:
  147. typedef ThreadSafeRefCount< WorkItem > Parent;
  148. protected:
  149. /// The work context of this item.
  150. Context* mContext;
  151. /// Mark a point in a work item's execution where the item can
  152. /// be safely cancelled.
  153. ///
  154. /// This method should be called by subclasses' execute() methods
  155. /// whenever an item can be safely cancelled. When it returns true,
  156. /// the work item should exit from its execute() method.
  157. bool cancellationPoint();
  158. /// Called when the item has been cancelled.
  159. virtual void onCancelled() {}
  160. /// Execute the actions associated with this work item.
  161. /// This is the primary function to implement by subclasses.
  162. virtual void execute() = 0;
  163. /// This flag is set after the execute() method has completed.
  164. bool mExecuted;
  165. public:
  166. /// Construct a new work item.
  167. ///
  168. /// @param context The work context in which the item should be placed.
  169. /// If NULL, the root context will be used.
  170. WorkItem( Context* context = 0 )
  171. : mContext( context ? context : Context::ROOT_CONTEXT() ),
  172. mExecuted( false )
  173. {
  174. }
  175. virtual ~WorkItem() {}
  176. /// Return the work context associated with the work item.
  177. inline Context* getContext() const
  178. {
  179. return mContext;
  180. }
  181. /// Process the work item.
  182. void process();
  183. /// Return true if the work item should be cancelled.
  184. ///
  185. /// This method can be overridden by subclasses. It's value will be
  186. /// checked each time cancellationPoint() is called. When it returns
  187. /// true, the item's process() method will exit automatically.
  188. ///
  189. /// @return true, if item should be cancelled; default is false.
  190. /// @see ThreadPool::WorkItem::cancellationPoint
  191. virtual bool isCancellationRequested();
  192. /// Return the item's base priority value.
  193. /// @return item priority; defaults to 1.0.
  194. virtual F32 getPriority();
  195. /// Has this work item been executed already?
  196. bool hasExecuted() const
  197. {
  198. return mExecuted;
  199. }
  200. };
  201. typedef ThreadSafeRef< WorkItem > WorkItemPtr;
  202. struct GlobalThreadPool;
  203. protected:
  204. struct WorkItemWrapper;
  205. struct WorkerThread;
  206. friend struct WorkerThread; // mSemaphore, mNumThreadsAwake, mThreads
  207. typedef ThreadSafePriorityQueueWithUpdate< WorkItemWrapper, F32 > QueueType;
  208. /// Name of this pool. Mainly for debugging. Used to name worker threads.
  209. String mName;
  210. /// Number of worker threads spawned by the pool.
  211. U32 mNumThreads;
  212. /// Number of worker threads in non-sleeping state.
  213. U32 mNumThreadsAwake;
  214. /// Number of worker threads guaranteed to be non-blocking.
  215. U32 mNumThreadsReady;
  216. /// Number of work items that have not yet completed execution.
  217. U32 mNumPendingItems;
  218. /// Semaphore used to wake up threads, if necessary.
  219. Semaphore mSemaphore;
  220. /// Threaded priority queue for concurrent access by worker threads.
  221. QueueType mWorkItemQueue;
  222. /// List of worker threads.
  223. WorkerThread* mThreads;
  224. /// Force all work items to execute on main thread;
  225. /// turns this into a single-threaded system.
  226. /// Primarily useful to find whether malfunctions are caused
  227. /// by parallel execution or not.
  228. static bool smForceAllMainThread;
  229. ///
  230. static U32 smMainThreadTimeMS;
  231. /// Work queue for main thread; can be used to ping back work items to
  232. /// main thread that need processing that can only happen on main thread.
  233. static QueueType smMainThreadQueue;
  234. public:
  235. /// Create a new thread pool with the given number of worker threads.
  236. ///
  237. /// If numThreads is zero (the default), the number of threads created
  238. /// will be based on the number of CPU cores available.
  239. ///
  240. /// @param numThreads Number of threads to create or zero for default.
  241. ThreadPool( const char* name, U32 numThreads = 0 );
  242. ~ThreadPool();
  243. /// Manually shutdown threads outside of static destructors.
  244. void shutdown();
  245. ///
  246. void queueWorkItem( WorkItem* item );
  247. ///
  248. /// <em>For the global pool, it is very important to only ever call
  249. /// this function on the main thread and to let work items only ever
  250. /// come from the main thread. Otherwise this function has the potential
  251. /// of dead-locking as new work items may constantly be fed to the queue
  252. /// without it ever getting empty.</em>
  253. ///
  254. /// @param timeOut Soft limit on the number of milliseconds to wait for
  255. /// the queue to flush out. -1 = infinite.
  256. void flushWorkItems( S32 timeOut = -1 );
  257. /// If you're using a non-global thread pool to parallelise some work, you
  258. /// may want to block until all the parallel work is complete. As with
  259. /// flushWorkItems, this method may block indefinitely if new items keep
  260. /// getting added to the pool before old ones finish.
  261. ///
  262. /// <em>This method will not wait for items queued on the main thread using
  263. /// queueWorkItemOnMainThread!</em>
  264. ///
  265. /// @param timeOut Soft limit on the number of milliseconds to wait for
  266. /// all items to complete. -1 = infinite.
  267. void waitForAllItems( S32 timeOut = -1 );
  268. /// Add a work item to the main thread's work queue.
  269. ///
  270. /// The main thread's work queue will be processed each frame using
  271. /// a set timeout to limit the work being done. Nonetheless, work
  272. /// items will not be suspended in-midst of processing, so make sure
  273. /// that whatever work you issue to the main thread is light work
  274. /// or you may see short hangs in gameplay.
  275. ///
  276. /// To reiterate this: any code executed through this interface directly
  277. /// adds to frame processing time on the main thread.
  278. ///
  279. /// This method *may* (and is meant to) be called from threads
  280. /// other than the main thread.
  281. static void queueWorkItemOnMainThread( WorkItem* item );
  282. /// Process work items waiting on the main thread's work queue.
  283. ///
  284. /// There is a soft limit imposed on the time this method is allowed
  285. /// to run so as to balance frame-to-frame load. However, work
  286. /// items, once their processing is initiated, will not be suspended
  287. /// and will run for as long as they take to complete, so make sure
  288. /// individual items perform as little work as necessary.
  289. ///
  290. /// @see ThreadPool::getMainThreadThesholdTimeMS
  291. static void processMainThreadWorkItems();
  292. /// Return the interval in which item priorities are updated on the queue.
  293. /// @return update interval in milliseconds.
  294. U32 getQueueUpdateInterval() const
  295. {
  296. return mWorkItemQueue.getUpdateInterval();
  297. }
  298. /// Return the priority increment applied to work items on each passing of the update interval.
  299. F32 getQueueTimeBasedPriorityBoost() const
  300. {
  301. return mWorkItemQueue.getTimeBasedPriorityBoost();
  302. }
  303. /// Set the update interval of the work item queue to the given value.
  304. /// @param milliSeconds Time between updates in milliseconds.
  305. void setQueueUpdateInterval( U32 milliSeconds )
  306. {
  307. mWorkItemQueue.setUpdateInterval( milliSeconds );
  308. }
  309. /// Set the priority increment applied to work items on each update interval.
  310. /// @param value Priority increment. Set to zero to deactivate.
  311. void setQueueTimeBasedPriorityBoost( F32 value )
  312. {
  313. mWorkItemQueue.setTimeBasedPriorityBoost( value );
  314. }
  315. ///
  316. static U32& getMainThreadThresholdTimeMS()
  317. {
  318. return smMainThreadTimeMS;
  319. }
  320. ///
  321. static bool& getForceAllMainThread()
  322. {
  323. return smForceAllMainThread;
  324. }
  325. /// Return the global thread pool singleton.
  326. static ThreadPool& GLOBAL();
  327. };
  328. typedef ThreadPool::Context ThreadContext;
  329. typedef ThreadPool::WorkItem ThreadWorkItem;
  330. struct ThreadPool::GlobalThreadPool : public ThreadPool, public ManagedSingleton< GlobalThreadPool >
  331. {
  332. typedef ThreadPool Parent;
  333. GlobalThreadPool()
  334. : Parent( "GLOBAL" ) {}
  335. // For ManagedSingleton.
  336. static const char* getSingletonName() { return "GlobalThreadPool"; }
  337. };
  338. inline ThreadPool& ThreadPool::GLOBAL()
  339. {
  340. return *( GlobalThreadPool::instance() );
  341. }
  342. #endif // !_THREADPOOL_H_