threadPoolAsyncIO.h 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _THREADPOOLASYNCIO_H_
  23. #define _THREADPOOLASYNCIO_H_
  24. #ifndef _THREADPOOL_H_
  25. # include "platform/threads/threadPool.h"
  26. #endif
  27. #ifndef _RAWDATA_H_
  28. # include "core/util/rawData.h"
  29. #endif
  30. #ifndef _TSTREAM_H_
  31. # include "core/stream/tStream.h"
  32. #endif
  33. //RDTODO: I/O error handling
  34. /// @file
  35. /// Thread pool work items for asynchronous stream I/O.
  36. /// Through the use of stream filters, this can be basically used for any
  37. /// type of asynchronous stream processing.
  38. //--------------------------------------------------------------------------
  39. // AsyncIOItem.
  40. //--------------------------------------------------------------------------
  41. /// Abstract superclass of async I/O work items.
  42. ///
  43. /// Supports both offset-based stream I/O as well as I/O on streams with
  44. /// implicit positions. Note that if you use the latter type, make sure
  45. /// that no other thread is messing with the stream at the same time or
  46. /// chaos will ensue.
  47. ///
  48. /// @param T Type of elements being streamed.
  49. template< typename T, class Stream >
  50. class AsyncIOItem : public ThreadPool::WorkItem
  51. {
  52. public:
  53. typedef WorkItem Parent;
  54. typedef T ValueType;
  55. typedef RawDataT< ValueType > BufferType;
  56. typedef U32 OffsetType;
  57. typedef Stream StreamType;
  58. protected:
  59. /// Buffer keeping/receiving the data elements.
  60. BufferType mBuffer;
  61. /// The stream to read from/write to.
  62. StreamType* mStream;
  63. /// Number of elements to read from/write to the stream.
  64. U32 mNumElements;
  65. /// Offset in "mBuffer" from where to read/where to start writing to.
  66. U32 mOffsetInBuffer;
  67. /// Offset in stream from where to read/where to write to.
  68. /// @note This is only meaningful if the stream is an offset I/O
  69. /// stream. For a stream that is can do both types of I/O,
  70. /// explicit offsets are preferred and this value is used.
  71. OffsetType mOffsetInStream;
  72. ///
  73. ValueType* getBufferPtr()
  74. {
  75. return &getBuffer().data[ getOffsetInBuffer() ];
  76. }
  77. public:
  78. ///
  79. /// If the stream uses implicit positioning, then the supplied "offsetInStream"
  80. /// is meaningless and ignored.
  81. AsyncIOItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
  82. ThreadContext* context = 0 )
  83. : Parent( context ),
  84. mStream( stream ),
  85. mNumElements( numElements ),
  86. mOffsetInBuffer( 0 ),
  87. mOffsetInStream( offsetInStream ) {}
  88. /// Construct a read item on "stream" that stores data into the given "buffer".
  89. ///
  90. AsyncIOItem( StreamType* stream, BufferType& buffer, U32 offsetInBuffer,
  91. U32 numElements, OffsetType offsetInStream, bool takeOwnershipOfBuffer = true,
  92. ThreadContext* context = 0 )
  93. : Parent( context ),
  94. mBuffer( buffer ),
  95. mStream( stream ),
  96. mNumElements( numElements ),
  97. mOffsetInBuffer( offsetInBuffer ),
  98. mOffsetInStream( offsetInStream )
  99. {
  100. if( takeOwnershipOfBuffer )
  101. mBuffer.ownMemory = true;
  102. }
  103. /// Return the stream being written to/read from.
  104. StreamType* getStream()
  105. {
  106. return mStream;
  107. }
  108. /// Return the data buffer being written to/read from.
  109. /// @note This may not yet have been allocated.
  110. BufferType& getBuffer()
  111. {
  112. return mBuffer;
  113. }
  114. /// Return the number of elements involved in the transfer.
  115. U32 getNumElements()
  116. {
  117. return mNumElements;
  118. }
  119. /// Return the position in the data buffer at which to start the transfer.
  120. U32 getOffsetInBuffer()
  121. {
  122. return mOffsetInBuffer;
  123. }
  124. /// Return the position in the stream at which to start the transfer.
  125. /// @note Only meaningful for streams that support offset I/O.
  126. OffsetType getOffsetInStream()
  127. {
  128. return mOffsetInStream;
  129. }
  130. };
  131. //--------------------------------------------------------------------------
  132. // AsyncReadItem.
  133. //--------------------------------------------------------------------------
  134. //RDTODO: error handling
  135. /// Work item to asynchronously read from a stream.
  136. ///
  137. /// The given stream type may implement any of the input stream
  138. /// interfaces. Preference is given to IAsyncInputStream, then to
  139. /// IOffsetInputStream, and only if none of these are implemented
  140. /// IInputStream is used.
  141. ///
  142. /// For IAsyncInputStreams, the async read operation is issued immediately
  143. /// on the constructing thread and then picked up on the worker thread.
  144. /// This ensures optimal use of concurrency.
  145. template< typename T, class Stream = IOffsetInputStream< T > >
  146. class AsyncReadItem : public AsyncIOItem< T, Stream >
  147. {
  148. public:
  149. typedef AsyncIOItem< T, Stream > Parent;
  150. typedef typename Parent::StreamType StreamType;
  151. typedef typename Parent::OffsetType OffsetType;
  152. typedef typename Parent::BufferType BufferType;
  153. typedef typename Parent::ValueType ValueType;
  154. /// Construct a read item that reads "numElements" at "offsetInStream"
  155. /// from "stream".
  156. ///
  157. /// Since with this constructor no data buffer is supplied, it will be
  158. /// dynamically allocated by the read() method. Note that this only makes
  159. /// sense if this class is subclassed and processing is done on the buffer
  160. /// after it has been read.
  161. ///
  162. /// @param stream The stream to read from.
  163. /// @param numElement The number of elements to read from the stream.
  164. /// @param offsetInStream The offset at which to read from the stream;
  165. /// ignored if the stream uses implicit positioning
  166. /// @param context The tread pool context to place the item into.
  167. AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
  168. ThreadContext* context = 0 )
  169. : Parent( stream, numElements, offsetInStream, context )
  170. {
  171. _prep();
  172. }
  173. AsyncReadItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
  174. BufferType& buffer, bool takeOwnershipOfBuffer = false,
  175. U32 offsetInBuffer = 0, ThreadContext* context = 0 )
  176. : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
  177. {
  178. _prep();
  179. }
  180. /// @return The number of elements actually read from the stream.
  181. U32 getNumElementsRead()
  182. {
  183. return mNumElementsRead;
  184. }
  185. protected:
  186. /// Handle of asynchronous stream read, if we are using an async interface.
  187. void* mAsyncHandle;
  188. /// After the read operation has completed, this holds the number of
  189. /// elements actually read from the stream.
  190. U32 mNumElementsRead;
  191. virtual void execute();
  192. void _allocBuffer()
  193. {
  194. if( !this->getBuffer().data )
  195. this->getBuffer().alloc( this->getNumElements() );
  196. }
  197. void _prep()
  198. {
  199. IAsyncInputStream< T >* s = dynamic_cast< IAsyncInputStream< T >* >( this->getStream() );
  200. if( s )
  201. {
  202. _allocBuffer();
  203. mAsyncHandle = s->issueReadAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
  204. }
  205. }
  206. // Helper functions to differentiate between stream types.
  207. void _read( IInputStream< T >* stream )
  208. {
  209. mNumElementsRead = stream->read( this->getBufferPtr(), this->getNumElements() );
  210. }
  211. void _read( IOffsetInputStream< T >* stream )
  212. {
  213. mNumElementsRead = stream->readAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
  214. }
  215. void _read( IAsyncInputStream< T >* stream )
  216. {
  217. stream->tryCompleteReadAt( mAsyncHandle, mNumElementsRead, true );
  218. }
  219. };
  220. template< typename T, class Stream >
  221. void AsyncReadItem< T, Stream >::execute()
  222. {
  223. _allocBuffer();
  224. // Read the data. Do a dynamic cast for any of the
  225. // interfaces we prefer.
  226. if( this->cancellationPoint() ) return;
  227. StreamType* stream = this->getStream();
  228. if( dynamic_cast< IAsyncInputStream< T >* >( stream ) )
  229. _read( ( IAsyncInputStream< T >* ) stream );
  230. else if( dynamic_cast< IOffsetInputStream< T >* >( stream ) )
  231. _read( ( IOffsetInputStream< T >* ) stream );
  232. else
  233. _read( stream );
  234. }
  235. //--------------------------------------------------------------------------
  236. // AsyncWriteItem.
  237. //--------------------------------------------------------------------------
  238. /// Work item for writing to an output stream.
  239. ///
  240. /// The stream being written to may implement any of the given output stream
  241. /// interfaces. Preference is given to IAsyncOutputStream, then to
  242. /// IOffsetOutputStream, and only if none of these is implemented IOutputStream
  243. /// is used.
  244. ///
  245. /// A useful feature is to yield ownership of the data buffer to the
  246. /// write item. This way, this can be pretty much used in a fire-and-forget
  247. /// manner where after submission, no further synchronization happens
  248. /// between the client and the work item.
  249. ///
  250. /// @note Be aware that if writing to an output stream that has an implicit
  251. /// position property, multiple concurrent writes will interfere with each other.
  252. template< typename T, class Stream = IOffsetOutputStream< T > >
  253. class AsyncWriteItem : public AsyncIOItem< T, Stream >
  254. {
  255. public:
  256. typedef AsyncIOItem< T, Stream > Parent;
  257. typedef typename Parent::StreamType StreamType;
  258. typedef typename Parent::OffsetType OffsetType;
  259. typedef typename Parent::BufferType BufferType;
  260. typedef typename Parent::ValueType ValueType;
  261. AsyncWriteItem( StreamType* stream, U32 numElements, OffsetType offsetInStream,
  262. BufferType& buffer, bool takeOwnershipOfBuffer = true,
  263. U32 offsetInBuffer = 0, ThreadContext* context = 0 )
  264. : Parent( stream, buffer, offsetInBuffer, numElements, offsetInStream, takeOwnershipOfBuffer, context )
  265. {
  266. _prep( stream );
  267. }
  268. protected:
  269. /// Handle of asynchronous write operation, if the stream implements IAsyncOutputStream.
  270. void* mAsyncHandle;
  271. virtual void execute();
  272. void _prep( StreamType* stream )
  273. {
  274. IAsyncOutputStream< T >* s = dynamic_cast< IAsyncOutputStream< T >* >( stream );
  275. if( s )
  276. mAsyncHandle = s->issueWriteAt( this->getOffset(), this->getBufferPtr(), this->getNumElements() );
  277. }
  278. void _write( IOutputStream< T >* stream )
  279. {
  280. stream->write( this->getBufferPtr(), this->getNumElements() );
  281. }
  282. void _write( IOffsetOutputStream< T >* stream )
  283. {
  284. stream->writeAt( this->getOffsetInStream(), this->getBufferPtr(), this->getNumElements() );
  285. }
  286. void _write( IAsyncOutputStream< T >* stream )
  287. {
  288. stream->tryCompleteWriteAt( mAsyncHandle, true );
  289. }
  290. };
  291. template< typename T, class Stream >
  292. void AsyncWriteItem< T, Stream >::execute()
  293. {
  294. if( this->cancellationPoint() ) return;
  295. StreamType* stream = this->getStream();
  296. if( dynamic_cast< IAsyncOutputStream< T >* >( stream ) )
  297. _write( ( IAsyncOutputStream< T >* ) stream );
  298. if( dynamic_cast< IOffsetOutputStream< T >* >( stream ) )
  299. _write( ( IOffsetOutputStream< T >* ) stream );
  300. else
  301. _write( stream );
  302. }
  303. #endif // _THREADPOOLASYNCIO_H_