asyncPacketQueue.h 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) 2012 GarageGames, LLC
  3. //
  4. // Permission is hereby granted, free of charge, to any person obtaining a copy
  5. // of this software and associated documentation files (the "Software"), to
  6. // deal in the Software without restriction, including without limitation the
  7. // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
  8. // sell copies of the Software, and to permit persons to whom the Software is
  9. // furnished to do so, subject to the following conditions:
  10. //
  11. // The above copyright notice and this permission notice shall be included in
  12. // all copies or substantial portions of the Software.
  13. //
  14. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  15. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  16. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  17. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  18. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
  19. // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
  20. // IN THE SOFTWARE.
  21. //-----------------------------------------------------------------------------
  22. #ifndef _ASYNCPACKETQUEUE_H_
  23. #define _ASYNCPACKETQUEUE_H_
  24. #ifndef _TFIXEDSIZEQUEUE_H_
  25. #include "core/util/tFixedSizeDeque.h"
  26. #endif
  27. #ifndef _TSTREAM_H_
  28. #include "core/stream/tStream.h"
  29. #endif
  30. #ifndef _TYPETRAITS_H_
  31. #include "platform/typetraits.h"
  32. #endif
  33. //#define DEBUG_SPEW
  34. /// @file
  35. /// Time-based packet streaming.
  36. ///
  37. /// The classes contained in this file can be used for any kind
  38. /// of continuous playback that depends on discrete samplings of
  39. /// a source stream (i.e. any kind of digital media streaming).
  40. //--------------------------------------------------------------------------
  41. // Async packet queue.
  42. //--------------------------------------------------------------------------
  43. /// Time-based packet stream queue.
  44. ///
  45. /// AsyncPacketQueue writes data packets to a consumer stream in sync to
  46. /// a tick time source. Outdated packets may optionally be dropped automatically
  47. /// by the queue. A fixed maximum number of packets can reside in the queue
  48. /// concurrently at any one time.
  49. ///
  50. /// Be aware that using single item queues for synchronizing to a timer
  51. /// will usually result in bad timing behavior when packet uploading takes
  52. /// any non-trivial amount of time.
  53. ///
  54. /// @note While the queue associates a variable tick count with each
  55. /// individual packet, the queue fill status is measured in number of
  56. /// packets rather than in total tick time.
  57. ///
  58. /// @param Packet Value type of packets passed through this queue.
  59. /// @param TimeSource Value type for time tick source to which the queue
  60. /// is synchronized.
  61. /// @param Consumer Value type of stream to which the packets are written.
  62. ///
  63. template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 >
  64. class AsyncPacketQueue
  65. {
  66. public:
  67. typedef void Parent;
  68. /// The type of data packets being streamed through this queue.
  69. typedef typename TypeTraits< Packet >::BaseType PacketType;
  70. /// The type of consumer that receives the packets from this queue.
  71. typedef typename TypeTraits< Consumer >::BaseType ConsumerType;
  72. ///
  73. typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType;
  74. /// Type for counting ticks.
  75. typedef Tick TickType;
  76. protected:
  77. /// Information about the time slice covered by an
  78. /// individual packet currently on the queue.
  79. struct QueuedPacket
  80. {
  81. /// First tick contained in this packet.
  82. TickType mStartTick;
  83. /// First tick *not* contained in this packet anymore.
  84. TickType mEndTick;
  85. QueuedPacket( TickType start, TickType end )
  86. : mStartTick( start ), mEndTick( end ) {}
  87. /// Return the total number of ticks in this packet.
  88. TickType getNumTicks() const
  89. {
  90. return ( mEndTick - mStartTick );
  91. }
  92. };
  93. typedef FixedSizeDeque< QueuedPacket > PacketQueue;
  94. /// If true, packets that have missed their proper queuing timeframe
  95. /// will be dropped. If false, they will be queued nonetheless.
  96. bool mDropPackets;
  97. /// Total number of ticks spanned by the total queue playback time.
  98. /// If this is zero, the total queue time is considered to be infinite.
  99. TickType mTotalTicks;
  100. /// Total number of ticks submitted to the queue so far.
  101. TickType mTotalQueuedTicks;
  102. /// Queue that holds records for each packet currently in the queue. New packets
  103. /// are added to back.
  104. PacketQueue mPacketQueue;
  105. /// The time source to which we are sync'ing.
  106. TimeSource mTimeSource;
  107. /// The output stream that this queue feeds into.
  108. Consumer mConsumer;
  109. /// Total number of packets queued so far.
  110. U32 mTotalQueuedPackets;
  111. public:
  112. /// Construct an AsyncPacketQueue of the given length.
  113. ///
  114. /// @param maxQueuedPackets The length of the queue in packets. Only a maximum of
  115. /// 'maxQueuedPackets' packets can be concurrently in the queue at any one time.
  116. /// @param timeSource The tick time source to which the queue synchronizes.
  117. /// @param consumer The output stream that receives the packets in sync to timeSource.
  118. /// @param totalTicks The total number of ticks that will be played back through the
  119. /// queue; if 0, the length is considered indefinite.
  120. /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a
  121. /// packet will not reach the consumer.
  122. AsyncPacketQueue( U32 maxQueuedPackets,
  123. TimeSource timeSource,
  124. Consumer consumer,
  125. TickType totalTicks = 0,
  126. bool dropPackets = false )
  127. : mDropPackets( dropPackets ),
  128. mTotalTicks( totalTicks ),
  129. mTotalQueuedTicks( 0 ),
  130. mPacketQueue( maxQueuedPackets ),
  131. mTimeSource( timeSource ),
  132. mConsumer( consumer )
  133. {
  134. #ifdef TORQUE_DEBUG
  135. mTotalQueuedPackets = 0;
  136. #endif
  137. }
  138. /// Return true if there are currently
  139. bool isEmpty() const { return mPacketQueue.isEmpty(); }
  140. /// Return true if all packets have been streamed.
  141. bool isAtEnd() const;
  142. /// Return true if the queue needs one or more new packets to be submitted.
  143. bool needPacket();
  144. /// Submit a data packet to the queue.
  145. ///
  146. /// @param packet The data packet.
  147. /// @param packetTicks The duration of the packet in ticks.
  148. /// @param isLast If true, the packet is the last one in the stream.
  149. /// @param packetPos The absolute position of the packet in the stream; if this is not supplied
  150. /// the packet is assumed to immediately follow the preceding packet.
  151. ///
  152. /// @return true if the packet has been queued or false if it has been dropped.
  153. bool submitPacket( Packet packet,
  154. TickType packetTicks,
  155. bool isLast = false,
  156. TickType packetPos = TypeTraits< TickType >::MAX );
  157. /// Return the current playback position according to the time source.
  158. TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); }
  159. /// Return the total number of ticks that have been queued so far.
  160. TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; }
  161. /// Return the total number of packets that have been queued so far.
  162. U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; }
  163. };
  164. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  165. inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const
  166. {
  167. // Never at end if infinite.
  168. if( !mTotalTicks )
  169. return false;
  170. // Otherwise, we're at end if we're past the total tick count.
  171. return ( getCurrentTick() >= mTotalTicks
  172. && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) );
  173. }
  174. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  175. bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket()
  176. {
  177. // Never need more packets once we have reached the
  178. // end.
  179. if( isAtEnd() )
  180. return false;
  181. // Always needs packets while the queue is not
  182. // filled up completely.
  183. if( mPacketQueue.capacity() != 0 )
  184. return true;
  185. // Unqueue packets that have expired their playtime.
  186. TickType currentTick = getCurrentTick();
  187. while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick )
  188. {
  189. #ifdef DEBUG_SPEW
  190. Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)",
  191. mTotalQueuedPackets - mPacketQueue.size(),
  192. U32( mPacketQueue.front().mStartTick ),
  193. U32( mPacketQueue.front().mEndTick ),
  194. U32( currentTick ),
  195. mPacketQueue.size() );
  196. #endif
  197. mPacketQueue.popFront();
  198. }
  199. // Need more packets if the queue isn't full anymore.
  200. return ( mPacketQueue.capacity() != 0 );
  201. }
  202. template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
  203. bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos )
  204. {
  205. AssertFatal( mPacketQueue.capacity() != 0,
  206. "AsyncPacketQueue::submitPacket() - Queue is full!" );
  207. TickType packetStartPos;
  208. TickType packetEndPos;
  209. if( packetPos != TypeTraits< TickType >::MAX )
  210. {
  211. packetStartPos = packetPos;
  212. packetEndPos = packetPos + packetTicks;
  213. }
  214. else
  215. {
  216. packetStartPos = mTotalQueuedTicks;
  217. packetEndPos = mTotalQueuedTicks + packetTicks;
  218. }
  219. // Check whether the packet is outdated, if enabled.
  220. bool dropPacket = false;
  221. if( mDropPackets )
  222. {
  223. TickType currentTick = getCurrentTick();
  224. if( currentTick >= packetEndPos )
  225. dropPacket = true;
  226. }
  227. #ifdef DEBUG_SPEW
  228. Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s",
  229. mTotalQueuedPackets,
  230. U32( mTotalQueuedTicks ),
  231. U32( packetEndPos ),
  232. U32( packetTicks ),
  233. U32( getCurrentTick() ),
  234. mPacketQueue.size(),
  235. dropPacket ? " !! DROPPED !!" : "" );
  236. #endif
  237. // Queue the packet.
  238. if( !dropPacket )
  239. {
  240. mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) );
  241. Deref( mConsumer ).write( &packet, 1 );
  242. }
  243. mTotalQueuedTicks = packetEndPos;
  244. if( isLast && !mTotalTicks )
  245. mTotalTicks = mTotalQueuedTicks;
  246. mTotalQueuedPackets ++;
  247. return !dropPacket;
  248. }
  249. #undef DEBUG_SPEW
  250. #endif // _ASYNCPACKETQUEUE_H_