IOThreadScheduler.cs 30 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.Runtime
  5. {
  6. using System.Threading;
  7. using System.Security;
  8. // IOThreadScheduler takes no locks due to contention problems on multiproc.
  9. [Fx.Tag.SynchronizationPrimitive(Fx.Tag.BlocksUsing.NonBlocking)]
  10. class IOThreadScheduler
  11. {
  12. // Do not increase the maximum capacity above 32k! It must be a power of two, 0x8000 or less, in order to
  13. // work with the strategy for 'headTail'.
  14. const int MaximumCapacity = 0x8000;
  15. [Fx.Tag.SecurityNote(Miscellaneous = "can be called outside user context")]
  // Constants and helpers for values that pack two 16-bit fields into one int: a "hi" word (upper
  // 16 bits) and a "lo" word (lower 16 bits). In each word the low 15 bits are a count and the top bit
  // is a flag. The same layout is used for the scheduler's head/tail indices and for Slot.gate.
  static class Bits
  {
      // Number of bits to shift to move between the lo word and the hi word (16).
      public const int HiShift = sizeof(int) * 8 / 2;

      // The value 1 expressed in the hi word (0x00010000).
      public const int HiOne = 1 << HiShift;

      // Top bit of the lo word (0x00008000).
      public const int LoHiBit = 1 << (HiShift - 1);

      // Top bit of the hi word (0x80000000).
      public const int HiHiBit = LoHiBit << HiShift;

      // Low 15 bits of the lo word (0x00007fff).
      public const int LoCountMask = LoHiBit - 1;

      // Low 15 bits of the hi word (0x7fff0000).
      public const int HiCountMask = LoCountMask << HiShift;

      // The entire lo word (0x0000ffff).
      public const int LoMask = HiOne - 1;

      // The entire hi word (0xffff0000).
      public const int HiMask = ~LoMask;

      // Both flag bits (0x80008000).
      public const int HiBits = LoHiBit | HiHiBit;

      // Returns (hi - lo + 2) modulo 64k, minus one. For a head/tail value this is the number of queued
      // items, where an empty queue yields 0 and the idle state (lo two ahead of hi) yields -1.
      public static int Count(int slot)
      {
          int hiWord = slot >> HiShift;
          int wrappedDistance = (hiWord - slot + 2) & LoMask;
          return wrappedDistance - 1;
      }

      // Returns (hi - lo + 1) modulo 64k. Same as Count for non-idle values, but it never goes negative,
      // so it is used where there is no idle state.
      public static int CountNoIdle(int slot)
      {
          int hiWord = slot >> HiShift;
          return (hiWord - slot + 1) & LoMask;
      }

      // Returns 'slot' with its lo word incremented (wrapping inside the lo word) and its hi word
      // left untouched.
      public static int IncrementLo(int slot)
      {
          int nextLo = (slot + 1) & LoMask;
          int sameHi = slot & HiMask;
          return nextLo | sameHi;
      }

      // Returns true when the hi word of 'gate' equals its lo word, flag bit included.
      // This method is only valid if you already know that (gate & HiBits) != 0.
      public static bool IsComplete(int gate)
      {
          int hiWordInPlace = gate & HiMask;
          int loWordAsHi = gate << HiShift;
          return hiWordInPlace == loWordAsHi;
      }
  }
  45. static IOThreadScheduler current = new IOThreadScheduler(32, 32);
  46. readonly ScheduledOverlapped overlapped;
  47. [Fx.Tag.Queue(typeof(Slot), Scope = Fx.Tag.Strings.AppDomain)]
  48. [Fx.Tag.SecurityNote(Critical = "holds callbacks which get called outside of the app security context")]
  49. [SecurityCritical]
  50. readonly Slot[] slots;
  51. [Fx.Tag.Queue(typeof(Slot), Scope = Fx.Tag.Strings.AppDomain)]
  52. [Fx.Tag.SecurityNote(Critical = "holds callbacks which get called outside of the app security context")]
  53. [SecurityCritical]
  54. readonly Slot[] slotsLowPri;
  // This field holds both the head (HiWord) and tail (LoWord) indices into the slot array. This limits each
  // value to 64k. In order to be able to distinguish wrapping the slot array (allowed) from wrapping the
  // indices relative to each other (not allowed), the size of the slot array is limited by an additional bit
  // to 32k.
  59. //
  60. // The HiWord (head) holds the index of the last slot to have been scheduled into. The LoWord (tail) holds
  61. // the index of the next slot to be dispatched from. When the queue is empty, the LoWord will be exactly
  62. // one slot ahead of the HiWord. When the two are equal, the queue holds one item.
  63. //
  64. // When the tail is *two* slots ahead of the head (equivalent to a count of -1), that means the IOTS is
  65. // idle. Hence, we start out headTail with a -2 (equivalent) in the head and zero in the tail.
  66. [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
  67. int headTail = -2 << Bits.HiShift;
  68. // This field is the same except that it governs the low-priority work items. It doesn't have a concept
  69. // of idle (-2) so starts empty (-1).
  70. [Fx.Tag.SynchronizationObject(Blocking = false, Kind = Fx.Tag.SynchronizationKind.InterlockedNoSpin)]
  71. int headTailLowPri = -1 << Bits.HiShift;
  72. [Fx.Tag.SecurityNote(Critical = "creates a ScheduledOverlapped, touches slots, can be called outside of user context",
  73. Safe = "The scheduled overlapped is only used internally, and flows security.")]
  74. [SecuritySafeCritical]
  // Creates a scheduler with a normal-priority ring of 'capacity' slots and a low-priority ring of
  // 'capacityLowPri' slots, plus the single ScheduledOverlapped used to post work to an I/O thread.
  // Both capacities are asserted to be positive powers of two no larger than MaximumCapacity.
  IOThreadScheduler(int capacity, int capacityLowPri)
  {
      Fx.Assert(capacity > 0, "Capacity must be positive.");
      Fx.Assert(capacity <= MaximumCapacity, "Capacity cannot exceed 32k.");
      Fx.Assert(capacityLowPri > 0, "Low-priority capacity must be positive.");
      Fx.Assert(capacityLowPri <= MaximumCapacity, "Low-priority capacity cannot exceed 32k.");

      // A power of two shares no bits with (itself - 1), which is what the slot masks are.
      this.slots = new Slot[capacity];
      Fx.Assert((capacity & this.SlotMask) == 0, "Capacity must be a power of two.");

      this.slotsLowPri = new Slot[capacityLowPri];
      Fx.Assert((capacityLowPri & this.SlotMaskLowPri) == 0, "Low-priority capacity must be a power of two.");

      this.overlapped = new ScheduledOverlapped();
  }
  87. [Fx.Tag.SecurityNote(Critical = "Calls into critical class CriticalHelper, doesn't flow context")]
  88. [SecurityCritical]
  // Queues 'callback' (with 'state') on the current scheduler's normal-priority queue, retrying until
  // an attempt reports success. Throws the ArgumentNull exception for "callback" when callback is null.
  public static void ScheduleCallbackNoFlow(Action<object> callback, object state)
  {
      if (callback == null)
      {
          throw Fx.Exception.ArgumentNull("callback");
      }

      bool accepted = false;
      do
      {
          try
          {
          }
          finally
          {
              // Runs inside a finally block so it executes uninterrupted, which keeps the scheduler
              // consistent. 'current' is re-read on every attempt because a failed attempt may have
              // swapped in a larger scheduler.
              accepted = IOThreadScheduler.current.ScheduleCallbackHelper(callback, state);
          }
      }
      while (!accepted);
  }
  105. [Fx.Tag.SecurityNote(Critical = "Calls into critical class CriticalHelper, doesn't flow context")]
  106. [SecurityCritical]
  // Queues 'callback' (with 'state') on the current scheduler's low-priority queue, retrying until
  // an attempt reports success. Throws the ArgumentNull exception for "callback" when callback is null.
  public static void ScheduleCallbackLowPriNoFlow(Action<object> callback, object state)
  {
      if (callback == null)
      {
          throw Fx.Exception.ArgumentNull("callback");
      }

      bool accepted = false;
      do
      {
          try
          {
          }
          finally
          {
              // Runs inside a finally block so it executes uninterrupted, which keeps the scheduler
              // consistent. 'current' is re-read on every attempt because a failed attempt may have
              // swapped in a larger scheduler.
              accepted = IOThreadScheduler.current.ScheduleCallbackLowPriHelper(callback, state);
          }
      }
      while (!accepted);
  }
  123. // Returns true if successfully scheduled, false otherwise.
  124. [Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, can be called outside user context.")]
  125. [SecurityCritical]
  // Makes one attempt to place a work item into the normal-priority ring.
  // Returns true if the item was stored in a slot; false if the caller has to try again.
  bool ScheduleCallbackHelper(Action<object> callback, object state)
  {
      // Reserve the next head position. Any carry out of the top bit is simply discarded.
      int reserved = Interlocked.Add(ref this.headTail, Bits.HiOne);

      // Landing on 'empty' means the scheduler was 'idle' before this call. Note that and advance the
      // head once more. No loop is required: the scheduler cannot return to idle before Post() is called.
      bool wasIdle = Bits.Count(reserved) == 0;
      if (wasIdle)
      {
          reserved = Interlocked.Add(ref this.headTail, Bits.HiOne);
          Fx.Assert(Bits.Count(reserved) != 0, "IOTS went idle when it shouldn't have.");
      }

      // A count of -1 at this point means the head wrapped all the way *around* to idle, so headTail
      // can no longer tell how many items are queued. That is treated as fatal.
      if (Bits.Count(reserved) == -1)
      {
          throw Fx.AssertAndThrowFatal("Head/Tail overflow!");
      }

      int index = (reserved >> Bits.HiShift) & SlotMask;
      bool wrapped;
      bool queued = this.slots[index].TryEnqueueWorkItem(callback, state, out wrapped);

      if (wrapped)
      {
          // The ring is full. Try to publish a scheduler with twice the normal-priority capacity
          // (capped at MaximumCapacity); the exchange only succeeds if this instance is still current.
          int grownCapacity = Math.Min(this.slots.Length * 2, MaximumCapacity);
          IOThreadScheduler next = new IOThreadScheduler(grownCapacity, this.slotsLowPri.Length);
          Interlocked.CompareExchange<IOThreadScheduler>(ref IOThreadScheduler.current, next, this);
      }

      if (wasIdle)
      {
          // This call moved the scheduler out of idle, so it is the one that must post the overlapped.
          this.overlapped.Post(this);
      }

      return queued;
  }
  162. // Returns true if successfully scheduled, false otherwise.
  163. [Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, can be called outside user context.")]
  164. [SecurityCritical]
  // Makes one attempt to place a work item into the low-priority ring.
  // Returns true if the item was stored in a slot; false if the caller has to try again.
  bool ScheduleCallbackLowPriHelper(Action<object> callback, object state)
  {
      // Reserve the next low-priority head position. Any carry out of the top bit is simply discarded.
      int reserved = Interlocked.Add(ref this.headTailLowPri, Bits.HiOne);

      // When this is the first low-priority item, make sure the scheduler is not left idle.
      bool wasIdle = false;
      if (Bits.CountNoIdle(reserved) == 1)
      {
          // Interlocked calls are full barriers, so this read observes headTail as of the Add above or
          // later. The invariant is that the scheduler is non-idle at some point after that Add.
          int observedHeadTail = this.headTail;
          if (Bits.Count(observedHeadTail) == -1)
          {
              // The CompareExchange result is deliberately kept in its own local: this works around a
              // codegen bug in the 32-bit JIT (TFS 749182).
              int interlockedResult = Interlocked.CompareExchange(
                  ref this.headTail, observedHeadTail + Bits.HiOne, observedHeadTail);
              if (interlockedResult == observedHeadTail)
              {
                  wasIdle = true;
              }
          }
      }

      // A count of zero at this point means the head wrapped all the way *around* to empty, so
      // headTailLowPri can no longer tell how many items are queued. That is treated as fatal.
      if (Bits.CountNoIdle(reserved) == 0)
      {
          throw Fx.AssertAndThrowFatal("Low-priority Head/Tail overflow!");
      }

      int index = (reserved >> Bits.HiShift) & SlotMaskLowPri;
      bool wrapped;
      bool queued = this.slotsLowPri[index].TryEnqueueWorkItem(callback, state, out wrapped);

      if (wrapped)
      {
          // The low-priority ring is full. Try to publish a scheduler with twice the low-priority
          // capacity (capped at MaximumCapacity); the exchange only succeeds if this instance is
          // still current.
          int grownCapacity = Math.Min(this.slotsLowPri.Length * 2, MaximumCapacity);
          IOThreadScheduler next = new IOThreadScheduler(this.slots.Length, grownCapacity);
          Interlocked.CompareExchange<IOThreadScheduler>(ref IOThreadScheduler.current, next, this);
      }

      if (wasIdle)
      {
          // This call moved the scheduler out of idle, so it is the one that must post the overlapped.
          this.overlapped.Post(this);
      }

      return queued;
  }
  212. [Fx.Tag.SecurityNote(Critical = "calls into ScheduledOverlapped to post it, touches slots, may be called outside of user context")]
  213. [SecurityCritical]
  // Called when the posted overlapped completes. Claims one work item (normal priority first, low
  // priority only when the normal queue is empty) and re-posts the overlapped, or, when there is
  // nothing to claim, moves the scheduler to idle and returns a null callback and state.
  void CompletionCallback(out Action<object> callback, out object state)
  {
      int headTailSeen = this.headTail;
      while (true)
      {
          Fx.Assert(Bits.Count(headTailSeen) != -1, "CompletionCallback called on idle IOTS!");
          bool wasEmpty = Bits.Count(headTailSeen) == 0;

          if (wasEmpty)
          {
              // The next step would set the scheduler to idle, so look at the low-priority queue first.
              // This check alone does not guarantee every low-priority item is serviced (no Interlocked
              // has happened yet); the re-check after going idle covers that.
              int lowPriSeen = this.headTailLowPri;
              while (Bits.CountNoIdle(lowPriSeen) != 0)
              {
                  int lowPriActual = Interlocked.CompareExchange(
                      ref this.headTailLowPri, Bits.IncrementLo(lowPriSeen), lowPriSeen);
                  if (lowPriActual == lowPriSeen)
                  {
                      this.overlapped.Post(this);
                      this.slotsLowPri[lowPriSeen & SlotMaskLowPri].DequeueWorkItem(out callback, out state);
                      return;
                  }

                  // Another thread changed the value; retry against what is there now.
                  lowPriSeen = lowPriActual;
              }
          }

          // Advance the tail: this claims an item, or (when empty) moves the scheduler to idle.
          int headTailActual = Interlocked.CompareExchange(
              ref this.headTail, Bits.IncrementLo(headTailSeen), headTailSeen);
          if (headTailActual != headTailSeen)
          {
              headTailSeen = headTailActual;
              continue;
          }

          if (!wasEmpty)
          {
              this.overlapped.Post(this);
              this.slots[headTailSeen & SlotMask].DequeueWorkItem(out callback, out state);
              return;
          }

          // The scheduler was just set to idle. Check whether a low-priority item arrived meanwhile.
          // Interlocked calls are barriers, so this read gives headTailLowPri as of the exchange above
          // or later. The invariant: either the low-priority queue was empty at some point after going
          // idle (so the next enqueue notices and issues a Post), or the scheduler was non-idle at some
          // point after going idle (so the next attempt to go idle re-checks the low-priority queue).
          int lowPriAfterIdle = this.headTailLowPri;
          if (Bits.CountNoIdle(lowPriAfterIdle) == 0)
          {
              break;
          }

          // Try to leave idle again, unless another thread already did. On success start over; a Post
          // is still owed and will be issued when an item is claimed.
          int idleValue = Bits.IncrementLo(headTailSeen);
          if (Interlocked.CompareExchange(ref this.headTail, idleValue + Bits.HiOne, idleValue) != idleValue)
          {
              // There is a low-priority item, but the scheduler was no longer idle. Let whoever woke it
              // handle the item: from this method's point of view the scheduler was set to idle, so it
              // should not take on any work.
              break;
          }

          headTailSeen = idleValue + Bits.HiOne;
      }

      callback = null;
      state = null;
  }
  277. [Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
  278. [SecurityCritical]
  // Tries to claim one more work item without posting another overlapped: normal priority first, then
  // low priority. Returns true with the claimed callback and state, or false (with both set to null)
  // when neither queue has an item. Unlike CompletionCallback, this never moves the scheduler to idle.
  bool TryCoalesce(out Action<object> callback, out object state)
  {
      int headTailSeen = this.headTail;
      while (true)
      {
          if (Bits.Count(headTailSeen) > 0)
          {
              int headTailActual = Interlocked.CompareExchange(
                  ref this.headTail, Bits.IncrementLo(headTailSeen), headTailSeen);
              if (headTailActual == headTailSeen)
              {
                  this.slots[headTailSeen & SlotMask].DequeueWorkItem(out callback, out state);
                  return true;
              }

              // Lost the exchange; retry with the value that is there now.
              headTailSeen = headTailActual;
              continue;
          }

          int lowPriSeen = this.headTailLowPri;
          if (Bits.CountNoIdle(lowPriSeen) <= 0)
          {
              break;
          }

          int lowPriActual = Interlocked.CompareExchange(
              ref this.headTailLowPri, Bits.IncrementLo(lowPriSeen), lowPriSeen);
          if (lowPriActual == lowPriSeen)
          {
              this.slotsLowPri[lowPriSeen & SlotMaskLowPri].DequeueWorkItem(out callback, out state);
              return true;
          }

          // Lost the low-priority exchange; start over from a fresh read of the normal-priority state.
          headTailSeen = this.headTail;
      }

      callback = null;
      state = null;
      return false;
  }
  // Mask that maps a head or tail value onto an index in 'slots' (array length minus one).
  int SlotMask
  {
      [Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
      [SecurityCritical]
      get
      {
          int ringLength = this.slots.Length;
          return ringLength - 1;
      }
  }
  // Mask that maps a head or tail value onto an index in 'slotsLowPri' (array length minus one).
  int SlotMaskLowPri
  {
      [Fx.Tag.SecurityNote(Critical = "touches slots, may be called outside of user context")]
      [SecurityCritical]
      get
      {
          int ringLength = this.slotsLowPri.Length;
          return ringLength - 1;
      }
  }
  330. //
  // Finalizer: releases the overlapped (after verifying the queues in DEBUG builds) unless the process
  // or AppDomain is going away.
  ~IOThreadScheduler()
  {
      // If the AppDomain is shutting down, we may still have pending ops. The AppDomain shutdown will
      // clean everything up, so there is nothing to do here.
      if (Environment.HasShutdownStarted || AppDomain.CurrentDomain.IsFinalizingForUnload())
      {
          return;
      }

#if DEBUG
      DebugVerifyHeadTail();
#endif
      Cleanup();
  }
  343. [SecuritySafeCritical]
  // Cleans up the ScheduledOverlapped, if one was created.
  void Cleanup()
  {
      if (this.overlapped == null)
      {
          return;
      }

      this.overlapped.Cleanup();
  }
  351. #if DEBUG
  352. [SecuritySafeCritical]
  // DEBUG-only check used by the finalizer: asserts that the normal-priority queue is idle, that the
  // low-priority queue is empty, and that every slot in both rings is empty.
  private void DebugVerifyHeadTail()
  {
      if (this.slots != null)
      {
          // The headTail value could technically be zero if the constructor was aborted early. The
          // constructor wasn't aborted early if the slot array got created.
          Fx.Assert(Bits.Count(this.headTail) == -1, "IOTS finalized while not idle.");

          for (int index = 0; index < this.slots.Length; index++)
          {
              this.slots[index].DebugVerifyEmpty();
          }
      }

      if (this.slotsLowPri != null)
      {
          Fx.Assert(Bits.CountNoIdle(this.headTailLowPri) == 0, "IOTS finalized with low-priority items queued.");

          for (int index = 0; index < this.slotsLowPri.Length; index++)
          {
              this.slotsLowPri[index].DebugVerifyEmpty();
          }
      }
  }
  374. #endif
  375. // TryEnqueueWorkItem and DequeueWorkItem use the slot's 'gate' field for synchronization. Because the
  376. // slot array is circular and there are no locks, we must assume that multiple threads can be entering each
  377. // method simultaneously. If the first DequeueWorkItem occurs before the first TryEnqueueWorkItem, the
  378. // sequencing (and the enqueue) fails.
  379. //
  380. // The gate is a 32-bit int divided into four fields. The bottom 15 bits (0x00007fff) are the count of
  381. // threads that have entered TryEnqueueWorkItem. The first thread to enter is the one responsible for
  382. // filling the slot with work. The 16th bit (0x00008000) is a flag indicating that the slot has been
  383. // successfully filled. Only the first thread to enter TryEnqueueWorkItem can set this flag. The
  384. // high-word (0x7fff0000) is the count of threads entering DequeueWorkItem. The first thread to enter
  385. // is the one responsible for accepting (and eventually dispatching) the work in the slot. The
  386. // high-bit (0x80000000) is a flag indicating that the slot has been successfully emptied.
  387. //
  // When the low-word and high-word counters are equal, and both bit flags have been set, the gate is considered
  389. // 'complete' and can be reset back to zero. Any operation on the gate might bring it to this state.
  390. // It's the responsibility of the thread that brings the gate to a completed state to reset it to zero.
  391. // (It's possible that the gate will fall out of the completed state before it can be reset - that's ok,
  392. // the next time it becomes completed it can be reset.)
  393. //
  394. // It's unlikely either count will ever go higher than 2 or 3.
  395. //
  396. // The value of 'callback' has these properties:
  397. // - When the gate is zero, callback is null.
  398. // - When the low-word count is non-zero, but the 0x8000 bit is unset, callback is writable by the thread
  399. // that incremented the low word to 1. Its value is undefined for other threads. The thread that
  400. // sets callback is responsible for setting the 0x8000 bit when it's done.
  401. // - When the 0x8000 bit is set and the high-word count is zero, callback is valid. (It may be null.)
  402. // - When the 0x8000 bit is set, the high-word count is non-zero, and the high bit is unset, callback is
  403. // writable by the thread that incremented the high word to 1 *or* the thread that set the 0x8000 bit,
  404. // whichever happened last. That thread can read the value and set callback to null. Its value is
  405. // undefined for other threads. The thread that clears the callback is responsible for setting the
  406. // high bit.
  407. // - When the high bit is set, callback is null.
  408. // - It's illegal for the gate to be in a state that would satisfy more than one of these conditions.
  409. // - The state field follows the same rules as callback.
  // One entry of a scheduler ring. 'gate' arbitrates, without locks, between threads storing a work
  // item (counted in the low word) and threads collecting it (counted in the high word); 'callback'
  // and 'state' hold the work item itself.
  struct Slot
  {
      int gate;
      Action<object> callback;
      object state;

      // Tries to store a work item in this slot.
      //   wrapped - set to true when this thread was not the first enqueuer to arrive at the slot.
      // Returns true only if the item was stored and no dequeuer had arrived yet. On false the item is
      // NOT in the slot and the caller has to schedule it again.
      [Fx.Tag.SecurityNote(Miscellaneous = "called by critical code, can be called outside user context")]
      public bool TryEnqueueWorkItem(Action<object> callback, object state, out bool wrapped)
      {
          // Register our arrival. Any enqueuer count other than one means the slot was already taken.
          int observed = Interlocked.Increment(ref this.gate);
          wrapped = (observed & Bits.LoCountMask) != 1;
          if (wrapped)
          {
              bool filled = (observed & Bits.LoHiBit) != 0;
              if (filled && Bits.IsComplete(observed))
              {
                  // Our arrival brought the gate to the completed state, so we try to reset it.
                  Interlocked.CompareExchange(ref this.gate, 0, observed);
              }

              return false;
          }

          Fx.Assert(this.callback == null, "Slot already has a work item.");
          Fx.Assert((observed & Bits.HiBits) == 0, "Slot already marked.");

          this.state = state;
          this.callback = callback;

          // Set the 'filled' flag to publish the work item.
          observed = Interlocked.Add(ref this.gate, Bits.LoHiBit);
          Fx.Assert((observed & Bits.HiBits) == Bits.LoHiBit, "Slot already empty.");

          bool noDequeuerYet = (observed & Bits.HiCountMask) == 0;
          if (noDequeuerYet)
          {
              return true;
          }

          // A dequeuer got here before the item was published and left empty-handed. Withdraw the
          // item so the caller can reschedule it.
          this.state = null;
          this.callback = null;

          // Mark the slot as emptied. When both counts match, a direct reset to zero may succeed and
          // make setting the 'emptied' flag unnecessary.
          bool resetDirectly =
              (observed >> Bits.HiShift) == (observed & Bits.LoCountMask) &&
              Interlocked.CompareExchange(ref this.gate, 0, observed) == observed;
          if (!resetDirectly)
          {
              observed = Interlocked.Add(ref this.gate, Bits.HiHiBit);
              if (Bits.IsComplete(observed))
              {
                  Interlocked.CompareExchange(ref this.gate, 0, observed);
              }
          }

          return false;
      }

      // Collects the work item from this slot. 'callback' and 'state' receive the item when this
      // thread is the first dequeuer and the slot had been filled; otherwise both are set to null.
      [Fx.Tag.SecurityNote(Miscellaneous = "called by critical code, can be called outside user context")]
      public void DequeueWorkItem(out Action<object> callback, out object state)
      {
          // Stake our claim on the item.
          int observed = Interlocked.Add(ref this.gate, Bits.HiOne);

          if ((observed & Bits.LoHiBit) == 0)
          {
              // Lost a race: the work item has not been published yet. Here a null callback stands for
              // a degenerate work item rather than an empty queue. The enqueuing thread will notice
              // and reschedule the real work into a new slot. The gate is not reset here because the
              // enqueuer is still going to write to it (and will reset it itself).
              callback = null;
              state = null;
              return;
          }

          bool firstDequeuer = (observed & Bits.HiCountMask) == Bits.HiOne;
          if (!firstDequeuer)
          {
              callback = null;
              state = null;

              // The last thread to arrive gets to reset the slot.
              if (Bits.IsComplete(observed))
              {
                  Interlocked.CompareExchange(ref this.gate, 0, observed);
              }

              return;
          }

          // First dequeuer: take the work and clear the slot's fields.
          callback = this.callback;
          state = this.state;
          this.state = null;
          this.callback = null;

          // Mark the slot as emptied. In the common case (exactly one enqueuer) a direct reset to zero
          // succeeds and the 'emptied' flag never has to be set.
          bool resetDirectly =
              (observed & Bits.LoCountMask) == 1 &&
              Interlocked.CompareExchange(ref this.gate, 0, observed) == observed;
          if (!resetDirectly)
          {
              observed = Interlocked.Add(ref this.gate, Bits.HiHiBit);
              if (Bits.IsComplete(observed))
              {
                  Interlocked.CompareExchange(ref this.gate, 0, observed);
              }
          }
      }

#if DEBUG
      // Asserts that the slot is fully reset: gate is zero and no callback or state is left behind.
      public void DebugVerifyEmpty()
      {
          Fx.Assert(this.gate == 0, "Finalized with unfinished slot.");
          Fx.Assert(this.callback == null, "Finalized with leaked callback.");
          Fx.Assert(this.state == null, "Finalized with leaked state.");
      }
#endif
  }
  510. // A note about the IOThreadScheduler and the ScheduledOverlapped references:
  511. // Although for each scheduler we have a single instance of overlapped, we cannot point to the scheduler from the
  512. // overlapped, through the entire lifetime of the overlapped. This is because the ScheduledOverlapped is pinned
  513. // and if it has a reference to the IOTS, it would be rooted and the finalizer will never get called.
  514. // Therefore, we are passing the reference, when we post a pending callback and reset it, once the callback was
  515. // invoked; during that time the scheduler is rooted but in that time we don't want that it would be collected
  516. // by the GC anyway.
  517. [Fx.Tag.SecurityNote(Critical = "manages NativeOverlapped instance, can be called outside user context")]
  518. [SecurityCritical]
  // Wraps the single packed NativeOverlapped that a scheduler queues to the thread pool. The owning
  // scheduler is referenced only between Post and the resulting IOCallback.
  unsafe class ScheduledOverlapped
  {
      readonly NativeOverlapped* nativeOverlapped;
      IOThreadScheduler scheduler;

      // Packs a new Overlapped whose completion callback is IOCallback.
      public ScheduledOverlapped()
      {
          IOCompletionCallback completion = Fx.ThunkCallback(new IOCompletionCallback(IOCallback));
          this.nativeOverlapped = (new Overlapped()).UnsafePack(completion, null);
      }

      // Completion routine: detaches the scheduler, then runs work items until TryCoalesce reports
      // that none are left.
      [Fx.Tag.SecurityNote(Miscellaneous = "note that in some hosts this runs without any user context on the stack")]
      void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* nativeOverlapped)
      {
          // Unhook the IOThreadScheduler ASAP to prevent it from leaking.
          IOThreadScheduler iots = this.scheduler;
          this.scheduler = null;
          Fx.Assert(iots != null, "Overlapped completed without a scheduler.");

          Action<object> callback;
          object state;
          try
          {
          }
          finally
          {
              // Runs inside a finally block so it executes uninterrupted, which keeps the scheduler consistent.
              iots.CompletionCallback(out callback, out state);
          }

          bool found;
          do
          {
              // A null callback is possible when a synchronization miss left a slot unusable; in that
              // case just move on to the next slot until there are none left.
              if (callback != null)
              {
                  callback(state);
              }

              try
              {
              }
              finally
              {
                  // Runs inside a finally block so it executes uninterrupted, which keeps the scheduler consistent.
                  found = iots.TryCoalesce(out callback, out state);
              }
          }
          while (found);
      }

      // Records 'iots' as the scheduler to service and queues the native overlapped to the thread pool.
      public void Post(IOThreadScheduler iots)
      {
          Fx.Assert(this.scheduler == null, "Post called on an overlapped that is already posted.");
          Fx.Assert(iots != null, "Post called with a null scheduler.");

          this.scheduler = iots;
          ThreadPool.UnsafeQueueNativeOverlapped(this.nativeOverlapped);
      }

      // Frees the native overlapped. Throws a fatal exception if a scheduler is still attached.
      [Fx.Tag.SecurityNote(Miscellaneous = "note that this runs on the finalizer thread")]
      public void Cleanup()
      {
          bool inFlight = this.scheduler != null;
          if (inFlight)
          {
              throw Fx.AssertAndThrowFatal("Cleanup called on an overlapped that is in-flight.");
          }

          Overlapped.Free(this.nativeOverlapped);
      }
  }
  575. }
  576. }