|
|
@@ -25,7 +25,7 @@ using System.Collections.Generic;
|
|
|
|
|
|
namespace System.Threading.Tasks.Dataflow {
|
|
|
/// <summary>
|
|
|
- /// Collection of target blocks for a source block.
|
|
|
+ /// Base class for collection of target blocks for a source block.
|
|
|
/// Also handles sending messages to the target blocks.
|
|
|
/// </summary>
|
|
|
abstract class TargetCollectionBase<T> {
|
|
|
@@ -52,6 +52,10 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
Reserved = new AtomicBoolean ();
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Is called after a message was sent, makes sure the linked is destroyed after
|
|
|
+ /// <see cref="DataflowLinkOptions.MaxMessages"/> were sent.
|
|
|
+ /// </summary>
|
|
|
public void MessageSent()
|
|
|
{
|
|
|
if (remainingMessages != -1)
|
|
|
@@ -61,11 +65,17 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
|
|
|
readonly AtomicBoolean disabled = new AtomicBoolean ();
|
|
|
+ /// <summary>
|
|
|
+ /// Is the link destroyed?
|
|
|
+ /// </summary>
|
|
|
public bool Disabled
|
|
|
{
|
|
|
get { return disabled.Value; }
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Destroys the link to this target.
|
|
|
+ /// </summary>
|
|
|
public void Dispose ()
|
|
|
{
|
|
|
disabled.Value = true;
|
|
|
@@ -74,16 +84,22 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
cancellationTokenSource.Cancel ();
|
|
|
|
|
|
Target ignored;
|
|
|
- targetCollection.targetDictionary.TryRemove (TargetBlock, out ignored);
|
|
|
+ targetCollection.TargetDictionary.TryRemove (TargetBlock, out ignored);
|
|
|
|
|
|
// to avoid memory leak; it could take a long time
|
|
|
// before this object is actually removed from the collection
|
|
|
TargetBlock = null;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Does this target have a postponed message?
|
|
|
+ /// </summary>
|
|
|
public AtomicBoolean Postponed { get; private set; }
|
|
|
|
|
|
- // used only by broadcast blocks
|
|
|
+ /// <summary>
|
|
|
+ /// Does this target have a reserved message?
|
|
|
+ /// </summary>
|
|
|
+ /// <remarks>Used only by broadcast blocks.</remarks>
|
|
|
public AtomicBoolean Reserved { get; private set; }
|
|
|
}
|
|
|
|
|
|
@@ -95,7 +111,7 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
readonly ConcurrentQueue<Target> appendQueue = new ConcurrentQueue<Target> ();
|
|
|
readonly LinkedList<Target> targets = new LinkedList<Target> ();
|
|
|
|
|
|
- protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> targetDictionary =
|
|
|
+ protected readonly ConcurrentDictionary<ITargetBlock<T>, Target> TargetDictionary =
|
|
|
new ConcurrentDictionary<ITargetBlock<T>, Target> ();
|
|
|
|
|
|
// lastMessageHeaderId will be always accessed only from one thread
|
|
|
@@ -106,13 +122,19 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
bool firstOffering;
|
|
|
T currentItem;
|
|
|
|
|
|
- public TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
|
|
|
+ protected TargetCollectionBase (ISourceBlock<T> block, bool broadcast, bool consumeToAccept)
|
|
|
{
|
|
|
this.block = block;
|
|
|
this.broadcast = broadcast;
|
|
|
this.consumeToAccept = consumeToAccept;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Adds a target block to send messages to.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>
|
|
|
+ /// An object that can be used to destroy the link to the added target.
|
|
|
+ /// </returns>
|
|
|
public IDisposable AddTarget (ITargetBlock<T> targetBlock, DataflowLinkOptions options)
|
|
|
{
|
|
|
CancellationTokenSource cancellationTokenSource = null;
|
|
|
@@ -129,7 +151,7 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
|
|
|
var target = new Target (
|
|
|
this, targetBlock, options.MaxMessages, cancellationTokenSource);
|
|
|
- targetDictionary [targetBlock] = target;
|
|
|
+ TargetDictionary [targetBlock] = target;
|
|
|
if (options.Append)
|
|
|
appendQueue.Enqueue (target);
|
|
|
else
|
|
|
@@ -138,6 +160,9 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
return target;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Sets the current item to be offered to targets
|
|
|
+ /// </summary>
|
|
|
public void SetCurrentItem (T item)
|
|
|
{
|
|
|
firstOffering = true;
|
|
|
@@ -147,18 +172,32 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
ClearUnpostponed ();
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Clears the collection of "unpostponed" targets.
|
|
|
+ /// </summary>
|
|
|
protected abstract void ClearUnpostponed ();
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Resets the current item to be offered to targets.
|
|
|
+ /// This means there is currently nothing to offer.
|
|
|
+ /// </summary>
|
|
|
public void ResetCurrentItem ()
|
|
|
{
|
|
|
currentItem = default(T);
|
|
|
Thread.VolatileWrite (ref currentMessageHeaderId, 0);
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Is there an item to send right now?
|
|
|
+ /// </summary>
|
|
|
public bool HasCurrentItem {
|
|
|
get { return Thread.VolatileRead (ref currentMessageHeaderId) != 0; }
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to all eligible targets.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
|
|
|
public bool OfferItemToTargets ()
|
|
|
{
|
|
|
// is there an item to offer?
|
|
|
@@ -169,7 +208,7 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
|
|
|
do {
|
|
|
// order is important here, we want to make sure that prepended target
|
|
|
- // added after appended target is always processed first
|
|
|
+ // added before appended target is processed first
|
|
|
var appended = PrependOrAppend (false);
|
|
|
var prepended = PrependOrAppend (true);
|
|
|
|
|
|
@@ -192,6 +231,9 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Are there any targets that currently require a message to be sent to them?
|
|
|
+ /// </summary>
|
|
|
public bool NeedsProcessing {
|
|
|
get {
|
|
|
return !appendQueue.IsEmpty || !prependQueue.IsEmpty
|
|
|
@@ -199,8 +241,19 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Is the collection of unpostponed targets empty?
|
|
|
+ /// </summary>
|
|
|
protected abstract bool UnpostponedIsEmpty { get; }
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Prepends (appends) targets that should be prepended (appended) to the collection of targets.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="prepend"><c>true</c> to prepend, <c>false</c> to append.</param>
|
|
|
+ /// <returns>
|
|
|
+ /// Nodes that contain first and last target added to the list,
|
|
|
+ /// or <c>null</c> if no nodes were added.
|
|
|
+ /// </returns>
|
|
|
Tuple<LinkedListNode<Target>, LinkedListNode<Target>> PrependOrAppend (
|
|
|
bool prepend)
|
|
|
{
|
|
|
@@ -227,6 +280,10 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
: Tuple.Create (first, last);
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to the targets between the given nodes (inclusive).
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
|
|
|
bool OfferItemToTargets (
|
|
|
Tuple<LinkedListNode<Target>, LinkedListNode<Target>> targetPair)
|
|
|
{
|
|
|
@@ -252,8 +309,16 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to unpostponed targets.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Was the item accepted? (Always <c>false</c> for broadcast blocks.)</returns>
|
|
|
protected abstract bool OfferItemToUnpostponed ();
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to the given target.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Was the item accepted?</returns>
|
|
|
protected bool OfferItem (Target target)
|
|
|
{
|
|
|
if (target.Reserved.Value)
|
|
|
@@ -282,12 +347,18 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Returns whether the given header corresponds to the current item.
|
|
|
+ /// </summary>
|
|
|
public bool VerifyHeader (DataflowMessageHeader header)
|
|
|
{
|
|
|
return header.Id == Thread.VolatileRead (ref currentMessageHeaderId);
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Target collection for non-broadcast blocks.
|
|
|
+ /// </summary>
|
|
|
class TargetCollection<T> : TargetCollectionBase<T> {
|
|
|
readonly ConcurrentQueue<Target> unpostponedTargets =
|
|
|
new ConcurrentQueue<Target> ();
|
|
|
@@ -297,20 +368,32 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
{
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Is the collection of unpostponed targets empty?
|
|
|
+ /// </summary>
|
|
|
protected override bool UnpostponedIsEmpty {
|
|
|
get { return unpostponedTargets.IsEmpty; }
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Returns whether the given header corresponds to the current item
|
|
|
+ /// and that the given target block postponed this item.
|
|
|
+ /// </summary>
|
|
|
public bool VerifyHeader (DataflowMessageHeader header, ITargetBlock<T> targetBlock)
|
|
|
{
|
|
|
return VerifyHeader (header)
|
|
|
- && targetDictionary[targetBlock].Postponed.Value;
|
|
|
+ && TargetDictionary[targetBlock].Postponed.Value;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Unpostpones the given target.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="targetBlock">Target to unpostpone.</param>
|
|
|
+ /// <param name="messageConsumed">Did the target consume an item?</param>
|
|
|
public void UnpostponeTarget (ITargetBlock<T> targetBlock, bool messageConsumed)
|
|
|
{
|
|
|
Target target;
|
|
|
- if (!targetDictionary.TryGetValue (targetBlock, out target))
|
|
|
+ if (!TargetDictionary.TryGetValue (targetBlock, out target))
|
|
|
return;
|
|
|
|
|
|
if (messageConsumed)
|
|
|
@@ -320,6 +403,9 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
target.Postponed.Value = false;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Clears the collection of "unpostponed" targets.
|
|
|
+ /// </summary>
|
|
|
protected override void ClearUnpostponed ()
|
|
|
{
|
|
|
Target ignored;
|
|
|
@@ -327,6 +413,10 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to unpostponed targets.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Was the item accepted?</returns>
|
|
|
protected override bool OfferItemToUnpostponed ()
|
|
|
{
|
|
|
Target target;
|
|
|
@@ -339,6 +429,9 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Target collection for broadcast blocks.
|
|
|
+ /// </summary>
|
|
|
class BroadcastTargetCollection<T> : TargetCollectionBase<T> {
|
|
|
// it's necessary to store the headers because of a race between
|
|
|
// UnpostponeTargetConsumed and SetCurrentItem
|
|
|
@@ -351,19 +444,30 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
{
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Is the collection of unpostponed targets empty?
|
|
|
+ /// </summary>
|
|
|
protected override bool UnpostponedIsEmpty {
|
|
|
get { return unpostponedTargets.IsEmpty; }
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Marks the target as having a reserved message.
|
|
|
+ /// </summary>
|
|
|
public void ReserveTarget (ITargetBlock<T> targetBlock)
|
|
|
{
|
|
|
- targetDictionary [targetBlock].Reserved.Value = true;
|
|
|
+ TargetDictionary [targetBlock].Reserved.Value = true;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Unpostpone target after it consumed a message.
|
|
|
+ /// </summary>
|
|
|
+ /// <param name="targetBlock">The target to unpostpone.</param>
|
|
|
+ /// <param name="header">Header of the message the target consumed.</param>
|
|
|
public void UnpostponeTargetConsumed (ITargetBlock<T> targetBlock,
|
|
|
DataflowMessageHeader header)
|
|
|
{
|
|
|
- Target target = targetDictionary [targetBlock];
|
|
|
+ Target target = TargetDictionary [targetBlock];
|
|
|
|
|
|
target.MessageSent ();
|
|
|
unpostponedTargets.Enqueue (Tuple.Create (target, header));
|
|
|
@@ -372,10 +476,13 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
target.Reserved.Value = false;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Unpostpone target in the case when it didn't successfuly consume a message.
|
|
|
+ /// </summary>
|
|
|
public void UnpostponeTargetNotConsumed (ITargetBlock<T> targetBlock)
|
|
|
{
|
|
|
Target target;
|
|
|
- if (!targetDictionary.TryGetValue (targetBlock, out target))
|
|
|
+ if (!TargetDictionary.TryGetValue (targetBlock, out target))
|
|
|
return;
|
|
|
|
|
|
unpostponedTargets.Enqueue (Tuple.Create (target,
|
|
|
@@ -385,6 +492,9 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
target.Reserved.Value = false;
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Clears the collection of "unpostponed" targets.
|
|
|
+ /// </summary>
|
|
|
protected override void ClearUnpostponed ()
|
|
|
{
|
|
|
Tuple<Target, DataflowMessageHeader> ignored;
|
|
|
@@ -392,6 +502,10 @@ namespace System.Threading.Tasks.Dataflow {
|
|
|
}
|
|
|
}
|
|
|
|
|
|
+ /// <summary>
|
|
|
+ /// Offers the current item to unpostponed targets.
|
|
|
+ /// </summary>
|
|
|
+ /// <returns>Always <c>false</c>.</returns>
|
|
|
protected override bool OfferItemToUnpostponed ()
|
|
|
{
|
|
|
Tuple<Target, DataflowMessageHeader> tuple;
|