// _concurrent_queue_impl.h (page-rendering artifacts removed: file-size caption and line-number gutter)
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB__concurrent_queue_impl_H
  14. #define __TBB__concurrent_queue_impl_H
  15. #ifndef __TBB_concurrent_queue_H
  16. #error Do not #include this internal file directly; use public TBB headers instead.
  17. #endif
  18. #include "../tbb_stddef.h"
  19. #include "../tbb_machine.h"
  20. #include "../atomic.h"
  21. #include "../spin_mutex.h"
  22. #include "../cache_aligned_allocator.h"
  23. #include "../tbb_exception.h"
  24. #include "../tbb_profiling.h"
  25. #include <new>
  26. #include __TBB_STD_SWAP_HEADER
  27. #include <iterator>
  28. namespace tbb {
  29. #if !__TBB_TEMPLATE_FRIENDS_BROKEN
  30. // forward declaration
  31. namespace strict_ppl {
  32. template<typename T, typename A> class concurrent_queue;
  33. }
  34. template<typename T, typename A> class concurrent_bounded_queue;
  35. #endif
  36. //! For internal use only.
  37. namespace strict_ppl {
  38. //! @cond INTERNAL
  39. namespace internal {
  40. using namespace tbb::internal;
  41. typedef size_t ticket;
  42. template<typename T> class micro_queue ;
  43. template<typename T> class micro_queue_pop_finalizer ;
  44. template<typename T> class concurrent_queue_base_v3;
  45. template<typename T> struct concurrent_queue_rep;
//! parts of concurrent_queue_rep that do not have references to micro_queue
/**
 * For internal use only.
 */
struct concurrent_queue_rep_base : no_copy {
    template<typename T> friend class micro_queue;
    template<typename T> friend class concurrent_queue_base_v3;
protected:
    //! Approximately n_queue/golden ratio
    static const size_t phi = 3;
public:
    // must be power of 2
    static const size_t n_queue = 8;
    //! Prefix on a page
    struct page {
        page* next;     // next page in this micro-queue's singly linked list
        uintptr_t mask; // bit i set => slot i of this page holds a valid item
    };
    //! Ticket counter for pops.
    atomic<ticket> head_counter;
    // Pad to a full cache line so head_counter and tail_counter do not falsely share.
    char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    //! Ticket counter for pushes.
    atomic<ticket> tail_counter;
    char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
    //! Always a power of 2
    size_t items_per_page;
    //! Size of an item
    size_t item_size;
    //! number of invalid entries in the queue
    atomic<size_t> n_invalid_entries;
    char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
} ;
  76. inline bool is_valid_page(const concurrent_queue_rep_base::page* p) {
  77. return uintptr_t(p)>1;
  78. }
//! Abstract class to define interface for page allocation/deallocation
/**
 * For internal use only.
 */
class concurrent_queue_page_allocator
{
    template<typename T> friend class micro_queue ;
    template<typename T> friend class micro_queue_pop_finalizer ;
protected:
    virtual ~concurrent_queue_page_allocator() {}
private:
    //! Obtain a new raw page; implemented by concurrent_queue_base_v3.
    virtual concurrent_queue_rep_base::page* allocate_page() = 0;
    //! Release a page previously obtained from allocate_page.
    virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
} ;
  93. #if _MSC_VER && !defined(__INTEL_COMPILER)
  94. // unary minus operator applied to unsigned type, result still unsigned
  95. #pragma warning( push )
  96. #pragma warning( disable: 4146 )
  97. #endif
//! A queue using simple locking.
/** For efficiency, this class has no constructor.
    The caller is expected to zero-initialize it. */
template<typename T>
class micro_queue : no_copy {
public:
    //! Callback that copy- or move-constructs an item at `location` from `src`.
    typedef void (*item_constructor_t)(T* location, const void* src);
private:
    typedef concurrent_queue_rep_base::page page;

    //! Class used to ensure exception-safety of method "pop"
    /** Destroys the bound item when it goes out of scope, even if the
        extraction assignment throws. */
    class destroyer: no_copy {
        T& my_value;
    public:
        destroyer( T& value ) : my_value(value) {}
        ~destroyer() {my_value.~T();}
    };

    //! Construct an item in page `dst` at slot `dindex` from raw source `src`.
    void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
        construct_item( &get_ref(dst, dindex), src );
    }

    //! Construct an item in `dst` at `dindex` from the item at slot `sindex` of `src`.
    void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
        item_constructor_t construct_item )
    {
        T& src_item = get_ref( const_cast<page&>(src), sindex );
        construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
    }

    //! Move item `src[index]` into `*dst`; the source item is destroyed
    //! by `destroyer` even if the move assignment throws.
    void assign_and_destroy_item( void* dst, page& src, size_t index ) {
        T& from = get_ref(src,index);
        destroyer d(from);
        *static_cast<T*>(dst) = tbb::internal::move( from );
    }

    //! Spin until `counter` reaches ticket `k`; throws bad_last_alloc if the
    //! queue has been invalidated (counter made odd).
    void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;

public:
    friend class micro_queue_pop_finalizer<T>;

    //! Page header followed by item storage; only used for size/layout math.
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

    //! Reference to the item at slot `index` of page `p`.
    /** Items are laid out contiguously starting at padded_page::last. */
    static T& get_ref( page& p, size_t index ) {
        return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
    }

    atomic<page*> head_page;    // oldest page with un-popped items
    atomic<ticket> head_counter; // next ticket to be popped from this micro-queue
    atomic<page*> tail_page;    // page receiving new pushes
    atomic<ticket> tail_counter; // next ticket to be pushed to this micro-queue
    spin_mutex page_mutex;      // protects the head_page/tail_page links

    void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
        item_constructor_t construct_item ) ;
    bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
    micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
        item_constructor_t construct_item ) ;
    page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
        size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;
    void invalidate_page_and_rethrow( ticket k ) ;
};
  156. template<typename T>
  157. void micro_queue<T>::spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const {
  158. for( atomic_backoff b(true);;b.pause() ) {
  159. ticket c = counter;
  160. if( c==k ) return;
  161. else if( c&1 ) {
  162. ++rb.n_invalid_entries;
  163. throw_exception( eid_bad_last_alloc );
  164. }
  165. }
  166. }
//! Push one item with ticket k into this micro-queue.
/** Waits for its turn on tail_counter, then constructs the item in place.
    Exception-safe: a failed page allocation invalidates the queue; a failed
    item construction marks only this slot invalid and lets pushers proceed. */
template<typename T>
void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    // Round the ticket down to the base ticket this micro-queue serves.
    k &= -concurrent_queue_rep_base::n_queue;
    page* p = NULL;
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
    // First slot of a page: allocate the new page before waiting for our turn.
    if( !index ) {
        __TBB_TRY {
            concurrent_queue_page_allocator& pa = base;
            p = pa.allocate_page();
        } __TBB_CATCH (...) {
            ++base.my_rep->n_invalid_entries;
            invalidate_page_and_rethrow( k ); // does not return; rethrows
        }
        p->mask = 0;
        p->next = NULL;
    }
    // Serialize pushers: wait until tail_counter reaches our ticket.
    if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
    call_itt_notify(acquired, &tail_counter);
    if( p ) {
        // Link the freshly allocated page at the tail under the page mutex.
        spin_mutex::scoped_lock lock( page_mutex );
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = p;
        else
            head_page = p;
        tail_page = p;
    } else {
        p = tail_page;
    }
    __TBB_TRY {
        copy_item( *p, index, item, construct_item );
        // If no exception was thrown, mark item as present.
        itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
    } __TBB_CATCH (...) {
        // Construction failed: leave the mask bit clear (slot invalid), but
        // still advance tail_counter so later pushers are not blocked.
        ++base.my_rep->n_invalid_entries;
        call_itt_notify(releasing, &tail_counter);
        tail_counter += concurrent_queue_rep_base::n_queue;
        __TBB_RETHROW();
    }
}
//! Pop the item with ticket k from this micro-queue.
/** Returns false if the slot was marked invalid by a failed push.
    The caller (internal_try_pop) then retries with a new ticket. */
template<typename T>
bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
    k &= -concurrent_queue_rep_base::n_queue;
    // Wait for our turn among poppers, then wait until a pusher has passed k.
    if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
    call_itt_notify(acquired, &head_counter);
    if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
    call_itt_notify(acquired, &tail_counter);
    page *p = head_page;
    __TBB_ASSERT( p, NULL );
    size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
    bool success = false;
    {
        // The finalizer's destructor advances head_counter — and, when this is
        // the page's last slot, unlinks and frees the page — even if
        // assign_and_destroy_item throws.
        micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
        if( p->mask & uintptr_t(1)<<index ) {
            success = true;
            assign_and_destroy_item( dst, *p, index );
        } else {
            // Slot invalidated by a failed push; consume it silently.
            --base.my_rep->n_invalid_entries;
        }
    }
    return success;
}
//! Deep-copy src's pages into this (zero-initialized) micro-queue.
/** Uses construct_item to copy or move each valid item; on failure the
    destination micro-queue is invalidated and the exception rethrown. */
template<typename T>
micro_queue<T>& micro_queue<T>::assign( const micro_queue<T>& src, concurrent_queue_base_v3<T>& base,
    item_constructor_t construct_item )
{
    head_counter = src.head_counter;
    tail_counter = src.tail_counter;
    const page* srcp = src.head_page;
    if( is_valid_page(srcp) ) {
        ticket g_index = head_counter;
        __TBB_TRY {
            size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
            size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
            // The first page may be partially consumed: copy only [index, end).
            size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
            head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
            page* cur_page = head_page;
            if( srcp != src.tail_page ) {
                // Copy the full middle pages...
                for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
                    cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
                    cur_page = cur_page->next;
                }
                __TBB_ASSERT( srcp==src.tail_page, NULL );
                // ...then the (possibly partial) last page.
                size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
                if( last_index==0 ) last_index = base.my_rep->items_per_page;
                cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
                cur_page = cur_page->next;
            }
            tail_page = cur_page;
        } __TBB_CATCH (...) {
            invalidate_page_and_rethrow( g_index );
        }
    } else {
        // Source micro-queue has no pages.
        head_page = tail_page = NULL;
    }
    return *this;
}
//! Mark this micro-queue broken and rethrow the pending exception.
template<typename T>
void micro_queue<T>::invalidate_page_and_rethrow( ticket k ) {
    // Append an invalid page at address 1 so that no more pushes are allowed.
    page* invalid_page = (page*)uintptr_t(1);
    {
        spin_mutex::scoped_lock lock( page_mutex );
        // The +1 makes tail_counter odd (n_queue is even), which waiters in
        // spin_wait_until_my_turn interpret as "queue invalidated".
        itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
        page* q = tail_page;
        if( is_valid_page(q) )
            q->next = invalid_page;
        else
            head_page = invalid_page;
        tail_page = invalid_page;
    }
    __TBB_RETHROW();
}
//! Allocate a new page and copy the valid items of src_page in [begin_in_page, end_in_page).
/** g_index is advanced by one per slot examined.
    NOTE(review): if construct_item throws, the partially filled new_page is
    not freed here — the caller catches and invalidates the whole micro-queue
    via invalidate_page_and_rethrow; confirm the leak is acceptable upstream. */
template<typename T>
concurrent_queue_rep_base::page* micro_queue<T>::make_copy( concurrent_queue_base_v3<T>& base,
    const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
    ticket& g_index, item_constructor_t construct_item )
{
    concurrent_queue_page_allocator& pa = base;
    page* new_page = pa.allocate_page();
    new_page->next = NULL;
    new_page->mask = src_page->mask; // preserve which slots are valid
    for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
        if( new_page->mask & uintptr_t(1)<<begin_in_page )
            copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
    return new_page;
}
//! RAII helper that finishes a pop (see ~micro_queue_pop_finalizer).
template<typename T>
class micro_queue_pop_finalizer: no_copy {
    typedef concurrent_queue_rep_base::page page;
    ticket my_ticket;       // value stored into head_counter on destruction
    micro_queue<T>& my_queue;
    page* my_page;          // exhausted head page to unlink and free, or NULL
    concurrent_queue_page_allocator& allocator;
public:
    micro_queue_pop_finalizer( micro_queue<T>& queue, concurrent_queue_base_v3<T>& b, ticket k, page* p ) :
        my_ticket(k), my_queue(queue), my_page(p), allocator(b)
    {}
    ~micro_queue_pop_finalizer() ;
};
//! Advance head_counter and, if a page was exhausted, unlink and free it.
template<typename T>
micro_queue_pop_finalizer<T>::~micro_queue_pop_finalizer() {
    page* p = my_page;
    if( is_valid_page(p) ) {
        // Unlink the exhausted head page under the page mutex.
        spin_mutex::scoped_lock lock( my_queue.page_mutex );
        page* q = p->next;
        my_queue.head_page = q;
        if( !is_valid_page(q) ) {
            my_queue.tail_page = NULL;
        }
    }
    // Publish the new head ticket (release) before freeing the page.
    itt_store_word_with_release(my_queue.head_counter, my_ticket);
    if( is_valid_page(p) ) {
        allocator.deallocate_page( p );
    }
}
  327. #if _MSC_VER && !defined(__INTEL_COMPILER)
  328. #pragma warning( pop )
  329. #endif // warning 4146 is back
  330. template<typename T> class concurrent_queue_iterator_rep ;
  331. template<typename T> class concurrent_queue_iterator_base_v3;
  332. //! representation of concurrent_queue_base
  333. /**
  334. * the class inherits from concurrent_queue_rep_base and defines an array of micro_queue<T>'s
  335. */
  336. template<typename T>
  337. struct concurrent_queue_rep : public concurrent_queue_rep_base {
  338. micro_queue<T> array[n_queue];
  339. //! Map ticket to an array index
  340. static size_t index( ticket k ) {
  341. return k*phi%n_queue;
  342. }
  343. micro_queue<T>& choose( ticket k ) {
  344. // The formula here approximates LRU in a cache-oblivious way.
  345. return array[index(k)];
  346. }
  347. };
//! base class of concurrent_queue
/**
 * The class implements the interface defined by concurrent_queue_page_allocator
 * and has a pointer to an instance of concurrent_queue_rep.
 */
template<typename T>
class concurrent_queue_base_v3: public concurrent_queue_page_allocator {
private:
    //! Internal representation
    concurrent_queue_rep<T>* my_rep;

    friend struct concurrent_queue_rep<T>;
    friend class micro_queue<T>;
    friend class concurrent_queue_iterator_rep<T>;
    friend class concurrent_queue_iterator_base_v3<T>;
protected:
    typedef typename concurrent_queue_rep<T>::page page;
private:
    typedef typename micro_queue<T>::padded_page padded_page;
    typedef typename micro_queue<T>::item_constructor_t item_constructor_t;

    //! Allocate a page large enough for the header plus items_per_page items.
    virtual page *allocate_page() __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        // padded_page already holds storage for one item ("last"), hence the -1.
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        return reinterpret_cast<page*>(allocate_block ( n ));
    }
    //! Free a page obtained from allocate_page (same size computation).
    virtual void deallocate_page( concurrent_queue_rep_base::page *p ) __TBB_override {
        concurrent_queue_rep<T>& r = *my_rep;
        size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
        deallocate_block( reinterpret_cast<void*>(p), n );
    }
    //! custom allocator
    virtual void *allocate_block( size_t n ) = 0;
    //! custom de-allocator
    virtual void deallocate_block( void *p, size_t n ) = 0;
protected:
    concurrent_queue_base_v3();

    virtual ~concurrent_queue_base_v3() {
#if TBB_USE_ASSERT
        size_t nq = my_rep->n_queue;
        for( size_t i=0; i<nq; i++ )
            __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
#endif /* TBB_USE_ASSERT */
        cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
    }

    //! Enqueue item at tail of queue
    void internal_push( const void* src, item_constructor_t construct_item ) {
        concurrent_queue_rep<T>& r = *my_rep;
        // Claim a ticket, then push into the micro-queue serving that ticket.
        ticket k = r.tail_counter++;
        r.choose(k).push( src, k, *this, construct_item );
    }

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool internal_try_pop( void* dst ) ;

    //! Get size of queue; result may be invalid if queue is modified concurrently
    size_t internal_size() const ;

    //! check if the queue is empty; thread safe
    bool internal_empty() const ;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void internal_finish_clear() ;

    //! Obsolete
    void internal_throw_exception() const {
        throw_exception( eid_bad_alloc );
    }

    //! copy or move internal representation
    void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap internal representation
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
};
//! Allocate and zero-initialize the internal representation.
template<typename T>
concurrent_queue_base_v3<T>::concurrent_queue_base_v3() {
    const size_t item_size = sizeof(T);
    my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
    __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
    __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
    // micro_queue deliberately has no constructor; the whole representation
    // is zero-filled instead (see micro_queue's class comment).
    memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
    my_rep->item_size = item_size;
    // Choose items_per_page so that pages stay roughly the same byte size
    // across item sizes (always a power of 2, as the rep requires).
    my_rep->items_per_page = item_size<= 8 ? 32 :
        item_size<= 16 ? 16 :
        item_size<= 32 ? 8 :
        item_size<= 64 ? 4 :
        item_size<=128 ? 2 :
        1;
}
//! Attempt to dequeue one item; returns false if the queue appeared empty.
/** Claims a pop ticket via CAS on head_counter; if the corresponding slot
    turns out to be invalid (failed push), claims another ticket. */
template<typename T>
bool concurrent_queue_base_v3<T>::internal_try_pop( void* dst ) {
    concurrent_queue_rep<T>& r = *my_rep;
    ticket k;
    do {
        k = r.head_counter;
        for(;;) {
            if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
                // Queue is empty
                return false;
            }
            // Queue had item with ticket k when we looked. Attempt to get that item.
            ticket tk=k;
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (push)
#pragma warning (disable: 4267)
#endif
            // CAS returns the previous value; equality with tk means we won ticket tk.
            k = r.head_counter.compare_and_swap( tk+1, tk );
#if defined(_MSC_VER) && defined(_Wp64)
#pragma warning (pop)
#endif
            if( k==tk )
                break;
            // Another thread snatched the item, retry.
        }
    } while( !r.choose( k ).pop( dst, k, *this ) );
    return true;
}
//! Size estimate: pushed tickets minus popped tickets minus invalid entries.
/** Result may be stale or even transiently negative under concurrent
    modification; negative values are clamped to 0. */
template<typename T>
size_t concurrent_queue_base_v3<T>::internal_size() const {
    concurrent_queue_rep<T>& r = *my_rep;
    __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
    // NOTE(review): the reads are ordered head, invalid-count, tail —
    // presumably deliberate for concurrent readers; confirm before reordering.
    ticket hc = r.head_counter;
    size_t nie = r.n_invalid_entries;
    ticket tc = r.tail_counter;
    __TBB_ASSERT( hc!=tc || !nie, NULL );
    ptrdiff_t sz = tc-hc-nie;
    return sz<0 ? 0 : size_t(sz);
}
//! True if the queue was observed empty (all outstanding tickets invalid).
template<typename T>
bool concurrent_queue_base_v3<T>::internal_empty() const {
    concurrent_queue_rep<T>& r = *my_rep;
    // tail_counter is read twice on purpose:
    ticket tc = r.tail_counter;
    ticket hc = r.head_counter;
    // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
    return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
}
  484. template<typename T>
  485. void concurrent_queue_base_v3<T>::internal_finish_clear() {
  486. concurrent_queue_rep<T>& r = *my_rep;
  487. size_t nq = r.n_queue;
  488. for( size_t i=0; i<nq; ++i ) {
  489. page* tp = r.array[i].tail_page;
  490. if( is_valid_page(tp) ) {
  491. __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
  492. deallocate_page( tp );
  493. r.array[i].tail_page = NULL;
  494. } else
  495. __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
  496. }
  497. }
//! Copy (or move, depending on construct_item) src's representation into this queue.
/** The source must not be concurrently modified during the copy. */
template<typename T>
void concurrent_queue_base_v3<T>::assign( const concurrent_queue_base_v3& src,
    item_constructor_t construct_item )
{
    concurrent_queue_rep<T>& r = *my_rep;
    r.items_per_page = src.my_rep->items_per_page;
    // copy concurrent_queue_rep data
    r.head_counter = src.my_rep->head_counter;
    r.tail_counter = src.my_rep->tail_counter;
    r.n_invalid_entries = src.my_rep->n_invalid_entries;
    // copy or move micro_queues
    for( size_t i = 0; i < r.n_queue; ++i )
        r.array[i].assign( src.my_rep->array[i], *this, construct_item);
    __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
        "the source concurrent queue should not be concurrently modified." );
}
  514. template<typename Container, typename Value> class concurrent_queue_iterator;
//! Per-iterator state: a snapshot of the queue's head pages and position.
template<typename T>
class concurrent_queue_iterator_rep: no_assign {
    typedef typename micro_queue<T>::padded_page padded_page;
public:
    //! Ticket of the element the iterator currently addresses.
    ticket head_counter;
    const concurrent_queue_base_v3<T>& my_queue;
    //! Current page of each micro-queue, captured at construction and
    //! advanced page-by-page as the iterator moves (see advance()).
    typename concurrent_queue_base_v3<T>::page* array[concurrent_queue_rep<T>::n_queue];
    concurrent_queue_iterator_rep( const concurrent_queue_base_v3<T>& queue ) :
        head_counter(queue.my_rep->head_counter),
        my_queue(queue)
    {
        for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
            array[k] = queue.my_rep->array[k].head_page;
    }
    //! Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
    bool get_item( T*& item, size_t k ) ;
};
  532. template<typename T>
  533. bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
  534. if( k==my_queue.my_rep->tail_counter ) {
  535. item = NULL;
  536. return true;
  537. } else {
  538. typename concurrent_queue_base_v3<T>::page* p = array[concurrent_queue_rep<T>::index(k)];
  539. __TBB_ASSERT(p,NULL);
  540. size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
  541. item = &micro_queue<T>::get_ref(*p,i);
  542. return (p->mask & uintptr_t(1)<<i)!=0;
  543. }
  544. }
//! Constness-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
template<typename Value>
class concurrent_queue_iterator_base_v3 {
    //! Represents concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep<Value>* my_rep;

    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );
protected:
    //! Pointer to current item
    Value* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
#if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
        __TBB_compiler_fence();
#endif
    }

    //! Copy constructor
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i )
    : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Construct iterator pointing to head of queue.
    concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) ;

    //! Assignment
    void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;

    //! Advance iterator one step towards tail of queue.
    void advance() ;

    //! Destructor
    ~concurrent_queue_iterator_base_v3() {
        // No explicit destructor call for *my_rep: its members (ticket,
        // reference, array of raw pointers) are trivially destructible.
        cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
        my_rep = NULL;
    }
};
//! Build a snapshot of the queue, then position on the first valid item.
template<typename Value>
concurrent_queue_iterator_base_v3<Value>::concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3<Value>& queue ) {
    my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
    new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
    size_t k = my_rep->head_counter;
    // get_item returns false for an invalid slot; advance() skips past it.
    if( !my_rep->get_item(my_item, k) ) advance();
}
//! Replace this iterator's state with a copy of other's.
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::assign( const concurrent_queue_iterator_base_v3<Value>& other ) {
    if( my_rep!=other.my_rep ) {
        // Release our snapshot first, then clone the other's (if any).
        if( my_rep ) {
            cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().deallocate(my_rep, 1);
            my_rep = NULL;
        }
        if( other.my_rep ) {
            my_rep = cache_aligned_allocator<concurrent_queue_iterator_rep<Value> >().allocate(1);
            new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
        }
    }
    my_item = other.my_item;
}
//! Step to the next element, skipping entries whose mask bit is clear.
template<typename Value>
void concurrent_queue_iterator_base_v3<Value>::advance() {
    __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
    size_t k = my_rep->head_counter;
    const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
#if TBB_USE_ASSERT
    // Sanity check: my_item must still correspond to position k.
    Value* tmp;
    my_rep->get_item(tmp,k);
    __TBB_ASSERT( my_item==tmp, NULL );
#endif /* TBB_USE_ASSERT */
    size_t i = modulo_power_of_two( k/concurrent_queue_rep<Value>::n_queue, queue.my_rep->items_per_page );
    // Leaving the last slot of a page: move this micro-queue's cursor on.
    if( i==queue.my_rep->items_per_page-1 ) {
        typename concurrent_queue_base_v3<Value>::page*& root = my_rep->array[concurrent_queue_rep<Value>::index(k)];
        root = root->next;
    }
    // advance k
    my_rep->head_counter = ++k;
    // Recurse past invalid entries (get_item returned false).
    if( !my_rep->get_item(my_item, k) ) advance();
}
//! Similar to C++0x std::remove_cv
/** "tbb_" prefix added to avoid overload confusion with C++0x implementations. */
template<typename T> struct tbb_remove_cv {typedef T type;};
template<typename T> struct tbb_remove_cv<const T> {typedef T type;};          // strips const
template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};       // strips volatile
template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;}; // strips both
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
    public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::strict_ppl::concurrent_queue;
#else
public:
#endif
    //! Construct iterator pointing to head of queue.
    /** Private (when template friends work): only concurrent_queue may create
        a begin iterator. */
    explicit concurrent_queue_iterator( const concurrent_queue_base_v3<typename tbb_remove_cv<Value>::type>& queue ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
    {
    }
public:
    concurrent_queue_iterator() {}

    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
    {}

    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>::operator=(other);
        return *this;
    }

    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(this->my_item);
    }

    Value* operator->() const {return &operator*();}

    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        this->advance();
        return *this;
    }

    //! Post increment
    /** Returns a pointer to the pre-increment item (not an iterator copy). */
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
  678. template<typename C, typename T, typename U>
  679. bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
  680. return i.my_item==j.my_item;
  681. }
  682. template<typename C, typename T, typename U>
  683. bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
  684. return i.my_item!=j.my_item;
  685. }
  686. } // namespace internal
  687. //! @endcond
  688. } // namespace strict_ppl
  689. //! @cond INTERNAL
  690. namespace internal {
  691. class concurrent_queue_rep;
  692. class concurrent_queue_iterator_rep;
  693. class concurrent_queue_iterator_base_v3;
  694. template<typename Container, typename Value> class concurrent_queue_iterator;
//! For internal use only.
/** Type-independent portion of concurrent_queue.
    Methods marked __TBB_EXPORTED_METHOD are declared here and defined out of
    line; the pure-virtual hooks are implemented by the typed derived class.
    @ingroup containers */
class concurrent_queue_base_v3: no_copy {
private:
    //! Internal representation
    concurrent_queue_rep* my_rep;

    friend class concurrent_queue_rep;
    friend struct micro_queue;
    friend class micro_queue_pop_finalizer;
    friend class concurrent_queue_iterator_rep;
    friend class concurrent_queue_iterator_base_v3;
protected:
    //! Prefix on a page
    struct page {
        page* next;       // next page in the singly-linked page chain
        uintptr_t mask;   // per-page bookkeeping bits; semantics defined in the implementation
    };

    //! Capacity of the queue
    ptrdiff_t my_capacity;

    //! Always a power of 2
    size_t items_per_page;

    //! Size of an item
    size_t item_size;

    //! Tag selecting whether an element transfer copies or moves the source.
    enum copy_specifics { copy, move };

#if __TBB_PROTECTED_NESTED_CLASS_BROKEN
public:
#endif
    //! A page header plus trailing storage for one item.
    /** Used only for layout: __TBB_offsetof(padded_page<T>,last) yields the
        byte offset of the first item's payload within a page. */
    template<typename T>
    struct padded_page: page {
        //! Not defined anywhere - exists to quiet warnings.
        padded_page();
        //! Not defined anywhere - exists to quiet warnings.
        void operator=( const padded_page& );
        //! Must be last field.
        T last;
    };

private:
    // Type-dependent element operations supplied by the typed derived class.
    virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
    virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
protected:
    __TBB_EXPORTED_METHOD concurrent_queue_base_v3( size_t item_size );
    virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3();

    //! Enqueue item at tail of queue using copy operation
    void __TBB_EXPORTED_METHOD internal_push( const void* src );

    //! Dequeue item from head of queue
    void __TBB_EXPORTED_METHOD internal_pop( void* dst );

    //! Abort all pending queue operations
    void __TBB_EXPORTED_METHOD internal_abort();

    //! Attempt to enqueue item onto queue using copy operation
    bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );

    //! Attempt to dequeue item from queue.
    /** NULL if there was no item to dequeue. */
    bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );

    //! Get size of queue
    ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;

    //! Check if the queue is empty
    bool __TBB_EXPORTED_METHOD internal_empty() const;

    //! Set the queue capacity
    void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );

    //! custom allocator
    virtual page *allocate_page() = 0;

    //! custom de-allocator
    virtual void deallocate_page( page *p ) = 0;

    //! free any remaining pages
    /* note that the name may be misleading, but it remains so due to a historical accident. */
    void __TBB_EXPORTED_METHOD internal_finish_clear() ;

    //! throw an exception
    void __TBB_EXPORTED_METHOD internal_throw_exception() const;

    //! copy internal representation
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;

#if __TBB_CPP11_RVALUE_REF_PRESENT
    //! swap queues
    /** Performs plain (unsynchronized) field swaps, including the
        representation pointer; callers must ensure exclusive access. */
    void internal_swap( concurrent_queue_base_v3& src ) {
        std::swap( my_capacity, src.my_capacity );
        std::swap( items_per_page, src.items_per_page );
        std::swap( item_size, src.item_size );
        std::swap( my_rep, src.my_rep );
    }
#endif /* __TBB_CPP11_RVALUE_REF_PRESENT */

    //! Enqueues item at tail of queue using specified operation (copy or move)
    void internal_insert_item( const void* src, copy_specifics op_type );

    //! Attempts to enqueue at tail of queue using specified operation (copy or move)
    bool internal_insert_if_not_full( const void* src, copy_specifics op_type );

    //! Assigns one queue to another using specified operation (copy or move)
    void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
private:
    // Copy one element between pages; supplied by the typed derived class.
    virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
};
//! For internal use only.
/** Backward compatible modification of concurrent_queue_base_v3
    that adds move-aware enqueue/transfer entry points.
    @ingroup containers */
class concurrent_queue_base_v8: public concurrent_queue_base_v3 {
protected:
    //! Forwards the element size to the v3 base.
    concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}

    //! move items
    void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;

    //! Attempt to enqueue item onto queue using move operation
    bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );

    //! Enqueue item at tail of queue using move operation
    void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
private:
    friend struct micro_queue;
    // Type-dependent move operations supplied by the typed derived class.
    virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
    virtual void move_item( page& dst, size_t index, const void* src ) = 0;
};
//! Type-independent portion of concurrent_queue_iterator.
/** @ingroup containers */
class concurrent_queue_iterator_base_v3 {
    //! concurrent_queue over which we are iterating.
    /** NULL if one past last element in queue. */
    concurrent_queue_iterator_rep* my_rep;

    // The comparison operators read my_item directly.
    template<typename C, typename T, typename U>
    friend bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    template<typename C, typename T, typename U>
    friend bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j );

    //! Bind this iterator to 'queue'; offset_of_data locates an item's payload within a page.
    void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
protected:
    //! Pointer to current item
    void* my_item;

    //! Default constructor
    concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}

    //! Copy constructor
    /** Zero-initializes first, then delegates to the exported assign(). */
    concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
        assign(i);
    }

    //! Copy assignment; delegates to the exported assign().
    concurrent_queue_iterator_base_v3& operator=( const concurrent_queue_iterator_base_v3& i ) {
        assign(i);
        return *this;
    }

    //! Obsolete entry point for constructing iterator pointing to head of queue.
    /** Does not work correctly for SSE types. */
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue );

    //! Construct iterator pointing to head of queue.
    __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );

    //! Assignment
    void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );

    //! Advance iterator one step towards tail of queue.
    void __TBB_EXPORTED_METHOD advance();

    //! Destructor
    __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
};

//! Unversioned alias for the current iterator base.
typedef concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base;
//! Meets requirements of a forward iterator for STL.
/** Value is either the T or const T type of the container.
    @ingroup containers */
template<typename Container, typename Value>
class concurrent_queue_iterator: public concurrent_queue_iterator_base,
        public std::iterator<std::forward_iterator_tag,Value> {
#if !__TBB_TEMPLATE_FRIENDS_BROKEN
    template<typename T, class A>
    friend class ::tbb::concurrent_bounded_queue;
#else
public: // workaround: compiler cannot grant template friendship, so expose the ctor
#endif
    //! Construct iterator pointing to head of queue.
    /** The offset of padded_page<Value>::last tells the base where an item's
        payload begins within a page. */
    explicit concurrent_queue_iterator( const concurrent_queue_base_v3& queue ) :
        concurrent_queue_iterator_base_v3(queue,__TBB_offsetof(concurrent_queue_base_v3::padded_page<Value>,last))
    {
    }
public:
    //! Default constructor; base zero-initializes, yielding the one-past-last state.
    concurrent_queue_iterator() {}
    /** If Value==Container::value_type, then this routine is the copy constructor.
        If Value==const Container::value_type, then this routine is a conversion constructor. */
    concurrent_queue_iterator( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) :
        concurrent_queue_iterator_base_v3(other)
    {}
    //! Iterator assignment
    concurrent_queue_iterator& operator=( const concurrent_queue_iterator<Container,typename Container::value_type>& other ) {
        concurrent_queue_iterator_base_v3::operator=(other);
        return *this;
    }
    //! Reference to current item
    Value& operator*() const {
        return *static_cast<Value*>(my_item);
    }
    Value* operator->() const {return &operator*();}
    //! Advance to next item in queue
    concurrent_queue_iterator& operator++() {
        advance();
        return *this;
    }
    //! Post increment
    /** NOTE: deliberately non-standard — returns a pointer to the item the
        iterator referenced before advancing, not a copy of the iterator. */
    Value* operator++(int) {
        Value* result = &operator*();
        operator++();
        return result;
    }
}; // concurrent_queue_iterator
  884. template<typename C, typename T, typename U>
  885. bool operator==( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
  886. return i.my_item==j.my_item;
  887. }
  888. template<typename C, typename T, typename U>
  889. bool operator!=( const concurrent_queue_iterator<C,T>& i, const concurrent_queue_iterator<C,U>& j ) {
  890. return i.my_item!=j.my_item;
  891. }
} // namespace internal
  893. //! @endcond
  894. } // namespace tbb
  895. #endif /* __TBB__concurrent_queue_impl_H */