Browse Source

Implemented NullTarget()

Petr Onderka 13 years ago
parent
commit
f5aa9b18cc

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj

@@ -47,6 +47,7 @@
    <Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
    <Compile Include="Assembly\AssemblyInfo.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BroadcastOutgoingQueue.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\NullTargetBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueueBase.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutputAvailableBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\PredicateBlock.cs" />

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources

@@ -44,3 +44,4 @@ System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
 System.Threading.Tasks.Dataflow/SendBlock.cs
 System.Threading.Tasks.Dataflow/PredicateBlock.cs
 System.Threading.Tasks.Dataflow/OutputAvailableBlock.cs
+System.Threading.Tasks.Dataflow/NullTargetBlock.cs

+ 5 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs

@@ -296,5 +296,10 @@ namespace System.Threading.Tasks.Dataflow {
 			var block = new SendBlock<TInput> (target, item, cancellationToken);
 			return block.Send ();
 		}
+
+		public static ITargetBlock<TInput> NullTarget<TInput>()
+		{
+			return new NullTargetBlock<TInput> ();
+		}
 	}
 }

+ 1 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

@@ -92,9 +92,9 @@ namespace System.Threading.Tasks.Dataflow {
 			}
 
 			if (consumeToAccept) {
-				bool consummed;
 				if (!source.ReserveMessage (messageHeader, Target))
 					return DataflowMessageStatus.NotAvailable;
+				bool consummed;
 				messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
 				if (!consummed)
 					return DataflowMessageStatus.NotAvailable;

+ 67 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/NullTargetBlock.cs

@@ -0,0 +1,67 @@
+// NullTargetBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+	/// <summary>
+	/// Target block returned by <see cref="DataflowBlock.NullTarget{TInput}"/>.
+	/// </summary>
+	class NullTargetBlock<TInput> : ITargetBlock<TInput> {
+		public NullTargetBlock ()
+		{
+			Completion = new TaskCompletionSource<bool> ().Task;
+		}
+
+		public DataflowMessageStatus OfferMessage (
+			DataflowMessageHeader messageHeader, TInput messageValue,
+			ISourceBlock<TInput> source, bool consumeToAccept)
+		{
+			if (!messageHeader.IsValid)
+				throw new ArgumentException ("The messageHeader is not valid.",
+					"messageHeader");
+			if (consumeToAccept && source == null)
+				throw new ArgumentException (
+					"consumeToAccept may only be true if provided with a non-null source.",
+					"consumeToAccept");
+
+			if (consumeToAccept) {
+				if (!source.ReserveMessage (messageHeader, this))
+					return DataflowMessageStatus.NotAvailable;
+				bool consummed;
+				source.ConsumeMessage (messageHeader, this, out consummed);
+				if (!consummed)
+					return DataflowMessageStatus.NotAvailable;
+			}
+
+			return DataflowMessageStatus.Accepted;
+		}
+
+		public Task Completion { get; private set; }
+
+		public void Complete ()
+		{
+		}
+
+		public void Fault (Exception exception)
+		{
+		}
+	}
+}

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/Blocks.cs

@@ -105,6 +105,7 @@ namespace MonoTests {
 			yield return new TransformBlock<T, T> (i => i);
 			yield return new TransformManyBlock<T, T>(x => new T[0]);
 			yield return new WriteOnceBlock<T> (x => x);
+			yield return DataflowBlock.NullTarget<T> ();
 		}
 	}
 }

+ 27 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs

@@ -336,5 +336,32 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 			Assert.IsTrue (target.TryReceive (out item));
 			Assert.AreEqual (10, item);
 		}
+
+		[Test]
+		public void NullTargetTest ()
+		{
+			var target = DataflowBlock.NullTarget<int> ();
+			Assert.IsTrue (target.Post (1));
+
+			var source = new TestSourceBlock<int> ();
+			var header = new DataflowMessageHeader (1);
+			source.AddMessage (header, 2);
+
+			Assert.IsFalse (source.WasConsumed (header));
+
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (header, 2, source, true));
+			Assert.IsTrue (source.WasConsumed (header));
+
+			Assert.IsFalse (target.Completion.Wait (100));
+
+			target.Complete ();
+
+			Assert.IsFalse (target.Completion.Wait (100));
+
+			target.Fault (new Exception ());
+
+			Assert.IsFalse (target.Completion.Wait (100));
+		}
 	}
 }