| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547 |
- /*
- Copyright (c) 2005-2020 Intel Corporation
- Licensed under the Apache License, Version 2.0 (the "License");
- you may not use this file except in compliance with the License.
- You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
- */
- #ifndef __TBB_flow_graph_impl_H
- #define __TBB_flow_graph_impl_H
- #include "../tbb_stddef.h"
- #include "../task.h"
- #include "../task_arena.h"
- #include "../flow_graph_abstractions.h"
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- #include "../concurrent_priority_queue.h"
- #endif
- #include <list>
- #if TBB_DEPRECATED_FLOW_ENQUEUE
- #define FLOW_SPAWN(a) tbb::task::enqueue((a))
- #else
- #define FLOW_SPAWN(a) tbb::task::spawn((a))
- #endif
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
- #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
- #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
- #else
- #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
- #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
- #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
- #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
- #define __TBB_DEPRECATED_LIMITER_EXPR( expr ) expr
- #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1, arg2
- #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg3, arg4
- #else
- #define __TBB_DEPRECATED_LIMITER_EXPR( expr )
- #define __TBB_DEPRECATED_LIMITER_ARG2( arg1, arg2 ) arg1
- #define __TBB_DEPRECATED_LIMITER_ARG4( arg1, arg2, arg3, arg4 ) arg1, arg2
- #endif // TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
- namespace tbb {
- namespace flow {
- namespace internal {
- static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- typedef unsigned int node_priority_t;
- static const node_priority_t no_priority = node_priority_t(0);
- #endif
- }
- namespace interface10 {
- class graph;
- }
- namespace interface11 {
- using tbb::flow::internal::SUCCESSFULLY_ENQUEUED;
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- using tbb::flow::internal::node_priority_t;
- using tbb::flow::internal::no_priority;
- //! Base class for tasks generated by graph nodes.
- struct graph_task : public task {
- graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
- node_priority_t priority;
- };
- #else
- typedef task graph_task;
- #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
- class graph_node;
- template <typename GraphContainerType, typename GraphNodeType>
- class graph_iterator {
- friend class tbb::flow::interface10::graph;
- friend class graph_node;
- public:
- typedef size_t size_type;
- typedef GraphNodeType value_type;
- typedef GraphNodeType* pointer;
- typedef GraphNodeType& reference;
- typedef const GraphNodeType& const_reference;
- typedef std::forward_iterator_tag iterator_category;
- //! Default constructor
- graph_iterator() : my_graph(NULL), current_node(NULL) {}
- //! Copy constructor
- graph_iterator(const graph_iterator& other) :
- my_graph(other.my_graph), current_node(other.current_node)
- {}
- //! Assignment
- graph_iterator& operator=(const graph_iterator& other) {
- if (this != &other) {
- my_graph = other.my_graph;
- current_node = other.current_node;
- }
- return *this;
- }
- //! Dereference
- reference operator*() const;
- //! Dereference
- pointer operator->() const;
- //! Equality
- bool operator==(const graph_iterator& other) const {
- return ((my_graph == other.my_graph) && (current_node == other.current_node));
- }
- //! Inequality
- bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
- //! Pre-increment
- graph_iterator& operator++() {
- internal_forward();
- return *this;
- }
- //! Post-increment
- graph_iterator operator++(int) {
- graph_iterator result = *this;
- operator++();
- return result;
- }
- private:
- // the graph over which we are iterating
- GraphContainerType *my_graph;
- // pointer into my_graph's my_nodes list
- pointer current_node;
- //! Private initializing constructor for begin() and end() iterators
- graph_iterator(GraphContainerType *g, bool begin);
- void internal_forward();
- }; // class graph_iterator
- // flags to modify the behavior of the graph reset(). Can be combined.
- enum reset_flags {
- rf_reset_protocol = 0,
- rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
- rf_clear_edges = 1 << 1 // delete edges
- };
- namespace internal {
- void activate_graph(tbb::flow::interface10::graph& g);
- void deactivate_graph(tbb::flow::interface10::graph& g);
- bool is_graph_active(tbb::flow::interface10::graph& g);
- tbb::task& prioritize_task(tbb::flow::interface10::graph& g, tbb::task& arena_task);
- void spawn_in_graph_arena(tbb::flow::interface10::graph& g, tbb::task& arena_task);
- void enqueue_in_graph_arena(tbb::flow::interface10::graph &g, tbb::task& arena_task);
- void add_task_to_graph_reset_list(tbb::flow::interface10::graph& g, tbb::task *tp);
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- struct graph_task_comparator {
- bool operator()(const graph_task* left, const graph_task* right) {
- return left->priority < right->priority;
- }
- };
- typedef tbb::concurrent_priority_queue<graph_task*, graph_task_comparator> graph_task_priority_queue_t;
- class priority_task_selector : public task {
- public:
- priority_task_selector(graph_task_priority_queue_t& priority_queue)
- : my_priority_queue(priority_queue) {}
- task* execute() __TBB_override {
- graph_task* t = NULL;
- bool result = my_priority_queue.try_pop(t);
- __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
- " in graph's priority queue mismatched" );
- __TBB_ASSERT( t && t != SUCCESSFULLY_ENQUEUED,
- "Incorrect task submitted to graph priority queue" );
- __TBB_ASSERT( t->priority != tbb::flow::internal::no_priority,
- "Tasks from graph's priority queue must have priority" );
- task* t_next = t->execute();
- task::destroy(*t);
- return t_next;
- }
- private:
- graph_task_priority_queue_t& my_priority_queue;
- };
- #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
- }
- } // namespace interfaceX
- namespace interface10 {
- //! The graph class
- /** This class serves as a handle to the graph */
- class graph : tbb::internal::no_copy, public tbb::flow::graph_proxy {
- friend class tbb::flow::interface11::graph_node;
- template< typename Body >
- class run_task : public tbb::flow::interface11::graph_task {
- public:
- run_task(Body& body
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- , tbb::flow::interface11::node_priority_t node_priority = tbb::flow::interface11::no_priority
- ) : tbb::flow::interface11::graph_task(node_priority),
- #else
- ) :
- #endif
- my_body(body) { }
- tbb::task *execute() __TBB_override {
- my_body();
- return NULL;
- }
- private:
- Body my_body;
- };
- template< typename Receiver, typename Body >
- class run_and_put_task : public tbb::flow::interface11::graph_task {
- public:
- run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
- tbb::task *execute() __TBB_override {
- tbb::task *res = my_receiver.try_put_task(my_body());
- if (res == tbb::flow::interface11::SUCCESSFULLY_ENQUEUED) res = NULL;
- return res;
- }
- private:
- Receiver &my_receiver;
- Body my_body;
- };
- typedef std::list<tbb::task *> task_list_type;
- class wait_functor {
- tbb::task* graph_root_task;
- public:
- wait_functor(tbb::task* t) : graph_root_task(t) {}
- void operator()() const { graph_root_task->wait_for_all(); }
- };
- //! A functor that spawns a task
- class spawn_functor : tbb::internal::no_assign {
- tbb::task& spawn_task;
- public:
- spawn_functor(tbb::task& t) : spawn_task(t) {}
- void operator()() const {
- FLOW_SPAWN(spawn_task);
- }
- };
- void prepare_task_arena(bool reinit = false) {
- if (reinit) {
- __TBB_ASSERT(my_task_arena, "task arena is NULL");
- my_task_arena->terminate();
- my_task_arena->initialize(tbb::task_arena::attach());
- }
- else {
- __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
- my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
- }
- if (!my_task_arena->is_active()) // failed to attach
- my_task_arena->initialize(); // create a new, default-initialized arena
- __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
- }
- public:
- //! Constructs a graph with isolated task_group_context
- graph();
- //! Constructs a graph with use_this_context as context
- explicit graph(tbb::task_group_context& use_this_context);
- //! Destroys the graph.
- /** Calls wait_for_all, then destroys the root task and context. */
- ~graph();
- #if TBB_PREVIEW_FLOW_GRAPH_TRACE
- void set_name(const char *name);
- #endif
- __TBB_DEPRECATED void increment_wait_count() {
- reserve_wait();
- }
- __TBB_DEPRECATED void decrement_wait_count() {
- release_wait();
- }
- //! Used to register that an external entity may still interact with the graph.
- /** The graph will not return from wait_for_all until a matching number of decrement_wait_count calls
- is made. */
- void reserve_wait() __TBB_override;
- //! Deregisters an external entity that may have interacted with the graph.
- /** The graph will not return from wait_for_all until all the number of decrement_wait_count calls
- matches the number of increment_wait_count calls. */
- void release_wait() __TBB_override;
- //! Spawns a task that runs a body and puts its output to a specific receiver
- /** The task is spawned as a child of the graph. This is useful for running tasks
- that need to block a wait_for_all() on the graph. For example a one-off source. */
- template< typename Receiver, typename Body >
- __TBB_DEPRECATED void run(Receiver &r, Body body) {
- if (tbb::flow::interface11::internal::is_graph_active(*this)) {
- task* rtask = new (task::allocate_additional_child_of(*root_task()))
- run_and_put_task< Receiver, Body >(r, body);
- my_task_arena->execute(spawn_functor(*rtask));
- }
- }
- //! Spawns a task that runs a function object
- /** The task is spawned as a child of the graph. This is useful for running tasks
- that need to block a wait_for_all() on the graph. For example a one-off source. */
- template< typename Body >
- __TBB_DEPRECATED void run(Body body) {
- if (tbb::flow::interface11::internal::is_graph_active(*this)) {
- task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
- my_task_arena->execute(spawn_functor(*rtask));
- }
- }
- //! Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls.
- /** The waiting thread will go off and steal work while it is block in the wait_for_all. */
- void wait_for_all() {
- cancelled = false;
- caught_exception = false;
- if (my_root_task) {
- #if TBB_USE_EXCEPTIONS
- try {
- #endif
- my_task_arena->execute(wait_functor(my_root_task));
- #if __TBB_TASK_GROUP_CONTEXT
- cancelled = my_context->is_group_execution_cancelled();
- #endif
- #if TBB_USE_EXCEPTIONS
- }
- catch (...) {
- my_root_task->set_ref_count(1);
- my_context->reset();
- caught_exception = true;
- cancelled = true;
- throw;
- }
- #endif
- #if __TBB_TASK_GROUP_CONTEXT
- // TODO: the "if" condition below is just a work-around to support the concurrent wait
- // mode. The cancellation and exception mechanisms are still broken in this mode.
- // Consider using task group not to re-implement the same functionality.
- if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
- my_context->reset(); // consistent with behavior in catch()
- #endif
- my_root_task->set_ref_count(1);
- #if __TBB_TASK_GROUP_CONTEXT
- }
- #endif
- }
- }
- //! Returns the root task of the graph
- __TBB_DEPRECATED tbb::task * root_task() {
- return my_root_task;
- }
- // ITERATORS
- template<typename C, typename N>
- friend class tbb::flow::interface11::graph_iterator;
- // Graph iterator typedefs
- typedef tbb::flow::interface11::graph_iterator<graph, tbb::flow::interface11::graph_node> iterator;
- typedef tbb::flow::interface11::graph_iterator<const graph, const tbb::flow::interface11::graph_node> const_iterator;
- // Graph iterator constructors
- //! start iterator
- iterator begin();
- //! end iterator
- iterator end();
- //! start const iterator
- const_iterator begin() const;
- //! end const iterator
- const_iterator end() const;
- //! start const iterator
- const_iterator cbegin() const;
- //! end const iterator
- const_iterator cend() const;
- //! return status of graph execution
- bool is_cancelled() { return cancelled; }
- bool exception_thrown() { return caught_exception; }
- // thread-unsafe state reset.
- void reset(tbb::flow::interface11::reset_flags f = tbb::flow::interface11::rf_reset_protocol);
- private:
- tbb::task *my_root_task;
- #if __TBB_TASK_GROUP_CONTEXT
- tbb::task_group_context *my_context;
- #endif
- bool own_context;
- bool cancelled;
- bool caught_exception;
- bool my_is_active;
- task_list_type my_reset_task_list;
- tbb::flow::interface11::graph_node *my_nodes, *my_nodes_last;
- tbb::spin_mutex nodelist_mutex;
- void register_node(tbb::flow::interface11::graph_node *n);
- void remove_node(tbb::flow::interface11::graph_node *n);
- tbb::task_arena* my_task_arena;
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- tbb::flow::interface11::internal::graph_task_priority_queue_t my_priority_queue;
- #endif
- friend void tbb::flow::interface11::internal::activate_graph(graph& g);
- friend void tbb::flow::interface11::internal::deactivate_graph(graph& g);
- friend bool tbb::flow::interface11::internal::is_graph_active(graph& g);
- friend tbb::task& tbb::flow::interface11::internal::prioritize_task(graph& g, tbb::task& arena_task);
- friend void tbb::flow::interface11::internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
- friend void tbb::flow::interface11::internal::enqueue_in_graph_arena(graph &g, tbb::task& arena_task);
- friend void tbb::flow::interface11::internal::add_task_to_graph_reset_list(graph& g, tbb::task *tp);
- friend class tbb::interface7::internal::task_arena_base;
- }; // class graph
- } // namespace interface10
- namespace interface11 {
- using tbb::flow::interface10::graph;
- #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
- namespace internal{
- class get_graph_helper;
- }
- #endif
- //! The base of all graph nodes.
- class graph_node : tbb::internal::no_copy {
- friend class graph;
- template<typename C, typename N>
- friend class graph_iterator;
- #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
- friend class internal::get_graph_helper;
- #endif
- protected:
- graph& my_graph;
- graph_node *next, *prev;
- public:
- explicit graph_node(graph& g);
- virtual ~graph_node();
- #if TBB_PREVIEW_FLOW_GRAPH_TRACE
- virtual void set_name(const char *name) = 0;
- #endif
- #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
- virtual void extract() = 0;
- #endif
- protected:
- // performs the reset on an individual node.
- virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
- }; // class graph_node
- namespace internal {
- inline void activate_graph(graph& g) {
- g.my_is_active = true;
- }
- inline void deactivate_graph(graph& g) {
- g.my_is_active = false;
- }
- inline bool is_graph_active(graph& g) {
- return g.my_is_active;
- }
- #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
- inline tbb::task& prioritize_task(graph& g, tbb::task& t) {
- task* critical_task = &t;
- // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
- graph_task* gt = static_cast<graph_task*>(&t);
- if( gt->priority != no_priority ) {
- //! Non-preemptive priority pattern. The original task is submitted as a work item to the
- //! priority queue, and a new critical task is created to take and execute a work item with
- //! the highest known priority. The reference counting responsibility is transferred (via
- //! allocate_continuation) to the new task.
- critical_task = new( gt->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
- tbb::internal::make_critical( *critical_task );
- g.my_priority_queue.push(gt);
- }
- return *critical_task;
- }
- #else
- inline tbb::task& prioritize_task(graph&, tbb::task& t) {
- return t;
- }
- #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
- //! Spawns a task inside graph arena
- inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
- if (is_graph_active(g)) {
- graph::spawn_functor s_fn(prioritize_task(g, arena_task));
- __TBB_ASSERT(g.my_task_arena && g.my_task_arena->is_active(), NULL);
- g.my_task_arena->execute(s_fn);
- }
- }
- //! Enqueues a task inside graph arena
- inline void enqueue_in_graph_arena(graph &g, tbb::task& arena_task) {
- if (is_graph_active(g)) {
- __TBB_ASSERT( g.my_task_arena && g.my_task_arena->is_active(), "Is graph's arena initialized and active?" );
- task::enqueue(prioritize_task(g, arena_task), *g.my_task_arena);
- }
- }
- inline void add_task_to_graph_reset_list(graph& g, tbb::task *tp) {
- g.my_reset_task_list.push_back(tp);
- }
- } // namespace internal
- } // namespace interfaceX
- } // namespace flow
- } // namespace tbb
- #endif // __TBB_flow_graph_impl_H
|