|
- // DataflowBlock.cs
- //
- // Copyright (c) 2011 Jérémie "garuma" Laval
- // Copyright (c) 2012 Petr Onderka
- //
- // Permission is hereby granted, free of charge, to any person obtaining a copy
- // of this software and associated documentation files (the "Software"), to deal
- // in the Software without restriction, including without limitation the rights
- // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
- // copies of the Software, and to permit persons to whom the Software is
- // furnished to do so, subject to the following conditions:
- //
- // The above copyright notice and this permission notice shall be included in
- // all copies or substantial portions of the Software.
- //
- // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
- // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
- // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
- // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
- // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
- // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
- // THE SOFTWARE.
- namespace System.Threading.Tasks.Dataflow {
- public static class DataflowBlock {
- public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
- {
- if (source == null)
- throw new ArgumentNullException ("source");
- return new ObservableDataflowBlock<TOutput> (source);
- }
- public static IObserver<TInput> AsObserver<TInput> (this ITargetBlock<TInput> target)
- {
- if (target == null)
- throw new ArgumentNullException ("target");
- return new ObserverDataflowBlock<TInput> (target);
- }
- public static Task<int> Choose<T1, T2> (
- ISourceBlock<T1> source1, Action<T1> action1,
- ISourceBlock<T2> source2, Action<T2> action2)
- {
- return Choose (source1, action1, source2, action2,
- DataflowBlockOptions.Default);
- }
- public static Task<int> Choose<T1, T2> (
- ISourceBlock<T1> source1, Action<T1> action1,
- ISourceBlock<T2> source2, Action<T2> action2,
- DataflowBlockOptions dataflowBlockOptions)
- {
- if (source1 == null)
- throw new ArgumentNullException ("source1");
- if (source2 == null)
- throw new ArgumentNullException ("source2");
- if (action1 == null)
- throw new ArgumentNullException ("action1");
- if (action2 == null)
- throw new ArgumentNullException ("action2");
- if (dataflowBlockOptions == null)
- throw new ArgumentNullException ("dataflowBlockOptions");
- var chooser = new ChooserBlock<T1, T2, object> (action1, action2, null, dataflowBlockOptions);
- source1.LinkTo (chooser.Target1);
- source2.LinkTo (chooser.Target2);
- return chooser.Completion;
- }
- public static Task<int> Choose<T1, T2, T3> (
- ISourceBlock<T1> source1, Action<T1> action1,
- ISourceBlock<T2> source2, Action<T2> action2,
- ISourceBlock<T3> source3, Action<T3> action3)
- {
- return Choose (source1, action1, source2, action2, source3, action3,
- DataflowBlockOptions.Default);
- }
- public static Task<int> Choose<T1, T2, T3> (
- ISourceBlock<T1> source1, Action<T1> action1,
- ISourceBlock<T2> source2, Action<T2> action2,
- ISourceBlock<T3> source3, Action<T3> action3,
- DataflowBlockOptions dataflowBlockOptions)
- {
- if (source1 == null)
- throw new ArgumentNullException ("source1");
- if (source2 == null)
- throw new ArgumentNullException ("source2");
- if (source3 == null)
- throw new ArgumentNullException ("source3");
- if (action1 == null)
- throw new ArgumentNullException ("action1");
- if (action2 == null)
- throw new ArgumentNullException ("action2");
- if (action3 == null)
- throw new ArgumentNullException ("action3");
- if (dataflowBlockOptions == null)
- throw new ArgumentNullException ("dataflowBlockOptions");
- var chooser = new ChooserBlock<T1, T2, T3> (action1, action2, action3, dataflowBlockOptions);
- source1.LinkTo (chooser.Target1);
- source2.LinkTo (chooser.Target2);
- source3.LinkTo (chooser.Target3);
- return chooser.Completion;
- }
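- public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput> (
- ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
- {
- if (target == null)
- throw new ArgumentNullException ("target");
- if (source == null)
- throw new ArgumentNullException ("source");
- return new PropagatorWrapperBlock<TInput, TOutput> (target, source);
- }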
- public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
- {
- if (source == null)
- throw new ArgumentNullException ("source");
- return source.LinkTo (target, DataflowLinkOptions.Default);
- }
- public static IDisposable LinkTo<TOutput> (
- this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
- Predicate<TOutput> predicate)
- {
- if (source == null)
- throw new ArgumentNullException ("source");
- return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
- }
- [MonoTODO("Use predicate")]
- public static IDisposable LinkTo<TOutput> (
- this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
- DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
- {
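- if (source == null)
- throw new ArgumentNullException ("source");
- if (predicate == null)
- throw new ArgumentNullException ("predicate");
- // TODO: the predicate is validated but not applied yet, so every message is forwarded to the target.
- return source.LinkTo (target, linkOptions);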
- }
- [MonoTODO]
- public static Task<bool> OutputAvailableAsync<TOutput> (this ISourceBlock<TOutput> source)
- {
- throw new NotImplementedException ();
- }
- public static bool Post<TInput> (this ITargetBlock<TInput> target, TInput item)
- {
- if (target == null)
- throw new ArgumentNullException ("target");
- return target.OfferMessage (new DataflowMessageHeader (1), item, null, false)
- == DataflowMessageStatus.Accepted;
- }
- public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
- {
- return Receive (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
- }
- public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
- {
- return Receive (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
- }
- public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
- {
- return Receive (source, timeout, CancellationToken.None);
- }
- public static TOutput Receive<TOutput> (
- this ISourceBlock<TOutput> source, TimeSpan timeout,
- CancellationToken cancellationToken)
- {
- if (source == null)
- throw new ArgumentNullException ("source");
- if (timeout.TotalMilliseconds < -1)
- throw new ArgumentOutOfRangeException ("timeout");
- if (timeout.TotalMilliseconds > int.MaxValue)
- throw new ArgumentOutOfRangeException ("timeout");
- cancellationToken.ThrowIfCancellationRequested ();
- TOutput item;
- var receivableSource = source as IReceivableSourceBlock<TOutput>;
- if (receivableSource != null && receivableSource.TryReceive (null, out item))
- return item;
- // Task.IsCompleted is also true for canceled and faulted tasks.
- if (source.Completion.IsCompleted)
- throw new InvalidOperationException (
- "No item could be received from the source.");
- int timeoutMilliseconds = (int)timeout.TotalMilliseconds;
- var block = new ReceiveBlock<TOutput> ();
- var bridge = source.LinkTo (block,
- new DataflowLinkOptions { PropagateCompletion = true });
- return block.WaitAndGet (bridge, cancellationToken, timeoutMilliseconds);
- }
- public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source)
- {
- return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
- }
- public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
- {
- return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
- }
- public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
- {
- return ReceiveAsync (source, timeout, CancellationToken.None);
- }
- public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
- {
- if (source == null)
- throw new ArgumentNullException ("source");
- if (timeout.TotalMilliseconds < -1)
- throw new ArgumentOutOfRangeException ("timeout");
- if (timeout.TotalMilliseconds > int.MaxValue)
- throw new ArgumentOutOfRangeException ("timeout");
- cancellationToken.ThrowIfCancellationRequested ();
- long tm = (long)timeout.TotalMilliseconds;
- ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
- var bridge = source.LinkTo (block);
- return block.AsyncGet (bridge, cancellationToken, tm);
- }
- public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
- {
- item = default (TOutput);
- if (source == null)
- throw new ArgumentNullException ("source");
- return source.TryReceive (null, out item);
- }
- public static Task<bool> SendAsync<TInput> (
- this ITargetBlock<TInput> target, TInput item)
- {
- return SendAsync (target, item, CancellationToken.None);
- }
- public static Task<bool> SendAsync<TInput> (
- this ITargetBlock<TInput> target, TInput item,
- CancellationToken cancellationToken)
- {
- if (target == null)
- throw new ArgumentNullException ("target");
- cancellationToken.ThrowIfCancellationRequested ();
- var status = target.OfferMessage (
- new DataflowMessageHeader (1), item, null, false);
- if (status == DataflowMessageStatus.Accepted)
- return Task.FromResult (true);
- if (status != DataflowMessageStatus.Declined
- && status != DataflowMessageStatus.Postponed)
- return Task.FromResult (false);
- var block = new SendBlock<TInput> (target, item, cancellationToken);
- return block.Send ();
- }
- }
- }
|
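
The extension methods above are easiest to read next to a caller. What follows is a minimal usage sketch, not part of the original file. It assumes the standard TPL Dataflow `BufferBlock<T>` type is available in the same assembly; the class `DataflowUsageSketch` and its method names are hypothetical and exist only for illustration.

```csharp
using System;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

// Hypothetical helper class, used only to illustrate the extension methods.
public static class DataflowUsageSketch
{
    // Posts one item to an unbounded buffer and reads it back synchronously.
    public static int PostThenReceive(int value)
    {
        var buffer = new BufferBlock<int>();

        // Post returns true only when the target accepts the message immediately.
        if (!buffer.Post(value))
            throw new InvalidOperationException("The block declined the item.");

        // Receive tries TryReceive first, so a buffered item is returned without blocking.
        return buffer.Receive();
    }

    // Same round trip through the asynchronous entry points.
    public static async Task<int> SendThenReceiveAsync(int value)
    {
        var buffer = new BufferBlock<int>();
        await buffer.SendAsync(value);
        return await buffer.ReceiveAsync(TimeSpan.FromSeconds(1));
    }

    // TryReceive on an empty block reports false instead of waiting.
    public static bool TryReceiveFromEmpty()
    {
        var buffer = new BufferBlock<int>();
        int item;
        return buffer.TryReceive(out item);
    }
}
```

`Post` suits callers that can drop or retry a rejected item themselves, while `SendAsync` is the better fit when the target may postpone the message, for example a block with a bounded capacity.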