weak_ringbuffer.h

// Copyright (c) 2006-2018 Maxim Khizhinsky
//
// Distributed under the Boost Software License, Version 1.0. (See accompanying
// file LICENSE or copy at http://www.boost.org/LICENSE_1_0.txt)

#ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H
#define CDSLIB_CONTAINER_WEAK_RINGBUFFER_H

#include <cds/container/details/base.h>
#include <cds/opt/buffer.h>
#include <cds/opt/value_cleaner.h>
#include <cds/algo/atomic.h>
#include <cds/details/bounded_container.h>

namespace cds { namespace container {
    /// \p WeakRingBuffer related definitions
    /** @ingroup cds_nonintrusive_helper
    */
    namespace weak_ringbuffer {

        /// \p WeakRingBuffer default traits
        struct traits {
            /// Buffer type for internal array
            /*
                The type of element for the buffer is not important: \p WeakRingBuffer rebinds
                the buffer to the required type via the \p rebind metafunction.

                For \p WeakRingBuffer the buffer capacity should be a power of two.

                You should use only uninitialized buffers for the ring buffer -
                \p cds::opt::v::uninitialized_dynamic_buffer (the default),
                \p cds::opt::v::uninitialized_static_buffer.
            */
            typedef cds::opt::v::uninitialized_dynamic_buffer< void * > buffer;

            /// A functor to clean dequeued items
            /**
                The functor calls the destructor for a popped element.
                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
                If \p T is a complex type, \p value_cleaner may be a useful feature.
                For POD types \ref opt::v::empty_cleaner is suitable.

                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
            */
            typedef cds::opt::v::auto_cleaner value_cleaner;

            /// C++ memory ordering model
            /**
                Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
                or \p opt::v::sequential_consistent (sequentially consistent memory model).
            */
            typedef opt::v::relaxed_ordering memory_model;

            /// Padding for internal critical atomic data. Default is \p opt::cache_line_padding
            enum { padding = opt::cache_line_padding };
        };
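
        /*
            Example (editor's sketch, not part of the original documentation): custom
            traits are usually defined by inheriting from the defaults and overriding
            selected members. The names \p Foo and \p my_traits below are illustrative.
        \code
        struct my_traits: public cds::container::weak_ringbuffer::traits
        {
            // fixed-capacity buffer of 4096 cells; the capacity passed to the
            // constructor is ignored for a static buffer
            typedef cds::opt::v::uninitialized_static_buffer< void *, 4096 > buffer;
            // the payload is trivially destructible, so skip per-cell cleanup
            typedef cds::opt::v::empty_cleaner value_cleaner;
        };
        typedef cds::container::WeakRingBuffer< Foo, my_traits > ring_type;
        \endcode
        */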
        /// Metafunction converting an option list to \p weak_ringbuffer::traits
        /**
            Supported \p Options are:
            - \p opt::buffer - an uninitialized buffer type for the internal cyclic array. Possible types are:
                \p opt::v::uninitialized_dynamic_buffer (the default), \p opt::v::uninitialized_static_buffer. The type of
                element in the buffer is not important: it will be changed via the \p rebind metafunction.
            - \p opt::value_cleaner - a functor to clean dequeued items.
                The functor calls the destructor for a ring-buffer item.
                After a set of items is dequeued, \p value_cleaner cleans the cells that the items occupied.
                If \p T is a complex type, \p value_cleaner can be a useful feature.
                Default value is \ref opt::v::auto_cleaner that calls the destructor only if it is not trivial.
            - \p opt::padding - padding for internal critical atomic data. Default is \p opt::cache_line_padding
            - \p opt::memory_model - C++ memory ordering model. Can be \p opt::v::relaxed_ordering (relaxed memory model, the default)
                or \p opt::v::sequential_consistent (sequentially consistent memory model).

            Example: declare \p %WeakRingBuffer with a static internal buffer for 1024 objects:
            \code
            typedef cds::container::WeakRingBuffer< Foo,
                typename cds::container::weak_ringbuffer::make_traits<
                    cds::opt::buffer< cds::opt::v::uninitialized_static_buffer< void *, 1024 >>
                >::type
            > myRing;
            \endcode
        */
        template <typename... Options>
        struct make_traits {
#   ifdef CDS_DOXYGEN_INVOKED
            typedef implementation_defined type;   ///< Metafunction result
#   else
            typedef typename cds::opt::make_options<
                typename cds::opt::find_type_traits< traits, Options... >::type
                , Options...
            >::type type;
#   endif
        };
    } // namespace weak_ringbuffer

    /// Single-producer single-consumer ring buffer
    /** @ingroup cds_nonintrusive_queue
        Source: [2013] Nhat Minh Le, Adrien Guatto, Albert Cohen, Antoniu Pop. Correct and Efficient Bounded
        FIFO Queues. [Research Report] RR-8365, INRIA. 2013. <hal-00862450>

        A ring buffer is a bounded queue. Additionally, \p %WeakRingBuffer supports batch operations -
        you can push/pop an array of elements.

        There is a specialization \ref cds_nonintrusive_WeakRingBuffer_void "WeakRingBuffer<void, Traits>"
        that is not a queue but a "memory pool" between producer and consumer threads.
        \p WeakRingBuffer<void> supports variable-sized data.

        @warning \p %WeakRingBuffer is developed for 64-bit architectures.
            32-bit platforms must provide support for 64-bit atomics.
    */
    template <typename T, typename Traits = weak_ringbuffer::traits>
    class WeakRingBuffer: public cds::bounded_container
    {
    public:
        typedef T value_type;   ///< Value type to be stored in the ring buffer
        typedef Traits traits;  ///< Ring buffer traits
        typedef typename traits::memory_model memory_model;    ///< Memory ordering. See \p cds::opt::memory_model option
        typedef typename traits::value_cleaner value_cleaner;  ///< Value cleaner, see \p weak_ringbuffer::traits::value_cleaner

        /// Rebind template arguments
        template <typename T2, typename Traits2>
        struct rebind {
            typedef WeakRingBuffer< T2, Traits2 > other;   ///< Rebinding result
        };

        //@cond
        // Only for tests
        typedef size_t item_counter;
        //@endcond

    private:
        //@cond
        typedef typename traits::buffer::template rebind< value_type >::other buffer;
        typedef uint64_t counter_type;
        //@endcond

    public:
        /// Creates the ring buffer of \p capacity items
        /**
            For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.

            If the buffer capacity is a power of two, lightweight binary arithmetic is used
            instead of modulo arithmetic.
        */
        WeakRingBuffer( size_t capacity = 0 )
            : front_( 0 )
            , pfront_( 0 )
            , cback_( 0 )
            , buffer_( capacity )
        {
            back_.store( 0, memory_model::memory_order_release );
        }

        /// Destroys the ring buffer
        ~WeakRingBuffer()
        {
            value_cleaner cleaner;
            counter_type back = back_.load( memory_model::memory_order_relaxed );
            for ( counter_type front = front_.load( memory_model::memory_order_relaxed ); front != back; ++front )
                cleaner( buffer_[ buffer_.mod( front ) ] );
        }
        /// Batch push - push array \p arr of size \p count
        /**
            \p CopyFunc is a per-element copy functor: for each element of \p arr
            <tt>copy( dest, arr[i] )</tt> is called.
            The \p CopyFunc signature:
            \code
                void copy_func( value_type& element, Q const& source );
            \endcode
            Here \p element is uninitialized so you should construct it using placement new
            if needed; for example, if the element type is \p std::string and \p Q is <tt>char const*</tt>,
            the \p copy functor can be:
            \code
            cds::container::WeakRingBuffer<std::string> ringbuf;
            char const* arr[10];
            ringbuf.push( arr, 10,
                []( std::string& element, char const* src ) {
                    new( &element ) std::string( src );
                });
            \endcode

            You may use move semantics if appropriate:
            \code
            cds::container::WeakRingBuffer<std::string> ringbuf;
            std::string arr[10];
            ringbuf.push( arr, 10,
                []( std::string& element, std::string& src ) {
                    new( &element ) std::string( std::move( src ));
                });
            \endcode

            Returns \p true on success or \p false if there is not enough space in the ring buffer
        */
        template <typename Q, typename CopyFunc>
        bool push( Q* arr, size_t count, CopyFunc copy )
        {
            assert( count < capacity());
            counter_type back = back_.load( memory_model::memory_order_relaxed );

            assert( static_cast<size_t>( back - pfront_ ) <= capacity());

            if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
                pfront_ = front_.load( memory_model::memory_order_acquire );

                if ( static_cast<size_t>( pfront_ + capacity() - back ) < count ) {
                    // not enough space
                    return false;
                }
            }

            // copy data
            for ( size_t i = 0; i < count; ++i, ++back )
                copy( buffer_[buffer_.mod( back )], arr[i] );

            back_.store( back, memory_model::memory_order_release );

            return true;
        }
        /// Batch push - push array \p arr of size \p count with copy construction as the copy functor
        /**
            This function is equivalent to:
            \code
            push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
            \endcode
            The function is available only if <tt>std::is_constructible<value_type, Q>::value</tt>
            is \p true.

            Returns \p true on success or \p false if there is not enough space in the ring buffer
        */
        template <typename Q>
        typename std::enable_if< std::is_constructible<value_type, Q>::value, bool>::type
        push( Q* arr, size_t count )
        {
            return push( arr, count, []( value_type& dest, Q const& src ) { new( &dest ) value_type( src ); } );
        }
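
        /*
            Usage sketch (editor's addition): pushing a whole array of trivially
            copyable items in one call; the single producer retries on \p false.
        \code
        cds::container::WeakRingBuffer< int > ring( 1024 );
        int data[16] = { 0 };
        while ( !ring.push( data, 16 ))
            ;   // the ring is full - wait for the consumer to free space
        \endcode
        */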
        /// Push one element created from \p args
        /**
            The function is available only if <tt>std::is_constructible<value_type, Args...>::value</tt>
            is \p true.

            Returns \p false if the ring is full or \p true otherwise.
        */
        template <typename... Args>
        typename std::enable_if< std::is_constructible<value_type, Args...>::value, bool>::type
        emplace( Args&&... args )
        {
            counter_type back = back_.load( memory_model::memory_order_relaxed );

            assert( static_cast<size_t>( back - pfront_ ) <= capacity());

            if ( pfront_ + capacity() - back < 1 ) {
                pfront_ = front_.load( memory_model::memory_order_acquire );

                if ( pfront_ + capacity() - back < 1 ) {
                    // not enough space
                    return false;
                }
            }

            new( &buffer_[buffer_.mod( back )] ) value_type( std::forward<Args>(args)... );

            back_.store( back + 1, memory_model::memory_order_release );

            return true;
        }
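
        /*
            Usage sketch (editor's addition; assumes an illustrative type \p Foo
            constructible from <tt>(int, char const*)</tt>):
        \code
        cds::container::WeakRingBuffer< Foo > ring( 1024 );
        if ( !ring.emplace( 42, "answer" ))
            std::cout << "The ring is full" << std::endl;
        \endcode
        */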
        /// Enqueues data to the ring using a functor
        /**
            \p Func is a functor called to copy a value to a ring element.
            The functor \p f takes one argument - a reference to an empty cell of type \ref value_type :
            \code
            cds::container::WeakRingBuffer< Foo > myRing;
            Bar bar;
            myRing.enqueue_with( [&bar]( Foo& dest ) { dest = std::move(bar); } );
            \endcode
        */
        template <typename Func>
        bool enqueue_with( Func f )
        {
            counter_type back = back_.load( memory_model::memory_order_relaxed );

            assert( static_cast<size_t>( back - pfront_ ) <= capacity());

            if ( pfront_ + capacity() - back < 1 ) {
                pfront_ = front_.load( memory_model::memory_order_acquire );

                if ( pfront_ + capacity() - back < 1 ) {
                    // not enough space
                    return false;
                }
            }

            f( buffer_[buffer_.mod( back )] );

            back_.store( back + 1, memory_model::memory_order_release );

            return true;
        }
        /// Enqueues \p val value into the queue
        /**
            The new queue item is created by calling placement new in a free cell.

            Returns \p true on success, \p false if the ring is full.
        */
        bool enqueue( value_type const& val )
        {
            return emplace( val );
        }

        /// Enqueues \p val value into the queue, move semantics
        bool enqueue( value_type&& val )
        {
            return emplace( std::move( val ));
        }

        /// Synonym for \p enqueue( value_type const& )
        bool push( value_type const& val )
        {
            return enqueue( val );
        }

        /// Synonym for \p enqueue( value_type&& )
        bool push( value_type&& val )
        {
            return enqueue( std::move( val ));
        }

        /// Synonym for \p enqueue_with()
        template <typename Func>
        bool push_with( Func f )
        {
            return enqueue_with( f );
        }
        /// Batch pop - pop \p count elements from the ring buffer into \p arr
        /**
            \p CopyFunc is a per-element copy functor: for each element of \p arr
            <tt>copy( arr[i], source )</tt> is called.
            The \p CopyFunc signature:
            \code
            void copy_func( Q& dest, value_type& element );
            \endcode

            Returns \p true on success or \p false if the ring buffer contains fewer than \p count elements
        */
        template <typename Q, typename CopyFunc>
        bool pop( Q* arr, size_t count, CopyFunc copy )
        {
            assert( count < capacity());
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) < capacity());

            if ( static_cast<size_t>( cback_ - front ) < count ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( static_cast<size_t>( cback_ - front ) < count )
                    return false;
            }

            // copy data
            value_cleaner cleaner;
            for ( size_t i = 0; i < count; ++i, ++front ) {
                value_type& val = buffer_[buffer_.mod( front )];
                copy( arr[i], val );
                cleaner( val );
            }

            front_.store( front, memory_model::memory_order_release );

            return true;
        }
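
        /*
            Example (editor's sketch): draining elements with a move-out functor,
            mirroring the batch-push example above.
        \code
        cds::container::WeakRingBuffer<std::string> ringbuf;
        std::string arr[10];
        if ( ringbuf.pop( arr, 10,
            []( std::string& dest, std::string& src ) {
                dest = std::move( src );
            }))
        {
            // arr[0..9] now hold the ten oldest elements
        }
        \endcode
        */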
        /// Batch pop - pop \p count elements into array \p arr with assignment as the copy functor
        /**
            This function is equivalent to:
            \code
            pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
            \endcode
            The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
            is \p true.

            Returns \p true on success or \p false if the ring buffer contains fewer than \p count elements
        */
        template <typename Q>
        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
        pop( Q* arr, size_t count )
        {
            return pop( arr, count, []( Q& dest, value_type& src ) { dest = src; } );
        }
        /// Dequeues an element from the ring into \p val
        /**
            The function is available only if <tt>std::is_assignable<Q&, value_type const&>::value</tt>
            is \p true.

            Returns \p false if the ring is empty or \p true otherwise.
        */
        template <typename Q>
        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
        dequeue( Q& val )
        {
            return pop( &val, 1 );
        }

        /// Synonym for \p dequeue( Q& )
        template <typename Q>
        typename std::enable_if< std::is_assignable<Q&, value_type const&>::value, bool>::type
        pop( Q& val )
        {
            return dequeue( val );
        }
        /// Dequeues a value using a functor
        /**
            \p Func is a functor called to copy the dequeued value.
            The functor takes one argument - a reference to the removed node:
            \code
            cds::container::WeakRingBuffer< Foo > myRing;
            Bar bar;
            myRing.dequeue_with( [&bar]( Foo& src ) { bar = std::move( src );});
            \endcode

            Returns \p true if the ring is not empty, \p false otherwise.
            The functor is called only if the ring is not empty.
        */
        template <typename Func>
        bool dequeue_with( Func f )
        {
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) < capacity());

            if ( cback_ - front < 1 ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( cback_ - front < 1 )
                    return false;
            }

            value_type& val = buffer_[buffer_.mod( front )];
            f( val );
            value_cleaner()( val );

            front_.store( front + 1, memory_model::memory_order_release );

            return true;
        }

        /// Synonym for \p dequeue_with()
        template <typename Func>
        bool pop_with( Func f )
        {
            return dequeue_with( f );
        }
        /// Gets a pointer to the first element of the ring buffer
        /**
            If the ring buffer is empty, returns \p nullptr.

            The function is thread-safe since there is only one consumer.
            Recall that \p WeakRingBuffer is a single-producer/single-consumer container.
        */
        value_type* front()
        {
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) < capacity());

            if ( cback_ - front < 1 ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( cback_ - front < 1 )
                    return nullptr;
            }

            return &buffer_[buffer_.mod( front )];
        }

        /// Removes the front element of the ring buffer
        /**
            If the ring buffer is empty, returns \p false.
            Otherwise, pops the first element from the ring.
        */
        bool pop_front()
        {
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) <= capacity());

            if ( cback_ - front < 1 ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( cback_ - front < 1 )
                    return false;
            }

            // clean cell
            value_cleaner()( buffer_[buffer_.mod( front )] );

            front_.store( front + 1, memory_model::memory_order_release );

            return true;
        }
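
        /*
            Consumer-side sketch (editor's addition): process the head element in
            place via \p front(), then release its cell with \p pop_front().
            \p use() is an illustrative consumer function.
        \code
        cds::container::WeakRingBuffer< Foo > myRing( 1024 );
        if ( Foo* p = myRing.front()) {
            use( *p );           // consume in place, no copy
            myRing.pop_front();  // destroy the element and free the cell
        }
        \endcode
        */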
        /// Clears the ring buffer (only the consumer may call this function!)
        void clear()
        {
            value_type v;
            while ( pop( v ));
        }

        /// Checks if the ring buffer is empty
        bool empty() const
        {
            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
        }

        /// Checks if the ring buffer is full
        bool full() const
        {
            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
        }

        /// Returns the current size of the ring buffer
        size_t size() const
        {
            return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
        }

        /// Returns the capacity of the ring buffer
        size_t capacity() const
        {
            return buffer_.capacity();
        }
    private:
        //@cond
        atomics::atomic<counter_type> front_;
        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
        atomics::atomic<counter_type> back_;
        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
        counter_type pfront_;
        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
        counter_type cback_;
        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
        buffer buffer_;
        //@endcond
    };
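
    /*
        End-to-end sketch for the typed ring buffer (editor's addition): one
        producer thread pushes, one consumer thread pops. Only this
        single-producer/single-consumer usage is supported.
    \code
    cds::container::WeakRingBuffer< int > theRing( 1024 );

    void producer_thread()
    {
        for ( int i = 0; i < 100000; ) {
            if ( theRing.push( i ))
                ++i;        // advance only on success; retry while the ring is full
        }
    }

    void consumer_thread()
    {
        int v;
        for ( int received = 0; received < 100000; ) {
            if ( theRing.pop( v ))
                ++received; // advance only on success; retry while the ring is empty
        }
    }
    \endcode
    */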
    /// Single-producer single-consumer ring buffer for untyped variable-sized data
    /** @ingroup cds_nonintrusive_queue
        @anchor cds_nonintrusive_WeakRingBuffer_void

        This SPSC ring buffer is intended for data of variable size. The producer
        allocates a buffer from the ring, fills it with data and pushes it back to the ring.
        The consumer thread reads data from the front end and then pops it:
        \code
        // allocates 1M ring buffer
        WeakRingBuffer<void>    theRing( 1024 * 1024 );

        void producer_thread()
        {
            // Get data of size N bytes
            size_t size;
            void*  data;

            while ( true ) {
                // Get external data
                std::tie( data, size ) = get_data();

                if ( data == nullptr )
                    break;

                // Allocate a buffer from the ring
                void* buf = theRing.back( size );
                if ( !buf ) {
                    std::cout << "The ring is full" << std::endl;
                    break;
                }

                memcpy( buf, data, size );

                // Push data into the ring
                theRing.push_back();
            }
        }

        void consumer_thread()
        {
            while ( true ) {
                auto buf = theRing.front();

                if ( buf.first == nullptr ) {
                    std::cout << "The ring is empty" << std::endl;
                    break;
                }

                // Process data
                process_data( buf.first, buf.second );

                // Free buffer
                theRing.pop_front();
            }
        }
        \endcode

        @warning \p %WeakRingBuffer is developed for 64-bit architectures.
            32-bit platforms must provide support for 64-bit atomics.
    */
#ifdef CDS_DOXYGEN_INVOKED
    template <typename Traits = weak_ringbuffer::traits>
#else
    template <typename Traits>
#endif
    class WeakRingBuffer<void, Traits>: public cds::bounded_container
    {
    public:
        typedef Traits traits;  ///< Ring buffer traits
        typedef typename traits::memory_model memory_model;    ///< Memory ordering. See \p cds::opt::memory_model option

    private:
        //@cond
        typedef typename traits::buffer::template rebind< uint8_t >::other buffer;
        typedef uint64_t counter_type;
        //@endcond

    public:
        /// Creates the ring buffer of \p capacity bytes
        /**
            For \p cds::opt::v::uninitialized_static_buffer the \p capacity parameter is ignored.

            If the buffer capacity is a power of two, lightweight binary arithmetic is used
            instead of modulo arithmetic.
        */
        WeakRingBuffer( size_t capacity = 0 )
            : front_( 0 )
            , pfront_( 0 )
            , cback_( 0 )
            , buffer_( capacity )
        {
            back_.store( 0, memory_model::memory_order_release );
        }
        /// [producer] Reserves \p size bytes
        /**
            The function returns a pointer to a reserved buffer of \p size bytes.
            If there is not enough space in the ring buffer, the function returns \p nullptr.

            After a successful \p %back() you should fill the buffer provided and call \p push_back():
            \code
            // allocates 1M ring buffer
            WeakRingBuffer<void>    theRing( 1024 * 1024 );

            void producer_thread()
            {
                // Get data of size N bytes
                size_t size;
                void*  data;

                while ( true ) {
                    // Get external data
                    std::tie( data, size ) = get_data();

                    if ( data == nullptr )
                        break;

                    // Allocate a buffer from the ring
                    void* buf = theRing.back( size );
                    if ( !buf ) {
                        std::cout << "The ring is full" << std::endl;
                        break;
                    }

                    memcpy( buf, data, size );

                    // Push data into the ring
                    theRing.push_back();
                }
            }
            \endcode
        */
        void* back( size_t size )
        {
            assert( size > 0 );

            // Any data is rounded up to an 8-byte boundary
            size_t real_size = calc_real_size( size );

            // check if we can reserve real_size bytes
            assert( real_size < capacity());
            counter_type back = back_.load( memory_model::memory_order_relaxed );

            assert( static_cast<size_t>( back - pfront_ ) <= capacity());

            if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                pfront_ = front_.load( memory_model::memory_order_acquire );

                if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                    // not enough space
                    return nullptr;
                }
            }

            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );

            // Check if the free space at the tail of the array is enough for storing real_size bytes
            size_t tail_size = capacity() - static_cast<size_t>( buffer_.mod( back ));
            if ( tail_size < real_size ) {
                // make unused tail
                assert( tail_size >= sizeof( size_t ));
                assert( !is_tail( tail_size ));

                *reinterpret_cast<size_t*>( reserved ) = make_tail( tail_size - sizeof(size_t));
                back += tail_size;

                // We must be at the beginning of the buffer
                assert( buffer_.mod( back ) == 0 );

                if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                    pfront_ = front_.load( memory_model::memory_order_acquire );

                    if ( static_cast<size_t>( pfront_ + capacity() - back ) < real_size ) {
                        // not enough space
                        return nullptr;
                    }
                }

                back_.store( back, memory_model::memory_order_release );
                reserved = buffer_.buffer();
            }

            // reserve and store size
            *reinterpret_cast<size_t*>( reserved ) = size;

            return reinterpret_cast<void*>( reserved + sizeof( size_t ));
        }
        /// [producer] Pushes reserved bytes into the ring
        /**
            The function pushes the reserved buffer into the ring. After this call,
            the buffer becomes visible to the consumer:
            \code
            // allocates 1M ring buffer
            WeakRingBuffer<void>    theRing( 1024 * 1024 );

            void producer_thread()
            {
                // Get data of size N bytes
                size_t size;
                void*  data;

                while ( true ) {
                    // Get external data
                    std::tie( data, size ) = get_data();

                    if ( data == nullptr )
                        break;

                    // Allocate a buffer from the ring
                    void* buf = theRing.back( size );
                    if ( !buf ) {
                        std::cout << "The ring is full" << std::endl;
                        break;
                    }

                    memcpy( buf, data, size );

                    // Push data into the ring
                    theRing.push_back();
                }
            }
            \endcode
        */
        void push_back()
        {
            counter_type back = back_.load( memory_model::memory_order_relaxed );
            uint8_t* reserved = buffer_.buffer() + buffer_.mod( back );

            size_t real_size = calc_real_size( *reinterpret_cast<size_t*>( reserved ));
            assert( real_size < capacity());

            back_.store( back + real_size, memory_model::memory_order_release );
        }
        /// [producer] Pushes \p data of \p size bytes into the ring
        /**
            This function invokes <tt>back( size )</tt>, <tt>memcpy( buf, data, size )</tt>
            and \p push_back() in one call.
        */
        bool push_back( void const* data, size_t size )
        {
            void* buf = back( size );
            if ( buf ) {
                memcpy( buf, data, size );
                push_back();
                return true;
            }
            return false;
        }
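
        /*
            Usage sketch (editor's addition): the one-call form replaces the
            back()/memcpy()/push_back() sequence shown in the examples above.
        \code
        char msg[] = "hello";
        if ( !theRing.push_back( msg, sizeof( msg )))
            std::cout << "The ring is full" << std::endl;
        \endcode
        */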
        /// [consumer] Gets the top data from the ring
        /**
            If the ring is empty, the function returns \p nullptr in \p std::pair::first.
        */
        std::pair<void*, size_t> front()
        {
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) < capacity());

            if ( cback_ - front < sizeof( size_t )) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( cback_ - front < sizeof( size_t ))
                    return std::make_pair( nullptr, 0u );
            }

            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );

            // check alignment
            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );

            size_t size = *reinterpret_cast<size_t*>( buf );
            if ( is_tail( size )) {
                // unused tail, skip
                CDS_VERIFY( pop_front());

                front = front_.load( memory_model::memory_order_relaxed );

                if ( cback_ - front < sizeof( size_t )) {
                    cback_ = back_.load( memory_model::memory_order_acquire );
                    if ( cback_ - front < sizeof( size_t ))
                        return std::make_pair( nullptr, 0u );
                }

                buf = buffer_.buffer() + buffer_.mod( front );
                size = *reinterpret_cast<size_t*>( buf );

                assert( !is_tail( size ));
                assert( buf == buffer_.buffer());
            }

#ifdef _DEBUG
            size_t real_size = calc_real_size( size );
            if ( static_cast<size_t>( cback_ - front ) < real_size ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                assert( static_cast<size_t>( cback_ - front ) >= real_size );
            }
#endif

            return std::make_pair( reinterpret_cast<void*>( buf + sizeof( size_t )), size );
        }
        /// [consumer] Pops the top data
        /**
            Typical consumer work loop:
            \code
            // allocates 1M ring buffer
            WeakRingBuffer<void>    theRing( 1024 * 1024 );

            void consumer_thread()
            {
                while ( true ) {
                    auto buf = theRing.front();

                    if ( buf.first == nullptr ) {
                        std::cout << "The ring is empty" << std::endl;
                        break;
                    }

                    // Process data
                    process_data( buf.first, buf.second );

                    // Free buffer
                    theRing.pop_front();
                }
            }
            \endcode
        */
        bool pop_front()
        {
            counter_type front = front_.load( memory_model::memory_order_relaxed );
            assert( static_cast<size_t>( cback_ - front ) <= capacity());

            if ( cback_ - front < sizeof( size_t )) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                if ( cback_ - front < sizeof( size_t ))
                    return false;
            }

            uint8_t * buf = buffer_.buffer() + buffer_.mod( front );

            // check alignment
            assert( ( reinterpret_cast<uintptr_t>( buf ) & ( sizeof( uintptr_t ) - 1 )) == 0 );

            size_t size = *reinterpret_cast<size_t*>( buf );
            size_t real_size = calc_real_size( untail( size ));

#ifdef _DEBUG
            if ( static_cast<size_t>( cback_ - front ) < real_size ) {
                cback_ = back_.load( memory_model::memory_order_acquire );
                assert( static_cast<size_t>( cback_ - front ) >= real_size );
            }
#endif

            front_.store( front + real_size, memory_model::memory_order_release );
            return true;
        }
        /// [consumer] Clears the ring buffer
        void clear()
        {
            for ( auto el = front(); el.first; el = front())
                pop_front();
        }

        /// Checks if the ring buffer is empty
        bool empty() const
        {
            return front_.load( memory_model::memory_order_relaxed ) == back_.load( memory_model::memory_order_relaxed );
        }

        /// Checks if the ring buffer is full
        bool full() const
        {
            return back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ) >= capacity();
        }

        /// Returns the current size of the ring buffer in bytes
        size_t size() const
        {
            return static_cast<size_t>( back_.load( memory_model::memory_order_relaxed ) - front_.load( memory_model::memory_order_relaxed ));
        }

        /// Returns the capacity of the ring buffer in bytes
        size_t capacity() const
        {
            return buffer_.capacity();
        }
    private:
        //@cond
        static size_t calc_real_size( size_t size )
        {
            // round the payload up to uintptr_t alignment and add room for the size field
            size_t real_size = (( size + sizeof( uintptr_t ) - 1 ) & ~( sizeof( uintptr_t ) - 1 )) + sizeof( size_t );

            assert( real_size > size );
            assert( real_size - size >= sizeof( size_t ));

            return real_size;
        }

        static bool is_tail( size_t size )
        {
            // the most significant bit of the size field marks an unused tail block
            return ( size & ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ))) != 0;
        }

        static size_t make_tail( size_t size )
        {
            return size | ( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 ));
        }

        static size_t untail( size_t size )
        {
            return size & (( size_t( 1 ) << ( sizeof( size_t ) * 8 - 1 )) - 1);
        }
        //@endcond
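
        /*
            Record layout sketch (editor's note): every reservation is stored as
            [size_t size][payload rounded up to sizeof(uintptr_t)]. For example, on
            a 64-bit platform back( 5 ) consumes
                calc_real_size( 5 ) = ((5 + 7) & ~7) + 8 = 16 bytes.
            When the contiguous space at the end of the array is too small for a
            record, the producer writes a size field with the top bit set (see
            make_tail()) so the consumer knows to skip that region and wrap to the
            beginning of the array.
        */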
    private:
        //@cond
        atomics::atomic<counter_type> front_;
        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad1_;
        atomics::atomic<counter_type> back_;
        typename opt::details::apply_padding< atomics::atomic<counter_type>, traits::padding >::padding_type pad2_;
        counter_type pfront_;
        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad3_;
        counter_type cback_;
        typename opt::details::apply_padding< counter_type, traits::padding >::padding_type pad4_;
        buffer buffer_;
        //@endcond
    };

}} // namespace cds::container

#endif // #ifndef CDSLIB_CONTAINER_WEAK_RINGBUFFER_H