2
0

asyncBufferedStream.h 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _ASYNCBUFFEREDSTREAM_H_
  23. #define _ASYNCBUFFEREDSTREAM_H_
  24. #ifndef _TSTREAM_H_
  25. #include "core/stream/tStream.h"
  26. #endif
  27. #ifndef _THREADPOOL_H_
  28. #include "platform/threads/threadPool.h"
  29. #endif
  30. #ifndef _THREADSAFEDEQUE_H_
  31. #include "platform/threads/threadSafeDeque.h"
  32. #endif
  33. // Disable nonsense warning about unreferenced
  34. // local function on VC.
  35. #ifdef TORQUE_COMPILER_VISUALC
  36. #pragma warning( disable: 4505 )
  37. #endif
  38. template< typename T, class Stream >
  39. class AsyncBufferedReadItem;
  40. //=============================================================================
  41. // AsyncBufferedInputStream.
  42. //=============================================================================
  43. ///
  44. template< typename T, class Stream = IInputStream< T >* >
  45. class AsyncBufferedInputStream : public IInputStreamFilter< T, Stream >,
  46. public ThreadSafeRefCount< AsyncBufferedInputStream< T, Stream > >
  47. {
  48. public:
  49. typedef IInputStreamFilter< T, Stream > Parent;
  50. /// Type of elements read, buffered, and returned by this stream.
  51. typedef typename Parent::ElementType ElementType;
  52. /// Type of the source stream being read by this stream.
  53. typedef typename Parent::SourceStreamType SourceStreamType;
  54. /// Type of elements being read from the source stream.
  55. ///
  56. /// @note This does not need to correspond to the type of elements buffered
  57. /// in this stream.
  58. typedef typename Parent::SourceElementType SourceElementType;
  59. enum
  60. {
  61. /// The number of elements to buffer in advance by default.
  62. DEFAULT_STREAM_LOOKAHEAD = 3
  63. };
  64. friend class AsyncBufferedReadItem< T, Stream >; // _onArrival
  65. protected:
  66. /// Stream elements are kept on deques that can be concurrently
  67. /// accessed by multiple threads.
  68. typedef ThreadSafeDeque< ElementType > ElementList;
  69. /// If true, the stream will restart over from the beginning once
  70. /// it has been read in entirety.
  71. bool mIsLooping;
  72. /// If true, no further requests should be issued on this stream.
  73. /// @note This in itself doesn't say anything about pending requests.
  74. bool mIsStopped;
  75. /// Number of source elements remaining in the source stream.
  76. U32 mNumRemainingSourceElements;
  77. /// Number of elements currently on buffer list.
  78. U32 mNumBufferedElements;
  79. /// Maximum number of elements allowed on buffer list.
  80. U32 mMaxBufferedElements;
  81. /// List of buffered elements.
  82. ElementList mBufferedElements;
  83. /// The thread pool to which read items are queued.
  84. ThreadPool* mThreadPool;
  85. /// The thread context used for prioritizing read items in the pool.
  86. ThreadContext* mThreadContext;
  87. /// Request the next element from the underlying stream.
  88. virtual void _requestNext() = 0;
  89. /// Called when an element read has been completed on the underlying stream.
  90. virtual void _onArrival( const ElementType& element );
  91. public:
  92. /// Construct a new buffered stream reading from "source".
  93. ///
  94. /// @param stream The source stream from which to read the actual data elements.
  95. /// @param numSourceElementsToRead Total number of elements to read from "stream".
  96. /// @param numReadAhead Number of packets to read and buffer in advance.
  97. /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
  98. /// @param pool The ThreadPool to use for asynchronous packet reads.
  99. /// @param context The ThreadContext to place asynchronous packet reads in.
  100. AsyncBufferedInputStream( const Stream& stream,
  101. U32 numSourceElementsToRead = 0,
  102. U32 numReadAhead = DEFAULT_STREAM_LOOKAHEAD,
  103. bool isLooping = false,
  104. ThreadPool* pool = &ThreadPool::GLOBAL(),
  105. ThreadContext* context = ThreadContext::ROOT_CONTEXT() );
  106. virtual ~AsyncBufferedInputStream();
  107. /// @return true if the stream is looping infinitely.
  108. bool isLooping() const { return mIsLooping; }
  109. /// @return the number of elements that will be read and buffered in advance.
  110. U32 getReadAhead() const { return mMaxBufferedElements; }
  111. /// Initiate the request chain of the element stream.
  112. void start() { _requestNext(); }
  113. /// Call for the request chain of the element stream to stop at the next
  114. /// synchronization point.
  115. void stop() { mIsStopped = true; }
  116. // IInputStream.
  117. virtual U32 read( ElementType* buffer, U32 num );
  118. };
  119. //-----------------------------------------------------------------------------
  120. template< typename T, typename Stream >
  121. AsyncBufferedInputStream< T, Stream >::AsyncBufferedInputStream
  122. ( const Stream& stream,
  123. U32 numSourceElementsToRead,
  124. U32 numReadAhead,
  125. bool isLooping,
  126. ThreadPool* threadPool,
  127. ThreadContext* threadContext )
  128. : Parent( stream ),
  129. mIsLooping( isLooping ),
  130. mIsStopped( false ),
  131. mNumRemainingSourceElements( numSourceElementsToRead ),
  132. mNumBufferedElements( 0 ),
  133. mMaxBufferedElements( numReadAhead ),
  134. mThreadPool( threadPool ),
  135. mThreadContext( threadContext )
  136. {
  137. if( mIsLooping )
  138. {
  139. // Stream is looping so we don't count down source elements.
  140. mNumRemainingSourceElements = 0;
  141. }
  142. else if( !mNumRemainingSourceElements )
  143. {
  144. // If not given number of elements to read, see if the source
  145. // stream is sizeable. If so, take its size as the number of elements.
  146. if( dynamic_cast< ISizeable<>* >( &Deref( stream ) ) )
  147. mNumRemainingSourceElements = ( ( ISizeable<>* ) &Deref( stream ) )->getSize();
  148. else
  149. {
  150. // Can't tell how many source elements there are.
  151. mNumRemainingSourceElements = U32_MAX;
  152. }
  153. }
  154. }
  155. //-----------------------------------------------------------------------------
  156. template< typename T, typename Stream >
  157. AsyncBufferedInputStream< T, Stream >::~AsyncBufferedInputStream()
  158. {
  159. ElementType element;
  160. while( mBufferedElements.tryPopFront( element ) )
  161. destructSingle( element );
  162. }
  163. //-----------------------------------------------------------------------------
  164. template< typename T, typename Stream >
  165. void AsyncBufferedInputStream< T, Stream >::_onArrival( const ElementType& element )
  166. {
  167. mBufferedElements.pushBack( element );
  168. // Adjust buffer count.
  169. while( 1 )
  170. {
  171. S32 numBuffered = mNumBufferedElements;
  172. if( dCompareAndSwap( mNumBufferedElements, numBuffered, numBuffered + 1 ) )
  173. {
  174. // If we haven't run against the lookahead limit and haven't reached
  175. // the end of the stream, immediately trigger a new request.
  176. if( !mIsStopped && ( numBuffered + 1 ) < mMaxBufferedElements )
  177. _requestNext();
  178. break;
  179. }
  180. }
  181. }
  182. //-----------------------------------------------------------------------------
  183. template< typename T, typename Stream >
  184. U32 AsyncBufferedInputStream< T, Stream >::read( ElementType* buffer, U32 num )
  185. {
  186. if( !num )
  187. return 0;
  188. U32 numRead = 0;
  189. for( U32 i = 0; i < num; ++ i )
  190. {
  191. // Try to pop a element off the buffered element list.
  192. ElementType element;
  193. if( mBufferedElements.tryPopFront( element ) )
  194. {
  195. buffer[ i ] = element;
  196. numRead ++;
  197. }
  198. else
  199. break;
  200. }
  201. // Get the request chain going again, if it has stopped.
  202. while( 1 )
  203. {
  204. U32 numBuffered = mNumBufferedElements;
  205. U32 newNumBuffered = numBuffered - numRead;
  206. if( dCompareAndSwap( mNumBufferedElements, numBuffered, newNumBuffered ) )
  207. {
  208. if( numBuffered == mMaxBufferedElements )
  209. _requestNext();
  210. break;
  211. }
  212. }
  213. return numRead;
  214. }
  215. //=============================================================================
  216. // AsyncSingleBufferedInputStream.
  217. //=============================================================================
  218. /// Asynchronous work item for reading an element from the source stream.
  219. template< typename T, typename Stream = IInputStream< T >* >
  220. class AsyncBufferedReadItem : public ThreadWorkItem
  221. {
  222. public:
  223. typedef ThreadWorkItem Parent;
  224. typedef ThreadSafeRef< AsyncBufferedInputStream< T, Stream > > AsyncStreamRef;
  225. protected:
  226. /// The issueing async state.
  227. AsyncStreamRef mAsyncStream;
  228. ///
  229. Stream mSourceStream;
  230. /// The element read from the stream.
  231. T mElement;
  232. // WorkItem
  233. virtual void execute()
  234. {
  235. if( Deref( mSourceStream ).read( &mElement, 1 ) )
  236. {
  237. // Buffer the element.
  238. if( this->cancellationPoint() ) return;
  239. mAsyncStream->_onArrival( mElement );
  240. }
  241. }
  242. virtual void onCancelled()
  243. {
  244. Parent::onCancelled();
  245. destructSingle( mElement );
  246. mAsyncStream = NULL;
  247. }
  248. public:
  249. ///
  250. AsyncBufferedReadItem(
  251. const AsyncStreamRef& asyncStream,
  252. ThreadPool::Context* context = NULL
  253. )
  254. : Parent( context ),
  255. mAsyncStream( asyncStream ),
  256. mSourceStream( asyncStream->getSourceStream() )
  257. {
  258. }
  259. };
  260. /// A stream filter that performs background read-aheads on its source stream
  261. /// and buffers the results.
  262. ///
  263. /// As each element is read in an independent threaded operation, reading an
  264. /// element should invole a certain amount of work for using this class to
  265. /// make sense.
  266. ///
  267. /// @note For looping streams, the stream must implement the IResettable interface.
  268. ///
  269. template< typename T, typename Stream = IInputStream< T >*, class ReadItem = AsyncBufferedReadItem< T, Stream > >
  270. class AsyncSingleBufferedInputStream : public AsyncBufferedInputStream< T, Stream >
  271. {
  272. public:
  273. typedef AsyncBufferedInputStream< T, Stream > Parent;
  274. typedef typename Parent::ElementType ElementType;
  275. typedef typename Parent::SourceElementType SourceElementType;
  276. typedef typename Parent::SourceStreamType SourceStreamType;
  277. protected:
  278. // AsyncBufferedInputStream.
  279. virtual void _requestNext();
  280. /// Create a new work item that reads the next element.
  281. virtual void _newReadItem( ThreadSafeRef< ThreadWorkItem >& outRef )
  282. {
  283. outRef = new ReadItem( this, this->mThreadContext );
  284. }
  285. public:
  286. /// Construct a new buffered stream reading from "source".
  287. ///
  288. /// @param stream The source stream from which to read the actual data elements.
  289. /// @param numSourceElementsToRead Total number of elements to read from "stream".
  290. /// @param numReadAhead Number of packets to read and buffer in advance.
  291. /// @param isLooping If true, the packet stream will loop infinitely over the source stream.
  292. /// @param pool The ThreadPool to use for asynchronous packet reads.
  293. /// @param context The ThreadContext to place asynchronous packet reads in.
  294. AsyncSingleBufferedInputStream( const Stream& stream,
  295. U32 numSourceElementsToRead = 0,
  296. U32 numReadAhead = Parent::DEFAULT_STREAM_LOOKAHEAD,
  297. bool isLooping = false,
  298. ThreadPool* pool = &ThreadPool::GLOBAL(),
  299. ThreadContext* context = ThreadContext::ROOT_CONTEXT() )
  300. : Parent( stream,
  301. numSourceElementsToRead,
  302. numReadAhead,
  303. isLooping,
  304. pool,
  305. context ) {}
  306. };
  307. //-----------------------------------------------------------------------------
  308. template< typename T, typename Stream, class ReadItem >
  309. void AsyncSingleBufferedInputStream< T, Stream, ReadItem >::_requestNext()
  310. {
  311. Stream& stream = this->getSourceStream();
  312. bool isEOS = !this->mNumRemainingSourceElements;
  313. if( isEOS && this->mIsLooping )
  314. {
  315. SourceStreamType* s = &Deref( stream );
  316. dynamic_cast< IResettable* >( s )->reset();
  317. isEOS = false;
  318. }
  319. else if( isEOS )
  320. return;
  321. //TODO: could scale priority depending on feed status
  322. // Queue a stream packet work item.
  323. if( !this->mIsLooping && this->mNumRemainingSourceElements != U32_MAX )
  324. -- this->mNumRemainingSourceElements;
  325. ThreadSafeRef< ThreadWorkItem > workItem;
  326. _newReadItem( workItem );
  327. this->mThreadPool->queueWorkItem( workItem );
  328. }
  329. #endif // !_ASYNCBUFFEREDSTREAM_H_