Sfoglia il codice sorgente

Rework internal wait mechanism in thread workers participation code

Jérémie Laval 15 anni fa
parent
commit
4091f624db

+ 1 - 1
mcs/class/corlib/System.Threading.Tasks/IScheduler.cs

@@ -33,7 +33,7 @@ namespace System.Threading.Tasks
 	{
 		void AddWork (Task t);
 		void ParticipateUntil (Task task);
-		bool ParticipateUntil (Task task, Func<bool> predicate);
+		bool ParticipateUntil (Task task, ManualResetEventSlim predicateEvt, int millisecondsTimeout);
 		void PulseAll ();
 	}
 }

+ 16 - 14
mcs/class/corlib/System.Threading.Tasks/Scheduler.cs

@@ -63,33 +63,35 @@ namespace System.Threading.Tasks
 		{
 			if (task.IsCompleted)
 				return;
+
+			ManualResetEventSlim evt = new ManualResetEventSlim (false);
+			task.ContinueWith (_ => evt.Set (), TaskContinuationOptions.ExecuteSynchronously);
+			if (evt.IsSet)
+				return;
 			
-			ParticipateUntilInternal (task, TaskCompletedPredicate);
+			ParticipateUntilInternal (task, evt, -1);
 		}
 		
-		public bool ParticipateUntil (Task task, Func<bool> predicate)
+		public bool ParticipateUntil (Task task, ManualResetEventSlim evt, int millisecondsTimeout)
 		{
 			if (task.IsCompleted)
 				return false;
+
+			bool isFromPredicate = true;
+			task.ContinueWith (_ => { isFromPredicate = false; evt.Set (); }, TaskContinuationOptions.ExecuteSynchronously);
 			
-			bool isFromPredicate = false;
-			
-			ParticipateUntilInternal (task, delegate {
-				if (predicate ()) {
-					isFromPredicate = true;
-					return true;
-				}
-				return task.IsCompleted;
-			});
+			ParticipateUntilInternal (task, evt, millisecondsTimeout);
 
 			return isFromPredicate;
 		}
 		
 		// Called with Task.WaitAll(someTasks) or Task.WaitAny(someTasks) so that we can remove ourselves
 		// also when our wait condition is ok
-		public void ParticipateUntilInternal (Task self, Func<Task, bool> predicate)
-		{	
-			ThreadWorker.WorkerMethod (self, predicate, workQueue, workers, pulseHandle);
+		public void ParticipateUntilInternal (Task self, ManualResetEventSlim evt, int millisecondsTimeout)
+		{
+			if (millisecondsTimeout == -1)
+				millisecondsTimeout = int.MaxValue;
+			ThreadWorker.WorkerMethod (self, evt, millisecondsTimeout, workQueue, workers, pulseHandle);
 		}
 
 		static bool TaskCompletedPredicate (Task self)

+ 3 - 4
mcs/class/corlib/System.Threading.Tasks/SchedulerProxy.cs

@@ -50,16 +50,15 @@ namespace System.Threading.Tasks
 			ParticipateUntil (() => task.IsCompleted);
 		}
 		
-		public bool ParticipateUntil (Task task, Func<bool> predicate)
+		public bool ParticipateUntil (Task task, ManualResetEventSlim evt, int millisecondsTimeout)
 		{
 			bool fromPredicate = false;
 			
 			ParticipateUntil (() => {
-				if (predicate ()) {
+				if (evt.IsSet) {
 					fromPredicate = true;
 					return true;
 				}
-				
 				return task.IsCompleted;
 			});
 			
@@ -69,7 +68,7 @@ namespace System.Threading.Tasks
 		public void ParticipateUntil (Func<bool> predicate)
 		{
 			SpinWait sw = new SpinWait ();
-			
+
 			while (!predicate ())
 				sw.SpinOnce ();
 		}

+ 19 - 13
mcs/class/corlib/System.Threading.Tasks/Task.cs

@@ -133,7 +133,7 @@ namespace System.Threading.Tasks
 		static void EmptyFunc (object o)
 		{
 		}
-		
+
 		#region Start
 		public void Start ()
 		{
@@ -209,6 +209,7 @@ namespace System.Threading.Tasks
 		{
 			Task continuation = new Task ((o) => continuationAction ((Task)o), this, cancellationToken, GetCreationOptions (continuationOptions));
 			ContinueWithCore (continuation, continuationOptions, scheduler);
+
 			return continuation;
 		}
 		
@@ -521,9 +522,13 @@ namespace System.Threading.Tasks
 				millisecondsTimeout = ComputeTimeout (millisecondsTimeout, watch);
 			}
 
-			Func<bool> stopFunc
-				= delegate { cancellationToken.ThrowIfCancellationRequested (); return watch.ElapsedMilliseconds > millisecondsTimeout; };
-			bool result = scheduler.ParticipateUntil (this, stopFunc);
+			ManualResetEventSlim predicateEvt = new ManualResetEventSlim (false);
+			if (cancellationToken != CancellationToken.None) {
+				cancellationToken.Register (predicateEvt.Set);
+				cancellationToken.ThrowIfCancellationRequested ();
+			}
+
+			bool result = scheduler.ParticipateUntil (this, predicateEvt, millisecondsTimeout);
 
 			if (exception != null)
 				throw exception;
@@ -637,6 +642,7 @@ namespace System.Threading.Tasks
 			IScheduler sched = null;
 			Task task = null;
 			Watch watch = Watch.StartNew ();
+			ManualResetEventSlim predicateEvt = new ManualResetEventSlim (false);
 
 			foreach (Task t in tasks) {
 				int indexResult = index++;
@@ -648,6 +654,9 @@ namespace System.Threading.Tasks
 					// Check if we are the first to have finished
 					if (result == 1)
 						indexFirstFinished = indexResult;
+
+					// Stop waiting
+					predicateEvt.Set ();
 				}, TaskContinuationOptions.ExecuteSynchronously);
 
 				if (sched == null && t.scheduler != null) {
@@ -666,20 +675,17 @@ namespace System.Threading.Tasks
 				task = tasks[shandle];
 				millisecondsTimeout = ComputeTimeout (millisecondsTimeout, watch);
 			}
-			
+
 			// One task already finished
 			if (indexFirstFinished != -1)
 				return indexFirstFinished;
-			
-			// All tasks are supposed to use the same TaskScheduler
-			sched.ParticipateUntil (task, delegate {
-				if (millisecondsTimeout != -1 && watch.ElapsedMilliseconds > millisecondsTimeout)
-					return true;
 
+			if (cancellationToken != CancellationToken.None) {
+				cancellationToken.Register (predicateEvt.Set);
 				cancellationToken.ThrowIfCancellationRequested ();
+			}
 
-				return numFinished >= 1;
-			});
+			sched.ParticipateUntil (task, predicateEvt, millisecondsTimeout);
 
 			// Index update is still not done
 			if (indexFirstFinished == -1) {
@@ -687,7 +693,7 @@ namespace System.Threading.Tasks
 				while (indexFirstFinished == -1)
 					wait.SpinOnce ();
 			}
-			
+
 			return indexFirstFinished;
 		}
 

+ 40 - 13
mcs/class/corlib/System.Threading.Tasks/ThreadWorker.cs

@@ -208,12 +208,18 @@ namespace System.Threading.Tasks
 		// Predicate should be really fast and not blocking as it is called a good deal of time
 		// Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
 		public static void WorkerMethod (Task self,
-		                                 Func<Task, bool> predicate,
+		                                 ManualResetEventSlim predicateEvt,
+		                                 int millisecondsTimeout,
 		                                 IProducerConsumerCollection<Task> sharedWorkQueue,
 		                                 ThreadWorker[] others,
 		                                 ManualResetEvent evt)
 		{
-			while (!predicate (self)) {
+			const int stage1 = 5, stage2 = 0;
+			int tries = 8;
+			WaitHandle[] handles = null;
+			Watch watch = Watch.StartNew ();
+
+			while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout) {
 				Task value;
 				
 				// If we are in fact a normal ThreadWorker, use our own deque
@@ -227,50 +233,66 @@ namespace System.Threading.Tasks
 							evt.Set ();
 						}
 
-						if (predicate (self))
+						if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
 							return;
 					}
 				}
 
+				int count = sharedWorkQueue.Count;
+
 				// Dequeue only one item as we have restriction
-				while (sharedWorkQueue.TryTake (out value) && value != null) {
+				while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
 					evt.Set ();
 					if (CheckTaskFitness (self, value))
 						value.Execute (null);
 					else {
-						sharedWorkQueue.TryAdd (value);
+						if (autoReference == null)
+							sharedWorkQueue.TryAdd (value);
+						else
+							autoReference.dDeque.PushBottom (value);
 						evt.Set ();
 					}
 
-					if (predicate (self))
+					if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
 						return;
 				}
 
 				// First check to see if we comply to predicate
-				if (predicate (self))
+				if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
 					return;
 				
 				// Try to complete other work by stealing since our desired tasks may be in other worker
 				ThreadWorker other;
 				for (int i = 0; i < others.Length; i++) {
-					if ((other = others [i]) == null)
+					if ((other = others [i]) == autoReference || other == null)
 						continue;
-					
+
 					if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
 						evt.Set ();
 						if (CheckTaskFitness (self, value))
 							value.Execute (null);
 						else {
-							sharedWorkQueue.TryAdd (value);
+							if (autoReference == null)
+								sharedWorkQueue.TryAdd (value);
+							else
+								autoReference.dDeque.PushBottom (value);
 							evt.Set ();
 						}
 					}
-					
-					if (predicate (self))
+
+					if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
 						return;
 				}
 
-				Thread.Yield ();
+				if (--tries > stage1)
+					Thread.Yield ();
+				else if (tries >= stage2)
+					predicateEvt.Wait (ComputeTimeout (100, millisecondsTimeout, watch));
+				else {
+					if (tries == stage2 - 1)
+						handles = new [] { predicateEvt.WaitHandle, evt };
+					WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
+				}
 			}
 		}
 
@@ -284,6 +306,11 @@ namespace System.Threading.Tasks
 		{
 			return ((t.CreationOptions & TaskCreationOptions.LongRunning) == 0 && t.Id < self.Id) || t.Parent == self || t == self;
 		}
+
+		static int ComputeTimeout (int proposedTimeout, int timeout, Watch watch)
+		{
+			return timeout == -1 ? proposedTimeout : Math.Min (proposedTimeout, Math.Max (0, (int)(timeout - watch.ElapsedMilliseconds)));
+		}
 		
 		public bool Finished {
 			get {