ThreadWorker.cs 7.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282
  1. // ThreadWorker.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || BOOTSTRAP_NET_4_0
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. namespace System.Threading.Tasks
  29. {
  // A scheduler worker. Each worker owns one thread and one private deque of
  // tasks. Its loop moves tasks from the shared work queue into the private
  // deque, executes them and, once it has nothing left, tries to steal tasks
  // from the deques of the other workers.
  internal class ThreadWorker : IDisposable
  {
      Thread workerThread;
      // Private work deque: the owner pushes/pops at the bottom, thieves pop at the top.
      readonly IDequeOperations<Task> dDeque;
      // All the workers of the scheduler (this one may be among them).
      readonly ThreadWorker[] others;
      // Handle waited on (with a timeout) when the worker has been idle for a while.
      readonly EventWaitHandle waitHandle;
      readonly IProducerConsumerCollection<Task> sharedWorkQueue;
      readonly IScheduler sched;
      readonly ThreadPriority threadPriority;

      // Flag to tell if workerThread is running: 1 when started, 0 otherwise.
      // NOTE(review): the worker loop reads this field without a memory barrier;
      // confirm this is acceptable on every targeted runtime.
      int started = 0;

      readonly int workerLength;
      readonly int workerPosition;

      // Number of passes made over the other workers during one stealing round.
      const int maxRetry = 5;
      // Number of consecutive unproductive loop iterations tolerated before deep sleeping.
      const int sleepThreshold = 100;
      // Timeout, in milliseconds, of one deep sleep on waitHandle.
      const int deepSleepTime = 10;

      // Creates the worker and its (not yet started) thread.
      //   sched           - scheduler pulsed each time a child task is added
      //   others          - array of the workers that can be stolen from
      //   workerPosition  - index of this worker, used as the stealing start point
      //   sharedWorkQueue - queue the tasks are taken from
      //   priority        - priority given to the underlying thread
      //   handle          - handle used for the deep sleep of an idle worker
      public ThreadWorker (IScheduler sched,
                           ThreadWorker[] others,
                           int workerPosition,
                           IProducerConsumerCollection<Task> sharedWorkQueue,
                           ThreadPriority priority,
                           EventWaitHandle handle)
      {
          this.others = others;
          this.dDeque = new CyclicDeque<Task> ();
          this.sched = sched;
          this.sharedWorkQueue = sharedWorkQueue;
          this.workerLength = others.Length;
          this.workerPosition = workerPosition;
          this.waitHandle = handle;
          this.threadPriority = priority;

          InitializeUnderlyingThread ();
      }

      // (Re)creates the background thread that runs WorkerMethodWrapper.
      // The thread is only created here, it is started by Pulse.
      void InitializeUnderlyingThread ()
      {
          this.workerThread = new Thread (WorkerMethodWrapper);
          this.workerThread.IsBackground = true;
          this.workerThread.Priority = threadPriority;
          this.workerThread.Name = "ParallelFxThreadWorker";
      }

      // Asks the worker loop to stop, then aborts the thread unless it has
      // already terminated.
      public void Dispose ()
      {
          Stop ();

          // ThreadState is a [Flags] enum: several bits (e.g. Background) can be
          // set at once, so test the Stopped bit instead of comparing for equality.
          if ((workerThread.ThreadState & ThreadState.Stopped) == 0)
              workerThread.Abort ();
      }

      // Starts the worker thread if the worker is not already marked as started.
      public void Pulse ()
      {
          // Cheap check first to avoid the interlocked operation in the common case
          if (started == 1)
              return;

          // If the thread was stopped then set it in use and restart it.
          // Only the caller that flips the flag from 0 to 1 goes on.
          if (Interlocked.Exchange (ref started, 1) != 0)
              return;

          // A thread can only be started once: unless the current one has never
          // been started, replace it. The Unstarted bit is tested (rather than
          // comparing the whole value) because the thread is a background one and
          // may report Unstarted | Background.
          if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
              InitializeUnderlyingThread ();

          workerThread.Start ();
      }

      // Clears the started flag so that the loop of the worker thread exits.
      public void Stop ()
      {
          // Set the flag to stop so that the while in the thread will stop
          // doing its infinite loop.
          started = 0;
      }

      // This is the actual method called in the Thread.
      // It runs WorkerMethod repeatedly for as long as the started flag is set.
      void WorkerMethodWrapper ()
      {
          int sleepTime = 0;
          SpinWait wait = new SpinWait ();

          // Main loop
          while (started == 1) {
              // Some work was done: forget about the idle time accumulated so far
              if (WorkerMethod ()) {
                  sleepTime = 0;
                  wait = new SpinWait ();
              }

              // Wait a little before looking for work again
              wait.SpinOnce ();

              // If we are spinning too much, have a deeper sleep
              if (sleepTime++ > sleepThreshold)
                  waitHandle.WaitOne (deepSleepTime);
          }

          started = 0;
      }

      // Main method, used to do all the logic of retrieving, processing and stealing work.
      // Returns true if at least one task was executed during the call.
      bool WorkerMethod ()
      {
          bool result = false;
          bool hasStolenFromOther;

          do {
              hasStolenFromOther = false;
              Task value;

              // We fill up our work deque concurrently with other ThreadWorker
              while (sharedWorkQueue.Count > 0) {
                  while (sharedWorkQueue.TryTake (out value)) {
                      dDeque.PushBottom (value);
                  }

                  // Now we process our work
                  while (dDeque.PopBottom (out value) == PopResult.Succeed) {
                      if (value != null) {
                          value.Execute (ChildWorkAdder);
                          result = true;
                      }
                  }
              }

              // When we have finished, steal from other worker
              ThreadWorker other;

              // Repeat the operation a little so that we can let other things process.
              for (int j = 0; j < maxRetry; ++j) {
                  int len = workerLength + workerPosition;

                  // Start stealing with the ThreadWorker at our right to minimize contention
                  for (int it = workerPosition + 1; it < len; ++it) {
                      int i = it % workerLength;
                      if ((other = others [i]) == null || other == this)
                          continue;

                      // Maybe make this steal more than one item at a time, see TODO.
                      if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
                          hasStolenFromOther = true;
                          if (value != null) {
                              value.Execute (ChildWorkAdder);
                              result = true;
                          }
                      }
                  }
              }
          } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);

          return result;
      }

      // Almost same as above but with an added predicate and treating one item at a time.
      // It's used by Scheduler Participate(...) method for special waiting case like
      // Task.WaitAll(someTasks) or Task.WaitAny(someTasks).
      // Predicate should be really fast and not blocking as it is called a good deal of time.
      // Also, the method skips tasks that are LongRunning to avoid blocking (Task are not
      // LongRunning by default): such tasks are put back in sharedWorkQueue.
      // The method returns as soon as predicate yields true.
      public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
                                       ThreadWorker[] others)
      {
          SpinWait wait = new SpinWait ();

          while (!predicate ()) {
              Task value;

              // Dequeue only one item as we have restriction
              if (sharedWorkQueue.TryTake (out value))
                  ExecuteOrRequeue (value, sharedWorkQueue);

              // First check to see if we comply to predicate
              if (predicate ())
                  return;

              // Try to complete other work by stealing since our desired tasks may be in other worker
              ThreadWorker other;
              for (int i = 0; i < others.Length; i++) {
                  if ((other = others [i]) == null)
                      continue;

                  if (other.dDeque.PopTop (out value) == PopResult.Succeed)
                      ExecuteOrRequeue (value, sharedWorkQueue);

                  if (predicate ())
                      return;
              }

              wait.SpinOnce ();
          }
      }

      // Executes task inline if CheckTaskFitness accepts it, otherwise puts it
      // (back) in sharedWorkQueue. A null task is ignored.
      static void ExecuteOrRequeue (Task task, IProducerConsumerCollection<Task> sharedWorkQueue)
      {
          if (task == null)
              return;

          if (CheckTaskFitness (task))
              task.Execute (null);
          else
              sharedWorkQueue.TryAdd (task);
      }

      // Callback handed to Task.Execute: pushes the task on the deque of this
      // worker and pulses the whole scheduler.
      internal void ChildWorkAdder (Task t)
      {
          dDeque.PushBottom (t);
          sched.PulseAll ();
      }

      // Tells whether a task can be executed inline by the static WorkerMethod,
      // i.e. whether it is NOT flagged LongRunning.
      static bool CheckTaskFitness (Task t)
      {
          // The flag has to be masked with '&': OR-ing it in gives a non-zero
          // value for every task, which made each of them look fit.
          return (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
      }

      // True when the started flag is cleared.
      public bool Finished {
          get {
              return started == 0;
          }
      }

      // Managed id of the current underlying thread. The value changes when
      // the thread is recreated by Pulse.
      public int Id {
          get {
              return workerThread.ManagedThreadId;
          }
      }

      // Two workers are equal when they share the same deque instance.
      public bool Equals (ThreadWorker other)
      {
          return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);
      }

      public override bool Equals (object obj)
      {
          ThreadWorker temp = obj as ThreadWorker;
          return temp == null ? false : Equals (temp);
      }

      // The hash is derived from the identity of the deque, which is what Equals
      // compares and which never changes. The thread id cannot be used because
      // the underlying thread can be replaced during the life of the worker.
      public override int GetHashCode ()
      {
          return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
      }
  }
  235. }
  236. #endif