Przeglądaj źródła

[Task scheduler] Refactoring and cleanup. Removed a bunch of not-so needed lambdas to avoid delegate invocation cost.

Jérémie Laval 15 lat temu
rodzic
commit
4870f96b08

+ 8 - 11
mcs/class/corlib/System.Threading.Tasks/Scheduler.cs

@@ -30,23 +30,23 @@ namespace System.Threading.Tasks
 {
 	internal class Scheduler: TaskScheduler, IScheduler
 	{
-		IProducerConsumerCollection<Task> workQueue;
-		ThreadWorker[]        workers;
-		EventWaitHandle       pulseHandle = new AutoResetEvent (false);
+		readonly IProducerConsumerCollection<Task> workQueue;
+		readonly ThreadWorker[]        workers;
+		readonly EventWaitHandle       pulseHandle = new AutoResetEvent (false);
 
 		public Scheduler ()
-			: this (Environment.ProcessorCount, 0, ThreadPriority.Normal)
+			: this (Environment.ProcessorCount, ThreadPriority.AboveNormal)
 		{
 			
 		}
 		
-		public Scheduler (int maxWorker, int maxStackSize, ThreadPriority priority)
+		public Scheduler (int maxWorker, ThreadPriority priority)
 		{
 			workQueue = new ConcurrentQueue<Task> ();
 			workers = new ThreadWorker [maxWorker];
 			
 			for (int i = 0; i < maxWorker; i++) {
-				workers [i] = new ThreadWorker (this, workers, workQueue, maxStackSize, priority, pulseHandle);
+				workers [i] = new ThreadWorker (this, workers, i, workQueue, priority, pulseHandle);
 				workers [i].Pulse ();
 			}
 		}
@@ -64,9 +64,7 @@ namespace System.Threading.Tasks
 			if (AreTasksFinished (task))
 				return;
 			
-			ParticipateUntil (delegate {
-				return AreTasksFinished (task);
-			});
+			ParticipateUntil (() => AreTasksFinished (task));
 		}
 		
 		public bool ParticipateUntil (Task task, Func<bool> predicate)
@@ -101,9 +99,8 @@ namespace System.Threading.Tasks
 		
 		public void Dispose ()
 		{
-			foreach (ThreadWorker w in workers) {
+			foreach (ThreadWorker w in workers)
 				w.Dispose ();
-			}
 		}
 		
 		bool AreTasksFinished (Task parent)

+ 46 - 68
mcs/class/corlib/System.Threading.Tasks/ThreadWorker.cs

@@ -31,98 +31,75 @@ namespace System.Threading.Tasks
 {
 	internal class ThreadWorker : IDisposable
 	{
-		static Random r = new Random ();
-		
 		Thread workerThread;
 		
-		readonly          ThreadWorker[]         others;
-		internal readonly IDequeOperations<Task> dDeque;
-		readonly          Action<Task>           childWorkAdder;
-		readonly          EventWaitHandle        waitHandle;
-		readonly          IProducerConsumerCollection<Task> sharedWorkQueue;
+		readonly IDequeOperations<Task> dDeque;
+		readonly ThreadWorker[]         others;
+		readonly EventWaitHandle        waitHandle;
+		readonly IProducerConsumerCollection<Task> sharedWorkQueue;
+		readonly IScheduler             sched;
+		readonly ThreadPriority         threadPriority;
+
 		// Flag to tell if workerThread is running
 		int started = 0; 
 		
-		readonly bool isLocal;
 		readonly int  workerLength;
-		readonly int  stealingStart;
+		readonly int  workerPosition;
 		const    int  maxRetry = 5;
 		
 		const int sleepThreshold = 100;
 		const int deepSleepTime = 10;
 		
-		Action threadInitializer;
-		
-		public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
-		                     int maxStackSize, ThreadPriority priority, EventWaitHandle handle)
-		: this (sched, others, sharedWorkQueue, true, maxStackSize, priority, handle)
-		{
-		}
-		
-		public ThreadWorker (IScheduler sched, ThreadWorker[] others, IProducerConsumerCollection<Task> sharedWorkQueue,
-		                     bool createThread, int maxStackSize, ThreadPriority priority, EventWaitHandle handle)
+		public ThreadWorker (IScheduler sched,
+		                     ThreadWorker[] others,
+		                     int workerPosition,
+		                     IProducerConsumerCollection<Task> sharedWorkQueue,
+		                     ThreadPriority priority,
+		                     EventWaitHandle handle)
 		{
 			this.others          = others;
-
-			this.dDeque = new CyclicDeque<Task> ();
-			
+			this.dDeque          = new CyclicDeque<Task> ();
+			this.sched           = sched;
 			this.sharedWorkQueue = sharedWorkQueue;
 			this.workerLength    = others.Length;
-			this.isLocal         = !createThread;
+			this.workerPosition  = workerPosition;
 			this.waitHandle      = handle;
-			
-			this.childWorkAdder = delegate (Task t) { 
-				dDeque.PushBottom (t);
-				sched.PulseAll ();
-			};
-			
-			// Find the stealing start index randomly (then the traversal
-			// will be done in Round-Robin fashion)
-			do {
-				this.stealingStart = r.Next(0, workerLength);
-			} while (others[stealingStart] == this);
-			
-			InitializeUnderlyingThread (maxStackSize, priority);
+			this.threadPriority  = priority;
+
+			InitializeUnderlyingThread ();
 		}
 		
-		void InitializeUnderlyingThread (int maxStackSize, ThreadPriority priority)
+		void InitializeUnderlyingThread ()
 		{
-			threadInitializer = delegate {
-				// Special case of the participant ThreadWorker
-				if (isLocal) {			
-					this.workerThread = Thread.CurrentThread;
-					return;
-				}
-				
-				this.workerThread = (maxStackSize == 0) ? new Thread (WorkerMethodWrapper) :
-					new Thread (WorkerMethodWrapper, maxStackSize);
+			this.workerThread = new Thread (WorkerMethodWrapper);
 	
-				this.workerThread.IsBackground = true;
-				this.workerThread.Priority = priority;
-				this.workerThread.Name = "ParallelFxThreadWorker";
-			};
-			threadInitializer ();
+			this.workerThread.IsBackground = true;
+			this.workerThread.Priority = threadPriority;
+			this.workerThread.Name = "ParallelFxThreadWorker";
 		}
 
 		public void Dispose ()
 		{
 			Stop ();
-			if (!isLocal && workerThread.ThreadState != ThreadState.Stopped)
+			if (workerThread.ThreadState != ThreadState.Stopped)
 				workerThread.Abort ();
 		}
 		
 		public void Pulse ()
 		{
+			if (started == 1)
+				return;
+
 			// If the thread was stopped then set it in use and restart it
 			int result = Interlocked.Exchange (ref started, 1);
 			if (result != 0)
 				return;
-			if (!isLocal) {
-				if (this.workerThread.ThreadState != ThreadState.Unstarted) {
-					threadInitializer ();
-				}
-				workerThread.Start ();
+
+			if (this.workerThread.ThreadState != ThreadState.Unstarted) {
+				InitializeUnderlyingThread ();
 			}
+
+			workerThread.Start ();
 		}
 		
 		public void Stop ()
@@ -179,7 +156,7 @@ namespace System.Threading.Tasks
 					// Now we process our work
 					while (dDeque.PopBottom (out value) == PopResult.Succeed) {
 						if (value != null) {
-							value.Execute (childWorkAdder);
+							value.Execute (ChildWorkAdder);
 							result = true;
 						}
 					}
@@ -189,9 +166,10 @@ namespace System.Threading.Tasks
 				ThreadWorker other;
 				
 				// Repeat the operation a little so that we can let other things process.
-				for (int j = 0; j < maxRetry; j++) {
+				for (int j = 0; j < maxRetry; ++j) {
+					int len = workerLength + workerPosition;
 					// Start stealing with the ThreadWorker at our right to minimize contention
-					for (int it = stealingStart; it < stealingStart + workerLength; it++) {
+					for (int it = workerPosition + 1; it < len; ++it) {
 						int i = it % workerLength;
 						if ((other = others [i]) == null || other == this)
 							continue;
@@ -200,7 +178,7 @@ namespace System.Threading.Tasks
 						if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
 							hasStolenFromOther = true;
 							if (value != null) {
-								value.Execute (childWorkAdder);
+								value.Execute (ChildWorkAdder);
 								result = true;
 							}
 						}
@@ -260,6 +238,12 @@ namespace System.Threading.Tasks
 				wait.SpinOnce ();
 			}
 		}
+
+		internal void ChildWorkAdder (Task t)
+		{
+			dDeque.PushBottom (t);
+			sched.PulseAll ();
+		}
 		
 		static bool CheckTaskFitness (Task t)
 		{
@@ -271,13 +255,7 @@ namespace System.Threading.Tasks
 				return started == 0;
 			}
 		}
-		
-		public bool IsLocal {
-			get {
-				return isLocal;
-			}
-		}
-		
+
 		public int Id {
 			get {
 				return workerThread.ManagedThreadId;