DataflowBlock.cs 9.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277
  1. // DataflowBlock.cs
  2. //
  3. // Copyright (c) 2011 Jérémie "garuma" Laval
  4. // Copyright (c) 2012 Petr Onderka
  5. //
  6. // Permission is hereby granted, free of charge, to any person obtaining a copy
  7. // of this software and associated documentation files (the "Software"), to deal
  8. // in the Software without restriction, including without limitation the rights
  9. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  10. // copies of the Software, and to permit persons to whom the Software is
  11. // furnished to do so, subject to the following conditions:
  12. //
  13. // The above copyright notice and this permission notice shall be included in
  14. // all copies or substantial portions of the Software.
  15. //
  16. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  17. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  18. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  19. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  20. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  21. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  22. // THE SOFTWARE.
  23. namespace System.Threading.Tasks.Dataflow {
  24. public static class DataflowBlock {
  25. public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
  26. {
  27. if (source == null)
  28. throw new ArgumentNullException ("source");
  29. return new ObservableDataflowBlock<TOutput> (source);
  30. }
  31. public static IObserver<TInput> AsObserver<TInput> (this ITargetBlock<TInput> target)
  32. {
  33. if (target == null)
  34. throw new ArgumentNullException ("target");
  35. return new ObserverDataflowBlock<TInput> (target);
  36. }
  37. public static Task<int> Choose<T1, T2> (
  38. ISourceBlock<T1> source1, Action<T1> action1,
  39. ISourceBlock<T2> source2, Action<T2> action2)
  40. {
  41. return Choose (source1, action1, source2, action2,
  42. DataflowBlockOptions.Default);
  43. }
  44. public static Task<int> Choose<T1, T2> (
  45. ISourceBlock<T1> source1, Action<T1> action1,
  46. ISourceBlock<T2> source2, Action<T2> action2,
  47. DataflowBlockOptions dataflowBlockOptions)
  48. {
  49. if (source1 == null)
  50. throw new ArgumentNullException ("source1");
  51. if (source2 == null)
  52. throw new ArgumentNullException ("source2");
  53. if (action1 == null)
  54. throw new ArgumentNullException("action1");
  55. if (action2 == null)
  56. throw new ArgumentNullException("action2");
  57. if (dataflowBlockOptions == null)
  58. throw new ArgumentNullException("dataflowBlockOptions");
  59. var chooser = new ChooserBlock<T1, T2, object> (action1, action2, null, dataflowBlockOptions);
  60. source1.LinkTo (chooser.Target1);
  61. source2.LinkTo (chooser.Target2);
  62. return chooser.Completion;
  63. }
  64. public static Task<int> Choose<T1, T2, T3> (
  65. ISourceBlock<T1> source1, Action<T1> action1,
  66. ISourceBlock<T2> source2, Action<T2> action2,
  67. ISourceBlock<T3> source3, Action<T3> action3)
  68. {
  69. return Choose (source1, action1, source2, action2, source3, action3,
  70. DataflowBlockOptions.Default);
  71. }
  72. public static Task<int> Choose<T1, T2, T3> (
  73. ISourceBlock<T1> source1, Action<T1> action1,
  74. ISourceBlock<T2> source2, Action<T2> action2,
  75. ISourceBlock<T3> source3, Action<T3> action3,
  76. DataflowBlockOptions dataflowBlockOptions)
  77. {
  78. if (source1 == null)
  79. throw new ArgumentNullException ("source1");
  80. if (source2 == null)
  81. throw new ArgumentNullException ("source2");
  82. if (source3 == null)
  83. throw new ArgumentNullException ("source3");
  84. if (action1 == null)
  85. throw new ArgumentNullException("action1");
  86. if (action2 == null)
  87. throw new ArgumentNullException("action2");
  88. if (action3 == null)
  89. throw new ArgumentNullException("action3");
  90. if (dataflowBlockOptions == null)
  91. throw new ArgumentNullException("dataflowBlockOptions");
  92. var chooser = new ChooserBlock<T1, T2, T3> (action1, action2, action3, dataflowBlockOptions);
  93. source1.LinkTo (chooser.Target1);
  94. source2.LinkTo (chooser.Target2);
  95. source3.LinkTo (chooser.Target3);
  96. return chooser.Completion;
  97. }
  98. public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput> (
  99. ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
  100. {
  101. return new PropagatorWrapperBlock<TInput, TOutput> (target, source);
  102. }
  103. public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
  104. {
  105. if (source == null)
  106. throw new ArgumentNullException("source");
  107. return source.LinkTo (target, DataflowLinkOptions.Default);
  108. }
  109. public static IDisposable LinkTo<TOutput> (
  110. this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
  111. Predicate<TOutput> predicate)
  112. {
  113. if (source == null)
  114. throw new ArgumentNullException("source");
  115. return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
  116. }
  117. [MonoTODO("Use predicate")]
  118. public static IDisposable LinkTo<TOutput> (
  119. this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
  120. DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
  121. {
  122. if (source == null)
  123. throw new ArgumentNullException("source");
  124. if (predicate == null)
  125. throw new ArgumentNullException("predicate");
  126. return source.LinkTo (target, linkOptions);
  127. }
  128. [MonoTODO]
  129. public static Task<bool> OutputAvailableAsync<TOutput> (this ISourceBlock<TOutput> source)
  130. {
  131. throw new NotImplementedException ();
  132. }
  133. public static bool Post<TInput> (this ITargetBlock<TInput> target, TInput item)
  134. {
  135. if (target == null)
  136. throw new ArgumentNullException ("target");
  137. return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
  138. == DataflowMessageStatus.Accepted;
  139. }
  140. public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
  141. {
  142. return Receive (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
  143. }
  144. public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
  145. {
  146. return Receive (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
  147. }
  148. public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
  149. {
  150. return Receive (source, timeout, CancellationToken.None);
  151. }
  152. public static TOutput Receive<TOutput> (
  153. this ISourceBlock<TOutput> source, TimeSpan timeout,
  154. CancellationToken cancellationToken)
  155. {
  156. if (source == null)
  157. throw new ArgumentNullException ("source");
  158. if (timeout.TotalMilliseconds < -1)
  159. throw new ArgumentOutOfRangeException ("timeout");
  160. if (timeout.TotalMilliseconds > int.MaxValue)
  161. throw new ArgumentOutOfRangeException ("timeout");
  162. cancellationToken.ThrowIfCancellationRequested ();
  163. TOutput item;
  164. var receivableSource = source as IReceivableSourceBlock<TOutput>;
  165. if (receivableSource != null && receivableSource.TryReceive (null, out item))
  166. return item;
  167. if (source.Completion.IsCompleted || source.Completion.IsCanceled
  168. || source.Completion.IsFaulted)
  169. throw new InvalidOperationException (
  170. "No item could be received from the source.");
  171. int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
  172. var block = new ReceiveBlock<TOutput> ();
  173. var bridge = source.LinkTo (block,
  174. new DataflowLinkOptions { PropagateCompletion = true });
  175. return block.WaitAndGet (bridge, cancellationToken, timeoutMilliseconds);
  176. }
  177. public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source)
  178. {
  179. return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
  180. }
  181. public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
  182. {
  183. return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
  184. }
  185. public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
  186. {
  187. return ReceiveAsync (source, timeout, CancellationToken.None);
  188. }
  189. public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
  190. {
  191. if (source == null)
  192. throw new ArgumentNullException ("source");
  193. if (timeout.TotalMilliseconds < -1)
  194. throw new ArgumentOutOfRangeException ("timeout");
  195. if (timeout.TotalMilliseconds > int.MaxValue)
  196. throw new ArgumentOutOfRangeException ("timeout");
  197. cancellationToken.ThrowIfCancellationRequested ();
  198. long tm = (long)timeout.TotalMilliseconds;
  199. ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
  200. var bridge = source.LinkTo (block);
  201. return block.AsyncGet (bridge, cancellationToken, tm);
  202. }
  203. public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
  204. {
  205. item = default (TOutput);
  206. if (source == null)
  207. throw new ArgumentNullException ("source");
  208. return source.TryReceive (null, out item);
  209. }
  210. public static Task<bool> SendAsync<TInput> (
  211. this ITargetBlock<TInput> target, TInput item)
  212. {
  213. return SendAsync (target, item, CancellationToken.None);
  214. }
  215. public static Task<bool> SendAsync<TInput> (
  216. this ITargetBlock<TInput> target, TInput item,
  217. CancellationToken cancellationToken)
  218. {
  219. if (target == null)
  220. throw new ArgumentNullException ("target");
  221. cancellationToken.ThrowIfCancellationRequested ();
  222. var status = target.OfferMessage (
  223. new DataflowMessageHeader (1), item, null, false);
  224. if (status == DataflowMessageStatus.Accepted)
  225. return Task.FromResult (true);
  226. if (status != DataflowMessageStatus.Declined
  227. && status != DataflowMessageStatus.Postponed)
  228. return Task.FromResult (false);
  229. var block = new SendBlock<TInput> (target, item, cancellationToken);
  230. return block.Send ();
  231. }
  232. }
  233. }