CancellationTokenSource.cs 52 KB


  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT license.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Collections.Generic;
  5. using System.Diagnostics;
  6. using System.Diagnostics.CodeAnalysis;
  7. using System.Threading.Tasks;
  8. namespace System.Threading
  9. {
  10. /// <summary>Signals to a <see cref="CancellationToken"/> that it should be canceled.</summary>
  11. /// <remarks>
  12. /// <para>
  13. /// <see cref="CancellationTokenSource"/> is used to instantiate a <see cref="CancellationToken"/> (via
  14. /// the source's <see cref="Token">Token</see> property) that can be handed to operations that wish to be
  15. /// notified of cancellation or that can be used to register asynchronous operations for cancellation. That
  16. /// token may have cancellation requested by calling the source's <see cref="Cancel()"/> method.
  17. /// </para>
  18. /// <para>
  19. /// All members of this class, except <see cref="Dispose()"/>, are thread-safe and may be used
  20. /// concurrently from multiple threads.
  21. /// </para>
  22. /// </remarks>
  23. public class CancellationTokenSource : IDisposable
  24. {
  25. /// <summary>A shared <see cref="CancellationTokenSource"/> that's already canceled: the object initializer overwrites the NotCanceledState assigned by the parameterless constructor with NotifyingCompleteState.</summary>
  26. internal static readonly CancellationTokenSource s_canceledSource = new CancellationTokenSource() { _state = NotifyingCompleteState };
  27. /// <summary>A shared <see cref="CancellationTokenSource"/> that's never canceled. This isn't enforced programmatically, only by usage. Do not cancel!</summary>
  28. internal static readonly CancellationTokenSource s_neverCanceledSource = new CancellationTokenSource();
  /// <summary>
  /// Timer callback shared by every <see cref="CancellationTokenSource"/>: the timer state object is the
  /// source to cancel.
  /// </summary>
  private static readonly TimerCallback s_timerCallback = state =>
  {
      Debug.Assert(state is CancellationTokenSource, $"Expected {typeof(CancellationTokenSource)}, got {state}");

      // Go straight to NotifyCancellation rather than Cancel() so that the ThrowIfDisposed() check is skipped.
      var source = (CancellationTokenSource)state;
      source.NotifyCancellation(throwOnFirstException: false);
  };
  35. /// <summary>The number of callback partitions to use in a <see cref="CancellationTokenSource"/>. Must be a power of 2 so that <see cref="s_numPartitionsMask"/> works as a bit mask.</summary>
  36. private static readonly int s_numPartitions = GetPartitionCount();
  37. /// <summary><see cref="s_numPartitions"/> - 1, ANDed with the current managed thread ID to pick an index into <see cref="_callbackPartitions"/>.</summary>
  38. private static readonly int s_numPartitionsMask = s_numPartitions - 1;
  39. /// <summary>The current state of the CancellationTokenSource: NotCanceledState, NotifyingState or NotifyingCompleteState.</summary>
  40. private volatile int _state;
  41. /// <summary>The ID of the thread currently executing the main body of CTS.Cancel(), or -1 initially.</summary>
  42. /// <remarks>
  43. /// This helps us to know if a call to ctr.Dispose() is running 'within' a cancellation callback.
  44. /// This is updated as we move between the main thread calling cts.Cancel() and any syncContexts
  45. /// that are used to actually run the callbacks.
  46. /// </remarks>
  47. private volatile int _threadIDExecutingCallbacks = -1;
  48. /// <summary>The ID of the callback currently being run; lets ctr.Dispose() wait for the target callback to complete.</summary>
  49. private long _executingCallbackId;
  50. /// <summary>Partitions of callbacks, allocated lazily on first registration. Split into multiple partitions to help with scalability of registering/unregistering; each is protected by its own lock.</summary>
  51. private volatile CallbackPartition?[]? _callbackPartitions;
  52. /// <summary>TimerQueueTimer used by CancelAfter and Timer-related ctors. Used instead of Timer to avoid extra allocations and because the rooted behavior is desired.</summary>
  53. private volatile TimerQueueTimer? _timer;
  54. /// <summary><see cref="System.Threading.WaitHandle"/> lazily initialized and returned from <see cref="WaitHandle"/>; reset to null by Dispose.</summary>
  55. private volatile ManualResetEvent? _kernelEvent;
  56. /// <summary>Whether this <see cref="CancellationTokenSource"/> has been disposed. Deliberately not volatile; InternalRegister tolerates a stale read.</summary>
  57. private bool _disposed;
  58. // Legal values for _state. The ordering matters: IsCancellationRequested tests _state >= NotifyingState.
  59. private const int NotCanceledState = 1;
  60. private const int NotifyingState = 2;
  61. private const int NotifyingCompleteState = 3;
  /// <summary>Gets a value indicating whether cancellation has been requested for this <see cref="CancellationTokenSource"/>.</summary>
  /// <value><see langword="true"/> once cancellation has been requested; otherwise, <see langword="false"/>.</value>
  /// <remarks>
  /// <para>
  /// Cancellation is typically requested by calling <see cref="Cancel()"/> on this source.
  /// </para>
  /// <para>
  /// A <see langword="true"/> result means only that the request has been made. Handlers registered with
  /// the corresponding token may still be running, and the request may not yet have propagated to every
  /// handler, so additional synchronization may be required, particularly when related objects are being
  /// canceled concurrently.
  /// </para>
  /// </remarks>
  public bool IsCancellationRequested
  {
      // Both NotifyingState and NotifyingCompleteState compare >= NotifyingState.
      get { return _state >= NotifyingState; }
  }
  /// <summary>Helper reporting whether cancellation has finished, i.e. the state has reached NotifyingCompleteState.</summary>
  internal bool IsCancellationCompleted
  {
      get { return _state == NotifyingCompleteState; }
  }
  /// <summary>Helper reporting whether this source has been disposed.</summary>
  internal bool IsDisposed
  {
      get { return _disposed; }
  }
  /// <summary>Gets or sets the managed ID of the thread that is currently running callbacks.</summary>
  internal int ThreadIDExecutingCallbacks
  {
      get { return _threadIDExecutingCallbacks; }
      set { _threadIDExecutingCallbacks = value; }
  }
  /// <summary>Gets the <see cref="CancellationToken"/> bound to this <see cref="CancellationTokenSource"/>.</summary>
  /// <value>A new <see cref="CancellationToken"/> wrapping this source.</value>
  /// <exception cref="ObjectDisposedException">The token source has been disposed.</exception>
  public CancellationToken Token
  {
      get
      {
          // Same check ThrowIfDisposed() performs, written out inline.
          if (_disposed)
          {
              ThrowObjectDisposedException();
          }

          return new CancellationToken(this);
      }
  }
  /// <summary>
  /// Gets a <see cref="System.Threading.WaitHandle"/> that is set once cancellation has been requested.
  /// The underlying event is allocated lazily on first access.
  /// </summary>
  /// <exception cref="ObjectDisposedException">The token source has been disposed.</exception>
  internal WaitHandle WaitHandle
  {
      get
      {
          ThrowIfDisposed();

          // Work on a local snapshot. _kernelEvent is volatile and Dispose resets it to null, so reading
          // the field again after the null check (or after the CompareExchange) could observe null and
          // either return null from this non-nullable property or fail with a NullReferenceException.
          ManualResetEvent? kernelEvent = _kernelEvent;
          if (kernelEvent != null)
          {
              // Already allocated.
              return kernelEvent;
          }

          // Lazily-initialize the handle. If another thread published its event first, use that one
          // and discard ours.
          var mre = new ManualResetEvent(false);
          kernelEvent = Interlocked.CompareExchange(ref _kernelEvent, mre, null);
          if (kernelEvent != null)
          {
              mre.Dispose();
          }
          else
          {
              kernelEvent = mre;
          }

          // There is a race condition between checking IsCancellationRequested and setting the event.
          // However, at this point, the kernel object definitely exists and the cases are:
          //   1. if IsCancellationRequested = true, then we will call Set()
          //   2. if IsCancellationRequested = false, then NotifyCancellation will see that the event exists, and will call Set().
          if (IsCancellationRequested)
          {
              kernelEvent.Set();
          }

          return kernelEvent;
      }
  }
  /// <summary>Gets the ID of the callback that is currently executing, read with volatile semantics.</summary>
  internal long ExecutingCallback
  {
      get { return Volatile.Read(ref _executingCallbackId); }
  }
  /// <summary>Initializes a new <see cref="CancellationTokenSource"/> in the not-canceled state.</summary>
  public CancellationTokenSource()
  {
      _state = NotCanceledState;
  }
  /// <summary>
  /// Initializes a <see cref="CancellationTokenSource"/> that is canceled automatically once the given
  /// time span has elapsed.
  /// </summary>
  /// <param name="delay">How long to wait before canceling this <see cref="CancellationTokenSource"/>.</param>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="delay"/>, truncated to whole milliseconds, is less than -1 or greater than int.MaxValue.
  /// </exception>
  /// <remarks>
  /// <para>
  /// The countdown begins during the constructor call. When it expires, the constructed
  /// <see cref="CancellationTokenSource"/> is canceled, if it has not been canceled already.
  /// </para>
  /// <para>
  /// Calling CancelAfter later resets the delay for the constructed
  /// <see cref="CancellationTokenSource"/>, if it has not been canceled already.
  /// </para>
  /// </remarks>
  public CancellationTokenSource(TimeSpan delay)
  {
      long delayMs = (long)delay.TotalMilliseconds;

      bool inRange = delayMs >= -1 && delayMs <= int.MaxValue;
      if (!inRange)
      {
          throw new ArgumentOutOfRangeException(nameof(delay));
      }

      InitializeWithTimer((int)delayMs);
  }
  /// <summary>
  /// Initializes a <see cref="CancellationTokenSource"/> that is canceled automatically once the given
  /// number of milliseconds has elapsed.
  /// </summary>
  /// <param name="millisecondsDelay">The delay, in milliseconds, before this <see cref="CancellationTokenSource"/> is canceled.</param>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsDelay"/> is less than -1.
  /// </exception>
  /// <remarks>
  /// <para>
  /// The countdown begins during the constructor call. When it expires, the constructed
  /// <see cref="CancellationTokenSource"/> is canceled (if it has not been canceled already).
  /// </para>
  /// <para>
  /// Calling CancelAfter later resets the delay for the constructed
  /// <see cref="CancellationTokenSource"/>, if it has not been canceled already.
  /// </para>
  /// </remarks>
  public CancellationTokenSource(int millisecondsDelay)
  {
      bool belowMinimum = millisecondsDelay < -1;
      if (belowMinimum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
      }

      InitializeWithTimer(millisecondsDelay);
  }
  /// <summary>
  /// Shared initialization for the constructors that take a delay. A delay of zero puts the source
  /// directly into the canceled state; any other value arms a one-shot timer.
  /// </summary>
  private void InitializeWithTimer(int millisecondsDelay)
  {
      if (millisecondsDelay == 0)
      {
          // Nothing to wait for: the source starts out already canceled.
          _state = NotifyingCompleteState;
          return;
      }

      _state = NotCanceledState;

      // The timer roots this CTS instance while it's scheduled. That is by design, so
      // that code like:
      //     CancellationToken ct = new CancellationTokenSource(timeout).Token;
      // will successfully cancel the token after the timeout.
      _timer = new TimerQueueTimer(s_timerCallback, this, (uint)millisecondsDelay, Timeout.UnsignedInfinite, flowExecutionContext: false);
  }
  /// <summary>Requests cancellation.</summary>
  /// <remarks>
  /// <para>
  /// The associated <see cref="CancellationToken"/> is notified and moves to a state in which
  /// <see cref="CancellationToken.IsCancellationRequested"/> returns true. Every callback or cancelable
  /// operation registered with the <see cref="CancellationToken"/> is executed.
  /// </para>
  /// <para>
  /// Registered callbacks and cancelable operations should not throw. If they do, this overload collects
  /// the exceptions into an <see cref="AggregateException"/>, so a throwing callback does not stop the
  /// remaining callbacks from running.
  /// </para>
  /// <para>
  /// Each callback runs under the <see cref="ExecutionContext"/> that was captured when it was registered.
  /// </para>
  /// </remarks>
  /// <exception cref="AggregateException">Contains all the exceptions thrown by the callbacks registered
  /// on the associated <see cref="CancellationToken"/>.</exception>
  /// <exception cref="ObjectDisposedException">This <see cref="CancellationTokenSource"/> has been disposed.</exception>
  public void Cancel()
  {
      Cancel(throwOnFirstException: false);
  }
  /// <summary>Requests cancellation, optionally stopping at the first callback exception.</summary>
  /// <remarks>
  /// <para>
  /// The associated <see cref="CancellationToken"/> is notified and moves to a state in which
  /// <see cref="CancellationToken.IsCancellationRequested"/> returns true. Every callback or cancelable
  /// operation registered with the <see cref="CancellationToken"/> is executed.
  /// </para>
  /// <para>
  /// Registered callbacks and cancelable operations should not throw.
  /// When <paramref name="throwOnFirstException"/> is true, the first exception propagates immediately out
  /// of Cancel and the remaining callbacks and cancelable operations are not processed.
  /// When <paramref name="throwOnFirstException"/> is false, exceptions are collected into an
  /// <see cref="AggregateException"/>, so a throwing callback does not stop the remaining callbacks
  /// from running.
  /// </para>
  /// <para>
  /// Each callback runs under the <see cref="ExecutionContext"/> that was captured when it was registered.
  /// </para>
  /// </remarks>
  /// <param name="throwOnFirstException">Whether an exception should propagate immediately.</param>
  /// <exception cref="AggregateException">Contains all the exceptions thrown by the callbacks registered
  /// on the associated <see cref="CancellationToken"/>.</exception>
  /// <exception cref="ObjectDisposedException">This <see cref="CancellationTokenSource"/> has been disposed.</exception>
  public void Cancel(bool throwOnFirstException)
  {
      // Same check ThrowIfDisposed() performs, written out inline.
      if (_disposed)
      {
          ThrowObjectDisposedException();
      }

      NotifyCancellation(throwOnFirstException: throwOnFirstException);
  }
  /// <summary>Schedules cancellation of this <see cref="CancellationTokenSource"/> after a time span.</summary>
  /// <param name="delay">How long to wait before canceling this <see cref="CancellationTokenSource"/>.</param>
  /// <exception cref="ObjectDisposedException">
  /// This <see cref="CancellationTokenSource"/> has been disposed.
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="delay"/>, truncated to whole milliseconds, is less than -1 or greater than int.MaxValue.
  /// </exception>
  /// <remarks>
  /// <para>
  /// The countdown begins during this call. When it expires, this
  /// <see cref="CancellationTokenSource"/> is canceled, if it has not been canceled already.
  /// </para>
  /// <para>
  /// Calling CancelAfter again resets the delay for this
  /// <see cref="CancellationTokenSource"/>, if it has not been canceled already.
  /// </para>
  /// </remarks>
  public void CancelAfter(TimeSpan delay)
  {
      long delayMs = (long)delay.TotalMilliseconds;

      bool inRange = delayMs >= -1 && delayMs <= int.MaxValue;
      if (!inRange)
      {
          throw new ArgumentOutOfRangeException(nameof(delay));
      }

      // The int overload performs the disposed check and does the actual scheduling.
      CancelAfter((int)delayMs);
  }
  /// <summary>
  /// Schedules cancellation of this <see cref="CancellationTokenSource"/> after a number of milliseconds.
  /// </summary>
  /// <param name="millisecondsDelay">The delay, in milliseconds, before this
  /// <see cref="CancellationTokenSource"/> is canceled.
  /// </param>
  /// <exception cref="ObjectDisposedException">
  /// This <see cref="CancellationTokenSource"/> has been disposed.
  /// </exception>
  /// <exception cref="ArgumentOutOfRangeException">
  /// <paramref name="millisecondsDelay"/> is less than -1.
  /// </exception>
  /// <remarks>
  /// <para>
  /// The countdown begins during this call. When it expires, this
  /// <see cref="CancellationTokenSource"/> is canceled, if it has not been canceled already.
  /// </para>
  /// <para>
  /// Calling CancelAfter again resets the delay for this
  /// <see cref="CancellationTokenSource"/>, if it has not been canceled already.
  /// </para>
  /// </remarks>
  public void CancelAfter(int millisecondsDelay)
  {
      ThrowIfDisposed();

      bool belowMinimum = millisecondsDelay < -1;
      if (belowMinimum)
      {
          throw new ArgumentOutOfRangeException(nameof(millisecondsDelay));
      }

      // Already canceled: there is nothing left to schedule.
      if (IsCancellationRequested)
      {
          return;
      }

      // A Cancel may slip in between the IsCancellationRequested check above and the timer
      // creation below. That race is benign: at worst we create a timer whose expiry has no effect.
      // Likewise, a Dispose() landing right here (after ThrowIfDisposed(), before timer creation)
      // would leak a Timer object, at least until it expired and disposed itself. That counts as
      // misuse, since Dispose() is not thread-safe and must not run concurrently with CancelAfter().

      TimerQueueTimer? activeTimer = _timer;
      if (activeTimer == null)
      {
          // Lazily create the timer in a thread-safe way. It starts out as "never fire" so that a
          // timer which loses the publication race cannot cancel the token before it is closed.
          var candidate = new TimerQueueTimer(s_timerCallback, this, Timeout.UnsignedInfinite, Timeout.UnsignedInfinite, flowExecutionContext: false);
          TimerQueueTimer? winner = Interlocked.CompareExchange(ref _timer, candidate, null);
          if (winner == null)
          {
              // Our timer was published.
              activeTimer = candidate;
          }
          else
          {
              // Somebody else published first; throw ours away and use theirs.
              candidate.Close();
              activeTimer = winner;
          }
      }

      // The timer may already have been disposed, hence the try/catch.
      try
      {
          activeTimer.Change((uint)millisecondsDelay, Timeout.UnsignedInfinite);
      }
      catch (ObjectDisposedException)
      {
          // Deliberately swallowed. There is no other way to detect a disposed timer, and even
          // if there were, the observe/dispose race could not be handled any better.
      }
  }
  /// <summary>Releases the resources held by this <see cref="CancellationTokenSource"/>.</summary>
  /// <remarks>This method is not thread-safe for any other concurrent calls.</remarks>
  public void Dispose()
  {
      Dispose(disposing: true);
      GC.SuppressFinalize(this);
  }
  /// <summary>
  /// Releases the unmanaged resources used by the <see cref="CancellationTokenSource"/> class and,
  /// optionally, the managed resources as well.
  /// </summary>
  /// <param name="disposing">true to release both managed and unmanaged resources; false to release only unmanaged resources.</param>
  protected virtual void Dispose(bool disposing)
  {
      // Nothing to do when not disposing managed state, or when already disposed.
      if (!disposing || _disposed)
      {
          return;
      }

      // Unregistering a callback after the CTS has been disposed, or concurrently with
      // cts.Dispose(), is specifically tolerated. No locks are needed for that because Dispose
      // never touches the contents of the callback partitions; it only nulls out the reference
      // to them.
      //
      // Registering a callback after disposal is tolerated too, because InternalRegister copes
      // with _callbackPartitions becoming null while it runs. The accepted risk is that a race
      // between Dispose and Register reinitializes _callbackPartitions to non-null, so this
      // instance may needlessly hold on to a registered callback. That is no worse than if
      // Dispose were unsafe to use concurrently, since then Dispose would never be called and no
      // handlers would be dropped at all.
      //
      // Finally, Dispose running concurrently with Cancel is tolerated. That is needed to
      // properly support e.g. LinkedCancellationTokenSource, where common usage makes the
      // pairing possible in valid code: a component accepts an external CancellationToken,
      // combines it with an internal source via CreateLinkedTokenSource, then disposes the
      // linked source, possibly at the very moment the external party requests cancellation.

      TimerQueueTimer? pendingTimer = _timer;
      if (pendingTimer != null)
      {
          _timer = null;
          pendingTimer.Close(); // TimerQueueTimer.Close is thread-safe
      }

      // Free the partitions for GC; Cancel correctly handles a null field.
      _callbackPartitions = null;

      // A kernel event created via WaitHandle should be disposed, but only if Cancel is not
      // using it concurrently. So first swap the field to null, then check whether cancellation
      // is in progress. NotifyCancellation only tries to set the event if it still exists after
      // the transition to, and while in, NotifyingState.
      if (_kernelEvent != null)
      {
          ManualResetEvent? detachedEvent = Interlocked.Exchange<ManualResetEvent?>(ref _kernelEvent!, null);
          if (detachedEvent != null && _state != NotifyingState)
          {
              detachedEvent.Dispose();
          }
      }

      _disposed = true;
  }
  /// <summary>Throws an <see cref="ObjectDisposedException"/> when this source has been disposed.</summary>
  private void ThrowIfDisposed()
  {
      if (!_disposed)
      {
          return;
      }

      ThrowObjectDisposedException();
  }
  /// <summary>
  /// Always throws an <see cref="ObjectDisposedException"/>. Kept apart from ThrowIfDisposed to help
  /// with inlining.
  /// </summary>
  [DoesNotReturn]
  private static void ThrowObjectDisposedException()
  {
      throw new ObjectDisposedException(null, SR.CancellationTokenSource_Disposed);
  }
  /// <summary>
  /// Registers a callback with this source. When cancellation has already occurred, the callback
  /// has been run by the time this method returns.
  /// </summary>
  internal CancellationTokenRegistration InternalRegister(
      Action<object?> callback, object? stateForCallback, SynchronizationContext? syncContext, ExecutionContext? executionContext)
  {
      Debug.Assert(this != s_neverCanceledSource, "This source should never be exposed via a CancellationToken.");

      // Not canceled yet: register the handler. Already canceled: fall through and run the callback
      // synchronously. This also guarantees _callbackPartitions is not mutated while
      // ExecuteCallbackHandlers() runs.
      if (!IsCancellationRequested)
      {
          // Dispose is allowed to run concurrently with Register so that code can avoid leaking
          // handlers; it is not recommended practice, but consumers do rely on it. No guarantee is
          // made about whether the CTS keeps a callback registered after disposal, but we try not to
          // keep it without paying any real overhead. The compromise: a plain (non-volatile) read of
          // _disposed, returning an empty registration when it is set. If that read is stale, or the
          // disposal arrives after this line, the only cost is the minor risk of _callbackPartitions
          // being reinitialized after Dispose cleared it.
          if (_disposed)
          {
              return new CancellationTokenRegistration();
          }

          // Fetch the partition array, publishing a new one if none exists yet.
          CallbackPartition?[]? allPartitions = _callbackPartitions;
          if (allPartitions == null)
          {
              CallbackPartition?[] freshPartitions = new CallbackPartition[s_numPartitions];
              allPartitions = Interlocked.CompareExchange(ref _callbackPartitions, freshPartitions, null) ?? freshPartitions;
          }

          // Select the partition for the current thread, creating it on first use.
          int slot = Environment.CurrentManagedThreadId & s_numPartitionsMask;
          Debug.Assert(slot < allPartitions.Length, $"Expected {slot} to be less than {allPartitions.Length}");
          CallbackPartition? target = allPartitions[slot];
          if (target == null)
          {
              var createdPartition = new CallbackPartition(this);
              target = Interlocked.CompareExchange(ref allPartitions[slot], createdPartition, null) ?? createdPartition;
          }

          // Record the callback in the partition's list while holding the partition lock.
          long registrationId;
          CallbackNode? entry;
          bool lockTaken = false;
          target.Lock.Enter(ref lockTaken);
          try
          {
              // Take the next unique ID.
              registrationId = target.NextAvailableId++;

              // Reuse a node from the free list when there is one; otherwise allocate.
              entry = target.FreeNodeList;
              if (entry == null)
              {
                  entry = new CallbackNode(target);
              }
              else
              {
                  target.FreeNodeList = entry.Next;
                  Debug.Assert(entry.Prev == null, "Nodes in the free list should all have a null Prev");
                  // entry.Next is overwritten below, so it is not reset here.
              }

              // Fill in the node.
              entry.Id = registrationId;
              entry.Callback = callback;
              entry.CallbackState = stateForCallback;
              entry.ExecutionContext = executionContext;
              entry.SynchronizationContext = syncContext;

              // Push it onto the head of the callbacks list.
              entry.Next = target.Callbacks;
              if (entry.Next != null)
              {
                  entry.Next.Prev = entry;
              }
              target.Callbacks = entry;
          }
          finally
          {
              target.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts
          }

          // Still not canceled: hand back the registration. If cancellation was requested in the
          // meantime, try to undo the registration so the callback can be run here instead. When
          // the unregister fails (e.g. the thread running Cancel already grabbed our callback),
          // return the registration so the caller can wait for completion in ctr.Dispose().
          var registration = new CancellationTokenRegistration(registrationId, entry);
          if (!IsCancellationRequested || !target.Unregister(registrationId, entry))
          {
              return registration;
          }
      }

      // Cancellation has already happened: run the callback on this thread and return an empty registration.
      callback(stateForCallback);
      return default;
  }
  /// <summary>
  /// Moves the source into the notifying state and, if this call won that transition, closes the timer,
  /// sets the wait handle and runs the registered callbacks.
  /// </summary>
  /// <param name="throwOnFirstException">Passed through to ExecuteCallbackHandlers.</param>
  private void NotifyCancellation(bool throwOnFirstException)
  {
      // Only the first caller to signal cancellation does the work below; everyone else leaves.
      if (IsCancellationRequested ||
          Interlocked.CompareExchange(ref _state, NotifyingState, NotCanceledState) != NotCanceledState)
      {
          return;
      }

      // Close the timer, if any. Dispose may be running concurrently, but TimerQueueTimer.Close is thread-safe.
      TimerQueueTimer? pendingTimer = _timer;
      if (pendingTimer != null)
      {
          _timer = null;
          pendingTimer.Close();
      }

      // Set the event if it has been lazily created and not yet disposed. Dispose may be running
      // concurrently: either it has already reset _kernelEvent to null and we see nothing here, or it
      // observes that we are in NotifyingState and skips disposing the event, leaving cleanup to
      // finalization.
      _kernelEvent?.Set();

      // - Late registrations have their callbacks invoked immediately inside the Register() methods.
      // - Callbacks are not called inside a lock.
      // - After the transition no more delegates are added to the list of handlers, so
      //   ExecuteCallbackHandlers can consume and clear it at leisure.
      ExecuteCallbackHandlers(throwOnFirstException);
      Debug.Assert(IsCancellationCompleted, "Expected cancellation to have finished");
  }
  546. /// <summary>Invoke all registered callbacks.</summary>
  547. /// <remarks>The handlers are invoked synchronously in LIFO order.</remarks>
  548. private void ExecuteCallbackHandlers(bool throwOnFirstException)
  549. {
  550. Debug.Assert(IsCancellationRequested, "ExecuteCallbackHandlers should only be called after setting IsCancellationRequested->true");
  551. // Record the threadID being used for running the callbacks.
  552. ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
  553. // If there are no callbacks to run, we can safely exit. Any race conditions to lazy initialize it
  554. // will see IsCancellationRequested and will then run the callback themselves.
  555. CallbackPartition?[]? partitions = Interlocked.Exchange(ref _callbackPartitions, null);
  556. if (partitions == null)
  557. {
  558. Interlocked.Exchange(ref _state, NotifyingCompleteState);
  559. return;
  560. }
  561. List<Exception>? exceptionList = null;
  562. try
  563. {
  564. // For each partition, and each callback in that partition, execute the associated handler.
  565. // We call the delegates in LIFO order on each partition so that callbacks fire 'deepest first'.
  566. // This is intended to help with nesting scenarios so that child enlisters cancel before their parents.
  567. foreach (CallbackPartition? partition in partitions)
  568. {
  569. if (partition == null)
  570. {
  571. // Uninitialized partition. Nothing to do.
  572. continue;
  573. }
  574. // Iterate through all nodes in the partition. We remove each node prior
  575. // to processing it. This allows for unregistration of subsequent registrations
  576. // to still be effective even as other registrations are being invoked.
  577. while (true)
  578. {
  579. CallbackNode? node;
  580. bool lockTaken = false;
  581. partition.Lock.Enter(ref lockTaken);
  582. try
  583. {
  584. // Pop the next registration from the callbacks list.
  585. node = partition.Callbacks;
  586. if (node == null)
  587. {
  588. // No more registrations to process.
  589. break;
  590. }
  591. else
  592. {
  593. Debug.Assert(node.Prev == null);
  594. if (node.Next != null) node.Next.Prev = null;
  595. partition.Callbacks = node.Next;
  596. }
  597. // Publish the intended callback ID, to ensure ctr.Dispose can tell if a wait is necessary.
  598. // This write happens while the lock is held so that Dispose is either able to successfully
  599. // unregister or is guaranteed to see an accurate executing callback ID, since it takes
  600. // the same lock to remove the node from the callback list.
  601. _executingCallbackId = node.Id;
  602. // Now that we've grabbed the Id, reset the node's Id to 0. This signals
  603. // to code unregistering that the node is no longer associated with a callback.
  604. node.Id = 0;
  605. }
  606. finally
  607. {
  608. partition.Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts
  609. }
  610. // Invoke the callback on this thread if there's no sync context or on the
  611. // target sync context if there is one.
  612. try
  613. {
  614. if (node.SynchronizationContext != null)
  615. {
  616. // Transition to the target syncContext and continue there.
  617. node.SynchronizationContext.Send(s =>
  618. {
  619. var n = (CallbackNode)s!;
  620. n.Partition.Source.ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId;
  621. n.ExecuteCallback();
  622. }, node);
  623. ThreadIDExecutingCallbacks = Environment.CurrentManagedThreadId; // above may have altered ThreadIDExecutingCallbacks, so reset it
  624. }
  625. else
  626. {
  627. node.ExecuteCallback();
  628. }
  629. }
  630. catch (Exception ex) when (!throwOnFirstException)
  631. {
  632. // Store the exception and continue
  633. (exceptionList ?? (exceptionList = new List<Exception>())).Add(ex);
  634. }
  635. // Drop the node. While we could add it to the free list, doing so has cost (we'd need to take the lock again)
  636. // and very limited value. Since a source can only be canceled once, and after it's canceled registrations don't
  637. // need nodes, the only benefit to putting this on the free list would be if Register raced with cancellation
  638. // occurring, such that it could have used this free node but would instead need to allocate a new node (if
  639. // there wasn't another free node available).
  640. }
  641. }
  642. }
  643. finally
  644. {
  645. _state = NotifyingCompleteState;
  646. Volatile.Write(ref _executingCallbackId, 0);
  647. Interlocked.MemoryBarrier(); // for safety, prevent reorderings crossing this point and seeing inconsistent state.
  648. }
  649. if (exceptionList != null)
  650. {
  651. Debug.Assert(exceptionList.Count > 0, $"Expected {exceptionList.Count} > 0");
  652. throw new AggregateException(exceptionList);
  653. }
  654. }
  /// <summary>Chooses how many callback partitions to use, based on the processor count reported by <c>PlatformHelper.ProcessorCount</c>.</summary>
  /// <returns>A power of 2 in the range 1 to 16 representing the number of partitions to use.</returns>
  private static int GetPartitionCount()
  {
      int processorCount = PlatformHelper.ProcessorCount;

      // Test the thresholds from largest to smallest; the first one exceeded selects the count.
      int count;
      if (processorCount > 8)
      {
          count = 16; // capped at 16 to limit memory usage on larger machines
      }
      else if (processorCount > 4)
      {
          count = 8;
      }
      else if (processorCount > 2)
      {
          count = 4;
      }
      else if (processorCount > 1)
      {
          count = 2;
      }
      else
      {
          count = 1;
      }

      // A positive value with a single bit set is a power of 2.
      Debug.Assert(count > 0 && (count & (count - 1)) == 0, $"Got {count}, but expected a power of 2");
      return count;
  }
  /// <summary>
  /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state
  /// when either of the two source tokens is in the canceled state.
  /// </summary>
  /// <param name="token1">The first <see cref="CancellationToken">CancellationToken</see> to observe.</param>
  /// <param name="token2">The second <see cref="CancellationToken">CancellationToken</see> to observe.</param>
  /// <returns>A <see cref="CancellationTokenSource"/> that is linked to the source tokens.</returns>
  public static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token1, CancellationToken token2)
  {
      // A token that can never be canceled contributes nothing, so only link to the ones that can.
      if (!token1.CanBeCanceled)
      {
          // Only token2 might matter; the single-token overload decides whether it does.
          return CreateLinkedTokenSource(token2);
      }

      if (token2.CanBeCanceled)
      {
          return new Linked2CancellationTokenSource(token1, token2);
      }

      // token1 is cancelable but token2 is not.
      return new Linked1CancellationTokenSource(token1);
  }
  /// <summary>
  /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state
  /// when the source token is in the canceled state.
  /// </summary>
  /// <param name="token">The <see cref="CancellationToken">CancellationToken</see> to observe.</param>
  /// <returns>
  /// A <see cref="CancellationTokenSource"/> linked to <paramref name="token"/> when that token can be
  /// canceled; otherwise, a new unlinked <see cref="CancellationTokenSource"/>.
  /// </returns>
  internal static CancellationTokenSource CreateLinkedTokenSource(CancellationToken token)
  {
      if (token.CanBeCanceled)
      {
          return new Linked1CancellationTokenSource(token);
      }

      // Nothing to link to: a plain source is sufficient.
      return new CancellationTokenSource();
  }
  /// <summary>
  /// Creates a <see cref="CancellationTokenSource"/> that will be in the canceled state
  /// when any of the source tokens are in the canceled state.
  /// </summary>
  /// <param name="tokens">The <see cref="CancellationToken">CancellationToken</see> instances to observe.</param>
  /// <returns>A <see cref="CancellationTokenSource"/> that is linked to the source tokens.</returns>
  /// <exception cref="System.ArgumentNullException"><paramref name="tokens"/> is null.</exception>
  /// <exception cref="System.ArgumentException"><paramref name="tokens"/> is empty.</exception>
  public static CancellationTokenSource CreateLinkedTokenSource(params CancellationToken[] tokens)
  {
      if (tokens == null)
      {
          throw new ArgumentNullException(nameof(tokens));
      }

      int tokenCount = tokens.Length;

      if (tokenCount == 0)
      {
          throw new ArgumentException(SR.CancellationToken_CreateLinkedToken_TokensIsEmpty);
      }

      // One or two tokens are routed to the dedicated overloads.
      if (tokenCount == 1)
      {
          return CreateLinkedTokenSource(tokens[0]);
      }

      if (tokenCount == 2)
      {
          return CreateLinkedTokenSource(tokens[0], tokens[1]);
      }

      // a defensive copy is not required as the array has value-items that have only a single reference field,
      // hence each item cannot be null itself, and reads of the payloads cannot be torn.
      return new LinkedNCancellationTokenSource(tokens);
  }
  /// <summary>
  /// Blocks the calling thread, by spinning, until the callback with the specified ID is no longer
  /// the one reported as executing.
  /// It is ok to call this method if the callback has already finished.
  /// Calling this method before the target callback has been selected for execution would be an error.
  /// </summary>
  /// <param name="id">The ID of the callback to wait for.</param>
  internal void WaitForCallbackToComplete(long id)
  {
      var spinner = new SpinWait();
      while (true)
      {
          if (ExecutingCallback != id)
          {
              // The target callback is not (or is no longer) running.
              return;
          }

          // spin, as we assume callback execution is fast and that this situation is rare.
          spinner.SpinOnce();
      }
  }
  /// <summary>
  /// Asynchronously wait for a single callback to complete (or, more specifically, to not be running).
  /// It is ok to call this method if the callback has already finished.
  /// Calling this method before the target callback has been selected for execution would be an error.
  /// </summary>
  /// <param name="id">The ID of the callback to wait for.</param>
  /// <returns>
  /// A completed <see cref="ValueTask"/> when the specified callback is not currently executing; otherwise,
  /// a <see cref="ValueTask"/> wrapping a task, scheduled to <see cref="TaskScheduler.Default"/>, that
  /// finishes once <see cref="WaitForCallbackToComplete(long)"/> returns for that callback.
  /// </returns>
  internal ValueTask WaitForCallbackToCompleteAsync(long id)
  {
      if (ExecutingCallback == id)
      {
          // The specified callback is actually running: queue a work item that polls until it is done.
          // Scheduling a polling work item is generally unfortunate, but this is expected to be rare
          // (disposing while the associated callback is running) and brief when it does happen, and a
          // callback-based mechanism would add cost to other, more common cases.
          Task pollingTask = Task.Factory.StartNew(
              boxedState =>
              {
                  Debug.Assert(boxedState is Tuple<CancellationTokenSource, long>);
                  var sourceAndId = (Tuple<CancellationTokenSource, long>)boxedState;
                  sourceAndId.Item1.WaitForCallbackToComplete(sourceAndId.Item2);
              },
              Tuple.Create(this, id),
              CancellationToken.None,
              TaskCreationOptions.None,
              TaskScheduler.Default);

          return new ValueTask(pollingTask);
      }

      // The currently executing callback is not the target one, so the target has already completed.
      // This should be the most common case, as the caller calls this whenever cancellation is in
      // progress without knowing which callback, if any, is running.
      return default;
  }
  /// <summary>
  /// A <see cref="CancellationTokenSource"/> linked to a single token: it registers the shared
  /// linked-token delegate with that token, passing itself as the callback state.
  /// </summary>
  private sealed class Linked1CancellationTokenSource : CancellationTokenSource
  {
      /// <summary>The registration made with the linked token; disposed along with this source.</summary>
      private readonly CancellationTokenRegistration _reg1;

      /// <summary>Initializes the source and registers it with <paramref name="token1"/>.</summary>
      /// <param name="token1">The token to link to.</param>
      internal Linked1CancellationTokenSource(CancellationToken token1)
      {
          _reg1 = token1.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this);
      }

      /// <summary>Disposes the linking registration and then the base source.</summary>
      /// <param name="disposing">true when called from Dispose(); when false, nothing is done.</param>
      protected override void Dispose(bool disposing)
      {
          // Only act on an explicit dispose of a not-yet-disposed source.
          if (disposing && !_disposed)
          {
              _reg1.Dispose();
              base.Dispose(disposing);
          }
      }
  }
  /// <summary>
  /// A <see cref="CancellationTokenSource"/> linked to two tokens: it registers the shared
  /// linked-token delegate with each of them, passing itself as the callback state.
  /// </summary>
  private sealed class Linked2CancellationTokenSource : CancellationTokenSource
  {
      /// <summary>The registration made with the first linked token.</summary>
      private readonly CancellationTokenRegistration _reg1;

      /// <summary>The registration made with the second linked token.</summary>
      private readonly CancellationTokenRegistration _reg2;

      /// <summary>Initializes the source and registers it with both tokens, first token first.</summary>
      /// <param name="token1">The first token to link to.</param>
      /// <param name="token2">The second token to link to.</param>
      internal Linked2CancellationTokenSource(CancellationToken token1, CancellationToken token2)
      {
          _reg1 = token1.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this);
          _reg2 = token2.UnsafeRegister(LinkedNCancellationTokenSource.s_linkedTokenCancelDelegate, this);
      }

      /// <summary>Disposes both linking registrations and then the base source.</summary>
      /// <param name="disposing">true when called from Dispose(); when false, nothing is done.</param>
      protected override void Dispose(bool disposing)
      {
          // Only act on an explicit dispose of a not-yet-disposed source.
          if (disposing && !_disposed)
          {
              _reg1.Dispose();
              _reg2.Dispose();
              base.Dispose(disposing);
          }
      }
  }
  /// <summary>
  /// A <see cref="CancellationTokenSource"/> linked to an arbitrary number of tokens. Also hosts the
  /// delegate that every linked source registers with the tokens it is linked to.
  /// </summary>
  private sealed class LinkedNCancellationTokenSource : CancellationTokenSource
  {
      /// <summary>
      /// Callback registered with linked tokens. Its state argument is the linked source, on which it
      /// calls NotifyCancellation without throwing on the first exception.
      /// </summary>
      internal static readonly Action<object?> s_linkedTokenCancelDelegate = s =>
      {
          Debug.Assert(s is CancellationTokenSource, $"Expected {typeof(CancellationTokenSource)}, got {s}");
          var linkedSource = (CancellationTokenSource)s;
          linkedSource.NotifyCancellation(throwOnFirstException: false); // skip ThrowIfDisposed() check in Cancel()
      };

      /// <summary>One registration slot per supplied token; set to null once disposed.</summary>
      private CancellationTokenRegistration[]? _linkingRegistrations;

      /// <summary>Initializes the source and registers it with every cancelable token supplied.</summary>
      /// <param name="tokens">The tokens to link to.</param>
      internal LinkedNCancellationTokenSource(params CancellationToken[] tokens)
      {
          var registrations = new CancellationTokenRegistration[tokens.Length];
          _linkingRegistrations = registrations;

          for (int i = 0; i < tokens.Length; i++)
          {
              if (!tokens[i].CanBeCanceled)
              {
                  // The slot stays default(CancellationTokenRegistration), which is a nop to Dispose.
                  // Based on usage patterns such tokens should be rare, so it is not worth resizing
                  // the array and incurring the related costs.
                  continue;
              }

              registrations[i] = tokens[i].UnsafeRegister(s_linkedTokenCancelDelegate, this);
          }
      }

      /// <summary>Disposes every linking registration and then the base source.</summary>
      /// <param name="disposing">true when called from Dispose(); when false, nothing is done.</param>
      protected override void Dispose(bool disposing)
      {
          // Only act on an explicit dispose of a not-yet-disposed source.
          if (disposing && !_disposed)
          {
              CancellationTokenRegistration[]? registrations = _linkingRegistrations;
              if (registrations != null)
              {
                  _linkingRegistrations = null; // release for GC once we're done enumerating
                  foreach (CancellationTokenRegistration registration in registrations)
                  {
                      registration.Dispose();
                  }
              }

              base.Dispose(disposing);
          }
      }
  }
  /// <summary>
  /// One partition of a source's registered callbacks: a lock, the list of live registrations,
  /// a list of recyclable nodes, and the counter used to assign registration IDs.
  /// </summary>
  internal sealed class CallbackPartition
  {
      /// <summary>The associated source that owns this partition.</summary>
      public readonly CancellationTokenSource Source;

      /// <summary>Lock that protects all state in the partition.</summary>
      public SpinLock Lock = new SpinLock(enableThreadOwnerTracking: false); // mutable struct; do not make this readonly

      /// <summary>Head of the doubly-linked list of callbacks registered with the partition. Callbacks are removed during unregistration and as they're invoked.</summary>
      public CallbackNode? Callbacks;

      /// <summary>Head of the singly-linked list of free nodes that can be reused for subsequent callback registrations.</summary>
      public CallbackNode? FreeNodeList;

      /// <summary>Every callback is assigned a unique, never-reused ID. This defines the next available ID.</summary>
      public long NextAvailableId = 1; // avoid using 0, as that's the default long value and used to represent an empty node

      /// <summary>Initializes a partition owned by the specified source.</summary>
      /// <param name="source">The source that owns this partition.</param>
      public CallbackPartition(CancellationTokenSource source)
      {
          Debug.Assert(source != null, "Expected non-null source");
          Source = source;
      }

      /// <summary>
      /// Removes the registration identified by <paramref name="id"/> from the callbacks list, if it is
      /// still there, and moves its node onto the free list.
      /// </summary>
      /// <param name="id">The non-zero ID that was assigned to the registration.</param>
      /// <param name="node">The node that held the registration.</param>
      /// <returns>
      /// true if the registration was found and removed; false if the node no longer carries
      /// <paramref name="id"/>, in which case there was nothing to unregister.
      /// </returns>
      internal bool Unregister(long id, CallbackNode node)
      {
          Debug.Assert(id != 0, "Expected non-zero id");
          Debug.Assert(node != null, "Expected non-null node");

          bool lockTaken = false;
          Lock.Enter(ref lockTaken);
          try
          {
              if (node.Id == id)
              {
                  // The registration must still be in the callbacks list. Unlink it from its neighbours.
                  CallbackNode? previous = node.Prev;
                  CallbackNode? following = node.Next;

                  if (Callbacks == node)
                  {
                      // The node is the head of the list.
                      Debug.Assert(previous == null);
                      Callbacks = following;
                  }
                  else
                  {
                      Debug.Assert(previous != null);
                      previous.Next = following;
                  }

                  if (following != null)
                  {
                      following.Prev = previous;
                  }

                  // Wipe the now unused node and push it onto the singly-linked free list.
                  // The associated Partition is the only field left alone, as that's fixed
                  // throughout the node's lifetime, regardless of how many times it's reused by
                  // the same partition (it's never used on a different partition).
                  node.Id = 0;
                  node.Callback = null;
                  node.CallbackState = null;
                  node.ExecutionContext = null;
                  node.SynchronizationContext = null;
                  node.Prev = null;
                  node.Next = FreeNodeList;
                  FreeNodeList = node;
                  return true;
              }

              // The IDs differ, so either:
              // - The callback is currently being invoked or has already been invoked, in which case
              //   node.Id will have transitioned to 0.
              // - The registration was already disposed of, in which case node.Id will similarly have
              //   transitioned to 0 and potentially then to another (larger) value when the node was
              //   reused for a new registration.
              // In either case, there's nothing to unregister.
              return false;
          }
          finally
          {
              Lock.Exit(useMemoryBarrier: false); // no check on lockTaken needed without thread aborts
          }
      }
  }
  /// <summary>All of the state associated with a registered callback, held in a node that's part of a linked list of registered callbacks.</summary>
  internal sealed class CallbackNode
  {
      /// <summary>The partition this node belongs to.</summary>
      public readonly CallbackPartition Partition;

      /// <summary>The previous node in the partition's callbacks list, if any.</summary>
      public CallbackNode? Prev;

      /// <summary>The next node in whichever list currently holds this node, if any.</summary>
      public CallbackNode? Next;

      /// <summary>The ID of the registration currently held by this node.</summary>
      public long Id;

      /// <summary>The delegate to invoke.</summary>
      public Action<object?>? Callback;

      /// <summary>The argument passed to <see cref="Callback"/> when it is invoked.</summary>
      public object? CallbackState;

      /// <summary>The execution context to run the callback under, or null to invoke it directly.</summary>
      public ExecutionContext? ExecutionContext;

      /// <summary>The synchronization context associated with the registration, if any.</summary>
      public SynchronizationContext? SynchronizationContext;

      /// <summary>Initializes a node that belongs to the specified partition.</summary>
      /// <param name="partition">The partition that owns the node.</param>
      public CallbackNode(CallbackPartition partition)
      {
          Debug.Assert(partition != null, "Expected non-null partition");
          Partition = partition;
      }

      /// <summary>
      /// Invokes <see cref="Callback"/> with <see cref="CallbackState"/>, running it under the stored
      /// execution context when one is present.
      /// </summary>
      public void ExecuteCallback()
      {
          ExecutionContext? capturedContext = ExecutionContext;
          if (capturedContext == null)
          {
              // No execution context stored: call the delegate directly.
              Debug.Assert(Callback != null);
              Callback(CallbackState);
              return;
          }

          // The node is passed as the state object so the lambda does not need to capture anything.
          ExecutionContext.RunInternal(capturedContext, s =>
          {
              Debug.Assert(s is CallbackNode, $"Expected {typeof(CallbackNode)}, got {s}");
              var target = (CallbackNode)s;
              Debug.Assert(target.Callback != null);
              target.Callback(target.CallbackState);
          }, this);
      }
  }
  939. }
  940. }