ThreadWorker.cs 12 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448
  1. // ThreadWorker.cs
  2. //
  3. // Copyright (c) 2008 Jérémie "Garuma" Laval
  4. //
  5. // Permission is hereby granted, free of charge, to any person obtaining a copy
  6. // of this software and associated documentation files (the "Software"), to deal
  7. // in the Software without restriction, including without limitation the rights
  8. // to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
  9. // copies of the Software, and to permit persons to whom the Software is
  10. // furnished to do so, subject to the following conditions:
  11. //
  12. // The above copyright notice and this permission notice shall be included in
  13. // all copies or substantial portions of the Software.
  14. //
  15. // THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
  16. // IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
  17. // FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
  18. // AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
  19. // LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
  20. // OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
  21. // THE SOFTWARE.
  22. //
  23. //
  24. #if NET_4_0 || MOBILE
  25. using System;
  26. using System.Threading;
  27. using System.Collections.Concurrent;
  28. #if INSIDE_MONO_PARALLEL
  29. using System.Threading.Tasks;
  30. using Watch = System.Diagnostics.Stopwatch;
  31. namespace Mono.Threading.Tasks
  32. #else
  33. namespace System.Threading.Tasks
  34. #endif
  35. {
  36. #if INSIDE_MONO_PARALLEL
  37. public
  38. #endif
  39. class ThreadWorker : IDisposable
  40. {
  // Thread that runs WorkerMethodWrapper; (re)created by InitializeUnderlyingThread ().
  Thread workerThread;
  /* Per-thread reference to the ThreadWorker whose loop runs on the current
   * thread (WorkerMethodWrapper stores `this` here when it starts).
   * It is used when a ThreadWorker has to call Task.Wait, which brings it back
   * into the static ParticipativeWorkerMethod: through this reference the
   * worker can keep using its own deque and adder instead of being treated
   * as a foreign thread.
   */
  [ThreadStatic]
  static ThreadWorker autoReference;
  // This worker's own deque: the owner pushes and pops at the bottom,
  // other workers take from the top when stealing.
  readonly IConcurrentDeque<Task> dDeque;
  // Array of workers scanned when stealing work; entries may be null.
  readonly ThreadWorker[] others;
  // Event the worker loop blocks on once it has been idle for too long;
  // it is set whenever work is queued or picked up.
  readonly ManualResetEvent waitHandle;
  // Queue shared with the other workers; WorkerMethod drains it into dDeque.
  readonly IProducerConsumerCollection<Task> sharedWorkQueue;
  // Priority applied to workerThread each time it is created.
  readonly ThreadPriority threadPriority;
  // 1 while the worker loop is supposed to run, 0 otherwise.
  // Pulse () claims it with Interlocked.Exchange, Stop () clears it.
  int started = 0;
  // Cached others.Length.
  readonly int workerLength;
  // Position of this worker, used as the starting point when stealing.
  readonly int workerPosition;
  // Number of stealing passes over the other workers per WorkerMethod round.
  const int maxRetry = 3;
  // Number of consecutive fruitless loop iterations tolerated before the
  // worker loop starts blocking on waitHandle.
  const int sleepThreshold = 100;
  // Current timeout (in milliseconds) of the blocking wait; the worker loop
  // doubles it up to 0x4000 and resets it to 8 once work has been done.
  int deepSleepTime = 8;
  // Delegate on ChildWorkAdder, handed to Task.Execute.
  readonly Action<Task> adder;
  // Builds a worker wired to the shared queue, its own deque and the wake-up
  // event, then creates (without starting) its underlying thread.
  // `others` is the array of sibling workers used for stealing and
  // `workerPosition` is this worker's position among them.
  public ThreadWorker (ThreadWorker[] others,
                       int workerPosition,
                       IProducerConsumerCollection<Task> sharedWorkQueue,
                       IConcurrentDeque<Task> dDeque,
                       ThreadPriority priority,
                       ManualResetEvent handle)
  {
      // Sibling workers and our place among them.
      this.others = others;
      workerLength = others.Length;
      this.workerPosition = workerPosition;

      // Work sources.
      this.sharedWorkQueue = sharedWorkQueue;
      this.dDeque = dDeque;

      // Thread configuration and wake-up signalling.
      threadPriority = priority;
      waitHandle = handle;

      // Delegate handed to Task.Execute.
      adder = ChildWorkAdder;

      // Must stay last: the thread set-up reads the fields assigned above.
      InitializeUnderlyingThread ();
  }
  // Creates a fresh, not yet started, background thread bound to
  // WorkerMethodWrapper and stores it in workerThread.
  #if INSIDE_MONO_PARALLEL
  protected virtual
  #endif
  void InitializeUnderlyingThread ()
  {
      // The field is assigned first, then the new thread is configured.
      Thread thread = workerThread = new Thread (WorkerMethodWrapper);
      thread.IsBackground = true;
      thread.Priority = threadPriority;
      thread.Name = "ParallelFxThreadWorker";
  }
  // Asks the worker loop to stop and aborts the underlying thread when it is
  // actually alive.
  #if INSIDE_MONO_PARALLEL
  virtual
  #endif
  public void Dispose ()
  {
      Stop ();

      // ThreadState is a [Flags] enumeration and the worker thread is a
      // background thread, so its state is a combination of bits: it has to
      // be tested bit by bit rather than compared for equality.
      // A thread that has already stopped, or that was never started, has
      // nothing to abort.
      const ThreadState notRunning = ThreadState.Stopped | ThreadState.Unstarted;
      if ((workerThread.ThreadState & notRunning) == 0)
          workerThread.Abort ();
  }
  // Starts the worker thread if the worker is not already running.
  // Safe to call repeatedly: only the caller that flips `started` from 0 to 1
  // goes on to start a thread.
  #if INSIDE_MONO_PARALLEL
  virtual
  #endif
  public void Pulse ()
  {
      // Cheap check first to avoid the interlocked operation in the common case.
      if (started == 1)
          return;

      // If the worker was stopped then mark it in use and restart it.
      if (Interlocked.Exchange (ref started, 1) != 0)
          return;

      // ThreadState is a [Flags] enumeration: a background thread that has
      // not been started yet reports Unstarted | Background, so the bit has
      // to be tested instead of comparing the whole value. Only a thread that
      // has already been started once needs to be replaced, since a Thread
      // cannot be started twice.
      if ((workerThread.ThreadState & ThreadState.Unstarted) == 0)
          InitializeUnderlyingThread ();

      workerThread.Start ();
  }
  // Requests the worker loop to end; it does not wait for the thread to exit.
  #if INSIDE_MONO_PARALLEL
  virtual
  #endif
  public void Stop ()
  {
      // Clearing the flag is enough: the loop in WorkerMethodWrapper tests it
      // at the start of every iteration.
      started = 0;
  }
  // Entry point of the worker thread: runs WorkerMethod until the worker is
  // stopped, backing off progressively while there is nothing to do.
  #if INSIDE_MONO_PARALLEL
  protected virtual
  #endif
  void WorkerMethodWrapper ()
  {
      int idleRounds = 0;
      // Let code running on this thread find its ThreadWorker.
      autoReference = this;
      bool signalled = true;

      // Main loop
      while (started == 1) {
          bool didWork = WorkerMethod ();

          // We were woken up (or just started) but found nothing: clear the
          // event so that the next wait really blocks.
          if (signalled && !didWork)
              waitHandle.Reset ();
          signalled = false;

          Thread.Yield ();

          if (didWork) {
              // Productive round: start over with the shortest back-off.
              deepSleepTime = 8;
              idleRounds = 0;
          } else if (++idleRounds > sleepThreshold && sharedWorkQueue.Count == 0) {
              // We are spinning too much, have a deeper sleep. The timeout
              // doubles on every such wait and is capped at 0x4000 ms.
              if (deepSleepTime >= 0x4000)
                  deepSleepTime = 0x4000;
              else
                  deepSleepTime <<= 1;
              signalled = waitHandle.WaitOne (deepSleepTime);
          }
      }

      started = 0;
  }
  // Main method, holding all the logic of retrieving, processing and stealing work.
  // Returns true when at least one task was executed during the call.
  #if INSIDE_MONO_PARALLEL
  protected virtual
  #endif
  bool WorkerMethod ()
  {
      bool executedSomething = false;
      bool stoleSomething;
      // Upper bound of the circular scan over the other workers.
      int scanEnd = workerLength + workerPosition;

      do {
          stoleSomething = false;
          Task task;

          // We fill up our work deque concurrently with other ThreadWorker
          while (sharedWorkQueue.Count > 0) {
              waitHandle.Set ();

              while (sharedWorkQueue.TryTake (out task))
                  dDeque.PushBottom (task);

              // Now we process our work
              while (dDeque.PopBottom (out task) == PopResult.Succeed) {
                  waitHandle.Set ();
                  if (task == null)
                      continue;
                  task.Execute (adder);
                  executedSomething = true;
              }
          }

          // When we have finished, steal from the other workers.
          // Repeat the operation a little so that we can let other things process.
          for (int attempt = 0; attempt < maxRetry; ++attempt) {
              // Start stealing with the ThreadWorker at our right to minimize contention
              for (int slot = workerPosition + 1; slot < scanEnd; ++slot) {
                  ThreadWorker victim = others [slot % workerLength];
                  if (victim == null || victim == this)
                      continue;

                  // Maybe make this steal more than one item at a time, see TODO.
                  while (victim.dDeque.PopTop (out task) == PopResult.Succeed) {
                      // Signal only on the first successful steal of this round.
                      if (!stoleSomething) {
                          waitHandle.Set ();
                          stoleSomething = true;
                      }
                      if (task == null)
                          continue;
                      task.Execute (adder);
                      executedSomething = true;
                  }
              }
          }
      } while (sharedWorkQueue.Count > 0 || stoleSomething);

      return executedSomething;
  }
  202. #if !INSIDE_MONO_PARALLEL
  // Almost the same as WorkerMethod but driven by an event that tells when to
  // stop, and treating one item at a time.
  // It's used by the Scheduler Participate(...) method for special waiting cases like
  // Task.WaitAll(someTasks) or Task.WaitAny(someTasks).
  // The event is polled a good deal of time, between every unit of work.
  // Tasks that do not pass CheckTaskFitness (LongRunning ones among them) are
  // put back instead of being executed, until the method turns aggressive.
  //
  // self                 task on whose behalf the caller is waiting
  // predicateEvt         set once the awaited condition is fulfilled
  // millisecondsTimeout  maximum time to participate, -1 meaning no limit
  // sharedWorkQueue      queue shared by the workers
  // others               workers whose deques may be stolen from
  // evt                  event signalled when work is found or put back
  public static void ParticipativeWorkerMethod (Task self,
                                                ManualResetEventSlim predicateEvt,
                                                int millisecondsTimeout,
                                                IProducerConsumerCollection<Task> sharedWorkQueue,
                                                ThreadWorker[] others,
                                                ManualResetEvent evt)
  {
      const int stage1 = 5, stage2 = 0;
      int remainingTries = 50;
      WaitHandle[] waitSet = null;
      Watch stopwatch = Watch.StartNew ();
      if (millisecondsTimeout == -1)
          millisecondsTimeout = int.MaxValue;
      bool aggressive = false;
      bool onWorkerThread = autoReference != null;
      Action<Task> childAdder = null;

      while (!predicateEvt.IsSet && stopwatch.ElapsedMilliseconds < millisecondsTimeout && !self.IsCompleted) {
          // We try to execute the self task as it may be the simplest way to unlock
          // the situation
          if (self.Status == TaskStatus.WaitingToRun) {
              if (onWorkerThread)
                  self.Execute (autoReference.adder);
              else
                  self.Execute (null);
              if (ShouldStopParticipating (predicateEvt, stopwatch, millisecondsTimeout))
                  return;
          }

          Task candidate;

          // If we are in fact a normal ThreadWorker, use our own deque
          if (onWorkerThread) {
              var ownTasks = autoReference.dDeque.GetEnumerable ();
              if (childAdder == null)
                  childAdder = autoReference.adder;

              if (ownTasks != null) {
                  foreach (var queued in ownTasks) {
                      if (queued == null)
                          continue;
                      if (CheckTaskFitness (self, queued))
                          queued.Execute (childAdder);
                      if (ShouldStopParticipating (predicateEvt, stopwatch, millisecondsTimeout))
                          return;
                  }
              }
          }

          // Take at most as many items as the shared queue held when we looked at it
          int budget = sharedWorkQueue.Count;
          while (--budget >= 0 && sharedWorkQueue.TryTake (out candidate) && candidate != null) {
              evt.Set ();
              ExecuteOrRequeue (self, candidate, aggressive, sharedWorkQueue, evt);
              if (ShouldStopParticipating (predicateEvt, stopwatch, millisecondsTimeout))
                  return;
          }

          // First check to see if we comply to predicate
          if (ShouldStopParticipating (predicateEvt, stopwatch, millisecondsTimeout))
              return;

          // Try to complete other work by stealing since our desired tasks may be in other worker
          foreach (ThreadWorker other in others) {
              if (other == null || other == autoReference)
                  continue;

              if (other.dDeque.PopTop (out candidate) == PopResult.Succeed && candidate != null) {
                  evt.Set ();
                  ExecuteOrRequeue (self, candidate, aggressive, sharedWorkQueue, evt);
              }

              if (ShouldStopParticipating (predicateEvt, stopwatch, millisecondsTimeout))
                  return;
          }

          /* Waiting is split in 4 phases
           * - until stage1 we simply yield the thread to let others add data
           * - between stage1 and stage2 we use ManualResetEventSlim light waiting mechanism
           * - after stage2 we fall back to the heavier WaitHandle waiting mechanism
           * - if really the situation isn't evolving after a couple of sleep, we disable
           *   task fitness check altogether
           */
          --remainingTries;
          if (remainingTries > stage1) {
              Thread.Yield ();
          } else if (remainingTries >= stage2) {
              predicateEvt.Wait (ComputeTimeout (5, millisecondsTimeout, stopwatch));
          } else {
              // The handle array is built once, on the first heavy wait.
              if (remainingTries == stage2 - 1)
                  waitSet = new WaitHandle[] { predicateEvt.WaitHandle, evt };
              WaitHandle.WaitAny (waitSet, ComputeTimeout (1000, millisecondsTimeout, stopwatch));
              if (remainingTries == stage2 - 10)
                  aggressive = true;
          }
      }
  }
  // True once the awaited event is set or the time budget is exhausted.
  static bool ShouldStopParticipating (ManualResetEventSlim predicateEvt, Watch watch, int millisecondsTimeout)
  {
      return predicateEvt.IsSet || watch.ElapsedMilliseconds > millisecondsTimeout;
  }
  // Executes `candidate` when it is fit for `self` (or when running in
  // aggressive mode); otherwise hands it back, to the current worker's deque
  // when there is one or to the shared queue, and signals `evt`.
  static void ExecuteOrRequeue (Task self,
                                Task candidate,
                                bool aggressive,
                                IProducerConsumerCollection<Task> sharedWorkQueue,
                                ManualResetEvent evt)
  {
      if (CheckTaskFitness (self, candidate) || aggressive) {
          candidate.Execute (null);
          return;
      }

      if (autoReference == null)
          sharedWorkQueue.TryAdd (candidate);
      else
          autoReference.dDeque.PushBottom (candidate);
      evt.Set ();
  }
  // Tells whether `t` may be executed while participating on behalf of `self`:
  // either it is not LongRunning and has a smaller Id than `self`,
  // or it is a child of `self`, or it is `self` itself.
  static bool CheckTaskFitness (Task self, Task t)
  {
      bool notLongRunning = (t.CreationOptions & TaskCreationOptions.LongRunning) == 0;
      if (notLongRunning && t.Id < self.Id)
          return true;

      if (t.Parent == self)
          return true;

      return t == self;
  }
  312. #else
  // Worker associated with the calling thread (the backing field is [ThreadStatic]).
  public static ThreadWorker AutoReference {
      get { return autoReference; }
      set { autoReference = value; }
  }
  // Deque owned by this worker.
  protected IConcurrentDeque<Task> Deque {
      get { return dDeque; }
  }
  // Array of workers this worker steals from.
  protected ThreadWorker[] Others {
      get { return others; }
  }
  // Event used to wake this worker up.
  protected ManualResetEvent WaitHandle {
      get { return waitHandle; }
  }
  // Priority given to the underlying thread.
  protected ThreadPriority Priority {
      get { return threadPriority; }
  }
  // Position of this worker, as given to the constructor.
  protected int WorkerPosition {
      get { return workerPosition; }
  }
  346. #endif
  // Queues `t` at the bottom of this worker's deque and signals the wait
  // handle. This is the method behind the `adder` delegate given to Task.Execute.
  #if INSIDE_MONO_PARALLEL
  protected virtual
  #endif
  internal void ChildWorkAdder (Task t)
  {
      // Queue first, signal afterwards, so the task is already in the deque
      // by the time the event is set.
      dDeque.PushBottom (t);
      waitHandle.Set ();
  }
  // Picks how long a single wait may last: `proposed` milliseconds, shortened
  // to what is left of `timeout` according to `watch` (never below zero).
  // A timeout of int.MaxValue stands for "no limit" and leaves `proposed` untouched.
  static int ComputeTimeout (int proposed, int timeout, Watch watch)
  {
      if (timeout == int.MaxValue)
          return proposed;

      int remaining = (int)(timeout - watch.ElapsedMilliseconds);
      if (remaining < 0)
          remaining = 0;

      return remaining < proposed ? remaining : proposed;
  }
  // True when the worker is not marked as started.
  public bool Finished {
      get { return started == 0; }
  }
  // Managed thread id of the thread currently backing this worker.
  public int Id {
      get { return workerThread.ManagedThreadId; }
  }
  // Typed equality: two workers are equal when they use the very same deque instance.
  public virtual bool Equals (ThreadWorker other)
  {
      return other != null && object.ReferenceEquals (dDeque, other.dDeque);
  }
  // Untyped equality, delegating to Equals (ThreadWorker); anything that is
  // not a ThreadWorker (null included) is never equal.
  public override bool Equals (object obj)
  {
      ThreadWorker temp = obj as ThreadWorker;
      return temp != null && Equals (temp);
  }
  // The hash code has to agree with Equals, which compares the identity of
  // the deque. Hashing the managed thread id would give different hash codes
  // to equal workers, and would change for one worker whenever its thread is
  // re-created. The identity hash of the deque is stable and consistent with
  // Equals; it is 0 when there is no deque.
  public override int GetHashCode ()
  {
      return System.Runtime.CompilerServices.RuntimeHelpers.GetHashCode (dDeque);
  }
  382. }
  383. }
  384. #endif