//----------------------------------------------------------------------------- // Copyright (c) 2012 GarageGames, LLC // // Permission is hereby granted, free of charge, to any person obtaining a copy // of this software and associated documentation files (the "Software"), to // deal in the Software without restriction, including without limitation the // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or // sell copies of the Software, and to permit persons to whom the Software is // furnished to do so, subject to the following conditions: // // The above copyright notice and this permission notice shall be included in // all copies or substantial portions of the Software. // // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS // IN THE SOFTWARE. //----------------------------------------------------------------------------- #ifndef _THREADPOOL_H_ #define _THREADPOOL_H_ #ifndef _THREADSAFEREFCOUNT_H_ #include "platform/threads/threadSafeRefCount.h" #endif #ifndef _THREADSAFEPRIORITYQUEUE_H_ #include "platform/threads/threadSafePriorityQueue.h" #endif #ifndef _PLATFORM_THREAD_SEMAPHORE_H_ #include "platform/threads/semaphore.h" #endif #ifndef _TSINGLETON_H_ #include "core/util/tSingleton.h" #endif /// @file /// Interface for an asynchronous work manager. /// Asynchronous work manager. /// /// Thread pooling allows to submit work items for background execution. /// Each work item will be placed on a queue and, based on a total priority /// ordering, executed when it has the highest priority and a worker thread /// becomes available. /// /// @note The global pool maintains the invariant that only the main thread /// may submit items in order to be able to flush the item queue reliably /// from the main thread itself. If other threads were issuing items to /// the queue, the queue may never empty out and the main thread will /// deadlock. /// /// Flushing is the simplest method to guarantee that no asynchronous /// operation is pending in a specific case (deletion of the target object /// being the most common case). However, when possible, avoid this /// situation and design your work items to operate independently, /// e.g. by having only a single point of access to data that may have /// disappeared in the meantime and putting a check around that single /// access so that the item will silently die when its target object has /// disappeared. /// /// The cleanest safe solution to this is to create a separate concurrently /// reference-counted structure that holds all interfacing state and /// functionality shared between a work item and its issueing code. This way /// the host object can safely disappear with the interfacing structure /// automatically being released once the last concurrent work item has been /// processed or discarded. /// class ThreadPool { public: /// A ThreadPool context defines a logical context in which WorkItems are /// being executed. Their primary use is for biasing priorities of /// WorkItems. /// /// Contexts are arranged in a tree hierarchy. Each parent node's priority /// bias scales all the priority biases underneath it. /// /// Note that instances of this class are meant to be instantiated /// globally only. /// class Context { protected: /// Superordinate context; scales this context's priority bias. Context* mParent; /// First child. Context* mChildren; /// Next sibling in child chain. Context* mSibling; /// Name of this context. Should be unique in parent namespace. const char* mName; /// Priority scale factor of this context. F32 mPriorityBias; /// Accumulated scale factor. F32 mAccumulatedPriorityBias; /// The root context; does not modify priorities. All contexts should be direct or indirect children of this one. static Context smRootContext; /// Recursively update cached accumulated priority biases. void updateAccumulatedPriorityBiases(); public: Context( const char* name, Context* parent, F32 priorityBias ); ~Context(); /// Return the name of the worker threading context. const char* getName() const { return mName; } /// Return the context's own work item priority bias. F32 getPriorityBias() const { return mPriorityBias; } /// Return the superordinate node to the current context. Context* getParent() const { return mParent; } /// Return the next sibling to the current context. Context* getSibling() const { return mSibling; } /// Return the first child context. Context* getChildren() const { return mChildren; } /// Return the root context. static Context* ROOT_CONTEXT() { return &smRootContext; } /// F32 getAccumulatedPriorityBias(); /// Context* getChild( const char* name ); /// void setPriorityBias( F32 value ); }; /// An action to execute on a worker thread from the pool. /// /// Work items are concurrently reference-counted and will be /// automatically released once the last reference disappears. /// class WorkItem : public ThreadSafeRefCount< WorkItem > { public: typedef ThreadSafeRefCount< WorkItem > Parent; protected: /// The work context of this item. Context* mContext; /// Mark a point in a work item's execution where the item can /// be safely cancelled. /// /// This method should be called by subclasses' execute() methods /// whenever an item can be safely cancelled. When it returns true, /// the work item should exit from its execute() method. bool cancellationPoint(); /// Called when the item has been cancelled. virtual void onCancelled() {} /// Execute the actions associated with this work item. /// This is the primary function to implement by subclasses. virtual void execute() = 0; /// This flag is set after the execute() method has completed. bool mExecuted; public: /// Construct a new work item. /// /// @param context The work context in which the item should be placed. /// If NULL, the root context will be used. WorkItem( Context* context = 0 ) : mContext( context ? context : Context::ROOT_CONTEXT() ), mExecuted( false ) { } virtual ~WorkItem() {} /// Return the work context associated with the work item. inline Context* getContext() const { return mContext; } /// Process the work item. void process(); /// Return true if the work item should be cancelled. /// /// This method can be overridden by subclasses. It's value will be /// checked each time cancellationPoint() is called. When it returns /// true, the item's process() method will exit automatically. /// /// @return true, if item should be cancelled; default is false. /// @see ThreadPool::WorkItem::cancellationPoint virtual bool isCancellationRequested(); /// Return the item's base priority value. /// @return item priority; defaults to 1.0. virtual F32 getPriority(); /// Has this work item been executed already? bool hasExecuted() const { return mExecuted; } }; typedef ThreadSafeRef< WorkItem > WorkItemPtr; struct GlobalThreadPool; protected: struct WorkItemWrapper; struct WorkerThread; friend struct WorkerThread; // mSemaphore, mNumThreadsAwake, mThreads typedef ThreadSafePriorityQueueWithUpdate< WorkItemWrapper, F32 > QueueType; /// Name of this pool. Mainly for debugging. Used to name worker threads. String mName; /// Number of worker threads spawned by the pool. U32 mNumThreads; /// Number of worker threads in non-sleeping state. U32 mNumThreadsAwake; /// Number of worker threads guaranteed to be non-blocking. U32 mNumThreadsReady; /// Number of work items that have not yet completed execution. U32 mNumPendingItems; /// Semaphore used to wake up threads, if necessary. Semaphore mSemaphore; /// Threaded priority queue for concurrent access by worker threads. QueueType mWorkItemQueue; /// List of worker threads. WorkerThread* mThreads; /// Force all work items to execute on main thread; /// turns this into a single-threaded system. /// Primarily useful to find whether malfunctions are caused /// by parallel execution or not. static bool smForceAllMainThread; /// static U32 smMainThreadTimeMS; /// Work queue for main thread; can be used to ping back work items to /// main thread that need processing that can only happen on main thread. static QueueType smMainThreadQueue; public: /// Create a new thread pool with the given number of worker threads. /// /// If numThreads is zero (the default), the number of threads created /// will be based on the number of CPU cores available. /// /// @param numThreads Number of threads to create or zero for default. ThreadPool( const char* name, U32 numThreads = 0 ); ~ThreadPool(); /// Manually shutdown threads outside of static destructors. void shutdown(); /// void queueWorkItem( WorkItem* item ); /// /// For the global pool, it is very important to only ever call /// this function on the main thread and to let work items only ever /// come from the main thread. Otherwise this function has the potential /// of dead-locking as new work items may constantly be fed to the queue /// without it ever getting empty. /// /// @param timeOut Soft limit on the number of milliseconds to wait for /// the queue to flush out. -1 = infinite. void flushWorkItems( S32 timeOut = -1 ); /// If you're using a non-global thread pool to parallelise some work, you /// may want to block until all the parallel work is complete. As with /// flushWorkItems, this method may block indefinitely if new items keep /// getting added to the pool before old ones finish. /// /// This method will not wait for items queued on the main thread using /// queueWorkItemOnMainThread! /// /// @param timeOut Soft limit on the number of milliseconds to wait for /// all items to complete. -1 = infinite. void waitForAllItems( S32 timeOut = -1 ); /// Add a work item to the main thread's work queue. /// /// The main thread's work queue will be processed each frame using /// a set timeout to limit the work being done. Nonetheless, work /// items will not be suspended in-midst of processing, so make sure /// that whatever work you issue to the main thread is light work /// or you may see short hangs in gameplay. /// /// To reiterate this: any code executed through this interface directly /// adds to frame processing time on the main thread. /// /// This method *may* (and is meant to) be called from threads /// other than the main thread. static void queueWorkItemOnMainThread( WorkItem* item ); /// Process work items waiting on the main thread's work queue. /// /// There is a soft limit imposed on the time this method is allowed /// to run so as to balance frame-to-frame load. However, work /// items, once their processing is initiated, will not be suspended /// and will run for as long as they take to complete, so make sure /// individual items perform as little work as necessary. /// /// @see ThreadPool::getMainThreadThesholdTimeMS static void processMainThreadWorkItems(); /// Return the interval in which item priorities are updated on the queue. /// @return update interval in milliseconds. U32 getQueueUpdateInterval() const { return mWorkItemQueue.getUpdateInterval(); } /// Return the priority increment applied to work items on each passing of the update interval. F32 getQueueTimeBasedPriorityBoost() const { return mWorkItemQueue.getTimeBasedPriorityBoost(); } /// Set the update interval of the work item queue to the given value. /// @param milliSeconds Time between updates in milliseconds. void setQueueUpdateInterval( U32 milliSeconds ) { mWorkItemQueue.setUpdateInterval( milliSeconds ); } /// Set the priority increment applied to work items on each update interval. /// @param value Priority increment. Set to zero to deactivate. void setQueueTimeBasedPriorityBoost( F32 value ) { mWorkItemQueue.setTimeBasedPriorityBoost( value ); } /// static U32& getMainThreadThresholdTimeMS() { return smMainThreadTimeMS; } /// static bool& getForceAllMainThread() { return smForceAllMainThread; } /// Return the global thread pool singleton. static ThreadPool& GLOBAL(); }; typedef ThreadPool::Context ThreadContext; typedef ThreadPool::WorkItem ThreadWorkItem; struct ThreadPool::GlobalThreadPool : public ThreadPool, public ManagedSingleton< GlobalThreadPool > { typedef ThreadPool Parent; GlobalThreadPool() : Parent( "GLOBAL" ) {} // For ManagedSingleton. static const char* getSingletonName() { return "GlobalThreadPool"; } }; inline ThreadPool& ThreadPool::GLOBAL() { return *( GlobalThreadPool::instance() ); } #endif // !_THREADPOOL_H_