// FileStreamCompletionSource.Win32.cs
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT license.
  3. // See the LICENSE file in the project root for more information.
  4. using System.Buffers;
  5. using System.Diagnostics;
  6. using System.Runtime.ExceptionServices;
  7. using System.Runtime.InteropServices;
  8. using System.Threading;
  9. using System.Threading.Tasks;
  10. namespace System.IO
  11. {
  12. public partial class FileStream : Stream
  13. {
  // Internal completion source that augments TaskCompletionSource<int> with all of
  // the data needed to finish a single overlapped IO operation.
  // It is used by IOCallback and by all of the async methods.
  private unsafe class FileStreamCompletionSource : TaskCompletionSource<int>
  {
      // _result packs a state flag into the upper 32 bits and a payload (the Win32
      // error code or the number of bytes transferred) into the lower 32 bits.
      private const long NoResult = 0;
      private const long ResultSuccess = 1L << 32;
      private const long ResultError = 2L << 32;
      private const long RegisteringCancellation = 4L << 32;
      private const long CompletedCallback = 8L << 32;
      private const ulong ResultMask = (ulong)uint.MaxValue << 32;

      // Lazily created delegate for Cancel, shared by every instance.
      private static Action<object?>? s_cancelCallback;

      private readonly FileStream _stream;
      private readonly int _numBufferedBytes;
      private CancellationTokenRegistration _cancellationRegistration;
#if DEBUG
      private bool _cancellationHasBeenRegistered;
#endif
      // Overlapped class responsible for operations in progress when an appdomain unload occurs.
      private NativeOverlapped* _overlapped;
      // Declared as long because it is manipulated through the Interlocked APIs.
      private long _result;

      // RunContinuationsAsynchronously is used for compat reasons
      // (the old API used Task.Factory.StartNew for continuations).
      protected FileStreamCompletionSource(FileStream stream, int numBufferedBytes, byte[]? bytes)
          : base(TaskCreationOptions.RunContinuationsAsynchronously)
      {
          _numBufferedBytes = numBufferedBytes;
          _stream = stream;
          _result = NoResult;

          // The only array callers may hand us is the stream's own buffer.
          Debug.Assert(bytes == null || ReferenceEquals(bytes, _stream._buffer));

          // Create the native overlapped, preferring the stream's preallocated one.
          // _preallocatedOverlapped is null if the internal buffer was never created, so
          // a non-null bytes is required before it may be used; in addition no other
          // operation may currently own it. This is the fast path for operations that go
          // through the FileStream's buffer and are not issued concurrently.
          if (bytes != null && _stream.CompareExchangeCurrentOverlappedOwner(this, null) == null)
          {
              // Allocated when the buffer was created, and the buffer is non-null.
              _overlapped = _stream._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(_stream._preallocatedOverlapped!);
          }
          else
          {
              _overlapped = _stream._fileHandle.ThreadPoolBinding!.AllocateNativeOverlapped(s_ioCallback, this, bytes);
          }

          Debug.Assert(_overlapped != null, "AllocateNativeOverlapped returned null");
      }

      // The native overlapped backing this operation; null once it has been released.
      internal NativeOverlapped* Overlapped
      {
          get { return _overlapped; }
      }

      // Completes the task for an operation that finished synchronously: native
      // resources are released first, then the task result is set to numBytes plus
      // the number of bytes that were already buffered.
      public void SetCompletedSynchronously(int numBytes)
      {
          ReleaseNativeResource();

          int totalBytes = numBytes + _numBufferedBytes;
          TrySetResult(totalBytes);
      }

      // Hooks the operation up to cancellationToken, unless the IO has already completed.
      // While the registration is in flight _result holds RegisteringCancellation; if the
      // IO callback stores its result during that window, this method is the one that
      // finishes the completion.
      public void RegisterForCancellation(CancellationToken cancellationToken)
      {
#if DEBUG
          Debug.Assert(cancellationToken.CanBeCanceled);
          Debug.Assert(!_cancellationHasBeenRegistered, "Cannot register for cancellation twice");
          _cancellationHasBeenRegistered = true;
#endif

          // Quick check: nothing to do if the IO has already completed.
          if (_overlapped == null)
          {
              return;
          }

          Action<object?>? onCancel = s_cancelCallback ??= Cancel;

          // Claim the "registering" state, but only if no result has been published yet.
          long observed = Interlocked.CompareExchange(ref _result, RegisteringCancellation, NoResult);
          if (observed == NoResult)
          {
              _cancellationRegistration = cancellationToken.UnsafeRegister(onCancel, this);

              // Swap the state back, picking up any result the IO callback
              // stored while the registration was being set up.
              observed = Interlocked.Exchange(ref _result, NoResult);
          }
          else if (observed != CompletedCallback)
          {
              // The IO is in the process of completing; try to take its packed result.
              observed = Interlocked.Exchange(ref _result, NoResult);
          }

          // Anything other than the three pure state markers is a real result to deliver.
          bool hasResultToDeliver =
              observed != NoResult &&
              observed != CompletedCallback &&
              observed != RegisteringCancellation;

          if (hasResultToDeliver)
          {
              CompleteCallback((ulong)observed);
          }
      }

      // Disposes the cancellation registration, frees the native overlapped (if still
      // held) and gives up ownership of the stream's preallocated overlapped.
      internal virtual void ReleaseNativeResource()
      {
          // Ensure that cancellation has been completed and cleaned up.
          _cancellationRegistration.Dispose();

          // Free the overlapped.
          // NOTE: the cancellation must *NOT* be running at this point, or it may observe
          // freed memory (which is why the registration was disposed first).
          NativeOverlapped* pending = _overlapped;
          if (pending != null)
          {
              _stream._fileHandle.ThreadPoolBinding!.FreeNativeOverlapped(pending);
              _overlapped = null;
          }

          // Ensure we're no longer set as the current completion source (we may not have
          // been to begin with). Only one operation at a time is eligible to use the
          // preallocated overlapped.
          _stream.CompareExchangeCurrentOverlappedOwner(null, this);
      }

      // When doing IO asynchronously (i.e. _isAsync==true), this callback is invoked by
      // a free thread in the threadpool when the IO operation completes.
      internal static void IOCallback(uint errorCode, uint numBytes, NativeOverlapped* pOverlapped)
      {
          // Recover the completion source from the overlapped's state. The state is either
          // a FileStream (the preallocated overlapped was used, so the operation being
          // completed is that stream's _currentOverlappedOwner) or the completing
          // FileStreamCompletionSource itself (the preallocated overlapped was already in
          // use by another operation).
          object? state = ThreadPoolBoundHandle.GetNativeOverlappedState(pOverlapped);
          Debug.Assert(state is FileStream || state is FileStreamCompletionSource);

          FileStreamCompletionSource completionSource;
          if (state is FileStream fs)
          {
              completionSource = fs._currentOverlappedOwner!; // must be owned
          }
          else
          {
              completionSource = (FileStreamCompletionSource)state!;
          }

          Debug.Assert(completionSource != null);
          Debug.Assert(completionSource._overlapped == pOverlapped, "Overlaps don't match");

          // Handle reading from & writing to closed pipes: broken-pipe and no-data errors
          // are reported as successful completions. It is unclear whether this is still
          // required, but an async read on a pipe that is then closed may well produce
          // these errors.
          bool isFailure = errorCode != 0 && errorCode != ERROR_BROKEN_PIPE && errorCode != ERROR_NO_DATA;
          ulong packedResult = isFailure
              ? ((ulong)ResultError | errorCode)
              : ((ulong)ResultSuccess | numBytes);

          // Publish the result so that other threads can observe it.
          long previous = Interlocked.Exchange(ref completionSource._result, (long)packedResult);
          if (previous != NoResult)
          {
              // Some other thread is registering a cancellation, so it *must* finish the callback.
              return;
          }

          // The result was published; now try to take it back.
          long reclaimed = Interlocked.Exchange(ref completionSource._result, CompletedCallback);
          if (reclaimed == NoResult)
          {
              // Some other thread stole the result, so it is now responsible for finishing the callback.
              return;
          }

          // The result was reclaimed successfully; finish the callback here.
          completionSource.CompleteCallback(packedResult);
      }

      // Releases native resources and then transitions the task to its final state
      // (result, canceled or faulted) according to the packed result.
      private void CompleteCallback(ulong packedResult)
      {
          // Capture the token before the registration is disposed by ReleaseNativeResource.
          CancellationToken cancellationToken = _cancellationRegistration.Token;
          ReleaseNativeResource();

          // Upper 32 bits: outcome flag. Lower 32 bits: error code or byte count.
          long outcome = (long)(packedResult & ResultMask);
          if (outcome != ResultError)
          {
              Debug.Assert(outcome == ResultSuccess, "Unknown result");
              TrySetResult((int)(packedResult & uint.MaxValue) + _numBufferedBytes);
              return;
          }

          int errorCode = unchecked((int)(packedResult & uint.MaxValue));
          if (errorCode == Interop.Errors.ERROR_OPERATION_ABORTED)
          {
              // Report the caller's token when it is the one that was canceled;
              // otherwise fall back to a generic already-canceled token.
              CancellationToken reportedToken = cancellationToken.IsCancellationRequested
                  ? cancellationToken
                  : new CancellationToken(true);
              TrySetCanceled(reportedToken);
              return;
          }

          Exception e = Win32Marshal.GetExceptionForWin32Error(errorCode);
          e.SetCurrentStackTrace();
          TrySetException(e);
      }

      // Cancellation callback: asks the OS to cancel the pending IO for the given
      // completion source.
      private static void Cancel(object? state)
      {
          // WARNING: This may potentially be called under a lock (during cancellation registration)
          Debug.Assert(state is FileStreamCompletionSource, "Unknown state passed to cancellation");
          FileStreamCompletionSource completionSource = (FileStreamCompletionSource)state;
          Debug.Assert(completionSource._overlapped != null && !completionSource.Task.IsCompleted, "IO should not have completed yet");

          // Only attempt to cancel the IO while the handle is still valid.
          if (completionSource._stream._fileHandle.IsInvalid)
          {
              return;
          }

          if (Interop.Kernel32.CancelIoEx(completionSource._stream._fileHandle, completionSource._overlapped))
          {
              return;
          }

          int errorCode = Marshal.GetLastWin32Error();

          // ERROR_NOT_FOUND is returned if CancelIoEx cannot find the request to cancel.
          // This probably means that the IO operation has completed.
          if (errorCode != Interop.Errors.ERROR_NOT_FOUND)
          {
              throw Win32Marshal.GetExceptionForWin32Error(errorCode);
          }
      }

      // Factory: picks the completion source type that matches the supplied memory.
      public static FileStreamCompletionSource Create(FileStream stream, int numBufferedBytesRead, ReadOnlyMemory<byte> memory)
      {
          // If the memory passed in is the stream's internal buffer, the base
          // FileStreamCompletionSource can be used, which has a PreAllocatedOverlapped
          // with the memory already pinned.
          if (MemoryMarshal.TryGetArray(memory, out ArraySegment<byte> buffer) &&
              ReferenceEquals(buffer.Array, stream._buffer))
          {
              return new FileStreamCompletionSource(stream, numBufferedBytesRead, buffer.Array);
          }

          // Otherwise use the derived MemoryFileStreamCompletionSource, which Retains the
          // memory; that results in less pinning when the underlying memory is backed by
          // pre-pinned buffers.
          return new MemoryFileStreamCompletionSource(stream, numBufferedBytesRead, memory);
      }
  }
  /// <summary>
  /// Extends <see cref="FileStreamCompletionSource"/> to support disposing of a
  /// <see cref="MemoryHandle"/> when the operation has completed. It is created for
  /// memory that is not the stream's own internal buffer, and pins that memory itself.
  /// </summary>
  private sealed class MemoryFileStreamCompletionSource : FileStreamCompletionSource
  {
      // MemoryHandle is a mutable struct; do not make this field readonly.
      private MemoryHandle _handle;

      // Passes null for bytes to the base constructor because this type performs
      // the pinning itself.
      internal MemoryFileStreamCompletionSource(FileStream stream, int numBufferedBytes, ReadOnlyMemory<byte> memory)
          : base(stream, numBufferedBytes, bytes: null)
      {
          _handle = memory.Pin();
      }

      // Disposes the memory handle taken in the constructor, then performs the base cleanup.
      internal override void ReleaseNativeResource()
      {
          _handle.Dispose();
          base.ReleaseNativeResource();
      }
  }
  228. }
  229. }