|
|
@@ -35,11 +35,14 @@ namespace System.Collections.Concurrent
|
|
|
{
|
|
|
readonly IProducerConsumerCollection<T> underlyingColl;
|
|
|
readonly int upperBound;
|
|
|
- readonly Func<bool> isFull;
|
|
|
|
|
|
readonly SpinWait sw = new SpinWait ();
|
|
|
|
|
|
AtomicBoolean isComplete;
|
|
|
+ long completeId;
|
|
|
+
|
|
|
+ long addId = long.MinValue;
|
|
|
+ long removeId = long.MinValue;
|
|
|
|
|
|
#region ctors
|
|
|
public BlockingCollection ()
|
|
|
@@ -62,67 +65,124 @@ namespace System.Collections.Concurrent
|
|
|
this.underlyingColl = underlyingColl;
|
|
|
this.upperBound = upperBound;
|
|
|
this.isComplete = new AtomicBoolean ();
|
|
|
-
|
|
|
- if (upperBound == -1)
|
|
|
- isFull = FalseIsFull;
|
|
|
- else
|
|
|
- isFull = CountBasedIsFull;
|
|
|
}
|
|
|
+ #endregion
|
|
|
|
|
|
- static bool FalseIsFull ()
|
|
|
+ #region Add & Remove (+ Try)
|
|
|
+ public void Add (T item)
|
|
|
{
|
|
|
- return false;
|
|
|
+ Add (item, null);
|
|
|
}
|
|
|
|
|
|
- bool CountBasedIsFull ()
|
|
|
+ public void Add (T item, CancellationToken token)
|
|
|
{
|
|
|
- return underlyingColl.Count >= upperBound;
|
|
|
+ Add (item, () => token.IsCancellationRequested);
|
|
|
}
|
|
|
- #endregion
|
|
|
|
|
|
- #region Add & Remove (+ Try)
|
|
|
- public void Add (T item)
|
|
|
+ void Add (T item, Func<bool> cancellationFunc)
|
|
|
{
|
|
|
while (true) {
|
|
|
- while (isFull ()) {
|
|
|
- if (isComplete.Value)
|
|
|
- throw new InvalidOperationException ("The BlockingCollection<T>"
|
|
|
- + " has been marked as complete with regards to additions.");
|
|
|
- Block ();
|
|
|
+ long cachedAddId = addId;
|
|
|
+ long cachedRemoveId = removeId;
|
|
|
+
|
|
|
+ if (upperBound != -1) {
|
|
|
+ if (cachedAddId - cachedRemoveId > upperBound) {
|
|
|
+ Block ();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
}
|
|
|
- // Extra check. The status might have changed after Block() or if isFull() is always false
|
|
|
- if (isComplete.Value)
|
|
|
+
|
|
|
+ // Check our transaction id against completed stored one
|
|
|
+ if (isComplete.Value && cachedAddId >= completeId)
|
|
|
throw new InvalidOperationException ("The BlockingCollection<T> has"
|
|
|
+ " been marked as complete with regards to additions.");
|
|
|
- // Go back in main waiting loop
|
|
|
- if (isFull ())
|
|
|
- continue;
|
|
|
|
|
|
- if (underlyingColl.TryAdd (item))
|
|
|
+ if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) == cachedAddId)
|
|
|
break;
|
|
|
+
|
|
|
+ if (cancellationFunc != null && cancellationFunc ())
|
|
|
+ throw new OperationCanceledException ("CancellationToken triggered");
|
|
|
}
|
|
|
+
|
|
|
+
|
|
|
+ if (!underlyingColl.TryAdd (item))
|
|
|
+ throw new InvalidOperationException ("The underlying collection didn't accept the item.");
|
|
|
}
|
|
|
|
|
|
- public T Remove ()
|
|
|
+ public T Take ()
|
|
|
{
|
|
|
- T item;
|
|
|
-
|
|
|
- while (underlyingColl.Count == 0 || !underlyingColl.TryTake (out item)) {
|
|
|
- if (isComplete.Value)
|
|
|
- throw new OperationCanceledException ("The BlockingCollection<T> is empty and has been marked as complete with regards to additions.");
|
|
|
- Block ();
|
|
|
+ return Take (null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public T Take (CancellationToken token)
|
|
|
+ {
|
|
|
+ return Take (() => token.IsCancellationRequested);
|
|
|
+ }
|
|
|
+
|
|
|
+ T Take (Func<bool> cancellationFunc)
|
|
|
+ {
|
|
|
+ while (true) {
|
|
|
+ long cachedRemoveId = removeId;
|
|
|
+ long cachedAddId = addId;
|
|
|
+
|
|
|
+ // Empty case
|
|
|
+ if (cachedRemoveId == cachedAddId) {
|
|
|
+ if (isComplete.Value && cachedRemoveId >= completeId)
|
|
|
+ throw new OperationCanceledException ("The BlockingCollection<T> has"
|
|
|
+ + " been marked as complete with regards to additions.");
|
|
|
+
|
|
|
+ Block ();
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+
|
|
|
+ if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) == cachedRemoveId)
|
|
|
+ break;
|
|
|
+
|
|
|
+ if (cancellationFunc != null && cancellationFunc ())
|
|
|
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
|
|
|
}
|
|
|
|
|
|
+ T item;
|
|
|
+ while (!underlyingColl.TryTake (out item));
|
|
|
+
|
|
|
return item;
|
|
|
}
|
|
|
|
|
|
public bool TryAdd (T item)
|
|
|
{
|
|
|
- if (isComplete.Value || isFull ()) {
|
|
|
- return false;
|
|
|
- }
|
|
|
+ return TryAdd (item, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ bool TryAdd (T item, Func<bool> contFunc, CancellationToken? token)
|
|
|
+ {
|
|
|
+ do {
|
|
|
+ if (token.HasValue && token.Value.IsCancellationRequested)
|
|
|
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
|
|
|
+
|
|
|
+ long cachedAddId = addId;
|
|
|
+ long cachedRemoveId = removeId;
|
|
|
+
|
|
|
+ if (upperBound != -1) {
|
|
|
+ if (cachedAddId - cachedRemoveId > upperBound) {
|
|
|
+ continue;
|
|
|
+ }
|
|
|
+ }
|
|
|
+
|
|
|
+ // Check our transaction id against completed stored one
|
|
|
+ if (isComplete.Value && cachedAddId >= completeId)
|
|
|
+ throw new InvalidOperationException ("The BlockingCollection<T> has"
|
|
|
+ + " been marked as complete with regards to additions.");
|
|
|
+
|
|
|
+ if (Interlocked.CompareExchange (ref addId, cachedAddId + 1, cachedAddId) != cachedAddId)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ if (!underlyingColl.TryAdd (item))
|
|
|
+ throw new InvalidOperationException ("The underlying collection didn't accept the item.");
|
|
|
+
|
|
|
+ return true;
|
|
|
+ } while (contFunc != null && contFunc ());
|
|
|
|
|
|
- return underlyingColl.TryAdd (item);
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
public bool TryAdd (T item, TimeSpan ts)
|
|
|
@@ -133,38 +193,67 @@ namespace System.Collections.Concurrent
|
|
|
public bool TryAdd (T item, int millisecondsTimeout)
|
|
|
{
|
|
|
Stopwatch sw = Stopwatch.StartNew ();
|
|
|
- while (isFull ()) {
|
|
|
- if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
|
|
|
- sw.Stop ();
|
|
|
- return false;
|
|
|
+ return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ public bool TryAdd (T item, int millisecondsTimeout, CancellationToken token)
|
|
|
+ {
|
|
|
+ Stopwatch sw = Stopwatch.StartNew ();
|
|
|
+ return TryAdd (item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
|
|
|
+ }
|
|
|
+
|
|
|
+ public bool TryTake (out T item)
|
|
|
+ {
|
|
|
+ return TryTake (out item, null, null);
|
|
|
+ }
|
|
|
+
|
|
|
+ bool TryTake (out T item, Func<bool> contFunc, CancellationToken? token)
|
|
|
+ {
|
|
|
+ item = default (T);
|
|
|
+
|
|
|
+ do {
|
|
|
+ if (token.HasValue && token.Value.IsCancellationRequested)
|
|
|
+ throw new OperationCanceledException ("The CancellationToken has had cancellation requested.");
|
|
|
+
|
|
|
+ long cachedRemoveId = removeId;
|
|
|
+ long cachedAddId = addId;
|
|
|
+
|
|
|
+ // Empty case
|
|
|
+ if (cachedRemoveId == cachedAddId) {
|
|
|
+ if (isComplete.Value && cachedRemoveId >= completeId)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ continue;
|
|
|
}
|
|
|
- Block ();
|
|
|
- }
|
|
|
- return TryAdd (item);
|
|
|
+
|
|
|
+ if (Interlocked.CompareExchange (ref removeId, cachedRemoveId + 1, cachedRemoveId) != cachedRemoveId)
|
|
|
+ continue;
|
|
|
+
|
|
|
+ return underlyingColl.TryTake (out item);
|
|
|
+ } while (contFunc != null && contFunc ());
|
|
|
+
|
|
|
+ return false;
|
|
|
}
|
|
|
|
|
|
- public bool TryRemove (out T item)
|
|
|
+ public bool TryTake (out T item, TimeSpan ts)
|
|
|
{
|
|
|
- return underlyingColl.TryTake (out item);
|
|
|
+ return TryTake (out item, (int)ts.TotalMilliseconds);
|
|
|
}
|
|
|
|
|
|
- public bool TryRemove (out T item, TimeSpan ts)
|
|
|
+ public bool TryTake (out T item, int millisecondsTimeout)
|
|
|
{
|
|
|
- return TryRemove (out item, (int)ts.TotalMilliseconds);
|
|
|
+ item = default (T);
|
|
|
+ Stopwatch sw = Stopwatch.StartNew ();
|
|
|
+
|
|
|
+ return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, null);
|
|
|
}
|
|
|
|
|
|
- public bool TryRemove (out T item, int millisecondsTimeout)
|
|
|
+ public bool TryTake (out T item, int millisecondsTimeout, CancellationToken token)
|
|
|
{
|
|
|
+ item = default (T);
|
|
|
Stopwatch sw = Stopwatch.StartNew ();
|
|
|
- while (underlyingColl.Count == 0) {
|
|
|
- if (isComplete.Value || sw.ElapsedMilliseconds > millisecondsTimeout) {
|
|
|
- item = default (T);
|
|
|
- return false;
|
|
|
- }
|
|
|
-
|
|
|
- Block ();
|
|
|
- }
|
|
|
- return TryRemove (out item);
|
|
|
+
|
|
|
+ return TryTake (out item, () => sw.ElapsedMilliseconds < millisecondsTimeout, token);
|
|
|
}
|
|
|
#endregion
|
|
|
|
|
|
@@ -185,7 +274,7 @@ namespace System.Collections.Concurrent
|
|
|
return false;
|
|
|
}
|
|
|
|
|
|
- public static int AddAny (BlockingCollection<T>[] collections, T item)
|
|
|
+ public static int AddToAny (BlockingCollection<T>[] collections, T item)
|
|
|
{
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
@@ -199,7 +288,21 @@ namespace System.Collections.Concurrent
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryAddAny (BlockingCollection<T>[] collections, T item)
|
|
|
+ public static int AddToAny (BlockingCollection<T>[] collections, T item, CancellationToken token)
|
|
|
+ {
|
|
|
+ CheckArray (collections);
|
|
|
+ int index = 0;
|
|
|
+ foreach (var coll in collections) {
|
|
|
+ try {
|
|
|
+ coll.Add (item, token);
|
|
|
+ return index;
|
|
|
+ } catch {}
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item)
|
|
|
{
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
@@ -211,7 +314,7 @@ namespace System.Collections.Concurrent
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryAddAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
|
|
|
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, TimeSpan ts)
|
|
|
{
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
@@ -223,7 +326,7 @@ namespace System.Collections.Concurrent
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryAddAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
|
|
|
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout)
|
|
|
{
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
@@ -235,14 +338,42 @@ namespace System.Collections.Concurrent
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int RemoveAny (BlockingCollection<T>[] collections, out T item)
|
|
|
+ public static int TryAddToAny (BlockingCollection<T>[] collections, T item, int millisecondsTimeout,
|
|
|
+ CancellationToken token)
|
|
|
+ {
|
|
|
+ CheckArray (collections);
|
|
|
+ int index = 0;
|
|
|
+ foreach (var coll in collections) {
|
|
|
+ if (coll.TryAdd (item, millisecondsTimeout, token))
|
|
|
+ return index;
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static int TakeFromAny (BlockingCollection<T>[] collections, out T item)
|
|
|
+ {
|
|
|
+ item = default (T);
|
|
|
+ CheckArray (collections);
|
|
|
+ int index = 0;
|
|
|
+ foreach (var coll in collections) {
|
|
|
+ try {
|
|
|
+ item = coll.Take ();
|
|
|
+ return index;
|
|
|
+ } catch {}
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static int TakeFromAny (BlockingCollection<T>[] collections, out T item, CancellationToken token)
|
|
|
{
|
|
|
item = default (T);
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
foreach (var coll in collections) {
|
|
|
try {
|
|
|
- item = coll.Remove ();
|
|
|
+ item = coll.Take (token);
|
|
|
return index;
|
|
|
} catch {}
|
|
|
index++;
|
|
|
@@ -250,42 +381,57 @@ namespace System.Collections.Concurrent
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item)
|
|
|
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item)
|
|
|
+ {
|
|
|
+ item = default (T);
|
|
|
+
|
|
|
+ CheckArray (collections);
|
|
|
+ int index = 0;
|
|
|
+ foreach (var coll in collections) {
|
|
|
+ if (coll.TryTake (out item))
|
|
|
+ return index;
|
|
|
+ index++;
|
|
|
+ }
|
|
|
+ return -1;
|
|
|
+ }
|
|
|
+
|
|
|
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
|
|
|
{
|
|
|
item = default (T);
|
|
|
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
foreach (var coll in collections) {
|
|
|
- if (coll.TryRemove (out item))
|
|
|
+ if (coll.TryTake (out item, ts))
|
|
|
return index;
|
|
|
index++;
|
|
|
}
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, TimeSpan ts)
|
|
|
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
|
|
|
{
|
|
|
item = default (T);
|
|
|
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
foreach (var coll in collections) {
|
|
|
- if (coll.TryRemove (out item, ts))
|
|
|
+ if (coll.TryTake (out item, millisecondsTimeout))
|
|
|
return index;
|
|
|
index++;
|
|
|
}
|
|
|
return -1;
|
|
|
}
|
|
|
|
|
|
- public static int TryRemoveAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout)
|
|
|
+ public static int TryTakeFromAny (BlockingCollection<T>[] collections, out T item, int millisecondsTimeout,
|
|
|
+ CancellationToken token)
|
|
|
{
|
|
|
item = default (T);
|
|
|
|
|
|
CheckArray (collections);
|
|
|
int index = 0;
|
|
|
foreach (var coll in collections) {
|
|
|
- if (coll.TryRemove (out item, millisecondsTimeout))
|
|
|
+ if (coll.TryTake (out item, millisecondsTimeout, token))
|
|
|
return index;
|
|
|
index++;
|
|
|
}
|
|
|
@@ -295,7 +441,9 @@ namespace System.Collections.Concurrent
|
|
|
|
|
|
public void CompleteAdding ()
|
|
|
{
|
|
|
- isComplete.Value = true;
|
|
|
+ // No further add beside that point
|
|
|
+ completeId = addId;
|
|
|
+ isComplete.Value = true;
|
|
|
}
|
|
|
|
|
|
void ICollection.CopyTo (Array array, int index)
|
|
|
@@ -310,8 +458,25 @@ namespace System.Collections.Concurrent
|
|
|
|
|
|
public IEnumerable<T> GetConsumingEnumerable ()
|
|
|
{
|
|
|
- T item;
|
|
|
- while (underlyingColl.TryTake (out item)) {
|
|
|
+ return GetConsumingEnumerable (Take);
|
|
|
+ }
|
|
|
+
|
|
|
+ public IEnumerable<T> GetConsumingEnumerable (CancellationToken token)
|
|
|
+ {
|
|
|
+ return GetConsumingEnumerable (() => Take (token));
|
|
|
+ }
|
|
|
+
|
|
|
+ IEnumerable<T> GetConsumingEnumerable (Func<T> getFunc)
|
|
|
+ {
|
|
|
+ while (true) {
|
|
|
+ T item = default (T);
|
|
|
+
|
|
|
+ try {
|
|
|
+ item = getFunc ();
|
|
|
+ } catch {
|
|
|
+ break;
|
|
|
+ }
|
|
|
+
|
|
|
yield return item;
|
|
|
}
|
|
|
}
|
|
|
@@ -326,13 +491,14 @@ namespace System.Collections.Concurrent
|
|
|
return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
|
|
|
}
|
|
|
|
|
|
- public IEnumerator<T> GetEnumerator ()
|
|
|
+ public void Dispose ()
|
|
|
{
|
|
|
- return ((IEnumerable<T>)underlyingColl).GetEnumerator ();
|
|
|
+
|
|
|
}
|
|
|
|
|
|
- public void Dispose ()
|
|
|
+ protected virtual void Dispose (bool managedRes)
|
|
|
{
|
|
|
+
|
|
|
}
|
|
|
|
|
|
public T[] ToArray ()
|
|
|
@@ -366,7 +532,7 @@ namespace System.Collections.Concurrent
|
|
|
|
|
|
public bool IsCompleted {
|
|
|
get {
|
|
|
- return isComplete.Value && underlyingColl.Count == 0;
|
|
|
+ return isComplete.Value && addId == removeId;
|
|
|
}
|
|
|
}
|
|
|
|