_flow_graph_async_msg_impl.h 4.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB__flow_graph_async_msg_impl_H
  14. #define __TBB__flow_graph_async_msg_impl_H
  15. #ifndef __TBB_flow_graph_H
  16. #error Do not #include this internal file directly; use public TBB headers instead.
  17. #endif
  18. namespace internal {
  19. template <typename T>
  20. class async_storage {
  21. public:
  22. typedef receiver<T> async_storage_client;
  23. async_storage() : my_graph(nullptr) {
  24. my_data_ready.store<tbb::relaxed>(false);
  25. }
  26. ~async_storage() {
  27. // Release reference to the graph if async_storage
  28. // was destructed before set() call
  29. if (my_graph) {
  30. my_graph->release_wait();
  31. my_graph = nullptr;
  32. }
  33. }
  34. template<typename C>
  35. async_storage(C&& data) : my_graph(nullptr), my_data( std::forward<C>(data) ) {
  36. using namespace tbb::internal;
  37. __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
  38. my_data_ready.store<tbb::relaxed>(true);
  39. }
  40. template<typename C>
  41. bool set(C&& data) {
  42. using namespace tbb::internal;
  43. __TBB_STATIC_ASSERT( (is_same_type<typename strip<C>::type, typename strip<T>::type>::value), "incoming type must be T" );
  44. {
  45. tbb::spin_mutex::scoped_lock locker(my_mutex);
  46. if (my_data_ready.load<tbb::relaxed>()) {
  47. __TBB_ASSERT(false, "double set() call");
  48. return false;
  49. }
  50. my_data = std::forward<C>(data);
  51. my_data_ready.store<tbb::release>(true);
  52. }
  53. // Thread sync is on my_data_ready flag
  54. for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
  55. (*it)->try_put(my_data);
  56. }
  57. // Data was sent, release reference to the graph
  58. if (my_graph) {
  59. my_graph->release_wait();
  60. my_graph = nullptr;
  61. }
  62. return true;
  63. }
  64. task* subscribe(async_storage_client& client, graph& g) {
  65. if (! my_data_ready.load<tbb::acquire>())
  66. {
  67. tbb::spin_mutex::scoped_lock locker(my_mutex);
  68. if (! my_data_ready.load<tbb::relaxed>()) {
  69. #if TBB_USE_ASSERT
  70. for (typename subscriber_list_type::iterator it = my_clients.begin(); it != my_clients.end(); ++it) {
  71. __TBB_ASSERT(*it != &client, "unexpected double subscription");
  72. }
  73. #endif // TBB_USE_ASSERT
  74. // Increase graph lifetime
  75. my_graph = &g;
  76. my_graph->reserve_wait();
  77. // Subscribe
  78. my_clients.push_back(&client);
  79. return SUCCESSFULLY_ENQUEUED;
  80. }
  81. }
  82. __TBB_ASSERT(my_data_ready.load<tbb::relaxed>(), "data is NOT ready");
  83. return client.try_put_task(my_data);
  84. }
  85. private:
  86. graph* my_graph;
  87. tbb::spin_mutex my_mutex;
  88. tbb::atomic<bool> my_data_ready;
  89. T my_data;
  90. typedef std::vector<async_storage_client*> subscriber_list_type;
  91. subscriber_list_type my_clients;
  92. };
  93. } // namespace internal
  94. template <typename T>
  95. class __TBB_DEPRECATED async_msg {
  96. template< typename > friend class receiver;
  97. template< typename, typename > friend struct internal::async_helpers;
  98. public:
  99. typedef T async_msg_data_type;
  100. async_msg() : my_storage(std::make_shared< internal::async_storage<T> >()) {}
  101. async_msg(const T& t) : my_storage(std::make_shared< internal::async_storage<T> >(t)) {}
  102. async_msg(T&& t) : my_storage(std::make_shared< internal::async_storage<T> >( std::move(t) )) {}
  103. virtual ~async_msg() {}
  104. void set(const T& t) {
  105. my_storage->set(t);
  106. }
  107. void set(T&& t) {
  108. my_storage->set( std::move(t) );
  109. }
  110. protected:
  111. // Can be overridden in derived class to inform that
  112. // async calculation chain is over
  113. virtual void finalize() const {}
  114. private:
  115. typedef std::shared_ptr< internal::async_storage<T> > async_storage_ptr;
  116. async_storage_ptr my_storage;
  117. };
  118. #endif // __TBB__flow_graph_async_msg_impl_H