ThreadPool.cs 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164116511661167116811691170117111721173117411751176117711781179118011811182118311841185118611871188118911901191119211931194119511961197119811991200120112021203120412051206120712081209121012111212121312141215121612171218121912201221122212231224122512261227122812291230123112321233123412351236123712381239124012411242124312441245124612471248124912501251125212531254
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT license.
  3. // See the LICENSE file in the project root for more information.
  4. /*=============================================================================
  5. **
  6. **
  7. **
  8. ** Purpose: Class for creating and managing a threadpool
  9. **
  10. **
  11. =============================================================================*/
  12. using System.Collections.Concurrent;
  13. using System.Collections.Generic;
  14. using System.Diagnostics;
  15. using System.Diagnostics.CodeAnalysis;
  16. using System.Diagnostics.Tracing;
  17. using System.Runtime.CompilerServices;
  18. using System.Runtime.ConstrainedExecution;
  19. using System.Runtime.InteropServices;
  20. using System.Threading.Tasks;
  21. using Internal.Runtime.CompilerServices;
  22. using Thread = Internal.Runtime.Augments.RuntimeThread;
  23. namespace System.Threading
  24. {
  /// <summary>
  /// Static state shared by the managed thread pool code in this file.
  /// </summary>
  internal static class ThreadPoolGlobals
  {
      /// <summary>Value of <see cref="Environment.ProcessorCount"/> captured when this type is initialized.</summary>
      public static readonly int processorCount = Environment.ProcessorCount;

      public static volatile bool threadPoolInitialized;

      /// <summary>When set, <c>ThreadPoolWorkQueue.Dispatch</c> reports working/not-working status around each work item.</summary>
      public static bool enableWorkerTracking;

      /// <summary>The work queue instance that <c>ThreadPoolWorkQueue.Dispatch</c> drains.</summary>
      public static readonly ThreadPoolWorkQueue workQueue = new ThreadPoolWorkQueue();

      /// <summary>Shim used to invoke <see cref="IAsyncStateMachineBox.MoveNext"/> of the supplied <see cref="IAsyncStateMachineBox"/>.</summary>
      internal static readonly Action<object> s_invokeAsyncStateMachineBox = state =>
      {
          if (state is IAsyncStateMachineBox box)
          {
              box.MoveNext();
          }
          else
          {
              // Anything other than a state machine box is a caller error.
              ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
          }
      };
  }
  42. [StructLayout(LayoutKind.Sequential)] // enforce layout so that padding reduces false sharing
  43. internal sealed class ThreadPoolWorkQueue
  44. {
  /// <summary>
  /// Holds the array of registered <see cref="WorkStealingQueue"/> instances. The array is never modified
  /// in place: <see cref="Add"/> and <see cref="Remove"/> build a replacement array and publish it with a
  /// compare-exchange, retrying when another thread published a different array in the meantime.
  /// </summary>
  internal static class WorkStealingQueueList
  {
      private static volatile WorkStealingQueue[] _queues = new WorkStealingQueue[0];

      /// <summary>The currently published array of queues.</summary>
      public static WorkStealingQueue[] Queues
      {
          get { return _queues; }
      }

      /// <summary>Appends <paramref name="queue"/> to the published array.</summary>
      /// <param name="queue">The queue to register; must not be null and must not already be registered (debug-asserted).</param>
      public static void Add(WorkStealingQueue queue)
      {
          Debug.Assert(queue != null);

          WorkStealingQueue[] snapshot;
          WorkStealingQueue[] grown;
          do
          {
              snapshot = _queues;
              Debug.Assert(Array.IndexOf(snapshot, queue) == -1);

              // Copy everything over and put the new queue in the last slot.
              grown = new WorkStealingQueue[snapshot.Length + 1];
              Array.Copy(snapshot, 0, grown, 0, snapshot.Length);
              grown[grown.Length - 1] = queue;
          }
          while (Interlocked.CompareExchange(ref _queues, grown, snapshot) != snapshot);
      }

      /// <summary>Removes <paramref name="queue"/> from the published array, if the array is non-empty.</summary>
      /// <param name="queue">The queue to unregister; must not be null (debug-asserted).</param>
      public static void Remove(WorkStealingQueue queue)
      {
          Debug.Assert(queue != null);

          while (true)
          {
              WorkStealingQueue[] snapshot = _queues;
              if (snapshot.Length == 0)
              {
                  return;
              }

              int index = Array.IndexOf(snapshot, queue);
              if (index == -1)
              {
                  Debug.Fail("Should have found the queue");
                  return;
              }

              // Build the replacement from the elements before and after the removed slot.
              var shrunk = new WorkStealingQueue[snapshot.Length - 1];
              int trailing = shrunk.Length - index;
              if (index > 0)
              {
                  Array.Copy(snapshot, 0, shrunk, 0, index);
              }
              if (trailing > 0)
              {
                  Array.Copy(snapshot, index + 1, shrunk, index, trailing);
              }

              if (Interlocked.CompareExchange(ref _queues, shrunk, snapshot) == snapshot)
              {
                  return;
              }
          }
      }
  }
  /// <summary>
  /// Array-backed queue of work items addressed by a head index and a tail index.
  /// The <c>Local*</c> methods add and remove at the tail and only take <c>m_foreignLock</c> on their
  /// slow paths; <see cref="TrySteal"/> removes from the head and always runs under that lock.
  /// The array length is a power of two and <c>m_mask</c> is always <c>length - 1</c>, so an index is
  /// mapped to a slot with <c>index &amp; m_mask</c>.
  /// </summary>
  internal sealed class WorkStealingQueue
  {
      private const int INITIAL_SIZE = 32;
      internal volatile object[] m_array = new object[INITIAL_SIZE]; // SOS's ThreadPool command depends on this name
      private volatile int m_mask = INITIAL_SIZE - 1;

  #if DEBUG
      // Debug builds begin at the largest index so the index-reset path in LocalPush gets exercised.
      private const int START_INDEX = int.MaxValue;
  #else
      private const int START_INDEX = 0;
  #endif

      private volatile int m_headIndex = START_INDEX;
      private volatile int m_tailIndex = START_INDEX;

      private SpinLock m_foreignLock = new SpinLock(enableThreadOwnerTracking: false);

      /// <summary>Adds <paramref name="obj"/> at the tail, growing the backing array when it is full.</summary>
      /// <param name="obj">The work item to store.</param>
      public void LocalPush(object obj)
      {
          int tailSnapshot = m_tailIndex;

          // Incrementing the tail past int.MaxValue would overflow, so rebase both indexes first.
          if (tailSnapshot == int.MaxValue)
          {
              bool resetLockTaken = false;
              try
              {
                  m_foreignLock.Enter(ref resetLockTaken);

                  if (m_tailIndex == int.MaxValue)
                  {
                      // Masking (instead of zeroing) leaves every stored item at the slot it already
                      // occupies. The tail has all bits set, so after masking it still has every kept
                      // bit set and the head cannot end up greater than it.
                      m_headIndex = m_headIndex & m_mask;
                      tailSnapshot = m_tailIndex & m_mask;
                      m_tailIndex = tailSnapshot;
                      Debug.Assert(m_headIndex <= m_tailIndex);
                  }
              }
              finally
              {
                  if (resetLockTaken)
                  {
                      m_foreignLock.Exit(useMemoryBarrier: true);
                  }
              }
          }

          // Fast path: at least two free slots remain, so no lock is needed.
          if (tailSnapshot < m_headIndex + m_mask)
          {
              Volatile.Write(ref m_array[tailSnapshot & m_mask], obj);
              m_tailIndex = tailSnapshot + 1;
              return;
          }

          // Slow path: concurrent steals may be moving the head, so take the lock.
          bool growLockTaken = false;
          try
          {
              m_foreignLock.Enter(ref growLockTaken);

              int head = m_headIndex;
              int count = m_tailIndex - m_headIndex;

              // Only when the queue really is full do we double the array; otherwise one slot is
              // still free and the item is simply written below.
              if (count >= m_mask)
              {
                  var newArray = new object[m_array.Length << 1];
                  for (int i = 0; i < m_array.Length; i++)
                  {
                      newArray[i] = m_array[(i + head) & m_mask];
                  }

                  // Items now start at slot 0, so rebase the indexes and widen the mask.
                  m_array = newArray;
                  m_headIndex = 0;
                  m_tailIndex = tailSnapshot = count;
                  m_mask = (m_mask << 1) | 1;
              }

              Volatile.Write(ref m_array[tailSnapshot & m_mask], obj);
              m_tailIndex = tailSnapshot + 1;
          }
          finally
          {
              if (growLockTaken)
              {
                  m_foreignLock.Exit(useMemoryBarrier: false);
              }
          }
      }

      /// <summary>Searches from the tail towards the head for <paramref name="obj"/> and removes it if found.</summary>
      /// <param name="obj">The work item to look for (compared by reference).</param>
      /// <returns><c>true</c> if the item was found and removed; otherwise <c>false</c>.</returns>
      [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
      public bool LocalFindAndPop(object obj)
      {
          // Fast path: the item is the most recently pushed one, so an ordinary pop removes it.
          if (m_array[(m_tailIndex - 1) & m_mask] == obj)
          {
              object popped = LocalPop();
              Debug.Assert(popped == null || popped == obj);
              return popped != null;
          }

          // Otherwise scan linearly, newest to oldest. Recently queued work sits near the tail, so a
          // hit is expected early; a miss costs a walk over the whole queue.
          for (int i = m_tailIndex - 2; i >= m_headIndex; i--)
          {
              if (m_array[i & m_mask] != obj)
              {
                  continue;
              }

              // Candidate found: hold the lock so steals cannot interfere while we remove it.
              bool lockTaken = false;
              try
              {
                  m_foreignLock.Enter(ref lockTaken);

                  // The slot was emptied between the unlocked check and taking the lock.
                  if (m_array[i & m_mask] == null)
                  {
                      return false;
                  }

                  Volatile.Write(ref m_array[i & m_mask], null);

                  // Tighten an index when the removed slot is at an edge; otherwise the null simply
                  // stays in the array and is skipped by later pops and steals.
                  if (i == m_tailIndex)
                  {
                      m_tailIndex -= 1;
                  }
                  else if (i == m_headIndex)
                  {
                      m_headIndex += 1;
                  }

                  return true;
              }
              finally
              {
                  if (lockTaken)
                  {
                      m_foreignLock.Exit(useMemoryBarrier: false);
                  }
              }
          }

          return false;
      }

      /// <summary>Removes and returns the item at the tail, or <c>null</c> when the queue is empty.</summary>
      public object LocalPop()
      {
          if (m_headIndex < m_tailIndex)
          {
              return LocalPopCore();
          }

          return null;
      }

      /// <summary>Tail-pop loop behind <see cref="LocalPop"/>; skips over null slots.</summary>
      [SuppressMessage("Microsoft.Concurrency", "CA8001", Justification = "Reviewed for thread safety")]
      private object LocalPopCore()
      {
          while (true)
          {
              int newTail = m_tailIndex;
              if (m_headIndex >= newTail)
              {
                  return null;
              }

              // Publish the decremented tail with a full fence so the head read below cannot move before it.
              newTail -= 1;
              Interlocked.Exchange(ref m_tailIndex, newTail);

              if (m_headIndex <= newTail)
              {
                  // No interaction with a steal: take the item without locking.
                  int slot = newTail & m_mask;
                  object item = Volatile.Read(ref m_array[slot]);

                  // Null slot: nothing to return here, try the next one.
                  if (item == null)
                  {
                      continue;
                  }

                  m_array[slot] = null;
                  return item;
              }

              // A steal may have taken the last item (0 or 1 elements left): decide under the lock.
              bool lockTaken = false;
              try
              {
                  m_foreignLock.Enter(ref lockTaken);

                  if (m_headIndex <= newTail)
                  {
                      // The item is still there; take it.
                      int slot = newTail & m_mask;
                      object item = Volatile.Read(ref m_array[slot]);

                      if (item == null)
                      {
                          continue;
                      }

                      m_array[slot] = null;
                      return item;
                  }

                  // The item was stolen; put the tail back where it was.
                  m_tailIndex = newTail + 1;
                  return null;
              }
              finally
              {
                  if (lockTaken)
                  {
                      m_foreignLock.Exit(useMemoryBarrier: false);
                  }
              }
          }
      }

      /// <summary><c>true</c> when the head index is below the tail index.</summary>
      public bool CanSteal
      {
          get { return m_headIndex < m_tailIndex; }
      }

      /// <summary>Attempts to remove and return the item at the head without blocking on the lock.</summary>
      /// <param name="missedSteal">
      /// Set to <c>true</c> when the queue looked non-empty but no item was returned (for example because
      /// the lock could not be acquired). Never reset to <c>false</c> here.
      /// </param>
      /// <returns>The removed item, or <c>null</c> if nothing was taken.</returns>
      public object TrySteal(ref bool missedSteal)
      {
          while (CanSteal)
          {
              bool taken = false;
              try
              {
                  m_foreignLock.TryEnter(ref taken);
                  if (taken)
                  {
                      // Advance the head with a full fence so the tail read below cannot move before it.
                      int head = m_headIndex;
                      Interlocked.Exchange(ref m_headIndex, head + 1);

                      if (head < m_tailIndex)
                      {
                          int slot = head & m_mask;
                          object item = Volatile.Read(ref m_array[slot]);

                          // Null slot: re-check CanSteal and try the next one.
                          if (item == null)
                          {
                              continue;
                          }

                          m_array[slot] = null;
                          return item;
                      }

                      // Nothing to take after all; put the head back.
                      m_headIndex = head;
                  }
              }
              finally
              {
                  if (taken)
                  {
                      m_foreignLock.Exit(useMemoryBarrier: false);
                  }
              }

              missedSteal = true;
              break;
          }

          return null;
      }
  }
  // Field order is significant: the enclosing class is laid out sequentially and the padding fields
  // surround the request counter to reduce false sharing.
  internal bool loggingEnabled;
  internal readonly ConcurrentQueue<object> workItems = new ConcurrentQueue<object>(); // SOS's ThreadPool command depends on this name

  private Internal.PaddingFor32 pad1;

  // Number of worker-thread requests that have been issued and not yet marked satisfied.
  private volatile int numOutstandingThreadRequests;

  private Internal.PaddingFor32 pad2;

  /// <summary>Creates the queue and captures whether verbose thread pool event logging is currently enabled.</summary>
  public ThreadPoolWorkQueue()
  {
      loggingEnabled = FrameworkEventSource.Log.IsEnabled(
          EventLevel.Verbose,
          FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);
  }
  /// <summary>Returns the calling thread's <see cref="ThreadPoolWorkQueueThreadLocals"/>, creating it on first use.</summary>
  public ThreadPoolWorkQueueThreadLocals GetOrCreateThreadLocals()
  {
      ThreadPoolWorkQueueThreadLocals existing = ThreadPoolWorkQueueThreadLocals.threadLocals;
      return existing != null ? existing : CreateThreadLocals();
  }

  /// <summary>Creation path of <see cref="GetOrCreateThreadLocals"/>, kept out of line from the lookup.</summary>
  [MethodImpl(MethodImplOptions.NoInlining)]
  private ThreadPoolWorkQueueThreadLocals CreateThreadLocals()
  {
      Debug.Assert(ThreadPoolWorkQueueThreadLocals.threadLocals == null);

      var created = new ThreadPoolWorkQueueThreadLocals(this);
      ThreadPoolWorkQueueThreadLocals.threadLocals = created;
      return created;
  }
  /// <summary>
  /// Requests one more worker thread unless the number of outstanding requests has already reached
  /// <c>ThreadPoolGlobals.processorCount</c>.
  /// </summary>
  internal void EnsureThreadRequested()
  {
      // CoreCLR: the VM keeps its own separate count, which it has already incremented by the time
      // this code runs.
      int observed = numOutstandingThreadRequests;
      while (observed < ThreadPoolGlobals.processorCount)
      {
          int actual = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed + 1, observed);
          if (actual != observed)
          {
              // Another thread changed the counter first; re-evaluate against the value it left.
              observed = actual;
              continue;
          }

          ThreadPool.RequestWorkerThread();
          return;
      }
  }
  /// <summary>
  /// Records that one outstanding thread request has been fulfilled by decrementing the request counter
  /// (never below zero), so that later calls to <see cref="EnsureThreadRequested"/> can request threads again.
  /// </summary>
  internal void MarkThreadRequestSatisfied()
  {
      // CoreCLR: the VM keeps its own separate count, which it has already decremented by the time
      // this code runs.
      int observed = numOutstandingThreadRequests;
      while (observed > 0)
      {
          int actual = Interlocked.CompareExchange(ref numOutstandingThreadRequests, observed - 1, observed);
          if (actual == observed)
          {
              return;
          }

          // Lost the race; retry against the value the other thread left.
          observed = actual;
      }
  }
  /// <summary>Queues a work item and makes sure a worker thread has been requested to run it.</summary>
  /// <param name="callback">The work item; exactly one of <c>IThreadPoolWorkItem</c> or <c>Task</c> (debug-asserted).</param>
  /// <param name="forceGlobal">
  /// When <c>true</c> the item always goes to the shared <c>workItems</c> queue. When <c>false</c> it goes to
  /// the calling thread's work-stealing queue if that thread has thread-locals, otherwise to the shared queue.
  /// </param>
  public void Enqueue(object callback, bool forceGlobal)
  {
      Debug.Assert((callback is IThreadPoolWorkItem) ^ (callback is Task));

      if (loggingEnabled)
      {
          FrameworkEventSource.Log.ThreadPoolEnqueueWorkObject(callback);
      }

      ThreadPoolWorkQueueThreadLocals locals = forceGlobal ? null : ThreadPoolWorkQueueThreadLocals.threadLocals;
      if (locals == null)
      {
          workItems.Enqueue(callback);
      }
      else
      {
          locals.workStealingQueue.LocalPush(callback);
      }

      EnsureThreadRequested();
  }
  /// <summary>Tries to remove <paramref name="callback"/> from the calling thread's work-stealing queue.</summary>
  /// <param name="callback">The work item to look for.</param>
  /// <returns><c>true</c> if the item was removed; <c>false</c> if it was not found or the thread has no thread-locals.</returns>
  internal bool LocalFindAndPop(object callback)
  {
      ThreadPoolWorkQueueThreadLocals locals = ThreadPoolWorkQueueThreadLocals.threadLocals;
      if (locals == null)
      {
          return false;
      }

      return locals.workStealingQueue.LocalFindAndPop(callback);
  }
  /// <summary>
  /// Fetches the next work item: first from the caller's own queue, then from the shared queue, and
  /// finally by trying to steal from the other registered queues, starting at a random position.
  /// </summary>
  /// <param name="tl">The calling thread's thread-locals.</param>
  /// <param name="missedSteal">Passed through to <c>TrySteal</c>, which sets it when a steal attempt came back empty-handed.</param>
  /// <returns>A work item, or <c>null</c> if none was obtained.</returns>
  public object Dequeue(ThreadPoolWorkQueueThreadLocals tl, ref bool missedSteal)
  {
      WorkStealingQueue ownQueue = tl.workStealingQueue;

      // 1. The local queue.
      object callback = ownQueue.LocalPop();
      if (callback != null)
      {
          return callback;
      }

      // 2. The global queue.
      if (workItems.TryDequeue(out callback))
      {
          return callback;
      }

      // 3. Some other thread's local queue: visit every registered queue once, in ring order.
      WorkStealingQueue[] queues = WorkStealingQueueList.Queues;
      int remaining = queues.Length;
      Debug.Assert(remaining > 0, "There must at least be a queue for this thread.");

      int lastIndex = remaining - 1;
      int i = tl.random.Next(remaining);
      for (; remaining > 0; remaining--)
      {
          i = (i < lastIndex) ? i + 1 : 0;

          WorkStealingQueue victim = queues[i];
          if (victim != ownQueue && victim.CanSteal)
          {
              callback = victim.TrySteal(ref missedSteal);
              if (callback != null)
              {
                  break;
              }
          }
      }

      return callback;
  }
  /// <summary>
  /// Dispatches work items to this thread.
  /// </summary>
  /// <returns>
  /// <c>true</c> if this thread did as much work as was available or its quantum expired.
  /// <c>false</c> if this thread stopped working early.
  /// </returns>
  internal static bool Dispatch()
  {
      ThreadPoolWorkQueue sharedQueue = ThreadPoolGlobals.workQueue;

      // Remember when this pass started; KeepDispatching is handed this value on every iteration.
      int dispatchStart = Environment.TickCount;

      // The request that brought this thread here is now fulfilled. From here on this thread must
      // request a replacement itself if it stops for any reason while work may remain.
      //
      // CoreCLR: if this thread is aborted before it can request another one, the VM records a
      // thread request on our behalf, so there is no window to worry about right here.
      sharedQueue.MarkThreadRequestSatisfied();

      // Pick up any change in the logging configuration since the previous pass.
      sharedQueue.loggingEnabled = FrameworkEventSource.Log.IsEnabled(
          EventLevel.Verbose,
          FrameworkEventSource.Keywords.ThreadPool | FrameworkEventSource.Keywords.ThreadTransfer);

      // Pessimistic default: ask for another thread on exit. Cleared only when a dequeue came back
      // empty without a missed steal.
      bool requestReplacement = true;
      object heldItem = null;
      try
      {
          // Work on copies that are local to the try block so that they can be enregistered.
          ThreadPoolWorkQueue queue = sharedQueue;
          ThreadPoolWorkQueueThreadLocals locals = queue.GetOrCreateThreadLocals();
          Thread thread = locals.currentThread;

          // Begin with a clean ExecutionContext and SynchronizationContext.
          thread.ExecutionContext = null;
          thread.SynchronizationContext = null;

          // Keep going until the quantum expires or no work is found.
          while (ThreadPool.KeepDispatching(dispatchStart))
          {
              bool missedSteal = false;

              // Again a try-local copy, so that it can be enregistered.
              object item = heldItem = queue.Dequeue(locals, ref missedSteal);
              if (item == null)
              {
                  // Nothing to run. A missed steal means work may still exist; rather than spinning
                  // here we request another thread and hope the owner of the contended queue drains
                  // its own items in the meantime, which is cheaper than this thread doing it.
                  requestReplacement = missedSteal;

                  // Tell the VM this is a normal return, not one requested by Hill Climbing.
                  return true;
              }

              if (queue.loggingEnabled)
              {
                  FrameworkEventSource.Log.ThreadPoolDequeueWorkObject(item);
              }

              // Finding one item suggests there are more; ask for another thread so they can run in
              // parallel. The request count is capped at #procs, so calling this per dequeue is safe.
              queue.EnsureThreadRequested();

              // Run the item outside of any finally block so that it can be aborted if needed.
              if (ThreadPoolGlobals.enableWorkerTracking)
              {
                  bool reportedStatus = false;
                  try
                  {
                      ThreadPool.ReportThreadStatus(isWorking: true);
                      reportedStatus = true;

                      if (item is Task trackedTask)
                      {
                          trackedTask.ExecuteFromThreadPool(thread);
                      }
                      else
                      {
                          Debug.Assert(item is IThreadPoolWorkItem);
                          Unsafe.As<IThreadPoolWorkItem>(item).Execute();
                      }
                  }
                  finally
                  {
                      if (reportedStatus)
                      {
                          ThreadPool.ReportThreadStatus(isWorking: false);
                      }
                  }
              }
              else if (item is Task untrackedTask)
              {
                  // Task is tested first: a type check for Task followed by Unsafe.As for the
                  // interface is currently faster than the reverse, particularly for objects that
                  // implement many interfaces.
                  untrackedTask.ExecuteFromThreadPool(thread);
              }
              else
              {
                  Debug.Assert(item is IThreadPoolWorkItem);
                  Unsafe.As<IThreadPoolWorkItem>(item).Execute();
              }

              thread.ResetThreadPoolThread();

              // Drop our references to the finished item.
              heldItem = item = null;

              // Go back to a clean ExecutionContext and SynchronizationContext.
              ExecutionContext.ResetThreadPoolThread(thread);

              // Report completion to the VM; the answer says whether Hill Climbing wants this
              // thread to keep working or to return to the pool.
              if (!ThreadPool.NotifyWorkItemComplete())
              {
                  return false;
              }
          }

          // The quantum expired. Tell the VM this is a normal return.
          return true;
      }
      finally
      {
          // Unless the queue was seen to be empty, have another thread pick up where we left off.
          if (requestReplacement)
          {
              sharedQueue.EnsureThreadRequested();
          }
      }
  }
  581. }
  // Minimal xorshift pseudo-random number generator. Quality of randomness is secondary here; it only
  // has to be fast and produce a little variation.
  internal struct FastRandom // xorshift prng
  {
      private uint _w, _x, _y, _z;

      /// <summary>Initializes the generator state from <paramref name="seed"/>.</summary>
      /// <param name="seed">Seed value; it becomes the initial <c>_x</c>, the other state words are fixed constants.</param>
      public FastRandom(int seed)
      {
          _x = (uint)seed;
          _w = 88675123;
          _y = 362436069;
          _z = 521288629;
      }

      /// <summary>Advances the generator and returns a value in <c>[0, maxValue)</c>.</summary>
      /// <param name="maxValue">Exclusive upper bound; must be positive (debug-asserted).</param>
      public int Next(int maxValue)
      {
          Debug.Assert(maxValue > 0);

          uint mixed = _x ^ (_x << 11);
          mixed ^= mixed >> 8;

          // Rotate the state words down by one position, then fold the mixed value into the newest word.
          _x = _y;
          _y = _z;
          _z = _w;
          _w ^= (_w >> 19) ^ mixed;

          return (int)(_w % (uint)maxValue);
      }
  }
  // Per-thread holder of a WorkStealingQueue. The queue is registered with WorkStealingQueueList on
  // construction and unregistered by the finalizer once this object is no longer referenced.
  internal sealed class ThreadPoolWorkQueueThreadLocals
  {
      [ThreadStatic]
      public static ThreadPoolWorkQueueThreadLocals threadLocals;

      public readonly ThreadPoolWorkQueue workQueue;
      public readonly ThreadPoolWorkQueue.WorkStealingQueue workStealingQueue;
      public readonly Thread currentThread;
      public FastRandom random = new FastRandom(Thread.CurrentThread.ManagedThreadId); // mutable struct, do not copy or make readonly

      /// <summary>Creates the thread-locals for the calling thread and registers a new work-stealing queue.</summary>
      /// <param name="tpq">The work queue these thread-locals belong to.</param>
      public ThreadPoolWorkQueueThreadLocals(ThreadPoolWorkQueue tpq)
      {
          workQueue = tpq;
          workStealingQueue = new ThreadPoolWorkQueue.WorkStealingQueue();
          ThreadPoolWorkQueue.WorkStealingQueueList.Add(workStealingQueue);
          currentThread = Thread.CurrentThread;
      }

      /// <summary>Moves any items left in the local queue to the global queue, then unregisters the local queue.</summary>
      private void CleanUp()
      {
          if (workStealingQueue == null)
          {
              return;
          }

          if (workQueue != null)
          {
              for (object cb = workStealingQueue.LocalPop(); cb != null; cb = workStealingQueue.LocalPop())
              {
                  Debug.Assert(null != cb);
                  workQueue.Enqueue(cb, forceGlobal: true);
              }
          }

          ThreadPoolWorkQueue.WorkStealingQueueList.Remove(workStealingQueue);
      }

      ~ThreadPoolWorkQueueThreadLocals()
      {
          // CleanUp exists to hand pending work items to the global queue so another thread runs
          // them. During shutdown (or AD unload) that work will not run anyway, and subtle races
          // there could make us do the wrong thing, so only "normal" finalization cleans up.
          if (Environment.HasShutdownStarted)
          {
              return;
          }

          CleanUp();
      }
  }
  /// <summary>Callback that receives a single caller-supplied state object.</summary>
  /// <param name="state">The state object passed through to the callback.</param>
  645. public delegate void WaitCallback(object state);
  /// <summary>Callback that receives a caller-supplied state object and a flag telling a timeout apart from a signal.</summary>
  /// <param name="state">The state object passed through to the callback.</param>
  /// <param name="timedOut"><c>true</c> when invoked because of a timeout; <c>false</c> when invoked because of a signal.</param>
  646. public delegate void WaitOrTimerCallback(object state, bool timedOut); // signaled or timed out
  /// <summary>
  /// Base class of the queued user work items in this file. In debug builds it checks that each instance
  /// is executed exactly once; in release builds <see cref="Execute"/> does nothing.
  /// </summary>
  internal abstract class QueueUserWorkItemCallbackBase : IThreadPoolWorkItem
  {
  #if DEBUG
      // 0 until Execute has run, 1 afterwards.
      private volatile int executed;

      [System.Diagnostics.CodeAnalysis.SuppressMessage("Microsoft.Performance", "CA1821:RemoveEmptyFinalizers")]
      ~QueueUserWorkItemCallbackBase()
      {
          // Reaching the finalizer means Execute never suppressed it, i.e. the item was never run.
          Debug.Assert(
              executed != 0 || Environment.HasShutdownStarted,
              "A QueueUserWorkItemCallback was never called!");
      }
  #endif

      /// <summary>Debug-only bookkeeping; derived classes call this and then run their callback.</summary>
      public virtual void Execute()
      {
  #if DEBUG
          GC.SuppressFinalize(this);

          int previous = Interlocked.Exchange(ref executed, 1);
          Debug.Assert(
              previous == 0,
              "A QueueUserWorkItemCallback was called twice!");
  #endif
      }
  }
  /// <summary>
  /// Work item that runs a <see cref="WaitCallback"/> with its state object under a captured
  /// <see cref="ExecutionContext"/>.
  /// </summary>
  internal sealed class QueueUserWorkItemCallback : QueueUserWorkItemCallbackBase
  {
      private WaitCallback _callback; // SOS's ThreadPool command depends on this name
      private readonly object _state;
      private readonly ExecutionContext _context;

      // Static shim handed to RunForThreadPoolUnsafe: clears the stored delegate, then invokes it.
      private static readonly Action<QueueUserWorkItemCallback> s_executionContextShim = quwi =>
      {
          WaitCallback callback = quwi._callback;
          quwi._callback = null;
          callback(quwi._state);
      };

      /// <summary>Creates the work item.</summary>
      /// <param name="callback">The delegate to invoke; must not be null (debug-asserted).</param>
      /// <param name="state">The object passed to <paramref name="callback"/>.</param>
      /// <param name="context">The execution context to run under; must not be null (debug-asserted).</param>
      internal QueueUserWorkItemCallback(WaitCallback callback, object state, ExecutionContext context)
      {
          // Same precondition check as the sibling work-item constructors: a null callback would
          // otherwise only surface later, as a NullReferenceException inside the shim.
          Debug.Assert(callback != null);
          Debug.Assert(context != null);

          _callback = callback;
          _state = state;
          _context = context;
      }

      /// <summary>Runs the base bookkeeping, then invokes the callback under the captured context.</summary>
      public override void Execute()
      {
          base.Execute();
          ExecutionContext.RunForThreadPoolUnsafe(_context, s_executionContextShim, this);
      }
  }
  /// <summary>
  /// Work item that runs an <see cref="Action{TState}"/> with a strongly typed state value under a
  /// captured <see cref="ExecutionContext"/>.
  /// </summary>
  internal sealed class QueueUserWorkItemCallback<TState> : QueueUserWorkItemCallbackBase
  {
      private Action<TState> _callback; // SOS's ThreadPool command depends on this name
      private readonly TState _state;
      private readonly ExecutionContext _context;

      /// <summary>Creates the work item.</summary>
      /// <param name="callback">The delegate to invoke; must not be null (debug-asserted).</param>
      /// <param name="state">The value passed to <paramref name="callback"/>.</param>
      /// <param name="context">The execution context to run under.</param>
      internal QueueUserWorkItemCallback(Action<TState> callback, TState state, ExecutionContext context)
      {
          Debug.Assert(callback != null);

          _callback = callback;
          _state = state;
          _context = context;
      }

      /// <summary>Runs the base bookkeeping, clears the stored delegate, then invokes it under the captured context.</summary>
      public override void Execute()
      {
          base.Execute();

          Action<TState> toInvoke = _callback;
          _callback = null;

          ExecutionContext.RunForThreadPoolUnsafe(_context, toInvoke, in _state);
      }
  }
  /// <summary>
  /// Work item that invokes a <see cref="WaitCallback"/> directly, without switching to a captured
  /// execution context.
  /// </summary>
  internal sealed class QueueUserWorkItemCallbackDefaultContext : QueueUserWorkItemCallbackBase
  {
      private WaitCallback _callback; // SOS's ThreadPool command depends on this name
      private readonly object _state;

      /// <summary>Creates the work item.</summary>
      /// <param name="callback">The delegate to invoke; must not be null (debug-asserted).</param>
      /// <param name="state">The object passed to <paramref name="callback"/>.</param>
      internal QueueUserWorkItemCallbackDefaultContext(WaitCallback callback, object state)
      {
          Debug.Assert(callback != null);

          _callback = callback;
          _state = state;
      }

      /// <summary>Checks the contexts, runs the base bookkeeping, clears the stored delegate and invokes it.</summary>
      public override void Execute()
      {
          ExecutionContext.CheckThreadPoolAndContextsAreDefault();
          base.Execute();

          WaitCallback toInvoke = _callback;
          _callback = null;
          toInvoke(_state);

          // Notifications and the reset of EC and SyncCtx back to default are left to ThreadPoolWorkQueue.Dispatch.
      }
  }
  /// <summary>
  /// Work item that invokes an <see cref="Action{TState}"/> directly with a strongly typed state value,
  /// without switching to a captured execution context.
  /// </summary>
  internal sealed class QueueUserWorkItemCallbackDefaultContext<TState> : QueueUserWorkItemCallbackBase
  {
      private Action<TState> _callback; // SOS's ThreadPool command depends on this name
      private readonly TState _state;

      /// <summary>Creates the work item.</summary>
      /// <param name="callback">The delegate to invoke; must not be null (debug-asserted).</param>
      /// <param name="state">The value passed to <paramref name="callback"/>.</param>
      internal QueueUserWorkItemCallbackDefaultContext(Action<TState> callback, TState state)
      {
          Debug.Assert(callback != null);

          _callback = callback;
          _state = state;
      }

      /// <summary>Checks the contexts, runs the base bookkeeping, clears the stored delegate and invokes it.</summary>
      public override void Execute()
      {
          ExecutionContext.CheckThreadPoolAndContextsAreDefault();
          base.Execute();

          Action<TState> toInvoke = _callback;
          _callback = null;
          toInvoke(_state);

          // Notifications and the reset of EC and SyncCtx back to default are left to ThreadPoolWorkQueue.Dispatch.
      }
  }
  /// <summary>
  /// Bundles a <see cref="WaitOrTimerCallback"/> with its state object and, optionally, an
  /// <see cref="ExecutionContext"/> captured at construction time, so that the callback can later be
  /// invoked through <see cref="PerformWaitOrTimerCallback"/>.
  /// </summary>
  internal class _ThreadPoolWaitOrTimerCallback
  {
      private WaitOrTimerCallback _waitOrTimerCallback;
      private ExecutionContext _executionContext;
      private object _state;

      // ContextCallback has no slot for the timedOut flag, so one cached delegate exists per flag value.
      private static readonly ContextCallback _ccbt = new ContextCallback(WaitOrTimerCallback_Context_t);
      private static readonly ContextCallback _ccbf = new ContextCallback(WaitOrTimerCallback_Context_f);

      /// <summary>Stores the callback and state, capturing the current execution context on request.</summary>
      /// <param name="waitOrTimerCallback">Delegate to invoke later.</param>
      /// <param name="state">First argument handed to the delegate.</param>
      /// <param name="flowExecutionContext">
      /// When <c>true</c>, <see cref="ExecutionContext.Capture"/> is called and its result stored;
      /// otherwise no context is stored.
      /// </param>
      internal _ThreadPoolWaitOrTimerCallback(WaitOrTimerCallback waitOrTimerCallback, object state, bool flowExecutionContext)
      {
          _waitOrTimerCallback = waitOrTimerCallback;
          _state = state;

          // capture the execution context only when the caller asked for it to flow
          _executionContext = flowExecutionContext ? ExecutionContext.Capture() : null;
      }

      // ContextCallback target used when the wait timed out.
      private static void WaitOrTimerCallback_Context_t(object state)
      {
          WaitOrTimerCallback_Context(state, timedOut: true);
      }

      // ContextCallback target used when the wait did not time out.
      private static void WaitOrTimerCallback_Context_f(object state)
      {
          WaitOrTimerCallback_Context(state, timedOut: false);
      }

      // Unwraps the helper passed as the ContextCallback state and invokes the user delegate.
      private static void WaitOrTimerCallback_Context(object state, bool timedOut)
      {
          var self = (_ThreadPoolWaitOrTimerCallback)state;
          self._waitOrTimerCallback(self._state, timedOut);
      }

      /// <summary>
      /// Invokes the user callback held by <paramref name="helper"/>, under the stored
      /// execution context when one was captured and directly otherwise.
      /// </summary>
      /// <param name="helper">Callback bundle; asserted non-null in debug builds.</param>
      /// <param name="timedOut">Value forwarded as the callback's second argument.</param>
      internal static void PerformWaitOrTimerCallback(_ThreadPoolWaitOrTimerCallback helper, bool timedOut)
      {
          Debug.Assert(helper != null, "Null state passed to PerformWaitOrTimerCallback!");

          ExecutionContext capturedContext = helper._executionContext;
          if (capturedContext != null)
          {
              ContextCallback trampoline = timedOut ? _ccbt : _ccbf;
              ExecutionContext.Run(capturedContext, trampoline, helper);
              return;
          }

          // No stored context (unsafe registration, or Capture returned null): call directly.
          helper._waitOrTimerCallback(helper._state, timedOut);
      }
  }
  796. public static partial class ThreadPool
  797. {
  /// <summary>
  /// Validates an unsigned millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>true</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">
  /// Timeout in milliseconds; must be at most <see cref="int.MaxValue"/> or exactly <see cref="uint.MaxValue"/>.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsTimeOutInterval"/> exceeds <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.
  /// </exception>
  [CLSCompliant(false)]
  public static RegisteredWaitHandle RegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      uint millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      // uint.MaxValue is exempt from the range check; every other value has to fit in an int.
      bool exemptFromRangeCheck = millisecondsTimeOutInterval == uint.MaxValue;
      if (!exemptFromRangeCheck && millisecondsTimeOutInterval > (uint)int.MaxValue)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, true);
  }
  /// <summary>
  /// Validates an unsigned millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>false</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">
  /// Timeout in milliseconds; must be at most <see cref="int.MaxValue"/> or exactly <see cref="uint.MaxValue"/>.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsTimeOutInterval"/> exceeds <see cref="int.MaxValue"/> and is not <see cref="uint.MaxValue"/>.
  /// </exception>
  [CLSCompliant(false)]
  public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      uint millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      if (millisecondsTimeOutInterval > (uint)int.MaxValue && millisecondsTimeOutInterval != uint.MaxValue)
      {
          // The parameter is unsigned, so the failure is always "too large"; use the same
          // resource string as the RegisterWaitForSingleObject(uint) overload for this condition
          // rather than the "non-negative or -1" text, which cannot apply to a uint.
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      return RegisterWaitForSingleObject(waitObject, callBack, state, millisecondsTimeOutInterval, executeOnlyOnce, false);
  }
  /// <summary>
  /// Validates a signed 32-bit millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>true</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be -1 or greater.</param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsTimeOutInterval"/> is less than -1.</exception>
  public static RegisteredWaitHandle RegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      int millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      // -1 is the only negative value that passes validation.
      bool inRange = millisecondsTimeOutInterval >= -1;
      if (!inRange)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, true);
  }
  /// <summary>
  /// Validates a signed 32-bit millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>false</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">Timeout in milliseconds; must be -1 or greater.</param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException"><paramref name="millisecondsTimeOutInterval"/> is less than -1.</exception>
  public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      int millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      // -1 is the only negative value that passes validation.
      bool inRange = millisecondsTimeOutInterval >= -1;
      if (!inRange)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, false);
  }
  /// <summary>
  /// Validates a 64-bit millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>true</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">
  /// Timeout in milliseconds; must lie between -1 and <see cref="int.MaxValue"/> inclusive.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsTimeOutInterval"/> is less than -1 or greater than <see cref="int.MaxValue"/>.
  /// </exception>
  public static RegisteredWaitHandle RegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      long millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      // The lower bound is checked first; each bound has its own message.
      bool belowMinimum = millisecondsTimeOutInterval < -1;
      if (belowMinimum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      bool aboveMaximum = millisecondsTimeOutInterval > (uint)int.MaxValue;
      if (aboveMaximum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, true);
  }
  /// <summary>
  /// Validates a 64-bit millisecond timeout and forwards the registration to the six-argument
  /// <c>RegisterWaitForSingleObject</c> overload, passing <c>false</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="millisecondsTimeOutInterval">
  /// Timeout in milliseconds; must lie between -1 and <see cref="int.MaxValue"/> inclusive.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsTimeOutInterval"/> is less than -1 or greater than <see cref="int.MaxValue"/>.
  /// </exception>
  public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      long millisecondsTimeOutInterval,
      bool executeOnlyOnce // NOTE: we do not allow other options that allow the callback to be queued as an APC
      )
  {
      // The lower bound is checked first; each bound has its own message.
      bool belowMinimum = millisecondsTimeOutInterval < -1;
      if (belowMinimum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      bool aboveMaximum = millisecondsTimeOutInterval > (uint)int.MaxValue;
      if (aboveMaximum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsTimeOutInterval), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      uint unsignedTimeout = (uint)millisecondsTimeOutInterval;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, false);
  }
  /// <summary>
  /// Converts a <see cref="TimeSpan"/> timeout to whole milliseconds, validates it, and forwards the
  /// registration to the six-argument <c>RegisterWaitForSingleObject</c> overload, passing
  /// <c>true</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="timeout">
  /// Timeout; after truncation to whole milliseconds it must lie between -1 and <see cref="int.MaxValue"/> inclusive.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// The truncated millisecond value is less than -1 or greater than <see cref="int.MaxValue"/>.
  /// </exception>
  public static RegisteredWaitHandle RegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      TimeSpan timeout,
      bool executeOnlyOnce
      )
  {
      // The cast drops any fractional millisecond before the range checks run.
      long wholeMilliseconds = (long)timeout.TotalMilliseconds;

      if (wholeMilliseconds < -1)
      {
          throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      if (wholeMilliseconds > (long)int.MaxValue)
      {
          throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      uint unsignedTimeout = (uint)wholeMilliseconds;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, true);
  }
  /// <summary>
  /// Converts a <see cref="TimeSpan"/> timeout to whole milliseconds, validates it, and forwards the
  /// registration to the six-argument <c>RegisterWaitForSingleObject</c> overload, passing
  /// <c>false</c> as its final argument.
  /// </summary>
  /// <param name="waitObject">Forwarded unchanged.</param>
  /// <param name="callBack">Forwarded unchanged.</param>
  /// <param name="state">Forwarded unchanged.</param>
  /// <param name="timeout">
  /// Timeout; after truncation to whole milliseconds it must lie between -1 and <see cref="int.MaxValue"/> inclusive.
  /// </param>
  /// <param name="executeOnlyOnce">Forwarded unchanged.</param>
  /// <returns>The handle produced by the six-argument overload.</returns>
  /// <exception cref="ArgumentOutOfRangeException">
  /// The truncated millisecond value is less than -1 or greater than <see cref="int.MaxValue"/>.
  /// </exception>
  public static RegisteredWaitHandle UnsafeRegisterWaitForSingleObject(
      WaitHandle waitObject,
      WaitOrTimerCallback callBack,
      object state,
      TimeSpan timeout,
      bool executeOnlyOnce
      )
  {
      // The cast drops any fractional millisecond before the range checks run.
      long wholeMilliseconds = (long)timeout.TotalMilliseconds;

      if (wholeMilliseconds < -1)
      {
          throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_NeedNonNegOrNegative1);
      }

      if (wholeMilliseconds > (long)int.MaxValue)
      {
          throw new ArgumentOutOfRangeException(nameof(timeout), SR.ArgumentOutOfRange_LessEqualToIntegerMaxVal);
      }

      uint unsignedTimeout = (uint)wholeMilliseconds;
      return RegisterWaitForSingleObject(waitObject, callBack, state, unsignedTimeout, executeOnlyOnce, false);
  }
  /// <summary>
  /// Queues <paramref name="callBack"/> by delegating to the two-argument overload with a <c>null</c> state.
  /// </summary>
  /// <param name="callBack">Delegate to queue.</param>
  /// <returns>Whatever the two-argument overload returns.</returns>
  public static bool QueueUserWorkItem(WaitCallback callBack)
  {
      return QueueUserWorkItem(callBack, null);
  }
  /// <summary>
  /// Wraps <paramref name="callBack"/> and <paramref name="state"/> in a work item and enqueues it
  /// on the global queue (<c>forceGlobal: true</c>).
  /// </summary>
  /// <param name="callBack">Delegate to queue; a null value is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
  /// <param name="state">Argument later passed to <paramref name="callBack"/>.</param>
  /// <returns>Always <c>true</c> when the method returns normally.</returns>
  public static bool QueueUserWorkItem(WaitCallback callBack, object state)
  {
      if (callBack == null)
      {
          ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
      }

      EnsureInitialized();

      ExecutionContext capturedContext = ExecutionContext.Capture();

      // Only a captured, non-default context needs the context-carrying wrapper.
      object workItem;
      if (capturedContext != null && !capturedContext.IsDefault)
      {
          workItem = new QueueUserWorkItemCallback(callBack, state, capturedContext);
      }
      else
      {
          workItem = new QueueUserWorkItemCallbackDefaultContext(callBack, state);
      }

      ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: true);
      return true;
  }
  /// <summary>
  /// Wraps <paramref name="callBack"/> and a strongly typed <paramref name="state"/> in a work item and
  /// enqueues it, on the global queue unless <paramref name="preferLocal"/> is <c>true</c>.
  /// </summary>
  /// <typeparam name="TState">Type of the state value.</typeparam>
  /// <param name="callBack">Delegate to queue; a null value is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
  /// <param name="state">Argument later passed to <paramref name="callBack"/>.</param>
  /// <param name="preferLocal">Negated and passed to <c>Enqueue</c> as <c>forceGlobal</c>.</param>
  /// <returns>Always <c>true</c> when the method returns normally.</returns>
  public static bool QueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
  {
      if (callBack == null)
      {
          ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
      }

      EnsureInitialized();

      ExecutionContext capturedContext = ExecutionContext.Capture();

      // Only a captured, non-default context needs the context-carrying wrapper.
      object workItem;
      if (capturedContext != null && !capturedContext.IsDefault)
      {
          workItem = new QueueUserWorkItemCallback<TState>(callBack, state, capturedContext);
      }
      else
      {
          workItem = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack, state);
      }

      ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: !preferLocal);
      return true;
  }
  /// <summary>
  /// Enqueues <paramref name="callBack"/> with <paramref name="state"/> without calling
  /// <see cref="ExecutionContext.Capture"/>.
  /// </summary>
  /// <remarks>
  /// When <paramref name="callBack"/> is the very delegate instance stored in
  /// <c>ThreadPoolGlobals.s_invokeAsyncStateMachineBox</c>, no wrapper is allocated: the state itself,
  /// which must then be an <c>IAsyncStateMachineBox</c>, is queued via <c>UnsafeQueueUserWorkItemInternal</c>.
  /// </remarks>
  /// <typeparam name="TState">Type of the state value.</typeparam>
  /// <param name="callBack">Delegate to queue; a null value is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
  /// <param name="state">Argument for <paramref name="callBack"/>, or the work item itself on the fast path.</param>
  /// <param name="preferLocal">Negated and used as <c>forceGlobal</c> when enqueuing.</param>
  /// <returns>Always <c>true</c> when the method returns normally.</returns>
  public static bool UnsafeQueueUserWorkItem<TState>(Action<TState> callBack, TState state, bool preferLocal)
  {
      if (callBack == null)
      {
          ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
      }

      bool isStateMachineBoxInvoker = ReferenceEquals(callBack, ThreadPoolGlobals.s_invokeAsyncStateMachineBox);
      if (!isStateMachineBoxInvoker)
      {
          // Ordinary case: wrap delegate + state in a default-context work item.
          EnsureInitialized();
          var wrapper = new QueueUserWorkItemCallbackDefaultContext<TState>(callBack, state);
          ThreadPoolGlobals.workQueue.Enqueue(wrapper, forceGlobal: !preferLocal);
          return true;
      }

      // The callback is the runtime-provided invocation of an IAsyncStateMachineBox, so the
      // Task state can be queued directly instead of being wrapped in a QueueUserWorkItemCallback.
      //
      // This occurs when user code queues its provided continuation to the ThreadPool;
      // internally UnsafeQueueUserWorkItemInternal is called directly for Tasks.
      if (!(state is IAsyncStateMachineBox))
      {
          // The provided state must be the internal IAsyncStateMachineBox (Task) type
          ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.state);
      }

      UnsafeQueueUserWorkItemInternal((object)state, preferLocal);
      return true;
  }
  /// <summary>
  /// Wraps <paramref name="callBack"/> and <paramref name="state"/> in a default-context work item,
  /// without calling <see cref="ExecutionContext.Capture"/>, and enqueues it on the global queue.
  /// </summary>
  /// <param name="callBack">Delegate to queue; a null value is reported through <c>ThrowHelper.ThrowArgumentNullException</c>.</param>
  /// <param name="state">Argument later passed to <paramref name="callBack"/>.</param>
  /// <returns>Always <c>true</c> when the method returns normally.</returns>
  public static bool UnsafeQueueUserWorkItem(WaitCallback callBack, object state)
  {
      if (callBack == null)
      {
          ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
      }

      EnsureInitialized();

      object workItem = new QueueUserWorkItemCallbackDefaultContext(callBack, state);
      ThreadPoolGlobals.workQueue.Enqueue(workItem, forceGlobal: true);

      return true;
  }
  /// <summary>
  /// Queues a caller-supplied <see cref="IThreadPoolWorkItem"/> via <c>UnsafeQueueUserWorkItemInternal</c>.
  /// </summary>
  /// <param name="callBack">
  /// Work item to queue. A null value is reported through <c>ThrowHelper.ThrowArgumentNullException</c>;
  /// a <see cref="Task"/> instance is rejected through <c>ThrowHelper.ThrowArgumentOutOfRangeException</c>.
  /// </param>
  /// <param name="preferLocal">Forwarded unchanged.</param>
  /// <returns>Always <c>true</c> when the method returns normally.</returns>
  public static bool UnsafeQueueUserWorkItem(IThreadPoolWorkItem callBack, bool preferLocal)
  {
      if (callBack == null)
      {
          ThrowHelper.ThrowArgumentNullException(ExceptionArgument.callBack);
      }
      else if (callBack is Task)
      {
          // Prevent code from queueing a derived Task that also implements the interface,
          // as that would bypass Task.Start and its safety checks.
          ThrowHelper.ThrowArgumentOutOfRangeException(ExceptionArgument.callBack);
      }

      UnsafeQueueUserWorkItemInternal(callBack, preferLocal);
      return true;
  }
  /// <summary>
  /// Enqueues <paramref name="callBack"/> as-is, without wrapping it.
  /// </summary>
  /// <param name="callBack">
  /// Object to enqueue; debug builds assert that it is exactly one of
  /// <see cref="IThreadPoolWorkItem"/> or <see cref="Task"/>.
  /// </param>
  /// <param name="preferLocal">Negated and passed to <c>Enqueue</c> as <c>forceGlobal</c>.</param>
  internal static void UnsafeQueueUserWorkItemInternal(object callBack, bool preferLocal)
  {
      // Exactly one of the two type tests may hold.
      Debug.Assert((callBack is IThreadPoolWorkItem) != (callBack is Task));

      EnsureInitialized();

      bool forceGlobal = !preferLocal;
      ThreadPoolGlobals.workQueue.Enqueue(callBack, forceGlobal: forceGlobal);
  }
  /// <summary>
  /// Tries to take <paramref name="workItem"/> out of the current thread's queue.
  /// </summary>
  /// <param name="workItem">Work item to look for; asserted non-null in debug builds.</param>
  /// <returns>
  /// <c>false</c> when the pool is not initialized; otherwise the result of
  /// <c>ThreadPoolGlobals.workQueue.LocalFindAndPop</c>.
  /// </returns>
  internal static bool TryPopCustomWorkItem(object workItem)
  {
      Debug.Assert(null != workItem);

      // If the pool was never initialized there is no way this work item was ever queued.
      if (!ThreadPoolGlobals.threadPoolInitialized)
      {
          return false;
      }

      return ThreadPoolGlobals.workQueue.LocalFindAndPop(workItem);
  }
  /// <summary>
  /// Lazily enumerates every queued work item: the global queue first, then the non-null slots of
  /// each work-stealing queue. Called by TaskScheduler in its debugger hooks.
  /// </summary>
  /// <returns>A deferred sequence; nothing is read until it is enumerated.</returns>
  internal static IEnumerable<object> GetQueuedWorkItems()
  {
      // Global queue.
      foreach (object globalItem in ThreadPoolGlobals.workQueue.workItems)
      {
          yield return globalItem;
      }

      // Every per-thread queue that currently has a backing array.
      foreach (ThreadPoolWorkQueue.WorkStealingQueue localQueue in ThreadPoolWorkQueue.WorkStealingQueueList.Queues)
      {
          if (localQueue == null || localQueue.m_array == null)
          {
              continue;
          }

          object[] slots = localQueue.m_array;
          foreach (object slot in slots)
          {
              // Null slots are skipped rather than reported.
              if (slot != null)
              {
                  yield return slot;
              }
          }
      }
  }
  /// <summary>
  /// Lazily enumerates the non-null slots of the calling thread's work-stealing queue.
  /// </summary>
  /// <returns>
  /// A deferred sequence of the queue's non-null slots; empty when the thread has no thread-locals,
  /// no work-stealing queue, or a queue without a backing array.
  /// </returns>
  internal static IEnumerable<object> GetLocallyQueuedWorkItems()
  {
      // NOTE(review): threadLocals is presumably per-thread state that stays null on threads which
      // never set it up (confirm against its declaration); treat that as "nothing queued locally"
      // instead of dereferencing it unconditionally.
      ThreadPoolWorkQueue.WorkStealingQueue wsq = ThreadPoolWorkQueueThreadLocals.threadLocals?.workStealingQueue;

      // Read the backing array once so the null check and the loop see the same array.
      object[] items = wsq?.m_array;
      if (items != null)
      {
          for (int i = 0; i < items.Length; i++)
          {
              object item = items[i];
              if (item != null)
              {
                  yield return item;
              }
          }
      }
  }
  /// <summary>Exposes the global queue's <c>workItems</c> collection as a sequence.</summary>
  /// <returns><c>ThreadPoolGlobals.workQueue.workItems</c>, not a copy.</returns>
  internal static IEnumerable<object> GetGloballyQueuedWorkItems()
  {
      return ThreadPoolGlobals.workQueue.workItems;
  }
  /// <summary>
  /// Copies a sequence of work items into a new array.
  /// </summary>
  /// <param name="workitems">Sequence to copy; it is enumerated exactly once.</param>
  /// <returns>An array holding the items in enumeration order; empty when the sequence is empty.</returns>
  private static object[] ToObjectArray(IEnumerable<object> workitems)
  {
      // Take a single-pass snapshot. Counting in one pass and copying in a second lets the two
      // passes disagree while the queues are in motion, which yields an array that is either
      // truncated or padded with trailing nulls.
      var snapshot = new List<object>();
      foreach (object item in workitems)
      {
          snapshot.Add(item);
      }

      return snapshot.ToArray();
  }
  // This is the method the debugger will actually call, if it ends up calling
  // into ThreadPool directly. Tests can use this to simulate a debugger, as well.
  /// <summary>Returns the sequence from <c>GetQueuedWorkItems</c> as an array built by <c>ToObjectArray</c>.</summary>
  internal static object[] GetQueuedWorkItemsForDebugger()
  {
      IEnumerable<object> queued = GetQueuedWorkItems();
      return ToObjectArray(queued);
  }
  /// <summary>Returns the sequence from <c>GetGloballyQueuedWorkItems</c> as an array built by <c>ToObjectArray</c>.</summary>
  internal static object[] GetGloballyQueuedWorkItemsForDebugger()
  {
      IEnumerable<object> queued = GetGloballyQueuedWorkItems();
      return ToObjectArray(queued);
  }
  /// <summary>Returns the sequence from <c>GetLocallyQueuedWorkItems</c> as an array built by <c>ToObjectArray</c>.</summary>
  internal static object[] GetLocallyQueuedWorkItemsForDebugger()
  {
      IEnumerable<object> queued = GetLocallyQueuedWorkItems();
      return ToObjectArray(queued);
  }
  1068. }
  1069. }