Prechádzať zdrojové kódy

Implemented LinkTo() with predicate

Petr Onderka 13 rokov pred
rodič
commit
6dc84bab1d

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj

@@ -48,6 +48,7 @@
    <Compile Include="Assembly\AssemblyInfo.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BroadcastOutgoingQueue.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\OutgoingQueueBase.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\PredicateBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\SendBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources

@@ -42,3 +42,4 @@ System.Threading.Tasks.Dataflow/TransformBlock.cs
 System.Threading.Tasks.Dataflow/TransformManyBlock.cs
 System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
 System.Threading.Tasks.Dataflow/SendBlock.cs
+System.Threading.Tasks.Dataflow/PredicateBlock.cs

+ 3 - 2
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs

@@ -132,7 +132,6 @@ namespace System.Threading.Tasks.Dataflow {
 			return source.LinkTo (target, DataflowLinkOptions.Default, predicate);
 		}
 
-		[MonoTODO("Use predicate")]
 		public static IDisposable LinkTo<TOutput> (
 			this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target,
 			DataflowLinkOptions linkOptions, Predicate<TOutput> predicate)
@@ -142,7 +141,9 @@ namespace System.Threading.Tasks.Dataflow {
 			if (predicate == null)
 				throw new ArgumentNullException("predicate");
 
-			return source.LinkTo (target, linkOptions);
+			var predicateBlock = new PredicateBlock<TOutput> (source, target, predicate);
+
+			return source.LinkTo (predicateBlock, linkOptions);
 		}
 
 		[MonoTODO]

+ 131 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PredicateBlock.cs

@@ -0,0 +1,131 @@
+// PredicateBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+	/// <summary>
+	/// This block is used by the version of <see cref="DataflowBlock.LinkTo"/>
+	/// that has a predicate to wrap the target block,
+	/// so that the predicate can be checked.
+	/// </summary>
+	class PredicateBlock<T> : ITargetBlock<T> {
+		/// <summary>
+		/// Wraps the source block of the link.
+		/// This is necessary so that the communication from target to source works correctly.
+		/// </summary>
+		class SourceBlock : ISourceBlock<T> {
+			readonly ISourceBlock<T> actualSource;
+			readonly PredicateBlock<T> predicateBlock;
+
+			public SourceBlock (ISourceBlock<T> actualSource,
+			                    PredicateBlock<T> predicateBlock)
+			{
+				this.actualSource = actualSource;
+				this.predicateBlock = predicateBlock;
+			}
+
+			public Task Completion
+			{
+				get { return actualSource.Completion; }
+			}
+
+			public void Complete ()
+			{
+				actualSource.Complete ();
+			}
+
+			public void Fault (Exception exception)
+			{
+				actualSource.Fault (exception);
+			}
+
+			public T ConsumeMessage (DataflowMessageHeader messageHeader,
+			                         ITargetBlock<T> target, out bool messageConsumed)
+			{
+				return actualSource.ConsumeMessage (messageHeader, predicateBlock,
+					out messageConsumed);
+			}
+
+			public IDisposable LinkTo (ITargetBlock<T> target,
+			                           DataflowLinkOptions linkOptions)
+			{
+				return actualSource.LinkTo (target, linkOptions);
+			}
+
+			public void ReleaseReservation (DataflowMessageHeader messageHeader,
+			                                ITargetBlock<T> target)
+			{
+				actualSource.ReleaseReservation (messageHeader, predicateBlock);
+			}
+
+			public bool ReserveMessage (DataflowMessageHeader messageHeader,
+			                            ITargetBlock<T> target)
+			{
+				return actualSource.ReserveMessage (messageHeader, predicateBlock);
+			}
+		}
+
+		readonly ITargetBlock<T> actualTarget;
+		readonly Predicate<T> predicate;
+		readonly SourceBlock sourceBlock;
+
+		public PredicateBlock (ISourceBlock<T> actualSource,
+		                       ITargetBlock<T> actualTarget, Predicate<T> predicate)
+		{
+			this.actualTarget = actualTarget;
+			this.predicate = predicate;
+			sourceBlock = new SourceBlock (actualSource, this);
+		}
+
+		public DataflowMessageStatus OfferMessage (
+			DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+			bool consumeToAccept)
+		{
+			if (!messageHeader.IsValid)
+				throw new ArgumentException ("The messageHeader is not valid.",
+					"messageHeader");
+			if (consumeToAccept && source == null)
+				throw new ArgumentException (
+					"consumeToAccept may only be true if provided with a non-null source.",
+					"consumeToAccept");
+
+			if (!predicate(messageValue))
+				return DataflowMessageStatus.Declined;
+
+			return actualTarget.OfferMessage (messageHeader, messageValue, sourceBlock,
+				consumeToAccept);
+		}
+
+		public Task Completion {
+			get { return actualTarget.Completion; }
+		}
+
+		public void Complete ()
+		{
+			actualTarget.Complete ();
+		}
+
+		public void Fault (Exception exception)
+		{
+			actualTarget.Fault (exception);
+		}
+	}
+}

+ 103 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs

@@ -237,5 +237,108 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 			Assert.IsTrue (task.Wait (100));
 			Assert.IsFalse (task.Result);
 		}
+
+		[Test]
+		public void LinkToPredicateTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var source = new BufferBlock<int> (
+				new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new BufferBlock<int> ();
+			source.LinkTo (target, i => i % 2 == 1);
+
+			Assert.IsTrue (source.Post (1));
+			Assert.IsTrue (source.Post (2));
+			Assert.IsTrue (source.Post (3));
+
+			scheduler.ExecuteAll ();
+
+			int item;
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (1, item);
+			Assert.IsFalse (target.TryReceive (out item));
+
+			Assert.IsTrue (source.TryReceive (out item));
+			Assert.AreEqual (2, item);
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (3, item);
+		}
+
+		[Test]
+		public void LinkToPredicateMaxMessagesTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var source = new BufferBlock<int> (
+				new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new BufferBlock<int> ();
+			source.LinkTo (target, new DataflowLinkOptions { MaxMessages = 1 },
+				i => i % 2 == 1);
+
+			Assert.IsTrue (source.Post (2));
+			Assert.IsTrue (source.Post (1));
+			Assert.IsTrue (source.Post (3));
+
+			scheduler.ExecuteAll ();
+
+			int item;
+			Assert.IsFalse (target.TryReceive (out item));
+			Assert.IsTrue (source.TryReceive (out item));
+			Assert.AreEqual (2, item);
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (1, item);
+
+			scheduler.ExecuteAll ();
+			
+			Assert.IsFalse (target.TryReceive (out item));
+		}
+
+		[Test]
+		public void LinkToPredicatePostponed ()
+		{
+			var scheduler = new TestScheduler ();
+			var source = new BufferBlock<int> (
+				new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new BufferBlock<int> (
+				new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
+			source.LinkTo (target, i => true);
+
+			Assert.IsTrue (target.Post (1));
+			Assert.IsTrue (source.Post (2));
+
+			scheduler.ExecuteAll ();
+
+			int item;
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (1, item);
+
+			scheduler.ExecuteAll ();
+
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (2, item);
+		}
+
+		[Test]
+		public void LinkToPredicateClonerTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var source = new BroadcastBlock<int> (i => i * 10,
+				new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new BufferBlock<int> ();
+			source.LinkTo (target, i => i < 10);
+
+			Assert.IsTrue (source.Post (1));
+
+			scheduler.ExecuteAll ();
+
+			int item;
+			Assert.IsTrue (target.TryReceive (out item));
+			Assert.AreEqual (10, item);
+		}
 	}
 }