ThreadWorker.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438
  1. // ThreadWorker.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || MOBILE
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. #if INSIDE_MONO_PARALLEL
  29. using System.Threading.Tasks;
  30. using Watch = System.Diagnostics.Stopwatch;
  31. namespace Mono.Threading.Tasks
  32. #else
  33. namespace System.Threading.Tasks
  34. #endif
  35. {
  36. #if INSIDE_MONO_PARALLEL
  37. public
  38. #endif
  39. class ThreadWorker : IDisposable
  40. {
  41. Thread workerThread;
  42. /* This field is used when a TheadWorker have to call Task.Wait
  43. * which bring him back here with the static WorkerMethod although
  44. * it's more optimized for him to continue calling its own WorkerMethod
  45. */
  46. [ThreadStatic]
  47. static ThreadWorker autoReference;
  48. readonly IConcurrentDeque<Task> dDeque;
  49. readonly ThreadWorker[] others;
  50. readonly ManualResetEvent waitHandle;
  51. readonly IProducerConsumerCollection<Task> sharedWorkQueue;
  52. readonly ThreadPriority threadPriority;
  53. // Flag to tell if workerThread is running
  54. int started = 0;
  55. readonly int workerLength;
  56. readonly int workerPosition;
  57. const int maxRetry = 5;
  58. const int sleepThreshold = 100;
  59. int deepSleepTime = 8;
  60. public ThreadWorker (ThreadWorker[] others,
  61. int workerPosition,
  62. IProducerConsumerCollection<Task> sharedWorkQueue,
  63. IConcurrentDeque<Task> dDeque,
  64. ThreadPriority priority,
  65. ManualResetEvent handle)
  66. {
  67. this.others = others;
  68. this.dDeque = dDeque;
  69. this.sharedWorkQueue = sharedWorkQueue;
  70. this.workerLength = others.Length;
  71. this.workerPosition = workerPosition;
  72. this.waitHandle = handle;
  73. this.threadPriority = priority;
  74. InitializeUnderlyingThread ();
  75. }
  76. #if INSIDE_MONO_PARALLEL
  77. protected virtual
  78. #endif
  79. void InitializeUnderlyingThread ()
  80. {
  81. this.workerThread = new Thread (WorkerMethodWrapper);
  82. this.workerThread.IsBackground = true;
  83. this.workerThread.Priority = threadPriority;
  84. this.workerThread.Name = "ParallelFxThreadWorker";
  85. }
  86. #if INSIDE_MONO_PARALLEL
  87. virtual
  88. #endif
  89. public void Dispose ()
  90. {
  91. Stop ();
  92. if (workerThread.ThreadState != ThreadState.Stopped)
  93. workerThread.Abort ();
  94. }
  95. #if INSIDE_MONO_PARALLEL
  96. virtual
  97. #endif
  98. public void Pulse ()
  99. {
  100. if (started == 1)
  101. return;
  102. // If the thread was stopped then set it in use and restart it
  103. int result = Interlocked.Exchange (ref started, 1);
  104. if (result != 0)
  105. return;
  106. if (this.workerThread.ThreadState != ThreadState.Unstarted) {
  107. InitializeUnderlyingThread ();
  108. }
  109. workerThread.Start ();
  110. }
  111. #if INSIDE_MONO_PARALLEL
  112. virtual
  113. #endif
  114. public void Stop ()
  115. {
  116. // Set the flag to stop so that the while in the thread will stop
  117. // doing its infinite loop.
  118. started = 0;
  119. }
  120. #if INSIDE_MONO_PARALLEL
  121. protected virtual
  122. #endif
  123. // This is the actual method called in the Thread
  124. void WorkerMethodWrapper ()
  125. {
  126. int sleepTime = 0;
  127. autoReference = this;
  128. // Main loop
  129. while (started == 1) {
  130. bool result = false;
  131. result = WorkerMethod ();
  132. if (!result)
  133. waitHandle.Reset ();
  134. Thread.Yield ();
  135. if (result) {
  136. deepSleepTime = 8;
  137. sleepTime = 0;
  138. continue;
  139. }
  140. // If we are spinning too much, have a deeper sleep
  141. if (++sleepTime > sleepThreshold && sharedWorkQueue.Count == 0) {
  142. waitHandle.WaitOne ((deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1));
  143. }
  144. }
  145. started = 0;
  146. }
  147. #if INSIDE_MONO_PARALLEL
  148. protected virtual
  149. #endif
  150. // Main method, used to do all the logic of retrieving, processing and stealing work.
  151. bool WorkerMethod ()
  152. {
  153. bool result = false;
  154. bool hasStolenFromOther;
  155. do {
  156. hasStolenFromOther = false;
  157. Task value;
  158. // We fill up our work deque concurrently with other ThreadWorker
  159. while (sharedWorkQueue.Count > 0) {
  160. while (sharedWorkQueue.TryTake (out value)) {
  161. dDeque.PushBottom (value);
  162. waitHandle.Set ();
  163. }
  164. // Now we process our work
  165. while (dDeque.PopBottom (out value) == PopResult.Succeed) {
  166. waitHandle.Set ();
  167. if (value != null) {
  168. value.Execute (ChildWorkAdder);
  169. result = true;
  170. }
  171. }
  172. }
  173. // When we have finished, steal from other worker
  174. ThreadWorker other;
  175. // Repeat the operation a little so that we can let other things process.
  176. for (int j = 0; j < maxRetry; ++j) {
  177. int len = workerLength + workerPosition;
  178. // Start stealing with the ThreadWorker at our right to minimize contention
  179. for (int it = workerPosition + 1; it < len; ++it) {
  180. int i = it % workerLength;
  181. if ((other = others [i]) == null || other == this)
  182. continue;
  183. // Maybe make this steal more than one item at a time, see TODO.
  184. if (other.dDeque.PopTop (out value) == PopResult.Succeed) {
  185. waitHandle.Set ();
  186. hasStolenFromOther = true;
  187. if (value != null) {
  188. value.Execute (ChildWorkAdder);
  189. result = true;
  190. }
  191. }
  192. }
  193. }
  194. } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);
  195. return result;
  196. }
  197. #if !INSIDE_MONO_PARALLEL
  198. // Almost same as above but with an added predicate and treating one item at a time.
  199. // It's used by Scheduler Participate(...) method for special waiting case like
  200. // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
  201. // Predicate should be really fast and not blocking as it is called a good deal of time
  202. // Also, the method skip tasks that are LongRunning to avoid blocking (Task are not LongRunning by default)
  203. public static void ParticipativeWorkerMethod (Task self,
  204. ManualResetEventSlim predicateEvt,
  205. int millisecondsTimeout,
  206. IProducerConsumerCollection<Task> sharedWorkQueue,
  207. ThreadWorker[] others,
  208. ManualResetEvent evt)
  209. {
  210. const int stage1 = 5, stage2 = 0;
  211. int tries = 8;
  212. WaitHandle[] handles = null;
  213. Watch watch = Watch.StartNew ();
  214. if (millisecondsTimeout == -1)
  215. millisecondsTimeout = int.MaxValue;
  216. bool aggressive = false;
  217. bool hasAutoReference = autoReference != null;
  218. while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout) {
  219. // We try to execute the self task as it may be the simplest way to unlock
  220. // the situation
  221. if (self.Status == TaskStatus.WaitingToRun) {
  222. self.Execute (hasAutoReference ? autoReference.ChildWorkAdder : (Action<Task>)null);
  223. if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
  224. return;
  225. }
  226. Task value;
  227. // If we are in fact a normal ThreadWorker, use our own deque
  228. if (hasAutoReference) {
  229. var enumerable = autoReference.dDeque.GetEnumerable ();
  230. if (enumerable != null) {
  231. foreach (var t in enumerable) {
  232. if (t == null)
  233. continue;
  234. if (CheckTaskFitness (self, t))
  235. t.Execute (autoReference.ChildWorkAdder);
  236. if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
  237. return;
  238. }
  239. }
  240. }
  241. int count = sharedWorkQueue.Count;
  242. // Dequeue only one item as we have restriction
  243. while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
  244. evt.Set ();
  245. if (CheckTaskFitness (self, value) || aggressive)
  246. value.Execute (null);
  247. else {
  248. if (autoReference == null)
  249. sharedWorkQueue.TryAdd (value);
  250. else
  251. autoReference.dDeque.PushBottom (value);
  252. evt.Set ();
  253. }
  254. if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
  255. return;
  256. }
  257. // First check to see if we comply to predicate
  258. if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
  259. return;
  260. // Try to complete other work by stealing since our desired tasks may be in other worker
  261. ThreadWorker other;
  262. for (int i = 0; i < others.Length; i++) {
  263. if ((other = others [i]) == autoReference || other == null)
  264. continue;
  265. if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
  266. evt.Set ();
  267. if (CheckTaskFitness (self, value) || aggressive)
  268. value.Execute (null);
  269. else {
  270. if (autoReference == null)
  271. sharedWorkQueue.TryAdd (value);
  272. else
  273. autoReference.dDeque.PushBottom (value);
  274. evt.Set ();
  275. }
  276. }
  277. if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
  278. return;
  279. }
  280. /* Waiting is split in 4 phases
  281. * - until stage 1 we simply yield the thread to let others add data
  282. * - between stage 1 and stage2 we use ManualResetEventSlim light waiting mechanism
  283. * - after stage2 we fall back to the heavier WaitHandle waiting mechanism
  284. * - if really the situation isn't evolving after a couple of sleep, we disable
  285. * task fitness check altogether
  286. */
  287. if (--tries > stage1)
  288. Thread.Yield ();
  289. else if (tries >= stage2)
  290. predicateEvt.Wait (ComputeTimeout (100, millisecondsTimeout, watch));
  291. else {
  292. if (tries == stage2 - 1)
  293. handles = new [] { predicateEvt.WaitHandle, evt };
  294. WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
  295. if (tries == stage2 - 10)
  296. aggressive = true;
  297. }
  298. }
  299. }
  300. static bool CheckTaskFitness (Task self, Task t)
  301. {
  302. return ((t.CreationOptions & TaskCreationOptions.LongRunning) == 0 && t.Id < self.Id) || t.Parent == self || t == self;
  303. }
  304. #else
  305. public static ThreadWorker AutoReference {
  306. get {
  307. return autoReference;
  308. }
  309. set {
  310. autoReference = value;
  311. }
  312. }
  313. protected IConcurrentDeque<Task> Deque {
  314. get {
  315. return dDeque;
  316. }
  317. }
  318. protected ThreadWorker[] Others {
  319. get {
  320. return others;
  321. }
  322. }
  323. protected ManualResetEvent WaitHandle {
  324. get {
  325. return waitHandle;
  326. }
  327. }
  328. protected ThreadPriority Priority {
  329. get {
  330. return threadPriority;
  331. }
  332. }
  333. protected int WorkerPosition {
  334. get {
  335. return workerPosition;
  336. }
  337. }
  338. #endif
  339. #if INSIDE_MONO_PARALLEL
  340. protected virtual
  341. #endif
  342. internal void ChildWorkAdder (Task t)
  343. {
  344. dDeque.PushBottom (t);
  345. waitHandle.Set ();
  346. }
  347. static int ComputeTimeout (int proposed, int timeout, Watch watch)
  348. {
  349. return timeout == int.MaxValue ? proposed : System.Math.Min (proposed, System.Math.Max (0, (int)(timeout - watch.ElapsedMilliseconds)));
  350. }
  351. public bool Finished {
  352. get {
  353. return started == 0;
  354. }
  355. }
  356. public int Id {
  357. get {
  358. return workerThread.ManagedThreadId;
  359. }
  360. }
  361. public virtual bool Equals (ThreadWorker other)
  362. {
  363. return (other == null) ? false : object.ReferenceEquals (this.dDeque, other.dDeque);
  364. }
  365. public override bool Equals (object obj)
  366. {
  367. ThreadWorker temp = obj as ThreadWorker;
  368. return temp == null ? false : Equals (temp);
  369. }
  370. public override int GetHashCode ()
  371. {
  372. return workerThread.ManagedThreadId.GetHashCode ();
  373. }
  374. }
  375. }
  376. #endif