ThreadWorker.cs 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345
  1. // ThreadWorker.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. namespace System.Threading.Tasks
  29. {
  30. internal class ThreadWorker : IDisposable
  31. {
  // Tuning constants.
  // maxRetry: number of stealing passes over the other workers per round.
  // sleepThreshold: number of consecutive unproductive rounds after which
  // the worker starts blocking on waitHandle instead of just yielding.
  const int maxRetry = 5;
  const int sleepThreshold = 100;

  /* Per-thread reference to the ThreadWorker that owns the current thread.
   * When a worker thread has to call Task.Wait it ends up in the static
   * WorkerMethod; this reference lets it keep working from its own deque
   * there instead of behaving like a foreign thread.
   */
  [ThreadStatic]
  static ThreadWorker autoReference;

  // Collaborators handed in by the constructor.
  readonly IScheduler sched;
  readonly ThreadWorker[] others;
  readonly IProducerConsumerCollection<Task> sharedWorkQueue;
  readonly IConcurrentDeque<Task> dDeque;
  readonly ManualResetEvent waitHandle;
  readonly ThreadPriority threadPriority;

  // Position of this worker in 'others' and the length of that array;
  // both drive the order in which other workers are visited when stealing.
  readonly int workerPosition;
  readonly int workerLength;

  // Thread currently backing this worker (replaced when the worker is restarted).
  Thread workerThread;

  // Running flag: 1 while the worker loop should keep going, 0 otherwise.
  int started;

  // Timeout in milliseconds used for the idle wait; doubled before each
  // wait (capped at 0x4000) and reset to 8 after a productive round.
  int deepSleepTime = 8;
  // Builds a worker bound to the given scheduler, shared queue and private deque.
  //   sched           - scheduler pulsed when child work is added
  //   others          - array of workers used as stealing targets
  //   workerPosition  - index of this worker inside 'others'
  //   sharedWorkQueue - queue the worker drains into its own deque
  //   dDeque          - this worker's private work deque
  //   priority        - priority given to the underlying thread
  //   handle          - event used to park/wake the worker when idle
  // The underlying thread is created here but not started; see Pulse.
  public ThreadWorker (IScheduler sched, ThreadWorker[] others, int workerPosition,
                       IProducerConsumerCollection<Task> sharedWorkQueue,
                       IConcurrentDeque<Task> dDeque, ThreadPriority priority,
                       ManualResetEvent handle)
  {
      this.sched = sched;
      this.sharedWorkQueue = sharedWorkQueue;
      this.dDeque = dDeque;
      waitHandle = handle;
      threadPriority = priority;

      this.others = others;
      this.workerPosition = workerPosition;
      workerLength = others.Length;

      InitializeUnderlyingThread ();
  }
  // Creates a fresh, not yet started, background thread that will run
  // WorkerMethodWrapper, and stores it in workerThread.
  void InitializeUnderlyingThread ()
  {
      Thread thread = new Thread (new ThreadStart (WorkerMethodWrapper));
      workerThread = thread;

      thread.IsBackground = true;
      thread.Priority = threadPriority;
      thread.Name = "ParallelFxThreadWorker";
  }
  // Asks the worker loop to stop and aborts the underlying thread unless
  // it has already terminated.
  public void Dispose ()
  {
      Stop ();

      // ThreadState is a [Flags] enum: a terminated background thread reports
      // Stopped (or Aborted) combined with Background, so comparing the whole
      // value against ThreadState.Stopped never matches. Test the bits instead.
      const ThreadState terminated = ThreadState.Stopped | ThreadState.Aborted;
      if ((workerThread.ThreadState & terminated) == 0)
          workerThread.Abort ();
  }
  // Starts the worker if it is not already running. Safe to call repeatedly:
  // only the caller that flips 'started' from 0 to 1 starts a thread.
  public void Pulse ()
  {
      // Cheap check first to avoid the interlocked operation in the common case.
      if (started == 1)
          return;

      // Atomically claim the right to start the worker.
      if (Interlocked.Exchange (ref started, 1) != 0)
          return;

      // A Thread object can only be started once, so build a new one unless the
      // current one was never started. ThreadState is a [Flags] enum and the
      // thread is marked as background, so an unstarted thread reports
      // Unstarted | Background; test the Unstarted bit rather than comparing
      // the whole value (which would recreate the thread on every call).
      if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
          InitializeUnderlyingThread ();

      workerThread.Start ();
  }
  // Requests the worker to stop: clearing the flag makes the loop in
  // WorkerMethodWrapper exit the next time it tests its condition.
  public void Stop ()
  {
      this.started = 0;
  }
  // Entry point of the underlying thread. Runs WorkerMethod in a loop while
  // 'started' is 1, yielding between rounds and backing off with increasingly
  // long waits on waitHandle when rounds keep finding no work.
  void WorkerMethodWrapper ()
  {
      // Let the static WorkerMethod recognise this thread as a worker.
      autoReference = this;

      int idleRounds = 0;

      while (started == 1) {
          if (WorkerMethod ()) {
              // Productive round: give other threads a chance and reset the back-off.
              Thread.Yield ();
              deepSleepTime = 8;
              idleRounds = 0;
              continue;
          }

          // Nothing was executed this round.
          waitHandle.Reset ();
          Thread.Yield ();

          idleRounds++;
          // If we are spinning too much and nothing is queued, block for a while.
          if (idleRounds > sleepThreshold && sharedWorkQueue.Count == 0) {
              if (deepSleepTime >= 0x4000)
                  deepSleepTime = 0x4000;
              else
                  deepSleepTime <<= 1;
              waitHandle.WaitOne (deepSleepTime);
          }
      }

      started = 0;
  }
  // One working round: move tasks from the shared queue into our own deque,
  // execute what the deque holds, then try to steal from the other workers.
  // The round repeats while the shared queue is non-empty or the last
  // stealing phase obtained something.
  // Returns true if at least one task was executed.
  bool WorkerMethod ()
  {
      bool didWork = false;
      bool stoleSomething;

      do {
          stoleSomething = false;
          Task task;

          // Fill our deque from the shared queue (other workers do the same
          // concurrently), then process what we hold.
          while (sharedWorkQueue.Count > 0) {
              while (sharedWorkQueue.TryTake (out task)) {
                  dDeque.PushBottom (task);
                  waitHandle.Set ();
              }

              while (dDeque.PopBottom (out task) == PopResult.Succeed) {
                  waitHandle.Set ();
                  if (task == null)
                      continue;
                  task.Execute (ChildWorkAdder);
                  didWork = true;
              }
          }

          // Stealing phase, repeated a few times so other things can progress.
          for (int attempt = 0; attempt < maxRetry; ++attempt) {
              int end = workerPosition + workerLength;

              // Start with the worker to our right to minimize contention.
              for (int cursor = workerPosition + 1; cursor < end; ++cursor) {
                  ThreadWorker victim = others [cursor % workerLength];
                  if (victim == null || victim == this)
                      continue;

                  // Steals a single item at a time.
                  if (victim.dDeque.PopTop (out task) != PopResult.Succeed)
                      continue;

                  waitHandle.Set ();
                  stoleSomething = true;
                  if (task != null) {
                      task.Execute (ChildWorkAdder);
                      didWork = true;
                  }
              }
          }
      } while (sharedWorkQueue.Count > 0 || stoleSomething);

      return didWork;
  }
  // Participating variant of the worker round, used while a thread waits for
  // 'self' (the original comments mention the scheduler's Participate method
  // and Task.WaitAll / Task.WaitAny as callers). Instead of blocking straight
  // away, the caller executes other tasks until predicateEvt is set or the
  // timeout elapses.
  //   self                - task being waited on; used by CheckTaskFitness to
  //                         decide which tasks may be run inline
  //   predicateEvt        - set when the wait condition is fulfilled
  //   millisecondsTimeout - maximum time to participate, -1 for no limit
  //   sharedWorkQueue     - scheduler-wide queue of pending tasks
  //   others              - workers to steal from
  //   evt                 - event signalled whenever work is moved around
  // Tasks rejected by CheckTaskFitness are put back (into the shared queue, or
  // into our own deque when running on a worker thread) rather than executed.
  public static void WorkerMethod (Task self,
                                   ManualResetEventSlim predicateEvt,
                                   int millisecondsTimeout,
                                   IProducerConsumerCollection<Task> sharedWorkQueue,
                                   ThreadWorker[] others,
                                   ManualResetEvent evt)
  {
      const int stage1 = 5, stage2 = 0;
      int tries = 8;
      WaitHandle[] handles = null;
      Watch watch = Watch.StartNew ();

      // -1 means "wait forever" (ComputeTimeout treats it the same way).
      // Without this normalization every elapsed-time comparison below would
      // be made against -1 and the method would return before doing anything.
      if (millisecondsTimeout == -1)
          millisecondsTimeout = int.MaxValue;

      while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout) {
          Task value;

          // If we are in fact a normal ThreadWorker, use our own deque
          if (autoReference != null) {
              while (autoReference.dDeque.PopBottom (out value) == PopResult.Succeed && value != null) {
                  evt.Set ();
                  if (CheckTaskFitness (self, value))
                      value.Execute (autoReference.ChildWorkAdder);
                  else {
                      // Not suitable for inline execution: hand it to the shared queue.
                      sharedWorkQueue.TryAdd (value);
                      evt.Set ();
                  }

                  if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                      return;
              }
          }

          // Visit at most as many items as were queued when we looked; unfit
          // tasks may be re-added to this same queue, so the bound keeps the
          // loop finite.
          int count = sharedWorkQueue.Count;
          while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
              evt.Set ();
              if (CheckTaskFitness (self, value))
                  value.Execute (null);
              else {
                  if (autoReference == null)
                      sharedWorkQueue.TryAdd (value);
                  else
                      autoReference.dDeque.PushBottom (value);
                  evt.Set ();
              }

              if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                  return;
          }

          // First check to see if we comply to predicate
          if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
              return;

          // Try to complete other work by stealing since our desired tasks may be in other worker
          ThreadWorker other;
          for (int i = 0; i < others.Length; i++) {
              if ((other = others [i]) == autoReference || other == null)
                  continue;

              if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
                  evt.Set ();
                  if (CheckTaskFitness (self, value))
                      value.Execute (null);
                  else {
                      if (autoReference == null)
                          sharedWorkQueue.TryAdd (value);
                      else
                          autoReference.dDeque.PushBottom (value);
                      evt.Set ();
                  }
              }

              if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                  return;
          }

          // Back-off in three stages: the first rounds only yield, the next
          // ones wait up to 100 ms on the predicate event, and after that we
          // wait up to 1000 ms for either the predicate or new work.
          if (--tries > stage1)
              Thread.Yield ();
          else if (tries >= stage2)
              predicateEvt.Wait (ComputeTimeout (100, millisecondsTimeout, watch));
          else {
              // Created lazily, the first time the last stage is reached.
              if (handles == null)
                  handles = new [] { predicateEvt.WaitHandle, evt };
              WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
          }
      }
  }
  // Callback handed to Task.Execute: queues a task on this worker's own
  // deque, then asks the scheduler to pulse all workers.
  internal void ChildWorkAdder (Task t)
  {
      this.dDeque.PushBottom (t);
      this.sched.PulseAll ();
  }
  // Decides whether task 't' may be executed inline while waiting on 'self'.
  // A task qualifies when it is not LongRunning and has a smaller Id than
  // 'self', when 'self' is its parent, or when it is 'self' itself.
  static bool CheckTaskFitness (Task self, Task t)
  {
      bool notLongRunning = (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
      if (notLongRunning && t.Id < self.Id)
          return true;

      if (t.Parent == self)
          return true;

      return t == self;
  }
  // Returns how long a single wait may last: the proposed timeout, limited
  // to the time still left before 'timeout' (measured by 'watch') runs out.
  // A 'timeout' of -1 means no overall limit, so the proposal is used as is.
  static int ComputeTimeout (int proposedTimeout, int timeout, Watch watch)
  {
      if (timeout == -1)
          return proposedTimeout;

      int remaining = (int)(timeout - watch.ElapsedMilliseconds);
      if (remaining < 0)
          remaining = 0;

      return remaining < proposedTimeout ? remaining : proposedTimeout;
  }
  // True when the running flag is cleared, i.e. the worker has been asked
  // to stop or its loop has ended.
  public bool Finished {
      get {
          bool running = started != 0;
          return !running;
      }
  }
  // Managed thread id of the thread currently backing this worker.
  public int Id {
      get {
          Thread backing = workerThread;
          return backing.ManagedThreadId;
      }
  }
  // Two workers are considered equal when they share the same deque instance.
  public bool Equals (ThreadWorker other)
  {
      if (other == null)
          return false;

      return object.ReferenceEquals (dDeque, other.dDeque);
  }
  // Object-typed equality: false for null or for anything that is not a
  // ThreadWorker, otherwise defers to Equals (ThreadWorker).
  public override bool Equals (object obj)
  {
      ThreadWorker candidate = obj as ThreadWorker;
      if (candidate == null)
          return false;

      return Equals (candidate);
  }
  // Hash code consistent with Equals: equality is defined by the identity of
  // dDeque, so the hash is derived from that same identity. The previous
  // implementation hashed the managed thread id, which changes whenever the
  // underlying thread is recreated and could differ between equal workers.
  public override int GetHashCode ()
  {
      // RuntimeHelpers.GetHashCode is identity based and returns 0 for null.
      return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
  }
  291. }
  292. }
  293. #endif