concurrent_priority_queue.h 21 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB_concurrent_priority_queue_H
  14. #define __TBB_concurrent_priority_queue_H
  15. #define __TBB_concurrent_priority_queue_H_include_area
  16. #include "internal/_warning_suppress_enable_notice.h"
  17. #include "atomic.h"
  18. #include "cache_aligned_allocator.h"
  19. #include "tbb_exception.h"
  20. #include "tbb_stddef.h"
  21. #include "tbb_profiling.h"
  22. #include "internal/_aggregator_impl.h"
  23. #include "internal/_template_helpers.h"
  24. #include "internal/_allocator_traits.h"
  25. #include <vector>
  26. #include <iterator>
  27. #include <functional>
  28. #include __TBB_STD_SWAP_HEADER
  29. #if __TBB_INITIALIZER_LISTS_PRESENT
  30. #include <initializer_list>
  31. #endif
  32. #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
  33. #include <type_traits>
  34. #endif
  35. namespace tbb {
  36. namespace interface5 {
  37. namespace internal {
  38. #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
  39. template<typename T, bool C = std::is_copy_constructible<T>::value>
  40. struct use_element_copy_constructor {
  41. typedef tbb::internal::true_type type;
  42. };
  43. template<typename T>
  44. struct use_element_copy_constructor <T,false> {
  45. typedef tbb::internal::false_type type;
  46. };
  47. #else
  48. template<typename>
  49. struct use_element_copy_constructor {
  50. typedef tbb::internal::true_type type;
  51. };
  52. #endif
  53. } // namespace internal
  54. using namespace tbb::internal;
  55. //! Concurrent priority queue
  56. template <typename T, typename Compare=std::less<T>, typename A=cache_aligned_allocator<T> >
  57. class concurrent_priority_queue {
  58. public:
  59. //! Element type in the queue.
  60. typedef T value_type;
  61. //! Reference type
  62. typedef T& reference;
  63. //! Const reference type
  64. typedef const T& const_reference;
  65. //! Integral type for representing size of the queue.
  66. typedef size_t size_type;
  67. //! Difference type for iterator
  68. typedef ptrdiff_t difference_type;
  69. //! Allocator type
  70. typedef A allocator_type;
  71. //! Constructs a new concurrent_priority_queue with default capacity
  72. explicit concurrent_priority_queue(const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(), data(a)
  73. {
  74. my_aggregator.initialize_handler(my_functor_t(this));
  75. }
  76. //! Constructs a new concurrent_priority_queue with default capacity
  77. explicit concurrent_priority_queue(const Compare& c, const allocator_type& a = allocator_type()) : mark(0), my_size(0), compare(c), data(a)
  78. {
  79. my_aggregator.initialize_handler(my_functor_t(this));
  80. }
  81. //! Constructs a new concurrent_priority_queue with init_sz capacity
  82. explicit concurrent_priority_queue(size_type init_capacity, const allocator_type& a = allocator_type()) :
  83. mark(0), my_size(0), compare(), data(a)
  84. {
  85. data.reserve(init_capacity);
  86. my_aggregator.initialize_handler(my_functor_t(this));
  87. }
  88. //! Constructs a new concurrent_priority_queue with init_sz capacity
  89. explicit concurrent_priority_queue(size_type init_capacity, const Compare& c, const allocator_type& a = allocator_type()) :
  90. mark(0), my_size(0), compare(c), data(a)
  91. {
  92. data.reserve(init_capacity);
  93. my_aggregator.initialize_handler(my_functor_t(this));
  94. }
  95. //! [begin,end) constructor
  96. template<typename InputIterator>
  97. concurrent_priority_queue(InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
  98. mark(0), compare(), data(begin, end, a)
  99. {
  100. my_aggregator.initialize_handler(my_functor_t(this));
  101. heapify();
  102. my_size = data.size();
  103. }
  104. //! [begin,end) constructor
  105. template<typename InputIterator>
  106. concurrent_priority_queue(InputIterator begin, InputIterator end, const Compare& c, const allocator_type& a = allocator_type()) :
  107. mark(0), compare(c), data(begin, end, a)
  108. {
  109. my_aggregator.initialize_handler(my_functor_t(this));
  110. heapify();
  111. my_size = data.size();
  112. }
  #if __TBB_INITIALIZER_LISTS_PRESENT
  //! Constructs a queue holding the elements of init_list
  /** The list is copied into the storage and heapified with a
      value-initialized comparator. */
  concurrent_priority_queue(std::initializer_list<T> init_list, const allocator_type &a = allocator_type())
      : mark(0)
      , compare()
      , data(init_list.begin(), init_list.end(), a)
  {
      my_aggregator.initialize_handler(my_functor_t(this));
      // mark starts at 0, so heapify() folds every copied element into the heap.
      heapify();
      my_size = data.size();
  }

  //! Constructs a queue holding the elements of init_list, ordered by a copy of c
  /** The list is copied into the storage and heapified. */
  concurrent_priority_queue(std::initializer_list<T> init_list, const Compare& c, const allocator_type &a = allocator_type())
      : mark(0)
      , compare(c)
      , data(init_list.begin(), init_list.end(), a)
  {
      my_aggregator.initialize_handler(my_functor_t(this));
      // mark starts at 0, so heapify() folds every copied element into the heap.
      heapify();
      my_size = data.size();
  }
  #endif //# __TBB_INITIALIZER_LISTS_PRESENT
  131. //! Copy constructor
  132. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  133. concurrent_priority_queue(const concurrent_priority_queue& src) : mark(src.mark),
  134. my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
  135. {
  136. my_aggregator.initialize_handler(my_functor_t(this));
  137. heapify();
  138. }
  139. //! Copy constructor with specific allocator
  140. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  141. concurrent_priority_queue(const concurrent_priority_queue& src, const allocator_type& a) : mark(src.mark),
  142. my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
  143. {
  144. my_aggregator.initialize_handler(my_functor_t(this));
  145. heapify();
  146. }
  147. //! Assignment operator
  148. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  149. concurrent_priority_queue& operator=(const concurrent_priority_queue& src) {
  150. if (this != &src) {
  151. vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
  152. mark = src.mark;
  153. my_size = src.my_size;
  154. }
  155. return *this;
  156. }
  157. #if __TBB_CPP11_RVALUE_REF_PRESENT
  158. //! Move constructor
  159. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  160. concurrent_priority_queue(concurrent_priority_queue&& src) : mark(src.mark),
  161. my_size(src.my_size), data(std::move(src.data))
  162. {
  163. my_aggregator.initialize_handler(my_functor_t(this));
  164. }
  165. //! Move constructor with specific allocator
  166. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  167. concurrent_priority_queue(concurrent_priority_queue&& src, const allocator_type& a) : mark(src.mark),
  168. my_size(src.my_size),
  169. #if __TBB_ALLOCATOR_TRAITS_PRESENT
  170. data(std::move(src.data), a)
  171. #else
  172. // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
  173. // It seems that the reason is absence of support of allocator_traits (stateful allocators).
  174. data(a)
  175. #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
  176. {
  177. my_aggregator.initialize_handler(my_functor_t(this));
  178. #if !__TBB_ALLOCATOR_TRAITS_PRESENT
  179. if (a != src.data.get_allocator()){
  180. data.reserve(src.data.size());
  181. data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
  182. }else{
  183. data = std::move(src.data);
  184. }
  185. #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
  186. }
  187. //! Move assignment operator
  188. /** This operation is unsafe if there are pending concurrent operations on the src queue. */
  189. concurrent_priority_queue& operator=( concurrent_priority_queue&& src) {
  190. if (this != &src) {
  191. mark = src.mark;
  192. my_size = src.my_size;
  193. #if !__TBB_ALLOCATOR_TRAITS_PRESENT
  194. if (data.get_allocator() != src.data.get_allocator()){
  195. vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
  196. }else
  197. #endif //!__TBB_ALLOCATOR_TRAITS_PRESENT
  198. {
  199. data = std::move(src.data);
  200. }
  201. }
  202. return *this;
  203. }
  204. #endif //__TBB_CPP11_RVALUE_REF_PRESENT
  205. //! Assign the queue from [begin,end) range, not thread-safe
  206. template<typename InputIterator>
  207. void assign(InputIterator begin, InputIterator end) {
  208. vector_t(begin, end, data.get_allocator()).swap(data);
  209. mark = 0;
  210. my_size = data.size();
  211. heapify();
  212. }
  #if __TBB_INITIALIZER_LISTS_PRESENT
  //! Replaces the contents with the elements of il; not thread-safe
  void assign(std::initializer_list<T> il) {
      this->assign(il.begin(), il.end());
  }

  //! Replaces the contents with the elements of il and returns this queue; not thread-safe
  concurrent_priority_queue& operator=(std::initializer_list<T> il) {
      // Same work as assign(il), forwarded to the iterator-range overload.
      this->assign(il.begin(), il.end());
      return *this;
  }
  #endif //# __TBB_INITIALIZER_LISTS_PRESENT
  222. //! Returns true if empty, false otherwise
  223. /** Returned value may not reflect results of pending operations.
  224. This operation reads shared data and will trigger a race condition. */
  225. bool empty() const { return size()==0; }
  226. //! Returns the current number of elements contained in the queue
  227. /** Returned value may not reflect results of pending operations.
  228. This operation reads shared data and will trigger a race condition. */
  229. size_type size() const { return __TBB_load_with_acquire(my_size); }
  230. //! Pushes elem onto the queue, increasing capacity of queue if necessary
  231. /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
  232. void push(const_reference elem) {
  233. #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
  234. __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
  235. #endif
  236. cpq_operation op_data(elem, PUSH_OP);
  237. my_aggregator.execute(&op_data);
  238. if (op_data.status == FAILED) // exception thrown
  239. throw_exception(eid_bad_alloc);
  240. }
  241. #if __TBB_CPP11_RVALUE_REF_PRESENT
  242. //! Pushes elem onto the queue, increasing capacity of queue if necessary
  243. /** This operation can be safely used concurrently with other push, try_pop or emplace operations. */
  244. void push(value_type &&elem) {
  245. cpq_operation op_data(elem, PUSH_RVALUE_OP);
  246. my_aggregator.execute(&op_data);
  247. if (op_data.status == FAILED) // exception thrown
  248. throw_exception(eid_bad_alloc);
  249. }
  250. #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
  //! Builds an element from args and pushes it onto the queue
  /** The element is constructed as a temporary and handed to the rvalue push overload.
      This operation can be safely used concurrently with other push, try_pop or emplace operations. */
  template<typename... Args>
  void emplace(Args&&... args)
  {
      push(value_type(std::forward<Args>(args)...));
  }
  257. #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
  258. #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
  259. //! Gets a reference to and removes highest priority element
  260. /** If a highest priority element was found, sets elem and returns true,
  261. otherwise returns false.
  262. This operation can be safely used concurrently with other push, try_pop or emplace operations. */
  263. bool try_pop(reference elem) {
  264. cpq_operation op_data(POP_OP);
  265. op_data.elem = &elem;
  266. my_aggregator.execute(&op_data);
  267. return op_data.status==SUCCEEDED;
  268. }
  269. //! Clear the queue; not thread-safe
  270. /** This operation is unsafe if there are pending concurrent operations on the queue.
  271. Resets size, effectively emptying queue; does not free space.
  272. May not clear elements added in pending operations. */
  273. void clear() {
  274. data.clear();
  275. mark = 0;
  276. my_size = 0;
  277. }
  278. //! Swap this queue with another; not thread-safe
  279. /** This operation is unsafe if there are pending concurrent operations on the queue. */
  280. void swap(concurrent_priority_queue& q) {
  281. using std::swap;
  282. data.swap(q.data);
  283. swap(mark, q.mark);
  284. swap(my_size, q.my_size);
  285. }
  286. //! Return allocator object
  287. allocator_type get_allocator() const { return data.get_allocator(); }
  288. private:
  289. enum operation_type {INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP};
  290. enum operation_status { WAIT=0, SUCCEEDED, FAILED };
  291. class cpq_operation : public aggregated_operation<cpq_operation> {
  292. public:
  293. operation_type type;
  294. union {
  295. value_type *elem;
  296. size_type sz;
  297. };
  298. cpq_operation(const_reference e, operation_type t) :
  299. type(t), elem(const_cast<value_type*>(&e)) {}
  300. cpq_operation(operation_type t) : type(t) {}
  301. };
  302. class my_functor_t {
  303. concurrent_priority_queue<T, Compare, A> *cpq;
  304. public:
  305. my_functor_t() {}
  306. my_functor_t(concurrent_priority_queue<T, Compare, A> *cpq_) : cpq(cpq_) {}
  307. void operator()(cpq_operation* op_list) {
  308. cpq->handle_operations(op_list);
  309. }
  310. };
  311. typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t;
  312. aggregator_t my_aggregator;
  313. //! Padding added to avoid false sharing
  314. char padding1[NFS_MaxLineSize - sizeof(aggregator_t)];
  315. //! The point at which unsorted elements begin
  316. size_type mark;
  317. __TBB_atomic size_type my_size;
  318. Compare compare;
  319. //! Padding added to avoid false sharing
  320. char padding2[NFS_MaxLineSize - (2*sizeof(size_type)) - sizeof(Compare)];
  321. //! Storage for the heap of elements in queue, plus unheapified elements
  /** data has the following structure:

       binary unheapified
        heap   elements
      ____|_______|____
      |       |       |
      v       v       v
      [_|...|_|_|...|_| |...| ]
       0       ^       ^       ^
               |       |       |__capacity
               |       |__my_size
               |__mark

      Thus, data stores the binary heap starting at position 0 through
      mark-1 (it may be empty).  Then there are 0 or more elements
      that have not yet been inserted into the heap, in positions
      mark through my_size-1. */
  337. typedef std::vector<value_type, allocator_type> vector_t;
  338. vector_t data;
  339. void handle_operations(cpq_operation *op_list) {
  340. cpq_operation *tmp, *pop_list=NULL;
  341. __TBB_ASSERT(mark == data.size(), NULL);
  342. // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
  343. while (op_list) {
  344. // ITT note: &(op_list->status) tag is used to cover accesses to op_list
  345. // node. This thread is going to handle the operation, and so will acquire it
  346. // and perform the associated operation w/o triggering a race condition; the
  347. // thread that created the operation is waiting on the status field, so when
  348. // this thread is done with the operation, it will perform a
  349. // store_with_release to give control back to the waiting thread in
  350. // aggregator::insert_operation.
  351. call_itt_notify(acquired, &(op_list->status));
  352. __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
  353. tmp = op_list;
  354. op_list = itt_hide_load_word(op_list->next);
  355. if (tmp->type == POP_OP) {
  356. if (mark < data.size() &&
  357. compare(data[0], data[data.size()-1])) {
  358. // there are newly pushed elems and the last one
  359. // is higher than top
  360. *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
  361. __TBB_store_with_release(my_size, my_size-1);
  362. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  363. data.pop_back();
  364. __TBB_ASSERT(mark<=data.size(), NULL);
  365. }
  366. else { // no convenient item to pop; postpone
  367. itt_hide_store_word(tmp->next, pop_list);
  368. pop_list = tmp;
  369. }
  370. } else { // PUSH_OP or PUSH_RVALUE_OP
  371. __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
  372. __TBB_TRY{
  373. if (tmp->type == PUSH_OP) {
  374. push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
  375. } else {
  376. data.push_back(tbb::internal::move(*(tmp->elem)));
  377. }
  378. __TBB_store_with_release(my_size, my_size + 1);
  379. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  380. } __TBB_CATCH(...) {
  381. itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
  382. }
  383. }
  384. }
  385. // second pass processes pop operations
  386. while (pop_list) {
  387. tmp = pop_list;
  388. pop_list = itt_hide_load_word(pop_list->next);
  389. __TBB_ASSERT(tmp->type == POP_OP, NULL);
  390. if (data.empty()) {
  391. itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
  392. }
  393. else {
  394. __TBB_ASSERT(mark<=data.size(), NULL);
  395. if (mark < data.size() &&
  396. compare(data[0], data[data.size()-1])) {
  397. // there are newly pushed elems and the last one is
  398. // higher than top
  399. *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
  400. __TBB_store_with_release(my_size, my_size-1);
  401. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  402. data.pop_back();
  403. }
  404. else { // extract top and push last element down heap
  405. *(tmp->elem) = tbb::internal::move(data[0]);
  406. __TBB_store_with_release(my_size, my_size-1);
  407. itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
  408. reheap();
  409. }
  410. }
  411. }
  412. // heapify any leftover pushed elements before doing the next
  413. // batch of operations
  414. if (mark<data.size()) heapify();
  415. __TBB_ASSERT(mark == data.size(), NULL);
  416. }
  417. //! Merge unsorted elements into heap
  418. void heapify() {
  419. if (!mark && data.size()>0) mark = 1;
  420. for (; mark<data.size(); ++mark) {
  421. // for each unheapified element under size
  422. size_type cur_pos = mark;
  423. value_type to_place = tbb::internal::move(data[mark]);
  424. do { // push to_place up the heap
  425. size_type parent = (cur_pos-1)>>1;
  426. if (!compare(data[parent], to_place)) break;
  427. data[cur_pos] = tbb::internal::move(data[parent]);
  428. cur_pos = parent;
  429. } while( cur_pos );
  430. data[cur_pos] = tbb::internal::move(to_place);
  431. }
  432. }
  433. //! Re-heapify after an extraction
  434. /** Re-heapify by pushing last element down the heap from the root. */
  435. void reheap() {
  436. size_type cur_pos=0, child=1;
  437. while (child < mark) {
  438. size_type target = child;
  439. if (child+1 < mark && compare(data[child], data[child+1]))
  440. ++target;
  441. // target now has the higher priority child
  442. if (compare(data[target], data[data.size()-1])) break;
  443. data[cur_pos] = tbb::internal::move(data[target]);
  444. cur_pos = target;
  445. child = (cur_pos<<1)+1;
  446. }
  447. if (cur_pos != data.size()-1)
  448. data[cur_pos] = tbb::internal::move(data[data.size()-1]);
  449. data.pop_back();
  450. if (mark > data.size()) mark = data.size();
  451. }
  452. void push_back_helper(const T& t, tbb::internal::true_type) {
  453. data.push_back(t);
  454. }
  455. void push_back_helper(const T&, tbb::internal::false_type) {
  456. __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
  457. }
  458. };
  459. #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
  460. namespace internal {
  461. template<typename T, typename... Args>
  462. using priority_queue_t = concurrent_priority_queue<
  463. T,
  464. std::conditional_t< (sizeof...(Args)>0) && !is_allocator_v< pack_element_t<0, Args...> >,
  465. pack_element_t<0, Args...>, std::less<T> >,
  466. std::conditional_t< (sizeof...(Args)>0) && is_allocator_v< pack_element_t<sizeof...(Args)-1, Args...> >,
  467. pack_element_t<sizeof...(Args)-1, Args...>, cache_aligned_allocator<T> >
  468. >;
  469. }
  470. // Deduction guide for the constructor from two iterators
  471. template<typename InputIterator,
  472. typename T = typename std::iterator_traits<InputIterator>::value_type,
  473. typename... Args
  474. > concurrent_priority_queue(InputIterator, InputIterator, Args...)
  475. -> internal::priority_queue_t<T, Args...>;
  476. template<typename T, typename CompareOrAllocalor>
  477. concurrent_priority_queue(std::initializer_list<T> init_list, CompareOrAllocalor)
  478. -> internal::priority_queue_t<T, CompareOrAllocalor>;
  479. #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
  480. } // namespace interface5
  481. using interface5::concurrent_priority_queue;
  482. } // namespace tbb
  483. #include "internal/_warning_suppress_disable_notice.h"
  484. #undef __TBB_concurrent_priority_queue_H_include_area
  485. #endif /* __TBB_concurrent_priority_queue_H */