Browse Source

Implemented execution block ctors for async actions

Petr Onderka 13 years ago
parent
commit
c5a7eec7c0
14 changed files with 651 additions and 155 deletions
  1. 2 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj
  2. 2 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
  3. 35 9
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs
  4. 98 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/AsyncExecutingMessageBox.cs
  5. 17 16
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs
  6. 12 81
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
  7. 125 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBoxBase.cs
  8. 5 5
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
  9. 54 4
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
  10. 69 15
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
  11. 45 6
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs
  12. 97 4
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs
  13. 49 11
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformBlockTest.cs
  14. 41 4
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformManyBlockTest.cs

+ 2 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj

@@ -46,7 +46,9 @@
    <Compile Include="..\..\build\common\Locale.cs" />
    <Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
    <Compile Include="Assembly\AssemblyInfo.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\AsyncExecutingMessageBox.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BroadcastOutgoingQueue.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\ExecutingMessageBoxBase.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\NullTargetBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueueBase.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutputAvailableBlock.cs" />

+ 2 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources

@@ -45,3 +45,5 @@ System.Threading.Tasks.Dataflow/SendBlock.cs
 System.Threading.Tasks.Dataflow/PredicateBlock.cs
 System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs
 System.Threading.Tasks.Dataflow/NullTargetBlock.cs
+System.Threading.Tasks.Dataflow/AsyncExecutingMessageBox.cs
+System.Threading.Tasks.Dataflow/ExecutingMessageBoxBase.cs

+ 35 - 9
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs

@@ -27,25 +27,33 @@ namespace System.Threading.Tasks.Dataflow {
 	public sealed class ActionBlock<TInput> : ITargetBlock<TInput> {
 		readonly CompletionHelper compHelper;
 		readonly BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
-		readonly ExecutingMessageBox<TInput> messageBox;
+		readonly ExecutingMessageBoxBase<TInput> messageBox;
 		readonly Action<TInput> action;
+		readonly Func<TInput, Task> asyncAction;
 		readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
 
+		ActionBlock (ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.compHelper = new CompletionHelper (dataflowBlockOptions);
+		}
+
 		public ActionBlock (Action<TInput> action)
 			: this (action, ExecutionDataflowBlockOptions.Default)
 		{
 		}
 
-		public ActionBlock (Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		public ActionBlock (Action<TInput> action,
+		                    ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
 		{
 			if (action == null)
 				throw new ArgumentNullException ("action");
-			if (dataflowBlockOptions == null)
-				throw new ArgumentNullException ("dataflowBlockOptions");
 
 			this.action = action;
-			this.dataflowBlockOptions = dataflowBlockOptions;
-			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 			this.messageBox = new ExecutingMessageBox<TInput> (this, messageQueue, compHelper,
 				() => true, ProcessItem, () => { }, dataflowBlockOptions);
 		}
@@ -55,10 +63,17 @@ namespace System.Threading.Tasks.Dataflow {
 		{
 		}
 
-		[MonoTODO]
-		public ActionBlock (Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		public ActionBlock (Func<TInput, Task> action,
+		                    ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
 		{
-			throw new NotImplementedException ();
+			if (action == null)
+				throw new ArgumentNullException ("action");
+
+			this.asyncAction = action;
+			this.messageBox = new AsyncExecutingMessageBox<TInput, Task> (
+				this, messageQueue, compHelper, () => true, AsyncProcessItem, null,
+				() => { }, dataflowBlockOptions);
 		}
 
 		DataflowMessageStatus ITargetBlock<TInput>.OfferMessage (
@@ -85,6 +100,17 @@ namespace System.Threading.Tasks.Dataflow {
 			return dequeued;
 		}
 
+		bool AsyncProcessItem(out Task task)
+		{
+			TInput data;
+			bool dequeued = messageQueue.TryTake (out data);
+			if (dequeued)
+				task = asyncAction (data);
+			else
+				task = null;
+			return dequeued;
+		}
+
 		public void Complete ()
 		{
 			messageBox.Complete ();

+ 98 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/AsyncExecutingMessageBox.cs

@@ -0,0 +1,98 @@
+// AsyncExecutingMessageBox.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow {
+	class AsyncExecutingMessageBox<TInput, TTask>
+		: ExecutingMessageBoxBase<TInput>
+		where TTask : Task {
+		public delegate bool AsyncProcessItem (out TTask task);
+
+		readonly AsyncProcessItem processItem;
+		readonly Action<TTask> processFinishedTask;
+
+		public AsyncExecutingMessageBox (
+			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+			CompletionHelper compHelper, Func<bool> externalCompleteTester,
+			AsyncProcessItem processItem, Action<TTask> processFinishedTask,
+			Action outgoingQueueComplete, ExecutionDataflowBlockOptions options)
+			: base (
+				target, messageQueue, compHelper, externalCompleteTester,
+				outgoingQueueComplete, options)
+		{
+			this.processItem = processItem;
+			this.processFinishedTask = processFinishedTask;
+		}
+
+		protected override void ProcessQueue ()
+		{
+			StartProcessQueue ();
+
+			ProcessQueueWithoutStart ();
+		}
+
+		void ProcessQueueWithoutStart ()
+		{
+			try {
+				int i = 0;
+				while (CanRun (i)) {
+					TTask task;
+					if (!processItem (out task))
+						break;
+					if (task == null || task.IsCanceled
+					    || (task.IsCompleted && !task.IsFaulted)) {
+						if (processFinishedTask != null)
+							processFinishedTask (task);
+					} else if (task.IsFaulted) {
+						CompHelper.RequestFault (task.Exception);
+						break;
+					} else {
+						task.ContinueWith (
+							t => TaskFinished ((TTask)t), Options.TaskScheduler);
+						return;
+					}
+					i++;
+				}
+			} catch (Exception e) {
+				CompHelper.RequestFault (e);
+			}
+
+			FinishProcessQueue ();
+		}
+
+		void TaskFinished (TTask task)
+		{
+			if (task.IsFaulted) {
+				CompHelper.RequestFault (task.Exception);
+				FinishProcessQueue ();
+				return;
+			}
+
+			if (processFinishedTask != null)
+				processFinishedTask (task);
+
+			ProcessQueueWithoutStart ();
+		}
+	}
+}

+ 17 - 16
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs

@@ -1,4 +1,4 @@
-// ActionBlock.cs
+// CompletionHelper.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
 // Copyright (c) 2012 Petr Onderka
@@ -20,33 +20,34 @@
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
-namespace System.Threading.Tasks.Dataflow
-{
+namespace System.Threading.Tasks.Dataflow {
 	/// <summary>
 	/// This is used to implement a default behavior for Dataflow completion tracking
 	/// that is the Completion property, Complete/Fault method combo
 	/// and the CancellationToken option.
 	/// </summary>
-	internal class CompletionHelper
-	{
-		TaskCompletionSource<object> source;
+	class CompletionHelper {
+		readonly TaskCompletionSource<object> source;
 
-		private readonly AtomicBoolean canFaultOrCancelImmediatelly =
+		readonly AtomicBoolean canFaultOrCancelImmediatelly =
 			new AtomicBoolean { Value = true };
-		private readonly AtomicBoolean requestedFaultOrCancel =
+		readonly AtomicBoolean requestedFaultOrCancel =
 			new AtomicBoolean { Value = false };
 
 		Exception requestedException;
 
-		public static CompletionHelper GetNew (DataflowBlockOptions options)
+		public CompletionHelper (DataflowBlockOptions options)
 		{
-			var completionHelper = new CompletionHelper { source = new TaskCompletionSource<object> () };
+			source = new TaskCompletionSource<object> ();
 			if (options != null)
-				completionHelper.SetOptions (options);
-			return completionHelper;
+				SetOptions (options);
+		}
+
+		[Obsolete ("Use ctor")]
+		public static CompletionHelper GetNew (DataflowBlockOptions options)
+		{
+			return new CompletionHelper (options);
 		}
 
 		public Task Completion {
@@ -83,7 +84,7 @@ namespace System.Threading.Tasks.Dataflow
 		public void RequestFault (Exception exception)
 		{
 			if (exception == null)
-				throw new ArgumentNullException("exception");
+				throw new ArgumentNullException ("exception");
 
 			if (CanFaultOrCancelImmediatelly)
 				Fault (exception);
@@ -101,7 +102,7 @@ namespace System.Threading.Tasks.Dataflow
 		void RequestCancel ()
 		{
 			if (CanFaultOrCancelImmediatelly)
-				Cancel();
+				Cancel ();
 			else
 				requestedFaultOrCancel.Value = true;
 		}

+ 12 - 81
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs

@@ -1,6 +1,7 @@
 // ExecutingMessageBox.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -22,108 +23,38 @@
 
 using System.Collections.Concurrent;
 
-namespace System.Threading.Tasks.Dataflow
-{
-	internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
-	{
-		readonly ExecutionDataflowBlockOptions options;
+namespace System.Threading.Tasks.Dataflow {
+	class ExecutingMessageBox<TInput> : ExecutingMessageBoxBase<TInput> {
 		readonly Func<bool> processItem;
-		readonly Action outgoingQueueComplete;
-		readonly CompletionHelper compHelper;
-
-		// even number: Task is waiting to run
-		// odd number: Task is not waiting to run
-		// invariant: dop / 2 Tasks are running or waiting
-		int degreeOfParallelism = 1;
 
 		public ExecutingMessageBox (
 			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
 			CompletionHelper compHelper, Func<bool> externalCompleteTester,
 			Func<bool> processItem, Action outgoingQueueComplete,
 			ExecutionDataflowBlockOptions options)
-			: base (target, messageQueue, compHelper, externalCompleteTester, options)
+			: base (
+				target, messageQueue, compHelper, externalCompleteTester,
+				outgoingQueueComplete, options)
 		{
-			this.options = options;
 			this.processItem = processItem;
-			this.outgoingQueueComplete = outgoingQueueComplete;
-			this.compHelper = compHelper;
 		}
 
-		protected override void EnsureProcessing (bool newItem)
+		protected override void ProcessQueue ()
 		{
-			StartProcessing ();
-		}
-
-		void StartProcessing ()
-		{
-			// atomically increase degreeOfParallelism by 1 only if it's odd
-			// and low enough
-			int startDegreeOfParallelism;
-			int currentDegreeOfParallelism = degreeOfParallelism;
-			do {
-				startDegreeOfParallelism = currentDegreeOfParallelism;
-				if (startDegreeOfParallelism % 2 == 0
-				    || (options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
-				        && startDegreeOfParallelism / 2 >= options.MaxDegreeOfParallelism))
-					return;
-				currentDegreeOfParallelism =
-					Interlocked.CompareExchange (ref degreeOfParallelism,
-						startDegreeOfParallelism + 1, startDegreeOfParallelism);
-			} while (startDegreeOfParallelism != currentDegreeOfParallelism);
-
-			Task.Factory.StartNew (ProcessQueue, options.CancellationToken,
-				TaskCreationOptions.PreferFairness, options.TaskScheduler);
-		}
-
-		void ProcessQueue ()
-		{
-			compHelper.CanFaultOrCancelImmediatelly = false;
-
-			int incrementedDegreeOfParallelism =
-				Interlocked.Increment (ref degreeOfParallelism);
-			if ((options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
-			     || incrementedDegreeOfParallelism / 2 < options.MaxDegreeOfParallelism)
-			    && MessageQueue.Count > 0 && compHelper.CanRun)
-				StartProcessing ();
+			StartProcessQueue ();
 
 			try {
 				int i = 0;
-				while (compHelper.CanRun
-				       && (options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
-				           || i++ < options.MaxMessagesPerTask)) {
+				while (CanRun (i)) {
 					if (!processItem ())
 						break;
+					i++;
 				}
 			} catch (Exception e) {
-				compHelper.RequestFault (e);
+				CompHelper.RequestFault (e);
 			}
 
-			int decrementedDegreeOfParallelism =
-				Interlocked.Add (ref degreeOfParallelism, -2);
-
-			if (decrementedDegreeOfParallelism % 2 == 1) {
-				if (decrementedDegreeOfParallelism == 1) {
-					compHelper.CanFaultOrCancelImmediatelly = true;
-					base.VerifyCompleteness ();
-					if (MessageQueue.IsCompleted)
-						outgoingQueueComplete ();
-				}
-				if (MessageQueue.Count > 0 && compHelper.CanRun)
-					StartProcessing ();
-			}
-		}
-
-		protected override void OutgoingQueueComplete ()
-		{
-			if (MessageQueue.IsCompleted
-			    && Thread.VolatileRead (ref degreeOfParallelism) == 1)
-				outgoingQueueComplete ();
-		}
-
-		protected override void VerifyCompleteness ()
-		{
-			if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
-				base.VerifyCompleteness ();
+			FinishProcessQueue ();
 		}
 	}
 }

+ 125 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBoxBase.cs

@@ -0,0 +1,125 @@
+// ExecutingMessageBoxBase.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow {
+	abstract class ExecutingMessageBoxBase<TInput> : MessageBox<TInput> {
+		protected ExecutionDataflowBlockOptions Options { get; private set; }
+		readonly Action outgoingQueueComplete;
+
+		// even number: Task is waiting to run
+		// odd number: Task is not waiting to run
+		// invariant: dop / 2 Tasks are running or waiting
+		int degreeOfParallelism = 1;
+
+		protected ExecutingMessageBoxBase (
+			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+			CompletionHelper compHelper, Func<bool> externalCompleteTester,
+			Action outgoingQueueComplete, ExecutionDataflowBlockOptions options)
+			: base (
+				target, messageQueue, compHelper, externalCompleteTester,
+				options)
+		{
+			this.Options = options;
+			this.outgoingQueueComplete = outgoingQueueComplete;
+		}
+
+		protected override void EnsureProcessing (bool newItem)
+		{
+			StartProcessing ();
+		}
+
+		void StartProcessing ()
+		{
+			// atomically increase degreeOfParallelism by 1 only if it's odd
+			// and low enough
+			int startDegreeOfParallelism;
+			int currentDegreeOfParallelism = degreeOfParallelism;
+			do {
+				startDegreeOfParallelism = currentDegreeOfParallelism;
+				if (startDegreeOfParallelism % 2 == 0
+				    || (Options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
+				        && startDegreeOfParallelism / 2 >= Options.MaxDegreeOfParallelism))
+					return;
+				currentDegreeOfParallelism =
+					Interlocked.CompareExchange (ref degreeOfParallelism,
+						startDegreeOfParallelism + 1, startDegreeOfParallelism);
+			} while (startDegreeOfParallelism != currentDegreeOfParallelism);
+
+			Task.Factory.StartNew (ProcessQueue, Options.CancellationToken,
+				TaskCreationOptions.PreferFairness, Options.TaskScheduler);
+		}
+
+		protected abstract void ProcessQueue ();
+
+		protected void StartProcessQueue ()
+		{
+			CompHelper.CanFaultOrCancelImmediatelly = false;
+
+			int incrementedDegreeOfParallelism =
+				Interlocked.Increment (ref degreeOfParallelism);
+			if ((Options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
+			     || incrementedDegreeOfParallelism / 2 < Options.MaxDegreeOfParallelism)
+			    && MessageQueue.Count > 0 && CompHelper.CanRun)
+				StartProcessing ();
+		}
+
+		protected void FinishProcessQueue ()
+		{
+			int decrementedDegreeOfParallelism =
+				Interlocked.Add (ref degreeOfParallelism, -2);
+
+			if (decrementedDegreeOfParallelism % 2 == 1) {
+				if (decrementedDegreeOfParallelism == 1) {
+					CompHelper.CanFaultOrCancelImmediatelly = true;
+					base.VerifyCompleteness ();
+					if (MessageQueue.IsCompleted)
+						outgoingQueueComplete ();
+				}
+				if (MessageQueue.Count > 0 && CompHelper.CanRun)
+					StartProcessing ();
+			}
+		}
+
+		protected override void OutgoingQueueComplete ()
+		{
+			if (MessageQueue.IsCompleted
+			    && Thread.VolatileRead (ref degreeOfParallelism) == 1)
+				outgoingQueueComplete ();
+		}
+
+		protected override void VerifyCompleteness ()
+		{
+			if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
+				base.VerifyCompleteness ();
+		}
+
+		protected bool CanRun (int iteration)
+		{
+			return CompHelper.CanRun
+			       && (Options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
+			           || iteration < Options.MaxMessagesPerTask);
+		}
+	}
+}

+ 5 - 5
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

@@ -30,8 +30,8 @@ namespace System.Threading.Tasks.Dataflow {
 	/// later processed 
 	/// </summary>
 	internal class MessageBox<TInput> {
-		protected ITargetBlock<TInput> Target;
-		readonly CompletionHelper compHelper;
+		protected ITargetBlock<TInput> Target { get; set; }
+		protected CompletionHelper CompHelper { get; private set; }
 		readonly Func<bool> externalCompleteTester;
 		readonly DataflowBlockOptions options;
 		readonly bool greedy;
@@ -51,7 +51,7 @@ namespace System.Threading.Tasks.Dataflow {
 			DataflowBlockOptions options, bool greedy = true, Func<bool> canAccept = null)
 		{
 			this.Target = target;
-			this.compHelper = compHelper;
+			this.CompHelper = compHelper;
 			this.MessageQueue = messageQueue;
 			this.externalCompleteTester = externalCompleteTester;
 			this.options = options;
@@ -71,7 +71,7 @@ namespace System.Threading.Tasks.Dataflow {
 					"consumeToAccept may only be true if provided with a non-null source.",
 					"consumeToAccept");
 
-			if (MessageQueue.IsAddingCompleted || !compHelper.CanRun)
+			if (MessageQueue.IsAddingCompleted || !CompHelper.CanRun)
 				return DataflowMessageStatus.DecliningPermanently;
 
 			var full = options.BoundedCapacity != -1
@@ -250,7 +250,7 @@ namespace System.Threading.Tasks.Dataflow {
 		protected  virtual void VerifyCompleteness ()
 		{
 			if (MessageQueue.IsCompleted && externalCompleteTester ())
-				compHelper.Complete ();
+				CompHelper.Complete ();
 		}
 	}
 }

+ 54 - 4
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs

@@ -33,6 +33,16 @@ namespace System.Threading.Tasks.Dataflow {
 		readonly MessageBox<TInput> messageBox;
 		readonly OutgoingQueue<TOutput> outgoing;
 		readonly Func<TInput, TOutput> transform;
+		readonly Func<TInput, Task<TOutput>> asyncTransform;
+
+		TransformBlock (ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+		
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.compHelper = new CompletionHelper (dataflowBlockOptions);
+		}
 
 		public TransformBlock (Func<TInput, TOutput> transform)
 			: this (transform, ExecutionDataflowBlockOptions.Default)
@@ -41,15 +51,12 @@ namespace System.Threading.Tasks.Dataflow {
 
 		public TransformBlock (Func<TInput, TOutput> transform,
 		                       ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
 		{
 			if (transform == null)
 				throw new ArgumentNullException("transform");
-			if (dataflowBlockOptions == null)
-				throw new ArgumentNullException ("dataflowBlockOptions");
 
 			this.transform = transform;
-			this.dataflowBlockOptions = dataflowBlockOptions;
-			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 			this.messageBox = new ExecutingMessageBox<TInput> (
 				this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
@@ -59,6 +66,28 @@ namespace System.Threading.Tasks.Dataflow {
 				dataflowBlockOptions);
 		}
 
+		public TransformBlock(Func<TInput, Task<TOutput>> transform)
+			: this(transform, ExecutionDataflowBlockOptions.Default)
+		{
+		}
+
+		public TransformBlock (Func<TInput, Task<TOutput>> transform,
+		                       ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
+		{
+			if (transform == null)
+				throw new ArgumentNullException("transform");
+
+			this.asyncTransform = transform;
+			this.messageBox = new AsyncExecutingMessageBox<TInput, Task<TOutput>> (
+				this, messageQueue, compHelper, () => outgoing.IsCompleted,
+				AsyncTransformProcess, AsyncProcessFinishedTask, () => outgoing.Complete (),
+				dataflowBlockOptions);
+			this.outgoing = new OutgoingQueue<TOutput> (this, compHelper,
+				() => messageQueue.IsCompleted, messageBox.DecreaseCount,
+				dataflowBlockOptions);
+		}
+
 		DataflowMessageStatus ITargetBlock<TInput>.OfferMessage (
 			DataflowMessageHeader messageHeader, TInput messageValue,
 			ISourceBlock<TInput> source, bool consumeToAccept)
@@ -111,6 +140,27 @@ namespace System.Threading.Tasks.Dataflow {
 			return dequeued;
 		}
 
+		bool AsyncTransformProcess (out Task<TOutput> task)
+		{
+			TInput input;
+
+			var dequeued = messageQueue.TryTake (out input);
+			if (dequeued)
+				task = asyncTransform (input);
+			else
+				task = null;
+
+			return dequeued;
+		}
+
+		void AsyncProcessFinishedTask (Task<TOutput> task)
+		{
+			if (task == null || task.IsCanceled)
+				messageBox.DecreaseCount ();
+			else
+				outgoing.AddData (task.Result);
+		}
+
 		public void Complete ()
 		{
 			messageBox.Complete ();

+ 69 - 15
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs

@@ -32,8 +32,18 @@ namespace System.Threading.Tasks.Dataflow {
 		readonly MessageBox<TInput> messageBox;
 		readonly ExecutionDataflowBlockOptions dataflowBlockOptions;
 		readonly Func<TInput, IEnumerable<TOutput>> transform;
+		readonly Func<TInput, Task<IEnumerable<TOutput>>> asyncTransform;
 		readonly OutgoingQueue<TOutput> outgoing;
 
+		TransformManyBlock (ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+			
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.compHelper = new CompletionHelper (dataflowBlockOptions);
+		}
+
 		public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transform)
 			: this (transform, ExecutionDataflowBlockOptions.Default)
 		{
@@ -41,15 +51,12 @@ namespace System.Threading.Tasks.Dataflow {
 
 		public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transform,
 		                           ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
 		{
 			if (transform == null)
 				throw new ArgumentNullException ("transform");
-			if (dataflowBlockOptions == null)
-				throw new ArgumentNullException ("dataflowBlockOptions");
 
 			this.transform = transform;
-			this.dataflowBlockOptions = dataflowBlockOptions;
-			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 			this.messageBox = new ExecutingMessageBox<TInput> (this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
 				dataflowBlockOptions);
@@ -58,6 +65,27 @@ namespace System.Threading.Tasks.Dataflow {
 				dataflowBlockOptions);
 		}
 
+		public TransformManyBlock (Func<TInput, Task<IEnumerable<TOutput>>> transform)
+			: this (transform, ExecutionDataflowBlockOptions.Default)
+		{
+		}
+
+		public TransformManyBlock (Func<TInput, Task<IEnumerable<TOutput>>> transform,
+		                           ExecutionDataflowBlockOptions dataflowBlockOptions)
+			: this (dataflowBlockOptions)
+		{
+			if (transform == null)
+				throw new ArgumentNullException ("transform");
+
+			this.asyncTransform = transform;
+			this.messageBox = new AsyncExecutingMessageBox<TInput, Task<IEnumerable<TOutput>>> (this, messageQueue, compHelper,
+				() => outgoing.IsCompleted, AsyncTransformProcess, ProcessFinishedTask, () => outgoing.Complete (),
+				dataflowBlockOptions);
+			this.outgoing = new OutgoingQueue<TOutput> (this, compHelper,
+				() => messageQueue.IsCompleted, messageBox.DecreaseCount,
+				dataflowBlockOptions);			
+		}
+
 		DataflowMessageStatus ITargetBlock<TInput>.OfferMessage (
 			DataflowMessageHeader messageHeader, TInput messageValue,
 			ISourceBlock<TInput> source, bool consumeToAccept)
@@ -107,23 +135,49 @@ namespace System.Threading.Tasks.Dataflow {
 			if (dequeued) {
 				var result = transform (input);
 
-				bool first = true;
-				if (result != null) {
-					foreach (var item in result) {
-						if (first)
-							first = false;
-						else
-							messageBox.IncreaseCount ();
-						outgoing.AddData (item);
-					}
+				EnqueueTransformed (result);
+			}
+
+			return dequeued;
+		}
+
+		void EnqueueTransformed (IEnumerable<TOutput> transformed)
+		{
+			bool first = true;
+			if (transformed != null) {
+				foreach (var item in transformed) {
+					if (first)
+						first = false;
+					else
+						messageBox.IncreaseCount ();
+					outgoing.AddData (item);
 				}
-				if (first)
-					messageBox.DecreaseCount ();
 			}
+			if (first)
+				messageBox.DecreaseCount ();
+		}
+
+		bool AsyncTransformProcess (out Task<IEnumerable<TOutput>> task)
+		{
+			TInput input;
+
+			var dequeued = messageQueue.TryTake (out input);
+			if (dequeued)
+				task = asyncTransform (input);
+			else
+				task = null;
 
 			return dequeued;
 		}
 
+		void ProcessFinishedTask (Task<IEnumerable<TOutput>> task)
+		{
+			if (task == null || task.IsCanceled)
+				messageBox.DecreaseCount ();
+			else
+				EnqueueTransformed (task.Result);
+		}
+
 		public void Complete ()
 		{
 			messageBox.Complete ();

+ 45 - 6
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs

@@ -3,8 +3,10 @@
 //  
 // Author:
 //       Jérémie "garuma" Laval <[email protected]>
+//       Petr Onderka <[email protected]>
 // 
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 // 
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -26,15 +28,14 @@
 
 using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 
 using NUnit.Framework;
 
-namespace MonoTests.System.Threading.Tasks.Dataflow
-{
+namespace MonoTests.System.Threading.Tasks.Dataflow {
 	[TestFixture]
-	public class ActionBlockTest
-	{
+	public class ActionBlockTest {
 		[Test]
 		public void BasicUsageTest ()
 		{
@@ -53,7 +54,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 		[Test]
 		public void CompleteTest ()
 		{
-			ActionBlock<int> block = new ActionBlock<int> ((i) => Thread.Sleep (100));
+			var block = new ActionBlock<int> (i => Thread.Sleep (100));
 
 			for (int i = 0; i < 10; i++)
 				Assert.IsTrue (block.Post (i), "Not Accepted");
@@ -64,5 +65,43 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 			block.Completion.Wait ();
 			Assert.IsTrue (block.Completion.IsCompleted);
 		}
+
+		[Test]
+		public void AsyncNullTest()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new ActionBlock<int> (
+				i => null,
+				new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
+
+			block.Complete ();
+
+			Assert.IsTrue (block.Completion.Wait (100));
+		}
+
+		[Test]
+		public void AsyncCancelledTest()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new ActionBlock<int> (
+				i =>
+				{
+					var tcs = new TaskCompletionSource<int> ();
+					tcs.SetCanceled ();
+					return tcs.Task;
+				}, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
+		}
 	}
-}
+}

+ 97 - 4
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs

@@ -22,7 +22,9 @@
 
 using System;
 using System.Collections.Generic;
+using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 using NUnit.Framework;
 
@@ -40,14 +42,35 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 			yield return new TransformManyBlock<int, int> (i =>
 			{
 				action ();
-				return new int[0];
+				return Enumerable.Empty<int> ();
 			});
 		}
-			
+
+		static IEnumerable<ITargetBlock<int>> GetExecutionBlocksWithAsyncAction (
+			Func<int, Task> action, ExecutionDataflowBlockOptions options)
+		{
+			yield return new ActionBlock<int> (action, options);
+			yield return new TransformBlock<int, int> (
+				i => action (i).ContinueWith (
+					t =>
+					{
+						t.Wait ();
+						return i;
+					}), options);
+			yield return new TransformManyBlock<int, int> (
+				i => action (i).ContinueWith (
+					t =>
+					{
+						t.Wait ();
+						return Enumerable.Empty<int> ();
+					}), options);
+		}
+
 		[Test]
 		public void ExceptionTest ()
 		{
-			var blocks = GetExecutionBlocksWithAction (() => { throw new Exception (); });
+			var exception = new Exception ();
+			var blocks = GetExecutionBlocksWithAction (() => { throw exception; });
 			foreach (var block in blocks) {
 				Assert.IsFalse (block.Completion.Wait (100));
 
@@ -56,7 +79,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 				var ae =
 					AssertEx.Throws<AggregateException> (() => block.Completion.Wait (100));
 				Assert.AreEqual (1, ae.InnerExceptions.Count);
-				Assert.AreEqual (typeof(Exception), ae.InnerException.GetType ());
+				Assert.AreSame (exception, ae.InnerException);
 			}
 		}
 
@@ -102,5 +125,75 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 				Assert.AreEqual (0, Thread.VolatileRead (ref ranAfterFault));
 			}
 		}
+
+		[Test]
+		public void AsyncTest ()
+		{
+			var tcs = new TaskCompletionSource<int> ();
+			int result = 0;
+
+			var scheduler = new TestScheduler ();
+
+			var blocks = GetExecutionBlocksWithAsyncAction (
+				i =>
+				tcs.Task.ContinueWith (t => Thread.VolatileWrite (ref result, i + t.Result)),
+				new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			foreach (var block in blocks) {
+				Assert.IsTrue (block.Post (1));
+
+				scheduler.ExecuteAll ();
+				Thread.Sleep (100);
+				Thread.MemoryBarrier ();
+
+				Assert.AreEqual (0, result);
+
+				tcs.SetResult (10);
+
+				Thread.Sleep (100);
+
+				// the continuation should be executed on the configured TaskScheduler
+				Assert.AreEqual (0, result);
+
+				scheduler.ExecuteAll ();
+
+				Assert.AreEqual (11, result);
+
+				tcs = new TaskCompletionSource<int> ();
+				Thread.VolatileWrite (ref result, 0);
+			}
+		}
+
+		[Test]
+		public void AsyncExceptionTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var exception = new Exception ();
+
+			var blocks = GetExecutionBlocksWithAsyncAction (
+				i =>
+				{
+					var tcs = new TaskCompletionSource<int> ();
+					tcs.SetException (exception);
+					return tcs.Task;
+				},
+				new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			foreach (var block in blocks) {
+				Assert.IsTrue (block.Post (1));
+
+				// the task should be executed on the configured TaskScheduler
+				Assert.IsFalse (block.Completion.Wait (100));
+
+				scheduler.ExecuteAll ();
+
+				var ae =
+					AssertEx.Throws<AggregateException> (() => block.Completion.Wait (100)).
+						Flatten ();
+
+				Assert.AreEqual (1, ae.InnerExceptions.Count);
+				Assert.AreSame (exception, ae.InnerException);
+			}
+		}
 	}
 }

+ 49 - 11
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformBlockTest.cs

@@ -3,8 +3,10 @@
 //  
 // Author:
 //       Jérémie "garuma" Laval <[email protected]>
+//       Petr Onderka <[email protected]>
 // 
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 // 
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -26,15 +28,14 @@
 
 using System;
 using System.Threading;
+using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 
 using NUnit.Framework;
 
-namespace MonoTests.System.Threading.Tasks.Dataflow
-{
+namespace MonoTests.System.Threading.Tasks.Dataflow {
 	[TestFixture]
-	public class TransformBlockTest
-	{
+	public class TransformBlockTest {
 		[Test]
 		public void BasicUsageTest ()
 		{
@@ -61,18 +62,55 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 		public void DeferredUsageTest ()
 		{
 			int[] array = new int[10];
-			var evt = new ManualResetEventSlim (false);
-			ActionBlock<int> action = new ActionBlock<int> ((i) => array[Math.Abs (i)] = i);
-			TransformBlock<int, int> block = new TransformBlock<int, int> (i => -i);
+			var action = new ActionBlock<int> (i => array[Math.Abs (i)] = i);
+			var block = new TransformBlock<int, int> (i => -i);
 
 			for (int i = 0; i < array.Length; ++i)
 				Assert.IsTrue (block.Post (i), "Not accepted");
 
-			Thread.Sleep (1300);
+			Thread.Sleep (300);
 			block.LinkTo (action);
-			Thread.Sleep (600);
+			Thread.Sleep (100);
+
+			CollectionAssert.AreEqual (new[] { 0, -1, -2, -3, -4, -5, -6, -7, -8, -9 }, array);
+		}
+
+		[Test]
+		public void AsyncNullTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new TransformBlock<int, int> (
+				i => null,
+				new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
+
+			block.Complete ();
+
+			Assert.IsTrue (block.Completion.Wait (100));
+		}
+
+		[Test]
+		public void AsyncCancelledTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new TransformBlock<int, int> (
+				i =>
+				{
+					var tcs = new TaskCompletionSource<int> ();
+					tcs.SetCanceled ();
+					return tcs.Task;
+				}, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
 
-			CollectionAssert.AreEqual (new int[] { 0, -1, -2, -3, -4, -5, -6, -7, -8, -9 }, array);
+			Assert.IsFalse (block.Completion.Wait (100));
 		}
 	}
-}
+}

+ 41 - 4
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformManyBlockTest.cs

@@ -29,15 +29,14 @@
 using System.Collections.Generic;
 using System.Linq;
 using System.Threading;
+using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 
 using NUnit.Framework;
 
-namespace MonoTests.System.Threading.Tasks.Dataflow
-{
+namespace MonoTests.System.Threading.Tasks.Dataflow {
 	[TestFixture]
-	public class TransformManyBlockTest
-	{
+	public class TransformManyBlockTest {
 		[Test]
 		public void BasicUsageTest ()
 		{
@@ -92,5 +91,43 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 			Assert.IsTrue (transformMany.Completion.Wait (100));
 			Assert.IsFalse (received);
 		}
+
+		[Test]
+		public void AsyncNullTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new TransformManyBlock<int, int> (
+				i => (Task<IEnumerable<int>>)null,
+				new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
+
+			block.Complete ();
+
+			Assert.IsTrue (block.Completion.Wait (100));
+		}
+
+		[Test]
+		public void AsyncCancelledTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new TransformManyBlock<int, int> (
+				i =>
+				{
+					var tcs = new TaskCompletionSource<IEnumerable<int>> ();
+					tcs.SetCanceled ();
+					return tcs.Task;
+				}, new ExecutionDataflowBlockOptions { TaskScheduler = scheduler });
+
+			Assert.IsTrue (block.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
+		}
 	}
 }