소스 검색

Implemented SendAsync()

Petr Onderka 13 년 전
부모
커밋
460864bd02

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-net_4_5.csproj

@@ -46,6 +46,7 @@
    <Compile Include="..\..\build\common\Locale.cs" />
    <Compile Include="..\..\build\common\MonoTODOAttribute.cs" />
    <Compile Include="Assembly\AssemblyInfo.cs" />
+    <Compile Include="System.Threading.Tasks.Dataflow\SendBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\BatchedJoinBlock`3.cs" />
     <Compile Include="System.Threading.Tasks.Dataflow\DataflowLinkOptions.cs" />

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources

@@ -41,3 +41,4 @@ System.Threading.Tasks.Dataflow/ReceiveBlock.cs
 System.Threading.Tasks.Dataflow/TransformBlock.cs
 System.Threading.Tasks.Dataflow/TransformManyBlock.cs
 System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
+System.Threading.Tasks.Dataflow/SendBlock.cs

+ 28 - 8
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs

@@ -23,8 +23,6 @@
 
 namespace System.Threading.Tasks.Dataflow {
 	public static class DataflowBlock {
-		static DataflowMessageHeader GlobalHeader = new DataflowMessageHeader ();
-
 		public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
 		{
 			if (source == null)
@@ -158,7 +156,8 @@ namespace System.Threading.Tasks.Dataflow {
 			if (target == null)
 				throw new ArgumentNullException ("target");
 
-			return target.OfferMessage (GlobalHeader.Increment (), item, null, false) == DataflowMessageStatus.Accepted;
+			return target.OfferMessage (new DataflowMessageHeader(1), item, null, false)
+			       == DataflowMessageStatus.Accepted;
 		}
 
 		public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
@@ -247,11 +246,32 @@ namespace System.Threading.Tasks.Dataflow {
 			return source.TryReceive (null, out item);
 		}
 
-		[MonoTODO]
-		public static Task<bool> SendAsync<TInput> (this ITargetBlock<TInput> target, TInput item)
+		public static Task<bool> SendAsync<TInput> (
+			this ITargetBlock<TInput> target, TInput item)
 		{
-			throw new NotImplementedException ();
+			return SendAsync (target, item, CancellationToken.None);
 		}
-	}
-}
 
+		public static Task<bool> SendAsync<TInput> (
+			this ITargetBlock<TInput> target, TInput item,
+			CancellationToken cancellationToken)
+		{
+			if (target == null)
+				throw new ArgumentNullException ("target");
+
+			cancellationToken.ThrowIfCancellationRequested ();
+
+			var status = target.OfferMessage (
+				new DataflowMessageHeader (1), item, null, false);
+
+			if (status == DataflowMessageStatus.Accepted)
+				return Task.FromResult (true);
+			if (status != DataflowMessageStatus.Declined
+			    && status != DataflowMessageStatus.Postponed)
+				return Task.FromResult (false);
+
+			var block = new SendBlock<TInput> (target, item, cancellationToken);
+			return block.Send ();
+		}
+	}
+}

+ 170 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/SendBlock.cs

@@ -0,0 +1,170 @@
+// SendBlock.cs
+//
+// Copyright (c) 2012 Petr Onderka
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+namespace System.Threading.Tasks.Dataflow {
+	/// <summary>
+	/// This block is used in <see cref="DataflowBlock.SendAsync"/>
+	/// to asynchronously wait until a single item is sent to a given target.
+	/// </summary>
+	class SendBlock<T> : ISourceBlock<T> {
+		readonly ITargetBlock<T> sendTarget;
+		readonly T item;
+		CancellationToken cancellationToken;
+		readonly TaskCompletionSource<bool> taskCompletionSource =
+			new TaskCompletionSource<bool> ();
+		readonly DataflowMessageHeader sendHeader = new DataflowMessageHeader (1);
+		CancellationTokenRegistration cancellationTokenRegistration;
+
+		bool isReserved;
+
+		volatile bool cancelDisabled;
+
+		public SendBlock (ITargetBlock<T> sendTarget, T item,
+		                  CancellationToken cancellationToken)
+		{
+			this.sendTarget = sendTarget;
+			this.item = item;
+			this.cancellationToken = cancellationToken;
+		}
+
+		public Task<bool> Send ()
+		{
+			cancellationTokenRegistration = cancellationToken.Register (
+				() =>
+				{
+					if (!cancelDisabled)
+						taskCompletionSource.SetCanceled ();
+				});
+
+			PerformSend ();
+
+			return taskCompletionSource.Task;
+		}
+
+		void PerformSend ()
+		{
+			DisableCancel ();
+
+			if (taskCompletionSource.Task.IsCanceled)
+				return;
+
+			var status = sendTarget.OfferMessage (sendHeader, item, this, false);
+
+			if (status == DataflowMessageStatus.Accepted)
+				SetResult (true);
+			else if (status != DataflowMessageStatus.Postponed)
+				SetResult (false);
+			else
+				EnableCancel ();
+		}
+
+		public Task Completion {
+			get { throw new NotSupportedException (); }
+		}
+
+		public void Complete ()
+		{
+			throw new NotSupportedException ();
+		}
+
+		public void Fault (Exception exception)
+		{
+			throw new NotSupportedException ();
+		}
+
+		public T ConsumeMessage (DataflowMessageHeader messageHeader,
+		                         ITargetBlock<T> target, out bool messageConsumed)
+		{
+			if (!messageHeader.IsValid)
+				throw new ArgumentException ("The messageHeader is not valid.",
+					"messageHeader");
+			if (target == null)
+				throw new ArgumentNullException("target");
+
+			DisableCancel ();
+
+			messageConsumed = false;
+
+			if (taskCompletionSource.Task.IsCanceled)
+				return default(T);
+
+			if (messageHeader != sendHeader || target != sendTarget) {
+				EnableCancel ();
+				return default(T);
+			}
+
+			SetResult (true);
+
+			messageConsumed = true;
+			return item;
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+		{
+			throw new NotSupportedException ();
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			if (messageHeader != sendHeader || target != sendTarget || !isReserved)
+				throw new InvalidOperationException (
+					"The target did not have the message reserved.");
+
+			isReserved = false;
+			EnableCancel ();
+			PerformSend ();
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			DisableCancel ();
+
+			if (messageHeader == sendHeader && target == sendTarget) {
+				isReserved = true;
+				return true;
+			}
+
+			EnableCancel ();
+
+			return false;
+		}
+
+		void DisableCancel ()
+		{
+			cancelDisabled = true;
+		}
+
+		void EnableCancel ()
+		{
+			cancelDisabled = false;
+
+			if (cancellationToken.IsCancellationRequested)
+				taskCompletionSource.SetCanceled ();
+		}
+
+		void SetResult (bool result)
+		{
+			cancellationTokenRegistration.Dispose ();
+			taskCompletionSource.SetResult (result);
+		}
+	}
+}

+ 60 - 2
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs

@@ -3,8 +3,10 @@
 //  
 // Author:
 //       Jérémie "garuma" Laval <[email protected]>
+//       Petr Onderka <[email protected]>
 // 
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 // 
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -25,7 +27,6 @@
 // THE SOFTWARE.
 
 using System;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
@@ -179,5 +180,62 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 			Assert.IsTrue (task.IsCompleted);
 			Assert.AreEqual (TaskStatus.Canceled, task.Status);
 		}
+
+		[Test]
+		public void SendAsyncAcceptedTest ()
+		{
+			var target = new BufferBlock<int> ();
+			var task = target.SendAsync (1);
+
+			Assert.IsTrue (task.Wait (0));
+			Assert.IsTrue (task.Result);
+		}
+
+		[Test]
+		public void SendAsyncDeclinedTest ()
+		{
+			var target = new BufferBlock<int> ();
+			target.Complete ();
+			var task = target.SendAsync (1);
+
+			Assert.IsTrue (task.Wait (0));
+			Assert.IsFalse (task.Result);
+		}
+
+		[Test]
+		public void SendAsyncPostponedAcceptedTest ()
+		{
+			var target =
+				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+			Assert.IsTrue (target.Post (1));
+
+			var task = target.SendAsync (1);
+
+			Assert.IsFalse (task.Wait (100));
+
+			Assert.AreEqual (1, target.Receive ());
+
+			Assert.IsTrue (task.Wait (100));
+			Assert.IsTrue (task.Result);
+		}
+
+		[Test]
+		public void SendAsyncPostponedDeclinedTest ()
+		{
+			var target =
+				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+
+			Assert.IsTrue (target.Post (1));
+
+			var task = target.SendAsync (1);
+
+			Assert.IsFalse (task.Wait (100));
+
+			target.Complete ();
+
+			Assert.IsTrue (task.Wait (100));
+			Assert.IsFalse (task.Result);
+		}
 	}
-}
+}

+ 47 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ReceivingTest.cs

@@ -22,6 +22,7 @@
 
 using System;
 using System.Collections.Generic;
+using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
 using NUnit.Framework;
@@ -193,6 +194,52 @@ namespace MonoTests.System.Threading.Tasks.Dataflow {
 
 			Assert.AreEqual (43, target.DirectlyAccepted);
 		}
+
+		[Test]
+		public void FaultConsume()
+		{
+			var scheduler = new TestScheduler ();
+			var source =
+				new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new TestTargetBlock<int> { Postpone = true };
+			Assert.IsNotNull (source.LinkTo (target));
+
+			Assert.IsTrue (source.Post (42));
+			scheduler.ExecuteAll ();
+			Assert.IsTrue (target.HasPostponed);
+
+			((IDataflowBlock)source).Fault (new Exception ());
+
+			scheduler.ExecuteAll ();
+			Thread.Sleep (100);
+
+			int value;
+			Assert.IsFalse (target.RetryPostponed (out value));
+		}
+
+		[Test]
+		public void ReserveFaultConsume()
+		{
+			var scheduler = new TestScheduler ();
+			var source =
+				new BufferBlock<int> (new DataflowBlockOptions { TaskScheduler = scheduler });
+			var target = new TestTargetBlock<int> { Postpone = true };
+			Assert.IsNotNull (source.LinkTo (target));
+
+			Assert.IsTrue (source.Post (42));
+			scheduler.ExecuteAll ();
+			Assert.IsTrue (target.HasPostponed);
+
+			Assert.IsTrue (target.ReservePostponed ());
+
+			((IDataflowBlock)source).Fault (new Exception ());
+
+			scheduler.ExecuteAll ();
+			Thread.Sleep (100);
+
+			int value;
+			Assert.IsTrue (target.RetryPostponed (out value));
+		}
 	}
 
 	class TestTargetBlock<T> : ITargetBlock<T> {