// asyncPacketQueue.h
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _ASYNCPACKETQUEUE_H_
  23. #define _ASYNCPACKETQUEUE_H_
  24. #ifndef _TFIXEDSIZEQUEUE_H_
  25. #include "core/util/tFixedSizeDeque.h"
  26. #endif
  27. #ifndef _TSTREAM_H_
  28. #include "core/stream/tStream.h"
  29. #endif
  30. #ifndef _TYPETRAITS_H_
  31. #include "platform/typetraits.h"
  32. #endif
  33. //#define DEBUG_SPEW
  34. /// @file
  35. /// Time-based packet streaming.
  36. ///
  37. /// The classes contained in this file can be used for any kind
  38. /// of continuous playback that depends on discrete samplings of
  39. /// a source stream (i.e. any kind of digital media streaming).
  40. //--------------------------------------------------------------------------
  41. // Async packet queue.
  42. //--------------------------------------------------------------------------
  43. /// Time-based packet stream queue.
  44. ///
  45. /// AsyncPacketQueue writes data packets to a consumer stream in sync to
  46. /// a tick time source. Outdated packets may optionally be dropped automatically
  47. /// by the queue. A fixed maximum number of packets can reside in the queue
  48. /// concurrently at any one time.
  49. ///
  50. /// Be aware that using single item queues for synchronizing to a timer
  51. /// will usually result in bad timing behavior when packet uploading takes
  52. /// any non-trivial amount of time.
  53. ///
  54. /// @note While the queue associates a variable tick count with each
  55. /// individual packet, the queue fill status is measured in number of
  56. /// packets rather than in total tick time.
  57. ///
/// @param Packet Value type of packets passed through this queue.
/// @param TimeSource Value type for time tick source to which the queue
///   is synchronized.
/// @param Consumer Value type of stream to which the packets are written.
/// @param Tick Value type used for counting ticks.
///
  63. template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 >
  64. class AsyncPacketQueue
  65. {
  66. public:
  67. typedef void Parent;
  68. /// The type of data packets being streamed through this queue.
  69. typedef typename TypeTraits< Packet >::BaseType PacketType;
  70. /// The type of consumer that receives the packets from this queue.
  71. typedef typename TypeTraits< Consumer >::BaseType ConsumerType;
  72. ///
  73. typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType;
  74. /// Type for counting ticks.
  75. typedef Tick TickType;
  76. protected:
  77. /// Information about the time slice covered by an
  78. /// individual packet currently on the queue.
  79. struct QueuedPacket
  80. {
  81. /// First tick contained in this packet.
  82. TickType mStartTick;
  83. /// First tick *not* contained in this packet anymore.
  84. TickType mEndTick;
  85. QueuedPacket( TickType start, TickType end )
  86. : mStartTick( start ), mEndTick( end ) {}
  87. /// Return the total number of ticks in this packet.
  88. TickType getNumTicks() const
  89. {
  90. return ( mEndTick - mStartTick );
  91. }
  92. };
  93. typedef FixedSizeDeque< QueuedPacket > PacketQueue;
  94. /// If true, packets that have missed their proper queuing timeframe
  95. /// will be dropped. If false, they will be queued nonetheless.
  96. bool mDropPackets;
  97. /// Total number of ticks spanned by the total queue playback time.
  98. /// If this is zero, the total queue time is considered to be infinite.
  99. TickType mTotalTicks;
  100. /// Total number of ticks submitted to the queue so far.
  101. TickType mTotalQueuedTicks;
  102. /// Queue that holds records for each packet currently in the queue. New packets
  103. /// are added to back.
  104. PacketQueue mPacketQueue;
  105. /// The time source to which we are sync'ing.
  106. TimeSource mTimeSource;
  107. /// The output stream that this queue feeds into.
  108. Consumer mConsumer;
  109. /// Total number of packets queued so far.
  110. U32 mTotalQueuedPackets;
  111. public:
  112. /// Construct an AsyncPacketQueue of the given length.
  113. ///
  114. /// @param maxQueuedPackets The length of the queue in packets. Only a maximum of
  115. /// 'maxQueuedPackets' packets can be concurrently in the queue at any one time.
  116. /// @param timeSource The tick time source to which the queue synchronizes.
  117. /// @param consumer The output stream that receives the packets in sync to timeSource.
  118. /// @param totalTicks The total number of ticks that will be played back through the
  119. /// queue; if 0, the length is considered indefinite.
  120. /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a
  121. /// packet will not reach the consumer.
  122. AsyncPacketQueue( U32 maxQueuedPackets,
  123. TimeSource timeSource,
  124. Consumer consumer,
  125. TickType totalTicks = 0,
  126. bool dropPackets = false )
  127. : mDropPackets( dropPackets ),
  128. mTotalTicks( totalTicks ),
  129. mTotalQueuedTicks( 0 ),
  130. mPacketQueue( maxQueuedPackets ),
  131. mTimeSource( timeSource ),
  132. mConsumer( consumer )
  133. {
  134. mTotalQueuedPackets = 0;
  135. }
  136. /// Return true if there are currently
  137. bool isEmpty() const { return mPacketQueue.isEmpty(); }
  138. /// Return true if all packets have been streamed.
  139. bool isAtEnd() const;
  140. /// Return true if the queue needs one or more new packets to be submitted.
  141. bool needPacket();
  142. /// Submit a data packet to the queue.
  143. ///
  144. /// @param packet The data packet.
  145. /// @param packetTicks The duration of the packet in ticks.
  146. /// @param isLast If true, the packet is the last one in the stream.
  147. /// @param packetPos The absolute position of the packet in the stream; if this is not supplied
  148. /// the packet is assumed to immediately follow the preceding packet.
  149. ///
  150. /// @return true if the packet has been queued or false if it has been dropped.
  151. bool submitPacket( Packet packet,
  152. TickType packetTicks,
  153. bool isLast = false,
  154. TickType packetPos = TypeTraits< TickType >::MAX );
  155. /// Return the current playback position according to the time source.
  156. TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); }
  157. /// Return the total number of ticks that have been queued so far.
  158. TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; }
  159. /// Return the total number of packets that have been queued so far.
  160. U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; }
  161. };
  162. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  163. inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const
  164. {
  165. // Never at end if infinite.
  166. if( !mTotalTicks )
  167. return false;
  168. // Otherwise, we're at end if we're past the total tick count.
  169. return ( getCurrentTick() >= mTotalTicks
  170. && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) );
  171. }
  172. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  173. bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket()
  174. {
  175. // Never need more packets once we have reached the
  176. // end.
  177. if( isAtEnd() )
  178. return false;
  179. // Always needs packets while the queue is not
  180. // filled up completely.
  181. if( mPacketQueue.capacity() != 0 )
  182. return true;
  183. // Unqueue packets that have expired their playtime.
  184. TickType currentTick = getCurrentTick();
  185. while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick )
  186. {
  187. #ifdef DEBUG_SPEW
  188. Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)",
  189. mTotalQueuedPackets - mPacketQueue.size(),
  190. U32( mPacketQueue.front().mStartTick ),
  191. U32( mPacketQueue.front().mEndTick ),
  192. U32( currentTick ),
  193. mPacketQueue.size() );
  194. #endif
  195. mPacketQueue.popFront();
  196. }
  197. // Need more packets if the queue isn't full anymore.
  198. return ( mPacketQueue.capacity() != 0 );
  199. }
  200. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  201. bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos )
  202. {
  203. AssertFatal( mPacketQueue.capacity() != 0,
  204. "AsyncPacketQueue::submitPacket() - Queue is full!" );
  205. TickType packetStartPos;
  206. TickType packetEndPos;
  207. if( packetPos != TypeTraits< TickType >::MAX )
  208. {
  209. packetStartPos = packetPos;
  210. packetEndPos = packetPos + packetTicks;
  211. }
  212. else
  213. {
  214. packetStartPos = mTotalQueuedTicks;
  215. packetEndPos = mTotalQueuedTicks + packetTicks;
  216. }
  217. // Check whether the packet is outdated, if enabled.
  218. bool dropPacket = false;
  219. if( mDropPackets )
  220. {
  221. TickType currentTick = getCurrentTick();
  222. if( currentTick >= packetEndPos )
  223. dropPacket = true;
  224. }
  225. #ifdef DEBUG_SPEW
  226. Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s",
  227. mTotalQueuedPackets,
  228. U32( mTotalQueuedTicks ),
  229. U32( packetEndPos ),
  230. U32( packetTicks ),
  231. U32( getCurrentTick() ),
  232. mPacketQueue.size(),
  233. dropPacket ? " !! DROPPED !!" : "" );
  234. #endif
  235. // Queue the packet.
  236. if( !dropPacket )
  237. {
  238. mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) );
  239. Deref( mConsumer ).write( &packet, 1 );
  240. }
  241. mTotalQueuedTicks = packetEndPos;
  242. if( isLast && !mTotalTicks )
  243. mTotalTicks = mTotalQueuedTicks;
  244. mTotalQueuedPackets ++;
  245. return !dropPacket;
  246. }
  247. #undef DEBUG_SPEW
  248. #endif // _ASYNCPACKETQUEUE_H_