ThreadWorker.cs 8.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314
  1. // ThreadWorker.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || BOOTSTRAP_NET_4_0
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. namespace System.Threading.Tasks
  29. {
  30. internal class ThreadWorker : IDisposable
  31. {
  // Managed thread that runs WorkerMethodWrapper. InitializeUnderlyingThread
  // assigns it, so it may be replaced by a new Thread instance from Pulse.
  private Thread workerThread;

  /* Per-thread reference to the ThreadWorker that owns the current thread.
   * WorkerMethodWrapper sets it; the static WorkerMethod reads it so that a
   * worker which re-enters the scheduler (the original note mentions
   * Task.Wait) keeps draining its own deque first.
   */
  [ThreadStatic]
  private static ThreadWorker autoReference;

  // This worker's own deque: the owner pushes/pops at the bottom, other
  // workers steal from the top.
  private readonly IDequeOperations<Task> dDeque;
  // Array of workers this one may steal from.
  private readonly ThreadWorker[] others;
  // Event used both to signal that work was found (Set) and to sleep on when idle.
  private readonly ManualResetEvent waitHandle;
  // Queue of tasks that have not yet been moved into a worker deque.
  private readonly IProducerConsumerCollection<Task> sharedWorkQueue;
  // Scheduler pulsed whenever a child task is queued by ChildWorkAdder.
  private readonly IScheduler sched;
  // Priority applied to every thread created by InitializeUnderlyingThread.
  private readonly ThreadPriority threadPriority;

  // Run flag: 1 while workerThread is (being) started, 0 otherwise.
  // Left at its default value of 0 until Pulse is called.
  private int started;

  // others.Length, captured once in the constructor.
  private readonly int workerLength;
  // Starting offset for the steal scan; presumably this worker's index in
  // 'others' (TODO confirm against the scheduler that builds the array).
  private readonly int workerPosition;

  // Number of steal rounds attempted per pass of WorkerMethod.
  private const int maxRetry = 5;
  // Number of consecutive unproductive passes tolerated before waiting on waitHandle.
  private const int sleepThreshold = 100;
  // Current WaitOne timeout in milliseconds; doubled up to 0x4000 while idle
  // and reset to 8 after a productive pass.
  private int deepSleepTime = 8;
  // Builds a worker bound to the given scheduler, sibling array and shared
  // queue. The underlying thread is created here but not started; Pulse
  // starts it.
  //   sched           - scheduler pulsed when child tasks are queued
  //   others          - workers to steal from (its Length is cached)
  //   workerPosition  - starting offset used when scanning 'others'
  //   sharedWorkQueue - queue the worker drains into its own deque
  //   priority        - priority given to the worker thread
  //   handle          - event set when work is found and waited on when idle
  public ThreadWorker (IScheduler sched,
                       ThreadWorker[] others,
                       int workerPosition,
                       IProducerConsumerCollection<Task> sharedWorkQueue,
                       ThreadPriority priority,
                       ManualResetEvent handle)
  {
      // Collaborators supplied by the caller.
      this.sched = sched;
      this.sharedWorkQueue = sharedWorkQueue;
      waitHandle = handle;
      threadPriority = priority;

      // Stealing topology.
      this.others = others;
      workerLength = others.Length;
      this.workerPosition = workerPosition;

      // Private work deque owned by this worker.
      dDeque = new CyclicDeque<Task> ();

      InitializeUnderlyingThread ();
  }
  // Replaces workerThread with a fresh, not yet started background thread
  // that will run WorkerMethodWrapper at the configured priority.
  void InitializeUnderlyingThread ()
  {
      Thread thread = new Thread (WorkerMethodWrapper);
      // Publish the new instance first, then configure it through the local.
      workerThread = thread;
      thread.IsBackground = true;
      thread.Priority = threadPriority;
      thread.Name = "ParallelFxThreadWorker";
  }
  // Clears the run flag and then aborts the worker thread unless it has
  // already terminated.
  public void Dispose ()
  {
      Stop ();
      // ThreadState is a [Flags] enumeration, and the worker is a background
      // thread, so a finished thread may report Stopped combined with other
      // bits. Test the Stopped bit instead of comparing for equality.
      if ((workerThread.ThreadState & ThreadState.Stopped) == 0)
          workerThread.Abort ();
  }
  // Starts the worker thread if it is not already running. Safe to call
  // repeatedly: only the caller that flips 'started' from 0 to 1 starts a thread.
  public void Pulse ()
  {
      // Cheap unsynchronized fast path for the common "already running" case.
      if (started == 1)
          return;

      // Atomically claim the right to start the thread; a non-zero previous
      // value means another caller got there first.
      if (Interlocked.Exchange (ref started, 1) != 0)
          return;

      // A Thread object can only be started once. ThreadState is a [Flags]
      // enumeration and the worker is a background thread, so an unstarted
      // worker reports Background | Unstarted. Test the Unstarted bit rather
      // than comparing for equality, otherwise a usable thread is discarded
      // and re-created on every start.
      if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
          InitializeUnderlyingThread ();

      workerThread.Start ();
  }
  // Requests the worker loop to end: WorkerMethodWrapper keeps looping only
  // while 'started' equals 1, so clearing the flag lets it fall out of the
  // loop after the current pass.
  public void Stop ()
  {
      this.started = 0;
  }
  101. // This is the actual method called in the Thread
  // Thread entry point. Registers this worker as the current thread's
  // autoReference, then runs WorkerMethod repeatedly while 'started' is 1,
  // backing off with progressively longer waits when no work is found.
  void WorkerMethodWrapper ()
  {
      autoReference = this;
      int idlePasses = 0;

      while (started == 1) {
          if (WorkerMethod ()) {
              // Productive pass: give other threads a turn and reset the back-off.
              Thread.Yield ();
              deepSleepTime = 8;
              idlePasses = 0;
              continue;
          }

          // Nothing was executed: clear the event before yielding so that a
          // later Set () from a producer is what wakes the wait below.
          waitHandle.Reset ();
          Thread.Yield ();

          // After too many empty passes, and only if the shared queue is still
          // empty, wait on the event with a timeout that doubles up to 0x4000.
          idlePasses++;
          if (idlePasses > sleepThreshold && sharedWorkQueue.Count == 0) {
              deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1;
              waitHandle.WaitOne (deepSleepTime);
          }
      }

      // Mark the worker as finished once the loop has been left.
      started = 0;
  }
  125. // Main method, used to do all the logic of retrieving, processing and stealing work.
  // One scheduling pass: drain the shared queue into the local deque and run
  // it, then try to steal from the other workers. The pass repeats while the
  // shared queue is non-empty or the last steal phase obtained something.
  // Returns true if at least one task was executed.
  bool WorkerMethod ()
  {
      bool executedAny = false;
      bool stoleWork;

      do {
          stoleWork = false;
          Task task;

          // Phase 1: move shared work into our deque, then execute the deque.
          while (sharedWorkQueue.Count > 0) {
              while (sharedWorkQueue.TryTake (out task)) {
                  dDeque.PushBottom (task);
                  waitHandle.Set ();
              }

              while (dDeque.PopBottom (out task) == PopResult.Succeed) {
                  waitHandle.Set ();
                  if (task == null)
                      continue;
                  task.Execute (ChildWorkAdder);
                  executedAny = true;
              }
          }

          // Phase 2: steal. Each round walks the other workers starting with
          // the one right after our own position and wrapping around.
          for (int round = 0; round < maxRetry; ++round) {
              for (int offset = 1; offset < workerLength; ++offset) {
                  ThreadWorker victim = others [(workerPosition + offset) % workerLength];
                  if (victim == null || victim == this)
                      continue;

                  // One item per victim per round, taken from the top of its deque.
                  if (victim.dDeque.PopTop (out task) != PopResult.Succeed)
                      continue;

                  waitHandle.Set ();
                  stoleWork = true;
                  if (task != null) {
                      task.Execute (ChildWorkAdder);
                      executedAny = true;
                  }
              }
          }
      } while (sharedWorkQueue.Count > 0 || stoleWork);

      return executedAny;
  }
  172. // Almost same as above but with an added predicate and treating one item at a time.
  173. // It's used by Scheduler Participate(...) method for special waiting case like
  174. // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
  175. // Predicate should be really fast and not blocking as it is called a good deal of time
  176. // Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
  // Runs queued work on the calling thread until 'predicate' returns true.
  //   predicate       - exit condition, re-evaluated after every processed item
  //   sharedWorkQueue - queue to take tasks from (rejected tasks are re-added to it)
  //   others          - workers whose deques may be stolen from; null entries are skipped
  //   evt             - event set each time an item is obtained or put back
  // Tasks rejected by CheckTaskFitness are not executed; they are returned to a queue.
  public static void WorkerMethod (Func<bool> predicate, IProducerConsumerCollection<Task> sharedWorkQueue,
                                   ThreadWorker[] others, ManualResetEvent evt)
  {
      for (;;) {
          if (predicate ())
              return;

          Task task;

          // Source 1: the calling thread's own deque, when the caller is itself
          // a ThreadWorker thread (autoReference is set by WorkerMethodWrapper).
          ThreadWorker self = autoReference;
          if (self != null) {
              while (self.dDeque.PopBottom (out task) == PopResult.Succeed && task != null) {
                  evt.Set ();
                  if (!CheckTaskFitness (task)) {
                      self.dDeque.PushBottom (task);
                      evt.Set ();
                  } else {
                      task.Execute (self.ChildWorkAdder);
                  }

                  if (predicate ())
                      return;
              }
          }

          // Source 2: the shared queue, checking the predicate after each item.
          while (sharedWorkQueue.TryTake (out task) && task != null) {
              evt.Set ();
              if (!CheckTaskFitness (task)) {
                  sharedWorkQueue.TryAdd (task);
                  evt.Set ();
              } else {
                  task.Execute (null);
              }

              if (predicate ())
                  return;
          }

          if (predicate ())
              return;

          // Source 3: steal one item from the top of each worker's deque, since
          // the tasks being waited for may be sitting there.
          foreach (ThreadWorker victim in others) {
              if (victim == null)
                  continue;

              if (victim.dDeque.PopTop (out task) == PopResult.Succeed && task != null) {
                  evt.Set ();
                  if (!CheckTaskFitness (task)) {
                      sharedWorkQueue.TryAdd (task);
                      evt.Set ();
                  } else {
                      task.Execute (null);
                  }
              }

              if (predicate ())
                  return;
          }

          Thread.Yield ();
      }
  }
  // Callback handed to Task.Execute: queues 't' at the bottom of this
  // worker's deque and then asks the scheduler to pulse all workers.
  internal void ChildWorkAdder (Task t)
  {
      this.dDeque.PushBottom (t);
      this.sched.PulseAll ();
  }
  // Tells whether 't' may be executed inline by the static WorkerMethod.
  // Returns false for tasks created with TaskCreationOptions.LongRunning so
  // that a waiting thread does not block on them; callers put such tasks back
  // in a queue instead of running them.
  static bool CheckTaskFitness (Task t)
  {
      // Mask with '&' to test the flag. OR-ing the flag in and comparing with
      // zero is true for every task, so nothing would ever be skipped.
      return (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
  }
  // True when the run flag is 0, i.e. the worker has not been pulsed yet,
  // has been stopped, or its loop has ended.
  public bool Finished {
      get {
          bool running = started != 0;
          return !running;
      }
  }
  // Managed thread id of the current underlying thread. The value follows
  // workerThread, which InitializeUnderlyingThread may replace.
  public int Id {
      get {
          Thread current = workerThread;
          return current.ManagedThreadId;
      }
  }
  // Two workers are equal when they share the very same deque instance.
  // A null argument is never equal.
  public bool Equals (ThreadWorker other)
  {
      if (other == null)
          return false;

      return object.ReferenceEquals (dDeque, other.dDeque);
  }
  // Object-typed equality: false for null or for anything that is not a
  // ThreadWorker, otherwise defers to Equals (ThreadWorker).
  public override bool Equals (object obj)
  {
      ThreadWorker candidate = obj as ThreadWorker;
      if (candidate == null)
          return false;

      return Equals (candidate);
  }
  // Hash code consistent with Equals: equality is reference identity of the
  // readonly dDeque, so the hash is the identity hash of that same object.
  // The managed thread id is not used because workerThread can be replaced
  // by Pulse, which would change the hash during the worker's lifetime and
  // break any hash-based collection holding it.
  public override int GetHashCode ()
  {
      return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
  }
  263. }
  264. }
  265. #endif