123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417 |
- //-----------------------------------------------------------------------------
- // Copyright (c) 2012 GarageGames, LLC
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to
- // deal in the Software without restriction, including without limitation the
- // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- // sell copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- // IN THE SOFTWARE.
- //-----------------------------------------------------------------------------
- #ifndef _ASYNCBUFFEREDSTREAM_H_
- #define _ASYNCBUFFEREDSTREAM_H_
- #ifndef _TSTREAM_H_
- #include "core/stream/tStream.h"
- #endif
- #ifndef _THREADPOOL_H_
- #include "platform/threads/threadPool.h"
- #endif
- #ifndef _THREADSAFEDEQUE_H_
- #include "platform/threads/threadSafeDeque.h"
- #endif
- // Disable nonsense warning about unreferenced
- // local function on VC.
- #ifdef TORQUE_COMPILER_VISUALC
- #pragma warning( disable: 4505 )
- #endif
- template< typename T, class Stream >
- class AsyncBufferedReadItem;
- //=============================================================================
- // AsyncBufferedInputStream.
- //=============================================================================
- ///
- template< typename T, class Stream = IInputStream< T >* >
- class AsyncBufferedInputStream : public IInputStreamFilter< T, Stream >,
- public ThreadSafeRefCount< AsyncBufferedInputStream< T, Stream > >
- {
- public:
- typedef IInputStreamFilter< T, Stream > Parent;
-
- /// Type of elements read, buffered, and returned by this stream.
- typedef typename Parent::ElementType ElementType;
-
- /// Type of the source stream being read by this stream.
- typedef typename Parent::SourceStreamType SourceStreamType;
-
- /// Type of elements being read from the source stream.
- ///
- /// @note This does not need to correspond to the type of elements buffered
- /// in this stream.
- typedef typename Parent::SourceElementType SourceElementType;
- enum
- {
- /// The number of elements to buffer in advance by default.
- DEFAULT_STREAM_LOOKAHEAD = 3
- };
-
- friend class AsyncBufferedReadItem< T, Stream >; // _onArrival
- protected:
- /// Stream elements are kept on deques that can be concurrently
- /// accessed by multiple threads.
- typedef ThreadSafeDeque< ElementType > ElementList;
- /// If true, the stream will restart over from the beginning once
- /// it has been read in entirety.
- bool mIsLooping;
- /// If true, no further requests should be issued on this stream.
- /// @note This in itself doesn't say anything about pending requests.
- bool mIsStopped;
- /// Number of source elements remaining in the source stream.
- U32 mNumRemainingSourceElements;
- /// Number of elements currently on buffer list.
- U32 mNumBufferedElements;
- /// Maximum number of elements allowed on buffer list.
- U32 mMaxBufferedElements;
- /// List of buffered elements.
- ElementList mBufferedElements;
- /// The thread pool to which read items are queued.
- ThreadPool* mThreadPool;
- /// The thread context used for prioritizing read items in the pool.
- ThreadContext* mThreadContext;
- /// Request the next element from the underlying stream.
- virtual void _requestNext() = 0;
- /// Called when an element read has been completed on the underlying stream.
- virtual void _onArrival( const ElementType& element );
- public:
- /// Construct a new buffered stream reading from "source".
- ///
- /// @param stream The source stream from which to read the actual data elements.
- /// @param numSourceElementsToRead Total number of elements to read from "stream".
- /// @param numReadAhead Number of packets to read and buffer in advance.
- /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
- /// @param pool The ThreadPool to use for asynchronous packet reads.
- /// @param context The ThreadContext to place asynchronous packet reads in.
- AsyncBufferedInputStream( const Stream& stream,
- U32 numSourceElementsToRead = 0,
- U32 numReadAhead = DEFAULT_STREAM_LOOKAHEAD,
- bool isLooping = false,
- ThreadPool* pool = &ThreadPool::GLOBAL(),
- ThreadContext* context = ThreadContext::ROOT_CONTEXT() );
- virtual ~AsyncBufferedInputStream();
-
- /// @return true if the stream is looping infinitely.
- bool isLooping() const { return mIsLooping; }
- /// @return the number of elements that will be read and buffered in advance.
- U32 getReadAhead() const { return mMaxBufferedElements; }
- /// Initiate the request chain of the element stream.
- void start() { _requestNext(); }
- /// Call for the request chain of the element stream to stop at the next
- /// synchronization point.
- void stop() { mIsStopped = true; }
- // IInputStream.
- virtual U32 read( ElementType* buffer, U32 num );
- };
- //-----------------------------------------------------------------------------
- template< typename T, typename Stream >
- AsyncBufferedInputStream< T, Stream >::AsyncBufferedInputStream
- ( const Stream& stream,
- U32 numSourceElementsToRead,
- U32 numReadAhead,
- bool isLooping,
- ThreadPool* threadPool,
- ThreadContext* threadContext )
- : Parent( stream ),
- mIsLooping( isLooping ),
- mIsStopped( false ),
- mNumRemainingSourceElements( numSourceElementsToRead ),
- mNumBufferedElements( 0 ),
- mMaxBufferedElements( numReadAhead ),
- mThreadPool( threadPool ),
- mThreadContext( threadContext )
- {
- if( mIsLooping )
- {
- // Stream is looping so we don't count down source elements.
-
- mNumRemainingSourceElements = 0;
- }
- else if( !mNumRemainingSourceElements )
- {
- // If not given number of elements to read, see if the source
- // stream is sizeable. If so, take its size as the number of elements.
-
- if( dynamic_cast< ISizeable<>* >( &Deref( stream ) ) )
- mNumRemainingSourceElements = ( ( ISizeable<>* ) &Deref( stream ) )->getSize();
- else
- {
- // Can't tell how many source elements there are.
-
- mNumRemainingSourceElements = U32_MAX;
- }
- }
- }
- //-----------------------------------------------------------------------------
- template< typename T, typename Stream >
- AsyncBufferedInputStream< T, Stream >::~AsyncBufferedInputStream()
- {
- ElementType element;
- while( mBufferedElements.tryPopFront( element ) )
- destructSingle( element );
- }
- //-----------------------------------------------------------------------------
- template< typename T, typename Stream >
- void AsyncBufferedInputStream< T, Stream >::_onArrival( const ElementType& element )
- {
- mBufferedElements.pushBack( element );
-
- // Adjust buffer count.
- while( 1 )
- {
- S32 numBuffered = mNumBufferedElements;
- if( dCompareAndSwap( mNumBufferedElements, numBuffered, numBuffered + 1 ) )
- {
- // If we haven't run against the lookahead limit and haven't reached
- // the end of the stream, immediately trigger a new request.
-
- if( !mIsStopped && ( numBuffered + 1 ) < mMaxBufferedElements )
- _requestNext();
-
- break;
- }
- }
- }
- //-----------------------------------------------------------------------------
- template< typename T, typename Stream >
- U32 AsyncBufferedInputStream< T, Stream >::read( ElementType* buffer, U32 num )
- {
- if( !num )
- return 0;
- U32 numRead = 0;
- for( U32 i = 0; i < num; ++ i )
- {
- // Try to pop a element off the buffered element list.
-
- ElementType element;
- if( mBufferedElements.tryPopFront( element ) )
- {
- buffer[ i ] = element;
- numRead ++;
- }
- else
- break;
- }
- // Get the request chain going again, if it has stopped.
-
- while( 1 )
- {
- U32 numBuffered = mNumBufferedElements;
- U32 newNumBuffered = numBuffered - numRead;
-
- if( dCompareAndSwap( mNumBufferedElements, numBuffered, newNumBuffered ) )
- {
- if( numBuffered == mMaxBufferedElements )
- _requestNext();
-
- break;
- }
- }
- return numRead;
- }
- //=============================================================================
- // AsyncSingleBufferedInputStream.
- //=============================================================================
- /// Asynchronous work item for reading an element from the source stream.
- template< typename T, typename Stream = IInputStream< T >* >
- class AsyncBufferedReadItem : public ThreadWorkItem
- {
- public:
- typedef ThreadWorkItem Parent;
- typedef ThreadSafeRef< AsyncBufferedInputStream< T, Stream > > AsyncStreamRef;
-
- protected:
- /// The issueing async state.
- AsyncStreamRef mAsyncStream;
-
- ///
- Stream mSourceStream;
- /// The element read from the stream.
- T mElement;
- // WorkItem
- virtual void execute()
- {
- if( Deref( mSourceStream ).read( &mElement, 1 ) )
- {
- // Buffer the element.
- if( this->cancellationPoint() ) return;
- mAsyncStream->_onArrival( mElement );
- }
- }
- virtual void onCancelled()
- {
- Parent::onCancelled();
- destructSingle( mElement );
- mAsyncStream = NULL;
- }
-
- public:
- ///
- AsyncBufferedReadItem(
- const AsyncStreamRef& asyncStream,
- ThreadPool::Context* context = NULL
- )
- : Parent( context ),
- mAsyncStream( asyncStream ),
- mSourceStream( asyncStream->getSourceStream() )
- {
- }
- };
- /// A stream filter that performs background read-aheads on its source stream
- /// and buffers the results.
- ///
- /// As each element is read in an independent threaded operation, reading an
- /// element should invole a certain amount of work for using this class to
- /// make sense.
- ///
- /// @note For looping streams, the stream must implement the IResettable interface.
- ///
- template< typename T, typename Stream = IInputStream< T >*, class ReadItem = AsyncBufferedReadItem< T, Stream > >
- class AsyncSingleBufferedInputStream : public AsyncBufferedInputStream< T, Stream >
- {
- public:
- typedef AsyncBufferedInputStream< T, Stream > Parent;
- typedef typename Parent::ElementType ElementType;
- typedef typename Parent::SourceElementType SourceElementType;
- typedef typename Parent::SourceStreamType SourceStreamType;
-
- protected:
-
- // AsyncBufferedInputStream.
- virtual void _requestNext();
- /// Create a new work item that reads the next element.
- virtual void _newReadItem( ThreadSafeRef< ThreadWorkItem >& outRef )
- {
- outRef = new ReadItem( this, this->mThreadContext );
- }
-
- public:
- /// Construct a new buffered stream reading from "source".
- ///
- /// @param stream The source stream from which to read the actual data elements.
- /// @param numSourceElementsToRead Total number of elements to read from "stream".
- /// @param numReadAhead Number of packets to read and buffer in advance.
- /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
- /// @param pool The ThreadPool to use for asynchronous packet reads.
- /// @param context The ThreadContext to place asynchronous packet reads in.
- AsyncSingleBufferedInputStream( const Stream& stream,
- U32 numSourceElementsToRead = 0,
- U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD,
- bool isLooping = false,
- ThreadPool* pool = &ThreadPool::GLOBAL(),
- ThreadContext* context = ThreadContext::ROOT_CONTEXT() )
- : Parent( stream,
- numSourceElementsToRead,
- numReadAhead,
- isLooping,
- pool,
- context ) {}
- };
- //-----------------------------------------------------------------------------
- template< typename T, typename Stream, class ReadItem >
- void AsyncSingleBufferedInputStream< T, Stream, ReadItem >::_requestNext()
- {
- Stream& stream = this->getSourceStream();
- bool isEOS = !this->mNumRemainingSourceElements;
- if( isEOS && this->mIsLooping )
- {
- SourceStreamType* s = &Deref( stream );
- dynamic_cast< IResettable* >( s )->reset();
- isEOS = false;
- }
- else if( isEOS )
- return;
- //TODO: could scale priority depending on feed status
- // Queue a stream packet work item.
- if( !this->mIsLooping && this->mNumRemainingSourceElements != U32_MAX )
- -- this->mNumRemainingSourceElements;
-
- ThreadSafeRef< ThreadWorkItem > workItem;
- _newReadItem( workItem );
- this->mThreadPool->queueWorkItem( workItem );
- }
- #endif // !_ASYNCBUFFEREDSTREAM_H_
|