// _flow_graph_cache_impl.h
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB__flow_graph_cache_impl_H
  14. #define __TBB__flow_graph_cache_impl_H
  15. #ifndef __TBB_flow_graph_H
  16. #error Do not #include this internal file directly; use public TBB headers instead.
  17. #endif
  18. // included in namespace tbb::flow::interfaceX (in flow_graph.h)
  19. namespace internal {
  20. //! A node_cache maintains a std::queue of elements of type T. Each operation is protected by a lock.
  21. template< typename T, typename M=spin_mutex >
  22. class node_cache {
  23. public:
  24. typedef size_t size_type;
  25. bool empty() {
  26. typename mutex_type::scoped_lock lock( my_mutex );
  27. return internal_empty();
  28. }
  29. void add( T &n ) {
  30. typename mutex_type::scoped_lock lock( my_mutex );
  31. internal_push(n);
  32. }
  33. void remove( T &n ) {
  34. typename mutex_type::scoped_lock lock( my_mutex );
  35. for ( size_t i = internal_size(); i != 0; --i ) {
  36. T &s = internal_pop();
  37. if ( &s == &n ) return; // only remove one predecessor per request
  38. internal_push(s);
  39. }
  40. }
  41. void clear() {
  42. while( !my_q.empty()) (void)my_q.pop();
  43. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  44. my_built_predecessors.clear();
  45. #endif
  46. }
  47. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  48. typedef edge_container<T> built_predecessors_type;
  49. built_predecessors_type &built_predecessors() { return my_built_predecessors; }
  50. typedef typename edge_container<T>::edge_list_type predecessor_list_type;
  51. void internal_add_built_predecessor( T &n ) {
  52. typename mutex_type::scoped_lock lock( my_mutex );
  53. my_built_predecessors.add_edge(n);
  54. }
  55. void internal_delete_built_predecessor( T &n ) {
  56. typename mutex_type::scoped_lock lock( my_mutex );
  57. my_built_predecessors.delete_edge(n);
  58. }
  59. void copy_predecessors( predecessor_list_type &v) {
  60. typename mutex_type::scoped_lock lock( my_mutex );
  61. my_built_predecessors.copy_edges(v);
  62. }
  63. size_t predecessor_count() {
  64. typename mutex_type::scoped_lock lock(my_mutex);
  65. return (size_t)(my_built_predecessors.edge_count());
  66. }
  67. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  68. protected:
  69. typedef M mutex_type;
  70. mutex_type my_mutex;
  71. std::queue< T * > my_q;
  72. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  73. built_predecessors_type my_built_predecessors;
  74. #endif
  75. // Assumes lock is held
  76. inline bool internal_empty( ) {
  77. return my_q.empty();
  78. }
  79. // Assumes lock is held
  80. inline size_type internal_size( ) {
  81. return my_q.size();
  82. }
  83. // Assumes lock is held
  84. inline void internal_push( T &n ) {
  85. my_q.push(&n);
  86. }
  87. // Assumes lock is held
  88. inline T &internal_pop() {
  89. T *v = my_q.front();
  90. my_q.pop();
  91. return *v;
  92. }
  93. };
  94. //! A cache of predecessors that only supports try_get
  95. template< typename T, typename M=spin_mutex >
  96. #if __TBB_PREVIEW_ASYNC_MSG
  97. // TODO: make predecessor_cache type T-independent when async_msg becomes regular feature
  98. class predecessor_cache : public node_cache< untyped_sender, M > {
  99. #else
  100. class predecessor_cache : public node_cache< sender<T>, M > {
  101. #endif // __TBB_PREVIEW_ASYNC_MSG
  102. public:
  103. typedef M mutex_type;
  104. typedef T output_type;
  105. #if __TBB_PREVIEW_ASYNC_MSG
  106. typedef untyped_sender predecessor_type;
  107. typedef untyped_receiver successor_type;
  108. #else
  109. typedef sender<output_type> predecessor_type;
  110. typedef receiver<output_type> successor_type;
  111. #endif // __TBB_PREVIEW_ASYNC_MSG
  112. predecessor_cache( ) : my_owner( NULL ) { }
  113. void set_owner( successor_type *owner ) { my_owner = owner; }
  114. bool get_item( output_type &v ) {
  115. bool msg = false;
  116. do {
  117. predecessor_type *src;
  118. {
  119. typename mutex_type::scoped_lock lock(this->my_mutex);
  120. if ( this->internal_empty() ) {
  121. break;
  122. }
  123. src = &this->internal_pop();
  124. }
  125. // Try to get from this sender
  126. msg = src->try_get( v );
  127. if (msg == false) {
  128. // Relinquish ownership of the edge
  129. if (my_owner)
  130. src->register_successor( *my_owner );
  131. } else {
  132. // Retain ownership of the edge
  133. this->add(*src);
  134. }
  135. } while ( msg == false );
  136. return msg;
  137. }
  138. // If we are removing arcs (rf_clear_edges), call clear() rather than reset().
  139. void reset() {
  140. if (my_owner) {
  141. for(;;) {
  142. predecessor_type *src;
  143. {
  144. if (this->internal_empty()) break;
  145. src = &this->internal_pop();
  146. }
  147. src->register_successor( *my_owner );
  148. }
  149. }
  150. }
  151. protected:
  152. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  153. using node_cache< predecessor_type, M >::my_built_predecessors;
  154. #endif
  155. successor_type *my_owner;
  156. };
  157. //! An cache of predecessors that supports requests and reservations
  158. // TODO: make reservable_predecessor_cache type T-independent when async_msg becomes regular feature
  159. template< typename T, typename M=spin_mutex >
  160. class reservable_predecessor_cache : public predecessor_cache< T, M > {
  161. public:
  162. typedef M mutex_type;
  163. typedef T output_type;
  164. #if __TBB_PREVIEW_ASYNC_MSG
  165. typedef untyped_sender predecessor_type;
  166. typedef untyped_receiver successor_type;
  167. #else
  168. typedef sender<T> predecessor_type;
  169. typedef receiver<T> successor_type;
  170. #endif // __TBB_PREVIEW_ASYNC_MSG
  171. reservable_predecessor_cache( ) : reserved_src(NULL) { }
  172. bool
  173. try_reserve( output_type &v ) {
  174. bool msg = false;
  175. do {
  176. {
  177. typename mutex_type::scoped_lock lock(this->my_mutex);
  178. if ( reserved_src || this->internal_empty() )
  179. return false;
  180. reserved_src = &this->internal_pop();
  181. }
  182. // Try to get from this sender
  183. msg = reserved_src->try_reserve( v );
  184. if (msg == false) {
  185. typename mutex_type::scoped_lock lock(this->my_mutex);
  186. // Relinquish ownership of the edge
  187. reserved_src->register_successor( *this->my_owner );
  188. reserved_src = NULL;
  189. } else {
  190. // Retain ownership of the edge
  191. this->add( *reserved_src );
  192. }
  193. } while ( msg == false );
  194. return msg;
  195. }
  196. bool
  197. try_release( ) {
  198. reserved_src->try_release( );
  199. reserved_src = NULL;
  200. return true;
  201. }
  202. bool
  203. try_consume( ) {
  204. reserved_src->try_consume( );
  205. reserved_src = NULL;
  206. return true;
  207. }
  208. void reset( ) {
  209. reserved_src = NULL;
  210. predecessor_cache<T,M>::reset( );
  211. }
  212. void clear() {
  213. reserved_src = NULL;
  214. predecessor_cache<T,M>::clear();
  215. }
  216. private:
  217. predecessor_type *reserved_src;
  218. };
  219. //! An abstract cache of successors
  220. // TODO: make successor_cache type T-independent when async_msg becomes regular feature
  221. template<typename T, typename M=spin_rw_mutex >
  222. class successor_cache : tbb::internal::no_copy {
  223. protected:
  224. typedef M mutex_type;
  225. mutex_type my_mutex;
  226. #if __TBB_PREVIEW_ASYNC_MSG
  227. typedef untyped_receiver successor_type;
  228. typedef untyped_receiver *pointer_type;
  229. typedef untyped_sender owner_type;
  230. #else
  231. typedef receiver<T> successor_type;
  232. typedef receiver<T> *pointer_type;
  233. typedef sender<T> owner_type;
  234. #endif // __TBB_PREVIEW_ASYNC_MSG
  235. typedef std::list< pointer_type > successors_type;
  236. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  237. edge_container<successor_type> my_built_successors;
  238. #endif
  239. successors_type my_successors;
  240. owner_type *my_owner;
  241. public:
  242. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  243. typedef typename edge_container<successor_type>::edge_list_type successor_list_type;
  244. edge_container<successor_type> &built_successors() { return my_built_successors; }
  245. void internal_add_built_successor( successor_type &r) {
  246. typename mutex_type::scoped_lock l(my_mutex, true);
  247. my_built_successors.add_edge( r );
  248. }
  249. void internal_delete_built_successor( successor_type &r) {
  250. typename mutex_type::scoped_lock l(my_mutex, true);
  251. my_built_successors.delete_edge(r);
  252. }
  253. void copy_successors( successor_list_type &v) {
  254. typename mutex_type::scoped_lock l(my_mutex, false);
  255. my_built_successors.copy_edges(v);
  256. }
  257. size_t successor_count() {
  258. typename mutex_type::scoped_lock l(my_mutex,false);
  259. return my_built_successors.edge_count();
  260. }
  261. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  262. successor_cache( ) : my_owner(NULL) {}
  263. void set_owner( owner_type *owner ) { my_owner = owner; }
  264. virtual ~successor_cache() {}
  265. void register_successor( successor_type &r ) {
  266. typename mutex_type::scoped_lock l(my_mutex, true);
  267. my_successors.push_back( &r );
  268. }
  269. void remove_successor( successor_type &r ) {
  270. typename mutex_type::scoped_lock l(my_mutex, true);
  271. for ( typename successors_type::iterator i = my_successors.begin();
  272. i != my_successors.end(); ++i ) {
  273. if ( *i == & r ) {
  274. my_successors.erase(i);
  275. break;
  276. }
  277. }
  278. }
  279. bool empty() {
  280. typename mutex_type::scoped_lock l(my_mutex, false);
  281. return my_successors.empty();
  282. }
  283. void clear() {
  284. my_successors.clear();
  285. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  286. my_built_successors.clear();
  287. #endif
  288. }
  289. #if !__TBB_PREVIEW_ASYNC_MSG
  290. virtual task * try_put_task( const T &t ) = 0;
  291. #endif // __TBB_PREVIEW_ASYNC_MSG
  292. }; // successor_cache<T>
  293. //! An abstract cache of successors, specialized to continue_msg
  294. template<typename M>
  295. class successor_cache< continue_msg, M > : tbb::internal::no_copy {
  296. protected:
  297. typedef M mutex_type;
  298. mutex_type my_mutex;
  299. #if __TBB_PREVIEW_ASYNC_MSG
  300. typedef untyped_receiver successor_type;
  301. typedef untyped_receiver *pointer_type;
  302. #else
  303. typedef receiver<continue_msg> successor_type;
  304. typedef receiver<continue_msg> *pointer_type;
  305. #endif // __TBB_PREVIEW_ASYNC_MSG
  306. typedef std::list< pointer_type > successors_type;
  307. successors_type my_successors;
  308. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  309. edge_container<successor_type> my_built_successors;
  310. typedef edge_container<successor_type>::edge_list_type successor_list_type;
  311. #endif
  312. sender<continue_msg> *my_owner;
  313. public:
  314. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  315. edge_container<successor_type> &built_successors() { return my_built_successors; }
  316. void internal_add_built_successor( successor_type &r) {
  317. typename mutex_type::scoped_lock l(my_mutex, true);
  318. my_built_successors.add_edge( r );
  319. }
  320. void internal_delete_built_successor( successor_type &r) {
  321. typename mutex_type::scoped_lock l(my_mutex, true);
  322. my_built_successors.delete_edge(r);
  323. }
  324. void copy_successors( successor_list_type &v) {
  325. typename mutex_type::scoped_lock l(my_mutex, false);
  326. my_built_successors.copy_edges(v);
  327. }
  328. size_t successor_count() {
  329. typename mutex_type::scoped_lock l(my_mutex,false);
  330. return my_built_successors.edge_count();
  331. }
  332. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  333. successor_cache( ) : my_owner(NULL) {}
  334. void set_owner( sender<continue_msg> *owner ) { my_owner = owner; }
  335. virtual ~successor_cache() {}
  336. void register_successor( successor_type &r ) {
  337. typename mutex_type::scoped_lock l(my_mutex, true);
  338. my_successors.push_back( &r );
  339. if ( my_owner && r.is_continue_receiver() ) {
  340. r.register_predecessor( *my_owner );
  341. }
  342. }
  343. void remove_successor( successor_type &r ) {
  344. typename mutex_type::scoped_lock l(my_mutex, true);
  345. for ( successors_type::iterator i = my_successors.begin();
  346. i != my_successors.end(); ++i ) {
  347. if ( *i == & r ) {
  348. // TODO: Check if we need to test for continue_receiver before
  349. // removing from r.
  350. if ( my_owner )
  351. r.remove_predecessor( *my_owner );
  352. my_successors.erase(i);
  353. break;
  354. }
  355. }
  356. }
  357. bool empty() {
  358. typename mutex_type::scoped_lock l(my_mutex, false);
  359. return my_successors.empty();
  360. }
  361. void clear() {
  362. my_successors.clear();
  363. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  364. my_built_successors.clear();
  365. #endif
  366. }
  367. #if !__TBB_PREVIEW_ASYNC_MSG
  368. virtual task * try_put_task( const continue_msg &t ) = 0;
  369. #endif // __TBB_PREVIEW_ASYNC_MSG
  370. }; // successor_cache< continue_msg >
//! A cache of successors that are broadcast to
// TODO: make broadcast_cache type T-independent when async_msg becomes regular feature
template<typename T, typename M=spin_rw_mutex>
class broadcast_cache : public successor_cache<T, M> {
    typedef M mutex_type;
    typedef typename successor_cache<T,M>::successors_type successors_type;
public:

    broadcast_cache( ) {}

    // as above, but call try_put_task instead, and return the last task we received (if any)
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    task * try_put_task( const X &t ) {
#else
    task * try_put_task( const T &t ) __TBB_override {
#endif // __TBB_PREVIEW_ASYNC_MSG
        task * last_task = NULL;
        // NOTE(review): 'upgraded' starts true, so the lock below is acquired as a
        // writer and the upgrade_to_writer() branch is currently unreachable --
        // presumably scaffolding for a future reader-lock fast path; confirm upstream.
        bool upgraded = true;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            // Offer the message to every successor in turn.
            task *new_task = (*i)->try_put_task(t);
            // workaround for icc bug
            graph& graph_ref = (*i)->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);  // enqueue if necessary
            if(new_task) {
                // put succeeded: keep this successor and advance
                ++i;
            }
            else {  // failed
                // Try to reverse the edge: if the successor registers us as a
                // predecessor, it will pull later, so drop it from the cache.
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return last_task;
    }

    // call try_put_task and return list of received tasks
#if __TBB_PREVIEW_ASYNC_MSG
    template<typename X>
    bool gather_successful_try_puts( const X &t, task_list &tasks ) {
#else
    bool gather_successful_try_puts( const T &t, task_list &tasks ) {
#endif // __TBB_PREVIEW_ASYNC_MSG
        // Same traversal as try_put_task above, but collects real task pointers
        // into 'tasks' and reports whether at least one successor accepted t.
        bool upgraded = true;
        bool is_at_least_one_put_successful = false;
        typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
        typename successors_type::iterator i = this->my_successors.begin();
        while ( i != this->my_successors.end() ) {
            task * new_task = (*i)->try_put_task(t);
            if(new_task) {
                ++i;
                // SUCCESSFULLY_ENQUEUED is a sentinel, not a spawnable task;
                // only genuine task pointers are handed back to the caller.
                if(new_task != SUCCESSFULLY_ENQUEUED) {
                    tasks.push_back(*new_task);
                }
                is_at_least_one_put_successful = true;
            }
            else {  // failed
                // Edge reversal, as in try_put_task: a successor that accepts us
                // as predecessor is removed from the broadcast set.
                if ( (*i)->register_predecessor(*this->my_owner) ) {
                    if (!upgraded) {
                        l.upgrade_to_writer();
                        upgraded = true;
                    }
                    i = this->my_successors.erase(i);
                } else {
                    ++i;
                }
            }
        }
        return is_at_least_one_put_successful;
    }
};
  447. //! A cache of successors that are put in a round-robin fashion
  448. // TODO: make round_robin_cache type T-independent when async_msg becomes regular feature
  449. template<typename T, typename M=spin_rw_mutex >
  450. class round_robin_cache : public successor_cache<T, M> {
  451. typedef size_t size_type;
  452. typedef M mutex_type;
  453. typedef typename successor_cache<T,M>::successors_type successors_type;
  454. public:
  455. round_robin_cache( ) {}
  456. size_type size() {
  457. typename mutex_type::scoped_lock l(this->my_mutex, false);
  458. return this->my_successors.size();
  459. }
  460. #if __TBB_PREVIEW_ASYNC_MSG
  461. template<typename X>
  462. task * try_put_task( const X &t ) {
  463. #else
  464. task *try_put_task( const T &t ) __TBB_override {
  465. #endif // __TBB_PREVIEW_ASYNC_MSG
  466. bool upgraded = true;
  467. typename mutex_type::scoped_lock l(this->my_mutex, upgraded);
  468. typename successors_type::iterator i = this->my_successors.begin();
  469. while ( i != this->my_successors.end() ) {
  470. task *new_task = (*i)->try_put_task(t);
  471. if ( new_task ) {
  472. return new_task;
  473. } else {
  474. if ( (*i)->register_predecessor(*this->my_owner) ) {
  475. if (!upgraded) {
  476. l.upgrade_to_writer();
  477. upgraded = true;
  478. }
  479. i = this->my_successors.erase(i);
  480. }
  481. else {
  482. ++i;
  483. }
  484. }
  485. }
  486. return NULL;
  487. }
  488. };
  489. } // namespace internal
  490. #endif // __TBB__flow_graph_cache_impl_H