ThreadWorker.cs 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459
// ThreadWorker.cs
//
// Copyright (c) 2008 Jérémie "Garuma" Laval
//
// Permission is hereby granted, free of charge, to any person obtaining a copy
// of this software and associated documentation files (the "Software"), to deal
// in the Software without restriction, including without limitation the rights
// to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
// copies of the Software, and to permit persons to whom the Software is
// furnished to do so, subject to the following conditions:
//
// The above copyright notice and this permission notice shall be included in
// all copies or substantial portions of the Software.
//
// THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
// IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
// FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
// AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
// LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
// OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
// THE SOFTWARE.
//
//
  24. #if NET_4_0 || MOBILE
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. #if INSIDE_MONO_PARALLEL
  29. using System.Threading.Tasks;
  30. using Watch = System.Diagnostics.Stopwatch;
  31. namespace Mono.Threading.Tasks
  32. #else
  33. namespace System.Threading.Tasks
  34. #endif
  35. {
  // A worker that owns one thread and one work deque. The thread repeatedly
  // drains the shared work queue into the deque, executes the tasks found there
  // and, when it runs dry, steals tasks from the deques of the other workers.
#if INSIDE_MONO_PARALLEL
  public
#endif
  class ThreadWorker : IDisposable
  {
    // Thread running WorkerMethodWrapper. Replaced by InitializeUnderlyingThread
    // when Pulse finds that the previous thread has already been started once.
    Thread workerThread;

    /* This field is used when a ThreadWorker has to call Task.Wait, which
     * brings it back here through a static method (ParticipativeWorkerMethod).
     * It lets that static code find the worker owning the current thread so it
     * can keep using that worker's own deque and adder.
     */
    [ThreadStatic]
    static ThreadWorker autoReference;

    readonly IConcurrentDeque<Task> dDeque;
    readonly ThreadWorker[] others;
    readonly ManualResetEvent waitHandle;
    readonly IProducerConsumerCollection<Task> sharedWorkQueue;
    readonly ThreadPriority threadPriority;

    // Flag to tell if workerThread is running (1) or not (0)
    int started = 0;

    readonly int workerLength;
    readonly int workerPosition;

    // Number of stealing rounds done over the other workers per WorkerMethod pass
    const int maxRetry = 3;
    // Number of consecutive fruitless iterations before the main loop starts
    // waiting on waitHandle instead of just yielding
    const int sleepThreshold = 100;
    // Current wait duration in milliseconds; doubled on each wait up to 0x4000
    // and reset to 8 as soon as some work has been executed
    int deepSleepTime = 8;

    // Cached delegate on ChildWorkAdder, handed to Task.Execute
    readonly Action<Task> adder;
    // Task currently being run through ExecuteTask, if any
    Task currentTask;

    // Stores the collaborators of the worker and creates (but does not start)
    // the underlying thread.
    //   others:          array of all the workers, used for work stealing
    //   workerPosition:  index of this worker, used to pick where stealing starts
    //   sharedWorkQueue: queue shared by the workers from which tasks are taken
    //   dDeque:          deque owned by this worker
    //   priority:        priority given to the underlying thread
    //   handle:          event the worker waits on when idle and sets when it finds work
    public ThreadWorker (ThreadWorker[] others,
                         int workerPosition,
                         IProducerConsumerCollection<Task> sharedWorkQueue,
                         IConcurrentDeque<Task> dDeque,
                         ThreadPriority priority,
                         ManualResetEvent handle)
    {
      this.others = others;
      this.dDeque = dDeque;
      this.sharedWorkQueue = sharedWorkQueue;
      this.workerLength = others.Length;
      this.workerPosition = workerPosition;
      this.waitHandle = handle;
      this.threadPriority = priority;
      this.adder = new Action<Task> (ChildWorkAdder);

      InitializeUnderlyingThread ();
    }

    // Creates a fresh, not yet started, background thread bound to
    // WorkerMethodWrapper with the configured priority.
#if INSIDE_MONO_PARALLEL
    protected virtual
#endif
    void InitializeUnderlyingThread ()
    {
      this.workerThread = new Thread (WorkerMethodWrapper);
      this.workerThread.IsBackground = true;
      this.workerThread.Priority = threadPriority;
      this.workerThread.Name = "ParallelFxThreadWorker";
    }

    // Asks the main loop to stop and aborts the thread if it is still alive.
#if INSIDE_MONO_PARALLEL
    virtual
#endif
    public void Dispose ()
    {
      Stop ();

      // ThreadState is a [Flags] enumeration: the value can carry several bits at
      // once (for instance Background), so individual bits are tested instead of
      // comparing the whole value. A thread that was never started has nothing
      // to abort and is left untouched so that Pulse can still start it.
      const ThreadState notRunning = ThreadState.Stopped | ThreadState.Unstarted;
      if ((workerThread.ThreadState & notRunning) == 0)
        workerThread.Abort ();
    }

    // Starts the worker thread if it is not already marked as started.
#if INSIDE_MONO_PARALLEL
    virtual
#endif
    public void Pulse ()
    {
      if (started == 1)
        return;

      // If the thread was stopped then set it in use and restart it.
      // Only the caller that flips the flag from 0 to 1 goes on.
      int result = Interlocked.Exchange (ref started, 1);
      if (result != 0)
        return;

      // A Thread object can only be started once. The Unstarted bit is tested on
      // its own because the thread is flagged as Background, so its state is never
      // exactly equal to ThreadState.Unstarted.
      if ((this.workerThread.ThreadState & ThreadState.Unstarted) == 0)
        InitializeUnderlyingThread ();

      workerThread.Start ();
    }

    // Requests the main loop to end; does not wait for the thread to finish.
#if INSIDE_MONO_PARALLEL
    virtual
#endif
    public void Stop ()
    {
      // Set the flag to stop so that the while in the thread will stop
      // doing its infinite loop.
      started = 0;
    }

    // This is the actual method called in the Thread. It loops on WorkerMethod
    // for as long as the started flag is set, backing off when no work is found.
#if INSIDE_MONO_PARALLEL
    protected virtual
#endif
    void WorkerMethodWrapper ()
    {
      int sleepTime = 0;
      // Register ourselves so static code running on this thread can find us
      autoReference = this;
      bool wasWokenUp = false;

      // Main loop
      while (started == 1) {
        bool result = WorkerMethod ();

        // We were signaled but found nothing to do: clear the signal
        if (!result && wasWokenUp)
          waitHandle.Reset ();
        wasWokenUp = false;

        Thread.Yield ();

        if (result) {
          // Some work was executed, go back to the shortest back-off
          deepSleepTime = 8;
          sleepTime = 0;
          continue;
        }

        // If we are spinning too much, have a deeper sleep
        if (++sleepTime > sleepThreshold && sharedWorkQueue.Count == 0) {
          deepSleepTime = deepSleepTime >= 0x4000 ? 0x4000 : deepSleepTime << 1;
          wasWokenUp = waitHandle.WaitOne (deepSleepTime);
        }
      }

      started = 0;
    }

    // Main method, used to do all the logic of retrieving, processing and stealing work.
    // Returns true if at least one task was executed during the call.
#if INSIDE_MONO_PARALLEL
    protected virtual
#endif
    bool WorkerMethod ()
    {
      bool result = false;
      bool hasStolenFromOther;
      // Upper bound (exclusive) of the stealing scan; the scan visits
      // workerPosition + 1 .. workerPosition + workerLength - 1 modulo workerLength.
      int len = workerLength + workerPosition;

      do {
        hasStolenFromOther = false;
        Task value;

        // We fill up our work deque concurrently with other ThreadWorker
        while (sharedWorkQueue.Count > 0) {
          waitHandle.Set ();

          while (sharedWorkQueue.TryTake (out value)) {
            dDeque.PushBottom (value);
          }

          // Now we process our work
          while (dDeque.PopBottom (out value) == PopResult.Succeed) {
            waitHandle.Set ();
            ExecuteTask (value, ref result);
          }
        }

        // When we have finished, steal from other worker
        ThreadWorker other;

        // Repeat the operation a little so that we can let other things process.
        for (int j = 0; j < maxRetry; ++j) {
          // Start stealing with the ThreadWorker at our right to minimize contention
          for (int it = workerPosition + 1; it < len; ++it) {
            int i = it % workerLength;
            if ((other = others [i]) == null || other == this)
              continue;

            // Maybe make this steal more than one item at a time, see TODO.
            while (other.dDeque.PopTop (out value) == PopResult.Succeed) {
              if (!hasStolenFromOther)
                waitHandle.Set ();
              hasStolenFromOther = true;
              ExecuteTask (value, ref result);
            }
          }
        }
      } while (sharedWorkQueue.Count > 0 || hasStolenFromOther);

      return result;
    }

    // Runs a task with currentTask pointing at it and sets result to true once
    // it has been executed. Null tasks are ignored.
    void ExecuteTask (Task value, ref bool result)
    {
      if (value == null)
        return;

      var saveCurrent = currentTask;
      currentTask = value;
      try {
        value.Execute (adder);
        result = true;
      } finally {
        // Always put the previous task back, even if Execute lets an exception escape
        currentTask = saveCurrent;
      }
    }

#if !INSIDE_MONO_PARALLEL
    // Almost same as above but with an added predicate and treating one item at a time.
    // It's used by Scheduler Participate(...) method for special waiting case like
    // Task.WaitAll(someTasks) or Task.WaitAny(someTasks)
    // Predicate should be really fast and not blocking as it is called a good deal of time
    // Also, the method skips tasks that are LongRunning to avoid blocking (Tasks are not LongRunning by default)
    //
    // The method returns when predicateEvt is set, when self is completed or when
    // millisecondsTimeout (-1 meaning no limit) has elapsed.
    public static void ParticipativeWorkerMethod (Task self,
                                                  ManualResetEventSlim predicateEvt,
                                                  int millisecondsTimeout,
                                                  IProducerConsumerCollection<Task> sharedWorkQueue,
                                                  ThreadWorker[] others,
                                                  ManualResetEvent evt)
    {
      const int stage1 = 5, stage2 = 0;
      int tries = 50;
      WaitHandle[] handles = null;
      Watch watch = Watch.StartNew ();
      if (millisecondsTimeout == -1)
        millisecondsTimeout = int.MaxValue;
      bool aggressive = false;
      // True when the calling thread is itself a ThreadWorker thread
      bool hasAutoReference = autoReference != null;
      Action<Task> adder = null;

      while (!predicateEvt.IsSet && watch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
        // We try to execute the self task as it may be the simplest way to unlock
        // the situation
        if (self.Status == TaskStatus.WaitingToRun) {
          self.Execute (hasAutoReference ? autoReference.adder : (Action<Task>)null);
          if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
            return;
        }

        Task value;

        // If we are in fact a normal ThreadWorker, use our own deque
        if (hasAutoReference) {
          var enumerable = autoReference.dDeque.GetEnumerable ();
          if (adder == null)
            adder = autoReference.adder;

          if (enumerable != null) {
            foreach (var t in enumerable) {
              if (t == null)
                continue;

              if (CheckTaskFitness (self, t))
                t.Execute (adder);

              if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
                return;
            }
          }
        }

        int count = sharedWorkQueue.Count;

        // Go through at most the number of items that were in the queue when we
        // looked, executing only those that pass the fitness restriction
        while (--count >= 0 && sharedWorkQueue.TryTake (out value) && value != null) {
          evt.Set ();
          if (CheckTaskFitness (self, value) || aggressive)
            value.Execute (null);
          else {
            // Not for us: hand the task back so that it is not lost
            if (autoReference == null)
              sharedWorkQueue.TryAdd (value);
            else
              autoReference.dDeque.PushBottom (value);
            evt.Set ();
          }

          if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
            return;
        }

        // First check to see if we comply to predicate
        if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
          return;

        // Try to complete other work by stealing since our desired tasks may be in other worker
        ThreadWorker other;
        for (int i = 0; i < others.Length; i++) {
          if ((other = others [i]) == autoReference || other == null)
            continue;

          if (other.dDeque.PopTop (out value) == PopResult.Succeed && value != null) {
            evt.Set ();
            if (CheckTaskFitness (self, value) || aggressive)
              value.Execute (null);
            else {
              // Not for us: hand the task back so that it is not lost
              if (autoReference == null)
                sharedWorkQueue.TryAdd (value);
              else
                autoReference.dDeque.PushBottom (value);
              evt.Set ();
            }
          }

          if (predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout)
            return;
        }

        /* Waiting is split in 4 phases
         *   - until stage 1 we simply yield the thread to let others add data
         *   - between stage 1 and stage2 we use ManualResetEventSlim light waiting mechanism
         *   - after stage2 we fall back to the heavier WaitHandle waiting mechanism
         *   - if really the situation isn't evolving after a couple of sleep, we disable
         *     task fitness check altogether
         */
        if (--tries > stage1)
          Thread.Yield ();
        else if (tries >= stage2)
          predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, watch));
        else {
          // tries goes down by one per iteration, so the array is built exactly
          // once, on the first iteration that reaches this branch
          if (tries == stage2 - 1)
            handles = new [] { predicateEvt.WaitHandle, evt };
          WaitHandle.WaitAny (handles, ComputeTimeout (1000, millisecondsTimeout, watch));
          if (tries == stage2 - 10)
            aggressive = true;
        }
      }
    }

    // Tells whether task t may be executed while waiting on self: t is not
    // LongRunning and has a smaller Id than self, or t is a child of self, or t is
    // self, or t is a child of the task the current worker is executing.
    static bool CheckTaskFitness (Task self, Task t)
    {
      return ((t.CreationOptions & TaskCreationOptions.LongRunning) == 0 && t.Id < self.Id)
        || t.Parent == self
        || t == self
        || (autoReference != null && autoReference.currentTask != null && autoReference.currentTask == t.Parent);
    }
#else
    // ThreadWorker registered for the current thread (thread-static storage)
    public static ThreadWorker AutoReference {
      get {
        return autoReference;
      }
      set {
        autoReference = value;
      }
    }

    // Deque owned by this worker
    protected IConcurrentDeque<Task> Deque {
      get {
        return dDeque;
      }
    }

    // Array of workers given at construction
    protected ThreadWorker[] Others {
      get {
        return others;
      }
    }

    // Event given at construction, used to wait for and signal work
    protected ManualResetEvent WaitHandle {
      get {
        return waitHandle;
      }
    }

    // Priority applied to the underlying thread
    protected ThreadPriority Priority {
      get {
        return threadPriority;
      }
    }

    // Position of this worker given at construction
    protected int WorkerPosition {
      get {
        return workerPosition;
      }
    }
#endif

    // Callback handed to Task.Execute: pushes the task on this worker's deque
    // and signals the wait handle.
#if INSIDE_MONO_PARALLEL
    protected virtual
#endif
    internal void ChildWorkAdder (Task t)
    {
      dDeque.PushBottom (t);
      waitHandle.Set ();
    }

    // Returns how long to wait, in milliseconds: proposed when timeout is
    // int.MaxValue (no limit), otherwise the smaller of proposed and the time
    // remaining before timeout, never less than 0.
    static int ComputeTimeout (int proposed, int timeout, Watch watch)
    {
      return timeout == int.MaxValue ? proposed : System.Math.Min (proposed, System.Math.Max (0, (int)(timeout - watch.ElapsedMilliseconds)));
    }

    // True when the started flag is cleared
    public bool Finished {
      get {
        return started == 0;
      }
    }

    // Managed thread id of the current underlying thread
    public int Id {
      get {
        return workerThread.ManagedThreadId;
      }
    }

    // Two workers are equal when they share the very same deque instance.
    public virtual bool Equals (ThreadWorker other)
    {
      return other != null && object.ReferenceEquals (this.dDeque, other.dDeque);
    }

    public override bool Equals (object obj)
    {
      ThreadWorker temp = obj as ThreadWorker;
      return temp != null && Equals (temp);
    }

    // Hash based on the identity of the deque so that it agrees with Equals and
    // does not change when the underlying thread is re-created by Pulse.
    public override int GetHashCode ()
    {
      return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
    }
  }
  391. }
  392. #endif