concurrent_queue.h 16 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB_concurrent_queue_H
  14. #define __TBB_concurrent_queue_H
  15. #define __TBB_concurrent_queue_H_include_area
  16. #include "internal/_warning_suppress_enable_notice.h"
  17. #include "internal/_concurrent_queue_impl.h"
  18. #include "internal/_allocator_traits.h"
  19. namespace tbb {
  20. namespace strict_ppl {
  21. //! A high-performance thread-safe non-blocking concurrent queue.
  22. /** Multiple threads may each push and pop concurrently.
  23. Assignment construction is not allowed.
  24. @ingroup containers */
  25. template<typename T, typename A = cache_aligned_allocator<T> >
  26. class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
  27. template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
  28. //! Allocator type
  29. typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
  30. page_allocator_type my_allocator;
  31. //! Allocates a block of size n (bytes)
  32. virtual void *allocate_block( size_t n ) __TBB_override {
  33. void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
  34. if( !b )
  35. internal::throw_exception(internal::eid_bad_alloc);
  36. return b;
  37. }
  38. //! Deallocates block created by allocate_block.
  39. virtual void deallocate_block( void *b, size_t n ) __TBB_override {
  40. my_allocator.deallocate( reinterpret_cast<char*>(b), n );
  41. }
  42. static void copy_construct_item(T* location, const void* src){
  43. new (location) T(*static_cast<const T*>(src));
  44. }
  45. #if __TBB_CPP11_RVALUE_REF_PRESENT
  46. static void move_construct_item(T* location, const void* src) {
  47. new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
  48. }
  49. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  50. public:
  51. //! Element type in the queue.
  52. typedef T value_type;
  53. //! Reference type
  54. typedef T& reference;
  55. //! Const reference type
  56. typedef const T& const_reference;
  57. //! Integral type for representing size of the queue.
  58. typedef size_t size_type;
  59. //! Difference type for iterator
  60. typedef ptrdiff_t difference_type;
  61. //! Allocator type
  62. typedef A allocator_type;
  63. //! Construct empty queue
  64. explicit concurrent_queue(const allocator_type& a = allocator_type()) :
  65. my_allocator( a )
  66. {
  67. }
  68. //! [begin,end) constructor
  69. template<typename InputIterator>
  70. concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
  71. my_allocator( a )
  72. {
  73. for( ; begin != end; ++begin )
  74. this->push(*begin);
  75. }
  76. //! Copy constructor
  77. concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
  78. internal::concurrent_queue_base_v3<T>(), my_allocator( a )
  79. {
  80. this->assign( src, copy_construct_item );
  81. }
  82. #if __TBB_CPP11_RVALUE_REF_PRESENT
  83. //! Move constructors
  84. concurrent_queue( concurrent_queue&& src ) :
  85. internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
  86. {
  87. this->internal_swap( src );
  88. }
  89. concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
  90. internal::concurrent_queue_base_v3<T>(), my_allocator( a )
  91. {
  92. // checking that memory allocated by one instance of allocator can be deallocated
  93. // with another
  94. if( my_allocator == src.my_allocator) {
  95. this->internal_swap( src );
  96. } else {
  97. // allocators are different => performing per-element move
  98. this->assign( src, move_construct_item );
  99. src.clear();
  100. }
  101. }
  102. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  103. //! Destroy queue
  104. ~concurrent_queue();
  105. //! Enqueue an item at tail of queue.
  106. void push( const T& source ) {
  107. this->internal_push( &source, copy_construct_item );
  108. }
  109. #if __TBB_CPP11_RVALUE_REF_PRESENT
  110. void push( T&& source ) {
  111. this->internal_push( &source, move_construct_item );
  112. }
  113. #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
  114. template<typename... Arguments>
  115. void emplace( Arguments&&... args ) {
  116. push( T(std::forward<Arguments>( args )...) );
  117. }
  118. #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
  119. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  120. //! Attempt to dequeue an item from head of queue.
  121. /** Does not wait for item to become available.
  122. Returns true if successful; false otherwise. */
  123. bool try_pop( T& result ) {
  124. return this->internal_try_pop( &result );
  125. }
  126. //! Return the number of items in the queue; thread unsafe
  127. size_type unsafe_size() const {return this->internal_size();}
  128. //! Equivalent to size()==0.
  129. bool empty() const {return this->internal_empty();}
  130. //! Clear the queue. not thread-safe.
  131. void clear() ;
  132. //! Return allocator object
  133. allocator_type get_allocator() const { return this->my_allocator; }
  134. typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
  135. typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
  136. //------------------------------------------------------------------------
  137. // The iterators are intended only for debugging. They are slow and not thread safe.
  138. //------------------------------------------------------------------------
  139. iterator unsafe_begin() {return iterator(*this);}
  140. iterator unsafe_end() {return iterator();}
  141. const_iterator unsafe_begin() const {return const_iterator(*this);}
  142. const_iterator unsafe_end() const {return const_iterator();}
  143. } ;
  144. #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
  145. // Deduction guide for the constructor from two iterators
  146. template<typename InputIterator,
  147. typename T = typename std::iterator_traits<InputIterator>::value_type,
  148. typename A = cache_aligned_allocator<T>
  149. > concurrent_queue(InputIterator, InputIterator, const A& = A())
  150. -> concurrent_queue<T, A>;
  151. #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
  152. template<typename T, class A>
  153. concurrent_queue<T,A>::~concurrent_queue() {
  154. clear();
  155. this->internal_finish_clear();
  156. }
  157. template<typename T, class A>
  158. void concurrent_queue<T,A>::clear() {
  159. T value;
  160. while( !empty() ) try_pop(value);
  161. }
  162. } // namespace strict_ppl
  163. //! A high-performance thread-safe blocking concurrent bounded queue.
  164. /** This is the pre-PPL TBB concurrent queue which supports boundedness and blocking semantics.
  165. Note that method names agree with the PPL-style concurrent queue.
  166. Multiple threads may each push and pop concurrently.
  167. Assignment construction is not allowed.
  168. @ingroup containers */
  169. template<typename T, class A = cache_aligned_allocator<T> >
  170. class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
  171. template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
  172. typedef typename tbb::internal::allocator_rebind<A, char>::type page_allocator_type;
  173. //! Allocator type
  174. page_allocator_type my_allocator;
  175. typedef typename concurrent_queue_base_v3::padded_page<T> padded_page;
  176. typedef typename concurrent_queue_base_v3::copy_specifics copy_specifics;
  177. //! Class used to ensure exception-safety of method "pop"
  178. class destroyer: internal::no_copy {
  179. T& my_value;
  180. public:
  181. destroyer( T& value ) : my_value(value) {}
  182. ~destroyer() {my_value.~T();}
  183. };
  184. T& get_ref( page& p, size_t index ) {
  185. __TBB_ASSERT( index<items_per_page, NULL );
  186. return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
  187. }
  188. virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
  189. new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
  190. }
  191. #if __TBB_CPP11_RVALUE_REF_PRESENT
  192. virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
  193. new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
  194. }
  195. #else
  196. virtual void move_item( page&, size_t, const void* ) __TBB_override {
  197. __TBB_ASSERT( false, "Unreachable code" );
  198. }
  199. #endif
  200. virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
  201. new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
  202. }
  203. #if __TBB_CPP11_RVALUE_REF_PRESENT
  204. virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
  205. new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
  206. }
  207. #else
  208. virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
  209. __TBB_ASSERT( false, "Unreachable code" );
  210. }
  211. #endif
  212. virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
  213. T& from = get_ref(src,index);
  214. destroyer d(from);
  215. *static_cast<T*>(dst) = tbb::internal::move( from );
  216. }
  217. virtual page *allocate_page() __TBB_override {
  218. size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
  219. page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
  220. if( !p )
  221. internal::throw_exception(internal::eid_bad_alloc);
  222. return p;
  223. }
  224. virtual void deallocate_page( page *p ) __TBB_override {
  225. size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
  226. my_allocator.deallocate( reinterpret_cast<char*>(p), n );
  227. }
  228. public:
  229. //! Element type in the queue.
  230. typedef T value_type;
  231. //! Allocator type
  232. typedef A allocator_type;
  233. //! Reference type
  234. typedef T& reference;
  235. //! Const reference type
  236. typedef const T& const_reference;
  237. //! Integral type for representing size of the queue.
  238. /** Note that the size_type is a signed integral type.
  239. This is because the size can be negative if there are pending pops without corresponding pushes. */
  240. typedef std::ptrdiff_t size_type;
  241. //! Difference type for iterator
  242. typedef std::ptrdiff_t difference_type;
  243. //! Construct empty queue
  244. explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
  245. concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
  246. {
  247. }
  248. //! Copy constructor
  249. concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
  250. : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
  251. {
  252. assign( src );
  253. }
  254. #if __TBB_CPP11_RVALUE_REF_PRESENT
  255. //! Move constructors
  256. concurrent_bounded_queue( concurrent_bounded_queue&& src )
  257. : concurrent_queue_base_v8( sizeof(T) ), my_allocator( std::move(src.my_allocator) )
  258. {
  259. internal_swap( src );
  260. }
  261. concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
  262. : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
  263. {
  264. // checking that memory allocated by one instance of allocator can be deallocated
  265. // with another
  266. if( my_allocator == src.my_allocator) {
  267. this->internal_swap( src );
  268. } else {
  269. // allocators are different => performing per-element move
  270. this->move_content( src );
  271. src.clear();
  272. }
  273. }
  274. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  275. //! [begin,end) constructor
  276. template<typename InputIterator>
  277. concurrent_bounded_queue( InputIterator begin, InputIterator end,
  278. const allocator_type& a = allocator_type())
  279. : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
  280. {
  281. for( ; begin != end; ++begin )
  282. internal_push_if_not_full(&*begin);
  283. }
  284. //! Destroy queue
  285. ~concurrent_bounded_queue();
  286. //! Enqueue an item at tail of queue.
  287. void push( const T& source ) {
  288. internal_push( &source );
  289. }
  290. #if __TBB_CPP11_RVALUE_REF_PRESENT
  291. //! Move an item at tail of queue.
  292. void push( T&& source ) {
  293. internal_push_move( &source );
  294. }
  295. #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
  296. template<typename... Arguments>
  297. void emplace( Arguments&&... args ) {
  298. push( T(std::forward<Arguments>( args )...) );
  299. }
  300. #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
  301. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  302. //! Dequeue item from head of queue.
  303. /** Block until an item becomes available, and then dequeue it. */
  304. void pop( T& destination ) {
  305. internal_pop( &destination );
  306. }
  307. #if TBB_USE_EXCEPTIONS
  308. //! Abort all pending queue operations
  309. void abort() {
  310. internal_abort();
  311. }
  312. #endif
  313. //! Enqueue an item at tail of queue if queue is not already full.
  314. /** Does not wait for queue to become not full.
  315. Returns true if item is pushed; false if queue was already full. */
  316. bool try_push( const T& source ) {
  317. return internal_push_if_not_full( &source );
  318. }
  319. #if __TBB_CPP11_RVALUE_REF_PRESENT
  320. //! Move an item at tail of queue if queue is not already full.
  321. /** Does not wait for queue to become not full.
  322. Returns true if item is pushed; false if queue was already full. */
  323. bool try_push( T&& source ) {
  324. return internal_push_move_if_not_full( &source );
  325. }
  326. #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
  327. template<typename... Arguments>
  328. bool try_emplace( Arguments&&... args ) {
  329. return try_push( T(std::forward<Arguments>( args )...) );
  330. }
  331. #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
  332. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  333. //! Attempt to dequeue an item from head of queue.
  334. /** Does not wait for item to become available.
  335. Returns true if successful; false otherwise. */
  336. bool try_pop( T& destination ) {
  337. return internal_pop_if_present( &destination );
  338. }
  339. //! Return number of pushes minus number of pops.
  340. /** Note that the result can be negative if there are pops waiting for the
  341. corresponding pushes. The result can also exceed capacity() if there
  342. are push operations in flight. */
  343. size_type size() const {return internal_size();}
  344. //! Equivalent to size()<=0.
  345. bool empty() const {return internal_empty();}
  346. //! Maximum number of allowed elements
  347. size_type capacity() const {
  348. return my_capacity;
  349. }
  350. //! Set the capacity
  351. /** Setting the capacity to 0 causes subsequent try_push operations to always fail,
  352. and subsequent push operations to block forever. */
  353. void set_capacity( size_type new_capacity ) {
  354. internal_set_capacity( new_capacity, sizeof(T) );
  355. }
  356. //! return allocator object
  357. allocator_type get_allocator() const { return this->my_allocator; }
  358. //! clear the queue. not thread-safe.
  359. void clear() ;
  360. typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
  361. typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
  362. //------------------------------------------------------------------------
  363. // The iterators are intended only for debugging. They are slow and not thread safe.
  364. //------------------------------------------------------------------------
  365. iterator unsafe_begin() {return iterator(*this);}
  366. iterator unsafe_end() {return iterator();}
  367. const_iterator unsafe_begin() const {return const_iterator(*this);}
  368. const_iterator unsafe_end() const {return const_iterator();}
  369. };
  370. #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
  371. // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
  372. template<typename InputIterator,
  373. typename T = typename std::iterator_traits<InputIterator>::value_type,
  374. typename A = cache_aligned_allocator<T>
  375. > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
  376. -> concurrent_bounded_queue<T, A>;
  377. #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
  378. template<typename T, class A>
  379. concurrent_bounded_queue<T,A>::~concurrent_bounded_queue() {
  380. clear();
  381. internal_finish_clear();
  382. }
  383. template<typename T, class A>
  384. void concurrent_bounded_queue<T,A>::clear() {
  385. T value;
  386. while( try_pop(value) ) /*noop*/;
  387. }
  388. using strict_ppl::concurrent_queue;
  389. } // namespace tbb
  390. #include "internal/_warning_suppress_disable_notice.h"
  391. #undef __TBB_concurrent_queue_H_include_area
  392. #endif /* __TBB_concurrent_queue_H */