// flow_graph.h
  1. /*
  2. Copyright (c) 2005-2020 Intel Corporation
  3. Licensed under the Apache License, Version 2.0 (the "License");
  4. you may not use this file except in compliance with the License.
  5. You may obtain a copy of the License at
  6. http://www.apache.org/licenses/LICENSE-2.0
  7. Unless required by applicable law or agreed to in writing, software
  8. distributed under the License is distributed on an "AS IS" BASIS,
  9. WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  10. See the License for the specific language governing permissions and
  11. limitations under the License.
  12. */
  13. #ifndef __TBB_flow_graph_H
  14. #define __TBB_flow_graph_H
  15. #define __TBB_flow_graph_H_include_area
  16. #include "internal/_warning_suppress_enable_notice.h"
  17. #include "tbb_stddef.h"
  18. #include "atomic.h"
  19. #include "spin_mutex.h"
  20. #include "null_mutex.h"
  21. #include "spin_rw_mutex.h"
  22. #include "null_rw_mutex.h"
  23. #include "task.h"
  24. #include "cache_aligned_allocator.h"
  25. #include "tbb_exception.h"
  26. #include "pipeline.h"
  27. #include "internal/_template_helpers.h"
  28. #include "internal/_aggregator_impl.h"
  29. #include "tbb/internal/_allocator_traits.h"
  30. #include "tbb_profiling.h"
  31. #include "task_arena.h"
  32. #if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
  33. #if __INTEL_COMPILER
  34. // Disabled warning "routine is both inline and noinline"
  35. #pragma warning (push)
  36. #pragma warning( disable: 2196 )
  37. #endif
  38. #define __TBB_NOINLINE_SYM __attribute__((noinline))
  39. #else
  40. #define __TBB_NOINLINE_SYM
  41. #endif
  42. #if __TBB_PREVIEW_ASYNC_MSG
  43. #include <vector> // std::vector in internal::async_storage
  44. #include <memory> // std::shared_ptr in async_msg
  45. #endif
  46. #if __TBB_PREVIEW_STREAMING_NODE
  47. // For streaming_node
  48. #include <array> // std::array
  49. #include <unordered_map> // std::unordered_map
  50. #include <type_traits> // std::decay, std::true_type, std::false_type
  51. #endif // __TBB_PREVIEW_STREAMING_NODE
  52. #if TBB_DEPRECATED_FLOW_ENQUEUE
  53. #define FLOW_SPAWN(a) tbb::task::enqueue((a))
  54. #else
  55. #define FLOW_SPAWN(a) tbb::task::spawn((a))
  56. #endif
  57. #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
  58. #define __TBB_DEFAULT_NODE_ALLOCATOR(T) cache_aligned_allocator<T>
  59. #else
  60. #define __TBB_DEFAULT_NODE_ALLOCATOR(T) null_type
  61. #endif
  62. // use the VC10 or gcc version of tuple if it is available.
  63. #if __TBB_CPP11_TUPLE_PRESENT
  64. #include <tuple>
  65. namespace tbb {
  66. namespace flow {
  67. using std::tuple;
  68. using std::tuple_size;
  69. using std::tuple_element;
  70. using std::get;
  71. }
  72. }
  73. #else
  74. #include "compat/tuple"
  75. #endif
  76. #include<list>
  77. #include<queue>
  78. /** @file
  79. \brief The graph related classes and functions
  80. There are some applications that best express dependencies as messages
  81. passed between nodes in a graph. These messages may contain data or
  82. simply act as signals that a predecessor has completed. The graph
  83. class and its associated node classes can be used to express such
  84. applications.
  85. */
  86. namespace tbb {
  87. namespace flow {
//! An enumeration that provides the two most common concurrency levels: unlimited and serial
enum concurrency { unlimited = 0, serial = 1 };
  90. namespace interface11 {
  91. //! A generic null type
  92. struct null_type {};
  93. //! An empty class used for messages that mean "I'm done"
  94. class continue_msg {};
  95. //! Forward declaration section
  96. template< typename T > class sender;
  97. template< typename T > class receiver;
  98. class continue_receiver;
  99. template< typename T, typename U > class limiter_node; // needed for resetting decrementer
  100. template< typename R, typename B > class run_and_put_task;
  101. namespace internal {
  102. template<typename T, typename M> class successor_cache;
  103. template<typename T, typename M> class broadcast_cache;
  104. template<typename T, typename M> class round_robin_cache;
  105. template<typename T, typename M> class predecessor_cache;
  106. template<typename T, typename M> class reservable_predecessor_cache;
  107. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  108. namespace order {
  109. struct following;
  110. struct preceding;
  111. }
  112. template<typename Order, typename... Args> struct node_set;
  113. #endif
  114. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  115. // Holder of edges both for caches and for those nodes which do not have predecessor caches.
  116. // C == receiver< ... > or sender< ... >, depending.
  117. template<typename C>
  118. class edge_container {
  119. public:
  120. typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
  121. void add_edge(C &s) {
  122. built_edges.push_back(&s);
  123. }
  124. void delete_edge(C &s) {
  125. for (typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
  126. if (*i == &s) {
  127. (void)built_edges.erase(i);
  128. return; // only remove one predecessor per request
  129. }
  130. }
  131. }
  132. void copy_edges(edge_list_type &v) {
  133. v = built_edges;
  134. }
  135. size_t edge_count() {
  136. return (size_t)(built_edges.size());
  137. }
  138. void clear() {
  139. built_edges.clear();
  140. }
  141. // methods remove the statement from all predecessors/successors liste in the edge
  142. // container.
  143. template< typename S > void sender_extract(S &s);
  144. template< typename R > void receiver_extract(R &r);
  145. private:
  146. edge_list_type built_edges;
  147. }; // class edge_container
  148. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  149. } // namespace internal
  150. } // namespace interfaceX
  151. } // namespace flow
  152. } // namespace tbb
  153. //! The graph class
  154. #include "internal/_flow_graph_impl.h"
  155. namespace tbb {
  156. namespace flow {
  157. namespace interface11 {
  158. // enqueue left task if necessary. Returns the non-enqueued task if there is one.
  159. static inline tbb::task *combine_tasks(graph& g, tbb::task * left, tbb::task * right) {
  160. // if no RHS task, don't change left.
  161. if (right == NULL) return left;
  162. // right != NULL
  163. if (left == NULL) return right;
  164. if (left == SUCCESSFULLY_ENQUEUED) return right;
  165. // left contains a task
  166. if (right != SUCCESSFULLY_ENQUEUED) {
  167. // both are valid tasks
  168. internal::spawn_in_graph_arena(g, *left);
  169. return right;
  170. }
  171. return left;
  172. }
  173. #if __TBB_PREVIEW_ASYNC_MSG
  174. template < typename T > class __TBB_DEPRECATED async_msg;
  175. namespace internal {
  176. template < typename T > class async_storage;
  177. template< typename T, typename = void >
  178. struct async_helpers {
  179. typedef async_msg<T> async_type;
  180. typedef T filtered_type;
  181. static const bool is_async_type = false;
  182. static const void* to_void_ptr(const T& t) {
  183. return static_cast<const void*>(&t);
  184. }
  185. static void* to_void_ptr(T& t) {
  186. return static_cast<void*>(&t);
  187. }
  188. static const T& from_void_ptr(const void* p) {
  189. return *static_cast<const T*>(p);
  190. }
  191. static T& from_void_ptr(void* p) {
  192. return *static_cast<T*>(p);
  193. }
  194. static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
  195. if (is_async) {
  196. // This (T) is NOT async and incoming 'A<X> t' IS async
  197. // Get data from async_msg
  198. const async_msg<filtered_type>& msg = async_helpers< async_msg<filtered_type> >::from_void_ptr(p);
  199. task* const new_task = msg.my_storage->subscribe(*this_recv, this_recv->graph_reference());
  200. // finalize() must be called after subscribe() because set() can be called in finalize()
  201. // and 'this_recv' client must be subscribed by this moment
  202. msg.finalize();
  203. return new_task;
  204. }
  205. else {
  206. // Incoming 't' is NOT async
  207. return this_recv->try_put_task(from_void_ptr(p));
  208. }
  209. }
  210. };
  211. template< typename T >
  212. struct async_helpers< T, typename std::enable_if< std::is_base_of<async_msg<typename T::async_msg_data_type>, T>::value >::type > {
  213. typedef T async_type;
  214. typedef typename T::async_msg_data_type filtered_type;
  215. static const bool is_async_type = true;
  216. // Receiver-classes use const interfaces
  217. static const void* to_void_ptr(const T& t) {
  218. return static_cast<const void*>(&static_cast<const async_msg<filtered_type>&>(t));
  219. }
  220. static void* to_void_ptr(T& t) {
  221. return static_cast<void*>(&static_cast<async_msg<filtered_type>&>(t));
  222. }
  223. // Sender-classes use non-const interfaces
  224. static const T& from_void_ptr(const void* p) {
  225. return *static_cast<const T*>(static_cast<const async_msg<filtered_type>*>(p));
  226. }
  227. static T& from_void_ptr(void* p) {
  228. return *static_cast<T*>(static_cast<async_msg<filtered_type>*>(p));
  229. }
  230. // Used in receiver<T> class
  231. static task* try_put_task_wrapper_impl(receiver<T>* const this_recv, const void *p, bool is_async) {
  232. if (is_async) {
  233. // Both are async
  234. return this_recv->try_put_task(from_void_ptr(p));
  235. }
  236. else {
  237. // This (T) is async and incoming 'X t' is NOT async
  238. // Create async_msg for X
  239. const filtered_type& t = async_helpers<filtered_type>::from_void_ptr(p);
  240. const T msg(t);
  241. return this_recv->try_put_task(msg);
  242. }
  243. }
  244. };
class untyped_receiver;

//! Type-erased base of sender<T> (sender<T> derives from it below); the typed
//! protected wrappers below funnel into virtual void*-based implementations.
class untyped_sender {
    template< typename, typename > friend class internal::predecessor_cache;
    template< typename, typename > friend class internal::reservable_predecessor_cache;
public:
    //! The successor type for this node
    typedef untyped_receiver successor_type;

    virtual ~untyped_sender() {}

    // NOTE: Following part of PUBLIC section is copy-paste from original sender<T> class
    // TODO: Prevent untyped successor registration

    //! Add a new successor to this node
    virtual bool register_successor( successor_type &r ) = 0;
    //! Removes a successor from this node
    virtual bool remove_successor( successor_type &r ) = 0;
    //! Releases the reserved item
    virtual bool try_release( ) { return false; }
    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    typedef internal::edge_container<successor_type> built_successors_type;
    typedef built_successors_type::edge_list_type successor_list_type;
    virtual built_successors_type &built_successors() = 0;
    virtual void internal_add_built_successor( successor_type & ) = 0;
    virtual void internal_delete_built_successor( successor_type & ) = 0;
    virtual void copy_successors( successor_list_type &) = 0;
    virtual size_t successor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    //! Request an item from the sender; async_helpers<X> erases the payload
    //! to void* and tags whether X is an async_msg type.
    template< typename X >
    bool try_get( X &t ) {
        return try_get_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }
    //! Reserves an item in the sender (same type-erasure scheme as try_get)
    template< typename X >
    bool try_reserve( X &t ) {
        return try_reserve_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }
    //! Type-erased implementations, supplied by the derived sender<T>.
    virtual bool try_get_wrapper( void* p, bool is_async ) = 0;
    virtual bool try_reserve_wrapper( void* p, bool is_async ) = 0;
};
//! Type-erased receiver base: typed try_put/try_put_task funnel into the
//! virtual void*-based try_put_task_wrapper.
class untyped_receiver {
    template< typename, typename > friend class run_and_put_task;
    template< typename, typename > friend class internal::broadcast_cache;
    template< typename, typename > friend class internal::round_robin_cache;
    template< typename, typename > friend class internal::successor_cache;
#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
public:
    //! The predecessor type for this node
    typedef untyped_sender predecessor_type;

    //! Destructor
    virtual ~untyped_receiver() {}

    //! Put an item to the receiver; returns false when the put was rejected.
    template<typename X>
    bool try_put(const X& t) {
        task *res = try_put_task(t);
        if (!res) return false;
        // A real (non-sentinel) task came back: it still needs to be spawned
        // in the graph's arena here.
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }

    // NOTE: Following part of PUBLIC section is copy-paste from original receiver<T> class
    // TODO: Prevent untyped predecessor registration

    //! Add a predecessor to the node
    virtual bool register_predecessor( predecessor_type & ) { return false; }
    //! Remove a predecessor from the node
    virtual bool remove_predecessor( predecessor_type & ) { return false; }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef internal::edge_container<predecessor_type> built_predecessors_type;
    typedef built_predecessors_type::edge_list_type predecessor_list_type;
    virtual built_predecessors_type &built_predecessors() = 0;
    virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
    virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
    virtual void copy_predecessors( predecessor_list_type & ) = 0;
    virtual size_t predecessor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    //! Typed put: async_helpers<X> erases the payload to void* and tags
    //! whether X is an async_msg type before calling the virtual wrapper.
    template<typename X>
    task *try_put_task(const X& t) {
        return try_put_task_wrapper( internal::async_helpers<X>::to_void_ptr(t), internal::async_helpers<X>::is_async_type );
    }
    virtual task* try_put_task_wrapper( const void* p, bool is_async ) = 0;
    virtual graph& graph_reference() const = 0;
    // NOTE: Following part of PROTECTED and PRIVATE sections is copy-paste from original receiver<T> class
    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
    virtual bool is_continue_receiver() { return false; }
};
  335. } // namespace internal
  336. //! Pure virtual template class that defines a sender of messages of type T
  337. template< typename T >
  338. class sender : public internal::untyped_sender {
  339. public:
  340. //! The output type of this sender
  341. __TBB_DEPRECATED typedef T output_type;
  342. __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
  343. //! Request an item from the sender
  344. virtual bool try_get( T & ) { return false; }
  345. //! Reserves an item in the sender
  346. virtual bool try_reserve( T & ) { return false; }
  347. protected:
  348. virtual bool try_get_wrapper( void* p, bool is_async ) __TBB_override {
  349. // Both async OR both are NOT async
  350. if ( internal::async_helpers<T>::is_async_type == is_async ) {
  351. return try_get( internal::async_helpers<T>::from_void_ptr(p) );
  352. }
  353. // Else: this (T) is async OR incoming 't' is async
  354. __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_get()");
  355. return false;
  356. }
  357. virtual bool try_reserve_wrapper( void* p, bool is_async ) __TBB_override {
  358. // Both async OR both are NOT async
  359. if ( internal::async_helpers<T>::is_async_type == is_async ) {
  360. return try_reserve( internal::async_helpers<T>::from_void_ptr(p) );
  361. }
  362. // Else: this (T) is async OR incoming 't' is async
  363. __TBB_ASSERT(false, "async_msg interface does not support 'pull' protocol in try_reserve()");
  364. return false;
  365. }
  366. }; // class sender<T>
//! Pure virtual template class that defines a receiver of messages of type T
//! (async_msg-enabled build: typed facade over internal::untyped_receiver).
template< typename T >
class receiver : public internal::untyped_receiver {
    template< typename > friend class internal::async_storage;
    template< typename, typename > friend struct internal::async_helpers;
public:
    //! The input type of this receiver
    __TBB_DEPRECATED typedef T input_type;
    // presumably T with its async_msg wrapper stripped -- see internal::async_helpers
    __TBB_DEPRECATED typedef typename internal::async_helpers<T>::filtered_type filtered_type;
    //! Put a plain (filtered) item to the receiver
    bool try_put( const typename internal::async_helpers<T>::filtered_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }
    //! Put an async-typed item to the receiver
    bool try_put( const typename internal::async_helpers<T>::async_type& t ) {
        return internal::untyped_receiver::try_put(t);
    }
protected:
    //! Routes the type-erased item through async_helpers back to the typed try_put_task().
    virtual task* try_put_task_wrapper( const void *p, bool is_async ) __TBB_override {
        return internal::async_helpers<T>::try_put_task_wrapper_impl(this, p, is_async);
    }
    //! Put item to successor; return task to run the successor if possible.
    virtual task *try_put_task(const T& t) = 0;
}; // class receiver<T>
  390. #else // __TBB_PREVIEW_ASYNC_MSG
//! Pure virtual template class that defines a sender of messages of type T
/** Interface only.  Edge registration is pure virtual; the pull protocol
    (try_get / try_reserve / try_release / try_consume) has reject-all
    defaults that concrete senders may override. */
template< typename T >
class sender {
public:
    //! The output type of this sender
    __TBB_DEPRECATED typedef T output_type;
    //! The successor type for this node
    __TBB_DEPRECATED typedef receiver<T> successor_type;
    virtual ~sender() {}
    // NOTE: Following part of PUBLIC section is partly copy-pasted in sender<T> under #if __TBB_PREVIEW_ASYNC_MSG
    //! Add a new successor to this node
    __TBB_DEPRECATED virtual bool register_successor( successor_type &r ) = 0;
    //! Removes a successor from this node
    __TBB_DEPRECATED virtual bool remove_successor( successor_type &r ) = 0;
    //! Request an item from the sender
    virtual bool try_get( T & ) { return false; }
    //! Reserves an item in the sender
    virtual bool try_reserve( T & ) { return false; }
    //! Releases the reserved item
    virtual bool try_release( ) { return false; }
    //! Consumes the reserved item
    virtual bool try_consume( ) { return false; }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! interface to record edges for traversal & deletion
    __TBB_DEPRECATED typedef typename internal::edge_container<successor_type> built_successors_type;
    __TBB_DEPRECATED typedef typename built_successors_type::edge_list_type successor_list_type;
    __TBB_DEPRECATED virtual built_successors_type &built_successors() = 0;
    __TBB_DEPRECATED virtual void internal_add_built_successor( successor_type & ) = 0;
    __TBB_DEPRECATED virtual void internal_delete_built_successor( successor_type & ) = 0;
    __TBB_DEPRECATED virtual void copy_successors( successor_list_type &) = 0;
    __TBB_DEPRECATED virtual size_t successor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
}; // class sender<T>
//! Pure virtual template class that defines a receiver of messages of type T
/** Interface plus the common try_put() driver: the concrete receiver supplies
    try_put_task(), and any real task it returns is spawned in the graph arena. */
template< typename T >
class receiver {
public:
    //! The input type of this receiver
    __TBB_DEPRECATED typedef T input_type;
    //! The predecessor type for this node
    __TBB_DEPRECATED typedef sender<T> predecessor_type;
    //! Destructor
    virtual ~receiver() {}
    //! Put an item to the receiver
    bool try_put( const T& t ) {
        task *res = try_put_task(t);
        if (!res) return false;
        // SUCCESSFULLY_ENQUEUED is a sentinel, not a runnable task -- never spawn it.
        if (res != SUCCESSFULLY_ENQUEUED) internal::spawn_in_graph_arena(graph_reference(), *res);
        return true;
    }
    //! put item to successor; return task to run the successor if possible.
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template< typename X, typename Y > friend class internal::broadcast_cache;
    template< typename X, typename Y > friend class internal::round_robin_cache;
    virtual task *try_put_task(const T& t) = 0;
    virtual graph& graph_reference() const = 0;
public:
    // NOTE: Following part of PUBLIC and PROTECTED sections is copy-pasted in receiver<T> under #if __TBB_PREVIEW_ASYNC_MSG
    //! Add a predecessor to the node
    __TBB_DEPRECATED virtual bool register_predecessor( predecessor_type & ) { return false; }
    //! Remove a predecessor from the node
    __TBB_DEPRECATED virtual bool remove_predecessor( predecessor_type & ) { return false; }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    __TBB_DEPRECATED typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
    __TBB_DEPRECATED typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
    __TBB_DEPRECATED virtual built_predecessors_type &built_predecessors() = 0;
    __TBB_DEPRECATED virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
    __TBB_DEPRECATED virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
    __TBB_DEPRECATED virtual void copy_predecessors( predecessor_list_type & ) = 0;
    __TBB_DEPRECATED virtual size_t predecessor_count() = 0;
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    //! put receiver back in initial state
    virtual void reset_receiver(reset_flags f = rf_reset_protocol) = 0;
    template<typename TT, typename M> friend class internal::successor_cache;
    virtual bool is_continue_receiver() { return false; }
#if __TBB_PREVIEW_OPENCL_NODE
    template< typename, typename > friend class proxy_dependency_receiver;
#endif /* __TBB_PREVIEW_OPENCL_NODE */
}; // class receiver<T>
  472. #endif // __TBB_PREVIEW_ASYNC_MSG
//! Base class for receivers of completion messages
/** These receivers automatically reset, but cannot be explicitly waited on.
    execute() fires only after my_predecessor_count continue_msg's have arrived
    since the last firing; my_current_count tracks progress toward that threshold. */
class continue_receiver : public receiver< continue_msg > {
public:
    //! The input type
    __TBB_DEPRECATED typedef continue_msg input_type;
    //! The predecessor type for this node
    __TBB_DEPRECATED typedef receiver<input_type>::predecessor_type predecessor_type;
    //! Constructor
    /** number_of_predecessors is the initial trigger threshold; when priorities
        are compiled in, priority is stored in my_priority. */
    __TBB_DEPRECATED explicit continue_receiver(
        __TBB_FLOW_GRAPH_PRIORITY_ARG1(int number_of_predecessors, node_priority_t priority)) {
        my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
        my_current_count = 0;
        __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = priority; )
    }
    //! Copy constructor
    /** Copies the *initial* threshold (not the current one) and restarts the
        count at zero; registered edges are deliberately not copied. */
    __TBB_DEPRECATED continue_receiver( const continue_receiver& src ) : receiver<continue_msg>() {
        my_predecessor_count = my_initial_predecessor_count = src.my_initial_predecessor_count;
        my_current_count = 0;
        __TBB_FLOW_GRAPH_PRIORITY_EXPR( my_priority = src.my_priority; )
    }
    //! Increments the trigger threshold
    __TBB_DEPRECATED bool register_predecessor( predecessor_type & ) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        ++my_predecessor_count;
        return true;
    }
    //! Decrements the trigger threshold
    /** Does not check to see if the removal of the predecessor now makes the current count
        exceed the new threshold.  So removing a predecessor while the graph is active can cause
        unexpected results. */
    __TBB_DEPRECATED bool remove_predecessor( predecessor_type & ) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        --my_predecessor_count;
        return true;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    __TBB_DEPRECATED typedef internal::edge_container<predecessor_type> built_predecessors_type;
    __TBB_DEPRECATED typedef built_predecessors_type::edge_list_type predecessor_list_type;
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    __TBB_DEPRECATED void internal_add_built_predecessor( predecessor_type &s) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.add_edge( s );
    }
    __TBB_DEPRECATED void internal_delete_built_predecessor( predecessor_type &s) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.delete_edge(s);
    }
    __TBB_DEPRECATED void copy_predecessors( predecessor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_built_predecessors.copy_edges(v);
    }
    __TBB_DEPRECATED size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        return my_built_predecessors.edge_count();
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    // execute body is supposed to be too small to create a task for.
    //! Counts a continue_msg; only the message reaching the threshold fires execute().
    /** The counter is updated under the lock, but execute() runs after the lock
        is released (inner scope ends first). */
    task *try_put_task( const input_type & ) __TBB_override {
        {
            spin_mutex::scoped_lock l(my_mutex);
            if ( ++my_current_count < my_predecessor_count )
                return SUCCESSFULLY_ENQUEUED;
            else
                my_current_count = 0;   // threshold reached: reset for the next round
        }
        task * res = execute();
        return res? res : SUCCESSFULLY_ENQUEUED;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // continue_receiver must contain its own built_predecessors because it does
    // not have a node_cache.
    built_predecessors_type my_built_predecessors;
#endif
    spin_mutex my_mutex;                // guards the counters (and edge container, if built)
    int my_predecessor_count;           // current trigger threshold
    int my_current_count;               // continue_msg's received since the last firing
    int my_initial_predecessor_count;   // threshold restored by reset with rf_clear_edges
    __TBB_FLOW_GRAPH_PRIORITY_EXPR( node_priority_t my_priority; )
    // the friend declaration in the base class did not eliminate the "protected class"
    // error in gcc 4.1.2
    template<typename U, typename V> friend class tbb::flow::interface11::limiter_node;
    //! Restarts the count; rf_clear_edges also restores the initial threshold.
    void reset_receiver( reset_flags f ) __TBB_override {
        my_current_count = 0;
        if (f & rf_clear_edges) {
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
            my_predecessor_count = my_initial_predecessor_count;
        }
    }
    //! Does whatever should happen when the threshold is reached
    /** This should be very fast or else spawn a task.  This is
        called while the sender is blocked in the try_put(). */
    virtual task * execute() = 0;
    template<typename TT, typename M> friend class internal::successor_cache;
    bool is_continue_receiver() __TBB_override { return true; }
}; // class continue_receiver
  575. } // interfaceX
  576. #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
  577. template <typename K, typename T>
  578. K key_from_message( const T &t ) {
  579. return t.key();
  580. }
  581. #endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
  582. using interface11::sender;
  583. using interface11::receiver;
  584. using interface11::continue_receiver;
  585. } // flow
  586. } // tbb
  587. #include "internal/_flow_graph_trace_impl.h"
  588. #include "internal/_tbb_hash_compare_impl.h"
  589. namespace tbb {
  590. namespace flow {
  591. namespace interface11 {
  592. #include "internal/_flow_graph_body_impl.h"
  593. #include "internal/_flow_graph_cache_impl.h"
  594. #include "internal/_flow_graph_types_impl.h"
  595. #if __TBB_PREVIEW_ASYNC_MSG
  596. #include "internal/_flow_graph_async_msg_impl.h"
  597. #endif
  598. using namespace internal::graph_policy_namespace;
  599. template <typename C, typename N>
  600. graph_iterator<C,N>::graph_iterator(C *g, bool begin) : my_graph(g), current_node(NULL)
  601. {
  602. if (begin) current_node = my_graph->my_nodes;
  603. //else it is an end iterator by default
  604. }
//! Dereference; asserting because dereferencing the end iterator
//! (current_node == NULL) is a usage error.
template <typename C, typename N>
typename graph_iterator<C,N>::reference graph_iterator<C,N>::operator*() const {
    __TBB_ASSERT(current_node, "graph_iterator at end");
    return *operator->();
}
//! Returns the current node pointer; NULL when this is the end iterator.
template <typename C, typename N>
typename graph_iterator<C,N>::pointer graph_iterator<C,N>::operator->() const {
    return current_node;
}
//! Advances to the next registered node.  Moving past the last node yields
//! NULL (the end iterator); advancing an end iterator is a no-op.
template <typename C, typename N>
void graph_iterator<C,N>::internal_forward() {
    if (current_node) current_node = current_node->next;
}
  618. } // namespace interfaceX
  619. namespace interface10 {
//! Constructs a graph with isolated task_group_context
inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    own_context = true;              // this graph allocated my_context and must delete it
    cancelled = false;
    caught_exception = false;
    my_context = new task_group_context(tbb::internal::FLOW_TASKS);
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    // Baseline reference held while the graph is alive (cleared in ~graph).
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);  // flow-graph tracing hook
    my_is_active = true;
}
//! Constructs a graph that shares a caller-provided task_group_context.
inline graph::graph(task_group_context& use_this_context) :
    my_context(&use_this_context), my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
    prepare_task_arena();
    own_context = false;             // the caller retains ownership of the context
    cancelled = false;
    caught_exception = false;
    my_root_task = (new (task::allocate_root(*my_context)) empty_task);
    // Baseline reference held while the graph is alive (cleared in ~graph).
    my_root_task->set_ref_count(1);
    tbb::internal::fgt_graph(this);  // flow-graph tracing hook
    my_is_active = true;
}
//! Waits for outstanding work, then tears down the root task, the owned
//! context (if any), and the task arena.
inline graph::~graph() {
    wait_for_all();
    my_root_task->set_ref_count(0);  // drop the baseline reference before destroying
    tbb::task::destroy(*my_root_task);
    if (own_context) delete my_context;
    delete my_task_arena;
}
//! Registers one external interest in the graph's completion by bumping the
//! root task's reference count (paired with release_wait()).
inline void graph::reserve_wait() {
    if (my_root_task) {
        my_root_task->increment_ref_count();
        tbb::internal::fgt_reserve_wait(this);  // tracing hook, after the count is raised
    }
}
//! Releases one external interest registered by reserve_wait().
//! Mirror ordering of reserve_wait(): trace first, then drop the count.
inline void graph::release_wait() {
    if (my_root_task) {
        tbb::internal::fgt_release_wait(this);
        my_root_task->decrement_ref_count();
    }
}
  662. inline void graph::register_node(tbb::flow::interface11::graph_node *n) {
  663. n->next = NULL;
  664. {
  665. spin_mutex::scoped_lock lock(nodelist_mutex);
  666. n->prev = my_nodes_last;
  667. if (my_nodes_last) my_nodes_last->next = n;
  668. my_nodes_last = n;
  669. if (!my_nodes) my_nodes = n;
  670. }
  671. }
//! Unlinks a node from the graph's intrusive node list.
inline void graph::remove_node(tbb::flow::interface11::graph_node *n) {
    {
        spin_mutex::scoped_lock lock(nodelist_mutex);
        __TBB_ASSERT(my_nodes && my_nodes_last, "graph::remove_node: Error: no registered nodes");
        // Splice n out of the doubly-linked list, fixing head/tail as needed.
        if (n->prev) n->prev->next = n->next;
        if (n->next) n->next->prev = n->prev;
        if (my_nodes_last == n) my_nodes_last = n->prev;
        if (my_nodes == n) my_nodes = n->next;
    }
    n->prev = n->next = NULL;  // clear n's links outside the lock; n is already unreachable
}
//! Deactivates the graph, resets its context and every registered node,
//! reattaches the task arena, reactivates, then spawns any tasks the nodes
//! queued in my_reset_task_list to restart the graph.
inline void graph::reset( tbb::flow::interface11::reset_flags f ) {
    // reset context
    tbb::flow::interface11::internal::deactivate_graph(*this);
    if(my_context) my_context->reset();
    cancelled = false;
    caught_exception = false;
    // reset all the nodes comprising the graph
    for(iterator ii = begin(); ii != end(); ++ii) {
        tbb::flow::interface11::graph_node *my_p = &(*ii);
        my_p->reset_node(f);
    }
    // Reattach the arena. Might be useful to run the graph in a particular task_arena
    // while not limiting graph lifetime to a single task_arena::execute() call.
    prepare_task_arena( /*reinit=*/true );
    tbb::flow::interface11::internal::activate_graph(*this);
    // now spawn the tasks necessary to start the graph
    for(task_list_type::iterator rti = my_reset_task_list.begin(); rti != my_reset_task_list.end(); ++rti) {
        tbb::flow::interface11::internal::spawn_in_graph_arena(*this, *(*rti));
    }
    my_reset_task_list.clear();
}
// Node-list iteration: begin()/cbegin() start at the first registered node;
// end()/cend() are NULL-valued sentinels (see graph_iterator's constructor).
inline graph::iterator graph::begin() { return iterator(this, true); }
inline graph::iterator graph::end() { return iterator(this, false); }
inline graph::const_iterator graph::begin() const { return const_iterator(this, true); }
inline graph::const_iterator graph::end() const { return const_iterator(this, false); }
inline graph::const_iterator graph::cbegin() const { return const_iterator(this, true); }
inline graph::const_iterator graph::cend() const { return const_iterator(this, false); }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
//! Sets a descriptive name for this graph in tracing/profiling output.
inline void graph::set_name(const char *name) {
    tbb::internal::fgt_graph_desc(this, name);
}
#endif
  715. } // namespace interface10
  716. namespace interface11 {
//! Registers the node with its owning graph on construction.
inline graph_node::graph_node(graph& g) : my_graph(g) {
    my_graph.register_node(this);
}
//! Unregisters the node from its owning graph on destruction.
inline graph_node::~graph_node() {
    my_graph.remove_node(this);
}
  723. #include "internal/_flow_graph_node_impl.h"
  724. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  725. using internal::node_set;
  726. #endif
//! An executable node that acts as a source, i.e. it has no predecessors
/** The user-supplied body produces items on demand; at most one item is
    buffered in my_cached_item at a time and pushed to successors.  The node
    does nothing until activate() is called. */
template < typename Output >
class input_node : public graph_node, public sender< Output > {
public:
    //! The type of the output message, which is complete
    typedef Output output_type;
    //! The type of successors of this node
    typedef typename sender<output_type>::successor_type successor_type;
    // Source-like node: it has no input type
    typedef null_type input_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif
    //! Constructor for a node with a successor
    /** Keeps two copies of the body: my_body is the working copy; my_init_body
        preserves the original for rf_reset_bodies.  Starts inactive. */
    template< typename Body >
    __TBB_NOINLINE_SYM input_node( graph &g, Body body )
        : graph_node(g), my_active(false),
        my_body( new internal::input_body_leaf< output_type, Body>(body) ),
        my_init_body( new internal::input_body_leaf< output_type, Body>(body) ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that also wires edges to a set of successors.
    template <typename Body, typename... Successors>
    input_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
        : input_node(successors.graph_reference(), body) {
        make_edges(*this, successors);
    }
#endif
    //! Copy constructor
    /** Clones from the source's *initial* body and starts inactive with no
        cached item; successor edges are not copied. */
    __TBB_NOINLINE_SYM input_node( const input_node& src ) :
        graph_node(src.my_graph), sender<Output>(),
        my_active(false),
        my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
        my_reserved(false), my_has_cached_item(false)
    {
        my_successors.set_owner(this);
        tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
                                          static_cast<sender<output_type> *>(this), this->my_body );
    }
    //! The destructor
    ~input_node() { delete my_body; delete my_init_body; }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a descriptive name for tracing/profiling output.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
    //! Add a new successor to this node
    bool register_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.register_successor(r);
        if ( my_active )
            spawn_put();   // an active node immediately tries to feed the new successor
        return true;
    }
    //! Removes a successor from this node
    bool remove_successor( successor_type &r ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.remove_successor(r);
        return true;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    void internal_add_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }
    void internal_delete_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }
    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }
    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
    //! Request an item from the node
    /** Hands over the cached item if one exists (and is not reserved). */
    bool try_get( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved )
            return false;
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_has_cached_item = false;
            return true;
        }
        // we've been asked to provide an item, but we have none.  enqueue a task to
        // provide one.
        if ( my_active )
            spawn_put();
        return false;
    }
    //! Reserves an item.
    /** Unlike try_get(), does not spawn a producing task on a miss. */
    bool try_reserve( output_type &v ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }
    //! Release a reserved item.
    /** true = item has been released and so remains in sender, dest must request or reserve future items */
    bool try_release( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
        my_reserved = false;
        // The item is available again: offer it to successors.
        if(!my_successors.empty())
            spawn_put();
        return true;
    }
    //! Consumes a reserved item
    /** The cached item is discarded; a new one may be produced for successors. */
    bool try_consume( ) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
        my_reserved = false;
        my_has_cached_item = false;
        if ( !my_successors.empty() ) {
            spawn_put();
        }
        return true;
    }
    //! Activates a node that was created in the inactive state
    void activate() {
        spin_mutex::scoped_lock lock(my_mutex);
        my_active = true;
        if (!my_successors.empty())
            spawn_put();
    }
    //! Returns a copy of the user body; throws std::bad_cast if Body is not the stored type.
    template<typename Body>
    Body copy_function_object() {
        internal::input_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::input_body_leaf<output_type, Body> & >(body_ref).get_body();
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Disconnects the node from all successors and clears its state.
    void extract( ) __TBB_override {
        my_successors.built_successors().sender_extract(*this);   // removes "my_owner" == this from each successor
        my_active = false;
        my_reserved = false;
        if(my_has_cached_item) my_has_cached_item = false;
    }
#endif
protected:
    //! resets the input_node to its initial state
    /** rf_clear_edges drops all successors; rf_reset_bodies replaces the
        working body with a clone of the initial one. */
    void reset_node( reset_flags f) __TBB_override {
        my_active = false;
        my_reserved = false;
        my_has_cached_item = false;
        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            internal::input_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
    }
private:
    spin_mutex my_mutex;                                   // guards all state below
    bool my_active;                                        // false until activate()
    internal::input_body<output_type> *my_body;            // working body (owned)
    internal::input_body<output_type> *my_init_body;       // pristine body for resets (owned)
    internal::broadcast_cache< output_type > my_successors;
    bool my_reserved;                                      // cached item is reserved by a successor
    bool my_has_cached_item;                               // my_cached_item holds a valid value
    output_type my_cached_item;
    // used by apply_body_bypass, can invoke body of node.
    /** Like try_reserve(), but invokes the body to produce an item on a miss.
        Returns false once the body signals end-of-stream. */
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            tbb::internal::fgt_begin_body( my_body );
#if TBB_DEPRECATED_INPUT_NODE_BODY
            // Deprecated protocol: body returns false when exhausted.
            bool r = (*my_body)(my_cached_item);
            if (r) {
                my_has_cached_item = true;
            }
#else
            // Current protocol: body signals exhaustion through flow_control.
            flow_control control;
            my_cached_item = (*my_body)(control);
            my_has_cached_item = !control.is_pipeline_stopped;
#endif
            tbb::internal::fgt_end_body( my_body );
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }
    //! Allocates the task that will run apply_body_bypass, as a child of the graph's root task.
    task* create_put_task() {
        return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                 internal:: source_task_bypass < input_node< output_type > >( *this ) );
    }
    //! Spawns a task that applies the body
    void spawn_put( ) {
        if(internal::is_graph_active(this->my_graph)) {
            internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }
    friend class internal::source_task_bypass< input_node< output_type > >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    /** Produces (or reuses) an item and offers it to successors: accepted ->
        consume the reservation, rejected -> release it for a later retry. */
    task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;
        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
}; // class input_node
  956. #if TBB_USE_SOURCE_NODE_AS_ALIAS
//! In this configuration source_node is a thin alias over input_node.
/** Note: the deprecated source_node's is_active constructor flag is not
    accepted here; use input_node::activate(). */
template < typename Output >
class source_node : public input_node <Output> {
public:
    //! Constructor for a node with a successor
    template< typename Body >
    __TBB_NOINLINE_SYM source_node( graph &g, Body body )
        : input_node<Output>(g, body)
    {
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor that also wires edges to a set of successors.
    template <typename Body, typename... Successors>
    source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body )
        : input_node<Output>(successors, body) {
    }
#endif
};
  973. #else // TBB_USE_SOURCE_NODE_AS_ALIAS
  974. //! An executable node that acts as a source, i.e. it has no predecessors
  975. template < typename Output > class
  976. __TBB_DEPRECATED_MSG("TBB Warning: tbb::flow::source_node is deprecated, use tbb::flow::input_node." )
  977. source_node : public graph_node, public sender< Output > {
  978. public:
  979. //! The type of the output message, which is complete
  980. typedef Output output_type;
  981. //! The type of successors of this node
  982. typedef typename sender<output_type>::successor_type successor_type;
  983. //Source node has no input type
  984. typedef null_type input_type;
  985. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  986. typedef typename sender<output_type>::built_successors_type built_successors_type;
  987. typedef typename sender<output_type>::successor_list_type successor_list_type;
  988. #endif
  989. //! Constructor for a node with a successor
  990. template< typename Body >
  991. __TBB_NOINLINE_SYM source_node( graph &g, Body body, bool is_active = true )
  992. : graph_node(g), my_active(is_active), init_my_active(is_active),
  993. my_body( new internal::source_body_leaf< output_type, Body>(body) ),
  994. my_init_body( new internal::source_body_leaf< output_type, Body>(body) ),
  995. my_reserved(false), my_has_cached_item(false)
  996. {
  997. my_successors.set_owner(this);
  998. tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
  999. static_cast<sender<output_type> *>(this), this->my_body );
  1000. }
  1001. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  1002. template <typename Body, typename... Successors>
  1003. source_node( const node_set<internal::order::preceding, Successors...>& successors, Body body, bool is_active = true )
  1004. : source_node(successors.graph_reference(), body, is_active) {
  1005. make_edges(*this, successors);
  1006. }
  1007. #endif
  1008. //! Copy constructor
  1009. __TBB_NOINLINE_SYM source_node( const source_node& src ) :
  1010. graph_node(src.my_graph), sender<Output>(),
  1011. my_active(src.init_my_active),
  1012. init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
  1013. my_reserved(false), my_has_cached_item(false)
  1014. {
  1015. my_successors.set_owner(this);
  1016. tbb::internal::fgt_node_with_body(CODEPTR(), tbb::internal::FLOW_SOURCE_NODE, &this->my_graph,
  1017. static_cast<sender<output_type> *>(this), this->my_body );
  1018. }
  1019. //! The destructor
  1020. ~source_node() { delete my_body; delete my_init_body; }
  1021. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  1022. void set_name( const char *name ) __TBB_override {
  1023. tbb::internal::fgt_node_desc( this, name );
  1024. }
  1025. #endif
  1026. //! Add a new successor to this node
  1027. bool register_successor( successor_type &r ) __TBB_override {
  1028. spin_mutex::scoped_lock lock(my_mutex);
  1029. my_successors.register_successor(r);
  1030. if ( my_active )
  1031. spawn_put();
  1032. return true;
  1033. }
  1034. //! Removes a successor from this node
  1035. bool remove_successor( successor_type &r ) __TBB_override {
  1036. spin_mutex::scoped_lock lock(my_mutex);
  1037. my_successors.remove_successor(r);
  1038. return true;
  1039. }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-extraction interface: bookkeeping for edges created via
    // make_edge so they can be enumerated and detached.  All accessors take
    // my_mutex, consistent with the rest of the node's state handling.
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    void internal_add_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_add_built_successor(r);
    }
    void internal_delete_built_successor( successor_type &r) __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        my_successors.internal_delete_built_successor(r);
    }
    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock lock(my_mutex);
        return my_successors.successor_count();
    }
    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(my_mutex);
        my_successors.copy_successors(v);
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  1059. //! Request an item from the node
  1060. bool try_get( output_type &v ) __TBB_override {
  1061. spin_mutex::scoped_lock lock(my_mutex);
  1062. if ( my_reserved )
  1063. return false;
  1064. if ( my_has_cached_item ) {
  1065. v = my_cached_item;
  1066. my_has_cached_item = false;
  1067. return true;
  1068. }
  1069. // we've been asked to provide an item, but we have none. enqueue a task to
  1070. // provide one.
  1071. spawn_put();
  1072. return false;
  1073. }
  1074. //! Reserves an item.
  1075. bool try_reserve( output_type &v ) __TBB_override {
  1076. spin_mutex::scoped_lock lock(my_mutex);
  1077. if ( my_reserved ) {
  1078. return false;
  1079. }
  1080. if ( my_has_cached_item ) {
  1081. v = my_cached_item;
  1082. my_reserved = true;
  1083. return true;
  1084. } else {
  1085. return false;
  1086. }
  1087. }
  1088. //! Release a reserved item.
  1089. /** true = item has been released and so remains in sender, dest must request or reserve future items */
  1090. bool try_release( ) __TBB_override {
  1091. spin_mutex::scoped_lock lock(my_mutex);
  1092. __TBB_ASSERT( my_reserved && my_has_cached_item, "releasing non-existent reservation" );
  1093. my_reserved = false;
  1094. if(!my_successors.empty())
  1095. spawn_put();
  1096. return true;
  1097. }
  1098. //! Consumes a reserved item
  1099. bool try_consume( ) __TBB_override {
  1100. spin_mutex::scoped_lock lock(my_mutex);
  1101. __TBB_ASSERT( my_reserved && my_has_cached_item, "consuming non-existent reservation" );
  1102. my_reserved = false;
  1103. my_has_cached_item = false;
  1104. if ( !my_successors.empty() ) {
  1105. spawn_put();
  1106. }
  1107. return true;
  1108. }
  1109. //! Activates a node that was created in the inactive state
  1110. void activate() {
  1111. spin_mutex::scoped_lock lock(my_mutex);
  1112. my_active = true;
  1113. if (!my_successors.empty())
  1114. spawn_put();
  1115. }
    //! Returns a copy of the user-supplied body object.
    //  Body must be the exact type the node was constructed with: the reference
    //  dynamic_cast throws std::bad_cast on a mismatch.
    template<typename Body>
    Body copy_function_object() {
        internal::source_body<output_type> &body_ref = *this->my_body;
        return dynamic_cast< internal::source_body_leaf<output_type, Body> & >(body_ref).get_body();
    }
  1121. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  1122. void extract( ) __TBB_override {
  1123. my_successors.built_successors().sender_extract(*this); // removes "my_owner" == this from each successor
  1124. my_active = init_my_active;
  1125. my_reserved = false;
  1126. if(my_has_cached_item) my_has_cached_item = false;
  1127. }
  1128. #endif
protected:
    //! resets the source_node to its initial state
    //  Order matters: the replacement body is cloned from the pristine copy
    //  *before* the current body is deleted, so a throwing clone() leaves the
    //  node with its old body intact.
    void reset_node( reset_flags f) __TBB_override {
        my_active = init_my_active;
        my_reserved =false;
        if(my_has_cached_item) {
            my_has_cached_item = false;
        }
        if(f & rf_clear_edges) my_successors.clear();
        if(f & rf_reset_bodies) {
            internal::source_body<output_type> *tmp = my_init_body->clone();
            delete my_body;
            my_body = tmp;
        }
        // Restart is deferred: the task is queued on the graph's reset list and
        // spawned only after the whole reset completes (see create_put_task()).
        if(my_active)
            internal::add_task_to_graph_reset_list(this->my_graph, create_put_task());
    }
private:
    spin_mutex my_mutex;                                    // guards all mutable state below
    bool my_active;                                         // node is currently allowed to run its body
    bool init_my_active;                                    // activity requested at construction; restored on reset/extract
    internal::source_body<output_type> *my_body;            // owned; invoked to produce items
    internal::source_body<output_type> *my_init_body;       // owned; pristine copy used to re-create my_body on reset
    internal::broadcast_cache< output_type > my_successors; // consumers of produced items
    bool my_reserved;                                       // a successor holds a reservation on the cached item
    bool my_has_cached_item;                                // my_cached_item holds a valid value
    output_type my_cached_item;                             // buffered output awaiting delivery
    // used by apply_body_bypass, can invoke body of node.
    // Like try_reserve(), but if no item is cached it runs the user body (under
    // the lock, bracketed by tracing calls) to try to produce one.  A body
    // returning false means end-of-stream: nothing is cached and false is
    // returned without reserving.
    bool try_reserve_apply_body(output_type &v) {
        spin_mutex::scoped_lock lock(my_mutex);
        if ( my_reserved ) {
            return false;
        }
        if ( !my_has_cached_item ) {
            tbb::internal::fgt_begin_body( my_body );
            bool r = (*my_body)(my_cached_item);
            tbb::internal::fgt_end_body( my_body );
            if (r) {
                my_has_cached_item = true;
            }
        }
        if ( my_has_cached_item ) {
            v = my_cached_item;
            my_reserved = true;
            return true;
        } else {
            return false;
        }
    }
    // when resetting, and if the source_node was created with my_active == true, then
    // when we reset the node we must store a task to run the node, and spawn it only
    // after the reset is complete and is_active() is again true.  This is why we don't
    // test for is_active() here.
    // Allocates (but does not spawn) a task that will call apply_body_bypass();
    // uses TBB's placement-new task allocation protocol, so ownership passes to
    // the scheduler once spawned.
    task* create_put_task() {
        return ( new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
                 internal:: source_task_bypass < source_node< output_type > >( *this ) );
    }
    //! Spawns a task that applies the body
    //  No-op when the graph is shutting down / inactive, so no work is spawned
    //  into a dying graph.
    void spawn_put( ) {
        if(internal::is_graph_active(this->my_graph)) {
            internal::spawn_in_graph_arena(this->my_graph, *create_put_task());
        }
    }
    friend class internal::source_task_bypass< source_node< output_type > >;
    //! Applies the body.  Returning SUCCESSFULLY_ENQUEUED okay; forward_task_bypass will handle it.
    //  Produces (or reuses) an item under reservation, offers it to successors,
    //  then consumes the reservation on success or releases it (keeping the item
    //  cached) when no successor accepted.  Returns the task to bypass-execute,
    //  or NULL when nothing was produced/accepted.
    task * apply_body_bypass( ) {
        output_type v;
        if ( !try_reserve_apply_body(v) )
            return NULL;
        task *last_task = my_successors.try_put_task(v);
        if ( last_task )
            try_consume();
        else
            try_release();
        return last_task;
    }
  1205. }; // class source_node
  1206. #endif // TBB_USE_SOURCE_NODE_AS_ALIAS
//! Implements a function node that supports Input -> Output
//  Concurrency-limited transformation node: input handling (queueing/rejecting,
//  concurrency throttling, priority) lives in internal::function_input; output
//  broadcasting lives in internal::function_output.  This class mostly glues the
//  two together and registers the node with the tracing facility.
template<typename Input, typename Output = continue_msg, typename Policy = queueing,
         typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
class function_node
    : public graph_node
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    , public internal::function_input< Input, Output, Policy, Allocator >
#else
    , public internal::function_input< Input, Output, Policy, cache_aligned_allocator<Input> >
#endif
    , public internal::function_output<Output> {
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    typedef Allocator internals_allocator;
#else
    // The user-visible Allocator parameter is deprecated; internally a
    // cache_aligned_allocator is always used, and passing anything but
    // null_type is rejected at compile time.
    typedef cache_aligned_allocator<Input> internals_allocator;
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
public:
    typedef Input input_type;
    typedef Output output_type;
    typedef internal::function_input<input_type,output_type,Policy,internals_allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
    typedef typename fOutput_type::successor_list_type successor_list_type;
#endif
    using input_impl_type::my_predecessors;
    //! Constructor
    // input_queue_type is allocated here, but destroyed in the function_input_base.
    // TODO: pass the graph_buffer_policy to the function_input_base so it can all
    // be done in one place. This would be an interface-breaking change.
    template< typename Body >
    __TBB_NOINLINE_SYM function_node( graph &g, size_t concurrency,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority ))
#endif
        : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
          fOutput_type(g) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    function_node( graph& g, size_t concurrency, Body body, node_priority_t priority )
        : function_node(g, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // Constructors taking a node_set: delegate to the graph constructors, then
    // connect the edges implied by the set's ordering.
    template <typename Body, typename... Args>
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body,
                   __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
        : function_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    function_node( const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority )
        : function_node(nodes, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Copy constructor
    __TBB_NOINLINE_SYM function_node( const function_node& src ) :
        graph_node(src.my_graph),
        input_impl_type(src),
        fOutput_type(src.my_graph) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_FUNCTION_NODE, &this->my_graph,
                static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this), this->my_body );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Associates a name with the node for trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Detaches the node from all predecessors and successors.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;
    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
    //! Resets input-side state, optionally clearing edges on both sides.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_function_input(f);
        // TODO: use clear() instead.
        if(f & rf_clear_edges) {
            successors().clear();
            my_predecessors.clear();
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "function_node successors not empty");
        __TBB_ASSERT(this->my_predecessors.empty(), "function_node predecessors not empty");
    }
};  // class function_node
//! implements a function node that supports Input -> (set of outputs)
// Output is a tuple of output types.
// Each tuple element gets its own internal::multifunction_output port; the body
// may emit to any subset of ports per invocation.  All input-side machinery
// (concurrency, queueing, priority) is inherited from internal::multifunction_input.
template<typename Input, typename Output, typename Policy = queueing,
         typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input)>
class multifunction_node :
    public graph_node,
    public internal::multifunction_input
    <
        Input,
        typename internal::wrap_tuple_elements<
            tbb::flow::tuple_size<Output>::value,  // #elements in tuple
            internal::multifunction_output,        // wrap this around each element
            Output                                 // the tuple providing the types
        >::type,
        Policy,
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
        Allocator
#else
        cache_aligned_allocator<Input>
#endif
    > {
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    typedef Allocator internals_allocator;
#else
    // Deprecated Allocator parameter: only null_type is accepted; a
    // cache_aligned_allocator is used internally.
    typedef cache_aligned_allocator<Input> internals_allocator;
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
protected:
    static const int N = tbb::flow::tuple_size<Output>::value;
public:
    typedef Input input_type;
    typedef null_type output_type;
    typedef typename internal::wrap_tuple_elements<N,internal::multifunction_output, Output>::type output_ports_type;
    typedef internal::multifunction_input<
        input_type, output_ports_type, Policy, internals_allocator> input_impl_type;
    typedef internal::function_input_queue<input_type, internals_allocator> input_queue_type;
private:
    using input_impl_type::my_predecessors;
public:
    //! Constructor: registers the node and its N output ports with tracing.
    template<typename Body>
    __TBB_NOINLINE_SYM multifunction_node(
        graph &g, size_t concurrency,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
#endif
    ) : graph_node(g), input_impl_type(g, concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
        tbb::internal::fgt_multioutput_node_with_body<N>(
            CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
            &this->my_graph, static_cast<receiver<input_type> *>(this),
            this->output_ports(), this->my_body
        );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    __TBB_NOINLINE_SYM multifunction_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
        : multifunction_node(g, concurrency, body, Policy(), priority) {}
#endif // TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // node_set constructors: delegate, then wire the implied edges.
    template <typename Body, typename... Args>
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body,
                       __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
        : multifunction_node(nodes.graph_reference(), concurrency, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    __TBB_NOINLINE_SYM multifunction_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
        : multifunction_node(nodes, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Copy constructor: copies input-side configuration, re-registers with tracing.
    __TBB_NOINLINE_SYM multifunction_node( const multifunction_node &other) :
        graph_node(other.my_graph), input_impl_type(other) {
        tbb::internal::fgt_multioutput_node_with_body<N>( CODEPTR(), tbb::internal::FLOW_MULTIFUNCTION_NODE,
                &this->my_graph, static_cast<receiver<input_type> *>(this),
                this->output_ports(), this->my_body );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Associates a name with the node for trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multioutput_node_desc( this, name );
    }
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Detaches the node from predecessors and, via the base, from its output ports.
    void extract( ) __TBB_override {
        my_predecessors.built_predecessors().receiver_extract(*this);
        input_impl_type::extract();
    }
#endif
    // all the guts are in multifunction_input...
protected:
    void reset_node(reset_flags f) __TBB_override { input_impl_type::reset(f); }
};  // multifunction_node
  1408. //! split_node: accepts a tuple as input, forwards each element of the tuple to its
  1409. // successors. The node has unlimited concurrency, so it does not reject inputs.
  1410. template<typename TupleType, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(TupleType)>
  1411. class split_node : public graph_node, public receiver<TupleType> {
  1412. static const int N = tbb::flow::tuple_size<TupleType>::value;
  1413. typedef receiver<TupleType> base_type;
  1414. public:
  1415. typedef TupleType input_type;
  1416. #if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
  1417. typedef Allocator allocator_type;
  1418. #else
  1419. __TBB_STATIC_ASSERT(
  1420. (tbb::internal::is_same_type<Allocator, null_type>::value),
  1421. "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
  1422. "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
  1423. );
  1424. #endif
  1425. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  1426. typedef typename base_type::predecessor_type predecessor_type;
  1427. typedef typename base_type::predecessor_list_type predecessor_list_type;
  1428. typedef internal::predecessor_cache<input_type, null_mutex > predecessor_cache_type;
  1429. typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
  1430. #endif
  1431. typedef typename internal::wrap_tuple_elements<
  1432. N, // #elements in tuple
  1433. internal::multifunction_output, // wrap this around each element
  1434. TupleType // the tuple providing the types
  1435. >::type output_ports_type;
  1436. __TBB_NOINLINE_SYM explicit split_node(graph &g)
  1437. : graph_node(g),
  1438. my_output_ports(internal::init_output_ports<output_ports_type>::call(g, my_output_ports))
  1439. {
  1440. tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
  1441. static_cast<receiver<input_type> *>(this), this->output_ports());
  1442. }
  1443. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  1444. template <typename... Args>
  1445. __TBB_NOINLINE_SYM split_node(const node_set<Args...>& nodes) : split_node(nodes.graph_reference()) {
  1446. make_edges_in_order(nodes, *this);
  1447. }
  1448. #endif
  1449. __TBB_NOINLINE_SYM split_node(const split_node& other)
  1450. : graph_node(other.my_graph), base_type(other),
  1451. my_output_ports(internal::init_output_ports<output_ports_type>::call(other.my_graph, my_output_ports))
  1452. {
  1453. tbb::internal::fgt_multioutput_node<N>(CODEPTR(), tbb::internal::FLOW_SPLIT_NODE, &this->my_graph,
  1454. static_cast<receiver<input_type> *>(this), this->output_ports());
  1455. }
  1456. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  1457. void set_name( const char *name ) __TBB_override {
  1458. tbb::internal::fgt_multioutput_node_desc( this, name );
  1459. }
  1460. #endif
  1461. output_ports_type &output_ports() { return my_output_ports; }
  1462. protected:
  1463. task *try_put_task(const TupleType& t) __TBB_override {
  1464. // Sending split messages in parallel is not justified, as overheads would prevail.
  1465. // Also, we do not have successors here. So we just tell the task returned here is successful.
  1466. return internal::emit_element<N>::emit_this(this->my_graph, t, output_ports());
  1467. }
  1468. void reset_node(reset_flags f) __TBB_override {
  1469. if (f & rf_clear_edges)
  1470. internal::clear_element<N>::clear_this(my_output_ports);
  1471. __TBB_ASSERT(!(f & rf_clear_edges) || internal::clear_element<N>::this_empty(my_output_ports), "split_node reset failed");
  1472. }
  1473. void reset_receiver(reset_flags /*f*/) __TBB_override {}
  1474. graph& graph_reference() const __TBB_override {
  1475. return my_graph;
  1476. }
  1477. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  1478. private: //! split_node doesn't use this "predecessors" functionality; so, we have "dummies" here;
  1479. void extract() __TBB_override {}
  1480. //! Adds to list of predecessors added by make_edge
  1481. void internal_add_built_predecessor(predecessor_type&) __TBB_override {}
  1482. //! removes from to list of predecessors (used by remove_edge)
  1483. void internal_delete_built_predecessor(predecessor_type&) __TBB_override {}
  1484. size_t predecessor_count() __TBB_override { return 0; }
  1485. void copy_predecessors(predecessor_list_type&) __TBB_override {}
  1486. built_predecessors_type &built_predecessors() __TBB_override { return my_predessors; }
  1487. //! dummy member
  1488. built_predecessors_type my_predessors;
  1489. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  1490. private:
  1491. output_ports_type my_output_ports;
  1492. };
//! Implements an executable node that supports continue_msg -> Output
//  Fires its body once all registered predecessors have sent a continue_msg;
//  the counting logic lives in internal::continue_input, broadcasting in
//  internal::function_output.
template <typename Output, typename Policy = internal::Policy<void> >
class continue_node : public graph_node, public internal::continue_input<Output, Policy>,
                      public internal::function_output<Output> {
public:
    typedef continue_msg input_type;
    typedef Output output_type;
    typedef internal::continue_input<Output, Policy> input_impl_type;
    typedef internal::function_output<output_type> fOutput_type;
    typedef typename input_impl_type::predecessor_type predecessor_type;
    typedef typename fOutput_type::successor_type successor_type;
    //! Constructor for executable node with continue_msg -> Output
    //  Predecessor count is discovered from make_edge calls.
    template <typename Body >
    __TBB_NOINLINE_SYM continue_node(
        graph &g,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
#endif
    ) : graph_node(g), input_impl_type( g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority) ),
        fOutput_type(g) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    // Convenience overload: priority without an explicit Policy argument.
    template <typename Body>
    continue_node( graph& g, Body body, node_priority_t priority )
        : continue_node(g, body, Policy(), priority) {}
#endif
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    // node_set constructors: delegate, then wire the implied edges.
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, Body body,
                   __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority))
        : continue_node(nodes.graph_reference(), body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority) ) {
        make_edges_in_order(nodes, *this);
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, Body body, node_priority_t priority)
        : continue_node(nodes, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructor for executable node with continue_msg -> Output
    //  Explicit number_of_predecessors pre-sets the trigger threshold in
    //  addition to edges added later.
    template <typename Body >
    __TBB_NOINLINE_SYM continue_node(
        graph &g, int number_of_predecessors,
#if __TBB_CPP11_PRESENT
        Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority )
#else
        __TBB_FLOW_GRAPH_PRIORITY_ARG1( Body body, node_priority_t priority = tbb::flow::internal::no_priority )
#endif
    ) : graph_node(g)
      , input_impl_type(g, number_of_predecessors, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)),
        fOutput_type(g) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
    template <typename Body>
    continue_node( graph& g, int number_of_predecessors, Body body, node_priority_t priority)
        : continue_node(g, number_of_predecessors, body, Policy(), priority) {}
#endif
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1( Policy p = Policy(), node_priority_t priority = tbb::flow::internal::no_priority ))
        : continue_node(nodes.graph_reference(), number_of_predecessors, body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(p, priority)) {
        make_edges_in_order(nodes, *this);
    }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
    template <typename Body, typename... Args>
    continue_node( const node_set<Args...>& nodes, int number_of_predecessors,
                   Body body, node_priority_t priority )
        : continue_node(nodes, number_of_predecessors, body, Policy(), priority) {}
#endif
#endif
    //! Copy constructor
    __TBB_NOINLINE_SYM continue_node( const continue_node& src ) :
        graph_node(src.my_graph), input_impl_type(src),
        internal::function_output<Output>(src.my_graph) {
        tbb::internal::fgt_node_with_body( CODEPTR(), tbb::internal::FLOW_CONTINUE_NODE, &this->my_graph,
                                           static_cast<receiver<input_type> *>(this),
                                           static_cast<sender<output_type> *>(this), this->my_body );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Associates a name with the node for trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    //! Detaches the node from all predecessors and successors.
    void extract() __TBB_override {
        input_impl_type::my_built_predecessors.receiver_extract(*this);
        successors().built_successors().sender_extract(*this);
    }
#endif
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    using input_impl_type::try_put_task;
    internal::broadcast_cache<output_type> &successors () __TBB_override { return fOutput_type::my_successors; }
    //! Resets the continue counter and, if requested, clears successor edges.
    void reset_node(reset_flags f) __TBB_override {
        input_impl_type::reset_receiver(f);
        if(f & rf_clear_edges)successors().clear();
        __TBB_ASSERT(!(f & rf_clear_edges) || successors().empty(), "continue_node not reset");
    }
};  // continue_node
  1603. //! Forwards messages of type T to all successors
  1604. template <typename T>
  1605. class broadcast_node : public graph_node, public receiver<T>, public sender<T> {
  1606. public:
  1607. typedef T input_type;
  1608. typedef T output_type;
  1609. typedef typename receiver<input_type>::predecessor_type predecessor_type;
  1610. typedef typename sender<output_type>::successor_type successor_type;
  1611. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  1612. typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
  1613. typedef typename sender<output_type>::successor_list_type successor_list_type;
  1614. #endif
  1615. private:
  1616. internal::broadcast_cache<input_type> my_successors;
  1617. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  1618. internal::edge_container<predecessor_type> my_built_predecessors;
  1619. spin_mutex pred_mutex; // serialize accesses on edge_container
  1620. #endif
  1621. public:
  1622. __TBB_NOINLINE_SYM explicit broadcast_node(graph& g) : graph_node(g) {
  1623. my_successors.set_owner( this );
  1624. tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
  1625. static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
  1626. }
  1627. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  1628. template <typename... Args>
  1629. broadcast_node(const node_set<Args...>& nodes) : broadcast_node(nodes.graph_reference()) {
  1630. make_edges_in_order(nodes, *this);
  1631. }
  1632. #endif
  1633. // Copy constructor
  1634. __TBB_NOINLINE_SYM broadcast_node( const broadcast_node& src ) :
  1635. graph_node(src.my_graph), receiver<T>(), sender<T>()
  1636. {
  1637. my_successors.set_owner( this );
  1638. tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BROADCAST_NODE, &this->my_graph,
  1639. static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
  1640. }
  1641. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  1642. void set_name( const char *name ) __TBB_override {
  1643. tbb::internal::fgt_node_desc( this, name );
  1644. }
  1645. #endif
  1646. //! Adds a successor
  1647. bool register_successor( successor_type &r ) __TBB_override {
  1648. my_successors.register_successor( r );
  1649. return true;
  1650. }
  1651. //! Removes s as a successor
  1652. bool remove_successor( successor_type &r ) __TBB_override {
  1653. my_successors.remove_successor( r );
  1654. return true;
  1655. }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-introspection/extraction interface.
    // Successor bookkeeping is delegated to my_successors (which synchronizes internally);
    // predecessor bookkeeping is kept here and guarded by pred_mutex.
    typedef typename sender<T>::built_successors_type built_successors_type;
    //! Access the container of recorded successor edges.
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    //! Records r in the successor edge list (bookkeeping only; does not forward messages).
    void internal_add_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_add_built_successor(r);
    }
    //! Removes r from the successor edge list.
    void internal_delete_built_successor(successor_type &r) __TBB_override {
        my_successors.internal_delete_built_successor(r);
    }
    //! Number of recorded successor edges.
    size_t successor_count() __TBB_override {
        return my_successors.successor_count();
    }
    //! Copies the recorded successor edges into v.
    void copy_successors(successor_list_type &v) __TBB_override {
        my_successors.copy_successors(v);
    }
    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
    //! Access the container of recorded predecessor edges.
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    //! Records p as a predecessor edge (under pred_mutex).
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.add_edge(p);
    }
    //! Removes p from the predecessor edge list (under pred_mutex).
    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.delete_edge(p);
    }
    //! Number of recorded predecessor edges (under pred_mutex).
    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        return my_built_predecessors.edge_count();
    }
    //! Copies the recorded predecessor edges into v (under pred_mutex).
    void copy_predecessors(predecessor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l(pred_mutex);
        my_built_predecessors.copy_edges(v);
    }
    //! Disconnects this node from all recorded predecessors and successors.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! build a task to run the successor if possible. Default is old behavior.
    /** Offers t to all registered successors.  If none of them returned a task to
        bypass, SUCCESSFULLY_ENQUEUED is returned so the caller treats the put as
        accepted; a broadcast_node never rejects input. */
    task *try_put_task(const T& t) __TBB_override {
        task *new_task = my_successors.try_put_task(t);
        if (!new_task) new_task = SUCCESSFULLY_ENQUEUED;
        return new_task;
    }
    //! Returns the graph this node was constructed in (my_graph from graph_node).
    graph& graph_reference() const __TBB_override {
        return my_graph;
    }
    //! No receiver-side state to reset for this node.
    void reset_receiver(reset_flags /*f*/) __TBB_override {}
    //! Clears edges when rf_clear_edges is requested; the node itself holds no items.
    void reset_node(reset_flags f) __TBB_override {
        if (f&rf_clear_edges) {
            my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
        }
        __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.empty(), "Error resetting broadcast_node");
    }
};  // broadcast_node
//! Forwards messages in arbitrary order
/** An unbounded buffer of messages of type T.  All mutations of the internal item
    buffer and the successor list are funneled through an aggregator: public methods
    build a buffer_operation record and submit it via my_aggregator.execute(), and a
    single thread at a time drains the operation list in handle_operations().  That
    serialization is what makes the node thread-safe without a per-item lock. */
template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
class buffer_node
    : public graph_node
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    , public internal::reservable_item_buffer< T, Allocator >
#else
    , public internal::reservable_item_buffer< T, cache_aligned_allocator<T> >
#endif
    , public receiver<T>, public sender<T> {
#if TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    typedef Allocator internals_allocator;
#else
    typedef cache_aligned_allocator<T> internals_allocator;
#endif
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    typedef buffer_node<T, Allocator> class_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif
#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
protected:
    typedef size_t size_type;
    internal::round_robin_cache< T, null_rw_mutex > my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
#endif
    friend class internal::forward_task_bypass< class_type >;
    //! Operation codes handled by the aggregator.
    enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        , add_blt_succ, del_blt_succ,
        add_blt_pred, del_blt_pred,
        blt_succ_cnt, blt_pred_cnt,
        blt_succ_cpy, blt_pred_cpy  // create vector copies of preds and succs
#endif
    };
    // implements the aggregator_operation concept
    /** One record per submitted operation; the submitting thread waits until the
        aggregator thread stores SUCCEEDED/FAILED into status.  ltask carries any
        forwarding task produced while the operation was being handled. */
    class buffer_operation : public internal::aggregated_operation< buffer_operation > {
    public:
        char type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
        task * ltask;
        // Exactly one of these is meaningful, selected by type.
        union {
            input_type *elem;
            successor_type *r;
            predecessor_type *p;
            size_t cnt_val;
            successor_list_type *svec;
            predecessor_list_type *pvec;
        };
#else
        T *elem;
        task * ltask;
        successor_type *r;
#endif
        buffer_operation(const T& e, op_type t) : type(char(t))
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            , ltask(NULL), elem(const_cast<T*>(&e))
#else
            , elem(const_cast<T*>(&e)) , ltask(NULL)
#endif
        {}
        buffer_operation(op_type t) : type(char(t)), ltask(NULL) {}
    };
    // True while a forward_task_bypass is pending/running; prevents spawning duplicates.
    bool forwarder_busy;
    typedef internal::aggregating_functor<class_type, buffer_operation> handler_type;
    friend class internal::aggregating_functor<class_type, buffer_operation>;
    internal::aggregator< handler_type, buffer_operation> my_aggregator;
    //! Aggregator callback; derived classes override to dispatch with their own type.
    virtual void handle_operations(buffer_operation *op_list) {
        handle_operations_impl(op_list, this);
    }
    //! Drains the pending operation list.  Runs on a single thread at a time.
    /** derived is passed so that order()/internal_* customization points resolve
        statically in the most-derived node type (see queue_node/priority_queue_node). */
    template<typename derived_type>
    void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
        buffer_operation *tmp = NULL;
        bool try_forwarding = false;
        while (op_list) {
            tmp = op_list;
            op_list = op_list->next;
            switch (tmp->type) {
            case reg_succ: internal_reg_succ(tmp); try_forwarding = true; break;
            case rem_succ: internal_rem_succ(tmp); break;
            case req_item: internal_pop(tmp); break;
            case res_item: internal_reserve(tmp); break;
            case rel_res:  internal_release(tmp); try_forwarding = true; break;
            case con_res:  internal_consume(tmp); try_forwarding = true; break;
            case put_item: try_forwarding = internal_push(tmp); break;
            case try_fwd_task: internal_forward_task(tmp); break;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            // edge recording
            case add_blt_succ: internal_add_built_succ(tmp); break;
            case del_blt_succ: internal_del_built_succ(tmp); break;
            case add_blt_pred: internal_add_built_pred(tmp); break;
            case del_blt_pred: internal_del_built_pred(tmp); break;
            case blt_succ_cnt: internal_succ_cnt(tmp); break;
            case blt_pred_cnt: internal_pred_cnt(tmp); break;
            case blt_succ_cpy: internal_copy_succs(tmp); break;
            case blt_pred_cpy: internal_copy_preds(tmp); break;
#endif
            }
        }
        // Restore derived-class ordering invariants (e.g. heapify); no-op here.
        derived->order();
        // If any handled op may have made an item forwardable, schedule one forwarder
        // task — unless one is already pending (forwarder_busy).
        if (try_forwarding && !forwarder_busy) {
            if(internal::is_graph_active(this->my_graph)) {
                forwarder_busy = true;
                task *new_task = new(task::allocate_additional_child_of(*(this->my_graph.root_task()))) internal::
                    forward_task_bypass<class_type>(*this);
                // tmp should point to the last item handled by the aggregator.  This is the operation
                // the handling thread enqueued.  So modifying that record will be okay.
                // workaround for icc bug
                tbb::task *z = tmp->ltask;
                graph &g = this->my_graph;
                tmp->ltask = combine_tasks(g, z, new_task);  // in case the op generated a task
            }
        }
    }  // handle_operations
    //! Retrieves any task the aggregator attached to a completed operation.
    inline task *grab_forwarding_task( buffer_operation &op_data) {
        return op_data.ltask;
    }
    //! Spawns the attached task, if any; returns whether one was spawned.
    inline bool enqueue_forwarding_task(buffer_operation &op_data) {
        task *ft = grab_forwarding_task(op_data);
        if(ft) {
            internal::spawn_in_graph_arena(graph_reference(), *ft);
            return true;
        }
        return false;
    }
    //! This is executed by an enqueued task, the "forwarder"
    /** Repeatedly submits try_fwd_task operations until one fails, combining any
        produced successor tasks into a single task to bypass-return. */
    virtual task *forward_task() {
        buffer_operation op_data(try_fwd_task);
        task *last_task = NULL;
        do {
            op_data.status = internal::WAIT;
            op_data.ltask = NULL;
            my_aggregator.execute(&op_data);
            // workaround for icc bug
            tbb::task *xtask = op_data.ltask;
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, xtask);
        } while (op_data.status == internal::SUCCEEDED);
        return last_task;
    }
    //! Register successor
    virtual void internal_reg_succ(buffer_operation *op) {
        my_successors.register_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    //! Remove successor
    virtual void internal_rem_succ(buffer_operation *op) {
        my_successors.remove_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-introspection hooks; all run inside the aggregator, so no
    // extra locking is needed here.
    typedef typename sender<T>::built_successors_type built_successors_type;
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    virtual void internal_add_built_succ(buffer_operation *op) {
        my_successors.internal_add_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_del_built_succ(buffer_operation *op) {
        my_successors.internal_delete_built_successor(*(op->r));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    typedef typename receiver<T>::built_predecessors_type built_predecessors_type;
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    virtual void internal_add_built_pred(buffer_operation *op) {
        my_built_predecessors.add_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_del_built_pred(buffer_operation *op) {
        my_built_predecessors.delete_edge(*(op->p));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_succ_cnt(buffer_operation *op) {
        op->cnt_val = my_successors.successor_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_pred_cnt(buffer_operation *op) {
        op->cnt_val = my_built_predecessors.edge_count();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_copy_succs(buffer_operation *op) {
        my_successors.copy_successors(*(op->svec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    virtual void internal_copy_preds(buffer_operation *op) {
        my_built_predecessors.copy_edges(*(op->pvec));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
private:
    // Customization points for internal_forward_task_impl/handle_operations_impl;
    // queue_node and priority_queue_node shadow these with their own semantics.
    void order() {}
    //! LIFO default: the newest item (at my_tail - 1) is the forwarding candidate.
    bool is_item_valid() {
        return this->my_item_valid(this->my_tail - 1);
    }
    //! Offers the back item to the successors; on acceptance destroys it and folds
    //! the returned task into last_task.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = my_successors.try_put_task(this->back());
        if (new_task) {
            // workaround for icc bug
            graph& g = this->my_graph;
            last_task = combine_tasks(g, last_task, new_task);
            this->destroy_back();
        }
    }
protected:
    //! Tries to forward valid items to successors
    virtual void internal_forward_task(buffer_operation *op) {
        internal_forward_task_impl(op, this);
    }
    template<typename derived_type>
    void internal_forward_task_impl(buffer_operation *op, derived_type* derived) {
        __TBB_ASSERT(static_cast<class_type*>(derived) == this, "'this' is not a base class for derived");
        // Nothing to forward while reserved or when no valid item is buffered.
        if (this->my_reserved || !derived->is_item_valid()) {
            __TBB_store_with_release(op->status, internal::FAILED);
            this->forwarder_busy = false;
            return;
        }
        // Try forwarding, giving each successor a chance
        task * last_task = NULL;
        size_type counter = my_successors.size();
        for (; counter > 0 && derived->is_item_valid(); --counter)
            derived->try_put_and_add_task(last_task);
        op->ltask = last_task;  // return task
        // SUCCEEDED only when something was forwarded and every successor slot was
        // tried; otherwise the forwarder loop stops and forwarder_busy is released.
        if (last_task && !counter) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
            forwarder_busy = false;
        }
    }
    //! Unconditionally appends the item; returns true so a forward is attempted.
    virtual bool internal_push(buffer_operation *op) {
        this->push_back(*(op->elem));
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
        return true;
    }
    //! Pops the back item into op->elem (LIFO default); FAILED when empty.
    virtual void internal_pop(buffer_operation *op) {
        if(this->pop_back(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }
    //! Reserves the front item, copying it into op->elem; FAILED if none available.
    virtual void internal_reserve(buffer_operation *op) {
        if(this->reserve_front(*(op->elem))) {
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
        else {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
    }
    //! Removes the reserved item for good.
    virtual void internal_consume(buffer_operation *op) {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
    //! Releases the reservation; the item stays in the buffer.
    virtual void internal_release(buffer_operation *op) {
        this->release_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
public:
    //! Constructor
    __TBB_NOINLINE_SYM explicit buffer_node( graph &g )
        : graph_node(g), internal::reservable_item_buffer<T, internals_allocator>(), receiver<T>(),
          sender<T>(), forwarder_busy(false)
    {
        my_successors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    buffer_node(const node_set<Args...>& nodes) : buffer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor
    /** The new node belongs to the same graph as src but starts empty and with no
        edges of its own. */
    __TBB_NOINLINE_SYM buffer_node( const buffer_node& src )
        : graph_node(src.my_graph), internal::reservable_item_buffer<T, internals_allocator>(),
          receiver<T>(), sender<T>(), forwarder_busy(false)
    {
        my_successors.set_owner(this);
        my_aggregator.initialize_handler(handler_type(this));
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_BUFFER_NODE, &this->my_graph,
                                 static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for this node in flow-graph trace output.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
    //
    // message sender implementation
    //
    //! Adds a new successor.
    /** Adds successor r to the list of successors; may forward tasks. */
    bool register_successor( successor_type &r ) __TBB_override {
        buffer_operation op_data(reg_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Deprecated edge-introspection entry points; each routes through the aggregator.
    void internal_add_built_successor( successor_type &r) __TBB_override {
        buffer_operation op_data(add_blt_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
    }
    void internal_delete_built_successor( successor_type &r) __TBB_override {
        buffer_operation op_data(del_blt_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
    }
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        buffer_operation op_data(add_blt_pred);
        op_data.p = &p;
        my_aggregator.execute(&op_data);
    }
    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        buffer_operation op_data(del_blt_pred);
        op_data.p = &p;
        my_aggregator.execute(&op_data);
    }
    size_t predecessor_count() __TBB_override {
        buffer_operation op_data(blt_pred_cnt);
        my_aggregator.execute(&op_data);
        return op_data.cnt_val;
    }
    size_t successor_count() __TBB_override {
        buffer_operation op_data(blt_succ_cnt);
        my_aggregator.execute(&op_data);
        return op_data.cnt_val;
    }
    void copy_predecessors( predecessor_list_type &v ) __TBB_override {
        buffer_operation op_data(blt_pred_cpy);
        op_data.pvec = &v;
        my_aggregator.execute(&op_data);
    }
    void copy_successors( successor_list_type &v ) __TBB_override {
        buffer_operation op_data(blt_succ_cpy);
        op_data.svec = &v;
        my_aggregator.execute(&op_data);
    }
#endif  /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
    //! Removes a successor.
    /** Removes successor r from the list of successors.
        It also calls r.remove_predecessor(*this) to remove this node as a predecessor. */
    bool remove_successor( successor_type &r ) __TBB_override {
        r.remove_predecessor(*this);
        buffer_operation op_data(rem_succ);
        op_data.r = &r;
        my_aggregator.execute(&op_data);
        // even though this operation does not cause a forward, if we are the handler, and
        // a forward is scheduled, we may be the first to reach this point after the aggregator,
        // and so should check for the task.
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
    //! Request an item from the buffer_node
    /** true = v contains the returned item<BR>
        false = no item has been returned */
    bool try_get( T &v ) __TBB_override {
        buffer_operation op_data(req_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status == internal::SUCCEEDED);
    }
    //! Reserves an item.
    /** false = no item can be reserved<BR>
        true = an item is reserved */
    bool try_reserve( T &v ) __TBB_override {
        buffer_operation op_data(res_item);
        op_data.elem = &v;
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return (op_data.status == internal::SUCCEEDED);
    }
    //! Release a reserved item.
    /** true = item has been released and so remains in sender */
    bool try_release() __TBB_override {
        buffer_operation op_data(rel_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
    //! Consumes a reserved item.
    /** true = item is removed from sender and reservation removed */
    bool try_consume() __TBB_override {
        buffer_operation op_data(con_res);
        my_aggregator.execute(&op_data);
        (void)enqueue_forwarding_task(op_data);
        return true;
    }
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! receive an item, return a task *if possible
    task *try_put_task(const T &t) __TBB_override {
        buffer_operation op_data(t, put_item);
        my_aggregator.execute(&op_data);
        task *ft = grab_forwarding_task(op_data);
        // sequencer_nodes can return failure (if an item has been previously inserted)
        // We have to spawn the returned task if our own operation fails.
        if(ft && op_data.status == internal::FAILED) {
            // we haven't succeeded queueing the item, but for some reason the
            // call returned a task (if another request resulted in a successful
            // forward this could happen.)  Queue the task and reset the pointer.
            internal::spawn_in_graph_arena(graph_reference(), *ft); ft = NULL;
        }
        else if(!ft && op_data.status == internal::SUCCEEDED) {
            ft = SUCCESSFULLY_ENQUEUED;
        }
        return ft;
    }
    //! Returns the graph this node was constructed in.
    graph& graph_reference() const __TBB_override {
        return my_graph;
    }
    //! No receiver-side state to reset.
    void reset_receiver(reset_flags /*f*/) __TBB_override { }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
public:
    //! Disconnects this node from all recorded predecessors and successors.
    void extract() __TBB_override {
        my_built_predecessors.receiver_extract(*this);
        my_successors.built_successors().sender_extract(*this);
    }
#endif
protected:
    //! Empties the buffer, optionally clears edges, and re-arms the forwarder.
    void reset_node( reset_flags f) __TBB_override {
        internal::reservable_item_buffer<T, internals_allocator>::reset();
        // TODO: just clear structures
        if (f&rf_clear_edges) {
            my_successors.clear();
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
            my_built_predecessors.clear();
#endif
        }
        forwarder_busy = false;
    }
};  // buffer_node
//! Forwards messages in FIFO order
/** A buffer_node whose forwarding, pop, and reserve customization points operate
    on the front (my_head) instead of the back, yielding queue semantics. */
template <typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
class queue_node : public buffer_node<T, Allocator> {
#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
protected:
    typedef buffer_node<T, Allocator> base_type;
    typedef typename base_type::size_type size_type;
    typedef typename base_type::buffer_operation queue_operation;
    typedef queue_node class_type;
private:
    template<typename, typename> friend class buffer_node;
    // Shadows buffer_node's customization points: the forwarding candidate is the
    // FRONT of the buffer, not the back.  Dispatched statically through
    // internal_forward_task_impl(op, this).
    bool is_item_valid() {
        return this->my_item_valid(this->my_head);
    }
    //! Offers the front item to the successors; on acceptance destroys it and
    //! folds the returned task into last_task.
    void try_put_and_add_task(task*& last_task) {
        task *new_task = this->my_successors.try_put_task(this->front());
        if (new_task) {
            // workaround for icc bug
            graph& graph_ref = this->graph_reference();
            last_task = combine_tasks(graph_ref, last_task, new_task);
            this->destroy_front();
        }
    }
protected:
    //! Re-dispatches forwarding so the FIFO is_item_valid/try_put_and_add_task are used.
    void internal_forward_task(queue_operation *op) __TBB_override {
        this->internal_forward_task_impl(op, this);
    }
    //! Pops the front item into op->elem; FAILED while reserved or empty.
    void internal_pop(queue_operation *op) __TBB_override {
        if ( this->my_reserved || !this->my_item_valid(this->my_head)){
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->pop_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Reserves the front item; FAILED if already reserved or empty.
    void internal_reserve(queue_operation *op) __TBB_override {
        if (this->my_reserved || !this->my_item_valid(this->my_head)) {
            __TBB_store_with_release(op->status, internal::FAILED);
        }
        else {
            this->reserve_front(*(op->elem));
            __TBB_store_with_release(op->status, internal::SUCCEEDED);
        }
    }
    //! Removes the reserved front item for good.
    void internal_consume(queue_operation *op) __TBB_override {
        this->consume_front();
        __TBB_store_with_release(op->status, internal::SUCCEEDED);
    }
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    //! Constructor
    __TBB_NOINLINE_SYM explicit queue_node( graph &g ) : base_type(g) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    queue_node( const node_set<Args...>& nodes) : queue_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor
    /** The new node belongs to the same graph as src but starts empty. */
    __TBB_NOINLINE_SYM queue_node( const queue_node& src) : base_type(src) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for this node in flow-graph trace output.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
protected:
    //! No queue-specific state beyond the base buffer.
    void reset_node( reset_flags f) __TBB_override {
        base_type::reset_node(f);
    }
};  // queue_node
//! Forwards messages in sequence order
/** A queue_node whose items are placed at the buffer slot given by a user-supplied
    sequencer functor (T -> size_t), so messages are emitted in tag order regardless
    of arrival order. */
template< typename T, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T) >
class sequencer_node : public queue_node<T, Allocator> {
    // Owned; cloned on copy, deleted in the destructor.
    internal::function_body< T, size_t > *my_sequencer;
    // my_sequencer should be a benign function and must be callable
    // from a parallel context.  Does this mean it needn't be reset?
public:
#if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
    __TBB_STATIC_ASSERT(
        (tbb::internal::is_same_type<Allocator, null_type>::value),
        "Allocator template parameter for flow graph nodes is deprecated and will be removed. "
        "Specify TBB_DEPRECATED_FLOW_NODE_ALLOCATOR to temporary enable the deprecated interface."
    );
#endif
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    //! Constructor
    /** s maps each item to its sequence tag; it is copied into an owned function body. */
    template< typename Sequencer >
    __TBB_NOINLINE_SYM sequencer_node( graph &g, const Sequencer& s ) : queue_node<T, Allocator>(g),
        my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename Sequencer, typename... Args>
    sequencer_node( const node_set<Args...>& nodes, const Sequencer& s)
        : sequencer_node(nodes.graph_reference(), s) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor
    /** Clones src's sequencer functor; buffered items are not copied. */
    __TBB_NOINLINE_SYM sequencer_node( const sequencer_node& src ) : queue_node<T, Allocator>(src),
        my_sequencer( src.my_sequencer->clone() ) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_SEQUENCER_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }
    //! Destructor
    ~sequencer_node() { delete my_sequencer; }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for this node in flow-graph trace output.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
protected:
    typedef typename buffer_node<T, Allocator>::size_type size_type;
    typedef typename buffer_node<T, Allocator>::buffer_operation sequencer_operation;
private:
    //! Places the item at the buffer slot given by its sequence tag.
    /** Rejects (FAILED) tags already emitted (tag < my_head) unless duplicate
        support is enabled; grows the array as needed and extends my_tail so the
        slot is addressable.  Returns whether a forward should be attempted. */
    bool internal_push(sequencer_operation *op) __TBB_override {
        size_type tag = (*my_sequencer)(*(op->elem));
#if !TBB_DEPRECATED_SEQUENCER_DUPLICATES
        if (tag < this->my_head) {
            // have already emitted a message with this tag
            __TBB_store_with_release(op->status, internal::FAILED);
            return false;
        }
#endif
        // cannot modify this->my_tail now; the buffer would be inconsistent.
        size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
        if (this->size(new_tail) > this->capacity()) {
            this->grow_my_array(this->size(new_tail));
        }
        this->my_tail = new_tail;
        // place_item fails if the slot is already occupied (duplicate tag).
        const internal::op_stat res = this->place_item(tag, *(op->elem)) ? internal::SUCCEEDED : internal::FAILED;
        __TBB_store_with_release(op->status, res);
        return res == internal::SUCCEEDED;
    }
};  // sequencer_node
  2330. //! Forwards messages in priority order
  2331. template<typename T, typename Compare = std::less<T>, typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(T)>
  2332. class priority_queue_node : public buffer_node<T, Allocator> {
  2333. public:
  2334. #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
  2335. __TBB_STATIC_ASSERT(
  2336. (tbb::internal::is_same_type<Allocator, null_type>::value),
  2337. "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
  2338. "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
  2339. );
  2340. #endif
  2341. typedef T input_type;
  2342. typedef T output_type;
  2343. typedef buffer_node<T,Allocator> base_type;
  2344. typedef priority_queue_node class_type;
  2345. typedef typename receiver<input_type>::predecessor_type predecessor_type;
  2346. typedef typename sender<output_type>::successor_type successor_type;
    //! Constructor
    /** comp orders buffered items; the highest-priority item (per Compare) is
        forwarded first.  mark tracks the heapified prefix of the buffer. */
    __TBB_NOINLINE_SYM explicit priority_queue_node( graph &g, const Compare& comp = Compare() )
        : buffer_node<T, Allocator>(g), compare(comp), mark(0) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
                                 static_cast<receiver<input_type> *>(this),
                                 static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    template <typename... Args>
    priority_queue_node(const node_set<Args...>& nodes, const Compare& comp = Compare())
        : priority_queue_node(nodes.graph_reference(), comp) {
        make_edges_in_order(nodes, *this);
    }
#endif
  2361. //! Copy constructor
  2362. __TBB_NOINLINE_SYM priority_queue_node( const priority_queue_node &src )
  2363. : buffer_node<T, Allocator>(src), mark(0)
  2364. {
  2365. tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_PRIORITY_QUEUE_NODE, &(this->my_graph),
  2366. static_cast<receiver<input_type> *>(this),
  2367. static_cast<sender<output_type> *>(this) );
  2368. }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets a name for this node in flow-graph trace output.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
  2374. protected:
  2375. void reset_node( reset_flags f) __TBB_override {
  2376. mark = 0;
  2377. base_type::reset_node(f);
  2378. }
  2379. typedef typename buffer_node<T, Allocator>::size_type size_type;
  2380. typedef typename buffer_node<T, Allocator>::item_type item_type;
  2381. typedef typename buffer_node<T, Allocator>::buffer_operation prio_operation;
  2382. //! Tries to forward valid items to successors
  2383. void internal_forward_task(prio_operation *op) __TBB_override {
  2384. this->internal_forward_task_impl(op, this);
  2385. }
  2386. void handle_operations(prio_operation *op_list) __TBB_override {
  2387. this->handle_operations_impl(op_list, this);
  2388. }
  2389. bool internal_push(prio_operation *op) __TBB_override {
  2390. prio_push(*(op->elem));
  2391. __TBB_store_with_release(op->status, internal::SUCCEEDED);
  2392. return true;
  2393. }
// Copies the current highest-priority item into op->elem, then removes it.
// Fails (FAILED status) when the buffer is empty or an item is reserved.
void internal_pop(prio_operation *op) __TBB_override {
// if empty or already reserved, don't pop
if ( this->my_reserved == true || this->my_tail == 0 ) {
__TBB_store_with_release(op->status, internal::FAILED);
return;
}
*(op->elem) = prio();
__TBB_store_with_release(op->status, internal::SUCCEEDED);
prio_pop();
}
// pops the highest-priority item, saves copy
// The saved copy lets internal_release() undo the reservation by pushing
// it back, while internal_consume() commits it by discarding the copy.
void internal_reserve(prio_operation *op) __TBB_override {
if (this->my_reserved == true || this->my_tail == 0) {
__TBB_store_with_release(op->status, internal::FAILED);
return;
}
this->my_reserved = true;
*(op->elem) = prio();
reserved_item = *(op->elem); // copy kept in case the reservation is released
__TBB_store_with_release(op->status, internal::SUCCEEDED);
prio_pop();
}
// Commits a reservation: the item stays popped and the saved copy is
// replaced with a default-constructed value.
void internal_consume(prio_operation *op) __TBB_override {
__TBB_store_with_release(op->status, internal::SUCCEEDED);
this->my_reserved = false;
reserved_item = input_type();
}
// Cancels a reservation: the saved copy is pushed back into the heap and
// the reservation flag is cleared.
void internal_release(prio_operation *op) __TBB_override {
__TBB_store_with_release(op->status, internal::SUCCEEDED);
prio_push(reserved_item);
this->my_reserved = false;
reserved_item = input_type();
}
  2427. private:
  2428. template<typename, typename> friend class buffer_node;
// Ensures the whole buffer satisfies the heap property. 'mark' tracks how
// many leading elements are already heapified; after this call it must
// equal my_tail (everything heapified).
void order() {
if (mark < this->my_tail) heapify();
__TBB_ASSERT(mark == this->my_tail, "mark unequal after heapify");
}
// True if the buffer holds at least one item (tail index is past zero).
bool is_item_valid() {
return this->my_tail > 0;
}
// Offers the current highest-priority item to the successors; on
// acceptance the item is popped and the resulting task is combined into
// last_task for the caller to spawn/return.
void try_put_and_add_task(task*& last_task) {
task * new_task = this->my_successors.try_put_task(this->prio());
if (new_task) {
// workaround for icc bug
graph& graph_ref = this->graph_reference();
last_task = combine_tasks(graph_ref, last_task, new_task);
prio_pop(); // item was accepted; remove it
}
}
  2445. private:
  2446. Compare compare;
  2447. size_type mark;
  2448. input_type reserved_item;
// in case a reheap has not been done after a push, check if the mark item is higher than the 0'th item
// Returns true when the newest, not-yet-heapified tail element outranks
// the heap root — i.e. the tail slot holds the true highest-priority item.
bool prio_use_tail() {
__TBB_ASSERT(mark <= this->my_tail, "mark outside bounds before test");
return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
}
// prio_push: checks that the item will fit, expand array if necessary, put at end
// Heapification is deferred: the new element sits beyond 'mark' until
// order()/heapify() is next invoked.
void prio_push(const T &src) {
if ( this->my_tail >= this->my_array_size )
this->grow_my_array( this->my_tail + 1 );
(void) this->place_item(this->my_tail, src);
++(this->my_tail);
__TBB_ASSERT(mark < this->my_tail, "mark outside bounds after push");
}
// prio_pop: deletes highest priority item from the array, and if it is item
// 0, move last item to 0 and reheap. If end of array, just destroy and decrement tail
// and mark. Assumes the array has already been tested for emptiness; no failure.
void prio_pop() {
if (prio_use_tail()) {
// there are newly pushed elements; last one higher than top
// copy the data
this->destroy_item(this->my_tail-1);
--(this->my_tail);
__TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
return;
}
// The root holds the highest-priority item: remove it, fill the hole
// with the last element, and sift that element down.
this->destroy_item(0);
if(this->my_tail > 1) {
// push the last element down heap
__TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
this->move_item(0,this->my_tail - 1);
}
--(this->my_tail);
if(mark > this->my_tail) --mark;
if (this->my_tail > 1) // don't reheap for heap of size 1
reheap();
__TBB_ASSERT(mark <= this->my_tail, "mark outside bounds after pop");
}
// Returns the current highest-priority element without removing it: the
// unheaped tail if it outranks the root (see prio_use_tail), else slot 0.
const T& prio() {
return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
}
// turn array into heap
// Incrementally sifts each element in [mark, my_tail) up the binary heap
// rooted at slot 0; on return mark == my_tail (loop increments mark).
void heapify() {
if(this->my_tail == 0) {
mark = 0;
return;
}
if (!mark) mark = 1; // a single element is trivially a heap
for (; mark<this->my_tail; ++mark) { // for each unheaped element
size_type cur_pos = mark;
input_type to_place;
this->fetch_item(mark,to_place);
do { // push to_place up the heap
size_type parent = (cur_pos-1)>>1;
if (!compare(this->get_my_item(parent), to_place))
break; // parent already outranks to_place; heap property holds
this->move_item(cur_pos, parent);
cur_pos = parent;
} while( cur_pos );
(void) this->place_item(cur_pos, to_place);
}
}
// otherwise heapified array with new root element; rearrange to heap
// Classic sift-down of the root within the heapified prefix [0, mark).
void reheap() {
size_type cur_pos=0, child=1;
while (child < mark) {
size_type target = child;
// Pick the higher-priority of the two children (if a right child exists).
if (child+1<mark &&
compare(this->get_my_item(child),
this->get_my_item(child+1)))
++target;
// target now has the higher priority child
if (compare(this->get_my_item(target),
this->get_my_item(cur_pos)))
break; // current element outranks both children; done
// swap
this->swap_items(cur_pos, target);
cur_pos = target;
child = (cur_pos<<1)+1;
}
}
  2529. }; // priority_queue_node
  2530. } // interfaceX
  2531. namespace interface11 {
  2532. //! Forwards messages only if the threshold has not been reached
  2533. /** This node forwards items until its threshold is reached.
  2534. It contains no buffering. If the downstream node rejects, the
  2535. message is dropped. */
  2536. template< typename T, typename DecrementType=continue_msg >
  2537. class limiter_node : public graph_node, public receiver< T >, public sender< T > {
  2538. public:
  2539. typedef T input_type;
  2540. typedef T output_type;
  2541. typedef typename receiver<input_type>::predecessor_type predecessor_type;
  2542. typedef typename sender<output_type>::successor_type successor_type;
  2543. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  2544. typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
  2545. typedef typename sender<output_type>::built_successors_type built_successors_type;
  2546. typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
  2547. typedef typename sender<output_type>::successor_list_type successor_list_type;
  2548. #endif
  2549. //TODO: There is a lack of predefined types for its controlling "decrementer" port. It should be fixed later.
  2550. private:
  2551. size_t my_threshold;
  2552. size_t my_count; //number of successful puts
  2553. size_t my_tries; //number of active put attempts
  2554. internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors;
  2555. spin_mutex my_mutex;
  2556. internal::broadcast_cache< T > my_successors;
  2557. __TBB_DEPRECATED_LIMITER_EXPR( int init_decrement_predecessors; )
  2558. friend class internal::forward_task_bypass< limiter_node<T,DecrementType> >;
  2559. // Let decrementer call decrement_counter()
  2560. friend class internal::decrementer< limiter_node<T,DecrementType>, DecrementType >;
  2561. bool check_conditions() { // always called under lock
  2562. return ( my_count + my_tries < my_threshold && !my_predecessors.empty() && !my_successors.empty() );
  2563. }
  2564. // only returns a valid task pointer or NULL, never SUCCESSFULLY_ENQUEUED
  2565. task *forward_task() {
  2566. input_type v;
  2567. task *rval = NULL;
  2568. bool reserved = false;
  2569. {
  2570. spin_mutex::scoped_lock lock(my_mutex);
  2571. if ( check_conditions() )
  2572. ++my_tries;
  2573. else
  2574. return NULL;
  2575. }
  2576. //SUCCESS
  2577. // if we can reserve and can put, we consume the reservation
  2578. // we increment the count and decrement the tries
  2579. if ( (my_predecessors.try_reserve(v)) == true ){
  2580. reserved=true;
  2581. if ( (rval = my_successors.try_put_task(v)) != NULL ){
  2582. {
  2583. spin_mutex::scoped_lock lock(my_mutex);
  2584. ++my_count;
  2585. --my_tries;
  2586. my_predecessors.try_consume();
  2587. if ( check_conditions() ) {
  2588. if ( internal::is_graph_active(this->my_graph) ) {
  2589. task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
  2590. internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
  2591. internal::spawn_in_graph_arena(graph_reference(), *rtask);
  2592. }
  2593. }
  2594. }
  2595. return rval;
  2596. }
  2597. }
  2598. //FAILURE
  2599. //if we can't reserve, we decrement the tries
  2600. //if we can reserve but can't put, we decrement the tries and release the reservation
  2601. {
  2602. spin_mutex::scoped_lock lock(my_mutex);
  2603. --my_tries;
  2604. if (reserved) my_predecessors.try_release();
  2605. if ( check_conditions() ) {
  2606. if ( internal::is_graph_active(this->my_graph) ) {
  2607. task *rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
  2608. internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
  2609. __TBB_ASSERT(!rval, "Have two tasks to handle");
  2610. return rtask;
  2611. }
  2612. }
  2613. return rval;
  2614. }
  2615. }
  2616. void forward() {
  2617. __TBB_ASSERT(false, "Should never be called");
  2618. return;
  2619. }
  2620. task* decrement_counter( long long delta ) {
  2621. {
  2622. spin_mutex::scoped_lock lock(my_mutex);
  2623. if( delta > 0 && size_t(delta) > my_count )
  2624. my_count = 0;
  2625. else if( delta < 0 && size_t(delta) > my_threshold - my_count )
  2626. my_count = my_threshold;
  2627. else
  2628. my_count -= size_t(delta); // absolute value of delta is sufficiently small
  2629. }
  2630. return forward_task();
  2631. }
  2632. void initialize() {
  2633. my_predecessors.set_owner(this);
  2634. my_successors.set_owner(this);
  2635. decrement.set_owner(this);
  2636. tbb::internal::fgt_node(
  2637. CODEPTR(), tbb::internal::FLOW_LIMITER_NODE, &this->my_graph,
  2638. static_cast<receiver<input_type> *>(this), static_cast<receiver<DecrementType> *>(&decrement),
  2639. static_cast<sender<output_type> *>(this)
  2640. );
  2641. }
  2642. public:
  2643. //! The internal receiver< DecrementType > that decrements the count
  2644. internal::decrementer< limiter_node<T, DecrementType>, DecrementType > decrement;
  2645. #if TBB_DEPRECATED_LIMITER_NODE_CONSTRUCTOR
  2646. __TBB_STATIC_ASSERT( (tbb::internal::is_same_type<DecrementType, continue_msg>::value),
  2647. "Deprecated interface of the limiter node can be used only in conjunction "
  2648. "with continue_msg as the type of DecrementType template parameter." );
  2649. #endif // Check for incompatible interface
  2650. //! Constructor
  2651. limiter_node(graph &g,
  2652. __TBB_DEPRECATED_LIMITER_ARG2(size_t threshold, int num_decrement_predecessors=0))
  2653. : graph_node(g), my_threshold(threshold), my_count(0),
  2654. __TBB_DEPRECATED_LIMITER_ARG4(
  2655. my_tries(0), decrement(),
  2656. init_decrement_predecessors(num_decrement_predecessors),
  2657. decrement(num_decrement_predecessors)) {
  2658. initialize();
  2659. }
  2660. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  2661. template <typename... Args>
  2662. limiter_node(const node_set<Args...>& nodes, size_t threshold)
  2663. : limiter_node(nodes.graph_reference(), threshold) {
  2664. make_edges_in_order(nodes, *this);
  2665. }
  2666. #endif
  2667. //! Copy constructor
  2668. limiter_node( const limiter_node& src ) :
  2669. graph_node(src.my_graph), receiver<T>(), sender<T>(),
  2670. my_threshold(src.my_threshold), my_count(0),
  2671. __TBB_DEPRECATED_LIMITER_ARG4(
  2672. my_tries(0), decrement(),
  2673. init_decrement_predecessors(src.init_decrement_predecessors),
  2674. decrement(src.init_decrement_predecessors)) {
  2675. initialize();
  2676. }
  2677. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  2678. void set_name( const char *name ) __TBB_override {
  2679. tbb::internal::fgt_node_desc( this, name );
  2680. }
  2681. #endif
  2682. //! Replace the current successor with this new successor
  2683. bool register_successor( successor_type &r ) __TBB_override {
  2684. spin_mutex::scoped_lock lock(my_mutex);
  2685. bool was_empty = my_successors.empty();
  2686. my_successors.register_successor(r);
  2687. //spawn a forward task if this is the only successor
  2688. if ( was_empty && !my_predecessors.empty() && my_count + my_tries < my_threshold ) {
  2689. if ( internal::is_graph_active(this->my_graph) ) {
  2690. task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
  2691. internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
  2692. internal::spawn_in_graph_arena(graph_reference(), *task);
  2693. }
  2694. }
  2695. return true;
  2696. }
  2697. //! Removes a successor from this node
  2698. /** r.remove_predecessor(*this) is also called. */
  2699. bool remove_successor( successor_type &r ) __TBB_override {
  2700. r.remove_predecessor(*this);
  2701. my_successors.remove_successor(r);
  2702. return true;
  2703. }
  2704. #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
  2705. built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
  2706. built_predecessors_type &built_predecessors() __TBB_override { return my_predecessors.built_predecessors(); }
  2707. void internal_add_built_successor(successor_type &src) __TBB_override {
  2708. my_successors.internal_add_built_successor(src);
  2709. }
  2710. void internal_delete_built_successor(successor_type &src) __TBB_override {
  2711. my_successors.internal_delete_built_successor(src);
  2712. }
  2713. size_t successor_count() __TBB_override { return my_successors.successor_count(); }
  2714. void copy_successors(successor_list_type &v) __TBB_override {
  2715. my_successors.copy_successors(v);
  2716. }
  2717. void internal_add_built_predecessor(predecessor_type &src) __TBB_override {
  2718. my_predecessors.internal_add_built_predecessor(src);
  2719. }
  2720. void internal_delete_built_predecessor(predecessor_type &src) __TBB_override {
  2721. my_predecessors.internal_delete_built_predecessor(src);
  2722. }
  2723. size_t predecessor_count() __TBB_override { return my_predecessors.predecessor_count(); }
  2724. void copy_predecessors(predecessor_list_type &v) __TBB_override {
  2725. my_predecessors.copy_predecessors(v);
  2726. }
  2727. void extract() __TBB_override {
  2728. my_count = 0;
  2729. my_successors.built_successors().sender_extract(*this);
  2730. my_predecessors.built_predecessors().receiver_extract(*this);
  2731. decrement.built_predecessors().receiver_extract(decrement);
  2732. }
  2733. #endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  2734. //! Adds src to the list of cached predecessors.
  2735. bool register_predecessor( predecessor_type &src ) __TBB_override {
  2736. spin_mutex::scoped_lock lock(my_mutex);
  2737. my_predecessors.add( src );
  2738. if ( my_count + my_tries < my_threshold && !my_successors.empty() && internal::is_graph_active(this->my_graph) ) {
  2739. task* task = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
  2740. internal::forward_task_bypass < limiter_node<T, DecrementType> >( *this );
  2741. internal::spawn_in_graph_arena(graph_reference(), *task);
  2742. }
  2743. return true;
  2744. }
  2745. //! Removes src from the list of cached predecessors.
  2746. bool remove_predecessor( predecessor_type &src ) __TBB_override {
  2747. my_predecessors.remove( src );
  2748. return true;
  2749. }
  2750. protected:
  2751. template< typename R, typename B > friend class run_and_put_task;
  2752. template<typename X, typename Y> friend class internal::broadcast_cache;
  2753. template<typename X, typename Y> friend class internal::round_robin_cache;
  2754. //! Puts an item to this receiver
  2755. task *try_put_task( const T &t ) __TBB_override {
  2756. {
  2757. spin_mutex::scoped_lock lock(my_mutex);
  2758. if ( my_count + my_tries >= my_threshold )
  2759. return NULL;
  2760. else
  2761. ++my_tries;
  2762. }
  2763. task * rtask = my_successors.try_put_task(t);
  2764. if ( !rtask ) { // try_put_task failed.
  2765. spin_mutex::scoped_lock lock(my_mutex);
  2766. --my_tries;
  2767. if (check_conditions() && internal::is_graph_active(this->my_graph)) {
  2768. rtask = new ( task::allocate_additional_child_of( *(this->my_graph.root_task()) ) )
  2769. internal::forward_task_bypass< limiter_node<T, DecrementType> >( *this );
  2770. }
  2771. }
  2772. else {
  2773. spin_mutex::scoped_lock lock(my_mutex);
  2774. ++my_count;
  2775. --my_tries;
  2776. }
  2777. return rtask;
  2778. }
  2779. graph& graph_reference() const __TBB_override { return my_graph; }
  2780. void reset_receiver(reset_flags /*f*/) __TBB_override {
  2781. __TBB_ASSERT(false,NULL); // should never be called
  2782. }
  2783. void reset_node( reset_flags f) __TBB_override {
  2784. my_count = 0;
  2785. if(f & rf_clear_edges) {
  2786. my_predecessors.clear();
  2787. my_successors.clear();
  2788. }
  2789. else
  2790. {
  2791. my_predecessors.reset( );
  2792. }
  2793. decrement.reset_receiver(f);
  2794. }
  2795. }; // limiter_node
  2796. #include "internal/_flow_graph_join_impl.h"
  2797. using internal::reserving_port;
  2798. using internal::queueing_port;
  2799. using internal::key_matching_port;
  2800. using internal::input_port;
  2801. using internal::tag_value;
  2802. template<typename OutputTuple, typename JP=queueing> class join_node;
// join_node specialization that reserves one item at every input port and
// forwards the assembled tuple only when all reservations succeed.
template<typename OutputTuple>
class join_node<OutputTuple,reserving>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, reserving_port, OutputTuple, reserving> {
private:
static const int N = tbb::flow::tuple_size<OutputTuple>::value;
typedef typename internal::unfolded_join_node<N, reserving_port, OutputTuple, reserving> unfolded_type;
public:
typedef OutputTuple output_type;
typedef typename unfolded_type::input_ports_type input_ports_type;
__TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set and wire edges (preview).
template <typename... Args>
__TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, reserving = reserving()) : join_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor: copies via the unfolded base and re-registers tracing.
__TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
// join_node specialization that buffers (queues) incoming items at each
// input port and emits a tuple when every port has an item available.
template<typename OutputTuple>
class join_node<OutputTuple,queueing>: public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value, queueing_port, OutputTuple, queueing> {
private:
static const int N = tbb::flow::tuple_size<OutputTuple>::value;
typedef typename internal::unfolded_join_node<N, queueing_port, OutputTuple, queueing> unfolded_type;
public:
typedef OutputTuple output_type;
typedef typename unfolded_type::input_ports_type input_ports_type;
__TBB_NOINLINE_SYM explicit join_node(graph &g) : unfolded_type(g) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set and wire edges (preview).
template <typename... Args>
__TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, queueing = queueing()) : join_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor: copies via the unfolded base and re-registers tracing.
__TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
// template for key_matching join_node
// tag_matching join_node is a specialization of key_matching, and is source-compatible.
// The __TBB_Bn constructor parameters are per-port key-extraction bodies;
// one overload exists per input-port count (2..__TBB_VARIADIC_MAX).
template<typename OutputTuple, typename K, typename KHash>
class join_node<OutputTuple, key_matching<K, KHash> > : public internal::unfolded_join_node<tbb::flow::tuple_size<OutputTuple>::value,
key_matching_port, OutputTuple, key_matching<K,KHash> > {
private:
static const int N = tbb::flow::tuple_size<OutputTuple>::value;
typedef typename internal::unfolded_join_node<N, key_matching_port, OutputTuple, key_matching<K,KHash> > unfolded_type;
public:
typedef OutputTuple output_type;
typedef typename unfolded_type::input_ports_type input_ports_type;
#if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING
// Message-based key matching needs no key-extraction bodies.
join_node(graph &g) : unfolded_type(g) {}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
template <typename... Args>
join_node(const node_set<Args...>& nodes, key_matching<K, KHash> = key_matching<K, KHash>())
: join_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
#endif /* __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING */
template<typename __TBB_B0, typename __TBB_B1>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1) : unfolded_type(g, b0, b1) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
unfolded_type(g, b0, b1, b2, b3, b4) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_VARIADIC_MAX >= 6
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
typename __TBB_B5>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
unfolded_type(g, b0, b1, b2, b3, b4, b5) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#endif
#if __TBB_VARIADIC_MAX >= 7
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
typename __TBB_B5, typename __TBB_B6>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#endif
#if __TBB_VARIADIC_MAX >= 8
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
typename __TBB_B5, typename __TBB_B6, typename __TBB_B7>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
__TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#endif
#if __TBB_VARIADIC_MAX >= 9
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
__TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#endif
#if __TBB_VARIADIC_MAX >= 10
template<typename __TBB_B0, typename __TBB_B1, typename __TBB_B2, typename __TBB_B3, typename __TBB_B4,
typename __TBB_B5, typename __TBB_B6, typename __TBB_B7, typename __TBB_B8, typename __TBB_B9>
__TBB_NOINLINE_SYM join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
__TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#endif
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set plus per-port bodies (preview).
template <typename... Args, typename... Bodies>
__TBB_NOINLINE_SYM join_node(const node_set<Args...>& nodes, Bodies... bodies)
: join_node(nodes.graph_reference(), bodies...) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor: copies via the unfolded base and re-registers tracing.
__TBB_NOINLINE_SYM join_node(const join_node &other) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
  2963. // indexer node
  2964. #include "internal/_flow_graph_indexer_impl.h"
  2965. // TODO: Implement interface with variadic template or tuple
  2966. template<typename T0, typename T1=null_type, typename T2=null_type, typename T3=null_type,
  2967. typename T4=null_type, typename T5=null_type, typename T6=null_type,
  2968. typename T7=null_type, typename T8=null_type, typename T9=null_type> class indexer_node;
  2969. //indexer node specializations
// 1-input indexer_node: tags each incoming message with the index of the
// port it arrived on and broadcasts the tagged message.
template<typename T0>
class indexer_node<T0> : public internal::unfolded_indexer_node<tuple<T0> > {
private:
static const int N = 1;
public:
typedef tuple<T0> InputTuple;
typedef typename internal::tagged_msg<size_t, T0> output_type;
typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
__TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set and wire edges (preview).
template <typename... Args>
indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor
__TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
// 2-input indexer_node; see the 1-input specialization for semantics.
template<typename T0, typename T1>
class indexer_node<T0, T1> : public internal::unfolded_indexer_node<tuple<T0, T1> > {
private:
static const int N = 2;
public:
typedef tuple<T0, T1> InputTuple;
typedef typename internal::tagged_msg<size_t, T0, T1> output_type;
typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
__TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set and wire edges (preview).
template <typename... Args>
indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor
__TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
// 3-input indexer_node; see the 1-input specialization for semantics.
template<typename T0, typename T1, typename T2>
class indexer_node<T0, T1, T2> : public internal::unfolded_indexer_node<tuple<T0, T1, T2> > {
private:
static const int N = 3;
public:
typedef tuple<T0, T1, T2> InputTuple;
typedef typename internal::tagged_msg<size_t, T0, T1, T2> output_type;
typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
__TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
// Construct from a follows/precedes node set and wire edges (preview).
template <typename... Args>
indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
make_edges_in_order(nodes, *this);
}
#endif
// Copy constructor
__TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
this->input_ports(), static_cast< sender< output_type > *>(this) );
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
void set_name( const char *name ) __TBB_override {
tbb::internal::fgt_node_desc( this, name );
}
#endif
};
//! indexer_node specialization for four input ports.
template<typename T0, typename T1, typename T2, typename T3>
class indexer_node<T0, T1, T2, T3> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3> > {
private:
    static const int N = 4;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
//! indexer_node specialization for five input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4>
class indexer_node<T0, T1, T2, T3, T4> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4> > {
private:
    static const int N = 5;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3, T4> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
  3115. #if __TBB_VARIADIC_MAX >= 6
//! indexer_node specialization for six input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5>
class indexer_node<T0, T1, T2, T3, T4, T5> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
private:
    static const int N = 6;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3, T4, T5> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
  3145. #endif //variadic max 6
  3146. #if __TBB_VARIADIC_MAX >= 7
//! indexer_node specialization for seven input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6>
class indexer_node<T0, T1, T2, T3, T4, T5, T6> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
private:
    static const int N = 7;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
  3177. #endif //variadic max 7
  3178. #if __TBB_VARIADIC_MAX >= 8
  3179. template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
  3180. typename T6, typename T7>
  3181. class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
  3182. private:
  3183. static const int N = 8;
  3184. public:
  3185. typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7> InputTuple;
  3186. typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7> output_type;
  3187. typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
  3188. indexer_node(graph& g) : unfolded_type(g) {
  3189. tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
  3190. this->input_ports(), static_cast< sender< output_type > *>(this) );
  3191. }
  3192. #if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
  3193. template <typename... Args>
  3194. indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
  3195. make_edges_in_order(nodes, *this);
  3196. }
  3197. #endif
  3198. // Copy constructor
  3199. indexer_node( const indexer_node& other ) : unfolded_type(other) {
  3200. tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
  3201. this->input_ports(), static_cast< sender< output_type > *>(this) );
  3202. }
  3203. #if TBB_PREVIEW_FLOW_GRAPH_TRACE
  3204. void set_name( const char *name ) __TBB_override {
  3205. tbb::internal::fgt_node_desc( this, name );
  3206. }
  3207. #endif
  3208. };
  3209. #endif //variadic max 8
  3210. #if __TBB_VARIADIC_MAX >= 9
//! indexer_node specialization for nine input ports.
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8>
class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
private:
    static const int N = 9;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
  3241. #endif //variadic max 9
  3242. #if __TBB_VARIADIC_MAX >= 10
//! Primary template definition: ten input ports (the supported maximum).
template<typename T0, typename T1, typename T2, typename T3, typename T4, typename T5,
         typename T6, typename T7, typename T8, typename T9>
class indexer_node/*default*/ : public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> > {
private:
    static const int N = 10;   // number of input ports, passed to the tracing helper
public:
    typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> InputTuple;
    // Output type: tagged_msg pairing a size_t tag with one of the input types.
    typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
    typedef typename internal::unfolded_indexer_node<InputTuple> unfolded_type;
    //! Construct in graph g and register the node with the tracing/profiling layer.
    __TBB_NOINLINE_SYM indexer_node(graph& g) : unfolded_type(g) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Construct in the node set's graph and connect the set's nodes to this one.
    template <typename... Args>
    indexer_node(const node_set<Args...>& nodes) : indexer_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: copies base state and re-registers with the tracing layer.
    __TBB_NOINLINE_SYM indexer_node( const indexer_node& other ) : unfolded_type(other) {
        tbb::internal::fgt_multiinput_node<N>( CODEPTR(), tbb::internal::FLOW_INDEXER_NODE, &this->my_graph,
                                               this->input_ports(), static_cast< sender< output_type > *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
};
  3273. #endif //variadic max 10
#if __TBB_PREVIEW_ASYNC_MSG
// Untyped variant: with async_msg support, edges connect type-erased endpoints.
inline void internal_make_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
//! Connects sender p to receiver s — the common implementation behind make_edge.
template< typename T >
inline void internal_make_edge( sender<T> &p, receiver<T> &s ) {
#endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Record the edge on both endpoints so it can be enumerated during extraction.
    s.internal_add_built_predecessor(p);
    p.internal_add_built_successor(s);
#endif
    p.register_successor( s );
    // Notify the tracing/profiling layer that the edge now exists.
    tbb::internal::fgt_make_edge( &p, &s );
}
  3287. //! Makes an edge between a single predecessor and a single successor
  3288. template< typename T >
  3289. inline void make_edge( sender<T> &p, receiver<T> &s ) {
  3290. internal_make_edge( p, s );
  3291. }
#if __TBB_PREVIEW_ASYNC_MSG
// Overload enabled (via enable_if) only when at least one endpoint is the
// untyped async_msg kind; otherwise the typed make_edge above is chosen.
template< typename TS, typename TR,
          typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                                       || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void make_edge( TS &p, TR &s ) {
    internal_make_edge( p, s );
}
// Connects a sender of T to a receiver of T::async_msg_data_type.
template< typename T >
inline void make_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_make_edge( p, s );
}
// Connects a sender of T::async_msg_data_type to a receiver of T.
template< typename T >
inline void make_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_make_edge( p, s );
}
#endif // __TBB_PREVIEW_ASYNC_MSG
  3308. #if __TBB_FLOW_GRAPH_CPP11_FEATURES
  3309. //Makes an edge from port 0 of a multi-output predecessor to port 0 of a multi-input successor.
  3310. template< typename T, typename V,
  3311. typename = typename T::output_ports_type, typename = typename V::input_ports_type >
  3312. inline void make_edge( T& output, V& input) {
  3313. make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
  3314. }
  3315. //Makes an edge from port 0 of a multi-output predecessor to a receiver.
  3316. template< typename T, typename R,
  3317. typename = typename T::output_ports_type >
  3318. inline void make_edge( T& output, receiver<R>& input) {
  3319. make_edge(get<0>(output.output_ports()), input);
  3320. }
  3321. //Makes an edge from a sender to port 0 of a multi-input successor.
  3322. template< typename S, typename V,
  3323. typename = typename V::input_ports_type >
  3324. inline void make_edge( sender<S>& output, V& input) {
  3325. make_edge(output, get<0>(input.input_ports()));
  3326. }
  3327. #endif
#if __TBB_PREVIEW_ASYNC_MSG
// Untyped variant: with async_msg support, edges connect type-erased endpoints.
inline void internal_remove_edge( internal::untyped_sender &p, internal::untyped_receiver &s ) {
#else
//! Disconnects sender p from receiver s — the common implementation behind remove_edge.
template< typename T >
inline void internal_remove_edge( sender<T> &p, receiver<T> &s ) {
#endif
    p.remove_successor( s );
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // TODO: should we try to remove p from the predecessor list of s, in case the edge is reversed?
    // Drop the edge from the bookkeeping lists kept for node extraction.
    p.internal_delete_built_successor(s);
    s.internal_delete_built_predecessor(p);
#endif
    // Notify the tracing/profiling layer that the edge is gone.
    tbb::internal::fgt_remove_edge( &p, &s );
}
  3342. //! Removes an edge between a single predecessor and a single successor
  3343. template< typename T >
  3344. inline void remove_edge( sender<T> &p, receiver<T> &s ) {
  3345. internal_remove_edge( p, s );
  3346. }
#if __TBB_PREVIEW_ASYNC_MSG
// Overload enabled (via enable_if) only when at least one endpoint is the
// untyped async_msg kind; otherwise the typed remove_edge above is chosen.
template< typename TS, typename TR,
          typename = typename tbb::internal::enable_if<tbb::internal::is_same_type<TS, internal::untyped_sender>::value
                                                       || tbb::internal::is_same_type<TR, internal::untyped_receiver>::value>::type>
inline void remove_edge( TS &p, TR &s ) {
    internal_remove_edge( p, s );
}
// Disconnects a sender of T from a receiver of T::async_msg_data_type.
template< typename T >
inline void remove_edge( sender<T> &p, receiver<typename T::async_msg_data_type> &s ) {
    internal_remove_edge( p, s );
}
// Disconnects a sender of T::async_msg_data_type from a receiver of T.
template< typename T >
inline void remove_edge( sender<typename T::async_msg_data_type> &p, receiver<T> &s ) {
    internal_remove_edge( p, s );
}
#endif // __TBB_PREVIEW_ASYNC_MSG
  3363. #if __TBB_FLOW_GRAPH_CPP11_FEATURES
  3364. //Removes an edge between port 0 of a multi-output predecessor and port 0 of a multi-input successor.
  3365. template< typename T, typename V,
  3366. typename = typename T::output_ports_type, typename = typename V::input_ports_type >
  3367. inline void remove_edge( T& output, V& input) {
  3368. remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
  3369. }
  3370. //Removes an edge between port 0 of a multi-output predecessor and a receiver.
  3371. template< typename T, typename R,
  3372. typename = typename T::output_ports_type >
  3373. inline void remove_edge( T& output, receiver<R>& input) {
  3374. remove_edge(get<0>(output.output_ports()), input);
  3375. }
  3376. //Removes an edge between a sender and port 0 of a multi-input successor.
  3377. template< typename S, typename V,
  3378. typename = typename V::input_ports_type >
  3379. inline void remove_edge( sender<S>& output, V& input) {
  3380. remove_edge(output, get<0>(input.input_ports()));
  3381. }
  3382. #endif
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! Disconnects sender s from every successor recorded in this edge container.
template<typename C >
template< typename S >
void internal::edge_container<C>::sender_extract( S &s ) {
    // Iterate over a copy: remove_edge presumably mutates built_edges while
    // we walk it — TODO confirm against edge_container's bookkeeping.
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(s, **i);
    }
}
//! Disconnects receiver r from every predecessor recorded in this edge container.
template<typename C >
template< typename R >
void internal::edge_container<C>::receiver_extract( R &r ) {
    // Same copy-then-iterate pattern as sender_extract.
    edge_list_type e = built_edges;
    for ( typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
        remove_edge(**i, r);
    }
}
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
  3401. //! Returns a copy of the body from a function or continue node
  3402. template< typename Body, typename Node >
  3403. Body copy_body( Node &n ) {
  3404. return n.template copy_function_object<Body>();
  3405. }
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
//! composite_node: packages a subgraph behind tuples of exposed input and output ports.
template< typename InputTuple, typename OutputTuple > class composite_node;
template< typename... InputTypes, typename... OutputTypes>
class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<OutputTypes...> > : public graph_node{
public:
    typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
    typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
private:
    // Set lazily by set_external_ports(); the accessors below assert otherwise.
    std::unique_ptr<input_ports_type> my_input_ports;
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_INPUTS = sizeof...(InputTypes);
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
protected:
    // Nothing to reset at the composite level; the body is intentionally empty.
    void reset_node(reset_flags) __TBB_override {}
public:
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Construct in graph g; registers the node and its type name with tracing.
    composite_node( graph &g, const char *type_name = "composite_node" ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
    }
#else
    composite_node( graph &g ) : graph_node(g) {
        tbb::internal::fgt_multiinput_multioutput_node( CODEPTR(), tbb::internal::FLOW_COMPOSITE_NODE, this, &this->my_graph );
    }
#endif
    //! Stores references to the inner ports this composite exposes to the outside.
    //! Tuple sizes must match the node's declared port counts (checked statically).
    template<typename T1, typename T2>
    void set_external_ports(T1&& input_ports_tuple, T2&& output_ports_tuple) {
        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
        // Alias the exposed ports to this node for tracing tools.
        tbb::internal::fgt_internal_input_alias_helper<T1, NUM_INPUTS>::alias_port( this, input_ports_tuple);
        tbb::internal::fgt_internal_output_alias_helper<T2, NUM_OUTPUTS>::alias_port( this, output_ports_tuple);
    }
    //! Register inner nodes via add_nodes_impl; the flag marks them visible (true) or not.
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
    template< typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
    }
#endif
    //! Access the exposed input ports; set_external_ports() must have been called.
    input_ports_type& input_ports() {
        __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
        return *my_input_ports;
    }
    //! Access the exposed output ports; set_external_ports() must have been called.
    output_ports_type& output_ports() {
        __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
        return *my_output_ports;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Extraction is deliberately unsupported for composites.
    void extract() __TBB_override {
        __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
    }
#endif
}; // class composite_node
//! composite_node specialization with input ports only (empty output tuple).
template< typename... InputTypes>
class composite_node <tbb::flow::tuple<InputTypes...>, tbb::flow::tuple<> > : public graph_node {
public:
    typedef tbb::flow::tuple< receiver<InputTypes>&... > input_ports_type;
private:
    // Set by set_external_ports(); input_ports() asserts if still unset.
    std::unique_ptr<input_ports_type> my_input_ports;
    static const size_t NUM_INPUTS = sizeof...(InputTypes);
protected:
    // Nothing to reset at the composite level; the body is intentionally empty.
    void reset_node(reset_flags) __TBB_override {}
public:
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Construct in graph g; registers the node and its type name with tracing.
    composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
        tbb::internal::fgt_composite( CODEPTR(), this, &g );
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
    }
#else
    composite_node( graph &g ) : graph_node(g) {
        tbb::internal::fgt_composite( CODEPTR(), this, &g );
    }
#endif
    //! Stores references to the inner input ports this composite exposes.
    template<typename T>
    void set_external_ports(T&& input_ports_tuple) {
        __TBB_STATIC_ASSERT(NUM_INPUTS == tbb::flow::tuple_size<input_ports_type>::value, "number of arguments does not match number of input ports");
        // NOTE(review): input_ports_tuple is forwarded twice below; benign when T
        // is a tuple of lvalue references (the expected case) — confirm callers
        // never pass a tuple of movable values.
        my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
        tbb::internal::fgt_internal_input_alias_helper<T, NUM_INPUTS>::alias_port( this, std::forward<T>(input_ports_tuple));
    }
    //! Register inner nodes via add_nodes_impl; the flag marks them visible (true) or not.
    template< typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
    template< typename... NodeTypes >
    void add_nodes( const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
    }
#endif
    //! Access the exposed input ports; set_external_ports() must have been called.
    input_ports_type& input_ports() {
        __TBB_ASSERT(my_input_ports, "input ports not set, call set_external_ports to set input ports");
        return *my_input_ports;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Extraction is deliberately unsupported for composites.
    void extract() __TBB_override {
        __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
    }
#endif
}; // class composite_node
//! composite_node specialization with output ports only (empty input tuple).
template<typename... OutputTypes>
class composite_node <tbb::flow::tuple<>, tbb::flow::tuple<OutputTypes...> > : public graph_node {
public:
    typedef tbb::flow::tuple< sender<OutputTypes>&... > output_ports_type;
private:
    // Set by set_external_ports(); output_ports() asserts if still unset.
    std::unique_ptr<output_ports_type> my_output_ports;
    static const size_t NUM_OUTPUTS = sizeof...(OutputTypes);
protected:
    // Nothing to reset at the composite level; the body is intentionally empty.
    void reset_node(reset_flags) __TBB_override {}
public:
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Construct in graph g; registers the node and its type name with tracing.
    __TBB_NOINLINE_SYM composite_node( graph &g, const char *type_name = "composite_node") : graph_node(g) {
        tbb::internal::fgt_composite( CODEPTR(), this, &g );
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, type_name );
    }
#else
    __TBB_NOINLINE_SYM composite_node( graph &g ) : graph_node(g) {
        tbb::internal::fgt_composite( CODEPTR(), this, &g );
    }
#endif
    //! Stores references to the inner output ports this composite exposes.
    template<typename T>
    void set_external_ports(T&& output_ports_tuple) {
        __TBB_STATIC_ASSERT(NUM_OUTPUTS == tbb::flow::tuple_size<output_ports_type>::value, "number of arguments does not match number of output ports");
        // NOTE(review): output_ports_tuple is forwarded twice below; benign when T
        // is a tuple of lvalue references (the expected case) — confirm callers
        // never pass a tuple of movable values.
        my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
        tbb::internal::fgt_internal_output_alias_helper<T, NUM_OUTPUTS>::alias_port( this, std::forward<T>(output_ports_tuple));
    }
    //! Register inner nodes via add_nodes_impl; the flag marks them visible (true) or not.
    template<typename... NodeTypes >
    void add_visible_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, true, n...); }
    template<typename... NodeTypes >
    void add_nodes(const NodeTypes&... n) { internal::add_nodes_impl(this, false, n...); }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Set the name reported for this node by trace/profiling tools.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_multiinput_multioutput_node_desc( this, name );
    }
#endif
    //! Access the exposed output ports; set_external_ports() must have been called.
    output_ports_type& output_ports() {
        __TBB_ASSERT(my_output_ports, "output ports not set, call set_external_ports to set output ports");
        return *my_output_ports;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Extraction is deliberately unsupported for composites.
    void extract() __TBB_override {
        __TBB_ASSERT(false, "Current composite_node implementation does not support extract");
    }
#endif
}; // class composite_node
  3556. #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES
  3557. namespace internal {
//! Common base for async bodies: holds the (non-owned) gateway pointer
//! supplied at construction. Non-assignable via tbb::internal::no_assign.
template<typename Gateway>
class async_body_base: tbb::internal::no_assign {
public:
    typedef Gateway gateway_type;
    async_body_base(gateway_type *gateway): my_gateway(gateway) { }
    //! Re-point the gateway; used after an async_node is copy-constructed so
    //! the copied body targets the new node's gateway (see async_node's copy ctor).
    void set_gateway(gateway_type *gateway) {
        my_gateway = gateway;
    }
protected:
    gateway_type *my_gateway;   // not owned
};
//! Adapts a user body callable as body(input, gateway) to the
//! (Input, Ports&) signature expected by the owning multifunction node;
//! the ports argument is ignored and the user body receives the gateway instead.
template<typename Input, typename Ports, typename Gateway, typename Body>
class async_body: public async_body_base<Gateway> {
public:
    typedef async_body_base<Gateway> base_type;
    typedef Gateway gateway_type;
    async_body(const Body &body, gateway_type *gateway)
        : base_type(gateway), my_body(body) { }
    //! Invoked by the owning node: forwards the input and the gateway to the user body.
    void operator()( const Input &v, Ports & ) {
        my_body(v, *this->my_gateway);
    }
    //! Returns a copy of the wrapped user body.
    Body get_body() { return my_body; }
private:
    Body my_body;
};
  3583. } // namespace internal
  3584. } // namespace interfaceX
  3585. namespace interface11 {
  3586. //! Implements async node
  3587. template < typename Input, typename Output,
  3588. typename Policy = queueing_lightweight,
  3589. typename Allocator=__TBB_DEFAULT_NODE_ALLOCATOR(Input) >
  3590. class async_node
  3591. : public multifunction_node< Input, tuple< Output >, Policy, Allocator >, public sender< Output >
  3592. {
  3593. #if !TBB_DEPRECATED_FLOW_NODE_ALLOCATOR
  3594. __TBB_STATIC_ASSERT(
  3595. (tbb::internal::is_same_type<Allocator, null_type>::value),
  3596. "Allocator template parameter for flow graph nodes is deprecated and will removed in the future. "
  3597. "To temporary enable the deprecated interface specify TBB_ENABLE_DEPRECATED_NODE_ALLOCATOR."
  3598. );
  3599. #endif
  3600. typedef multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type;
  3601. typedef typename internal::multifunction_input<Input, typename base_type::output_ports_type, Policy, Allocator> mfn_input_type;
  3602. public:
  3603. typedef Input input_type;
  3604. typedef Output output_type;
  3605. typedef receiver<input_type> receiver_type;
  3606. typedef typename receiver_type::predecessor_type predecessor_type;
  3607. typedef typename sender<output_type>::successor_type successor_type;
  3608. typedef receiver_gateway<output_type> gateway_type;
  3609. typedef internal::async_body_base<gateway_type> async_body_base_type;
  3610. typedef typename base_type::output_ports_type output_ports_type;
  3611. private:
  3612. struct try_put_functor {
  3613. typedef internal::multifunction_output<Output> output_port_type;
  3614. output_port_type *port;
  3615. // TODO: pass value by copy since we do not want to block asynchronous thread.
  3616. const Output *value;
  3617. bool result;
  3618. try_put_functor(output_port_type &p, const Output &v) : port(&p), value(&v), result(false) { }
  3619. void operator()() {
  3620. result = port->try_put(*value);
  3621. }
  3622. };
  3623. class receiver_gateway_impl: public receiver_gateway<Output> {
  3624. public:
  3625. receiver_gateway_impl(async_node* node): my_node(node) {}
  3626. void reserve_wait() __TBB_override {
  3627. tbb::internal::fgt_async_reserve(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
  3628. my_node->my_graph.reserve_wait();
  3629. }
  3630. void release_wait() __TBB_override {
  3631. my_node->my_graph.release_wait();
  3632. tbb::internal::fgt_async_commit(static_cast<typename async_node::receiver_type *>(my_node), &my_node->my_graph);
  3633. }
  3634. //! Implements gateway_type::try_put for an external activity to submit a message to FG
  3635. bool try_put(const Output &i) __TBB_override {
  3636. return my_node->try_put_impl(i);
  3637. }
  3638. private:
  3639. async_node* my_node;
  3640. } my_gateway;
  3641. //The substitute of 'this' for member construction, to prevent compiler warnings
  3642. async_node* self() { return this; }
  3643. //! Implements gateway_type::try_put for an external activity to submit a message to FG
  3644. bool try_put_impl(const Output &i) {
  3645. internal::multifunction_output<Output> &port_0 = internal::output_port<0>(*this);
  3646. internal::broadcast_cache<output_type>& port_successors = port_0.successors();
  3647. tbb::internal::fgt_async_try_put_begin(this, &port_0);
  3648. task_list tasks;
  3649. bool is_at_least_one_put_successful = port_successors.gather_successful_try_puts(i, tasks);
  3650. __TBB_ASSERT( is_at_least_one_put_successful || tasks.empty(),
  3651. "Return status is inconsistent with the method operation." );
  3652. while( !tasks.empty() ) {
  3653. internal::enqueue_in_graph_arena(this->my_graph, tasks.pop_front());
  3654. }
  3655. tbb::internal::fgt_async_try_put_end(this, &port_0);
  3656. return is_at_least_one_put_successful;
  3657. }
  3658. public:
  3659. template<typename Body>
  3660. __TBB_NOINLINE_SYM async_node(
  3661. graph &g, size_t concurrency,
  3662. #if __TBB_CPP11_PRESENT
  3663. Body body, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
  3664. #else
  3665. __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body body, node_priority_t priority = tbb::flow::internal::no_priority)
  3666. #endif
  3667. ) : base_type(
  3668. g, concurrency,
  3669. internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
  3670. (body, &my_gateway) __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority) ), my_gateway(self()) {
  3671. tbb::internal::fgt_multioutput_node_with_body<1>(
  3672. CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
  3673. &this->my_graph, static_cast<receiver<input_type> *>(this),
  3674. this->output_ports(), this->my_body
  3675. );
  3676. }
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES && __TBB_CPP11_PRESENT
//! Convenience overload: explicit priority with the default Policy.
template <typename Body, typename... Args>
__TBB_NOINLINE_SYM async_node(graph& g, size_t concurrency, Body body, node_priority_t priority)
    : async_node(g, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Constructs the node in the graph referenced by the node set, then wires
//! edges to/from the nodes of the set.
template <typename Body, typename... Args>
__TBB_NOINLINE_SYM async_node(
    const node_set<Args...>& nodes, size_t concurrency, Body body,
    __TBB_FLOW_GRAPH_PRIORITY_ARG1(Policy = Policy(), node_priority_t priority = tbb::flow::internal::no_priority)
) : async_node(nodes.graph_reference(), concurrency, __TBB_FLOW_GRAPH_PRIORITY_ARG1(body, priority)) {
    make_edges_in_order(nodes, *this);
}
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
//! Node-set overload with an explicit priority and the default Policy.
template <typename Body, typename... Args>
__TBB_NOINLINE_SYM async_node(const node_set<Args...>& nodes, size_t concurrency, Body body, node_priority_t priority)
    : async_node(nodes, concurrency, body, Policy(), priority) {}
#endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
#endif // __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
//! Copy constructor. After the base class clones the bodies, both the current
//! and the initial body copies must be re-pointed at THIS node's gateway —
//! otherwise they would still reference the source node's gateway.
__TBB_NOINLINE_SYM async_node( const async_node &other ) : base_type(other), sender<Output>(), my_gateway(self()) {
    static_cast<async_body_base_type*>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
    static_cast<async_body_base_type*>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
    // Register the copy with the flow graph tracing facility.
    tbb::internal::fgt_multioutput_node_with_body<1>( CODEPTR(), tbb::internal::FLOW_ASYNC_NODE,
        &this->my_graph, static_cast<receiver<input_type> *>(this),
        this->output_ports(), this->my_body );
}
//! Returns the gateway an external activity uses to submit results back into the graph.
gateway_type& gateway() {
    return my_gateway;
}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
//! Sets the name reported for this node by the flow graph tracing facility.
void set_name( const char *name ) __TBB_override {
    tbb::internal::fgt_multioutput_node_desc( this, name );
}
#endif
  3711. // Define sender< Output >
  3712. //! Add a new successor to this node
  3713. bool register_successor( successor_type &r ) __TBB_override {
  3714. return internal::output_port<0>(*this).register_successor(r);
  3715. }
  3716. //! Removes a successor from this node
  3717. bool remove_successor( successor_type &r ) __TBB_override {
  3718. return internal::output_port<0>(*this).remove_successor(r);
  3719. }
  3720. template<typename Body>
  3721. Body copy_function_object() {
  3722. typedef internal::multifunction_body<input_type, typename base_type::output_ports_type> mfn_body_type;
  3723. typedef internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body> async_body_type;
  3724. mfn_body_type &body_ref = *this->my_body;
  3725. async_body_type ab = *static_cast<async_body_type*>(dynamic_cast< internal::multifunction_body_leaf<input_type, typename base_type::output_ports_type, async_body_type> & >(body_ref).get_body_ptr());
  3726. return ab.get_body();
  3727. }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
//! interface to record edges for traversal & deletion
// Every operation below forwards to output port 0, which owns the edge lists.
typedef typename internal::edge_container<successor_type> built_successors_type;
typedef typename built_successors_type::edge_list_type successor_list_type;
built_successors_type &built_successors() __TBB_override {
    return internal::output_port<0>(*this).built_successors();
}
//! Records an edge to r for traversal purposes (no message is forwarded).
void internal_add_built_successor( successor_type &r ) __TBB_override {
    internal::output_port<0>(*this).internal_add_built_successor(r);
}
//! Removes the recorded edge to r.
void internal_delete_built_successor( successor_type &r ) __TBB_override {
    internal::output_port<0>(*this).internal_delete_built_successor(r);
}
//! Copies the recorded successor edges into l.
void copy_successors( successor_list_type &l ) __TBB_override {
    internal::output_port<0>(*this).copy_successors(l);
}
size_t successor_count() __TBB_override {
    return internal::output_port<0>(*this).successor_count();
}
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
protected:
//! Resets the node; delegates entirely to the base multifunction node.
void reset_node( reset_flags f) __TBB_override {
    base_type::reset_node(f);
}
};
  3753. #if __TBB_PREVIEW_STREAMING_NODE
  3754. #include "internal/_flow_graph_streaming_node.h"
  3755. #endif // __TBB_PREVIEW_STREAMING_NODE
  3756. #include "internal/_flow_graph_node_set_impl.h"
//! A node that buffers a single item, overwriting it on every put, and
//! broadcasts the currently buffered value to its successors. try_get and
//! try_reserve return the buffered value without consuming it.
template< typename T >
class overwrite_node : public graph_node, public receiver<T>, public sender<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
    typedef typename sender<output_type>::built_successors_type built_successors_type;
    typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
    typedef typename sender<output_type>::successor_list_type successor_list_type;
#endif
    //! Constructs an empty node: no valid value is buffered yet.
    __TBB_NOINLINE_SYM explicit overwrite_node(graph &g) : graph_node(g), my_buffer_is_valid(false) {
        my_successors.set_owner( this );
        // Register the node with the flow graph tracing facility.
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    overwrite_node(const node_set<Args...>& nodes) : overwrite_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor; doesn't take anything from src; default won't work
    __TBB_NOINLINE_SYM overwrite_node( const overwrite_node& src ) :
        graph_node(src.my_graph), receiver<T>(), sender<T>(), my_buffer_is_valid(false)
    {
        my_successors.set_owner( this );
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_OVERWRITE_NODE, &this->my_graph,
            static_cast<receiver<input_type> *>(this), static_cast<sender<output_type> *>(this) );
    }
    ~overwrite_node() {}
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name reported by the flow graph tracing facility.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
    //! Adds a successor. If a valid value is already buffered (and the graph is
    //! active), the value is pushed to the new successor right away.
    bool register_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if (my_buffer_is_valid && internal::is_graph_active( my_graph )) {
            // We have a valid value that must be forwarded immediately.
            bool ret = s.try_put( my_buffer );
            if ( ret ) {
                // We add the successor that accepted our put
                my_successors.register_successor( s );
            } else {
                // In case of reservation a race between the moment of reservation and register_successor can appear,
                // because failed reserve does not mean that register_successor is not ready to put a message immediately.
                // We have some sort of infinite loop: reserving node tries to set pull state for the edge,
                // but overwrite_node tries to return push state back. That is why we have to break this loop with task creation.
                task *rtask = new ( task::allocate_additional_child_of( *( my_graph.root_task() ) ) )
                    register_predecessor_task( *this, s );
                internal::spawn_in_graph_arena( my_graph, *rtask );
            }
        } else {
            // No valid value yet, just add as successor
            my_successors.register_successor( s );
        }
        return true;
    }
    //! Removes a successor; always succeeds.
    bool remove_successor( successor_type &s ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.remove_successor(s);
        return true;
    }
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    // Edge-recording support for the deprecated extraction feature. Successor
    // edges are kept in my_successors, predecessor edges in my_built_predecessors;
    // each accessor takes my_mutex for consistency with the rest of the node.
    built_predecessors_type &built_predecessors() __TBB_override { return my_built_predecessors; }
    built_successors_type &built_successors() __TBB_override { return my_successors.built_successors(); }
    void internal_add_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_add_built_successor(s);
    }
    void internal_delete_built_successor( successor_type &s) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.internal_delete_built_successor(s);
    }
    size_t successor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_successors.successor_count();
    }
    void copy_successors(successor_list_type &v) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_successors.copy_successors(v);
    }
    void internal_add_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.add_edge(p);
    }
    void internal_delete_built_predecessor( predecessor_type &p) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.delete_edge(p);
    }
    size_t predecessor_count() __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return my_built_predecessors.edge_count();
    }
    void copy_predecessors( predecessor_list_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        my_built_predecessors.copy_edges(v);
    }
    //! Invalidates the buffer and detaches all recorded edges from this node.
    void extract() __TBB_override {
        my_buffer_is_valid = false;
        built_successors().sender_extract(*this);
        built_predecessors().receiver_extract(*this);
    }
#endif /* TBB_DEPRECATED_FLOW_NODE_EXTRACTION */
    //! Copies the buffered value into v; fails if no valid value is buffered.
    // Note: the buffer stays valid — getting does not consume the value.
    bool try_get( input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        if ( my_buffer_is_valid ) {
            v = my_buffer;
            return true;
        }
        return false;
    }
    //! Reserves an item
    // Reservation is just a get here: nothing is locked against other readers.
    bool try_reserve( T &v ) __TBB_override {
        return try_get(v);
    }
    //! Releases the reserved item
    bool try_release() __TBB_override { return true; }
    //! Consumes the reserved item
    // Consuming does not invalidate the buffer (see try_put_task_impl / clear).
    bool try_consume() __TBB_override { return true; }
    //! Returns whether a valid value is currently buffered.
    bool is_valid() {
        spin_mutex::scoped_lock l( my_mutex );
        return my_buffer_is_valid;
    }
    //! Invalidates the buffered value (the storage itself is not destroyed).
    void clear() {
        spin_mutex::scoped_lock l( my_mutex );
        my_buffer_is_valid = false;
    }
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Overwrites the buffer with v and broadcasts it (under the node lock).
    task * try_put_task( const input_type &v ) __TBB_override {
        spin_mutex::scoped_lock l( my_mutex );
        return try_put_task_impl(v);
    }
    // Caller must hold my_mutex. Always "succeeds": if no successor produced a
    // task, SUCCESSFULLY_ENQUEUED is returned so the put is reported accepted.
    task * try_put_task_impl(const input_type &v) {
        my_buffer = v;
        my_buffer_is_valid = true;
        task * rtask = my_successors.try_put_task(v);
        if (!rtask) rtask = SUCCESSFULLY_ENQUEUED;
        return rtask;
    }
    graph& graph_reference() const __TBB_override {
        return my_graph;
    }
    //! Breaks an infinite loop between the node reservation and register_successor call
    // Spawned from register_successor when an immediate push to a reserving
    // successor fails; retries the pull-edge registration asynchronously.
    struct register_predecessor_task : public graph_task {
        register_predecessor_task(predecessor_type& owner, successor_type& succ) :
            o(owner), s(succ) {};
        tbb::task* execute() __TBB_override {
            // If the successor still refuses the pull edge, fall back to push.
            if (!s.register_predecessor(o)) {
                o.register_successor(s);
            }
            return NULL;
        }
        predecessor_type& o;
        successor_type& s;
    };
    spin_mutex my_mutex;                                          // guards buffer and edge state
    internal::broadcast_cache< input_type, null_rw_mutex > my_successors;
#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
    internal::edge_container<predecessor_type> my_built_predecessors;
#endif
    input_type my_buffer;                                         // last value put to the node
    bool my_buffer_is_valid;                                      // false until the first put (or after clear/reset)
    void reset_receiver(reset_flags /*f*/) __TBB_override {}
    //! Invalidates the buffer; optionally drops all successor edges.
    void reset_node( reset_flags f) __TBB_override {
        my_buffer_is_valid = false;
        if (f&rf_clear_edges) {
            my_successors.clear();
        }
    }
}; // overwrite_node
//! An overwrite_node variant that only accepts the first value put to it;
//! later puts are rejected until the buffer is cleared (see try_put_task).
template< typename T >
class write_once_node : public overwrite_node<T> {
public:
    typedef T input_type;
    typedef T output_type;
    typedef overwrite_node<T> base_type;
    typedef typename receiver<input_type>::predecessor_type predecessor_type;
    typedef typename sender<output_type>::successor_type successor_type;
    //! Constructor
    __TBB_NOINLINE_SYM explicit write_once_node(graph& g) : base_type(g) {
        // Register with the tracing facility under the write_once node kind.
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
            static_cast<receiver<input_type> *>(this),
            static_cast<sender<output_type> *>(this) );
    }
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
    //! Constructs the node in the graph of the given node set and wires the edges.
    template <typename... Args>
    write_once_node(const node_set<Args...>& nodes) : write_once_node(nodes.graph_reference()) {
        make_edges_in_order(nodes, *this);
    }
#endif
    //! Copy constructor: call base class copy constructor
    __TBB_NOINLINE_SYM write_once_node( const write_once_node& src ) : base_type(src) {
        tbb::internal::fgt_node( CODEPTR(), tbb::internal::FLOW_WRITE_ONCE_NODE, &(this->my_graph),
            static_cast<receiver<input_type> *>(this),
            static_cast<sender<output_type> *>(this) );
    }
#if TBB_PREVIEW_FLOW_GRAPH_TRACE
    //! Sets the name reported by the flow graph tracing facility.
    void set_name( const char *name ) __TBB_override {
        tbb::internal::fgt_node_desc( this, name );
    }
#endif
protected:
    template< typename R, typename B > friend class run_and_put_task;
    template<typename X, typename Y> friend class internal::broadcast_cache;
    template<typename X, typename Y> friend class internal::round_robin_cache;
    //! Accepts v only while the buffer is empty; otherwise the put is rejected
    //! by returning NULL — this is the "write once" behavior.
    task *try_put_task( const T &v ) __TBB_override {
        spin_mutex::scoped_lock l( this->my_mutex );
        return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
    }
};
} // interfaceX
// Export the current interface version's public names into tbb::flow so user
// code is insulated from the versioned interface11 namespace.
// Reset flags and core graph types:
using interface11::reset_flags;
using interface11::rf_reset_protocol;
using interface11::rf_reset_bodies;
using interface11::rf_clear_edges;
using interface11::graph;
using interface11::graph_node;
using interface11::continue_msg;
// Node types:
using interface11::source_node;
using interface11::input_node;
using interface11::function_node;
using interface11::multifunction_node;
using interface11::split_node;
using interface11::internal::output_port;
using interface11::indexer_node;
using interface11::internal::tagged_msg;
using interface11::internal::cast_to;
using interface11::internal::is_a;
using interface11::continue_node;
using interface11::overwrite_node;
using interface11::write_once_node;
using interface11::broadcast_node;
using interface11::buffer_node;
using interface11::queue_node;
using interface11::sequencer_node;
using interface11::priority_queue_node;
using interface11::limiter_node;
using namespace interface11::internal::graph_policy_namespace;
using interface11::join_node;
using interface11::input_port;
// Free functions and helpers:
using interface11::copy_body;
using interface11::make_edge;
using interface11::remove_edge;
using interface11::internal::tag_value;
// Feature-gated names:
#if __TBB_FLOW_GRAPH_CPP11_FEATURES
using interface11::composite_node;
#endif
using interface11::async_node;
#if __TBB_PREVIEW_ASYNC_MSG
using interface11::async_msg;
#endif
#if __TBB_PREVIEW_STREAMING_NODE
using interface11::port_ref;
using interface11::streaming_node;
#endif // __TBB_PREVIEW_STREAMING_NODE
#if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
using internal::node_priority_t;
using internal::no_priority;
#endif
#if __TBB_PREVIEW_FLOW_GRAPH_NODE_SET
using interface11::internal::follows;
using interface11::internal::precedes;
using interface11::internal::make_node_set;
using interface11::internal::make_edges;
#endif
} // flow
} // tbb
// Include deduction guides for node classes
#include "internal/_flow_graph_nodes_deduction.h"
// Clean up file-local helper macros so they do not leak to including code.
#undef __TBB_PFG_RESET_ARG
#undef __TBB_COMMA
#undef __TBB_DEFAULT_NODE_ALLOCATOR
#include "internal/_warning_suppress_disable_notice.h"
#undef __TBB_flow_graph_H_include_area
// __TBB_NOINLINE_SYM was defined for tracing builds on Linux/macOS only;
// undefine it under the same condition.
#if TBB_USE_THREADING_TOOLS && TBB_PREVIEW_FLOW_GRAPH_TRACE && ( __linux__ || __APPLE__ )
#undef __TBB_NOINLINE_SYM
#endif
#endif // __TBB_flow_graph_H