Browse Source

Corrected handling of exceptions

Also fixed some race conditions.
Petr Onderka 13 years ago
parent
commit
746c068fd0
22 changed files with 324 additions and 119 deletions
  1. 1 0
      mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs
  2. 3 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
  3. 7 6
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs
  4. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
  5. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs
  6. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs
  7. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs
  8. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs
  9. 58 8
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs
  10. 60 23
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
  11. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
  12. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
  13. 7 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
  14. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
  15. 10 10
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
  16. 17 18
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
  17. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
  18. 3 5
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs
  19. 1 1
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
  20. 22 22
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs
  21. 106 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs
  22. 20 11
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/Assembly/AssemblyInfo.cs

@@ -57,3 +57,4 @@ using System.Runtime.InteropServices;
 
 [assembly: ComVisible (false)]
 
+[assembly: InternalsVisibleTo("System.Threading.Tasks.Dataflow_test_net_4_5")]

+ 3 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj

@@ -1,4 +1,4 @@
-<?xml version="1.0" encoding="utf-8"?>
+<?xml version="1.0" encoding="utf-8"?>
 <Project ToolsVersion="4.0" DefaultTargets="Build" xmlns="http://schemas.microsoft.com/developer/msbuild/2003">
   <PropertyGroup>
     <Configuration Condition=" '$(Configuration)' == '' ">Debug</Configuration>
@@ -42,11 +42,11 @@
   </PropertyGroup>
   <Import Project="$(MSBuildToolsPath)\Microsoft.CSharp.targets" />
   <ItemGroup>
+    <Compile Include="Test\System.Threading.Tasks.Dataflow\ExecutionBlocksTest.cs" />
     <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlockTest.cs" />
     <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlock`3Test.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\DataflowMessageHeaderTest.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\CompletionHelperTest.cs" />
-   <Compile Include="System.Threading.Tasks.Dataflow\CompletionHelper.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\ActionBlockTest.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchBlockTest.cs" />
    <Compile Include="Test\System.Threading.Tasks.Dataflow\BroadcastBlockTest.cs" />
@@ -91,4 +91,4 @@
     <Folder Include="Properties\" />
   </ItemGroup>
   
-</Project>
+</Project>

+ 7 - 6
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs

@@ -56,7 +56,8 @@ namespace System.Threading.Tasks.Dataflow
 			this.action = action;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue, compHelper, () => true, ProcessQueue, dataflowBlockOptions);
+			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue, compHelper,
+				() => true, ProcessItem, () => { }, dataflowBlockOptions);
 		}
 
 		[MonoTODO]
@@ -79,13 +80,13 @@ namespace System.Threading.Tasks.Dataflow
 			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
 		}
 
-		void ProcessQueue (int maxMessages)
+		bool ProcessItem ()
 		{
-			int i = 0;
 			TInput data;
-			while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages)
-			       && messageQueue.TryTake (out data))
+			bool dequeued = messageQueue.TryTake (out data);
+			if (dequeued)
 				action (data);
+			return dequeued;
 		}
 
 		public void Complete ()
@@ -95,7 +96,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs

@@ -166,7 +166,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs

@@ -145,7 +145,7 @@ namespace System.Threading.Tasks.Dataflow {
 
 		void IDataflowBlock.Fault (Exception exception)
 		{
-			completionHelper.Fault (exception);
+			completionHelper.RequestFault (exception);
 		}
 
 		Tuple<IList<T1>, IList<T2>> ISourceBlock<Tuple<IList<T1>, IList<T2>>>.ConsumeMessage (

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs

@@ -166,7 +166,7 @@ namespace System.Threading.Tasks.Dataflow {
 
 		void IDataflowBlock.Fault (Exception exception)
 		{
-			completionHelper.Fault (exception);
+			completionHelper.RequestFault (exception);
 		}
 
 		Tuple<IList<T1>, IList<T2>, IList<T3>>

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs

@@ -123,7 +123,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs

@@ -125,7 +125,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 58 - 8
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/CompletionHelper.cs

@@ -27,12 +27,20 @@ namespace System.Threading.Tasks.Dataflow
 {
 	/// <summary>
 	/// This is used to implement a default behavior for Dataflow completion tracking
-	/// that is the Completion property and Complete/Fault method combo 
+	/// that is the Completion property, Complete/Fault method combo
+	/// and the CancellationToken option.
 	/// </summary>
-	internal struct CompletionHelper
+	internal class CompletionHelper
 	{
 		TaskCompletionSource<object> source;
 
+		private readonly AtomicBoolean canFaultOrCancelImmediatelly =
+			new AtomicBoolean { Value = true };
+		private readonly AtomicBoolean requestedFaultOrCancel =
+			new AtomicBoolean { Value = false };
+
+		Exception requestedException;
+
 		public static CompletionHelper GetNew (DataflowBlockOptions options)
 		{
 			var completionHelper = new CompletionHelper { source = new TaskCompletionSource<object> () };
@@ -45,23 +53,65 @@ namespace System.Threading.Tasks.Dataflow
 			get { return source.Task; }
 		}
 
+		public bool CanFaultOrCancelImmediatelly {
+			get { return canFaultOrCancelImmediatelly.Value; }
+			set {
+				if (value) {
+					if (canFaultOrCancelImmediatelly.TrySet () && requestedFaultOrCancel.Value) {
+						if (requestedException == null)
+							Cancel ();
+						else
+							Fault (requestedException);
+					}
+				} else
+					canFaultOrCancelImmediatelly.Value = false;
+			}
+		}
+
+		public bool CanRun {
+			get {
+				return source.Task.Status == TaskStatus.WaitingForActivation
+				       && !requestedFaultOrCancel.Value;
+			}
+		}
+
 		public void Complete ()
 		{
 			source.TrySetResult (null);
 		}
 
-		public void Fault (Exception ex)
+		public void RequestFault (Exception ex)
+		{
+			if (CanFaultOrCancelImmediatelly)
+				Fault (ex);
+			else {
+				Interlocked.CompareExchange (ref requestedException, ex, null);
+				requestedFaultOrCancel.Value = true;
+			}
+		}
+
+		void Fault (Exception ex)
 		{
 			source.TrySetException (ex);
 		}
 
+		void RequestCancel ()
+		{
+			if (CanFaultOrCancelImmediatelly)
+				Cancel();
+			else
+				requestedFaultOrCancel.Value = true;
+		}
+
+		void Cancel ()
+		{
+			source.TrySetCanceled ();
+		}
+
 		void SetOptions (DataflowBlockOptions options)
 		{
-			// source can't be used in a lambda directly
-			var sourceTmp = source;
 			if (options.CancellationToken != CancellationToken.None)
-				options.CancellationToken.Register (
-					() => sourceTmp.TrySetCanceled ());
+				options.CancellationToken.Register (RequestCancel);
 		}
 	}
-}
+}

+ 60 - 23
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs

@@ -27,64 +27,101 @@ namespace System.Threading.Tasks.Dataflow
 	internal class ExecutingMessageBox<TInput> : MessageBox<TInput>
 	{
 		readonly ExecutionDataflowBlockOptions options;
-		readonly Action<int> processQueue;
-		CompletionHelper compHelper;
+		readonly Func<bool> processItem;
+		readonly Action outgoingQueueComplete;
+		readonly CompletionHelper compHelper;
 
-		readonly AtomicBoolean waitingTask = new AtomicBoolean ();
-		int degreeOfParallelism;
+		// even number: Task is waiting to run
+		// odd number: Task is not waiting to run
+		// invariant: dop / 2 Tasks are running or waiting
+		int degreeOfParallelism = 1;
 
 		public ExecutingMessageBox (
 			BlockingCollection<TInput> messageQueue, CompletionHelper compHelper,
-			Func<bool> externalCompleteTester, Action<int> processQueue,
+			Func<bool> externalCompleteTester, Func<bool> processItem, Action outgoingQueueComplete,
 			ExecutionDataflowBlockOptions options)
 			: base (messageQueue, compHelper, externalCompleteTester)
 		{
 			this.options = options;
-			this.processQueue = processQueue;
+			this.processItem = processItem;
+			this.outgoingQueueComplete = outgoingQueueComplete;
 			this.compHelper = compHelper;
 		}
 
 		protected override void EnsureProcessing ()
 		{
-			if ((options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
-			     && Thread.VolatileRead (ref degreeOfParallelism) >= options.MaxDegreeOfParallelism) ||
-			    !waitingTask.TrySet ())
-				return;
-
 			StartProcessing ();
 		}
 
 		void StartProcessing ()
 		{
+			// atomically increase degreeOfParallelism by 1 only if it's odd
+			// and low enough
+			int startDegreeOfParallelism;
+			int currentDegreeOfParallelism = degreeOfParallelism;
+			do {
+				startDegreeOfParallelism = currentDegreeOfParallelism;
+				if (startDegreeOfParallelism % 2 == 0
+				    || (options.MaxDegreeOfParallelism != DataflowBlockOptions.Unbounded
+				        && startDegreeOfParallelism / 2 >= options.MaxDegreeOfParallelism))
+					return;
+				currentDegreeOfParallelism =
+					Interlocked.CompareExchange (ref degreeOfParallelism,
+						startDegreeOfParallelism + 1, startDegreeOfParallelism);
+			} while (startDegreeOfParallelism != currentDegreeOfParallelism);
+
 			Task.Factory.StartNew (ProcessQueue, TaskCreationOptions.PreferFairness);
 		}
 
 		void ProcessQueue ()
 		{
+			compHelper.CanFaultOrCancelImmediatelly = false;
+
 			int incrementedDegreeOfParallelism =
 				Interlocked.Increment (ref degreeOfParallelism);
 			if ((options.MaxDegreeOfParallelism == DataflowBlockOptions.Unbounded
-			     || incrementedDegreeOfParallelism < options.MaxDegreeOfParallelism)
-			    && (MessageQueue.Count > 0))
-				StartProcessing();
-			else
-				waitingTask.Value = false;
+			     || incrementedDegreeOfParallelism / 2 < options.MaxDegreeOfParallelism)
+			    && MessageQueue.Count > 0 && compHelper.CanRun)
+				StartProcessing ();
 
 			try {
-				processQueue (options.MaxMessagesPerTask);
+				int i = 0;
+				while (compHelper.CanRun
+				       && (options.MaxMessagesPerTask == DataflowBlockOptions.Unbounded
+				           || i++ < options.MaxMessagesPerTask)) {
+					if (!processItem ())
+						break;
+				}
 			} catch (Exception e) {
-				compHelper.Fault (e);
+				compHelper.RequestFault (e);
 			}
 
 			int decrementedDegreeOfParallelism =
-				Interlocked.Decrement (ref degreeOfParallelism);
+				Interlocked.Add (ref degreeOfParallelism, -2);
 
-			if (!waitingTask.Value) {
-				if (decrementedDegreeOfParallelism == 0 && MessageQueue.IsCompleted)
-					compHelper.Complete ();
-				else if (MessageQueue.Count > 0)
+			if (decrementedDegreeOfParallelism % 2 == 1) {
+				if (decrementedDegreeOfParallelism == 1) {
+					compHelper.CanFaultOrCancelImmediatelly = true;
+					base.VerifyCompleteness ();
+					if (MessageQueue.IsCompleted)
+						outgoingQueueComplete ();
+				}
+				if (MessageQueue.Count > 0)
 					EnsureProcessing ();
 			}
 		}
+
+		protected override void OutgoingQueueComplete ()
+		{
+			if (MessageQueue.IsCompleted
+			    && Thread.VolatileRead (ref degreeOfParallelism) == 1)
+				outgoingQueueComplete ();
+		}
+
+		protected override void VerifyCompleteness ()
+		{
+			if (Thread.VolatileRead (ref degreeOfParallelism) == 1)
+				base.VerifyCompleteness ();
+		}
 	}
 }

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs

@@ -102,7 +102,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs

@@ -107,7 +107,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 7 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

@@ -85,14 +85,18 @@ namespace System.Threading.Tasks.Dataflow
 		{
 			// Make message queue complete
 			MessageQueue.CompleteAdding ();
+			OutgoingQueueComplete ();
 			VerifyCompleteness ();
 		}
 
-		void VerifyCompleteness ()
+		protected virtual void OutgoingQueueComplete ()
+		{
+		}
+
+		protected  virtual void VerifyCompleteness ()
 		{
 			if (MessageQueue.IsCompleted && externalCompleteTester ())
 				compHelper.Complete ();
 		}
 	}
-}
-
+}

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs

@@ -79,7 +79,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 			source.Fault (ex);
 			target.Fault (ex);
 		}

+ 10 - 10
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs

@@ -58,11 +58,10 @@ namespace System.Threading.Tasks.Dataflow
 			this.transformer = transformer;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
-			                                                   compHelper,
-			                                                   () => outgoing.IsCompleted,
-			                                                   TransformProcess,
-			                                                   dataflowBlockOptions);
+			this.messageBox = new ExecutingMessageBox<TInput> (
+				messageQueue, compHelper,
+				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
+				dataflowBlockOptions);
 			this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
 			this.vault = new MessageVault<TOutput> ();
 		}
@@ -107,14 +106,13 @@ namespace System.Threading.Tasks.Dataflow
 			return outgoing.TryReceiveAll (out items);
 		}
 
-		void TransformProcess (int maxMessages)
+		bool TransformProcess ()
 		{
-			int i = 0;
 			ITargetBlock<TOutput> target;
 			TInput input;
 
-			while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages)
-				&& messageQueue.TryTake (out input)) {
+			var dequeued = messageQueue.TryTake (out input);
+			if (dequeued) {
 				TOutput output = transformer (input);
 
 				if ((target = targets.Current) != null)
@@ -125,6 +123,8 @@ namespace System.Threading.Tasks.Dataflow
 
 			if (!outgoing.IsEmpty && (target = targets.Current) != null)
 				outgoing.ProcessForTarget (target, this, false, ref headers);
+
+			return dequeued;
 		}
 
 		public void Complete ()
@@ -134,7 +134,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 17 - 18
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs

@@ -58,11 +58,10 @@ namespace System.Threading.Tasks.Dataflow
 			this.transformer = transformer;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
-			                                                   compHelper,
-			                                                   () => outgoing.IsCompleted,
-			                                                   TransformProcess,
-			                                                   dataflowBlockOptions);
+			this.messageBox = new ExecutingMessageBox<TInput> (
+				messageQueue, compHelper,
+				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
+				dataflowBlockOptions);
 			this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
 			this.vault = new MessageVault<TOutput> ();
 		}
@@ -107,29 +106,29 @@ namespace System.Threading.Tasks.Dataflow
 			return outgoing.TryReceiveAll (out items);
 		}
 
-		void TransformProcess (int maxMessages)
+		bool TransformProcess ()
 		{
-			int i = 0;
 			ITargetBlock<TOutput> target;
 			TInput input;
 
-			while ((maxMessages == DataflowBlockOptions.Unbounded || i++ < maxMessages)
-				&& messageQueue.TryTake (out input)) {
+			var dequeued = messageQueue.TryTake (out input);
+			if (dequeued) {
 				var result = transformer (input);
 
-				if (result == null)
-					continue;
-
-				foreach (var item in result) {
-					if ((target = targets.Current) != null)
-						target.OfferMessage (headers.Increment (), item, this, false);
-					else
-						outgoing.AddData (item);
+				if (result != null) {
+					foreach (var item in result) {
+						if ((target = targets.Current) != null)
+							target.OfferMessage (headers.Increment (), item, this, false);
+						else
+							outgoing.AddData (item);
+					}
 				}
 			}
 
 			if (!outgoing.IsEmpty && (target = targets.Current) != null)
 				outgoing.ProcessForTarget (target, this, false, ref headers);
+
+			return dequeued;
 		}
 
 		public void Complete ()
@@ -139,7 +138,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs

@@ -156,7 +156,7 @@ namespace System.Threading.Tasks.Dataflow
 
 		public void Fault (Exception ex)
 		{
-			compHelper.Fault (ex);
+			compHelper.RequestFault (ex);
 		}
 
 		public Task Completion {

+ 3 - 5
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs

@@ -24,10 +24,8 @@
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
 
-using System;
 using System.Linq;
 using System.Threading;
-using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 
 using NUnit.Framework;
@@ -41,13 +39,13 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 		public void BasicUsageTest ()
 		{
 			bool[] array = new bool[3];
-			CountdownEvent evt = new CountdownEvent (array.Length);
-			ActionBlock<int> block = new ActionBlock<int> ((i) => { array[i] = true; evt.Signal (); });
+			var evt = new CountdownEvent (array.Length);
+			var block = new ActionBlock<int> (i => { array[i] = true; evt.Signal (); });
 
 			for (int i = 0; i < array.Length; ++i)
 				Assert.IsTrue (block.Post (i), "Not accepted");
 
-			evt.Wait ();
+			Assert.IsTrue (evt.Wait (500));
 			
 			Assert.IsTrue (array.All (b => b), "Some false");
 		}

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionHelperTest.cs

@@ -57,7 +57,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 		public void FaultedTest ()
 		{
 			Exception ex = new ApplicationException ("Foobar");
-			helper.Fault (ex);
+			helper.RequestFault (ex);
 			Task completed = helper.Completion;
 
 			Assert.IsNotNull (completed);

+ 22 - 22
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs

@@ -37,41 +37,38 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 	[TestFixture]
 	public class CompletionTest
 	{
-		[Test]
-		public void WithNoElements ()
-		{
-			var block = new BufferBlock<int> ();
-			block.Post (42);
-			block.Complete ();
-			Console.WriteLine (block.Completion.IsCompleted);
-			Console.WriteLine (block.Completion.Status);
-			block.Receive ();
-			Console.WriteLine (block.Completion.IsCompleted);
-			Console.WriteLine (block.Completion.Status);
-		}
-
 		[Test]
 		public void WithElementsStillLingering ()
 		{
 			var block = new BufferBlock<int> ();
-			block.Post (42);
+			Assert.IsTrue (block.Post (42));
 			block.Complete ();
-			Console.WriteLine (block.Completion.IsCompleted);
-			Console.WriteLine (block.Completion.Status);
-			block.Receive ();
-			Console.WriteLine (block.Completion.IsCompleted);
-			Console.WriteLine (block.Completion.Status);
+
+			Assert.IsFalse (block.Completion.Wait (100));
+			Assert.IsFalse (block.Completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status);
+
+			Assert.AreEqual (42, block.Receive ());
+
+			Assert.IsTrue (block.Completion.Wait (100));
+			Assert.IsTrue (block.Completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status);
 		}
 
 		[Test]
 		public void EmptyAfterReceive ()
 		{
 			var block = new BufferBlock<int> ();
-			block.Post (42);
+			Assert.IsTrue (block.Post (42));
 			block.Complete ();
+
+			Assert.IsFalse (block.Completion.Wait (100));
 			Assert.IsFalse (block.Completion.IsCompleted);
 			Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status);
+
 			block.Receive ();
+
+			Assert.IsTrue (block.Completion.Wait (100));
 			Assert.IsTrue (block.Completion.IsCompleted);
 			Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status);
 		}
@@ -80,10 +77,13 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 		public void WithElementsStillLingeringButFaulted ()
 		{
 			var block = new BufferBlock<int> ();
-			block.Post (42);
+			Assert.IsTrue (block.Post (42));
 			((IDataflowBlock)block).Fault (new Exception ());
+
+			Assert.Throws<AggregateException> (() => block.Completion.Wait (100));
 			Assert.IsTrue (block.Completion.IsCompleted);
 			Assert.AreEqual (TaskStatus.Faulted, block.Completion.Status);
+			Assert.IsFalse (block.Post (43));
 		}
 	}
-}
+}

+ 106 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ExecutionBlocksTest.cs

@@ -0,0 +1,106 @@
+// ExecutionBlocksTest.cs
+//  
+// Copyright (c) 2012 Petr Onderka
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks.Dataflow;
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow {
+	[TestFixture]
+	public class ExecutionBlocksTest {
+		static IEnumerable<ITargetBlock<int>> GetExecutionBlocksWithAction(Action action)
+		{
+			yield return new ActionBlock<int> (i => action());
+			yield return new TransformBlock<int, int> (i =>
+			{
+				action ();
+				return i;
+			});
+			yield return new TransformManyBlock<int, int> (i =>
+			{
+				action ();
+				return new int[0];
+			});
+		}
+			
+		[Test]
+		public void ExceptionTest ()
+		{
+			var blocks = GetExecutionBlocksWithAction (() => { throw new Exception (); });
+			foreach (var block in blocks) {
+				Assert.IsFalse (block.Completion.Wait (100));
+
+				block.Post (1);
+
+				var ae =
+					Assert.Throws<AggregateException> (() => block.Completion.Wait (100));
+				Assert.AreEqual (1, ae.InnerExceptions.Count);
+				Assert.AreEqual (typeof(Exception), ae.InnerException.GetType ());
+			}
+		}
+
+		[Test]
+		public void NoProcessingAfterFaultTest ()
+		{
+			int shouldRun = 1;
+			int ranAfterFault = 0;
+			var evt = new ManualResetEventSlim ();
+
+			var blocks = GetExecutionBlocksWithAction (() =>
+			{
+				if (Thread.VolatileRead (ref shouldRun) == 0) {
+					ranAfterFault++;
+					return;
+				}
+
+				evt.Wait ();
+			});
+
+			foreach (var block in blocks) {
+				shouldRun = 1;
+				ranAfterFault = 0;
+				evt.Reset ();
+
+				Assert.IsTrue (block.Post (1));
+				Assert.IsTrue (block.Post (2));
+
+				Assert.IsFalse (block.Completion.Wait (100));
+				Assert.AreEqual (0, ranAfterFault);
+
+				block.Fault (new Exception ());
+
+				Assert.IsFalse (block.Completion.Wait (100));
+
+				shouldRun = 0;
+				evt.Set ();
+
+				Assert.Throws<AggregateException> (() => block.Completion.Wait (100));
+
+				Thread.Sleep (100);
+
+				Assert.AreEqual (0, Thread.VolatileRead (ref ranAfterFault));
+			}
+		}
+	}
+}

+ 20 - 11
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/OptionsTest.cs

@@ -125,7 +125,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 					q => new TransformManyBlock<int, int> (i =>
 					{
 						q.Enqueue (Tuple.Create (i, Task.CurrentId.Value));
-						return new int[0];
+						return new[] { i };
 					}, options)
 				};
 
@@ -139,7 +139,14 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 					block.Post (i);
 
 				block.Complete ();
-				Assert.IsTrue (block.Completion.Wait (500), queue.Count.ToString());
+
+				var source = block as ISourceBlock<int>;
+				if (source != null) {
+					Assert.IsFalse (block.Completion.Wait (100));
+
+					source.LinkTo (new BufferBlock<int> ());
+				}
+				Assert.IsTrue (block.Completion.Wait (500));
 
 				CollectionAssert.AreEquivalent (
 					Enumerable.Range (0, 100), queue.Select (t => t.Item1));
@@ -174,7 +181,7 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 
 			foreach (var last in lasts)
 				times [last.Value] = Tuple.Create<int?, int?> (
-					times [last.Value].Item2, last.Key);
+					times [last.Value].Item1, last.Key);
 
 			int maxDop = 0;
 			int dop = 0;
@@ -195,23 +202,25 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 		[Test]
 		public void MaxDegreeOfParallelismTest()
 		{
-			for (int i = 0; i < 10;i++)
+			// loop to better test for race conditions
+			// some that showed in this test were quite rare
+			for (int i = 0; i < 10; i++)
 			{
-				var options = new ExecutionDataflowBlockOptions();
+				var options = new ExecutionDataflowBlockOptions ();
 				foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
-					Assert.AreEqual(1, CalculateDegreeOfParallelism (taskIds));
+					Assert.AreEqual (1, CalculateDegreeOfParallelism (taskIds));
 
 				options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 2 };
-				foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
+				foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
 					Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 2);
 
 				options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 };
-				foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
-					Assert.LessOrEqual(CalculateDegreeOfParallelism (taskIds), 4);
+				foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
+					Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), 4);
 
 				options = new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = -1 };
-				foreach (var taskIds in GetTaskIdsForExecutionsOptions(options))
-					Assert.LessOrEqual(CalculateDegreeOfParallelism (taskIds), taskIds.Length);
+				foreach (var taskIds in GetTaskIdsForExecutionsOptions (options))
+					Assert.LessOrEqual (CalculateDegreeOfParallelism (taskIds), taskIds.Length);
 			}
 		}