Browse Source

Added support for BoundedCapacity

Petr Onderka 13 years ago
parent
commit
2f92f56945
19 changed files with 492 additions and 76 deletions
  1. 1 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj
  2. 6 6
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs
  3. 4 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
  4. 10 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs
  5. 12 4
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs
  6. 6 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs
  7. 7 8
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs
  8. 4 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs
  9. 9 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
  10. 12 4
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
  11. 5 5
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs
  12. 85 15
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs
  13. 7 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs
  14. 5 4
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs
  15. 4 3
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
  16. 16 7
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
  17. 2 2
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
  18. 280 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BoundedCapacityTest.cs
  19. 17 2
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlockTest.cs

+ 1 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow-tests-net_4_5.csproj

@@ -44,6 +44,7 @@
   <ItemGroup>
     <Compile Include="Test\System.Threading.Tasks.Dataflow\ReceivingTest.cs" />
     <Compile Include="Test\TestScheduler.cs" />
+    <Compile Include="Test\System.Threading.Tasks.Dataflow\BoundedCapacityTest.cs" />
     <Compile Include="Test\System.Threading.Tasks.Dataflow\ExecutionBlocksTest.cs" />
     <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlockTest.cs" />
     <Compile Include="Test\System.Threading.Tasks.Dataflow\BatchedJoinBlock`3Test.cs" />

+ 6 - 6
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs

@@ -56,7 +56,7 @@ namespace System.Threading.Tasks.Dataflow
 			this.action = action;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue, compHelper,
+			this.messageBox = new ExecutingMessageBox<TInput> (this, messageQueue, compHelper,
 				() => true, ProcessItem, () => { }, dataflowBlockOptions);
 		}
 
@@ -72,12 +72,12 @@ namespace System.Threading.Tasks.Dataflow
 			throw new NotImplementedException ();
 		}
 
-		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
-		                                           TInput messageValue,
-		                                           ISourceBlock<TInput> source,
-		                                           bool consumeToAccept)
+		public DataflowMessageStatus OfferMessage (
+			DataflowMessageHeader messageHeader, TInput messageValue,
+			ISourceBlock<TInput> source, bool consumeToAccept)
 		{
-			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return messageBox.OfferMessage (
+				messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		bool ProcessItem ()

+ 4 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs

@@ -54,10 +54,11 @@ namespace System.Threading.Tasks.Dataflow
 			this.batchSize = batchSize;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper,
+			this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, BatchProcess, dataflowBlockOptions);
 			this.outgoing = new MessageOutgoingQueue<T[]> (this, compHelper,
-				() => messageQueue.IsCompleted, dataflowBlockOptions);
+				() => messageQueue.IsCompleted, () => messageBox.DecreaseCount (),
+				dataflowBlockOptions);
 		}
 
 		public DataflowMessageStatus OfferMessage (
@@ -65,7 +66,7 @@ namespace System.Threading.Tasks.Dataflow
 			bool consumeToAccept)
 		{
 			return messageBox.OfferMessage (
-				this, messageHeader, messageValue, source, consumeToAccept);
+				messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<T[]> target, DataflowLinkOptions linkOptions)

+ 10 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock.cs

@@ -58,13 +58,20 @@ namespace System.Threading.Tasks.Dataflow {
 			completionHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 
 			target1 = new JoinTarget<T1> (
-				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+				this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+				dataflowBlockOptions);
 			target2 = new JoinTarget<T2> (
-				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+				this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+				dataflowBlockOptions);
 
 			outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>>> (
 				this, completionHelper,
-				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted, options);
+				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
+				() =>
+				{
+					target1.DecreaseCount ();
+					target2.DecreaseCount ();
+				}, options);
 		}
 
 		public int BatchSize { get; private set; }

+ 12 - 4
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchedJoinBlock`3.cs

@@ -60,17 +60,25 @@ namespace System.Threading.Tasks.Dataflow {
 			completionHelper = CompletionHelper.GetNew (options);
 
 			target1 = new JoinTarget<T1> (
-				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+				this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+				dataflowBlockOptions);
 			target2 = new JoinTarget<T2> (
-				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+				this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+				dataflowBlockOptions);
 			target3 = new JoinTarget<T3>(
-				this, SignalTarget, completionHelper, () => outgoing.IsCompleted);
+				this, SignalTarget, completionHelper, () => outgoing.IsCompleted,
+				dataflowBlockOptions);
 
 			outgoing = new MessageOutgoingQueue<Tuple<IList<T1>, IList<T2>, IList<T3>>> (
 				this, completionHelper,
 				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
 				      || target3.Buffer.IsCompleted,
-				options);
+				() =>
+				{
+					target1.DecreaseCount ();
+					target2.DecreaseCount ();
+					target3.DecreaseCount ();
+				}, options);
 		}
 
 		public int BatchSize { get; private set; }

+ 6 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs

@@ -52,8 +52,11 @@ namespace System.Threading.Tasks.Dataflow {
 			this.cloner = cloner;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
-			this.outgoing = new MessageOutgoingQueue<T> (this, compHelper, () => messageQueue.IsCompleted, dataflowBlockOptions);
+			this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
+				() => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<T> (this, compHelper,
+				() => messageQueue.IsCompleted, () => messageBox.DecreaseCount (),
+				dataflowBlockOptions);
 			this.vault = new MessageVault<T> ();
 		}
 
@@ -62,7 +65,7 @@ namespace System.Threading.Tasks.Dataflow {
 		                                           ISourceBlock<T> source,
 		                                           bool consumeToAccept)
 		{
-			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)

+ 7 - 8
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs

@@ -30,7 +30,6 @@ namespace System.Threading.Tasks.Dataflow {
 		readonly MessageBox<T> messageBox;
 		readonly MessageOutgoingQueue<T> outgoing;
 		readonly BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
-		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
 
 		public BufferBlock () : this (DataflowBlockOptions.Default)
 		{
@@ -43,18 +42,18 @@ namespace System.Threading.Tasks.Dataflow {
 
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper,
+			this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, ProcessQueue, dataflowBlockOptions);
 			this.outgoing = new MessageOutgoingQueue<T> (this, compHelper,
-				() => messageQueue.IsCompleted, dataflowBlockOptions);
+				() => messageQueue.IsCompleted, () => messageBox.DecreaseCount (),
+				dataflowBlockOptions);
 		}
 
-		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
-		                                           T messageValue,
-		                                           ISourceBlock<T> source,
-		                                           bool consumeToAccept)
+		DataflowMessageStatus ITargetBlock<T>.OfferMessage (
+			DataflowMessageHeader messageHeader, T messageValue, ISourceBlock<T> source,
+			bool consumeToAccept)
 		{
-			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)

+ 4 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ExecutingMessageBox.cs

@@ -37,10 +37,11 @@ namespace System.Threading.Tasks.Dataflow
 		int degreeOfParallelism = 1;
 
 		public ExecutingMessageBox (
-			BlockingCollection<TInput> messageQueue, CompletionHelper compHelper,
-			Func<bool> externalCompleteTester, Func<bool> processItem, Action outgoingQueueComplete,
+			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+			CompletionHelper compHelper, Func<bool> externalCompleteTester,
+			Func<bool> processItem, Action outgoingQueueComplete,
 			ExecutionDataflowBlockOptions options)
-			: base (messageQueue, compHelper, externalCompleteTester)
+			: base (target, messageQueue, compHelper, externalCompleteTester, options)
 		{
 			this.options = options;
 			this.processItem = processItem;

+ 9 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs

@@ -48,11 +48,17 @@ namespace System.Threading.Tasks.Dataflow
 
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
-			target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
+			target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
+				() => outgoing.IsCompleted, dataflowBlockOptions);
+			target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
+				() => outgoing.IsCompleted, dataflowBlockOptions);
 			outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (this, compHelper,
 				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted,
-				dataflowBlockOptions);
+				() =>
+				{
+					target1.DecreaseCount ();
+					target2.DecreaseCount ();
+				}, dataflowBlockOptions);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, DataflowLinkOptions linkOptions)

+ 12 - 4
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs

@@ -53,14 +53,22 @@ namespace System.Threading.Tasks.Dataflow
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 
-			target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
-			target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
-			target3 = new JoinTarget<T3> (this, SignalArrivalTargetImpl, compHelper, () => outgoing.IsCompleted);
+			target1 = new JoinTarget<T1> (this, SignalArrivalTargetImpl, compHelper,
+				() => outgoing.IsCompleted, dataflowBlockOptions);
+			target2 = new JoinTarget<T2> (this, SignalArrivalTargetImpl, compHelper,
+				() => outgoing.IsCompleted, dataflowBlockOptions);
+			target3 = new JoinTarget<T3> (this, SignalArrivalTargetImpl, compHelper,
+				() => outgoing.IsCompleted, dataflowBlockOptions);
 			outgoing = new MessageOutgoingQueue<Tuple<T1, T2, T3>> (
 				this, compHelper,
 				() => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted
 				      || target3.Buffer.IsCompleted,
-				dataflowBlockOptions);
+				() =>
+				{
+					target1.DecreaseCount ();
+					target2.DecreaseCount ();
+					target3.DecreaseCount ();
+				}, dataflowBlockOptions);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2, T3>> target, DataflowLinkOptions linkOptions)

+ 5 - 5
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinTarget.cs

@@ -19,8 +19,6 @@
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
 using System.Collections.Concurrent;
 
@@ -31,11 +29,13 @@ namespace System.Threading.Tasks.Dataflow {
 		readonly Action signal;
 
 		public JoinTarget (IDataflowBlock joinBlock, Action signal, CompletionHelper helper,
-		                   Func<bool> externalCompleteTester)
-			: base (new BlockingCollection<TTarget> (), helper, externalCompleteTester)
+		                   Func<bool> externalCompleteTester, DataflowBlockOptions options)
+			: base (null, new BlockingCollection<TTarget> (), helper, externalCompleteTester,
+				options)
 		{
 			this.joinBlock = joinBlock;
 			this.signal = signal;
+			Target = this;
 		}
 
 		protected override void EnsureProcessing ()
@@ -54,7 +54,7 @@ namespace System.Threading.Tasks.Dataflow {
 		                                                          ISourceBlock<TTarget> source,
 		                                                          bool consumeToAccept)
 		{
-			return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return OfferMessage (messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		void IDataflowBlock.Complete ()

+ 85 - 15
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageBox.cs

@@ -1,6 +1,7 @@
 // MessageBox.cs
 //
 // Copyright (c) 2011 Jérémie "garuma" Laval
+// Copyright (c) 2012 Petr Onderka
 //
 // Permission is hereby granted, free of charge, to any person obtaining a copy
 // of this software and associated documentation files (the "Software"), to deal
@@ -19,46 +20,64 @@
 // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
 // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
 // THE SOFTWARE.
-//
-//
 
 using System.Collections.Concurrent;
+using System.Linq;
 
-namespace System.Threading.Tasks.Dataflow
-{
-	/* In MessageBox we store message that have been offered to us so that they can be
-	 * later processed
-	 */
-	internal class MessageBox<TInput>
-	{
+namespace System.Threading.Tasks.Dataflow {
+	/// <summary>
+	/// In MessageBox we store message that have been offered to us so that they can be
+	/// later processed 
+	/// </summary>
+	internal class MessageBox<TInput> {
+		protected ITargetBlock<TInput> Target;
 		readonly CompletionHelper compHelper;
 		readonly Func<bool> externalCompleteTester;
+		readonly DataflowBlockOptions options;
+		readonly ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader>
+			postponedMessages =
+				new ConcurrentDictionary<ISourceBlock<TInput>, DataflowMessageHeader> ();
+		int itemCount;
+		readonly AtomicBoolean postponedProcessing = new AtomicBoolean ();
 
 		protected BlockingCollection<TInput> MessageQueue { get; private set; }
 
 		public MessageBox (
-			BlockingCollection<TInput> messageQueue, CompletionHelper compHelper,
-			Func<bool> externalCompleteTester)
+			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+			CompletionHelper compHelper, Func<bool> externalCompleteTester,
+			DataflowBlockOptions options)
 		{
+			this.Target = target;
 			this.compHelper = compHelper;
 			this.MessageQueue = messageQueue;
 			this.externalCompleteTester = externalCompleteTester;
+			this.options = options;
 		}
 
 		public DataflowMessageStatus OfferMessage (
-			ITargetBlock<TInput> target, DataflowMessageHeader messageHeader,
-			TInput messageValue, ISourceBlock<TInput> source, bool consumeToAccept)
+			DataflowMessageHeader messageHeader, TInput messageValue,
+			ISourceBlock<TInput> source, bool consumeToAccept)
 		{
 			if (!messageHeader.IsValid)
 				return DataflowMessageStatus.Declined;
 			if (MessageQueue.IsAddingCompleted || !compHelper.CanRun)
 				return DataflowMessageStatus.DecliningPermanently;
 
+			if (options.BoundedCapacity != -1
+			    && Thread.VolatileRead (ref itemCount) >= options.BoundedCapacity) {
+				if (source == null)
+					return DataflowMessageStatus.Declined;
+
+				postponedMessages [source] = messageHeader;
+				
+				return DataflowMessageStatus.Postponed;
+			}
+
 			if (consumeToAccept) {
 				bool consummed;
-				if (!source.ReserveMessage (messageHeader, target))
+				if (!source.ReserveMessage (messageHeader, Target))
 					return DataflowMessageStatus.NotAvailable;
-				messageValue = source.ConsumeMessage (messageHeader, target, out consummed);
+				messageValue = source.ConsumeMessage (messageHeader, Target, out consummed);
 				if (!consummed)
 					return DataflowMessageStatus.NotAvailable;
 			}
@@ -71,6 +90,8 @@ namespace System.Threading.Tasks.Dataflow
 				return DataflowMessageStatus.DecliningPermanently;
 			}
 
+			IncreaseCount ();
+
 			EnsureProcessing ();
 
 			VerifyCompleteness ();
@@ -78,6 +99,55 @@ namespace System.Threading.Tasks.Dataflow
 			return DataflowMessageStatus.Accepted;
 		}
 
+		public void IncreaseCount ()
+		{
+			Interlocked.Increment (ref itemCount);
+		}
+
+		public void DecreaseCount()
+		{
+			int decreased = Interlocked.Decrement (ref itemCount);
+
+			if (decreased < options.BoundedCapacity && !postponedMessages.IsEmpty)
+				EnsurePostponedProcessing ();
+		}
+
+		void EnsurePostponedProcessing ()
+		{
+			if (postponedProcessing.TrySet())
+				Task.Factory.StartNew (RetrievePostponed, options.CancellationToken,
+					TaskCreationOptions.PreferFairness, options.TaskScheduler);
+		}
+
+		void RetrievePostponed ()
+		{
+			while (Volatile.Read (ref itemCount) < options.BoundedCapacity
+			       && !postponedMessages.IsEmpty) {
+				var block = postponedMessages.First ().Key;
+				DataflowMessageHeader header;
+				postponedMessages.TryRemove (block, out header);
+
+				bool consumed;
+				var item = block.ConsumeMessage (header, Target, out consumed);
+				if (consumed) {
+					try {
+						MessageQueue.Add (item);
+						IncreaseCount ();
+						EnsureProcessing ();
+					} catch (InvalidOperationException) {
+						break;
+					}
+				}
+			}
+
+			postponedProcessing.Value = false;
+
+			// because of race
+			if (Volatile.Read (ref itemCount) < options.BoundedCapacity
+			    && !postponedMessages.IsEmpty)
+				EnsurePostponedProcessing ();
+		}
+
 		protected virtual void EnsureProcessing ()
 		{
 		}

+ 7 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs

@@ -35,6 +35,7 @@ namespace System.Threading.Tasks.Dataflow {
 		int outgoingCount;
 		readonly CompletionHelper compHelper;
 		readonly Func<bool> externalCompleteTester;
+		readonly Action decreaseItemsCount;
 		readonly DataflowBlockOptions options;
 		readonly AtomicBoolean isProcessing = new AtomicBoolean ();
 		readonly TargetCollection<T> targets;
@@ -43,12 +44,13 @@ namespace System.Threading.Tasks.Dataflow {
 
 		public MessageOutgoingQueue (
 			ISourceBlock<T> block, CompletionHelper compHelper,
-			Func<bool> externalCompleteTester, DataflowBlockOptions options)
+			Func<bool> externalCompleteTester, Action decreaseItemsCount, DataflowBlockOptions options)
 		{
 			this.outgoing = new BlockingCollection<T> (store);
 			this.targets = new TargetCollection<T> (block);
 			this.compHelper = compHelper;
 			this.externalCompleteTester = externalCompleteTester;
+			this.decreaseItemsCount = decreaseItemsCount;
 			this.options = options;
 		}
 
@@ -92,6 +94,7 @@ namespace System.Threading.Tasks.Dataflow {
 					if (processed) {
 						outgoing.TryTake (out item);
 						Interlocked.Decrement (ref outgoingCount);
+						decreaseItemsCount ();
 						FirstItemChanged ();
 					}
 				} finally {
@@ -131,6 +134,7 @@ namespace System.Threading.Tasks.Dataflow {
 					outgoing.TryTake (out result);
 					messageConsumed = true;
 					Interlocked.Decrement (ref outgoingCount);
+					decreaseItemsCount ();
 					reservedForTargetBlock = null;
 					FirstItemChanged ();
 				}
@@ -211,6 +215,7 @@ namespace System.Threading.Tasks.Dataflow {
 					outgoing.TryTake (out item);
 					success = true;
 					Interlocked.Decrement (ref outgoingCount);
+					decreaseItemsCount ();
 					FirstItemChanged ();
 				}
 			} finally {
@@ -243,6 +248,7 @@ namespace System.Threading.Tasks.Dataflow {
 				T item;
 				while (outgoing.TryTake (out item)) {
 					Interlocked.Decrement (ref outgoingCount);
+					decreaseItemsCount ();
 					list.Add (item);
 				}
 

+ 5 - 4
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PassingMessageBox.cs

@@ -30,10 +30,11 @@ namespace System.Threading.Tasks.Dataflow
 		readonly Action processQueue;
 
 		public PassingMessageBox (
-			BlockingCollection<TInput> messageQueue, CompletionHelper compHelper,
-			Func<bool> externalCompleteTester, Action processQueue,
-			DataflowBlockOptions dataflowBlockOptions)
-			: base (messageQueue, compHelper, externalCompleteTester)
+			ITargetBlock<TInput> target, BlockingCollection<TInput> messageQueue,
+			CompletionHelper compHelper, Func<bool> externalCompleteTester,
+			Action processQueue, DataflowBlockOptions dataflowBlockOptions)
+			: base (target, messageQueue, compHelper, externalCompleteTester,
+				dataflowBlockOptions)
 		{
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.processQueue = processQueue;

+ 4 - 3
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs

@@ -51,11 +51,12 @@ namespace System.Threading.Tasks.Dataflow
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
 			this.messageBox = new ExecutingMessageBox<TInput> (
-				messageQueue, compHelper,
+				this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
 				dataflowBlockOptions);
 			this.outgoing = new MessageOutgoingQueue<TOutput> (this, compHelper,
-				() => messageQueue.IsCompleted, dataflowBlockOptions);
+				() => messageQueue.IsCompleted, () => messageBox.DecreaseCount (),
+				dataflowBlockOptions);
 		}
 
 		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
@@ -63,7 +64,7 @@ namespace System.Threading.Tasks.Dataflow
 		                                           ISourceBlock<TInput> source,
 		                                           bool consumeToAccept)
 		{
-			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)

+ 16 - 7
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs

@@ -51,12 +51,12 @@ namespace System.Threading.Tasks.Dataflow
 			this.transformer = transformer;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new ExecutingMessageBox<TInput> (
-				messageQueue, compHelper,
+			this.messageBox = new ExecutingMessageBox<TInput> (this, messageQueue, compHelper,
 				() => outgoing.IsCompleted, TransformProcess, () => outgoing.Complete (),
 				dataflowBlockOptions);
-			this.outgoing = new MessageOutgoingQueue<TOutput> (
-				this, compHelper, () => messageQueue.IsCompleted, dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<TOutput> (this, compHelper,
+				() => messageQueue.IsCompleted, () => messageBox.DecreaseCount (),
+				dataflowBlockOptions);
 		}
 
 		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
@@ -64,7 +64,7 @@ namespace System.Threading.Tasks.Dataflow
 		                                           ISourceBlock<TInput> source,
 		                                           bool consumeToAccept)
 		{
-			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			return messageBox.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
 		}
 
 		public IDisposable LinkTo (ITargetBlock<TOutput> target, DataflowLinkOptions linkOptions)
@@ -105,9 +105,18 @@ namespace System.Threading.Tasks.Dataflow
 			if (dequeued) {
 				var result = transformer (input);
 
-				if (result != null)
-					foreach (var item in result)
+				bool first = true;
+				if (result != null) {
+					foreach (var item in result) {
+						if (first)
+							first = false;
+						else
+							messageBox.IncreaseCount ();
 						outgoing.AddData (item);
+					}
+				}
+				if (first)
+					messageBox.DecreaseCount ();
 			}
 
 			return dequeued;

+ 2 - 2
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs

@@ -55,7 +55,7 @@ namespace System.Threading.Tasks.Dataflow
 			this.cloner = cloner;
 			this.dataflowBlockOptions = dataflowBlockOptions;
 			this.compHelper = CompletionHelper.GetNew (dataflowBlockOptions);
-			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => true, BroadcastProcess, dataflowBlockOptions);
+			this.messageBox = new PassingMessageBox<T> (this, messageQueue, compHelper, () => true, BroadcastProcess, dataflowBlockOptions);
 			this.vault = new MessageVault<T> ();
 		}
 
@@ -69,7 +69,7 @@ namespace System.Threading.Tasks.Dataflow
 				finalValue = messageValue;
 				Thread.MemoryBarrier ();
 				ready = true;
-				return messageBox.OfferMessage (this, messageHeader, finalValue, source, consumeToAccept);
+				return messageBox.OfferMessage (messageHeader, finalValue, source, consumeToAccept);
 			} else {
 				return DataflowMessageStatus.DecliningPermanently;
 			}

+ 280 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BoundedCapacityTest.cs

@@ -0,0 +1,280 @@
+// BoundedCapacityTest.cs
+//  
+// Copyright (c) 2012 Petr Onderka
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Collections.Generic;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow {
+	[TestFixture]
+	public class BoundedCapacityTest {
+		[Test]
+		public void PostTest ()
+		{
+			var block =
+				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+			Assert.IsTrue (block.Post (1));
+			Assert.IsFalse (block.Post (2));
+			Assert.AreEqual (1, block.Receive ());
+			Assert.IsTrue (block.Post (3));
+			Assert.AreEqual (3, block.Receive ());
+		}
+
+		[Test]
+		public void OfferMessageTest ()
+		{
+			var block =
+				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+			ITargetBlock<int> target = block;
+
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (new DataflowMessageHeader (1), 42, null, false));
+			Assert.AreEqual (DataflowMessageStatus.Declined,
+				target.OfferMessage (new DataflowMessageHeader (2), 43, null, false));
+
+			Assert.AreEqual (42, block.Receive ());
+
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (new DataflowMessageHeader (3), 44, null, false));
+
+			Assert.AreEqual (44, block.Receive ());
+		}
+
+		[Test]
+		public void OfferMessageWithSourceTest ()
+		{
+			var block =
+				new BufferBlock<int> (new DataflowBlockOptions { BoundedCapacity = 1 });
+			ITargetBlock<int> target = block;
+			var source = new TestSourceBlock<int> ();
+
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (new DataflowMessageHeader (1), 42, source, false));
+			var header = new DataflowMessageHeader (2);
+			source.AddMessage (header, 43);
+			Assert.AreEqual (DataflowMessageStatus.Postponed,
+				target.OfferMessage (header, 43, source, false));
+
+			Assert.AreEqual (42, block.Receive (TimeSpan.FromMilliseconds (100)));
+
+			Assert.IsFalse (block.Completion.Wait (100));
+
+			Assert.IsTrue (source.WasConsumed (header));
+
+			Assert.AreEqual (43, block.Receive (TimeSpan.FromMilliseconds (100)));
+
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (new DataflowMessageHeader (3), 44, source, false));
+
+			Assert.AreEqual (44, block.Receive ());
+		}
+
+		[Test]
+		public void TransformManyBlockTest ()
+		{
+			var block = new TransformManyBlock<int, int> (
+				i => new[] { -i, i },
+				new ExecutionDataflowBlockOptions { BoundedCapacity = 1 });
+
+			Assert.IsTrue (block.Post (1));
+			Assert.IsFalse (block.Post (2));
+
+			Assert.IsFalse (block.Completion.Wait (100));
+
+			Assert.IsFalse (block.Post (3));
+
+			Assert.AreEqual (-1, block.Receive ());
+
+			Assert.IsFalse (block.Post (4));
+
+			Assert.AreEqual (1, block.Receive ());
+
+			Assert.IsTrue (block.Post (5));
+
+			Assert.AreEqual (-5, block.Receive ());
+			Assert.AreEqual (5, block.Receive ());
+		}
+
+		[Test]
+		public void TransformFullTest()
+		{
+			var scheduler = new TestScheduler();
+
+			int n = 0;
+			var transform = new TransformBlock<int, int>(
+				i => Interlocked.Increment(ref n),
+				new ExecutionDataflowBlockOptions { BoundedCapacity = 2, TaskScheduler = scheduler });
+
+			Assert.IsTrue(transform.Post(1));
+			Assert.IsTrue(transform.Post(2));
+
+			Assert.GreaterOrEqual(scheduler.ExecuteAll(), 1);
+
+			Assert.AreEqual(2, Thread.VolatileRead(ref n));
+		}
+
+		[Test]
+		public void TransformManyOverfullTest()
+		{
+			var scheduler = new TestScheduler();
+
+			int n = 0;
+			var transform = new TransformManyBlock<int, int>(
+				i =>
+				{
+					Interlocked.Increment(ref n);
+					return new[] { -i, i };
+				},
+				new ExecutionDataflowBlockOptions { BoundedCapacity = 2, TaskScheduler = scheduler });
+
+			Assert.IsTrue(transform.Post(1));
+			Assert.IsTrue(transform.Post(2));
+
+			Assert.GreaterOrEqual(scheduler.ExecuteAll(), 1);
+
+			Assert.AreEqual(2, Thread.VolatileRead(ref n));
+		}
+
+		private int n;
+
+		[Test]
+		public void TransformManyOverfullTest2()
+		{
+			var scheduler = new TestScheduler();
+
+			n = 0;
+			var transform = new TransformManyBlock<int, int>(
+				i => ComputeResults(),
+				new ExecutionDataflowBlockOptions { BoundedCapacity = 100, TaskScheduler = scheduler });
+
+			for (int i = 0; i < 100; i++)
+				Assert.IsTrue(transform.Post(i));
+
+			Assert.IsFalse(transform.Post(101));
+
+			Assert.GreaterOrEqual(scheduler.ExecuteAll(), 1);
+
+			Assert.IsFalse(transform.Post(102));
+
+			Assert.AreEqual(10000, Thread.VolatileRead(ref n));
+		}
+
+		private IEnumerable<int> ComputeResults()
+		{
+			for (int j = 0; j < 100; j++)
+				yield return Interlocked.Increment(ref n);
+		}
+
+		[Test]
+		public void MultipleOffersTest ()
+		{
+			var scheduler = new TestScheduler ();
+			var block = new BufferBlock<int> (
+				new DataflowBlockOptions { BoundedCapacity = 1, TaskScheduler = scheduler });
+			var target = (ITargetBlock<int>)block;
+			var source = new TestSourceBlock<int> ();
+
+			var header1 = new DataflowMessageHeader (1);
+			Assert.AreEqual (DataflowMessageStatus.Accepted,
+				target.OfferMessage (header1, 41, source, false));
+
+			var header2 = new DataflowMessageHeader (2);
+			source.AddMessage (header2, 42);
+			Assert.AreEqual (DataflowMessageStatus.Postponed,
+				target.OfferMessage (header2, 42, source, false));
+
+			var header3 = new DataflowMessageHeader (3);
+			source.AddMessage (header3, 43);
+			Assert.AreEqual (DataflowMessageStatus.Postponed,
+				target.OfferMessage (header3, 43, source, false));
+
+			Assert.AreEqual (41, block.Receive ());
+			scheduler.ExecuteAll ();
+			Assert.IsTrue (source.WasConsumed (header3));
+			Assert.IsFalse (source.WasConsumed (header2));
+		}
+	}
+
+	class TestSourceBlock<T> : ISourceBlock<T> {
+		readonly Dictionary<DataflowMessageHeader, T> messages =
+			new Dictionary<DataflowMessageHeader, T> ();
+
+		readonly HashSet<DataflowMessageHeader> consumed =
+			new HashSet<DataflowMessageHeader> ();
+
+		public void Complete ()
+		{
+			throw new NotImplementedException ();
+		}
+
+		public void Fault (Exception exception)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public Task Completion { get; private set; }
+
+		public void AddMessage (DataflowMessageHeader header, T item)
+		{
+			messages.Add (header, item);
+		}
+
+		public bool WasConsumed (DataflowMessageHeader header)
+		{
+			return consumed.Contains (header);
+		}
+
+		public T ConsumeMessage (DataflowMessageHeader messageHeader,
+		                         ITargetBlock<T> target, out bool messageConsumed)
+		{
+			T item;
+			if (messages.TryGetValue (messageHeader, out item)) {
+				messages.Remove (messageHeader);
+				consumed.Add (messageHeader);
+				messageConsumed = true;
+				return item;
+			}
+			messageConsumed = false;
+			return default(T);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T> target, DataflowLinkOptions linkOptions)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader,
+		                            ITargetBlock<T> target)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader,
+		                                ITargetBlock<T> target)
+		{
+			throw new NotImplementedException ();
+		}
+	}
+}

+ 17 - 2
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlockTest.cs

@@ -25,7 +25,6 @@
 // THE SOFTWARE.
 
 using System;
-using System.Linq;
 using System.Threading;
 using System.Threading.Tasks;
 using System.Threading.Tasks.Dataflow;
@@ -82,5 +81,21 @@ namespace MonoTests.System.Threading.Tasks.Dataflow
 			Assert.AreEqual (42, tuple.Item1);
 			Assert.AreEqual (24, tuple.Item2);
 		}
+
+		[Test]
+		public void BoundedCapacityTest ()
+		{
+			var block = new JoinBlock<int, int> (
+				new GroupingDataflowBlockOptions { BoundedCapacity = 1 });
+			Assert.IsTrue (block.Target1.Post (1));
+			Assert.IsFalse (block.Target1.Post (2));
+
+			Assert.IsTrue (block.Target2.Post (10));
+			Assert.IsFalse (block.Target2.Post (11));
+			Assert.IsFalse (block.Target1.Post (3));
+
+			Assert.AreEqual (Tuple.Create (1, 10), block.Receive ());
+			Assert.IsTrue (block.Target1.Post (4));
+		}
 	}
-}
+}