Browse Source

[PLinq] Add Cancellation checks even deeper. Introduce merge token.

Merge token are simply the combined cancellation source of user (i.e. token supplied with PLinq operator WithCancellation) and internal tokens (supplied with WithImplementerToken).
Jérémie Laval 15 years ago
parent
commit
ec820e9208

+ 3 - 2
mcs/class/System.Core/System.Linq.Parallel.QueryNodes/QueryWhereNode.cs

@@ -98,7 +98,7 @@ namespace System.Linq.Parallel.QueryNodes
 			});
 
 			return sources
-				.Select ((s, i) => GetEnumerator (s, barrier, store, i))
+				.Select ((s, i) => GetEnumerator (s, barrier, options.MergedToken, store, i))
 				.ToList ();
 		}
 
@@ -116,6 +116,7 @@ namespace System.Linq.Parallel.QueryNodes
 
 		IEnumerable<KeyValuePair<long, TSource>> GetEnumerator (IEnumerable<KeyValuePair<long, TSource>> source,
 		                                                        Barrier barrier,
+		                                                        CancellationToken token,
 		                                                        Tuple<TSource, long, bool>[] store, int index)
 		{
 			IEnumerator<KeyValuePair<long, TSource>> current = source.GetEnumerator ();
@@ -129,7 +130,7 @@ namespace System.Linq.Parallel.QueryNodes
 					result = IsIndexed ? indexedPredicate (curr.Value, (int)curr.Key) : predicate (curr.Value);
 					store[index] = Tuple.Create (curr.Value, curr.Key, result);
 
-					barrier.SignalAndWait ();
+					barrier.SignalAndWait (token);
 
 					Tuple<TSource, long, bool> value = store [index];
 

+ 8 - 6
mcs/class/System.Core/System.Linq.Parallel/OrderingEnumerator.cs

@@ -45,14 +45,16 @@ namespace System.Linq.Parallel
 			CountdownEvent stagingCount;
 			CountdownEvent participantCount;
 			CancellationTokenSource src = new CancellationTokenSource ();
+			CancellationToken mergedToken;
 
-			public SlotBucket (int count)
+			public SlotBucket (int count, CancellationToken token)
 			{
 				this.count = count;
 				stagingCount = new CountdownEvent (count);
 				participantCount = new CountdownEvent (count);
 				stagingArea = new KeyValuePair<long, T>?[count];
 				currentIndex = -count;
+				mergedToken = CancellationTokenSource.CreateLinkedTokenSource (src.Token, token).Token;
 			}
 
 			public void Add (KeyValuePair<long, T> value)
@@ -84,7 +86,7 @@ namespace System.Linq.Parallel
 			// Called at the end with ContinueAll
 			public void Stop ()
 			{
-				
+				src.Cancel ();
 			}
 
 			void Skim ()
@@ -123,7 +125,7 @@ namespace System.Linq.Parallel
 				while (!stagingCount.IsSet) {
 					if (!participantCount.IsSet)
 						try {
-							stagingCount.Wait (src.Token);
+							stagingCount.Wait (mergedToken);
 						} catch {
 							Skim ();
 						}
@@ -147,15 +149,15 @@ namespace System.Linq.Parallel
 		IEnumerator<KeyValuePair<long, T>?> currEnum;
 		KeyValuePair<long, T> curr;
 
-		internal OrderingEnumerator (int num)
+		internal OrderingEnumerator (int num, CancellationToken token)
 		{
 			this.num = num;
-			slotBucket = new SlotBucket (num);
+			slotBucket = new SlotBucket (num, token);
 		}
 
 		public void Dispose ()
 		{
-
+			slotBucket.Stop ();
 		}
 
 		public void Reset ()

+ 2 - 3
mcs/class/System.Core/System.Linq.Parallel/ParallelQueryEnumerator.cs

@@ -80,11 +80,10 @@ namespace System.Linq.Parallel
 					buffer = new BlockingCollection<T> (DefaultBufferSize);
 				}
 
-				var src = CancellationTokenSource.CreateLinkedTokenSource (options.Token, options.ImplementerToken);
-				IEnumerable<T> source = buffer.GetConsumingEnumerable (src.Token);
+				IEnumerable<T> source = buffer.GetConsumingEnumerable (options.MergedToken);
 				loader = source.GetEnumerator ();
 			} else {
-				loader = ordEnumerator = new OrderingEnumerator<T> (options.PartitionCount);
+				loader = ordEnumerator = new OrderingEnumerator<T> (options.PartitionCount, options.MergedToken);
 			}
 		}
 

+ 21 - 0
mcs/class/System.Core/System.Linq.Parallel/QueryOptions.cs

@@ -65,6 +65,27 @@ namespace System.Linq.Parallel
 			PartitionCount = partitionCount;
 			PartitionerSettings = null;
 			ImplementerToken = implementerToken;
+
+			MergeTokens (token, implementerToken);
+		}
+
+		void MergeTokens (CancellationToken token, CancellationToken implementerToken)
+		{
+			bool implementedNone = implementerToken == CancellationToken.None;
+			bool tokenNone = token == CancellationToken.None;
+			if (!implementedNone && !tokenNone)
+				MergedToken = CancellationTokenSource.CreateLinkedTokenSource (implementerToken, token).Token;
+			else if (implementedNone && !tokenNone)
+				MergedToken = token;
+			else if (!implementedNone && tokenNone)
+				MergedToken = implementerToken;
+			else
+				MergedToken = CancellationToken.None;
+		}
+
+		public CancellationToken MergedToken {
+			get;
+			private set;
 		}
 	}
 }

+ 5 - 5
mcs/class/System.Core/Test/System.Linq/ParallelEnumerableTests.cs

@@ -545,11 +545,11 @@ namespace MonoTests.System.Linq
 		[TestAttribute]
 		public void ElementAtTestCase()
 		{
-			ParallelTestHelper.Repeat (() => {
-					Assert.AreEqual(1, baseEnumerable.AsParallel ().ElementAt(0), "#1");
-					Assert.AreEqual(51, baseEnumerable.AsParallel ().ElementAt(50), "#2");
-					Assert.AreEqual(489, baseEnumerable.AsParallel ().ElementAt(488), "#3");
-			});
+			//ParallelTestHelper.Repeat (() => {
+					Assert.AreEqual(1, baseEnumerable.AsParallel ().AsOrdered ().ElementAt(0), "#1");
+					Assert.AreEqual(51, baseEnumerable.AsParallel ().AsOrdered ().ElementAt(50), "#2");
+					Assert.AreEqual(489, baseEnumerable.AsParallel ().AsOrdered ().ElementAt(488), "#3");
+			//});
 		}
 
 		[Test]

+ 8 - 1
mcs/class/System/System.Threading/Barrier.cs

@@ -133,13 +133,20 @@ namespace System.Threading
 			SignalAndWait ((c) => { c.Wait (); return true; });
 		}
 		
+		public void SignalAndWait (CancellationToken token)
+		{
+			if (cleaned)
+				throw GetDisposed ();
+			SignalAndWait ((c) => { c.Wait (token); return true; });
+		}
+
 		public bool SignalAndWait (int millisecondTimeout)
 		{
 			if (cleaned)
 				throw GetDisposed ();
 			return SignalAndWait ((c) => c.Wait (millisecondTimeout));
 		}
-		
+
 		public bool SignalAndWait (TimeSpan ts)
 		{
 			if (cleaned)