| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394 |
- // Copyright (c) 2006-2018 Maxim Khizhinsky
- //
- // Distributed under the Boost Software License, Version 1.0. (See accompanying
- // file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)
- #ifndef CDSLIB_CONTAINER_RWQUEUE_H
- #define CDSLIB_CONTAINER_RWQUEUE_H
- #include <cds/sync/spinlock.h>
- #include <cds/opt/options.h>
- #include <cds/details/allocator.h>
- #include <mutex> // unique_lock
- #include <memory>
- namespace cds { namespace container {
- /// RWQueue related definitions
- /** @ingroup cds_nonintrusive_helper
- */
- namespace rwqueue {
- /// RWQueue default type traits
- struct traits
- {
- /// Lock policy
- typedef cds::sync::spin lock_type;
- /// Node allocator
- typedef CDS_DEFAULT_ALLOCATOR allocator;
- /// Item counting feature; by default, disabled. Use \p cds::atomicity::item_counter to enable item counting
- typedef cds::atomicity::empty_item_counter item_counter;
- /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
- enum { padding = opt::cache_line_padding };
- };
- /// Metafunction converting option list to \p rwqueue::traits
- /**
- Supported \p Options are:
- - opt::lock_type - lock policy, default is \p cds::sync::spin. Any type satisfied \p Mutex C++ concept may be used.
- - opt::allocator - allocator (like \p std::allocator) used for allocating queue nodes. Default is \ref CDS_DEFAULT_ALLOCATOR
- - opt::item_counter - the type of item counting feature. Default is \p cds::atomicity::empty_item_counter (item counting disabled)
- To enable item counting use \p cds::atomicity::item_counter.
- - \p opt::padding - padding for internal critical data. Default is \p opt::cache_line_padding
- Example: declare mutex-based \p %RWQueue with item counting
- \code
- typedef cds::container::RWQueue< Foo,
- typename cds::container::rwqueue::make_traits<
- cds::opt::item_counter< cds::atomicity::item_counter >,
- cds::opt::lock_type< std::mutex >
- >::type
- > myQueue;
- \endcode
- */
- template <typename... Options>
- struct make_traits {
- # ifdef CDS_DOXYGEN_INVOKED
- typedef implementation_defined type; ///< Metafunction result
- # else
- typedef typename cds::opt::make_options<
- typename cds::opt::find_type_traits< traits, Options... >::type
- , Options...
- >::type type;
- # endif
- };
- } // namespace rwqueue
- /// Michael & Scott blocking queue with fine-grained synchronization schema
- /** @ingroup cds_nonintrusive_queue
- The queue has two different locks: one for reading and one for writing.
- Therefore, one writer and one reader can simultaneously access to the queue.
- The queue does not require any garbage collector.
- <b>Source</b>
- - [1998] Maged Michael, Michael Scott "Simple, fast, and practical non-blocking
- and blocking concurrent queue algorithms"
- <b>Template arguments</b>
- - \p T - value type to be stored in the queue
- - \p Traits - queue traits, default is \p rwqueue::traits. You can use \p rwqueue::make_traits
- metafunction to make your traits or just derive your traits from \p %rwqueue::traits:
- \code
- struct myTraits: public cds::container::rwqueue::traits {
- typedef cds::atomicity::item_counter item_counter;
- };
- typedef cds::container::RWQueue< Foo, myTraits > myQueue;
- // Equivalent make_traits example:
- typedef cds::container::RWQueue< Foo,
- typename cds::container::rwqueue::make_traits<
- cds::opt::item_counter< cds::atomicity::item_counter >
- >::type
- > myQueue;
- \endcode
- */
- template <typename T, typename Traits = rwqueue::traits >
- class RWQueue
- {
- public:
- /// Rebind template arguments
- template <typename T2, typename Traits2>
- struct rebind {
- typedef RWQueue< T2, Traits2 > other ; ///< Rebinding result
- };
- public:
- typedef T value_type; ///< Type of value to be stored in the queue
- typedef Traits traits; ///< Queue traits
- typedef typename traits::lock_type lock_type; ///< Locking primitive
- typedef typename traits::item_counter item_counter; ///< Item counting policy used
- protected:
- //@cond
- /// Node type
- struct node_type
- {
- atomics::atomic< node_type *> m_pNext; ///< Pointer to the next node in the queue
- value_type m_value; ///< Value stored in the node
- node_type( value_type const& v )
- : m_pNext( nullptr )
- , m_value(v)
- {}
- node_type()
- : m_pNext( nullptr )
- {}
- template <typename... Args>
- node_type( Args&&... args )
- : m_pNext( nullptr )
- , m_value( std::forward<Args>(args)...)
- {}
- };
- //@endcond
- public:
- /// Allocator type used for allocate/deallocate the queue nodes
- typedef typename std::allocator_traits<
- typename traits::allocator
- >::template rebind_alloc<node_type> allocator_type;
- protected:
- //@cond
- typedef std::unique_lock<lock_type> scoped_lock;
- typedef cds::details::Allocator< node_type, allocator_type > node_allocator;
- struct head_type {
- mutable lock_type lock;
- node_type * ptr;
- };
- head_type m_Head;
- typename opt::details::apply_padding< head_type, traits::padding >::padding_type pad_;
- head_type m_Tail;
- item_counter m_ItemCounter;
- //@endcond
- protected:
- //@cond
- static node_type * alloc_node()
- {
- return node_allocator().New();
- }
- static node_type * alloc_node( T const& data )
- {
- return node_allocator().New( data );
- }
- template <typename... Args>
- static node_type * alloc_node_move( Args&&... args )
- {
- return node_allocator().MoveNew( std::forward<Args>( args )... );
- }
- static void free_node( node_type * pNode )
- {
- node_allocator().Delete( pNode );
- }
- bool enqueue_node( node_type * p )
- {
- assert( p != nullptr );
- {
- scoped_lock lock( m_Tail.lock );
- m_Tail.ptr->m_pNext.store( p, atomics::memory_order_release );
- m_Tail.ptr = p;
- }
- ++m_ItemCounter;
- return true;
- }
- struct node_disposer {
- void operator()( node_type * pNode )
- {
- free_node( pNode );
- }
- };
- typedef std::unique_ptr< node_type, node_disposer > scoped_node_ptr;
- //@endcond
- public:
- /// Makes empty queue
- RWQueue()
- {
- node_type * pNode = alloc_node();
- m_Head.ptr =
- m_Tail.ptr = pNode;
- }
- /// Destructor clears queue
- ~RWQueue()
- {
- clear();
- assert( m_Head.ptr == m_Tail.ptr );
- free_node( m_Head.ptr );
- }
- /// Enqueues \p data. Always return \a true
- bool enqueue( value_type const& data )
- {
- scoped_node_ptr p( alloc_node( data ));
- if ( enqueue_node( p.get())) {
- p.release();
- return true;
- }
- return false;
- }
- /// Enqueues \p data, move semantics
- bool enqueue( value_type&& data )
- {
- scoped_node_ptr p( alloc_node_move( std::move( data )));
- if ( enqueue_node( p.get())) {
- p.release();
- return true;
- }
- return false;
- }
- /// Enqueues \p data to the queue using a functor
- /**
- \p Func is a functor called to create node.
- The functor \p f takes one argument - a reference to a new node of type \ref value_type :
- \code
- cds::container::RWQueue< cds::gc::HP, Foo > myQueue;
- Bar bar;
- myQueue.enqueue_with( [&bar]( Foo& dest ) { dest = bar; } );
- \endcode
- */
- template <typename Func>
- bool enqueue_with( Func f )
- {
- scoped_node_ptr p( alloc_node());
- f( p->m_value );
- if ( enqueue_node( p.get())) {
- p.release();
- return true;
- }
- return false;
- }
- /// Enqueues data of type \ref value_type constructed with <tt>std::forward<Args>(args)...</tt>
- template <typename... Args>
- bool emplace( Args&&... args )
- {
- scoped_node_ptr p( alloc_node_move( std::forward<Args>(args)... ));
- if ( enqueue_node( p.get())) {
- p.release();
- return true;
- }
- return false;
- }
- /// Synonym for \p enqueue( value_type const& ) function
- bool push( value_type const& val )
- {
- return enqueue( val );
- }
- /// Synonym for \p enqueue( value_type&& ) function
- bool push( value_type&& val )
- {
- return enqueue( std::move( val ));
- }
- /// Synonym for \p enqueue_with() function
- template <typename Func>
- bool push_with( Func f )
- {
- return enqueue_with( f );
- }
- /// Dequeues a value to \p dest.
- /**
- If queue is empty returns \a false, \p dest can be corrupted.
- If queue is not empty returns \a true, \p dest contains the value dequeued
- */
- bool dequeue( value_type& dest )
- {
- return dequeue_with( [&dest]( value_type& src ) { dest = std::move( src ); });
- }
- /// Dequeues a value using a functor
- /**
- \p Func is a functor called to copy dequeued value.
- The functor takes one argument - a reference to removed node:
- \code
- cds:container::RWQueue< cds::gc::HP, Foo > myQueue;
- Bar bar;
- myQueue.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
- \endcode
- The functor is called only if the queue is not empty.
- */
- template <typename Func>
- bool dequeue_with( Func f )
- {
- node_type * pNode;
- {
- scoped_lock lock( m_Head.lock );
- pNode = m_Head.ptr;
- node_type * pNewHead = pNode->m_pNext.load( atomics::memory_order_acquire );
- if ( pNewHead == nullptr )
- return false;
- f( pNewHead->m_value );
- m_Head.ptr = pNewHead;
- } // unlock here
- --m_ItemCounter;
- free_node( pNode );
- return true;
- }
- /// Synonym for \p dequeue() function
- bool pop( value_type& dest )
- {
- return dequeue( dest );
- }
- /// Synonym for \p dequeue_with() function
- template <typename Func>
- bool pop_with( Func f )
- {
- return dequeue_with( f );
- }
- /// Checks if queue is empty
- bool empty() const
- {
- scoped_lock lock( m_Head.lock );
- return m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) == nullptr;
- }
- /// Clears queue
- void clear()
- {
- scoped_lock lockR( m_Head.lock );
- scoped_lock lockW( m_Tail.lock );
- while ( m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed ) != nullptr ) {
- node_type * pHead = m_Head.ptr;
- m_Head.ptr = m_Head.ptr->m_pNext.load( atomics::memory_order_relaxed );
- free_node( pHead );
- }
- m_ItemCounter.reset();
- }
- /// Returns queue's item count
- /**
- The value returned depends on \p rwqueue::traits::item_counter. For \p atomicity::empty_item_counter,
- this function always returns 0.
- @note Even if you use real item counter and it returns 0, this fact is not mean that the queue
- is empty. To check queue emptyness use \p empty() method.
- */
- size_t size() const
- {
- return m_ItemCounter.value();
- }
- //@cond
- /// The class has no internal statistics. For test consistency only
- std::nullptr_t statistics() const
- {
- return nullptr;
- }
- //@endcond
- };
- }} // namespace cds::container
- #endif // #ifndef CDSLIB_CONTAINER_RWQUEUE_H
|