aggregator.h 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB__aggregator_H
  14. #define __TBB__aggregator_H
  15. #define __TBB_aggregator_H_include_area
  16. #include "internal/_warning_suppress_enable_notice.h"
  17. #if !TBB_PREVIEW_AGGREGATOR
  18. #error Set TBB_PREVIEW_AGGREGATOR before including aggregator.h
  19. #endif
  20. #include "atomic.h"
  21. #include "tbb_profiling.h"
  22. namespace tbb {
  23. namespace interface6 {
  24. using namespace tbb::internal;
  25. class aggregator_operation {
  26. template<typename handler_type> friend class aggregator_ext;
  27. uintptr_t status;
  28. aggregator_operation* my_next;
  29. public:
  30. enum aggregator_operation_status { agg_waiting=0, agg_finished };
  31. aggregator_operation() : status(agg_waiting), my_next(NULL) {}
  32. /// Call start before handling this operation
  33. void start() { call_itt_notify(acquired, &status); }
  34. /// Call finish when done handling this operation
  35. /** The operation will be released to its originating thread, and possibly deleted. */
  36. void finish() { itt_store_word_with_release(status, uintptr_t(agg_finished)); }
  37. aggregator_operation* next() { return itt_hide_load_word(my_next);}
  38. void set_next(aggregator_operation* n) { itt_hide_store_word(my_next, n); }
  39. };
  40. namespace internal {
  41. class basic_operation_base : public aggregator_operation {
  42. friend class basic_handler;
  43. virtual void apply_body() = 0;
  44. public:
  45. basic_operation_base() : aggregator_operation() {}
  46. virtual ~basic_operation_base() {}
  47. };
  48. template<typename Body>
  49. class basic_operation : public basic_operation_base, no_assign {
  50. const Body& my_body;
  51. void apply_body() __TBB_override { my_body(); }
  52. public:
  53. basic_operation(const Body& b) : basic_operation_base(), my_body(b) {}
  54. };
  55. class basic_handler {
  56. public:
  57. basic_handler() {}
  58. void operator()(aggregator_operation* op_list) const {
  59. while (op_list) {
  60. // ITT note: &(op_list->status) tag is used to cover accesses to the operation data.
  61. // The executing thread "acquires" the tag (see start()) and then performs
  62. // the associated operation w/o triggering a race condition diagnostics.
  63. // A thread that created the operation is waiting for its status (see execute_impl()),
  64. // so when this thread is done with the operation, it will "release" the tag
  65. // and update the status (see finish()) to give control back to the waiting thread.
  66. basic_operation_base& request = static_cast<basic_operation_base&>(*op_list);
  67. // IMPORTANT: need to advance op_list to op_list->next() before calling request.finish()
  68. op_list = op_list->next();
  69. request.start();
  70. request.apply_body();
  71. request.finish();
  72. }
  73. }
  74. };
  75. } // namespace internal
  76. //! Aggregator base class and expert interface
  77. /** An aggregator for collecting operations coming from multiple sources and executing
  78. them serially on a single thread. */
  79. template <typename handler_type>
  80. class aggregator_ext : tbb::internal::no_copy {
  81. public:
  82. aggregator_ext(const handler_type& h) : handler_busy(0), handle_operations(h) { mailbox = NULL; }
  83. //! EXPERT INTERFACE: Enter a user-made operation into the aggregator's mailbox.
  84. /** Details of user-made operations must be handled by user-provided handler */
  85. void process(aggregator_operation *op) { execute_impl(*op); }
  86. protected:
  87. /** Place operation in mailbox, then either handle mailbox or wait for the operation
  88. to be completed by a different thread. */
  89. void execute_impl(aggregator_operation& op) {
  90. aggregator_operation* res;
  91. // ITT note: &(op.status) tag is used to cover accesses to this operation. This
  92. // thread has created the operation, and now releases it so that the handler
  93. // thread may handle the associated operation w/o triggering a race condition;
  94. // thus this tag will be acquired just before the operation is handled in the
  95. // handle_operations functor.
  96. call_itt_notify(releasing, &(op.status));
  97. // insert the operation into the list
  98. do {
  99. // ITT may flag the following line as a race; it is a false positive:
  100. // This is an atomic read; we don't provide itt_hide_load_word for atomics
  101. op.my_next = res = mailbox; // NOT A RACE
  102. } while (mailbox.compare_and_swap(&op, res) != res);
  103. if (!res) { // first in the list; handle the operations
  104. // ITT note: &mailbox tag covers access to the handler_busy flag, which this
  105. // waiting handler thread will try to set before entering handle_operations.
  106. call_itt_notify(acquired, &mailbox);
  107. start_handle_operations();
  108. __TBB_ASSERT(op.status, NULL);
  109. }
  110. else { // not first; wait for op to be ready
  111. call_itt_notify(prepare, &(op.status));
  112. spin_wait_while_eq(op.status, uintptr_t(aggregator_operation::agg_waiting));
  113. itt_load_word_with_acquire(op.status);
  114. }
  115. }
  116. private:
  117. //! An atomically updated list (aka mailbox) of aggregator_operations
  118. atomic<aggregator_operation *> mailbox;
  119. //! Controls thread access to handle_operations
  120. /** Behaves as boolean flag where 0=false, 1=true */
  121. uintptr_t handler_busy;
  122. handler_type handle_operations;
  123. //! Trigger the handling of operations when the handler is free
  124. void start_handle_operations() {
  125. aggregator_operation *pending_operations;
  126. // ITT note: &handler_busy tag covers access to mailbox as it is passed
  127. // between active and waiting handlers. Below, the waiting handler waits until
  128. // the active handler releases, and the waiting handler acquires &handler_busy as
  129. // it becomes the active_handler. The release point is at the end of this
  130. // function, when all operations in mailbox have been handled by the
  131. // owner of this aggregator.
  132. call_itt_notify(prepare, &handler_busy);
  133. // get handler_busy: only one thread can possibly spin here at a time
  134. spin_wait_until_eq(handler_busy, uintptr_t(0));
  135. call_itt_notify(acquired, &handler_busy);
  136. // acquire fence not necessary here due to causality rule and surrounding atomics
  137. __TBB_store_with_release(handler_busy, uintptr_t(1));
  138. // ITT note: &mailbox tag covers access to the handler_busy flag itself.
  139. // Capturing the state of the mailbox signifies that handler_busy has been
  140. // set and a new active handler will now process that list's operations.
  141. call_itt_notify(releasing, &mailbox);
  142. // grab pending_operations
  143. pending_operations = mailbox.fetch_and_store(NULL);
  144. // handle all the operations
  145. handle_operations(pending_operations);
  146. // release the handler
  147. itt_store_word_with_release(handler_busy, uintptr_t(0));
  148. }
  149. };
  150. //! Basic aggregator interface
  151. class aggregator : private aggregator_ext<internal::basic_handler> {
  152. public:
  153. aggregator() : aggregator_ext<internal::basic_handler>(internal::basic_handler()) {}
  154. //! BASIC INTERFACE: Enter a function for exclusive execution by the aggregator.
  155. /** The calling thread stores the function object in a basic_operation and
  156. places the operation in the aggregator's mailbox */
  157. template<typename Body>
  158. void execute(const Body& b) {
  159. internal::basic_operation<Body> op(b);
  160. this->execute_impl(op);
  161. }
  162. };
  163. } // namespace interface6
  164. using interface6::aggregator;
  165. using interface6::aggregator_ext;
  166. using interface6::aggregator_operation;
  167. } // namespace tbb
  168. #include "internal/_warning_suppress_disable_notice.h"
  169. #undef __TBB_aggregator_H_include_area
  170. #endif // __TBB__aggregator_H