_flow_graph_impl.h 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB_flow_graph_impl_H
  14. #define __TBB_flow_graph_impl_H
  15. #include "../tbb_stddef.h"
  16. #include "../task.h"
  17. #include "../task_arena.h"
  18. #include "../flow_graph_abstractions.h"
  19. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  20. #include "../concurrent_priority_queue.h"
  21. #endif
  22. #include <list>
  23. #if TBB_DEPRECATED_FLOW_ENQUEUE
  24. #define FLOW_SPAWN(a) tbb::task::enqueue((a))
  25. #else
  26. #define FLOW_SPAWN(a) tbb::task::spawn((a))
  27. #endif
  28. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  29. #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
  30. #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
  31. #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
  32. #else
  33. #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
  34. #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
  35. #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
  36. #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  37. #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
  38. #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
  39. #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
  40. #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
  41. #else
  42. #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
  43. #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
  44. #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
  45. #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
  46. namespace tbb {
  47. namespace flow {
  48. namespace internal {
  49. static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
  50. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  51. typedef unsigned int node_priority_t;
  52. static const node_priority_t no_priority = node_priority_t(0);
  53. #endif
  54. }
  55. namespace interface10 {
  56. class graph;
  57. }
  58. namespace interface11 {
  59. using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
  60. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  61. using tbb::flow::internal::node_priority_t;
  62. using tbb::flow::internal::no_priority;
  63. //! Base class for tasks generated by graph nodes.
  64. struct graph_task : public task {
  65. graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
  66. node_priority_t priority;
  67. };
  68. #else
  69. typedef task graph_task;
  70. #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
  71. class graph_node;
  72. template <typename GraphContainerType, typename GraphNodeType>
  73. class graph_iterator {
  74. friend class tbb::flow::interface10::graph;
  75. friend class graph_node;
  76. public:
  77. typedef size_t size_type;
  78. typedef GraphNodeType value_type;
  79. typedef GraphNodeType* pointer;
  80. typedef GraphNodeType& reference;
  81. typedef const GraphNodeType& const_reference;
  82. typedef std::forward_iterator_tag iterator_category;
  83. //! Default constructor
  84. graph_iterator() : my_graph(NULL), current_node(NULL) {}
  85. //! Copy constructor
  86. graph_iterator(const graph_iterator& other) :
  87. my_graph(other.my_graph), current_node(other.current_node)
  88. {}
  89. //! Assignment
  90. graph_iterator& operator=(const graph_iterator& other) {
  91. if (this != &other) {
  92. my_graph = other.my_graph;
  93. current_node = other.current_node;
  94. }
  95. return *this;
  96. }
  97. //! Dereference
  98. reference operator*() const;
  99. //! Dereference
  100. pointer operator->() const;
  101. //! Equality
  102. bool operator==(const graph_iterator& other) const {
  103. return ((my_graph == other.my_graph) && (current_node == other.current_node));
  104. }
  105. //! Inequality
  106. bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
  107. //! Pre-increment
  108. graph_iterator& operator++() {
  109. internal_forward();
  110. return *this;
  111. }
  112. //! Post-increment
  113. graph_iterator operator++(int) {
  114. graph_iterator result = *this;
  115. operator++();
  116. return result;
  117. }
  118. private:
  119. // the graph over which we are iterating
  120. GraphContainerType *my_graph;
  121. // pointer into my_graph's my_nodes list
  122. pointer current_node;
  123. //! Private initializing constructor for begin() and end() iterators
  124. graph_iterator(GraphContainerType *g, bool begin);
  125. void internal_forward();
  126. }; // class graph_iterator
  //! Bit flags that modify the behavior of graph::reset(). Flags can be combined.
  enum reset_flags {
      //! No extra action requested (value 0).
      rf_reset_protocol = 0x0,
      //! Delete the current node body, reset to a copy of the initial node body.
      rf_reset_bodies = 0x1,
      //! Delete edges.
      rf_clear_edges = 0x2
  };
  133. namespace internal {
  134. void activate_graph(tbb::flow::interface10::graph& g);
  135. void deactivate_graph(tbb::flow::interface10::graph& g);
  136. bool is_graph_active(tbb::flow::interface10::graph& g);
  137. tbb::task& prioritize_task(tbb::flow::interface10::graph& g, tbb::task& arena_task);
  138. void spawn_in_graph_arena(tbb::flow::interface10::graph& g, tbb::task& arena_task);
  139. void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task& arena_task);
  140. void add_task_to_graph_reset_list(tbb::flow::interface10::graph& g, tbb::task *tp);
  141. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  142. struct graph_task_comparator {
  143. bool operator()(const graph_task* left, const graph_task* right) {
  144. return left->priority < right->priority;
  145. }
  146. };
  147. typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
  148. class priority_task_selector : public task {
  149. public:
  150. priority_task_selector(graph_task_priority_queue_t& priority_queue)
  151. : my_priority_queue(priority_queue) {}
  152. task* execute() __TBB_override {
  153. graph_task* t = NULL;
  154. bool result = my_priority_queue.try_pop(t);
  155. __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
  156. " in graph's priority queue mismatched" );
  157. __TBB_ASSERT( t && t != SUCCESSFULLY_ENQUEUED,
  158. "Incorrect task submitted to graph priority queue" );
  159. __TBB_ASSERT( t->priority != tbb::flow::internal::no_priority,
  160. "Tasks from graph's priority queue must have priority" );
  161. task* t_next = t->execute();
  162. task::destroy(*t);
  163. return t_next;
  164. }
  165. private:
  166. graph_task_priority_queue_t& my_priority_queue;
  167. };
  168. #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
  169. }
  170. } // namespace interfaceX
  171. namespace interface10 {
  172. //! The graph class
  173. /** This class serves as a handle to the graph */
  174. class graph : tbb::internal::no_copy, public tbb::flow::graph_proxy {
  175. friend class tbb::flow::interface11::graph_node;
  176. template< typename Body >
  177. class run_task : public tbb::flow::interface11::graph_task {
  178. public:
  179. run_task(Body& body
  180. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  181. , tbb::flow::interface11::node_priority_t node_priority = tbb::flow::interface11::no_priority
  182. ) : tbb::flow::interface11::graph_task(node_priority),
  183. #else
  184. ) :
  185. #endif
  186. my_body(body) { }
  187. tbb::task *execute() __TBB_override {
  188. my_body();
  189. return NULL;
  190. }
  191. private:
  192. Body my_body;
  193. };
  194. template< typename Receiver, typename Body >
  195. class run_and_put_task : public tbb::flow::interface11::graph_task {
  196. public:
  197. run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
  198. tbb::task *execute() __TBB_override {
  199. tbb::task *res = my_receiver.try_put_task(my_body());
  200. if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
  201. return res;
  202. }
  203. private:
  204. Receiver &my_receiver;
  205. Body my_body;
  206. };
  207. typedef std::list<tbb::task *> task_list_type;
  208. class wait_functor {
  209. tbb::task* graph_root_task;
  210. public:
  211. wait_functor(tbb::task* t) : graph_root_task(t) {}
  212. void operator()() const { graph_root_task->wait_for_all(); }
  213. };
  214. //! A functor that spawns a task
  215. class spawn_functor : tbb::internal::no_assign {
  216. tbb::task& spawn_task;
  217. public:
  218. spawn_functor(tbb::task& t) : spawn_task(t) {}
  219. void operator()() const {
  220. FLOW_SPAWN(spawn_task);
  221. }
  222. };
  223. void prepare_task_arena(bool reinit = false) {
  224. if (reinit) {
  225. __TBB_ASSERT(my_task_arena, "task arena is NULL");
  226. my_task_arena->terminate();
  227. my_task_arena->initialize(tbb::task_arena::attach());
  228. }
  229. else {
  230. __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
  231. my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
  232. }
  233. if (!my_task_arena->is_active()) // failed to attach
  234. my_task_arena->initialize(); // create a new, default-initialized arena
  235. __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
  236. }
  237. public:
  238. //! Constructs a graph with isolated task_group_context
  239. graph();
  240. //! Constructs a graph with use_this_context as context
  241. explicit graph(tbb::task_group_context& use_this_context);
  242. //! Destroys the graph.
  243. /** Calls wait_for_all, then destroys the root task and context. */
  244. ~graph();
  245. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  246. void set_name(const char *name);
  247. #endif
  248. __TBB_DEPRECATED void increment_wait_count() {
  249. reserve_wait();
  250. }
  251. __TBB_DEPRECATED void decrement_wait_count() {
  252. release_wait();
  253. }
  //! Used to register that an external entity may still interact with the graph.
  /** The graph will not return from wait_for_all until a matching number of release_wait calls
      is made. */
  257. void reserve_wait() __TBB_override;
  //! Deregisters an external entity that may have interacted with the graph.
  /** The graph will not return from wait_for_all until the number of release_wait calls
      matches the number of reserve_wait calls. */
  261. void release_wait() __TBB_override;
  262. //! Spawns a task that runs a body and puts its output to a specific receiver
  263. /** The task is spawned as a child of the graph. This is useful for running tasks
  264. that need to block a wait_for_all() on the graph. For example a one-off source. */
  265. template< typename Receiver, typename Body >
  266. __TBB_DEPRECATED void run(Receiver &r, Body body) {
  267. if (tbb::flow::interface11::internal::is_graph_active(*this)) {
  268. task* rtask = new (task::allocate_additional_child_of(*root_task()))
  269. run_and_put_task< Receiver, Body >(r, body);
  270. my_task_arena->execute(spawn_functor(*rtask));
  271. }
  272. }
  273. //! Spawns a task that runs a function object
  274. /** The task is spawned as a child of the graph. This is useful for running tasks
  275. that need to block a wait_for_all() on the graph. For example a one-off source. */
  276. template< typename Body >
  277. __TBB_DEPRECATED void run(Body body) {
  278. if (tbb::flow::interface11::internal::is_graph_active(*this)) {
  279. task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
  280. my_task_arena->execute(spawn_functor(*rtask));
  281. }
  282. }
  283. //! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
  284. /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
  285. void wait_for_all() {
  286. cancelled = false;
  287. caught_exception = false;
  288. if (my_root_task) {
  289. #if TBB_USE_EXCEPTIONS
  290. try {
  291. #endif
  292. my_task_arena->execute(wait_functor(my_root_task));
  293. #if __TBB_TASK_GROUP_CONTEXT
  294. cancelled = my_context->is_group_execution_cancelled();
  295. #endif
  296. #if TBB_USE_EXCEPTIONS
  297. }
  298. catch (...) {
  299. my_root_task->set_ref_count(1);
  300. my_context->reset();
  301. caught_exception = true;
  302. cancelled = true;
  303. throw;
  304. }
  305. #endif
  306. #if __TBB_TASK_GROUP_CONTEXT
  307. // TODO: the "if" condition below is just a work-around to support the concurrent wait
  308. // mode. The cancellation and exception mechanisms are still broken in this mode.
  309. // Consider using task group not to re-implement the same functionality.
  310. if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
  311. my_context->reset(); // consistent with behavior in catch()
  312. #endif
  313. my_root_task->set_ref_count(1);
  314. #if __TBB_TASK_GROUP_CONTEXT
  315. }
  316. #endif
  317. }
  318. }
  319. //! Returns the root task of the graph
  320. __TBB_DEPRECATED tbb::task * root_task() {
  321. return my_root_task;
  322. }
  323. // ITERATORS
  324. template<typename C, typename N>
  325. friend class tbb::flow::interface11::graph_iterator;
  326. // Graph iterator typedefs
  327. typedef tbb::flow::interface11::graph_iterator<graph, tbb::flow::interface11::graph_node> iterator;
  328. typedef tbb::flow::interface11::graph_iterator<const graph, const tbb::flow::interface11::graph_node> const_iterator;
  329. // Graph iterator constructors
  330. //! start iterator
  331. iterator begin();
  332. //! end iterator
  333. iterator end();
  334. //! start const iterator
  335. const_iterator begin() const;
  336. //! end const iterator
  337. const_iterator end() const;
  338. //! start const iterator
  339. const_iterator cbegin() const;
  340. //! end const iterator
  341. const_iterator cend() const;
  342. //! return status of graph execution
  343. bool is_cancelled() { return cancelled; }
  344. bool exception_thrown() { return caught_exception; }
  345. // thread-unsafe state reset.
  346. void reset(tbb::flow::interface11::reset_flags f = tbb::flow::interface11::rf_reset_protocol);
  347. private:
  348. tbb::task *my_root_task;
  349. #if __TBB_TASK_GROUP_CONTEXT
  350. tbb::task_group_context *my_context;
  351. #endif
  352. bool own_context;
  353. bool cancelled;
  354. bool caught_exception;
  355. bool my_is_active;
  356. task_list_type my_reset_task_list;
  357. tbb::flow::interface11::graph_node *my_nodes, *my_nodes_last;
  358. tbb::spin_mutex nodelist_mutex;
  359. void register_node(tbb::flow::interface11::graph_node *n);
  360. void remove_node(tbb::flow::interface11::graph_node *n);
  361. tbb::task_arena* my_task_arena;
  362. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  363. tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue;
  364. #endif
  365. friend void tbb::flow::interface11::internal::activate_graph(graph& g);
  366. friend void tbb::flow::interface11::internal::deactivate_graph(graph& g);
  367. friend bool tbb::flow::interface11::internal::is_graph_active(graph& g);
  368. friend tbb::task& tbb::flow::interface11::internal::prioritize_task(graph& g, tbb::task& arena_task);
  369. friend void tbb::flow::interface11::internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
  370. friend void tbb::flow::interface11::internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
  371. friend void tbb::flow::interface11::internal::add_task_to_graph_reset_list(graph& g, tbb::task *tp);
  372. friend class tbb::interface7::internal::task_arena_base;
  373. }; // class graph
  374. } // namespace interface10
  375. namespace interface11 {
  376. using tbb::flow::interface10::graph;
  377. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  378. namespace internal{
  379. class get_graph_helper;
  380. }
  381. #endif
  382. //! The base of all graph nodes.
  383. class graph_node : tbb::internal::no_copy {
  384. friend class graph;
  385. template<typename C, typename N>
  386. friend class graph_iterator;
  387. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  388. friend class internal::get_graph_helper;
  389. #endif
  390. protected:
  391. graph& my_graph;
  392. graph_node *next, *prev;
  393. public:
  394. explicit graph_node(graph& g);
  395. virtual ~graph_node();
  396. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  397. virtual void set_name(const char *name) = 0;
  398. #endif
  399. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  400. virtual void extract() = 0;
  401. #endif
  402. protected:
  403. // performs the reset on an individual node.
  404. virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
  405. }; // class graph_node
  406. namespace internal {
  407. inline void activate_graph(graph& g) {
  408. g.my_is_active = true;
  409. }
  410. inline void deactivate_graph(graph& g) {
  411. g.my_is_active = false;
  412. }
  413. inline bool is_graph_active(graph& g) {
  414. return g.my_is_active;
  415. }
  416. #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
  417. inline tbb::task& prioritize_task(graph& g, tbb::task& t) {
  418. task* critical_task = &t;
  419. // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
  420. graph_task* gt = static_cast<graph_task*>(&t);
  421. if( gt->priority != no_priority ) {
  422. //! Non-preemptive priority pattern. The original task is submitted as a work item to the
  423. //! priority queue, and a new critical task is created to take and execute a work item with
  424. //! the highest known priority. The reference counting responsibility is transferred (via
  425. //! allocate_continuation) to the new task.
  426. critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
  427. tbb::internal::make_critical( *critical_task );
  428. g.my_priority_queue.push(gt);
  429. }
  430. return *critical_task;
  431. }
  432. #else
  433. inline tbb::task& prioritize_task(graph&, tbb::task& t) {
  434. return t;
  435. }
  436. #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
  437. //! Spawns a task inside graph arena
  438. inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
  439. if (is_graph_active(g)) {
  440. graph::spawn_functor s_fn(prioritize_task(g, arena_task));
  441. __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
  442. g.my_task_arena->execute(s_fn);
  443. }
  444. }
  445. //! Enqueues a task inside graph arena
  446. inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
  447. if (is_graph_active(g)) {
  448. __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
  449. task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
  450. }
  451. }
  452. inline void add_task_to_graph_reset_list(graph& g, tbb::task *tp) {
  453. g.my_reset_task_list.push_back(tp);
  454. }
  455. } // namespace internal
  456. } // namespace interfaceX
  457. } // namespace flow
  458. } // namespace tbb
  459. #endif // __TBB_flow_graph_impl_H