Stream.cs 50 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
71278127912801281128212831284128512861287128812891290129112921293129412951296129712981299130013011302130313041305130613071308130913101311131213131314131513161317131813191320132113221323132413251326132713281329133013311332133313341335133613371338133913401341134213431344
  1. // Licensed to the .NET Foundation under one or more agreements.
  2. // The .NET Foundation licenses this file to you under the MIT license.
  3. // See the LICENSE file in the project root for more information.
  4. /*============================================================
  5. **
  6. **
  7. **
  8. **
  9. **
  10. ** Purpose: Abstract base class for all Streams. Provides
  11. ** default implementations of asynchronous reads & writes, in
  12. ** terms of the synchronous reads & writes (and vice versa).
  13. **
  14. **
  15. ===========================================================*/
  16. using System.Buffers;
  17. using System.Diagnostics;
  18. using System.Runtime.ExceptionServices;
  19. using System.Runtime.InteropServices;
  20. using System.Threading;
  21. using System.Threading.Tasks;
  22. namespace System.IO
  23. {
  24. public abstract partial class Stream : MarshalByRefObject, IDisposable, IAsyncDisposable
  25. {
  // Shared stream instance backed by the NullStream type.
  26. public static readonly Stream Null = new NullStream();
  27. // We pick a value that is the largest multiple of 4096 that is still smaller than the large object heap threshold (85K).
  28. // The CopyTo/CopyToAsync buffer is short-lived and is likely to be collected at Gen0, and it offers a significant
  29. // improvement in Copy performance.
  30. private const int DefaultCopyBufferSize = 81920;
  31. // To implement Async IO operations on streams that don't support async IO
  // The read/write task most recently scheduled by RunReadWriteTask; reset to null by FinishTrackingAsyncOperation.
  32. private ReadWriteTask? _activeReadWriteTask;
  // Created on demand by EnsureAsyncActiveSemaphoreInitialized with one available slot out of one.
  33. private SemaphoreSlim? _asyncActiveSemaphore;
  /// <summary>
  /// Returns the semaphore used to serialize the default asynchronous read/write
  /// operations, creating it on first use.
  /// </summary>
  /// <returns>The lazily created semaphore (initial count 1, maximum count 1).</returns>
  internal SemaphoreSlim EnsureAsyncActiveSemaphoreInitialized() =>
      // The semaphore's WaitHandle is never accessed, so the instance never needs disposing.
      LazyInitializer.EnsureInitialized(
          ref _asyncActiveSemaphore,
          () => new SemaphoreSlim(1, 1));
  /// <summary>
  /// Gets whether the stream can be read. The built-in BeginRead path throws
  /// when this is <c>false</c>.
  /// </summary>
  public abstract bool CanRead { get; }
  44. // If CanSeek is false, Position, Seek, Length, and SetLength should throw.
  /// <summary>
  /// Gets whether the stream can seek. The copy-buffer sizing logic only reads
  /// <see cref="Length"/> and <see cref="Position"/> when this is <c>true</c>.
  /// </summary>
  public abstract bool CanSeek { get; }
  /// <summary>
  /// Gets whether the stream supports timeouts. The base implementation always
  /// returns <c>false</c>.
  /// </summary>
  public virtual bool CanTimeout => false;
  /// <summary>
  /// Gets whether the stream can be written. The built-in BeginWrite path throws
  /// when this is <c>false</c>.
  /// </summary>
  public abstract bool CanWrite { get; }
  /// <summary>Gets the length of the stream, as defined by the derived class.</summary>
  public abstract long Length { get; }
  /// <summary>Gets or sets the current position within the stream, as defined by the derived class.</summary>
  public abstract long Position { get; set; }
  /// <summary>
  /// Gets or sets the read timeout. The base implementation does not support
  /// timeouts, so both accessors throw.
  /// </summary>
  /// <exception cref="InvalidOperationException">Always thrown by the base implementation.</exception>
  public virtual int ReadTimeout
  {
      get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
      set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
  }
  /// <summary>
  /// Gets or sets the write timeout. The base implementation does not support
  /// timeouts, so both accessors throw.
  /// </summary>
  /// <exception cref="InvalidOperationException">Always thrown by the base implementation.</exception>
  public virtual int WriteTimeout
  {
      get => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
      set => throw new InvalidOperationException(SR.InvalidOperation_TimeoutsNotSupported);
  }
  /// <summary>
  /// Asynchronously copies this stream to <paramref name="destination"/> using a
  /// buffer size chosen by <c>GetCopyBufferSize</c>.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <returns>A task representing the copy operation.</returns>
  public Task CopyToAsync(Stream destination) =>
      CopyToAsync(destination, GetCopyBufferSize());
  /// <summary>
  /// Asynchronously copies this stream to <paramref name="destination"/> with the
  /// given buffer size and no cancellation.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <param name="bufferSize">Size of the intermediate copy buffer.</param>
  /// <returns>A task representing the copy operation.</returns>
  public Task CopyToAsync(Stream destination, int bufferSize) =>
      CopyToAsync(destination, bufferSize, CancellationToken.None);
  /// <summary>
  /// Asynchronously copies this stream to <paramref name="destination"/> using a
  /// buffer size chosen by <c>GetCopyBufferSize</c>.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <param name="cancellationToken">Token forwarded to the copy operation.</param>
  /// <returns>A task representing the copy operation.</returns>
  public Task CopyToAsync(Stream destination, CancellationToken cancellationToken) =>
      CopyToAsync(destination, GetCopyBufferSize(), cancellationToken);
  /// <summary>
  /// Validates the arguments and then asynchronously copies this stream to
  /// <paramref name="destination"/>.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <param name="bufferSize">Size of the intermediate copy buffer.</param>
  /// <param name="cancellationToken">Token passed to every read and write.</param>
  /// <returns>A task representing the copy operation.</returns>
  public virtual Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
  {
      // Validation runs synchronously, before the async state machine is created.
      StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

      Task copy = CopyToAsyncInternal(destination, bufferSize, cancellationToken);
      return copy;
  }
  /// <summary>
  /// Copy loop behind <c>CopyToAsync</c>: reads into a pooled buffer and writes each
  /// chunk to <paramref name="destination"/> until a read returns zero bytes.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <param name="bufferSize">Minimum size of the buffer rented from the shared pool.</param>
  /// <param name="cancellationToken">Token passed to every read and write.</param>
  private async Task CopyToAsyncInternal(Stream destination, int bufferSize, CancellationToken cancellationToken)
  {
      byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);
      try
      {
          int chunk;
          // The whole rented array is offered to ReadAsync; only the bytes actually read are written.
          while ((chunk = await ReadAsync(new Memory<byte>(rented), cancellationToken).ConfigureAwait(false)) != 0)
          {
              await destination.WriteAsync(new ReadOnlyMemory<byte>(rented, 0, chunk), cancellationToken).ConfigureAwait(false);
          }
      }
      finally
      {
          // Hand the buffer back whether the copy finished or faulted.
          ArrayPool<byte>.Shared.Return(rented);
      }
  }
  127. // Reads the bytes from the current stream and writes the bytes to
  128. // the destination stream until all bytes are read, starting at
  129. // the current position.
  /// <summary>
  /// Copies this stream to <paramref name="destination"/> using a buffer size chosen
  /// by <c>GetCopyBufferSize</c>.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  public void CopyTo(Stream destination) =>
      CopyTo(destination, GetCopyBufferSize());
  /// <summary>
  /// Validates the arguments, then reads from this stream into a pooled buffer and
  /// writes each chunk to <paramref name="destination"/> until a read returns zero.
  /// </summary>
  /// <param name="destination">The stream that receives the bytes.</param>
  /// <param name="bufferSize">Minimum size of the buffer rented from the shared pool.</param>
  public virtual void CopyTo(Stream destination, int bufferSize)
  {
      StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

      byte[] rented = ArrayPool<byte>.Shared.Rent(bufferSize);
      try
      {
          for (; ; )
          {
              // The pool may return a larger array than requested; use all of it.
              int chunk = Read(rented, 0, rented.Length);
              if (chunk == 0)
              {
                  break;
              }

              destination.Write(rented, 0, chunk);
          }
      }
      finally
      {
          ArrayPool<byte>.Shared.Return(rented);
      }
  }
  /// <summary>
  /// Picks the buffer size for the parameterless-size CopyTo/CopyToAsync overloads.
  /// </summary>
  /// <returns>
  /// <c>DefaultCopyBufferSize</c> for non-seekable streams; <c>1</c> when
  /// <see cref="Length"/> is not greater than <see cref="Position"/>; otherwise the
  /// smaller of the default size and the remaining byte count.
  /// </returns>
  private int GetCopyBufferSize()
  {
      int defaultSize = DefaultCopyBufferSize;
      if (!CanSeek)
      {
          return defaultSize;
      }

      long length = Length;
      long position = Position;

      if (length <= position) // Handles negative overflows
      {
          // Nothing is left to copy, but CopyTo{Async} is virtual and an override must
          // still be called to run its own validation, so hand it the smallest legal size.
          return 1;
      }

      long remaining = length - position;

      // A non-positive difference here means the subtraction overflowed; keep the default.
      return remaining > 0
          ? (int)Math.Min(defaultSize, remaining)
          : defaultSize;
  }
  179. // Stream used to require that all cleanup logic went into Close(),
  180. // which was thought up before we invented IDisposable. However, we
  181. // need to follow the IDisposable pattern so that users can write
  182. // sensible subclasses without needing to inspect all their base
  183. // classes, and without worrying about version brittleness, from a
  184. // base class switching to the Dispose pattern. We're moving
  185. // Stream to the Dispose(bool) pattern - that's where all subclasses
  186. // should put their cleanup now.
  /// <summary>
  /// Releases the stream by running <c>Dispose(true)</c> and then suppressing
  /// finalization of this instance.
  /// </summary>
  public virtual void Close()
  {
      Dispose(disposing: true);

      // Cleanup has already run, so the finalizer has nothing left to do.
      GC.SuppressFinalize(this);
  }
  /// <summary>Disposes the stream by delegating to the virtual <see cref="Close"/> method.</summary>
  public void Dispose() => Close();
  /// <summary>
  /// Cleanup hook for derived classes. The base implementation does nothing.
  /// </summary>
  /// <param name="disposing">Value supplied by the caller; <see cref="Close"/> passes <c>true</c>.</param>
  protected virtual void Dispose(bool disposing)
  {
      // Keep this free of calls to other virtual Stream members such as Write:
      // subclass state has already been torn down by the time this runs, and this
      // is the final piece of cleanup code executed for a stream.
  }
  /// <summary>
  /// Disposes the stream synchronously and reports the outcome as a
  /// <see cref="ValueTask"/>.
  /// </summary>
  /// <returns>
  /// A completed <see cref="ValueTask"/> on success, or one wrapping a faulted task
  /// that carries the exception thrown by <see cref="Dispose()"/>.
  /// </returns>
  public virtual ValueTask DisposeAsync()
  {
      try
      {
          Dispose();
      }
      catch (Exception exc)
      {
          // Surface the failure through the returned task instead of throwing synchronously.
          return new ValueTask(Task.FromException(exc));
      }

      return default;
  }
  // Implemented by derived classes. The default FlushAsync(CancellationToken) invokes this
  // method from a task started on TaskScheduler.Default.
  214. public abstract void Flush();
  /// <summary>Flushes the stream asynchronously without a cancellation token.</summary>
  /// <returns>A task representing the flush.</returns>
  public Task FlushAsync() => FlushAsync(CancellationToken.None);
  /// <summary>
  /// Default asynchronous flush: starts a task on <see cref="TaskScheduler.Default"/>
  /// that calls the synchronous <see cref="Flush"/>.
  /// </summary>
  /// <param name="cancellationToken">Token handed to the task factory when the task is created.</param>
  /// <returns>The task that runs <see cref="Flush"/>.</returns>
  public virtual Task FlushAsync(CancellationToken cancellationToken)
  {
      // The stream travels as the state object so the lambda captures nothing.
      return Task.Factory.StartNew(
          self => ((Stream)self!).Flush(),
          this,
          cancellationToken,
          TaskCreationOptions.DenyChildAttach,
          TaskScheduler.Default);
  }
  /// <summary>Creates a new, non-signaled <see cref="ManualResetEvent"/>.</summary>
  /// <returns>The newly allocated wait handle.</returns>
  [Obsolete("CreateWaitHandle will be removed eventually. Please use \"new ManualResetEvent(false)\" instead.")]
  protected virtual WaitHandle CreateWaitHandle() => new ManualResetEvent(false);
  /// <summary>
  /// Starts an APM-style read by delegating to <c>BeginReadInternal</c> with a
  /// synchronous wait on the serialization semaphore.
  /// </summary>
  /// <param name="buffer">Array that receives the bytes.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to Read.</param>
  /// <param name="count">Byte count passed through to Read.</param>
  /// <param name="callback">Callback invoked when the operation completes.</param>
  /// <param name="state">Caller-supplied state object.</param>
  /// <returns>The <see cref="IAsyncResult"/> to hand to <see cref="EndRead"/>.</returns>
  public virtual IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state) =>
      BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
  /// <summary>
  /// Shared implementation behind <c>BeginRead</c> and the default <c>ReadAsync</c>:
  /// acquires the serialization semaphore, builds a <c>ReadWriteTask</c> that calls
  /// the synchronous Read, and schedules it.
  /// </summary>
  /// <param name="buffer">Array that receives the bytes.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to Read.</param>
  /// <param name="count">Byte count passed through to Read.</param>
  /// <param name="callback">Optional completion callback.</param>
  /// <param name="state">Caller-supplied state object.</param>
  /// <param name="serializeAsynchronously">
  /// <c>true</c> to wait for the semaphore with WaitAsync; <c>false</c> to block in Wait.
  /// </param>
  /// <param name="apm">
  /// <c>true</c> when an EndRead call will finish the operation; <c>false</c> when the
  /// task itself must release the semaphore.
  /// </param>
  /// <returns>The task, which doubles as the <see cref="IAsyncResult"/>.</returns>
  internal IAsyncResult BeginReadInternal(
      byte[] buffer, int offset, int count, AsyncCallback? callback, object? state,
      bool serializeAsynchronously, bool apm)
  {
      if (!CanRead)
      {
          throw Error.GetReadNotSupported();
      }

      // Streams without native async IO would race on their position pointer and
      // internal buffer indexes if several async requests ran at once. Only one
      // operation is allowed at a time: a second request waits (blocking the caller
      // in the synchronous case) until the first one completes.
      SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
      Task? pendingEntry = null;
      if (!serializeAsynchronously)
      {
          gate.Wait();
      }
      else
      {
          pendingEntry = gate.WaitAsync();
      }

      // This one task is both the scheduled work item and the IAsyncResult returned to the caller.
      var readTask = new ReadWriteTask(true /*isRead*/, apm, _ =>
      {
          // The arguments for Read live on the ReadWriteTask itself; since this code
          // runs inside that task, it can be recovered from the current task.
          var current = Task.InternalCurrent as ReadWriteTask;
          Debug.Assert(current != null && current._stream != null && current._buffer != null,
              "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");

          try
          {
              // Perform the read; its byte count becomes the task result.
              return current._stream.Read(current._buffer, current._offset, current._count);
          }
          finally
          {
              // Begin/EndXx callers finish the operation in EndXx. XxAsync callers have
              // no End method, so the task cleans up after itself.
              if (!current._apm)
              {
                  current._stream.FinishTrackingAsyncOperation();
              }

              current.ClearBeginState(); // drop references to ease memory pressure
          }
      }, state, this, buffer, offset, count, callback);

      // Schedule now, or once the asynchronous semaphore wait completes.
      if (pendingEntry == null)
      {
          RunReadWriteTask(readTask);
      }
      else
      {
          RunReadWriteTaskWhenReady(pendingEntry, readTask);
      }

      return readTask;
  }
  /// <summary>
  /// Completes a read started by <c>BeginRead</c>: blocks until the active task
  /// finishes, returns its result, and releases the serialization semaphore.
  /// </summary>
  /// <param name="asyncResult">The object returned by <c>BeginRead</c>.</param>
  /// <returns>The value produced by the underlying Read call.</returns>
  /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
  /// <exception cref="ArgumentException">No operation is active, or the active operation is a write.</exception>
  /// <exception cref="InvalidOperationException"><paramref name="asyncResult"/> is not the active operation.</exception>
  public virtual int EndRead(IAsyncResult asyncResult)
  {
      if (asyncResult == null)
      {
          throw new ArgumentNullException(nameof(asyncResult));
      }

      ReadWriteTask? active = _activeReadWriteTask;

      if (active == null)
      {
          throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
      }

      if (active != asyncResult)
      {
          throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
      }

      if (!active._isRead)
      {
          throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndReadCalledMultiple);
      }

      try
      {
          // Blocks until the task completes, then yields the result or rethrows its exception.
          return active.GetAwaiter().GetResult();
      }
      finally
      {
          FinishTrackingAsyncOperation();
      }
  }
  /// <summary>Reads asynchronously into an array segment without a cancellation token.</summary>
  /// <param name="buffer">Array that receives the bytes.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the read.</param>
  /// <param name="count">Byte count passed through to the read.</param>
  /// <returns>A task producing the value returned by the read.</returns>
  public Task<int> ReadAsync(byte[] buffer, int offset, int count) =>
      ReadAsync(buffer, offset, count, CancellationToken.None);
  /// <summary>
  /// Default asynchronous read. Returns a canceled task when cancellation was already
  /// requested; otherwise runs the read through the Begin/End implementation.
  /// </summary>
  /// <param name="buffer">Array that receives the bytes.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the read.</param>
  /// <param name="count">Byte count passed through to the read.</param>
  /// <param name="cancellationToken">Checked once, before the operation starts.</param>
  /// <returns>A task producing the value returned by the read.</returns>
  public virtual Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  {
      if (cancellationToken.IsCancellationRequested)
      {
          // Bail out early with an already-completed, canceled task.
          return Task.FromCanceled<int>(cancellationToken);
      }

      return BeginEndReadAsync(buffer, offset, count);
  }
  /// <summary>
  /// Default <see cref="Memory{T}"/>-based read. Array-backed memory is forwarded to the
  /// array overload; any other memory is read via a pooled array and copied back.
  /// </summary>
  /// <param name="buffer">Destination memory.</param>
  /// <param name="cancellationToken">Token forwarded to the array-based overload.</param>
  /// <returns>A <see cref="ValueTask{TResult}"/> producing the value returned by the read.</returns>
  public virtual ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
  {
      if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
      {
          return new ValueTask<int>(ReadAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken));
      }

      // No backing array is available: read into a rented array, then copy into the caller's memory.
      byte[] rented = ArrayPool<byte>.Shared.Rent(buffer.Length);
      Task<int> pendingRead = ReadAsync(rented, 0, buffer.Length, cancellationToken);
      return CopyBackAsync(pendingRead, rented, buffer);

      async ValueTask<int> CopyBackAsync(Task<int> readTask, byte[] localBuffer, Memory<byte> localDestination)
      {
          try
          {
              int bytesRead = await readTask.ConfigureAwait(false);
              new Span<byte>(localBuffer, 0, bytesRead).CopyTo(localDestination.Span);
              return bytesRead;
          }
          finally
          {
              // Return the rented array once the awaited read has succeeded or failed.
              ArrayPool<byte>.Shared.Return(localBuffer);
          }
      }
  }
  /// <summary>
  /// Implements <c>ReadAsync</c> on top of the Begin/End read machinery.
  /// </summary>
  /// <param name="buffer">Array that receives the bytes.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the read.</param>
  /// <param name="count">Byte count passed through to the read.</param>
  /// <returns>A task producing the value returned by the read.</returns>
  private Task<int> BeginEndReadAsync(byte[] buffer, int offset, int count)
  {
      if (HasOverriddenBeginEndRead())
      {
          // The derived type supplies its own Begin/EndRead, so wrap those calls to
          // make sure its functionality is used.
          var args = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };
          return TaskFactory<int>.FromAsyncTrim(
              this, args,
              (stream, args, callback, state) => stream.BeginRead(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
              (stream, asyncResult) => stream.EndRead(asyncResult)); // cached by compiler
      }

      // Begin/EndRead are not overridden: take the optimized path that skips an
      // extra layer of tasks / IAsyncResults.
      return (Task<int>)BeginReadInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
  }
  // Bundles the (buffer, offset, count) triple so BeginEndReadAsync / BeginEndWriteAsync can pass it
  // to TaskFactory.FromAsyncTrim as a single argument.
  364. private struct ReadWriteParameters // struct for arguments to Read and Write calls
  365. {
  // Array handed to BeginRead / BeginWrite.
  366. internal byte[] Buffer;
  // Offset argument handed to BeginRead / BeginWrite.
  367. internal int Offset;
  // Count argument handed to BeginRead / BeginWrite.
  368. internal int Count;
  369. }
  /// <summary>
  /// Starts an APM-style write by delegating to <c>BeginWriteInternal</c> with a
  /// synchronous wait on the serialization semaphore.
  /// </summary>
  /// <param name="buffer">Array containing the bytes to write.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to Write.</param>
  /// <param name="count">Byte count passed through to Write.</param>
  /// <param name="callback">Callback invoked when the operation completes.</param>
  /// <param name="state">Caller-supplied state object.</param>
  /// <returns>The <see cref="IAsyncResult"/> to hand to <see cref="EndWrite"/>.</returns>
  public virtual IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state) =>
      BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: false, apm: true);
  /// <summary>
  /// Shared implementation behind <c>BeginWrite</c> and the default <c>WriteAsync</c>:
  /// acquires the serialization semaphore, builds a <c>ReadWriteTask</c> that calls
  /// the synchronous Write, and schedules it.
  /// </summary>
  /// <param name="buffer">Array containing the bytes to write.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to Write.</param>
  /// <param name="count">Byte count passed through to Write.</param>
  /// <param name="callback">Optional completion callback.</param>
  /// <param name="state">Caller-supplied state object.</param>
  /// <param name="serializeAsynchronously">
  /// <c>true</c> to wait for the semaphore with WaitAsync; <c>false</c> to block in Wait.
  /// </param>
  /// <param name="apm">
  /// <c>true</c> when an EndWrite call will finish the operation; <c>false</c> when the
  /// task itself must release the semaphore.
  /// </param>
  /// <returns>The task, which doubles as the <see cref="IAsyncResult"/>.</returns>
  internal IAsyncResult BeginWriteInternal(
      byte[] buffer, int offset, int count, AsyncCallback? callback, object? state,
      bool serializeAsynchronously, bool apm)
  {
      if (!CanWrite)
      {
          throw Error.GetWriteNotSupported();
      }

      // Streams without native async IO would race on their position pointer and
      // internal buffer indexes if several async requests ran at once. Only one
      // operation is allowed at a time: a second request waits (blocking the caller
      // in the synchronous case) until the first one completes.
      SemaphoreSlim gate = EnsureAsyncActiveSemaphoreInitialized();
      Task? pendingEntry = null;
      if (!serializeAsynchronously)
      {
          gate.Wait(); // synchronously wait here
      }
      else
      {
          pendingEntry = gate.WaitAsync(); // start the asynchronous wait without blocking
      }

      // This one task is both the scheduled work item and the IAsyncResult returned to the caller.
      var writeTask = new ReadWriteTask(false /*isRead*/, apm, _ =>
      {
          // The arguments for Write live on the ReadWriteTask itself; since this code
          // runs inside that task, it can be recovered from the current task.
          var current = Task.InternalCurrent as ReadWriteTask;
          Debug.Assert(current != null && current._stream != null && current._buffer != null,
              "Inside ReadWriteTask, InternalCurrent should be the ReadWriteTask, and stream and buffer should be set");

          try
          {
              current._stream.Write(current._buffer, current._offset, current._count);
              return 0; // unused; the delegate signature requires a return value
          }
          finally
          {
              // Begin/EndXx callers finish the operation in EndXx. XxAsync callers have
              // no End method, so the task cleans up after itself.
              if (!current._apm)
              {
                  current._stream.FinishTrackingAsyncOperation();
              }

              current.ClearBeginState(); // drop references to ease memory pressure
          }
      }, state, this, buffer, offset, count, callback);

      // Schedule now, or once the asynchronous semaphore wait completes.
      if (pendingEntry == null)
      {
          RunReadWriteTask(writeTask);
      }
      else
      {
          RunReadWriteTaskWhenReady(pendingEntry, writeTask);
      }

      return writeTask;
  }
  /// <summary>
  /// Schedules <paramref name="readWriteTask"/> once the semaphore wait represented by
  /// <paramref name="asyncWaiter"/> has completed.
  /// </summary>
  /// <param name="asyncWaiter">Task returned by the semaphore's WaitAsync.</param>
  /// <param name="readWriteTask">The read/write task to run.</param>
  private void RunReadWriteTaskWhenReady(Task asyncWaiter, ReadWriteTask readWriteTask)
  {
      Debug.Assert(readWriteTask != null);
      Debug.Assert(asyncWaiter != null);

      if (!asyncWaiter.IsCompleted)
      {
          // Not our turn yet: chain the scheduling onto the wait. The task travels as
          // continuation state so the lambda captures nothing.
          asyncWaiter.ContinueWith((t, state) =>
          {
              Debug.Assert(t.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
              var pending = (ReadWriteTask)state!;
              Debug.Assert(pending._stream != null);
              pending._stream.RunReadWriteTask(pending);
          }, readWriteTask, default, TaskContinuationOptions.ExecuteSynchronously, TaskScheduler.Default);
          return;
      }

      // The wait already finished, so the task can be scheduled right away.
      Debug.Assert(asyncWaiter.IsCompletedSuccessfully, "The semaphore wait should always complete successfully.");
      RunReadWriteTask(readWriteTask);
  }
  /// <summary>
  /// Records <paramref name="readWriteTask"/> as the active operation and schedules it
  /// on <see cref="TaskScheduler.Default"/>.
  /// </summary>
  /// <param name="readWriteTask">The read/write task to run.</param>
  private void RunReadWriteTask(ReadWriteTask readWriteTask)
  {
      Debug.Assert(readWriteTask != null);
      Debug.Assert(_activeReadWriteTask == null, "Expected no other readers or writers");

      // Publish the task first so that EndXx can validate the IAsyncResult it is given.
      // Scheduling must come after this write to avoid a race.
      _activeReadWriteTask = readWriteTask;

      // ScheduleAndStart is called directly instead of Start, which saves two interlocked
      // operations. If ReadWriteTask ever takes a cancellation token, switch this to Start.
      readWriteTask.m_taskScheduler = TaskScheduler.Default;
      readWriteTask.ScheduleAndStart(needsProtection: false);
  }
  /// <summary>
  /// Clears the active read/write task and releases the serialization semaphore.
  /// </summary>
  private void FinishTrackingAsyncOperation()
  {
      _activeReadWriteTask = null;

      Debug.Assert(_asyncActiveSemaphore != null, "Must have been initialized in order to get here.");
      _asyncActiveSemaphore.Release();
  }
  /// <summary>
  /// Completes a write started by <c>BeginWrite</c>: blocks until the active task
  /// finishes, propagates any exception, and releases the serialization semaphore.
  /// </summary>
  /// <param name="asyncResult">The object returned by <c>BeginWrite</c>.</param>
  /// <exception cref="ArgumentNullException"><paramref name="asyncResult"/> is null.</exception>
  /// <exception cref="ArgumentException">No operation is active, or the active operation is a read.</exception>
  /// <exception cref="InvalidOperationException"><paramref name="asyncResult"/> is not the active operation.</exception>
  public virtual void EndWrite(IAsyncResult asyncResult)
  {
      if (asyncResult == null)
      {
          throw new ArgumentNullException(nameof(asyncResult));
      }

      ReadWriteTask? active = _activeReadWriteTask;

      if (active == null)
      {
          throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
      }

      if (active != asyncResult)
      {
          throw new InvalidOperationException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
      }

      if (active._isRead)
      {
          throw new ArgumentException(SR.InvalidOperation_WrongAsyncResultOrEndWriteCalledMultiple);
      }

      try
      {
          // Blocks until the task completes; rethrows whatever the write threw.
          active.GetAwaiter().GetResult();
          Debug.Assert(active.Status == TaskStatus.RanToCompletion);
      }
      finally
      {
          FinishTrackingAsyncOperation();
      }
  }
  495. // Task used by BeginRead / BeginWrite to do Read / Write asynchronously.
  496. // A single instance of this task serves four purposes:
  497. // 1. The work item scheduled to run the Read / Write operation
  498. // 2. The state holding the arguments to be passed to Read / Write
  499. // 3. The IAsyncResult returned from BeginRead / BeginWrite
  500. // 4. The completion action that runs to invoke the user-provided callback.
  501. // This last item is a bit tricky. Before the AsyncCallback is invoked, the
  502. // IAsyncResult must have completed, so we can't just invoke the handler
  503. // from within the task, since it is the IAsyncResult, and thus it's not
  504. // yet completed. Instead, we use AddCompletionAction to install this
  505. // task as its own completion handler. That saves the need to allocate
  506. // a separate completion handler, it guarantees that the task will
  507. // have completed by the time the handler is invoked, and it allows
  508. // the handler to be invoked synchronously upon the completion of the
  509. // task. This all enables BeginRead / BeginWrite to be implemented
  510. // with a single allocation.
  /// <summary>
  /// Task type used by the default BeginRead / BeginWrite implementations. One instance
  /// is the scheduled work item, the holder of the Read/Write arguments, the
  /// <see cref="IAsyncResult"/> given to the caller, and (when a callback is supplied)
  /// its own completion action that invokes that callback.
  /// </summary>
  private sealed class ReadWriteTask : Task<int>, ITaskCompletionAction
  {
      internal readonly bool _isRead;
      internal readonly bool _apm; // true for Begin/EndXx, false for XxAsync
      internal Stream? _stream;
      internal byte[]? _buffer;
      internal readonly int _offset;
      internal readonly int _count;
      private AsyncCallback? _callback;
      private ExecutionContext? _context;

      // Lazily created delegate for InvokeAsyncCallback, shared by all instances.
      private static ContextCallback? s_invokeAsyncCallback;

      /// <summary>Creates the task and stores the arguments for the later Read/Write call.</summary>
      /// <param name="isRead"><c>true</c> for a read, <c>false</c> for a write.</param>
      /// <param name="apm"><c>true</c> when created through Begin/EndXx.</param>
      /// <param name="function">The work the task runs.</param>
      /// <param name="state">State object passed to the base task.</param>
      /// <param name="stream">Stream the operation targets.</param>
      /// <param name="buffer">Buffer argument for Read/Write.</param>
      /// <param name="offset">Offset argument for Read/Write.</param>
      /// <param name="count">Count argument for Read/Write.</param>
      /// <param name="callback">Optional callback to invoke on completion.</param>
      public ReadWriteTask(
          bool isRead,
          bool apm,
          Func<object?, int> function, object? state,
          Stream stream, byte[] buffer, int offset, int count, AsyncCallback? callback) :
          base(function, state, CancellationToken.None, TaskCreationOptions.DenyChildAttach)
      {
          Debug.Assert(function != null);
          Debug.Assert(stream != null);
          Debug.Assert(buffer != null);

          _isRead = isRead;
          _apm = apm;
          _stream = stream;
          _buffer = buffer;
          _offset = offset;
          _count = count;

          if (callback == null)
          {
              return;
          }

          // A callback was supplied: keep it, capture the ExecutionContext it should run
          // under, and register this task as its own completion action so that Invoke
          // fires the callback when the task completes.
          _callback = callback;
          _context = ExecutionContext.Capture();
          base.AddCompletionAction(this);
      }

      /// <summary>Drops the stream and buffer references so they can be collected.</summary>
      internal void ClearBeginState()
      {
          _stream = null;
          _buffer = null;
      }

      /// <summary>Runs the stored user callback, passing the completed task as the IAsyncResult.</summary>
      /// <param name="completedTask">The completed <see cref="ReadWriteTask"/>.</param>
      private static void InvokeAsyncCallback(object? completedTask)
      {
          Debug.Assert(completedTask is ReadWriteTask);
          var task = (ReadWriteTask)completedTask;

          AsyncCallback? userCallback = task._callback;
          Debug.Assert(userCallback != null);
          task._callback = null;
          userCallback(task);
      }

      /// <summary>
      /// Completion action: invokes the user callback directly when no ExecutionContext
      /// was captured, otherwise runs it under the captured context.
      /// </summary>
      /// <param name="completingTask">The task that is completing.</param>
      void ITaskCompletionAction.Invoke(Task completingTask)
      {
          ExecutionContext? captured = _context;
          if (captured != null)
          {
              _context = null;

              ContextCallback? runner = s_invokeAsyncCallback;
              if (runner == null)
              {
                  // benign race condition: concurrent callers may each create the delegate once
                  runner = InvokeAsyncCallback;
                  s_invokeAsyncCallback = runner;
              }

              ExecutionContext.RunInternal(captured, runner, this);
              return;
          }

          // No context was captured: call the callback directly with the completing task.
          AsyncCallback? userCallback = _callback;
          Debug.Assert(userCallback != null);
          _callback = null;
          userCallback(completingTask);
      }

      bool ITaskCompletionAction.InvokeMayRunArbitraryCode => true;
  }
  /// <summary>Writes an array segment asynchronously without a cancellation token.</summary>
  /// <param name="buffer">Array containing the bytes to write.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the write.</param>
  /// <param name="count">Byte count passed through to the write.</param>
  /// <returns>A task representing the write.</returns>
  public Task WriteAsync(byte[] buffer, int offset, int count) =>
      WriteAsync(buffer, offset, count, CancellationToken.None);
  /// <summary>
  /// Default asynchronous write. Returns a canceled task when cancellation was already
  /// requested; otherwise runs the write through the Begin/End implementation.
  /// </summary>
  /// <param name="buffer">Array containing the bytes to write.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the write.</param>
  /// <param name="count">Byte count passed through to the write.</param>
  /// <param name="cancellationToken">Checked once, before the operation starts.</param>
  /// <returns>A task representing the write.</returns>
  public virtual Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  {
      if (cancellationToken.IsCancellationRequested)
      {
          // Bail out early with an already-completed, canceled task.
          return Task.FromCanceled(cancellationToken);
      }

      return BeginEndWriteAsync(buffer, offset, count);
  }
  /// <summary>
  /// Default <see cref="ReadOnlyMemory{T}"/>-based write. Array-backed memory is forwarded
  /// to the array overload; any other memory is first copied into a pooled array.
  /// </summary>
  /// <param name="buffer">Bytes to write.</param>
  /// <param name="cancellationToken">Token forwarded to the array-based overload.</param>
  /// <returns>A <see cref="ValueTask"/> representing the write.</returns>
  public virtual ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
  {
      if (MemoryMarshal.TryGetArray(buffer, out ArraySegment<byte> segment))
      {
          return new ValueTask(WriteAsync(segment.Array!, segment.Offset, segment.Count, cancellationToken));
      }

      // No backing array is available: stage the bytes in a rented array, which
      // FinishWriteAsync returns to the pool once the write completes.
      byte[] rented = ArrayPool<byte>.Shared.Rent(buffer.Length);
      buffer.Span.CopyTo(rented);
      Task pendingWrite = WriteAsync(rented, 0, buffer.Length, cancellationToken);
      return new ValueTask(FinishWriteAsync(pendingWrite, rented));
  }
  /// <summary>
  /// Awaits <paramref name="writeTask"/> and then returns <paramref name="localBuffer"/>
  /// to the shared array pool, whether the write succeeded or failed.
  /// </summary>
  /// <param name="writeTask">The in-flight write.</param>
  /// <param name="localBuffer">The rented array to give back.</param>
  private async Task FinishWriteAsync(Task writeTask, byte[] localBuffer)
  {
      try
      {
          await writeTask.ConfigureAwait(false);
      }
      finally
      {
          // Runs on both success and failure of the awaited write.
          ArrayPool<byte>.Shared.Return(localBuffer);
      }
  }
  /// <summary>
  /// Implements <c>WriteAsync</c> on top of the Begin/End write machinery.
  /// </summary>
  /// <param name="buffer">Array containing the bytes to write.</param>
  /// <param name="offset">Index in <paramref name="buffer"/> passed through to the write.</param>
  /// <param name="count">Byte count passed through to the write.</param>
  /// <returns>A task representing the write.</returns>
  private Task BeginEndWriteAsync(byte[] buffer, int offset, int count)
  {
      if (HasOverriddenBeginEndWrite())
      {
          // The derived type supplies its own Begin/EndWrite, so wrap those calls to
          // make sure its functionality is used.
          var args = new ReadWriteParameters { Buffer = buffer, Offset = offset, Count = count };
          return TaskFactory<VoidTaskResult>.FromAsyncTrim(
              this, args,
              (stream, args, callback, state) => stream.BeginWrite(args.Buffer, args.Offset, args.Count, callback, state), // cached by compiler
              (stream, asyncResult) => // cached by compiler
              {
                  stream.EndWrite(asyncResult);
                  return default;
              });
      }

      // Begin/EndWrite are not overridden: take the optimized path that skips an
      // extra layer of tasks / IAsyncResults.
      return (Task)BeginWriteInternal(buffer, offset, count, null, null, serializeAsynchronously: true, apm: false);
  }
  // Implemented by derived classes; takes an offset and the SeekOrigin it is relative to.
  642. public abstract long Seek(long offset, SeekOrigin origin);
  // Implemented by derived classes.
  643. public abstract void SetLength(long value);
  // Implemented by derived classes. Callers in this class treat a return value of 0 as "no more data"
  // (CopyTo stops copying, ReadByte returns -1).
  644. public abstract int Read(byte[] buffer, int offset, int count);
  /// <summary>
  /// Default span-based read: reads into a pooled array through the array overload and
  /// copies the bytes read into <paramref name="buffer"/>.
  /// </summary>
  /// <param name="buffer">Destination span.</param>
  /// <returns>The value returned by the array-based Read.</returns>
  /// <exception cref="IOException">
  /// The array-based Read reported a count that is negative or larger than the span.
  /// </exception>
  public virtual int Read(Span<byte> buffer)
  {
      byte[] rented = ArrayPool<byte>.Shared.Rent(buffer.Length);
      try
      {
          int bytesRead = Read(rented, 0, buffer.Length);

          // The unsigned comparison rejects negative counts and counts above the requested length in one test.
          if ((uint)bytesRead > (uint)buffer.Length)
          {
              throw new IOException(SR.IO_StreamTooLong);
          }

          new Span<byte>(rented, 0, bytesRead).CopyTo(buffer);
          return bytesRead;
      }
      finally
      {
          ArrayPool<byte>.Shared.Return(rented);
      }
  }
  // Default single-byte read, implemented by calling Read(byte[], int, int).
  // Returns the byte widened to an int, or -1 when the underlying read
  // reports zero bytes (end of stream).
  // A new one-element byte[] is allocated on every call, so this version
  // performs poorly; subclasses that keep an internal buffer should override
  // it, which helps callers that read one byte at a time.
  public virtual int ReadByte()
  {
      byte[] scratch = new byte[1];
      return Read(scratch, 0, 1) == 0 ? -1 : scratch[0];
  }
  // Abstract array-based write that every concrete stream must implement.
  // Write(ReadOnlySpan<byte>) and WriteByte in this class are layered on top of it.
  public abstract void Write(byte[] buffer, int offset, int count);
  /// <summary>
  /// Span-based write layered on <c>Write(byte[], int, int)</c>: copies <paramref name="buffer"/>
  /// into a pooled array and writes that array.
  /// </summary>
  /// <param name="buffer">The bytes to write.</param>
  public virtual void Write(ReadOnlySpan<byte> buffer)
  {
      int length = buffer.Length;
      byte[] rented = ArrayPool<byte>.Shared.Rent(length);
      try
      {
          buffer.CopyTo(rented);
          Write(rented, 0, length);
      }
      finally
      {
          // Always hand the rented array back to the pool, even if the write throws.
          ArrayPool<byte>.Shared.Return(rented);
      }
  }
  // Default single-byte write, implemented by calling Write(byte[], int, int).
  // A new one-element byte[] is allocated on every call, so this version
  // performs poorly; subclasses that keep an internal buffer should override
  // it, which helps callers that write one byte at a time.
  public virtual void WriteByte(byte value)
  {
      Write(new byte[] { value }, 0, 1);
  }
  /// <summary>
  /// Returns a stream that wraps <paramref name="stream"/> in a <c>SyncStream</c>, or
  /// <paramref name="stream"/> itself when it already is one.
  /// </summary>
  /// <exception cref="ArgumentNullException"><paramref name="stream"/> is null.</exception>
  public static Stream Synchronized(Stream stream)
  {
      if (stream == null)
      {
          throw new ArgumentNullException(nameof(stream));
      }

      // Avoid double-wrapping a stream that is already a SyncStream.
      return stream is SyncStream ? stream : new SyncStream(stream);
  }
  // Obsolete virtual hook; the base implementation has an empty body.
  [Obsolete("Do not call or override this method.")]
  protected virtual void ObjectInvariant()
  {
  }
  /// <summary>
  /// Begin-style read that performs the read synchronously on the calling thread and returns an
  /// already-completed <see cref="IAsyncResult"/>.
  /// </summary>
  /// <remarks>
  /// An <see cref="IOException"/> thrown by the read is captured in the returned result instead of
  /// propagating from this method; other exception types propagate immediately.
  /// </remarks>
  internal IAsyncResult BlockingBeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
  {
      // Streams of ours that don't natively support async IO could race on the
      // position pointer and on internal buffer indexes if several async
      // requests were outstanding at once. To avoid that, block the calling
      // thread and do the IO synchronously.
      // This can't perform well - use a different approach.
      SynchronousAsyncResult result;
      try
      {
          result = new SynchronousAsyncResult(Read(buffer, offset, count), state);
      }
      catch (IOException ioError)
      {
          result = new SynchronousAsyncResult(ioError, state, isWrite: false);
      }

      // The callback (if any) runs inline, after the IO has already finished.
      callback?.Invoke(result);
      return result;
  }
  /// <summary>Completes a read started by <c>BlockingBeginRead</c> by delegating to <c>SynchronousAsyncResult.EndRead</c>.</summary>
  internal static int BlockingEndRead(IAsyncResult asyncResult) =>
      SynchronousAsyncResult.EndRead(asyncResult);
  /// <summary>
  /// Begin-style write that performs the write synchronously on the calling thread and returns an
  /// already-completed <see cref="IAsyncResult"/>.
  /// </summary>
  /// <remarks>
  /// An <see cref="IOException"/> thrown by the write is captured in the returned result instead of
  /// propagating from this method; other exception types propagate immediately.
  /// </remarks>
  internal IAsyncResult BlockingBeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
  {
      // Streams of ours that don't natively support async IO could race on the
      // position pointer and on internal buffer indexes if several async
      // requests were outstanding at once. To avoid that, block the calling
      // thread and do the IO synchronously.
      // This can't perform well - use a different approach.
      SynchronousAsyncResult result;
      try
      {
          Write(buffer, offset, count);
          result = new SynchronousAsyncResult(state);
      }
      catch (IOException ioError)
      {
          result = new SynchronousAsyncResult(ioError, state, isWrite: true);
      }

      // The callback (if any) runs inline, after the IO has already finished.
      callback?.Invoke(result);
      return result;
  }
  /// <summary>Completes a write started by <c>BlockingBeginWrite</c> by delegating to <c>SynchronousAsyncResult.EndWrite</c>.</summary>
  internal static void BlockingEndWrite(IAsyncResult asyncResult) =>
      SynchronousAsyncResult.EndWrite(asyncResult);
  /// <summary>
  /// A stream with no backing store: reads always report zero bytes (end of stream), writes are
  /// discarded, and length, position and seek results are always zero.
  /// </summary>
  /// <remarks>
  /// Every task-returning method in this class returns a canceled task when it is handed a token
  /// that is already canceled; otherwise it completes synchronously.
  /// </remarks>
  private sealed class NullStream : Stream
  {
      // Cached completed task used for every non-canceled array-based ReadAsync call.
      private static readonly Task<int> s_zeroTask = Task.FromResult(0);

      internal NullStream() { }

      public override bool CanRead => true;

      public override bool CanWrite => true;

      public override bool CanSeek => true;

      public override long Length => 0;

      // Position is always zero; assignments are ignored.
      public override long Position
      {
          get { return 0; }
          set { }
      }

      public override void CopyTo(Stream destination, int bufferSize)
      {
          StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

          // After we validate arguments this is a nop.
      }

      public override Task CopyToAsync(Stream destination, int bufferSize, CancellationToken cancellationToken)
      {
          // Validate arguments here for compat, since previously this method
          // was inherited from Stream (which did check its arguments).
          StreamHelpers.ValidateCopyToArgs(this, destination, bufferSize);

          return cancellationToken.IsCancellationRequested ?
              Task.FromCanceled(cancellationToken) :
              Task.CompletedTask;
      }

      protected override void Dispose(bool disposing)
      {
          // Do nothing - we don't want NullStream singleton (static) to be closable
      }

      public override void Flush()
      {
      }

      public override Task FlushAsync(CancellationToken cancellationToken)
      {
          return cancellationToken.IsCancellationRequested ?
              Task.FromCanceled(cancellationToken) :
              Task.CompletedTask;
      }

      public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
      {
          // CanRead is constant true for this sealed type, so no capability check is needed.
          return BlockingBeginRead(buffer, offset, count, callback, state);
      }

      public override int EndRead(IAsyncResult asyncResult)
      {
          if (asyncResult == null)
          {
              throw new ArgumentNullException(nameof(asyncResult));
          }

          return BlockingEndRead(asyncResult);
      }

      public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
      {
          // CanWrite is constant true for this sealed type, so no capability check is needed.
          return BlockingBeginWrite(buffer, offset, count, callback, state);
      }

      public override void EndWrite(IAsyncResult asyncResult)
      {
          if (asyncResult == null)
          {
              throw new ArgumentNullException(nameof(asyncResult));
          }

          BlockingEndWrite(asyncResult);
      }

      public override int Read(byte[] buffer, int offset, int count)
      {
          return 0;
      }

      public override int Read(Span<byte> buffer)
      {
          return 0;
      }

      public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          // Honor an already-canceled token, consistent with WriteAsync/FlushAsync/CopyToAsync.
          return cancellationToken.IsCancellationRequested ?
              Task.FromCanceled<int>(cancellationToken) :
              s_zeroTask;
      }

      public override ValueTask<int> ReadAsync(Memory<byte> buffer, CancellationToken cancellationToken = default)
      {
          // Honor an already-canceled token, consistent with the ReadOnlyMemory-based WriteAsync.
          return cancellationToken.IsCancellationRequested ?
              new ValueTask<int>(Task.FromCanceled<int>(cancellationToken)) :
              new ValueTask<int>(0);
      }

      public override int ReadByte()
      {
          return -1;
      }

      public override void Write(byte[] buffer, int offset, int count)
      {
      }

      public override void Write(ReadOnlySpan<byte> buffer)
      {
      }

      public override Task WriteAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
      {
          return cancellationToken.IsCancellationRequested ?
              Task.FromCanceled(cancellationToken) :
              Task.CompletedTask;
      }

      public override ValueTask WriteAsync(ReadOnlyMemory<byte> buffer, CancellationToken cancellationToken = default)
      {
          return cancellationToken.IsCancellationRequested ?
              new ValueTask(Task.FromCanceled(cancellationToken)) :
              default;
      }

      public override void WriteByte(byte value)
      {
      }

      public override long Seek(long offset, SeekOrigin origin)
      {
          return 0;
      }

      public override void SetLength(long length)
      {
      }
  }
  /// <summary>
  /// The IAsyncResult handed back by the base Stream class's asynchronous IO methods when the IO
  /// was actually carried out synchronously. It records the outcome (bytes read, or a captured
  /// exception) so the matching End method can report it.
  /// </summary>
  private sealed class SynchronousAsyncResult : IAsyncResult
  {
      private readonly object? _stateObject;
      private readonly bool _isWrite;
      private readonly ExceptionDispatchInfo? _exceptionInfo;
      private readonly int _bytesRead;
      private ManualResetEvent? _waitHandle;
      private bool _endXxxCalled;

      // Result of a successful read (_isWrite keeps its default of false).
      internal SynchronousAsyncResult(int bytesRead, object? asyncStateObject)
      {
          _bytesRead = bytesRead;
          _stateObject = asyncStateObject;
      }

      // Result of a successful write.
      internal SynchronousAsyncResult(object? asyncStateObject)
      {
          _stateObject = asyncStateObject;
          _isWrite = true;
      }

      // Result of a failed read or write; the exception is captured so it can be rethrown later.
      internal SynchronousAsyncResult(Exception ex, object? asyncStateObject, bool isWrite)
      {
          _exceptionInfo = ExceptionDispatchInfo.Capture(ex);
          _stateObject = asyncStateObject;
          _isWrite = isWrite;
      }

      // We never hand out objects of this type to the user before the synchronous IO completed:
      public bool IsCompleted => true;

      // The handle is created on first request, already in the signaled state.
      public WaitHandle AsyncWaitHandle =>
          LazyInitializer.EnsureInitialized(ref _waitHandle, () => new ManualResetEvent(true));

      public object? AsyncState => _stateObject;

      public bool CompletedSynchronously => true;

      // Rethrows the captured exception, if there is one.
      internal void ThrowIfError()
      {
          _exceptionInfo?.Throw();
      }

      // Validates that asyncResult is a not-yet-ended read result, marks it ended,
      // then either rethrows the captured exception or returns the byte count.
      internal static int EndRead(IAsyncResult asyncResult)
      {
          var result = asyncResult as SynchronousAsyncResult;
          if (result == null || result._isWrite)
          {
              throw new ArgumentException(SR.Arg_WrongAsyncResult);
          }

          if (result._endXxxCalled)
          {
              throw new ArgumentException(SR.InvalidOperation_EndReadCalledMultiple);
          }

          result._endXxxCalled = true;
          result.ThrowIfError();
          return result._bytesRead;
      }

      // Validates that asyncResult is a not-yet-ended write result, marks it ended,
      // then rethrows the captured exception if there is one.
      internal static void EndWrite(IAsyncResult asyncResult)
      {
          var result = asyncResult as SynchronousAsyncResult;
          if (result == null || !result._isWrite)
          {
              throw new ArgumentException(SR.Arg_WrongAsyncResult);
          }

          if (result._endXxxCalled)
          {
              throw new ArgumentException(SR.InvalidOperation_EndWriteCalledMultiple);
          }

          result._endXxxCalled = true;
          result.ThrowIfError();
      }
  } // class SynchronousAsyncResult
  // SyncStream wraps another stream and performs the operations it overrides
  // (other than the capability and timeout properties) while holding a lock on
  // the wrapped stream object.
  private sealed class SyncStream : Stream, IDisposable
  {
      private readonly Stream _stream;

      internal SyncStream(Stream stream)
      {
          _stream = stream ?? throw new ArgumentNullException(nameof(stream));
      }

      public override bool CanRead => _stream.CanRead;

      public override bool CanWrite => _stream.CanWrite;

      public override bool CanSeek => _stream.CanSeek;

      public override bool CanTimeout => _stream.CanTimeout;

      public override long Length
      {
          get
          {
              lock (_stream)
              {
                  return _stream.Length;
              }
          }
      }

      public override long Position
      {
          get
          {
              lock (_stream)
              {
                  return _stream.Position;
              }
          }
          set
          {
              lock (_stream)
              {
                  _stream.Position = value;
              }
          }
      }

      // The timeout properties forward to the wrapped stream without taking the lock.
      public override int ReadTimeout
      {
          get => _stream.ReadTimeout;
          set => _stream.ReadTimeout = value;
      }

      public override int WriteTimeout
      {
          get => _stream.WriteTimeout;
          set => _stream.WriteTimeout = value;
      }

      // In the off chance that some wrapped stream has different
      // semantics for Close vs. Dispose, let's preserve that.
      public override void Close()
      {
          lock (_stream)
          {
              try
              {
                  _stream.Close();
              }
              finally
              {
                  base.Dispose(true);
              }
          }
      }

      protected override void Dispose(bool disposing)
      {
          lock (_stream)
          {
              try
              {
                  // Explicitly pick up a potentially methodimpl'ed Dispose
                  if (disposing)
                  {
                      ((IDisposable)_stream).Dispose();
                  }
              }
              finally
              {
                  base.Dispose(disposing);
              }
          }
      }

      public override ValueTask DisposeAsync()
      {
          lock (_stream)
          {
              return _stream.DisposeAsync();
          }
      }

      public override void Flush()
      {
          lock (_stream)
          {
              _stream.Flush();
          }
      }

      public override int Read(byte[] bytes, int offset, int count)
      {
          lock (_stream)
          {
              return _stream.Read(bytes, offset, count);
          }
      }

      public override int Read(Span<byte> buffer)
      {
          lock (_stream)
          {
              return _stream.Read(buffer);
          }
      }

      public override int ReadByte()
      {
          lock (_stream)
          {
              return _stream.ReadByte();
          }
      }

      public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
      {
#if CORERT
          throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
#else
          bool wrappedOverridesBeginRead = _stream.HasOverriddenBeginEndRead();

          lock (_stream)
          {
              // When the wrapped stream has its own BeginRead, that override must be used.
              // Otherwise we call the base implementation, asking it to serialize concurrent
              // operations with an asynchronous wait rather than a synchronous one. A
              // synchronous wait would deadlock: the EndXx method of the outstanding operation
              // could not acquire the lock on _stream while this call blocks holding it.
              if (wrappedOverridesBeginRead)
              {
                  return _stream.BeginRead(buffer, offset, count, callback, state);
              }

              return _stream.BeginReadInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
          }
#endif
      }

      public override int EndRead(IAsyncResult asyncResult)
      {
          if (asyncResult == null)
          {
              throw new ArgumentNullException(nameof(asyncResult));
          }

          lock (_stream)
          {
              return _stream.EndRead(asyncResult);
          }
      }

      public override long Seek(long offset, SeekOrigin origin)
      {
          lock (_stream)
          {
              return _stream.Seek(offset, origin);
          }
      }

      public override void SetLength(long length)
      {
          lock (_stream)
          {
              _stream.SetLength(length);
          }
      }

      public override void Write(byte[] bytes, int offset, int count)
      {
          lock (_stream)
          {
              _stream.Write(bytes, offset, count);
          }
      }

      public override void Write(ReadOnlySpan<byte> buffer)
      {
          lock (_stream)
          {
              _stream.Write(buffer);
          }
      }

      public override void WriteByte(byte b)
      {
          lock (_stream)
          {
              _stream.WriteByte(b);
          }
      }

      public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object? state)
      {
#if CORERT
          throw new NotImplementedException(); // TODO: https://github.com/dotnet/corert/issues/3251
#else
          bool wrappedOverridesBeginWrite = _stream.HasOverriddenBeginEndWrite();

          lock (_stream)
          {
              // When the wrapped stream has its own BeginWrite, that override must be used.
              // Otherwise we call the base implementation, asking it to serialize concurrent
              // operations with an asynchronous wait rather than a synchronous one. A
              // synchronous wait would deadlock: the EndXx method of the outstanding operation
              // could not acquire the lock on _stream while this call blocks holding it.
              if (wrappedOverridesBeginWrite)
              {
                  return _stream.BeginWrite(buffer, offset, count, callback, state);
              }

              return _stream.BeginWriteInternal(buffer, offset, count, callback, state, serializeAsynchronously: true, apm: true);
          }
#endif
      }

      public override void EndWrite(IAsyncResult asyncResult)
      {
          if (asyncResult == null)
          {
              throw new ArgumentNullException(nameof(asyncResult));
          }

          lock (_stream)
          {
              _stream.EndWrite(asyncResult);
          }
      }
  }
  1147. }
  1148. }