123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313 |
- //-----------------------------------------------------------------------------
- // Copyright (c) 2012 GarageGames, LLC
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to
- // deal in the Software without restriction, including without limitation the
- // rights to use, copy, modify, merge, publish, distribute, sublicense, and/or
- // sell copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
- // FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS
- // IN THE SOFTWARE.
- //-----------------------------------------------------------------------------
- #ifndef _ASYNCPACKETQUEUE_H_
- #define _ASYNCPACKETQUEUE_H_
- #ifndef _TFIXEDSIZEQUEUE_H_
- #include "core/util/tFixedSizeDeque.h"
- #endif
- #ifndef _TSTREAM_H_
- #include "core/stream/tStream.h"
- #endif
- #ifndef _TYPETRAITS_H_
- #include "platform/typetraits.h"
- #endif
- //#define DEBUG_SPEW
- /// @file
- /// Time-based packet streaming.
- ///
- /// The classes contained in this file can be used for any kind
- /// of continuous playback that depends on discrete samplings of
- /// a source stream (i.e. any kind of digital media streaming).
- //--------------------------------------------------------------------------
- // Async packet queue.
- //--------------------------------------------------------------------------
- /// Time-based packet stream queue.
- ///
- /// AsyncPacketQueue writes data packets to a consumer stream in sync to
- /// a tick time source. Outdated packets may optionally be dropped automatically
- /// by the queue. A fixed maximum number of packets can reside in the queue
- /// concurrently at any one time.
- ///
- /// Be aware that using single item queues for synchronizing to a timer
- /// will usually result in bad timing behavior when packet uploading takes
- /// any non-trivial amount of time.
- ///
- /// @note While the queue associates a variable tick count with each
- /// individual packet, the queue fill status is measured in number of
- /// packets rather than in total tick time.
- ///
- /// @param Packet Value type of packets passed through this queue.
- /// @param TimeSource Value type for time tick source to which the queue
- /// is synchronized.
- /// @param Consumer Value type of stream to which the packets are written.
- ///
- template< typename Packet, typename TimeSource = IPositionable< U32 >*, typename Consumer = IOutputStream< Packet >*, typename Tick = U32 >
- class AsyncPacketQueue
- {
- public:
- typedef void Parent;
- /// The type of data packets being streamed through this queue.
- typedef typename TypeTraits< Packet >::BaseType PacketType;
- /// The type of consumer that receives the packets from this queue.
- typedef typename TypeTraits< Consumer >::BaseType ConsumerType;
- ///
- typedef typename TypeTraits< TimeSource >::BaseType TimeSourceType;
-
- /// Type for counting ticks.
- typedef Tick TickType;
- protected:
- /// Information about the time slice covered by an
- /// individual packet currently on the queue.
- struct QueuedPacket
- {
- /// First tick contained in this packet.
- TickType mStartTick;
- /// First tick *not* contained in this packet anymore.
- TickType mEndTick;
- QueuedPacket( TickType start, TickType end )
- : mStartTick( start ), mEndTick( end ) {}
- /// Return the total number of ticks in this packet.
- TickType getNumTicks() const
- {
- return ( mEndTick - mStartTick );
- }
- };
- typedef FixedSizeDeque< QueuedPacket > PacketQueue;
- /// If true, packets that have missed their proper queuing timeframe
- /// will be dropped. If false, they will be queued nonetheless.
- bool mDropPackets;
- /// Total number of ticks spanned by the total queue playback time.
- /// If this is zero, the total queue time is considered to be infinite.
- TickType mTotalTicks;
- /// Total number of ticks submitted to the queue so far.
- TickType mTotalQueuedTicks;
- /// Queue that holds records for each packet currently in the queue. New packets
- /// are added to back.
- PacketQueue mPacketQueue;
- /// The time source to which we are sync'ing.
- TimeSource mTimeSource;
- /// The output stream that this queue feeds into.
- Consumer mConsumer;
- /// Total number of packets queued so far.
- U32 mTotalQueuedPackets;
-
- public:
- /// Construct an AsyncPacketQueue of the given length.
- ///
- /// @param maxQueuedPackets The length of the queue in packets. Only a maximum of
- /// 'maxQueuedPackets' packets can be concurrently in the queue at any one time.
- /// @param timeSource The tick time source to which the queue synchronizes.
- /// @param consumer The output stream that receives the packets in sync to timeSource.
- /// @param totalTicks The total number of ticks that will be played back through the
- /// queue; if 0, the length is considered indefinite.
- /// @param dropPackets Whether the queue should drop outdated packets; if dropped, a
- /// packet will not reach the consumer.
- AsyncPacketQueue( U32 maxQueuedPackets,
- TimeSource timeSource,
- Consumer consumer,
- TickType totalTicks = 0,
- bool dropPackets = false )
- : mDropPackets( dropPackets ),
- mTotalTicks( totalTicks ),
- mTotalQueuedTicks( 0 ),
- mPacketQueue( maxQueuedPackets ),
- mTimeSource( timeSource ),
- mConsumer( consumer )
- {
- mTotalQueuedPackets = 0;
- }
- /// Return true if there are currently
- bool isEmpty() const { return mPacketQueue.isEmpty(); }
- /// Return true if all packets have been streamed.
- bool isAtEnd() const;
- /// Return true if the queue needs one or more new packets to be submitted.
- bool needPacket();
- /// Submit a data packet to the queue.
- ///
- /// @param packet The data packet.
- /// @param packetTicks The duration of the packet in ticks.
- /// @param isLast If true, the packet is the last one in the stream.
- /// @param packetPos The absolute position of the packet in the stream; if this is not supplied
- /// the packet is assumed to immediately follow the preceding packet.
- ///
- /// @return true if the packet has been queued or false if it has been dropped.
- bool submitPacket( Packet packet,
- TickType packetTicks,
- bool isLast = false,
- TickType packetPos = TypeTraits< TickType >::MAX );
- /// Return the current playback position according to the time source.
- TickType getCurrentTick() const { return Deref( mTimeSource ).getPosition(); }
- /// Return the total number of ticks that have been queued so far.
- TickType getTotalQueuedTicks() const { return mTotalQueuedTicks; }
-
- /// Return the total number of packets that have been queued so far.
- U32 getTotalQueuedPackets() const { return mTotalQueuedPackets; }
- };
- template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
- inline bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::isAtEnd() const
- {
- // Never at end if infinite.
- if( !mTotalTicks )
- return false;
- // Otherwise, we're at end if we're past the total tick count.
- return ( getCurrentTick() >= mTotalTicks
- && ( mDropPackets || mTotalQueuedTicks >= mTotalTicks ) );
- }
- template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
- bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::needPacket()
- {
- // Never need more packets once we have reached the
- // end.
- if( isAtEnd() )
- return false;
- // Always needs packets while the queue is not
- // filled up completely.
- if( mPacketQueue.capacity() != 0 )
- return true;
- // Unqueue packets that have expired their playtime.
- TickType currentTick = getCurrentTick();
- while( mPacketQueue.size() && currentTick >= mPacketQueue.front().mEndTick )
- {
- #ifdef DEBUG_SPEW
- Platform::outputDebugString( "[AsyncPacketQueue] expired packet #%i: %i-%i (tick: %i; queue: %i)",
- mTotalQueuedPackets - mPacketQueue.size(),
- U32( mPacketQueue.front().mStartTick ),
- U32( mPacketQueue.front().mEndTick ),
- U32( currentTick ),
- mPacketQueue.size() );
- #endif
-
- mPacketQueue.popFront();
- }
- // Need more packets if the queue isn't full anymore.
- return ( mPacketQueue.capacity() != 0 );
- }
- template< typename Packet, typename TimeSource, typename Consumer, typename Tick >
- bool AsyncPacketQueue< Packet, TimeSource, Consumer, Tick >::submitPacket( Packet packet, TickType packetTicks, bool isLast, TickType packetPos )
- {
- AssertFatal( mPacketQueue.capacity() != 0,
- "AsyncPacketQueue::submitPacket() - Queue is full!" );
- TickType packetStartPos;
- TickType packetEndPos;
-
- if( packetPos != TypeTraits< TickType >::MAX )
- {
- packetStartPos = packetPos;
- packetEndPos = packetPos + packetTicks;
- }
- else
- {
- packetStartPos = mTotalQueuedTicks;
- packetEndPos = mTotalQueuedTicks + packetTicks;
- }
- // Check whether the packet is outdated, if enabled.
- bool dropPacket = false;
- if( mDropPackets )
- {
- TickType currentTick = getCurrentTick();
- if( currentTick >= packetEndPos )
- dropPacket = true;
- }
- #ifdef DEBUG_SPEW
- Platform::outputDebugString( "[AsyncPacketQueue] new packet #%i: %i-%i (ticks: %i, current: %i, queue: %i)%s",
- mTotalQueuedPackets,
- U32( mTotalQueuedTicks ),
- U32( packetEndPos ),
- U32( packetTicks ),
- U32( getCurrentTick() ),
- mPacketQueue.size(),
- dropPacket ? " !! DROPPED !!" : "" );
- #endif
- // Queue the packet.
- if( !dropPacket )
- {
- mPacketQueue.pushBack( QueuedPacket( packetStartPos, packetEndPos ) );
- Deref( mConsumer ).write( &packet, 1 );
- }
- mTotalQueuedTicks = packetEndPos;
- if( isLast && !mTotalTicks )
- mTotalTicks = mTotalQueuedTicks;
-
- mTotalQueuedPackets ++;
-
- return !dropPacket;
- }
- #undef DEBUG_SPEW
- #endif // _ASYNCPACKETQUEUE_H_
|