Browse Source

Add blocks implementation

Jérémie Laval 14 years ago
parent
commit
31bb3e1cf8
29 changed files with 3296 additions and 1 deletions
  1. 15 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources
  2. 111 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs
  3. 173 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs
  4. 140 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs
  5. 145 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs
  6. 115 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs
  7. 221 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs
  8. 213 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs
  9. 235 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs
  10. 2 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs
  11. 96 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs
  12. 60 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs
  13. 96 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
  14. 118 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs
  15. 158 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs
  16. 158 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs
  17. 170 0
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs
  18. 11 1
      mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources
  19. 72 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs
  20. 132 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs
  21. 87 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BroadcastBlockTest.cs
  22. 97 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BufferBlockTest.cs
  23. 105 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs
  24. 145 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
  25. 64 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlockTest.cs
  26. 70 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlock`3Test.cs
  27. 75 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformBlockTest.cs
  28. 79 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformManyBlockTest.cs
  29. 133 0
      mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs

+ 15 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow.dll.sources

@@ -21,3 +21,18 @@ System.Threading.Tasks.Dataflow/MessageVault.cs
 System.Threading.Tasks.Dataflow/PassingMessageBox.cs
 System.Threading.Tasks.Dataflow/PassingMessageBox.cs
 System.Threading.Tasks.Dataflow/TargetBuffer.cs
 System.Threading.Tasks.Dataflow/TargetBuffer.cs
 ../corlib/System.Threading/AtomicBoolean.cs
 ../corlib/System.Threading/AtomicBoolean.cs
+System.Threading.Tasks.Dataflow/ActionBlock.cs
+System.Threading.Tasks.Dataflow/BatchBlock.cs
+System.Threading.Tasks.Dataflow/BroadcastBlock.cs
+System.Threading.Tasks.Dataflow/BufferBlock.cs
+System.Threading.Tasks.Dataflow/ChooserBlock.cs
+System.Threading.Tasks.Dataflow/DataflowBlock.cs
+System.Threading.Tasks.Dataflow/JoinBlock.cs
+System.Threading.Tasks.Dataflow/JoinBlock`3.cs
+System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs
+System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs
+System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs
+System.Threading.Tasks.Dataflow/ReceiveBlock.cs
+System.Threading.Tasks.Dataflow/TransformBlock.cs
+System.Threading.Tasks.Dataflow/TransformManyBlock.cs
+System.Threading.Tasks.Dataflow/WriteOnceBlock.cs

+ 111 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ActionBlock.cs

@@ -0,0 +1,111 @@
+// ActionBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class ActionBlock<TInput> : ITargetBlock<TInput>, IDataflowBlock
+	{
+		static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+		ExecutingMessageBox<TInput> messageBox;
+		Action<TInput> action;
+		ExecutionDataflowBlockOptions dataflowBlockOptions;
+
+
+		public ActionBlock (Action<TInput> action) : this (action, defaultOptions)
+		{
+			
+		}
+
+		public ActionBlock (Action<TInput> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (action == null)
+				throw new ArgumentNullException ("action");
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.action = action;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue, compHelper, () => true, ProcessQueue, dataflowBlockOptions);
+		}
+
+		public ActionBlock (Func<TInput, Task> action) : this (action, defaultOptions)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public ActionBlock (Func<TInput, Task> action, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           TInput messageValue,
+		                                           ISourceBlock<TInput> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		void ProcessQueue ()
+		{
+			TInput data;
+			while (messageQueue.TryTake (out data))
+				action (data);
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		public int InputCount {
+			get {
+				return messageQueue.Count;
+			}
+		}
+	}
+}
+
+#endif

+ 173 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BatchBlock.cs

@@ -0,0 +1,173 @@
+// B.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class BatchBlock<T> : IPropagatorBlock<T, T[]>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T[]>, IReceivableSourceBlock<T[]>
+	{
+		static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+		MessageBox<T> messageBox;
+		MessageVault<T[]> vault;
+		DataflowBlockOptions dataflowBlockOptions;
+		readonly int batchSize;
+		int batchCount;
+		MessageOutgoingQueue<T[]> outgoing;
+		TargetBuffer<T[]> targets = new TargetBuffer<T[]> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+
+		public BatchBlock (int batchSize) : this (batchSize, defaultOptions)
+		{
+
+		}
+
+		public BatchBlock (int batchSize, DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.batchSize = batchSize;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BatchProcess, dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<T[]> (compHelper, () => messageQueue.IsCompleted);
+			this.vault = new MessageVault<T[]> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           T messageValue,
+		                                           ISourceBlock<T> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T[]> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			outgoing.ProcessForTarget (target, this, false, ref headers);
+
+			return result;
+		}
+
+		public T[] ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T[]> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<T[]> filter, out T[] item)
+		{
+			return TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<T[]> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		public void TriggerBatch ()
+		{
+			int earlyBatchSize;
+			do {
+				earlyBatchSize = batchCount;
+				if (earlyBatchSize == 0)
+					return;
+			} while (Interlocked.CompareExchange (ref batchCount, 0, earlyBatchSize) != earlyBatchSize);
+
+			MakeBatch (targets.Current, earlyBatchSize);
+		}
+
+		// TODO: there can be out-of-order processing of message elements if two collections
+		// are triggered and work side by side. See if it's a problem or not.
+		void BatchProcess ()
+		{
+			ITargetBlock<T[]> target = targets.Current;
+			int current = Interlocked.Increment (ref batchCount);
+
+			if (current % batchSize != 0)
+				return;
+
+			Interlocked.Add (ref batchCount, -current);
+
+			MakeBatch (target, batchSize);
+		}
+
+		void MakeBatch (ITargetBlock<T[]> target, int size)
+		{
+			T[] batch = new T[size];
+			for (int i = 0; i < size; ++i)
+				messageQueue.TryTake (out batch[i]);
+
+			if (target == null)
+				outgoing.AddData (batch);
+			else
+				target.OfferMessage (headers.Increment (), batch, this, false);
+
+			if (!outgoing.IsEmpty && targets.Current != null)
+				outgoing.ProcessForTarget (targets.Current, this, false, ref headers);
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		public int OutputCount {
+			get {
+				return outgoing.Count;
+			}
+		}
+	}
+}
+
+#endif

+ 140 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BroadcastBlock.cs

@@ -0,0 +1,140 @@
+// BroadcastBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class BroadcastBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
+	{
+		static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+		MessageBox<T> messageBox;
+		MessageVault<T> vault;
+		DataflowBlockOptions dataflowBlockOptions;
+		readonly Func<T, T> cloner;
+		MessageOutgoingQueue<T> outgoing;
+		TargetBuffer<T> targets = new TargetBuffer<T> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+
+		public BroadcastBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+		{
+
+		}
+
+		public BroadcastBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.cloner = cloner;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, BroadcastProcess, dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<T> (compHelper, () => messageQueue.IsCompleted);
+			this.vault = new MessageVault<T> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           T messageValue,
+		                                           ISourceBlock<T> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
+		{
+			return targets.AddTarget (target, unlinkAfterOne);
+		}
+
+		public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+		{
+			return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<T> filter, out T item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<T> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		void BroadcastProcess ()
+		{
+			T input;
+
+			if (!messageQueue.TryTake (out input) || targets.Current == null)
+				return;
+
+			foreach (var target in targets) {
+				DataflowMessageHeader header = headers.Increment ();
+				if (cloner != null)
+					vault.StoreMessage (header, input);
+				target.OfferMessage (header, input, this, cloner != null);
+				// TODO: verify if it's the correct semantic
+				T save = input;
+				if (!messageQueue.TryTake (out input))
+					input = save;
+			}
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+	}
+}
+
+#endif

+ 145 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/BufferBlock.cs

@@ -0,0 +1,145 @@
+// BufferBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class BufferBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
+	{
+		static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
+
+		DataflowBlockOptions dataflowBlockOptions;
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		MessageBox<T> messageBox;
+		MessageVault<T> vault;
+		MessageOutgoingQueue<T> outgoing;
+		BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+		TargetBuffer<T> targets = new TargetBuffer<T> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+
+		public BufferBlock () : this (defaultOptions)
+		{
+			
+		}
+
+		public BufferBlock (DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => outgoing.IsCompleted, ProcessQueue, dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<T> (compHelper, () => messageQueue.IsCompleted);
+			this.vault = new MessageVault<T> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           T messageValue,
+		                                           ISourceBlock<T> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			ProcessQueue ();
+
+			return result;
+		}
+
+		public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<T> filter, out T item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<T> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		void ProcessQueue ()
+		{
+			ITargetBlock<T> target;
+			T input;
+
+			while (messageQueue.TryTake (out input)) {
+				if ((target = targets.Current) != null)
+					target.OfferMessage (headers.Increment (), input, this, false);
+				else
+					outgoing.AddData (input);
+			}
+
+			if (!outgoing.IsEmpty && (target = targets.Current) != null)
+				outgoing.ProcessForTarget (target, this, false, ref headers);
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+			outgoing.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		public int Count {
+			get {
+				return outgoing.Count;
+			}
+		}
+	}
+}
+
+#endif

+ 115 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ChooserBlock.cs

@@ -0,0 +1,115 @@
+// JoinBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	internal class ChooserBlock<T1, T2, T3>
+	{
+		class ChooseTarget<TMessage> : ITargetBlock<TMessage>
+		{
+			Action<TMessage> messageArrived;
+
+			public ChooseTarget (Action<TMessage> messageArrived)
+			{
+				this.messageArrived = messageArrived;
+			}
+
+			public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+			                                           TMessage messageValue,
+			                                           ISourceBlock<TMessage> source,
+			                                           bool consumeToAccept)
+			{
+				messageArrived (messageValue);
+				return DataflowMessageStatus.Accepted;
+			}
+
+			public Task Completion {
+				get {
+					return null;
+				}
+			}
+
+			public void Complete ()
+			{
+
+			}
+
+			public void Fault (Exception ex)
+			{
+			
+			}
+		}
+
+		TaskCompletionSource<int> completion = new TaskCompletionSource<int> ();
+
+		public ChooserBlock (Action<T1> action1, Action<T2> action2, Action<T3> action3, DataflowBlockOptions dataflowBlockOptions)
+		{
+			// TODO: take care of options and its cancellation token
+
+			Target1 = new ChooseTarget<T1> (message => MessageArrived (0, action1, message));
+			Target2 = new ChooseTarget<T2> (message => MessageArrived (1, action2, message));
+			Target3 = new ChooseTarget<T3> (message => MessageArrived (2, action3, message));
+		}
+
+		void MessageArrived<TMessage> (int index, Action<TMessage> action, TMessage value)
+		{
+			try {
+				action (value);
+				completion.SetResult (index);
+			} catch (Exception e) {
+				completion.SetException (e);
+			}
+		}
+
+		public ITargetBlock<T1> Target1 {
+			get;
+			private set;
+		}
+
+		public ITargetBlock<T2> Target2 {
+			get;
+			private set;
+		}
+
+		public ITargetBlock<T3> Target3 {
+			get;
+			private set;
+		}
+
+		public Task<int> Completion {
+			get {
+				return completion.Task;
+			}
+		}
+	}
+}
+
+#endif

+ 221 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/DataflowBlock.cs

@@ -0,0 +1,221 @@
+// IPropagatorBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public static class DataflowBlock
+	{
+		static DataflowMessageHeader globalHeader = new DataflowMessageHeader ();
+
+		public static IObservable<TOutput> AsObservable<TOutput> (this ISourceBlock<TOutput> source)
+		{
+			if (source == null)
+				throw new ArgumentNullException ("source");
+
+			return new ObservableDataflowBlock<TOutput> (source);
+		}
+
+		public static IObserver<TInput> AsObserver<TInput> (this ITargetBlock<TInput> target)
+		{
+			if (target == null)
+				throw new ArgumentNullException ("target");
+
+			return new ObserverDataflowBlock<TInput> (target);
+		}
+
+		public static Task<int> Choose<T1, T2> (ISourceBlock<T1> source1, Action<T1> action1, ISourceBlock<T2> source2, Action<T2> action2)
+		{
+			return Choose<T1, T2> (source1, action1, source2, action2, new DataflowBlockOptions ());
+		}
+
+		public static Task<int> Choose<T1, T2> (ISourceBlock<T1> source1,
+		                                        Action<T1> action1,
+		                                        ISourceBlock<T2> source2,
+		                                        Action<T2> action2,
+		                                        DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (source1 == null)
+				throw new ArgumentNullException ("source1");
+			if (source2 == null)
+				throw new ArgumentNullException ("source2");
+
+			var chooser = new ChooserBlock<T1, T2, object> (action1, action2, null, dataflowBlockOptions);
+			source1.LinkTo (chooser.Target1);
+			source2.LinkTo (chooser.Target2);
+
+			return chooser.Completion;
+		}
+
+		public static Task<int> Choose<T1, T2, T3> (ISourceBlock<T1> source1,
+		                                            Action<T1> action1,
+		                                            ISourceBlock<T2> source2,
+		                                            Action<T2> action2,
+		                                            ISourceBlock<T3> source3,
+		                                            Action<T3> action3)
+		{
+			return Choose (source1, action1, source2, action2, source3, action3, new DataflowBlockOptions ());
+		}
+
+		public static Task<int> Choose<T1, T2, T3> (ISourceBlock<T1> source1,
+		                                            Action<T1> action1,
+		                                            ISourceBlock<T2> source2,
+		                                            Action<T2> action2,
+		                                            ISourceBlock<T3> source3,
+		                                            Action<T3> action3,
+		                                            DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (source1 == null)
+				throw new ArgumentNullException ("source1");
+			if (source2 == null)
+				throw new ArgumentNullException ("source2");
+			if (source3 == null)
+				throw new ArgumentNullException ("source3");
+
+			var chooser = new ChooserBlock<T1, T2, T3> (action1, action2, action3, dataflowBlockOptions);
+			source1.LinkTo (chooser.Target1);
+			source2.LinkTo (chooser.Target2);
+			source3.LinkTo (chooser.Target3);
+
+			return chooser.Completion;
+		}
+
+		public static IPropagatorBlock<TInput, TOutput> Encapsulate<TInput, TOutput> (ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
+		{
+			return new PropagatorWrapperBlock<TInput, TOutput> (target, source);
+		}
+
+		public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target)
+		{
+			return source.LinkTo (target, (_) => true);
+		}
+
+		public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source, ITargetBlock<TOutput> target, Predicate<TOutput> predicate)
+		{
+			return source.LinkTo (target, predicate, true);
+		}
+
+		public static IDisposable LinkTo<TOutput> (this ISourceBlock<TOutput> source,
+		                                           ITargetBlock<TOutput> target,
+		                                           Predicate<TOutput> predicate,
+		                                           bool discardsMessages)
+		{
+			return source.LinkTo (target, false);
+		}
+
+		public static Task<bool> OutputAvailableAsync<TOutput> (this ISourceBlock<TOutput> source)
+		{
+			throw new NotImplementedException ();
+		}
+
+		public static bool Post<TInput> (this ITargetBlock<TInput> target, TInput item)
+		{
+			if (target == null)
+				throw new ArgumentNullException ("target");
+
+			return target.OfferMessage (globalHeader.Increment (), item, null, false) == DataflowMessageStatus.Accepted;
+		}
+
+		public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source)
+		{
+			return Receive<TOutput> (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
+		}
+
+		public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
+		{
+			return Receive<TOutput> (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
+		}
+
+		public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
+		{
+			return Receive<TOutput> (source, timeout, CancellationToken.None);
+		}
+
+		public static TOutput Receive<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
+		{
+			if (source == null)
+				throw new ArgumentNullException ("source");
+			if (timeout.TotalMilliseconds < -1)
+				throw new ArgumentOutOfRangeException ("timeout");
+
+			cancellationToken.ThrowIfCancellationRequested ();
+
+			long tm = (long)timeout.TotalMilliseconds;
+			ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
+			var bridge = source.LinkTo (block);
+			return block.WaitAndGet (bridge, cancellationToken, tm);
+		}
+
+		public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source)
+		{
+			return ReceiveAsync<TOutput> (source, TimeSpan.FromMilliseconds (-1), CancellationToken.None);
+		}
+
+		public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, CancellationToken cancellationToken)
+		{
+			return ReceiveAsync (source, TimeSpan.FromMilliseconds (-1), cancellationToken);
+		}
+
+		public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout)
+		{
+			return ReceiveAsync<TOutput> (source, timeout, CancellationToken.None);
+		}
+
+		public static Task<TOutput> ReceiveAsync<TOutput> (this ISourceBlock<TOutput> source, TimeSpan timeout, CancellationToken cancellationToken)
+		{
+			if (source == null)
+				throw new ArgumentNullException ("source");
+			if (timeout.TotalMilliseconds < -1)
+				throw new ArgumentOutOfRangeException ("timeout");
+
+			cancellationToken.ThrowIfCancellationRequested ();
+
+			long tm = (long)timeout.TotalMilliseconds;
+			ReceiveBlock<TOutput> block = new ReceiveBlock<TOutput> ();
+			var bridge = source.LinkTo (block);
+			return block.AsyncGet (bridge, cancellationToken, tm);
+		}
+
+		public static bool TryReceive<TOutput> (this IReceivableSourceBlock<TOutput> source, out TOutput item)
+		{
+			item = default (TOutput);
+			if (source == null)
+				throw new ArgumentNullException ("source");
+
+			return source.TryReceive (null, out item);
+		}
+
+		public static Task<bool> SendAsync<TInput> (this ITargetBlock<TInput> target, TInput item)
+		{
+			throw new NotImplementedException ();
+		}
+	}
+}
+
+#endif

+ 213 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock.cs

@@ -0,0 +1,213 @@
+// JoinBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class JoinBlock<T1, T2> : IReceivableSourceBlock<Tuple<T1, T2>>, ISourceBlock<Tuple<T1, T2>>, IDataflowBlock
+	{
+		static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		GroupingDataflowBlockOptions dataflowBlockOptions;
+		TargetBuffer<Tuple<T1, T2>> targets = new TargetBuffer<Tuple<T1, T2>> ();
+		MessageVault<Tuple<T1, T2>> vault = new MessageVault<Tuple<T1, T2>> ();
+		MessageOutgoingQueue<Tuple<T1, T2>> outgoing;
+
+		JoinTarget<T1> target1;
+		JoinTarget<T2> target2;
+
+		DataflowMessageHeader headers;
+
+		public JoinBlock () : this (defaultOptions)
+		{
+
+		}
+
+		public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.target1 = new JoinTarget<T1> (this, SignalArrivalTarget1, new BlockingCollection<T1> (), compHelper);
+			this.target2 = new JoinTarget<T2> (this, SignalArrivalTarget2, new BlockingCollection<T2> (), compHelper);
+			this.outgoing = new MessageOutgoingQueue<Tuple<T1, T2>> (compHelper, () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2>> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			outgoing.ProcessForTarget (target, this, false, ref headers);
+			return result;
+		}
+
+		public bool TryReceive (Predicate<Tuple<T1, T2>> filter, out Tuple<T1, T2> item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<Tuple<T1, T2>> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		public Tuple<T1, T2> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2>> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public void Complete ()
+		{
+			outgoing.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		void SignalArrivalTarget1 ()
+		{
+			T2 val2;
+			if (target2.Buffer.TryTake (out val2)) {
+				T1 val1 = target1.Buffer.Take ();
+				TriggerMessage (val1, val2);
+			}
+		}
+
+		void SignalArrivalTarget2 ()
+		{
+			T1 val1;
+			if (target1.Buffer.TryTake (out val1)) {
+				T2 val2 = target2.Buffer.Take ();
+				TriggerMessage (val1, val2);
+			}
+		}
+
+		void TriggerMessage (T1 val1, T2 val2)
+		{
+			Tuple<T1, T2> tuple = Tuple.Create (val1, val2);
+			ITargetBlock<Tuple<T1, T2>> target = targets.Current;
+
+			if (target == null) {
+				outgoing.AddData (tuple);
+			} else {
+				target.OfferMessage (headers.Increment (),
+				                     tuple,
+				                     this,
+				                     false);
+			}
+
+			if (!outgoing.IsEmpty && (target = targets.Current) != null)
+				outgoing.ProcessForTarget (target, this, false, ref headers);
+		}
+
+		class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
+		{
+			JoinBlock<T1, T2> joinBlock;
+			BlockingCollection<TTarget> buffer;
+			Action signal;
+
+			public JoinTarget (JoinBlock<T1, T2> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
+			: base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
+			{
+				this.joinBlock = joinBlock;
+				this.buffer = buffer;
+				this.signal = signal;
+			}
+
+			protected override void EnsureProcessing ()
+			{
+				signal ();
+			}
+
+			public BlockingCollection<TTarget> Buffer {
+				get {
+					return buffer;
+				}
+			}
+
+			DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
+			                                                          TTarget messageValue,
+			                                                          ISourceBlock<TTarget> source,
+			                                                          bool consumeToAccept)
+			{
+				return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			}
+
+			void IDataflowBlock.Complete ()
+			{
+				Complete ();
+			}
+
+			Task IDataflowBlock.Completion {
+				get {
+					return joinBlock.Completion;
+				}
+			}
+
+			void IDataflowBlock.Fault (Exception e)
+			{
+				joinBlock.Fault (e);
+			}
+		}
+
+		public ITargetBlock<T1> Target1 {
+			get {
+				return target1;
+			}
+		}
+
+		public ITargetBlock<T2> Target2 {
+			get {
+				return target2;
+			}
+		}
+	}
+}
+
+#endif

+ 235 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/JoinBlock`3.cs

@@ -0,0 +1,235 @@
+// JoinBlock`3.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class JoinBlock<T1, T2, T3> : IReceivableSourceBlock<Tuple<T1, T2, T3>>, ISourceBlock<Tuple<T1, T2, T3>>, IDataflowBlock
+	{
+		static readonly GroupingDataflowBlockOptions defaultOptions = new GroupingDataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		GroupingDataflowBlockOptions dataflowBlockOptions;
+		TargetBuffer<Tuple<T1, T2, T3>> targets = new TargetBuffer<Tuple<T1, T2, T3>> ();
+		MessageVault<Tuple<T1, T2, T3>> vault = new MessageVault<Tuple<T1, T2, T3>> ();
+		MessageOutgoingQueue<Tuple<T1, T2, T3>> outgoing;
+
+		JoinTarget<T1> target1;
+		JoinTarget<T2> target2;
+		JoinTarget<T3> target3;
+
+		SpinLock targetLock = new SpinLock (false);
+
+		DataflowMessageHeader headers;
+
+		public JoinBlock () : this (defaultOptions)
+		{
+
+		}
+
+		public JoinBlock (GroupingDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.dataflowBlockOptions = dataflowBlockOptions;
+
+			Func<bool> checker1 = () => target2.Buffer.Count == 0 || target3.Buffer.Count == 0;
+			Func<bool> checker2 = () => target1.Buffer.Count == 0 || target3.Buffer.Count == 0;
+			Func<bool> checker3 = () => target1.Buffer.Count == 0 || target2.Buffer.Count == 0;
+
+			this.target1 = new JoinTarget<T1> (this, () => SignalArrivalTargetImpl (checker1), new BlockingCollection<T1> (), compHelper);
+			this.target2 = new JoinTarget<T2> (this, () => SignalArrivalTargetImpl (checker2), new BlockingCollection<T2> (), compHelper);
+			this.target3 = new JoinTarget<T3> (this, () => SignalArrivalTargetImpl (checker3), new BlockingCollection<T3> (), compHelper);
+			this.outgoing =
+				new MessageOutgoingQueue<Tuple<T1, T2, T3>> (compHelper,
+				                                             () => target1.Buffer.IsCompleted || target2.Buffer.IsCompleted || target3.Buffer.IsCompleted);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<Tuple<T1, T2, T3>> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			outgoing.ProcessForTarget (target, this, false, ref headers);
+			return result;
+		}
+
+		public bool TryReceive (Predicate<Tuple<T1, T2, T3>> filter, out Tuple<T1, T2, T3> item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<Tuple<T1, T2, T3>> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		public Tuple<T1, T2, T3> ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<Tuple<T1, T2, T3>> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public void Complete ()
+		{
+			outgoing.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		// TODO : see if we can find a lockless implementation
+		void SignalArrivalTargetImpl (Func<bool> check)
+		{
+			bool taken = false;
+			T1 value1;
+			T2 value2;
+			T3 value3;
+
+			try {
+				targetLock.Enter (ref taken);
+				if (check ())
+					return;
+
+				value1 = target1.Buffer.Take ();
+				value2 = target2.Buffer.Take ();
+				value3 = target3.Buffer.Take ();
+			} finally {
+				if (taken)
+					targetLock.Exit ();
+			}
+
+			TriggerMessage (value1, value2, value3);
+		}
+
+		void TriggerMessage (T1 val1, T2 val2, T3 val3)
+		{
+			Tuple<T1, T2, T3> tuple = Tuple.Create (val1, val2, val3);
+			ITargetBlock<Tuple<T1, T2, T3>> target = targets.Current;
+			if (target == null) {
+				outgoing.AddData (tuple);
+			} else {
+				target.OfferMessage (headers.Increment (),
+				                     tuple,
+				                     this,
+				                     false);
+			}
+
+			if (!outgoing.IsEmpty && (target = targets.Current) != null)
+				outgoing.ProcessForTarget (target, this, false, ref headers);
+		}
+
+		class JoinTarget<TTarget> : MessageBox<TTarget>, ITargetBlock<TTarget>
+		{
+			JoinBlock<T1, T2, T3> joinBlock;
+			BlockingCollection<TTarget> buffer;
+			Action signal;
+
+			public JoinTarget (JoinBlock<T1, T2, T3> joinBlock, Action signal, BlockingCollection<TTarget> buffer, CompletionHelper helper)
+			: base (buffer, helper, () => joinBlock.outgoing.IsCompleted)
+			{
+				this.joinBlock = joinBlock;
+				this.buffer = buffer;
+				this.signal = signal;
+			}
+
+			protected override void EnsureProcessing ()
+			{
+				signal ();
+			}
+
+			public BlockingCollection<TTarget> Buffer {
+				get {
+					return buffer;
+				}
+			}
+
+			DataflowMessageStatus ITargetBlock<TTarget>.OfferMessage (DataflowMessageHeader messageHeader,
+			                                                          TTarget messageValue,
+			                                                          ISourceBlock<TTarget> source,
+			                                                          bool consumeToAccept)
+			{
+				return OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+			}
+
+			void IDataflowBlock.Complete ()
+			{
+				Complete ();
+			}
+
+			Task IDataflowBlock.Completion {
+				get {
+					return joinBlock.Completion;
+				}
+			}
+
+			void IDataflowBlock.Fault (Exception e)
+			{
+				joinBlock.Fault (e);
+			}
+		}
+
+		public ITargetBlock<T1> Target1 {
+			get {
+				return target1;
+			}
+		}
+
+		public ITargetBlock<T2> Target2 {
+			get {
+				return target2;
+			}
+		}
+
+		public ITargetBlock<T3> Target3 {
+			get {
+				return target3;
+			}
+		}
+	}
+}
+
+#endif

+ 2 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/MessageOutgoingQueue.cs

@@ -70,6 +70,8 @@ namespace System.Threading.Tasks.Dataflow
 
 
 			foreach (var output in GetNonBlockingConsumingEnumerable ())
 			foreach (var output in GetNonBlockingConsumingEnumerable ())
 				target.OfferMessage (headers.Increment (), output, source, consumeToAccept);
 				target.OfferMessage (headers.Increment (), output, source, consumeToAccept);
+
+			VerifyCompleteness ();
 		}
 		}
 
 
 		public bool TryReceive (Predicate<T> filter, out T item)
 		public bool TryReceive (Predicate<T> filter, out T item)

+ 96 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObservableDataflowBlock.cs

@@ -0,0 +1,96 @@
+// ObservableDataflowBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	internal class ObservableDataflowBlock<TSource> : IObservable<TSource>
+	{
+		class ObserverWrapper : ITargetBlock<TSource>
+		{
+			IObserver<TSource> observer;
+
+			public ObserverWrapper (IObserver<TSource> observer)
+			{
+				this.observer = observer;
+			}
+
+			public void Complete ()
+			{
+				observer.OnCompleted ();
+			}
+
+			public void Fault (Exception ex)
+			{
+				observer.OnError (ex);
+			}
+
+			public Task Completion {
+				get {
+					return null;
+				}
+			}
+
+			public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+			                                           TSource messageValue,
+			                                           ISourceBlock<TSource> source,
+			                                           bool consumeToAccept)
+			{
+				if (consumeToAccept) {
+					if (!source.ReserveMessage (messageHeader, this))
+						return DataflowMessageStatus.NotAvailable;
+					bool consumed;
+					messageValue = source.ConsumeMessage (messageHeader, this, out consumed);
+					if (!consumed)
+						return DataflowMessageStatus.NotAvailable;
+				}
+
+				observer.OnNext (messageValue);
+
+				return DataflowMessageStatus.Accepted;
+			}
+		}
+
+		ISourceBlock<TSource> source;
+
+		public ObservableDataflowBlock (ISourceBlock<TSource> source)
+		{
+			this.source = source;
+		}
+
+		public IDisposable Subscribe (IObserver<TSource> observer)
+		{
+			ObserverWrapper wrapper = new ObserverWrapper (observer);
+			return source.LinkTo (wrapper);
+		}
+	}
+}
+
+#endif

+ 60 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ObserverDataflowBlock.cs

@@ -0,0 +1,60 @@
+// ObserverDataflowBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	internal class ObserverDataflowBlock<TInput> : IObserver<TInput>
+	{
+		ITargetBlock<TInput> target;
+
+		public ObserverDataflowBlock (ITargetBlock<TInput> target)
+		{
+			this.target = target;
+		}
+
+		public void OnCompleted ()
+		{
+			target.Complete ();
+		}
+
+		public void OnError (Exception ex)
+		{
+			target.Fault (ex);
+		}
+
+		public void OnNext (TInput value)
+		{
+			target.Post (value);
+		}
+	}
+}
+
+#endif

+ 96 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/PropagatorWrapperBlock.cs

@@ -0,0 +1,96 @@
+// PropagatorWrapperBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	internal class PropagatorWrapperBlock<TInput, TOutput> : IPropagatorBlock<TInput, TOutput>
+	{
+		ITargetBlock<TInput> target;
+		ISourceBlock<TOutput> source;
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+
+		public PropagatorWrapperBlock (ITargetBlock<TInput> target, ISourceBlock<TOutput> source)
+		{
+			this.target = target;
+			this.source = source;
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           TInput messageValue,
+		                                           ISourceBlock<TInput> source,
+		                                           bool consumeToAccept)
+		{
+			return target.OfferMessage (messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
+		{
+			return source.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
+		{
+			return source.LinkTo (target, unlinkAfterOne);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			source.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			return source.ReserveMessage (messageHeader, target);
+		}
+
+		public void Complete ()
+		{
+			compHelper.Complete ();
+			source.Complete ();
+			target.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+			source.Fault (ex);
+			target.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+	}
+}
+
+#endif

+ 118 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/ReceiveBlock.cs

@@ -0,0 +1,118 @@
+// ReceiveBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	/* This internal block is used by the Receive methods in DataflowBlock static class
+	 * to retrieve elements in a blocking way
+	 */
+	internal class ReceiveBlock<TOutput> : ITargetBlock<TOutput>
+	{
+		ManualResetEventSlim waitHandle = new ManualResetEventSlim (false);
+		TaskCompletionSource<TOutput> completion = new TaskCompletionSource<TOutput> ();
+		IDisposable linkBridge;
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           TOutput messageValue,
+		                                           ISourceBlock<TOutput> source,
+		                                           bool consumeToAccept)
+		{
+			if (!messageHeader.IsValid)
+				return DataflowMessageStatus.Declined;
+
+			if (consumeToAccept) {
+				bool consummed;
+				if (!source.ReserveMessage (messageHeader, this))
+					return DataflowMessageStatus.NotAvailable;
+				messageValue = source.ConsumeMessage (messageHeader, this, out consummed);
+				if (!consummed)
+					return DataflowMessageStatus.NotAvailable;
+			}
+
+			ReceivedValue = messageValue;
+			completion.TrySetResult (messageValue);
+			Thread.MemoryBarrier ();
+			waitHandle.Set ();
+
+			/* We do the unlinking here so that we don't get called twice
+			 */
+			if (linkBridge != null) {
+				linkBridge.Dispose ();
+				linkBridge = null;
+			}
+
+			return DataflowMessageStatus.Accepted;
+		}
+
+		public TOutput WaitAndGet (IDisposable bridge, CancellationToken token, long timeout)
+		{
+			this.linkBridge = bridge;
+			Wait (token, timeout);
+			return ReceivedValue;
+		}
+
+		public Task<TOutput> AsyncGet (IDisposable bridge, CancellationToken token, long timeout)
+		{
+			this.linkBridge = bridge;
+			token.Register (() => completion.TrySetCanceled ());
+			// TODO : take care of timeout through the TaskEx.Wait thing
+			return completion.Task;
+		}
+
+		public void Wait (CancellationToken token, long timeout)
+		{
+			waitHandle.Wait (timeout >= int.MaxValue ? int.MaxValue : (int)timeout, token);
+		}
+
+		public TOutput ReceivedValue {
+			get;
+			private set;
+		}
+
+		public Task Completion {
+			get {
+				return null;
+			}
+		}
+
+		public void Complete ()
+		{
+
+		}
+
+		public void Fault (Exception ex)
+		{
+			
+		}
+	}
+}
+
+#endif

+ 158 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformBlock.cs

@@ -0,0 +1,158 @@
+// TransformBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class TransformBlock<TInput, TOutput> :
+		IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
+	{
+		static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
+
+		ExecutionDataflowBlockOptions dataflowBlockOptions;
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+		MessageBox<TInput> messageBox;
+		MessageVault<TOutput> vault;
+		MessageOutgoingQueue<TOutput> outgoing;
+		TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+		readonly Func<TInput, TOutput> transformer;
+
+		public TransformBlock (Func<TInput, TOutput> transformer) : this (transformer, defaultOptions)
+		{
+
+		}
+
+		public TransformBlock (Func<TInput, TOutput> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.transformer = transformer;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
+			                                                   compHelper,
+			                                                   () => outgoing.IsCompleted,
+			                                                   TransformProcess,
+			                                                   dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
+			this.vault = new MessageVault<TOutput> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           TInput messageValue,
+		                                           ISourceBlock<TInput> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			outgoing.ProcessForTarget (target, this, false, ref headers);
+			return result;
+		}
+
+		public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<TOutput> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		void TransformProcess ()
+		{
+			ITargetBlock<TOutput> target;
+			TInput input;
+
+			while (messageQueue.TryTake (out input)) {
+				TOutput output = transformer (input);
+
+				if ((target = targets.Current) != null)
+					target.OfferMessage (headers.Increment (), output, this, false);
+				else
+					outgoing.AddData (output);
+			}
+
+			if (!outgoing.IsEmpty && (target = targets.Current) != null)
+				outgoing.ProcessForTarget (target, this, false, ref headers);
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		public int OutputCount {
+			get {
+				return outgoing.Count;
+			}
+		}
+
+		public int InputCount {
+			get {
+				return messageQueue.Count;
+			}
+		}
+	}
+}
+
+#endif

+ 158 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/TransformManyBlock.cs

@@ -0,0 +1,158 @@
+// TransformManyBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class TransformManyBlock<TInput, TOutput> :
+		IPropagatorBlock<TInput, TOutput>, ITargetBlock<TInput>, IDataflowBlock, ISourceBlock<TOutput>, IReceivableSourceBlock<TOutput>
+	{
+		static readonly ExecutionDataflowBlockOptions defaultOptions = new ExecutionDataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<TInput> messageQueue = new BlockingCollection<TInput> ();
+		MessageBox<TInput> messageBox;
+		MessageVault<TOutput> vault;
+		ExecutionDataflowBlockOptions dataflowBlockOptions;
+		readonly Func<TInput, IEnumerable<TOutput>> transformer;
+		MessageOutgoingQueue<TOutput> outgoing;
+		TargetBuffer<TOutput> targets = new TargetBuffer<TOutput> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+
+		public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer) : this (transformer, defaultOptions)
+		{
+
+		}
+
+		public TransformManyBlock (Func<TInput, IEnumerable<TOutput>> transformer, ExecutionDataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.transformer = transformer;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new ExecutingMessageBox<TInput> (messageQueue,
+			                                                   compHelper,
+			                                                   () => outgoing.IsCompleted,
+			                                                   TransformProcess,
+			                                                   dataflowBlockOptions);
+			this.outgoing = new MessageOutgoingQueue<TOutput> (compHelper, () => messageQueue.IsCompleted);
+			this.vault = new MessageVault<TOutput> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           TInput messageValue,
+		                                           ISourceBlock<TInput> source,
+		                                           bool consumeToAccept)
+		{
+			return messageBox.OfferMessage (this, messageHeader, messageValue, source, consumeToAccept);
+		}
+
+		public IDisposable LinkTo (ITargetBlock<TOutput> target, bool unlinkAfterOne)
+		{
+			var result = targets.AddTarget (target, unlinkAfterOne);
+			outgoing.ProcessForTarget (target, this, false, ref headers);
+			return result;
+		}
+
+		public TOutput ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target, out bool messageConsumed)
+		{
+			return vault.ConsumeMessage (messageHeader, target, out messageConsumed);
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<TOutput> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<TOutput> filter, out TOutput item)
+		{
+			return outgoing.TryReceive (filter, out item);
+		}
+
+		public bool TryReceiveAll (out IList<TOutput> items)
+		{
+			return outgoing.TryReceiveAll (out items);
+		}
+
+		void TransformProcess ()
+		{
+			ITargetBlock<TOutput> target;
+			TInput input;
+
+			while (messageQueue.TryTake (out input)) {
+				foreach (var item in transformer (input)) {
+					if ((target = targets.Current) != null)
+						target.OfferMessage (headers.Increment (), item, this, false);
+					else
+						outgoing.AddData (item);
+				}
+			}
+
+			if (!outgoing.IsEmpty && (target = targets.Current) != null)
+				outgoing.ProcessForTarget (target, this, false, ref headers);
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+
+		public int OutputCount {
+			get {
+				return outgoing.Count;
+			}
+		}
+
+		public int InputCount {
+			get {
+				return messageQueue.Count;
+			}
+		}
+	}
+}
+
+#endif

+ 170 - 0
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow/WriteOnceBlock.cs

@@ -0,0 +1,170 @@
+// WriteOnceBlock.cs
+//
+// Copyright (c) 2011 Jérémie "garuma" Laval
+//
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+//
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+//
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+//
+//
+
+#if NET_4_0 || MOBILE
+
+using System;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Collections.Concurrent;
+
+namespace System.Threading.Tasks.Dataflow
+{
+	public sealed class WriteOnceBlock<T> : IPropagatorBlock<T, T>, ITargetBlock<T>, IDataflowBlock, ISourceBlock<T>, IReceivableSourceBlock<T>
+	{
+		static readonly DataflowBlockOptions defaultOptions = new DataflowBlockOptions ();
+
+		CompletionHelper compHelper = CompletionHelper.GetNew ();
+		BlockingCollection<T> messageQueue = new BlockingCollection<T> ();
+		MessageBox<T> messageBox;
+		MessageVault<T> vault;
+		DataflowBlockOptions dataflowBlockOptions;
+		readonly Func<T, T> cloner;
+		TargetBuffer<T> targets = new TargetBuffer<T> ();
+		DataflowMessageHeader headers = DataflowMessageHeader.NewValid ();
+
+		AtomicBooleanValue written;
+		bool ready;
+		T finalValue;
+
+		public WriteOnceBlock (Func<T, T> cloner) : this (cloner, defaultOptions)
+		{
+
+		}
+
+		public WriteOnceBlock (Func<T, T> cloner, DataflowBlockOptions dataflowBlockOptions)
+		{
+			if (dataflowBlockOptions == null)
+				throw new ArgumentNullException ("dataflowBlockOptions");
+
+			this.cloner = cloner;
+			this.dataflowBlockOptions = dataflowBlockOptions;
+			this.messageBox = new PassingMessageBox<T> (messageQueue, compHelper, () => true, BroadcastProcess, dataflowBlockOptions);
+			this.vault = new MessageVault<T> ();
+		}
+
+		public DataflowMessageStatus OfferMessage (DataflowMessageHeader messageHeader,
+		                                           T messageValue,
+		                                           ISourceBlock<T> source,
+		                                           bool consumeToAccept)
+		{
+			if (written.TryRelaxedSet ()) {
+				Thread.MemoryBarrier ();
+				finalValue = messageValue;
+				Thread.MemoryBarrier ();
+				ready = true;
+				return messageBox.OfferMessage (this, messageHeader, finalValue, source, consumeToAccept);
+			} else {
+				return DataflowMessageStatus.DecliningPermanently;
+			}
+		}
+
+		public IDisposable LinkTo (ITargetBlock<T> target, bool unlinkAfterOne)
+		{
+			return targets.AddTarget (target, unlinkAfterOne);
+		}
+
+		public T ConsumeMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target, out bool messageConsumed)
+		{
+			return cloner(vault.ConsumeMessage (messageHeader, target, out messageConsumed));
+		}
+
+		public void ReleaseReservation (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			vault.ReleaseReservation (messageHeader, target);
+		}
+
+		public bool ReserveMessage (DataflowMessageHeader messageHeader, ITargetBlock<T> target)
+		{
+			return vault.ReserveMessage (messageHeader, target);
+		}
+
+		public bool TryReceive (Predicate<T> filter, out T item)
+		{
+			item = default (T);
+			if (!written.Value)
+				return false;
+
+			if (!ready) {
+				SpinWait spin = new SpinWait ();
+				while (!ready)
+					spin.SpinOnce ();
+			}
+
+			if (filter == null || filter (finalValue)) {
+				item = cloner != null ? cloner (finalValue) : finalValue;
+				return true;
+			}
+
+			return false;
+		}
+
+		public bool TryReceiveAll (out IList<T> items)
+		{
+			items = null;
+			if (!written.Value)
+				return false;
+
+			T item;
+			if (!TryReceive (null, out item))
+				return false;
+
+			items = new T[] { item };
+			return true;
+		}
+
+		void BroadcastProcess ()
+		{
+			T input;
+
+			if (!messageQueue.TryTake (out input) || targets.Current == null)
+				return;
+
+			foreach (var target in targets) {
+				DataflowMessageHeader header = headers.Increment ();
+				if (cloner != null)
+					vault.StoreMessage (header, input);
+				target.OfferMessage (header, input, this, cloner != null);
+			}
+		}
+
+		public void Complete ()
+		{
+			messageBox.Complete ();
+		}
+
+		public void Fault (Exception ex)
+		{
+			compHelper.Fault (ex);
+		}
+
+		public Task Completion {
+			get {
+				return compHelper.Completion;
+			}
+		}
+	}
+}
+
+#endif

+ 11 - 1
mcs/class/System.Threading.Tasks.Dataflow/System.Threading.Tasks.Dataflow_test.dll.sources

@@ -2,4 +2,14 @@ System.Threading.Tasks/ConcurrentExclusiveSchedulerPairTest.cs
 System.Threading.Tasks.Dataflow/DataflowMessageHeaderTest.cs
 System.Threading.Tasks.Dataflow/DataflowMessageHeaderTest.cs
 System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
 System.Threading.Tasks.Dataflow/CompletionHelperTest.cs
 ../System.Threading.Tasks.Dataflow/CompletionHelper.cs
 ../System.Threading.Tasks.Dataflow/CompletionHelper.cs
-
+System.Threading.Tasks.Dataflow/ActionBlockTest.cs
+System.Threading.Tasks.Dataflow/BatchBlockTest.cs
+System.Threading.Tasks.Dataflow/BroadcastBlockTest.cs
+System.Threading.Tasks.Dataflow/BufferBlockTest.cs
+System.Threading.Tasks.Dataflow/CompletionTest.cs
+System.Threading.Tasks.Dataflow/DataflowBlockTest.cs
+System.Threading.Tasks.Dataflow/JoinBlockTest.cs
+System.Threading.Tasks.Dataflow/JoinBlock`3Test.cs
+System.Threading.Tasks.Dataflow/TransformBlockTest.cs
+System.Threading.Tasks.Dataflow/TransformManyBlockTest.cs
+System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs

+ 72 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/ActionBlockTest.cs

@@ -0,0 +1,72 @@
+#if NET_4_0
+// 
+// ActionBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class ActionBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			bool[] array = new bool[3];
+			ActionBlock<int> block = new ActionBlock<int> ((i) => array[i] = true);
+
+			for (int i = 0; i < array.Length; ++i)
+				Assert.IsTrue (block.Post (i), "Not accepted");
+
+			// TODO: use a more sensible approach here based on Completion
+			Thread.Sleep (1300);
+			
+			Assert.IsTrue (array.All (b => b), "Some false");
+		}
+
+		[Test]
+		public void CompleteTest ()
+		{
+			ActionBlock<int> block = new ActionBlock<int> ((i) => Thread.Sleep (100));
+
+			for (int i = 0; i < 10; i++)
+				Assert.IsTrue (block.Post (i), "Not Accepted");
+
+			block.Complete ();
+			// Still element to be processed so Completion should be false
+			Assert.IsFalse (block.Completion.IsCompleted);
+			block.Completion.Wait ();
+			Assert.IsTrue (block.Completion.IsCompleted);
+		}
+	}
+}
+#endif

+ 132 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BatchBlockTest.cs

@@ -0,0 +1,132 @@
+#if NET_4_0
+// 
+// BatchBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class BatchBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			int[] array = null;
+
+			var buffer = new BatchBlock<int> (10);
+			var block = new ActionBlock<int[]> (i => array = i);
+			buffer.LinkTo<int[]>(block);
+			
+			for (int i = 0; i < 9; i++)
+				Assert.IsTrue (buffer.Post (i));
+
+			Thread.Sleep (1600);
+
+			Assert.IsNull (array);
+
+			Assert.IsTrue (buffer.Post (42));
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (array);
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8, 42 }, array);
+		}
+
+		[Test]
+		public void TriggerBatchTest ()
+		{
+			int[] array = null;
+
+			var buffer = new BatchBlock<int> (10);
+			var block = new ActionBlock<int[]> (i => array = i);
+			buffer.LinkTo(block);
+			
+			for (int i = 0; i < 9; i++)
+				Assert.IsTrue (buffer.Post (i));
+
+			buffer.TriggerBatch ();
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (array);
+
+			Assert.IsTrue (buffer.Post (42));
+			Thread.Sleep (1600);
+
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }, array);
+		}
+
+		[Test]
+		public void TriggerBatchLateBinding ()
+		{
+			int[] array = null;
+
+			var buffer = new BatchBlock<int> (10);
+			var block = new ActionBlock<int[]> (i => array = i);
+			
+			for (int i = 0; i < 9; i++)
+				Assert.IsTrue (buffer.Post (i));
+
+			buffer.TriggerBatch ();
+			buffer.LinkTo(block);
+
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (array);
+
+			Assert.IsTrue (buffer.Post (42));
+			Thread.Sleep (1600);
+
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }, array);
+		}
+
+		[Test]
+		public void LateTriggerBatchKeepCountTest ()
+		{
+			int[] array = null;
+
+			var buffer = new BatchBlock<int> (15);
+			var block = new ActionBlock<int[]> (i => array = i);
+			
+			for (int i = 0; i < 9; i++)
+				Assert.IsTrue (buffer.Post (i));
+			buffer.TriggerBatch ();
+			Assert.IsTrue (buffer.Post (42));
+			buffer.LinkTo(block);
+
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (array);
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 5, 6, 7, 8 }, array);			
+		}
+	}
+}
+#endif

+ 87 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BroadcastBlockTest.cs

@@ -0,0 +1,87 @@
+#if NET_4_0
+// 
+// BroadcastBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class BroadcastBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			bool act1 = false, act2 = false;
+
+			var broadcast = new BroadcastBlock<int> (null);
+			var action1 = new ActionBlock<int> (i => act1 = i == 42);
+			var action2 = new ActionBlock<int> (i => act2 = i == 42);
+
+			broadcast.LinkTo (action1);
+			broadcast.LinkTo (action2);
+
+			Assert.IsTrue (broadcast.Post (42));
+
+			Thread.Sleep (1600);
+
+			Assert.IsTrue (act1);
+			Assert.IsTrue (act2);
+		}
+
+		[Test]
+		public void CloningTest ()
+		{
+			object act1 = null, act2 = null;
+
+			object source = new object ();
+			var broadcast = new BroadcastBlock<object> (o => new object ());
+			var action1 = new ActionBlock<object> (i => act1 = i);
+			var action2 = new ActionBlock<object> (i => act2 = i);
+
+			broadcast.LinkTo (action1);
+			broadcast.LinkTo (action2);
+
+			Assert.IsTrue (broadcast.Post (source));
+
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (act1);
+			Assert.IsNotNull (act2);
+
+			Assert.IsFalse (source.Equals (act1));
+			Assert.IsFalse (source.Equals (act2));
+			Assert.IsFalse (act2.Equals (act1));
+		}
+	}
+}
+#endif

+ 97 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/BufferBlockTest.cs

@@ -0,0 +1,97 @@
+#if NET_4_0
+// 
+// BufferBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class BufferBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			int data = -1;
+			BufferBlock<int> buffer = new BufferBlock<int> ();
+			ActionBlock<int> action = new ActionBlock<int> ((i) => data = i);
+			buffer.LinkTo (action);
+
+			Assert.IsTrue (buffer.Post (42));
+			Thread.Sleep (600);
+			Assert.AreEqual (42, data);	
+		}
+
+		[Test]
+		public void LateBindingTest ()
+		{
+			BufferBlock<int> buffer = new BufferBlock<int> ();
+
+			for (int i = 0; i < 10; i++)
+				Assert.IsTrue (buffer.Post (i));
+			int count = 0;
+				
+			ActionBlock<int> block = new ActionBlock<int> ((i) => Interlocked.Increment (ref count));
+			buffer.LinkTo (block);
+			
+			Thread.Sleep (600);
+
+			Assert.AreEqual (10, count);
+		}
+
+		[Test]
+		public void MultipleBindingTest ()
+		{
+			BufferBlock<int> buffer = new BufferBlock<int> ();
+
+			int count = 0;
+				
+			ActionBlock<int> block = new ActionBlock<int> ((i) => Interlocked.Decrement (ref count));
+			IDisposable bridge = buffer.LinkTo (block);
+			for (int i = 0; i < 10; i++)
+				Assert.IsTrue (buffer.Post (i));
+			Thread.Sleep (600);
+
+			count = 0;
+			bridge.Dispose ();
+
+			ActionBlock<int> block2 = new ActionBlock<int> ((i) => Interlocked.Increment (ref count));
+			buffer.LinkTo (block2);
+			for (int i = 0; i < 10; i++)
+				Assert.IsTrue (buffer.Post (i));
+			Thread.Sleep (600);
+
+			Assert.AreEqual (10, count);
+		}
+	}
+}
+#endif

+ 105 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/CompletionTest.cs

@@ -0,0 +1,105 @@
+#if NET_4_0
+// 
+// CompletionTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class CompletionTest
+	{
+		[Test]
+		public void WithNoElements ()
+		{
+			var block = new BufferBlock<int> ();
+			//var block2 = new BufferBlock<int> ();
+			//block.LinkTo (block2);
+			block.Post (42);
+			Thread.Sleep (600);
+			//((IDataflowBlock)block2).Fault (new Exception ());
+			block.Complete ();
+			Thread.Sleep (600);
+			Console.WriteLine (block.Completion.IsCompleted);
+			Console.WriteLine (block.Completion.Status);
+			block.Receive ();
+			Console.WriteLine (block.Completion.IsCompleted);
+			Console.WriteLine (block.Completion.Status);
+		}
+
+		[Test]
+		public void WithElementsStillLingering ()
+		{
+			var block = new BufferBlock<int> ();
+			//var block2 = new BufferBlock<int> ();
+			//block.LinkTo (block2);
+			block.Post (42);
+			Thread.Sleep (600);
+			//((IDataflowBlock)block2).Fault (new Exception ());
+			block.Complete ();
+			Thread.Sleep (600);
+			Console.WriteLine (block.Completion.IsCompleted);
+			Console.WriteLine (block.Completion.Status);
+			block.Receive ();
+			Console.WriteLine (block.Completion.IsCompleted);
+			Console.WriteLine (block.Completion.Status);
+		}
+
+		[Test]
+		public void EmptyAfterReceive ()
+		{
+			var block = new BufferBlock<int> ();
+			block.Post (42);
+			Thread.Sleep (600);
+			block.Complete ();
+			Thread.Sleep (600);
+			Assert.IsFalse (block.Completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.WaitingForActivation, block.Completion.Status);
+			block.Receive ();
+			Assert.IsTrue (block.Completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.RanToCompletion, block.Completion.Status);
+		}
+
+		[Test]
+		public void WithElementsStillLingeringButFaulted ()
+		{
+			var block = new BufferBlock<int> ();
+			block.Post (42);
+			Thread.Sleep (600);
+			((IDataflowBlock)block).Fault (new Exception ());
+			Thread.Sleep (600);
+			Assert.IsTrue (block.Completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.Faulted, block.Completion.Status);
+		}
+	}
+}
+#endif

+ 145 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/DataflowBlockTest.cs

@@ -0,0 +1,145 @@
+#if NET_4_0
+// 
+// DataflowBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class DataflowBlockTest
+	{
+		[Test]
+		public void ChooseTest ()
+		{
+			var source1 = new BufferBlock<int> ();
+			var source2 = new BufferBlock<long> ();
+
+			bool action1 = false;
+			bool action2 = false;
+			var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true);
+
+			source1.Post (42);
+
+			Thread.Sleep (1600);
+			Assert.IsTrue (action1);
+			Assert.IsFalse (action2);
+			Assert.IsTrue (completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status);
+			Assert.AreEqual (0, completion.Result);
+		}
+
+		[Test]
+		public void ChooseTest_3 ()
+		{
+			var source1 = new BufferBlock<int> ();
+			var source2 = new BufferBlock<long> ();
+			var source3 = new BufferBlock<object> ();
+
+			bool action1 = false;
+			bool action2 = false;
+			bool action3 = false;
+			var completion = DataflowBlock.Choose (source1, (_) => action1 = true, source2, (_) => action2 = true, source3, (_) => action3 = true);
+
+			source3.Post (new object ());
+
+			Thread.Sleep (1600);
+			Assert.IsFalse (action1);
+			Assert.IsFalse (action2);
+			Assert.IsTrue (action3);
+			Assert.IsTrue (completion.IsCompleted);
+			Assert.AreEqual (TaskStatus.RanToCompletion, completion.Status);
+			Assert.AreEqual (2, completion.Result);			
+		}
+
+		[Test]
+		public void TryReceiveTest ()
+		{
+			var block = new BufferBlock<int> ();
+			int value = -1;
+
+			block.Post (42);
+			Thread.Sleep (500);
+			Assert.IsTrue (block.TryReceive (out value));
+			Assert.AreEqual (42, value);
+		}
+
+		[Test]
+		public void ReceiveTest ()
+		{
+			var block = new BufferBlock<int> ();
+			Task.Factory.StartNew (() => { Thread.Sleep (1400); block.Post (42); });
+			Assert.AreEqual (42, block.Receive ());
+		}
+
+		[Test]
+		public void AsyncReceiveTest ()
+		{
+			int result = -1;
+			var mre = new ManualResetEventSlim (false);
+
+			var block = new WriteOnceBlock<int> (null);
+			block.ReceiveAsync ().ContinueWith (i => { result = i.Result; mre.Set (); });
+			Task.Factory.StartNew (() => { Thread.Sleep (600); block.Post (42); });
+			mre.Wait ();
+
+			Assert.AreEqual (42, result);
+		}
+
+		[Test]
+		public void AsyncReceiveTestCanceled ()
+		{
+			var src = new CancellationTokenSource ();
+
+			var block = new WriteOnceBlock<int> (null);
+			var task = block.ReceiveAsync (src.Token);
+			Task.Factory.StartNew (() => { Thread.Sleep (800); block.Post (42); });
+			Thread.Sleep (50);
+			src.Cancel ();
+
+			AggregateException ex = null;
+
+			try {
+				task.Wait ();
+			} catch (AggregateException e) {
+				ex = e;
+			}
+
+			Assert.IsNotNull (ex);
+			Assert.IsNotNull (ex.InnerException);
+			Assert.IsInstanceOfType (typeof (OperationCanceledException), ex.InnerException);
+			Assert.IsTrue (task.IsCompleted);
+			Assert.AreEqual (TaskStatus.Canceled, task.Status);
+		}
+	}
+}
+#endif

+ 64 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlockTest.cs

@@ -0,0 +1,64 @@
+#if NET_4_0
+// 
+// JoinBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class JoinBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			Tuple<int, int> tuple = null;
+
+			var ablock = new ActionBlock<Tuple<int, int>> (t => tuple = t);
+			var block = new JoinBlock<int, int> ();
+			block.LinkTo (ablock);
+
+			block.Target1.Post (42);
+
+			Thread.Sleep (1600);
+			Assert.IsNull (tuple);
+
+			block.Target2.Post (24);
+
+			Thread.Sleep (1600);
+			Assert.IsNotNull (tuple);
+			Assert.AreEqual (42, tuple.Item1);
+			Assert.AreEqual (24, tuple.Item2);
+		}
+	}
+}
+#endif

+ 70 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/JoinBlock`3Test.cs

@@ -0,0 +1,70 @@
+#if NET_4_0
+// 
+// JoinBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class JoinBlock3Test
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			Tuple<int, int, int> tuple = null;
+
+			var ablock = new ActionBlock<Tuple<int, int, int>> (t => tuple = t);
+			var block = new JoinBlock<int, int, int> ();
+			block.LinkTo (ablock);
+
+			block.Target1.Post (42);
+
+			Thread.Sleep (1600);
+			Assert.IsNull (tuple);
+
+			block.Target2.Post (24);
+
+			Thread.Sleep (1600);
+			Assert.IsNull (tuple);
+
+			block.Target3.Post (44);
+
+			Thread.Sleep (1600);
+			Assert.IsNotNull (tuple);
+			Assert.AreEqual (42, tuple.Item1);
+			Assert.AreEqual (24, tuple.Item2);
+			Assert.AreEqual (44, tuple.Item3);
+		}
+	}
+}
+#endif

+ 75 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformBlockTest.cs

@@ -0,0 +1,75 @@
+#if NET_4_0
+// 
+// TransformBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class TransformBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			int[] array = new int[10];
+			ActionBlock<int> action = new ActionBlock<int> ((i) => array[Math.Abs (i)] = i);
+			TransformBlock<int, int> block = new TransformBlock<int, int> (i => -i);
+			block.LinkTo (action);
+
+			for (int i = 0; i < array.Length; ++i)
+				Assert.IsTrue (block.Post (i), "Not accepted");
+
+			Thread.Sleep (1300);
+
+			CollectionAssert.AreEqual (new int[] { 0, -1, -2, -3, -4, -5, -6, -7, -8, -9 }, array);
+		}
+
+		[Test]
+		public void DeferredUsageTest ()
+		{
+			int[] array = new int[10];
+			ActionBlock<int> action = new ActionBlock<int> ((i) => array[Math.Abs (i)] = i);
+			TransformBlock<int, int> block = new TransformBlock<int, int> (i => -i);
+
+			for (int i = 0; i < array.Length; ++i)
+				Assert.IsTrue (block.Post (i), "Not accepted");
+
+			Thread.Sleep (1300);
+			block.LinkTo (action);
+			Thread.Sleep (600);
+
+			CollectionAssert.AreEqual (new int[] { 0, -1, -2, -3, -4, -5, -6, -7, -8, -9 }, array);
+		}
+	}
+}
+#endif

+ 79 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/TransformManyBlockTest.cs

@@ -0,0 +1,79 @@
+#if NET_4_0
+// 
+// TransformManyBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class TransformManyBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			int insIndex = -1;
+			int[] array = new int[5 + 3];
+
+			var block = new ActionBlock<int> (i => array[Interlocked.Increment (ref insIndex)] = i);
+			var trsm = new TransformManyBlock<int, int> (i => Enumerable.Range (0, i));
+			trsm.LinkTo (block);
+
+			trsm.Post (5);
+			trsm.Post (3);
+
+			Thread.Sleep (1600);
+			
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 0, 1, 2 }, array);
+		}
+
+		[Test]
+		public void DeferredUsageTest ()
+		{
+			int insIndex = -1;
+			int[] array = new int[5 + 3];
+
+			var block = new ActionBlock<int> (i => array[Interlocked.Increment (ref insIndex)] = i);
+			var trsm = new TransformManyBlock<int, int> (i => Enumerable.Range (0, i));
+
+			trsm.Post (5);
+			trsm.Post (3);
+
+			Thread.Sleep (1600);
+			trsm.LinkTo (block);
+			Thread.Sleep (500);
+
+			CollectionAssert.AreEquivalent (new int[] { 0, 1, 2, 3, 4, 0, 1, 2 }, array);
+		}
+	}
+}
+#endif

+ 133 - 0
mcs/class/System.Threading.Tasks.Dataflow/Test/System.Threading.Tasks.Dataflow/WriteOnceBlockTest.cs

@@ -0,0 +1,133 @@
+#if NET_4_0
+// 
+// WriteOnceBlockTest.cs
+//  
+// Author:
+//       Jérémie "garuma" Laval <[email protected]>
+// 
+// Copyright (c) 2011 Jérémie "garuma" Laval
+// 
+// Permission is hereby granted, free of charge, to any person obtaining a copy
+// of this software and associated documentation files (the "Software"), to deal
+// in the Software without restriction, including without limitation the rights
+// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+// copies of the Software, and to permit persons to whom the Software is
+// furnished to do so, subject to the following conditions:
+// 
+// The above copyright notice and this permission notice shall be included in
+// all copies or substantial portions of the Software.
+// 
+// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
+// THE SOFTWARE.
+
+using System;
+using System.Linq;
+using System.Threading;
+using System.Threading.Tasks;
+using System.Collections.Generic;
+using System.Threading.Tasks.Dataflow;
+
+using NUnit.Framework;
+
+namespace MonoTests.System.Threading.Tasks.Dataflow
+{
+	[TestFixture]
+	public class WriteOnceBlockTest
+	{
+		[Test]
+		public void BasicUsageTest ()
+		{
+			bool act1 = false, act2 = false;
+
+			var broadcast = new WriteOnceBlock<int> (null);
+			var action1 = new ActionBlock<int> (i => act1 = i == 42);
+			var action2 = new ActionBlock<int> (i => act2 = i == 42);
+
+			broadcast.LinkTo (action1);
+			broadcast.LinkTo (action2);
+
+			Assert.IsTrue (broadcast.Post (42));
+
+			Thread.Sleep (1600);
+
+			Assert.IsTrue (act1);
+			Assert.IsTrue (act2);
+		}
+
+		[Test]
+		public void CloningTest ()
+		{
+			object act1 = null, act2 = null;
+
+			object source = new object ();
+			var broadcast = new WriteOnceBlock<object> (o => new object ());
+			var action1 = new ActionBlock<object> (i => act1 = i);
+			var action2 = new ActionBlock<object> (i => act2 = i);
+
+			broadcast.LinkTo (action1);
+			broadcast.LinkTo (action2);
+
+			Assert.IsTrue (broadcast.Post (source));
+
+			Thread.Sleep (1600);
+
+			Assert.IsNotNull (act1);
+			Assert.IsNotNull (act2);
+
+			Assert.IsFalse (source.Equals (act1));
+			Assert.IsFalse (source.Equals (act2));
+			Assert.IsFalse (act2.Equals (act1));
+		}
+
+		[Test]
+		public void WriteOnceBehaviorTest ()
+		{
+			bool act1 = false, act2 = false;
+
+			var broadcast = new WriteOnceBlock<int> (null);
+			var action1 = new ActionBlock<int> (i => act1 = i == 42);
+			var action2 = new ActionBlock<int> (i => act2 = i == 42);
+
+			broadcast.LinkTo (action1);
+			broadcast.LinkTo (action2);
+
+			Assert.IsTrue (broadcast.Post (42));
+
+			Thread.Sleep (1600);
+
+			Assert.IsTrue (act1);
+			Assert.IsTrue (act2);
+
+			Assert.IsFalse (broadcast.Post (24));
+			Thread.Sleep (1600);
+
+			Assert.IsTrue (act1);
+			Assert.IsTrue (act2);
+		}
+
+		[Test]
+		public void TryReceiveBehavior ()
+		{
+			var block = new WriteOnceBlock<int> (null);
+			int foo;
+			Assert.IsFalse (block.TryReceive (null, out foo));
+			block.Post (42);
+			Thread.Sleep (300);
+			Assert.IsTrue (block.TryReceive (null, out foo));
+			Assert.AreEqual (42, foo);
+			Assert.IsTrue (block.TryReceive (null, out foo));
+			Assert.IsFalse (block.TryReceive (i => i == 0, out foo));
+			IList<int> bar;
+			Assert.IsTrue (block.TryReceiveAll (out bar));
+			Assert.IsNotNull (bar);
+			Assert.AreEqual (1, bar.Count);
+			Assert.AreEqual (42, bar[0]);
+		}
+	}
+}
+#endif