|
|
@@ -44,6 +44,7 @@ namespace System.Linq.Parallel
|
|
|
readonly int count;
|
|
|
CountdownEvent stagingCount;
|
|
|
CountdownEvent participantCount;
|
|
|
+ CancellationTokenSource src = new CancellationTokenSource ();
|
|
|
|
|
|
public SlotBucket (int count)
|
|
|
{
|
|
|
@@ -59,17 +60,25 @@ namespace System.Linq.Parallel
|
|
|
long index = value.Key;
|
|
|
|
|
|
if (index >= currentIndex && index < currentIndex + count) {
|
|
|
- stagingArea [index % count] = value;
|
|
|
+ stagingArea[index % count] = value;
|
|
|
stagingCount.Signal ();
|
|
|
} else {
|
|
|
- temporaryArea.TryAdd (value.Key, value.Value);
|
|
|
+ temporaryArea.TryAdd (index, value.Value);
|
|
|
+ if (index >= currentIndex && index < currentIndex + count) {
|
|
|
+ T dummy;
|
|
|
+ if (temporaryArea.TryRemove (index, out dummy)) {
|
|
|
+ stagingArea[index % count] = value;
|
|
|
+ stagingCount.Signal ();
|
|
|
+ }
|
|
|
+ }
|
|
|
}
|
|
|
}
|
|
|
|
|
|
// Called by each worker's endAction
|
|
|
public void EndParticipation ()
|
|
|
{
|
|
|
- participantCount.Signal ();
|
|
|
+ if (participantCount.Signal ())
|
|
|
+ src.Cancel ();
|
|
|
}
|
|
|
|
|
|
// Called at the end with ContinueAll
|
|
|
@@ -108,21 +117,24 @@ namespace System.Linq.Parallel
|
|
|
stagingCount.Reset ();
|
|
|
|
|
|
Interlocked.Add (ref currentIndex, count);
|
|
|
-
|
|
|
- SpinWait sw = new SpinWait ();
|
|
|
+
|
|
|
+ Skim ();
|
|
|
|
|
|
while (!stagingCount.IsSet) {
|
|
|
- if (participantCount.IsSet && temporaryArea.IsEmpty) {
|
|
|
+ if (!participantCount.IsSet)
|
|
|
+ try {
|
|
|
+ stagingCount.Wait (src.Token);
|
|
|
+ } catch {
|
|
|
+ Skim ();
|
|
|
+ }
|
|
|
+
|
|
|
+ if (participantCount.IsSet) {
|
|
|
// Totally finished
|
|
|
- if (stagingCount.CurrentCount == count)
|
|
|
- return null;
|
|
|
- else
|
|
|
+ if (stagingArea[0].HasValue)
|
|
|
break;
|
|
|
+ else
|
|
|
+ return null;
|
|
|
}
|
|
|
- Skim ();
|
|
|
-
|
|
|
- if (!stagingCount.IsSet)
|
|
|
- sw.SpinOnce ();
|
|
|
}
|
|
|
|
|
|
return ((IEnumerable<KeyValuePair<long, T>?>)stagingArea).GetEnumerator ();
|