BufferedOutputAsyncStream.cs 33 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Globalization;
  9. using System.IO;
  10. using System.Runtime;
  11. using System.Runtime.Diagnostics;
  12. using System.ServiceModel;
  13. using System.ServiceModel.Diagnostics.Application;
  14. using System.Threading;
  15. /// <summary>
  16. ///
  17. /// BufferedOutputAsyncStream is used for writing streamed response.
  18. /// For performance reasons, the behavior we want is chunk, chunk, chunk,.. terminating chunk without a delay.
  19. /// We call BeginWrite,BeginWrite,BeginWrite and Close()(close sends the terminating chunk) without
  20. /// waiting for all outstanding BeginWrites to complete.
  21. ///
  22. /// BufferedOutputAsyncStream is not a general-purpose stream wrapper, it requires that the base stream
  23. /// 1. allow concurrent IO (for multiple BeginWrite calls)
  24. /// 2. support the BeginWrite,BeginWrite,BeginWrite,.. Close() calling pattern.
  25. ///
  26. /// Currently BufferedOutputAsyncStream is only used to wrap the System.Net.HttpResponseStream, which satisfies both requirements.
  27. ///
  28. /// BufferedOutputAsyncStream can also be used when doing asynchronous operations. Sync operations are not allowed when an async
  29. /// operation is in-flight. If a sync operation is in progress (i.e., data exists in our CurrentBuffer) and we issue an async operation,
  30. /// we flush everything in the buffers (and block while doing so) before the async operation is allowed to proceed.
  31. ///
  32. /// </summary>
  33. class BufferedOutputAsyncStream : Stream
  34. {
  // Callbacks shared by all instances. Only the sync-path flush callback is
  // created eagerly; the other two are assigned on first use in BeginWrite.
  static AsyncEventArgsCallback onFlushComplete = new AsyncEventArgsCallback(OnFlushComplete);
  static AsyncEventArgsCallback onAsyncFlushComplete;
  static AsyncEventArgsCallback onWriteCallback;

  // Construction-time configuration.
  readonly Stream stream;        // stream the buffered data is written to
  readonly int bufferSize;       // byte size handed to each new ByteBuffer
  readonly int bufferLimit;      // maximum number of ByteBuffers to create
  readonly BufferQueue buffers;  // queue of buffers available for filling

  // Buffer currently being filled; reset to null by DequeueAndFlush.
  ByteBuffer currentByteBuffer;
  // Number of ByteBuffers created so far (compared against bufferLimit).
  int availableBufferCount;

  // State for the BeginWrite/EndWrite path.
  int asyncWriteCount;
  WriteAsyncState writeState;
  WriteAsyncArgs writeArgs;

  // Lazily obtained tracing activity (see the EventTraceActivity property).
  EventTraceActivity activity;
  // Set at the end of Close().
  bool closed;
  /// <summary>
  /// Wraps <paramref name="stream"/> with a queue of at most <paramref name="bufferLimit"/>
  /// buffers of <paramref name="bufferSize"/> bytes each. One buffer is created up front.
  /// </summary>
  internal BufferedOutputAsyncStream(Stream stream, int bufferSize, int bufferLimit)
  {
      this.stream = stream;
      this.bufferSize = bufferSize;
      this.bufferLimit = bufferLimit;

      // Seed the queue with a single buffer; more are added on demand
      // by AdjustBufferSize until bufferLimit is reached.
      BufferQueue queue = new BufferQueue(bufferLimit);
      this.buffers = queue;
      queue.Add(new ByteBuffer(this, bufferSize, stream));
      this.availableBufferCount = 1;
  }
  /// <summary>Always false: reading is not supported by this stream.</summary>
  public override bool CanRead
  {
      get
      {
          return false;
      }
  }
  /// <summary>Always false: seeking is not supported by this stream.</summary>
  public override bool CanSeek
  {
      get
      {
          return false;
      }
  }
  /// <summary>
  /// True when the wrapped stream reports it is writable and this stream has not been closed.
  /// </summary>
  public override bool CanWrite
  {
      get
      {
          bool innerWritable = this.stream.CanWrite;
          return innerWritable && !this.closed;
      }
  }
  /// <summary>
  /// Not supported. Always throws <see cref="NotSupportedException"/>.
  /// </summary>
  public override long Length
  {
      get
      {
          // Length is a seek-related member, so report it with the same message
          // the other seek-related members of this class (Position, Seek, SetLength) use.
#pragma warning suppress 56503 // [....], required by the Stream.Length contract
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
      }
  }
  /// <summary>
  /// Not supported. Both accessors always throw <see cref="NotSupportedException"/>.
  /// </summary>
  public override long Position
  {
      get
      {
#pragma warning suppress 56503 // [....], required by the Stream.Position contract
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
              new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
      }
      set
      {
          NotSupportedException notSupported = new NotSupportedException(SR.GetString(SR.SeekNotSupported));
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
      }
  }
  /// <summary>
  /// Tracing activity for this stream. It is obtained on first access, and only
  /// while the BufferedAsyncWriteStart trace is enabled; otherwise the current
  /// (possibly null) value is returned.
  /// </summary>
  internal EventTraceActivity EventTraceActivity
  {
      get
      {
          if (TD.BufferedAsyncWriteStartIsEnabled() && this.activity == null)
          {
              this.activity = EventTraceActivity.GetFromThreadOrCreate();
          }

          return this.activity;
      }
  }
  /// <summary>
  /// Returns the buffer currently being filled, fetching the queue's head buffer
  /// when none is cached. Throws first if a stored exception exists. May return null.
  /// </summary>
  ByteBuffer GetCurrentBuffer()
  {
      this.ThrowOnException();

      // DequeueAndFlush clears the cached buffer, so it has to be re-read from the queue.
      ByteBuffer current = this.currentByteBuffer;
      if (current == null)
      {
          current = this.buffers.CurrentBuffer();
          this.currentByteBuffer = current;
      }

      return current;
  }
  /// <summary>
  /// Flushes any buffered data, closes the wrapped stream, then waits for the
  /// outstanding buffer writes before marking this stream closed.
  /// </summary>
  public override void Close()
  {
      // Push out whatever is still sitting in the current buffer.
      this.FlushPendingBuffer();

      // The wrapped stream is closed before waiting on the outstanding writes.
      this.stream.Close();
      this.WaitForAllWritesToComplete();

      this.closed = true;
  }
  /// <summary>
  /// Issues a write for the current buffer and then flushes the wrapped stream.
  /// </summary>
  public override void Flush()
  {
      this.FlushPendingBuffer();
      this.stream.Flush();
  }
  /// <summary>
  /// Dequeues and flushes the queue's head buffer, if there is one.
  /// </summary>
  void FlushPendingBuffer()
  {
      ByteBuffer pending = this.buffers.CurrentBuffer();
      if (pending == null)
      {
          return;
      }

      this.DequeueAndFlush(pending, onFlushComplete);
  }
  /// <summary>
  /// Marks an asynchronous write as pending.
  /// </summary>
  /// <exception cref="InvalidOperationException">An asynchronous write is already pending.</exception>
  void IncrementAsyncWriteCount()
  {
      // Move 0 -> 1 atomically. Unlike a plain Interlocked.Increment, a rejected
      // call leaves the counter untouched, so the write that is legitimately
      // pending can still complete its EndWrite afterwards.
      if (Interlocked.CompareExchange(ref this.asyncWriteCount, 1, 0) != 0)
      {
          throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.WriterAsyncWritePending)));
      }
  }
  /// <summary>
  /// Marks the pending asynchronous write as finished.
  /// </summary>
  /// <exception cref="InvalidOperationException">No asynchronous write is pending.</exception>
  void DecrementAsyncWriteCount()
  {
      // Move 1 -> 0 atomically. Unlike a plain Interlocked.Decrement, a rejected
      // call leaves the counter untouched instead of driving it negative, which
      // would break every later BeginWrite/EndWrite pair.
      if (Interlocked.CompareExchange(ref this.asyncWriteCount, 0, 1) != 1)
      {
          throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.NoAsyncWritePending)));
      }
  }
  /// <summary>
  /// Throws <see cref="InvalidOperationException"/> when an asynchronous write is pending.
  /// </summary>
  void EnsureNoAsyncWritePending()
  {
      if (this.asyncWriteCount == 0)
      {
          return;
      }

      InvalidOperationException pendingError = new InvalidOperationException(SR.GetString(SR.WriterAsyncWritePending));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(pendingError);
  }
  /// <summary>
  /// Throws <see cref="InvalidOperationException"/> when the stream has been closed.
  /// </summary>
  void EnsureOpened()
  {
      if (!this.closed)
      {
          return;
      }

      InvalidOperationException closedError = new InvalidOperationException(SR.GetString(SR.StreamClosed));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(closedError);
  }
  /// <summary>
  /// Makes a buffer available, either by creating a new one or, once the limit
  /// is reached, by waiting on the queue, and then returns the current buffer.
  /// </summary>
  ByteBuffer NextBuffer()
  {
      bool addedBuffer = this.AdjustBufferSize();
      if (!addedBuffer)
      {
          // No room for another buffer: wait on the queue instead.
          this.buffers.WaitForAny();
      }

      return this.GetCurrentBuffer();
  }
  /// <summary>
  /// Adds one more buffer to the queue if fewer than bufferLimit have been created.
  /// </summary>
  /// <returns>true when a buffer was added; false when the limit has been reached.</returns>
  bool AdjustBufferSize()
  {
      if (this.availableBufferCount >= this.bufferLimit)
      {
          return false;
      }

      this.buffers.Add(new ByteBuffer(this, this.bufferSize, this.stream));
      this.availableBufferCount++;
      return true;
  }
  /// <summary>Not supported. Always throws <see cref="NotSupportedException"/>.</summary>
  public override int Read(byte[] buffer, int offset, int count)
  {
      NotSupportedException notSupported = new NotSupportedException(SR.GetString(SR.ReadNotSupported));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
  }
  /// <summary>Not supported. Always throws <see cref="NotSupportedException"/>.</summary>
  public override int ReadByte()
  {
      NotSupportedException notSupported = new NotSupportedException(SR.GetString(SR.ReadNotSupported));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
  }
  /// <summary>Not supported. Always throws <see cref="NotSupportedException"/>.</summary>
  public override long Seek(long offset, SeekOrigin origin)
  {
      NotSupportedException notSupported = new NotSupportedException(SR.GetString(SR.SeekNotSupported));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
  }
  /// <summary>Not supported. Always throws <see cref="NotSupportedException"/>.</summary>
  public override void SetLength(long value)
  {
      NotSupportedException notSupported = new NotSupportedException(SR.GetString(SR.SeekNotSupported));
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(notSupported);
  }
  /// <summary>
  /// Waits for the outstanding writes by delegating to the buffer queue.
  /// </summary>
  void WaitForAllWritesToComplete()
  {
      BufferQueue queue = this.buffers;
      queue.WaitForAllWritesToComplete();
  }
  /// <summary>
  /// Synchronously copies <paramref name="count"/> bytes from <paramref name="buffer"/>
  /// into the internal buffers, issuing a write for each buffer as it fills up.
  /// </summary>
  /// <exception cref="InvalidOperationException">
  /// The stream is closed, or an asynchronous write is pending.
  /// </exception>
  public override void Write(byte[] buffer, int offset, int count)
  {
      this.EnsureOpened();
      this.EnsureNoAsyncWritePending();

      int cursor = offset;
      int remaining = count;
      while (remaining > 0)
      {
          // Use the current buffer, or obtain another one (this may block).
          ByteBuffer target = this.GetCurrentBuffer() ?? this.NextBuffer();

          // Copy as much as fits into the space left in the buffer.
          int chunk = Math.Min(target.FreeBytes, remaining);
          if (chunk > 0)
          {
              target.CopyData(buffer, cursor, chunk);
              cursor += chunk;
              remaining -= chunk;
          }

          // A full buffer is handed off for writing right away.
          if (target.FreeBytes == 0)
          {
              this.DequeueAndFlush(target, onFlushComplete);
          }
      }
  }
  /// <summary>
  /// Starts an asynchronous write of <paramref name="count"/> bytes. Only one
  /// asynchronous write may be pending at a time.
  /// </summary>
  /// <returns>
  /// The completed-synchronously result when all data was accepted immediately
  /// (the callback, if any, has then already been invoked); otherwise the pending result.
  /// </returns>
  /// <exception cref="InvalidOperationException">
  /// The stream is closed, or an asynchronous write is already pending.
  /// </exception>
  public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  {
      this.EnsureOpened();
      this.IncrementAsyncWriteCount();
      Fx.Assert(this.writeState == null ||
          this.writeState.Arguments == null ||
          this.writeState.Arguments.Count <= 0,
          "All data has not been written yet.");

      // Both delegates are static and created lazily. Each one is checked and
      // assigned on its own: guarding both with a single null test on
      // onWriteCallback would let another thread see that field set while
      // onAsyncFlushComplete is still null, and then flush with a null callback.
      if (onAsyncFlushComplete == null)
      {
          onAsyncFlushComplete = new AsyncEventArgsCallback(OnAsyncFlushComplete);
      }
      if (onWriteCallback == null)
      {
          onWriteCallback = new AsyncEventArgsCallback(OnWriteCallback);
      }

      if (this.writeState == null)
      {
          this.writeState = new WriteAsyncState();
          this.writeArgs = new WriteAsyncArgs();
      }
      else
      {
          // Since writeState != null, check if the stream has an
          // exception as the async path has already been invoked.
          this.ThrowOnException();
      }

      this.writeArgs.Set(buffer, offset, count, callback, state);
      this.writeState.Set(onWriteCallback, this.writeArgs, this);

      if (this.WriteAsync(this.writeState) == AsyncCompletionResult.Completed)
      {
          // Everything was accepted without waiting for a buffer: complete inline.
          this.writeState.Complete(true);
          if (callback != null)
          {
              callback(this.writeState.CompletedSynchronouslyAsyncResult);
          }

          return this.writeState.CompletedSynchronouslyAsyncResult;
      }

      return this.writeState.PendingAsyncResult;
  }
  /// <summary>
  /// Ends the pending asynchronous write and rethrows any exception stored on the
  /// buffers or the write state. <paramref name="asyncResult"/> is not inspected.
  /// </summary>
  public override void EndWrite(IAsyncResult asyncResult)
  {
      // Release the "write pending" marker first, then surface any stored failure.
      this.DecrementAsyncWriteCount();
      this.ThrowOnException();
  }
  /// <summary>
  /// Synchronously appends a single byte to the current buffer, issuing a write
  /// for the buffer if that byte fills it.
  /// </summary>
  /// <exception cref="InvalidOperationException">
  /// The stream is closed, or an asynchronous write is pending.
  /// </exception>
  public override void WriteByte(byte value)
  {
      // Same precondition as Write and BeginWrite: reject writes after Close().
      this.EnsureOpened();
      this.EnsureNoAsyncWritePending();

      ByteBuffer currentBuffer = this.GetCurrentBuffer();
      if (currentBuffer == null)
      {
          currentBuffer = this.NextBuffer();
      }

      currentBuffer.CopyData(value);

      if (currentBuffer.FreeBytes == 0)
      {
          this.DequeueAndFlush(currentBuffer, onFlushComplete);
      }
  }
  /// <summary>
  /// Checks <paramref name="currentBuffer"/> out of the queue and starts flushing it.
  /// If the flush completes synchronously the buffer is put back on the queue
  /// immediately; otherwise <paramref name="callback"/> is registered on the
  /// buffer's flush args for when the flush completes.
  /// </summary>
  void DequeueAndFlush(ByteBuffer currentBuffer, AsyncEventArgsCallback callback)
  {
      // Dequeue checks the buffer out of its slot.
      // The callback for the sync path only enqueues the buffer.
      // The WriteAsync callback needs to enqueue and also complete.
      this.currentByteBuffer = null;
      ByteBuffer dequeued = this.buffers.Dequeue();
      Fx.Assert(dequeued == currentBuffer, "Buffer queue in an inconsistent state.");

      // Reuse the buffer's flush args when it already has them.
      WriteFlushAsyncEventArgs flushArgs = (WriteFlushAsyncEventArgs)currentBuffer.FlushAsyncArgs;
      if (flushArgs == null)
      {
          flushArgs = new WriteFlushAsyncEventArgs();
          currentBuffer.FlushAsyncArgs = flushArgs;
      }
      flushArgs.Set(callback, null, this);

      AsyncCompletionResult outcome = currentBuffer.FlushAsync();
      if (outcome != AsyncCompletionResult.Completed)
      {
          return;
      }

      this.buffers.Enqueue(currentBuffer);
      flushArgs.Complete(true);
  }
  /// <summary>
  /// Flush-completion callback for the sync write path: puts the flushed buffer
  /// (carried in the event args' Result) back on the owning stream's queue.
  /// </summary>
  static void OnFlushComplete(IAsyncEventArgs state)
  {
      BufferedOutputAsyncStream owner = (BufferedOutputAsyncStream)state.AsyncState;
      WriteFlushAsyncEventArgs flushArgs = (WriteFlushAsyncEventArgs)state;
      owner.buffers.Enqueue(flushArgs.Result);
  }
  /// <summary>
  /// Copies as much of the pending write described by <paramref name="state"/> as
  /// possible into the available buffers, flushing each buffer as it fills.
  /// Progress is written back to the state's arguments so the flush callback can resume.
  /// </summary>
  /// <returns>
  /// Completed when all data was copied and a buffer remains available; Queued when
  /// no buffer is available and a pending completion was registered with the queue.
  /// </returns>
  AsyncCompletionResult WriteAsync(WriteAsyncState state)
  {
      Fx.Assert(state != null && state.Arguments != null, "Invalid WriteAsyncState parameter.");

      WriteAsyncArgs pending = state.Arguments;
      if (pending.Count == 0)
      {
          return AsyncCompletionResult.Completed;
      }

      byte[] source = pending.Buffer;
      int cursor = pending.Offset;
      int remaining = pending.Count;

      ByteBuffer target = this.GetCurrentBuffer();
      while (remaining > 0)
      {
          if (target == null)
          {
              throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.WriteAsyncWithoutFreeBuffer)));
          }

          // Copy as much as fits into the space left in the buffer.
          int chunk = Math.Min(target.FreeBytes, remaining);
          if (chunk > 0)
          {
              target.CopyData(source, cursor, chunk);
              cursor += chunk;
              remaining -= chunk;
          }

          if (target.FreeBytes == 0)
          {
              this.DequeueAndFlush(target, onAsyncFlushComplete);

              // We might need to increase the number of buffers available
              // if there is more data to be written or no buffer is available.
              if (remaining > 0 || this.buffers.Count == 0)
              {
                  this.AdjustBufferSize();
              }
          }

          // Record progress for any pending writes.
          pending.Offset = cursor;
          pending.Count = remaining;

          // We can complete synchronously only if there is a buffer available for writes.
          target = this.GetCurrentBuffer();
          if (target == null)
          {
              if (this.buffers.TryUnlock())
              {
                  return AsyncCompletionResult.Queued;
              }

              // TryUnlock declined because a buffer was returned in the meantime.
              target = this.GetCurrentBuffer();
          }
      }

      return AsyncCompletionResult.Completed;
  }
  /// <summary>
  /// Flush-completion callback for the async write path. Returns the flushed
  /// buffer to the queue and, if the queue grants the pending-completion lock,
  /// either resumes the pending write or completes it with the flush exception.
  /// Non-fatal exceptions thrown here also complete the pending write.
  /// </summary>
  static void OnAsyncFlushComplete(IAsyncEventArgs state)
  {
      BufferedOutputAsyncStream owner = (BufferedOutputAsyncStream)state.AsyncState;
      Exception failure = null;
      bool shouldComplete = false;

      try
      {
          // Always hand the buffer back first.
          OnFlushComplete(state);

          if (owner.buffers.TryAcquireLock())
          {
              WriteFlushAsyncEventArgs flushArgs = (WriteFlushAsyncEventArgs)state;
              if (flushArgs.Exception != null)
              {
                  shouldComplete = true;
                  failure = flushArgs.Exception;
              }
              else if (owner.WriteAsync(owner.writeState) == AsyncCompletionResult.Completed)
              {
                  shouldComplete = true;
              }
          }
      }
      catch (Exception caught)
      {
          if (Fx.IsFatal(caught))
          {
              throw;
          }

          // Keep the first failure if one was already recorded.
          if (failure == null)
          {
              failure = caught;
          }
          shouldComplete = true;
      }

      if (shouldComplete)
      {
          owner.writeState.Complete(false, failure);
      }
  }
  /// <summary>
  /// Callback registered on the write state by BeginWrite. Clears the stored
  /// user callback and then invokes it, if any, with the pending async result.
  /// </summary>
  static void OnWriteCallback(IAsyncEventArgs state)
  {
      BufferedOutputAsyncStream owner = (BufferedOutputAsyncStream)state.AsyncState;
      IAsyncResult pendingResult = owner.writeState.PendingAsyncResult;

      // Take the callback out of the args before calling it.
      AsyncCallback userCallback = owner.writeState.Arguments.Callback;
      owner.writeState.Arguments.Callback = null;

      if (userCallback == null)
      {
          return;
      }

      userCallback(pendingResult);
  }
  /// <summary>
  /// Rethrows an exception stored on the buffer queue or, failing that, on the write state.
  /// </summary>
  void ThrowOnException()
  {
      // If any of the buffers or the write state has an
      // exception the stream is not usable anymore.
      this.buffers.ThrowOnException();

      WriteAsyncState asyncState = this.writeState;
      if (asyncState == null)
      {
          return;
      }

      asyncState.ThrowOnException();
  }
  /// <summary>
  /// Fixed-capacity circular queue of ByteBuffers. A buffer is checked out with
  /// Dequeue while it is being flushed and returned with Enqueue afterwards.
  /// Queue state is guarded by a lock on the slot array.
  /// </summary>
  class BufferQueue
  {
      // Every buffer ever added, whether queued or checked out.
      readonly List<ByteBuffer> refBufferList;
      readonly int size;
      readonly Slot[] buffers;
      // First exception reported by a buffer returned through Enqueue.
      Exception completionException;
      int head;
      int count;
      // True while a thread is blocked in WaitForAny.
      bool waiting;
      // True while an async write waits for a buffer (see TryUnlock / TryAcquireLock).
      bool pendingCompletion;

      /// <summary>Creates a queue with <paramref name="queueSize"/> slots, all initially checked out.</summary>
      internal BufferQueue(int queueSize)
      {
          this.head = 0;
          this.count = 0;
          this.size = queueSize;
          this.buffers = new Slot[size];
          this.refBufferList = new List<ByteBuffer>();
          for (int i = 0; i < queueSize; i++)
          {
              Slot s = new Slot();
              s.checkedOut = true; // Start with all buffers checked out.
              this.buffers[i] = s;
          }
      }

      object ThisLock
      {
          get
          {
              return this.buffers;
          }
      }

      /// <summary>Number of buffers currently in the queue.</summary>
      internal int Count
      {
          get
          {
              lock (ThisLock)
              {
                  return count;
              }
          }
      }

      /// <summary>
      /// Checks out the buffer at the head of the queue, or returns null when the queue is empty.
      /// </summary>
      internal ByteBuffer Dequeue()
      {
          Fx.Assert(!this.pendingCompletion, "Dequeue cannot be invoked when there is a pending completion");
          lock (ThisLock)
          {
              if (count == 0)
              {
                  return null;
              }

              Slot s = buffers[head];
              Fx.Assert(!s.checkedOut, "This buffer is already in use.");
              this.head = (this.head + 1) % size;
              this.count--;

              ByteBuffer buffer = s.buffer;
              s.buffer = null;
              s.checkedOut = true;
              return buffer;
          }
      }

      /// <summary>
      /// Registers a newly created buffer and enqueues it. Ignored when the
      /// queue already tracks <c>size</c> buffers.
      /// </summary>
      internal void Add(ByteBuffer buffer)
      {
          lock (ThisLock)
          {
              Fx.Assert(this.refBufferList.Count < size, "Bufferlist is already full.");
              if (this.refBufferList.Count < this.size)
              {
                  this.refBufferList.Add(buffer);
                  this.Enqueue(buffer);
              }
          }
      }

      /// <summary>
      /// Returns a buffer to the tail of the queue, records its completion
      /// exception if none is recorded yet, and wakes a thread blocked in WaitForAny.
      /// </summary>
      internal void Enqueue(ByteBuffer buffer)
      {
          lock (ThisLock)
          {
              this.completionException = this.completionException ?? buffer.CompletionException;
              Fx.Assert(count < size, "The queue is already full.");

              int tail = (this.head + this.count) % size;
              Slot s = this.buffers[tail];
              this.count++;
              Fx.Assert(s.checkedOut, "Current buffer is still free.");
              s.checkedOut = false;
              s.buffer = buffer;

              if (this.waiting)
              {
                  Monitor.Pulse(this.ThisLock);
              }
          }
      }

      /// <summary>
      /// Returns the buffer in the head slot without dequeuing it (null when the
      /// head slot holds no buffer). Throws first if an exception has been recorded.
      /// </summary>
      internal ByteBuffer CurrentBuffer()
      {
          lock (ThisLock)
          {
              ThrowOnException();
              Slot s = this.buffers[head];
              return s.buffer;
          }
      }

      /// <summary>Waits on every registered buffer's pending write, in registration order.</summary>
      internal void WaitForAllWritesToComplete()
      {
          for (int i = 0; i < this.refBufferList.Count; i++)
          {
              this.refBufferList[i].WaitForWriteComplete();
          }
      }

      /// <summary>
      /// Blocks until at least one buffer is in the queue, then throws if an
      /// exception has been recorded.
      /// </summary>
      internal void WaitForAny()
      {
          lock (ThisLock)
          {
              // Re-check the condition after every wake-up rather than assuming
              // a single Pulse means a buffer is still available.
              while (this.count == 0)
              {
                  this.waiting = true;
                  Monitor.Wait(ThisLock);
              }
              this.waiting = false;
          }

          this.ThrowOnException();
      }

      /// <summary>Throws the recorded completion exception, if any.</summary>
      internal void ThrowOnException()
      {
          if (this.completionException != null)
          {
              throw FxTrace.Exception.AsError(this.completionException);
          }
      }

      /// <summary>
      /// Marks a completion as pending when the queue is empty.
      /// </summary>
      /// <returns>true when the pending flag was set; false when a buffer is available.</returns>
      internal bool TryUnlock()
      {
          // The main thread tries to indicate a pending completion
          // if there aren't any free buffers for the next write.
          // The callback should try to complete() through TryAcquireLock.
          lock (ThisLock)
          {
              Fx.Assert(!this.pendingCompletion, "There is already a completion pending.");
              if (this.count == 0)
              {
                  this.pendingCompletion = true;
                  return true;
              }
          }

          return false;
      }

      /// <summary>
      /// Clears the pending-completion flag when a completion is pending and a buffer is available.
      /// </summary>
      /// <returns>true when the caller now owns the pending completion.</returns>
      internal bool TryAcquireLock()
      {
          // The callback tries to acquire the lock if there is a pending completion and a free buffer.
          // Buffers might get dequeued by the main writing thread as soon as they are enqueued.
          lock (ThisLock)
          {
              if (this.pendingCompletion && this.count > 0)
              {
                  this.pendingCompletion = false;
                  return true;
              }
          }

          return false;
      }

      // One position in the circular queue.
      class Slot
      {
          internal bool checkedOut;
          internal ByteBuffer buffer;
      }
  }
  /// <summary>
  /// Event args attached to a ByteBuffer for its FlushAsync() call;
  /// the Result slot carries the ByteBuffer that was flushed.
  /// </summary>
  class WriteFlushAsyncEventArgs : AsyncEventArgs<object, ByteBuffer>
  {
  }
  /// <summary>
  /// A fixed-size byte buffer that is filled with CopyData and written to the
  /// target stream with FlushAsync. Tracks whether a write is pending so callers
  /// can wait for it to finish.
  /// </summary>
  class ByteBuffer
  {
      byte[] bytes;
      int position;             // number of bytes currently buffered
      Stream stream;
      bool writePending;        // true between SetWritePending and write completion
      bool waiting;             // true while a thread is blocked in WaitForWriteComplete
      Exception completionException;
      BufferedOutputAsyncStream parent;
      static AsyncCallback writeCallback = Fx.ThunkCallback(new AsyncCallback(WriteCallback));
      static AsyncCallback flushCallback;

      internal ByteBuffer(BufferedOutputAsyncStream parent, int bufferSize, Stream stream)
      {
          this.waiting = false;
          this.writePending = false;
          this.position = 0;
          this.bytes = DiagnosticUtility.Utility.AllocateByteArray(bufferSize);
          this.stream = stream;
          this.parent = parent;
      }

      object ThisLock
      {
          get { return this; }
      }

      /// <summary>Exception captured when completing an asynchronous write, if any.</summary>
      internal Exception CompletionException
      {
          get { return this.completionException; }
      }

      /// <summary>Number of bytes that can still be copied into this buffer.</summary>
      internal int FreeBytes
      {
          get
          {
              return this.bytes.Length - this.position;
          }
      }

      /// <summary>Event args completed when an asynchronous flush of this buffer finishes.</summary>
      internal AsyncEventArgs<object, ByteBuffer> FlushAsyncArgs
      {
          get;
          set;
      }

      /// <summary>
      /// Ends an asynchronously completed write, stores any non-fatal exception,
      /// clears the pending flag and wakes a thread blocked in WaitForWriteComplete.
      /// </summary>
      static void WriteCallback(IAsyncResult result)
      {
          if (result.CompletedSynchronously)
          {
              return;
          }

          // Fetch our state information: ByteBuffer
          ByteBuffer buffer = (ByteBuffer)result.AsyncState;
          try
          {
              if (TD.BufferedAsyncWriteStopIsEnabled())
              {
                  TD.BufferedAsyncWriteStop(buffer.parent.EventTraceActivity);
              }
              buffer.stream.EndWrite(result);
          }
#pragma warning suppress 56500 // [....], transferring exception to another thread
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }
              buffer.completionException = e;
          }

          // Tell the main thread we've finished.
          lock (buffer.ThisLock)
          {
              buffer.writePending = false;

              // Do not Pulse if no one is waiting, to avoid the overhead of Pulse
              if (!buffer.waiting)
              {
                  return;
              }

              Monitor.Pulse(buffer.ThisLock);
          }
      }

      /// <summary>
      /// Blocks until no write is pending on this buffer, then throws the stored
      /// completion exception, if any.
      /// </summary>
      internal void WaitForWriteComplete()
      {
          lock (ThisLock)
          {
              // Re-check the flag after every wake-up instead of trusting a single Pulse.
              while (this.writePending)
              {
                  // Wait until the async write of this buffer is finished.
                  this.waiting = true;
                  Monitor.Wait(ThisLock);
              }
              this.waiting = false;
          }

          // Raise exception if necessary
          if (this.completionException != null)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(completionException);
          }
      }

      /// <summary>Appends <paramref name="count"/> bytes from <paramref name="buffer"/> starting at <paramref name="offset"/>.</summary>
      internal void CopyData(byte[] buffer, int offset, int count)
      {
          Fx.Assert(this.position + count <= this.bytes.Length, string.Format(CultureInfo.InvariantCulture, "Chunk is too big to fit in this buffer. Chunk size={0}, free space={1}", count, this.bytes.Length - this.position));
          Fx.Assert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));
          Buffer.BlockCopy(buffer, offset, this.bytes, this.position, count);
          this.position += count;
      }

      /// <summary>Appends a single byte.</summary>
      internal void CopyData(byte value)
      {
          Fx.Assert(this.position < this.bytes.Length, "Buffer is full");
          Fx.Assert(!this.writePending, string.Format(CultureInfo.InvariantCulture, "The buffer is in use, position={0}", this.position));
          this.bytes[this.position++] = value;
      }

      /// <summary>
      /// Writes the buffered bytes to the stream with BeginWrite and resets the
      /// buffer position. FlushAsyncArgs must be set before calling when data is buffered.
      /// </summary>
      /// <returns>
      /// Completed when there was nothing to write or the write completed
      /// synchronously; Queued when the write is still in flight.
      /// </returns>
      internal AsyncCompletionResult FlushAsync()
      {
          if (this.position <= 0)
          {
              return AsyncCompletionResult.Completed;
          }

          Fx.Assert(this.FlushAsyncArgs != null, "FlushAsyncArgs not set.");

          if (flushCallback == null)
          {
              flushCallback = new AsyncCallback(OnAsyncFlush);
          }

          int bytesToWrite = this.position;
          this.SetWritePending();
          this.position = 0;

          if (TD.BufferedAsyncWriteStartIsEnabled())
          {
              TD.BufferedAsyncWriteStart(this.parent.EventTraceActivity, this.GetHashCode(), bytesToWrite);
          }

          // Unless the write is really in flight (its callback then clears the
          // flag), the pending flag must be cleared on every exit, including
          // when BeginWrite or EndWrite throws. Otherwise a later
          // WaitForWriteComplete would block forever on a write that will never finish.
          bool writeInFlight = false;
          try
          {
              IAsyncResult asyncResult = this.stream.BeginWrite(this.bytes, 0, bytesToWrite, flushCallback, this);
              if (!asyncResult.CompletedSynchronously)
              {
                  writeInFlight = true;
                  return AsyncCompletionResult.Queued;
              }

              if (TD.BufferedAsyncWriteStopIsEnabled())
              {
                  TD.BufferedAsyncWriteStop(this.parent.EventTraceActivity);
              }

              this.stream.EndWrite(asyncResult);
              return AsyncCompletionResult.Completed;
          }
          finally
          {
              if (!writeInFlight)
              {
                  this.ResetWritePending();
              }
          }
      }

      /// <summary>
      /// BeginWrite callback used by FlushAsync: finishes the write and completes
      /// the buffer's FlushAsyncArgs with the buffer as result and any stored exception.
      /// </summary>
      static void OnAsyncFlush(IAsyncResult result)
      {
          if (result.CompletedSynchronously)
          {
              // FlushAsync handles synchronous completion itself.
              return;
          }

          ByteBuffer thisPtr = (ByteBuffer)result.AsyncState;
          AsyncEventArgs<object, ByteBuffer> asyncEventArgs = thisPtr.FlushAsyncArgs;

          try
          {
              ByteBuffer.WriteCallback(result);
              asyncEventArgs.Result = thisPtr;
          }
          catch (Exception exception)
          {
              if (Fx.IsFatal(exception))
              {
                  throw;
              }

              if (thisPtr.completionException == null)
              {
                  thisPtr.completionException = exception;
              }
          }

          asyncEventArgs.Complete(false, thisPtr.completionException);
      }

      void ResetWritePending()
      {
          lock (ThisLock)
          {
              this.writePending = false;
          }
      }

      /// <summary>Marks a write as pending; throws if one already is.</summary>
      void SetWritePending()
      {
          lock (ThisLock)
          {
              if (this.writePending)
              {
                  throw FxTrace.Exception.AsError(new InvalidOperationException(SR.GetString(SR.FlushBufferAlreadyInUse)));
              }

              this.writePending = true;
          }
      }
  }
  /// <summary>
  /// Holds the arguments of a BeginWrite call: the data range still to be written
  /// plus the caller's callback and state object.
  /// </summary>
  class WriteAsyncArgs
  {
      // Data range; Offset and Count are updated by WriteAsync as data is consumed.
      internal byte[] Buffer { get; set; }

      internal int Offset { get; set; }

      internal int Count { get; set; }

      // Caller-supplied completion callback and state.
      internal AsyncCallback Callback { get; set; }

      internal object AsyncState { get; set; }

      /// <summary>Overwrites all five values in one call.</summary>
      internal void Set(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
      {
          this.Callback = callback;
          this.AsyncState = state;

          this.Buffer = buffer;
          this.Offset = offset;
          this.Count = count;
      }
  }
  /// <summary>
  /// Async state for BeginWrite. Hands out two lazily created, reusable
  /// IAsyncResult objects: one reporting synchronous completion and one not.
  /// </summary>
  class WriteAsyncState : AsyncEventArgs<WriteAsyncArgs, BufferedOutputAsyncStream>
  {
      PooledAsyncResult pooledAsyncResult;
      PooledAsyncResult completedSynchronouslyResult;

      /// <summary>Result whose CompletedSynchronously is false; created on first access.</summary>
      internal IAsyncResult PendingAsyncResult
      {
          get
          {
              return this.pooledAsyncResult
                  ?? (this.pooledAsyncResult = new PooledAsyncResult(this, false));
          }
      }

      /// <summary>Result whose CompletedSynchronously is true; created on first access.</summary>
      internal IAsyncResult CompletedSynchronouslyAsyncResult
      {
          get
          {
              return this.completedSynchronouslyResult
                  ?? (this.completedSynchronouslyResult = new PooledAsyncResult(this, true));
          }
      }

      /// <summary>Throws the exception recorded on this state, if any.</summary>
      internal void ThrowOnException()
      {
          if (this.Exception == null)
          {
              return;
          }

          throw FxTrace.Exception.AsError(this.Exception);
      }

      /// <summary>
      /// Minimal IAsyncResult bound to a WriteAsyncState. Only AsyncState and
      /// CompletedSynchronously are implemented; the other members throw
      /// NotImplementedException.
      /// </summary>
      class PooledAsyncResult : IAsyncResult
      {
          readonly WriteAsyncState writeState;
          readonly bool completedSynchronously;

          internal PooledAsyncResult(WriteAsyncState parentState, bool completedSynchronously)
          {
              this.completedSynchronously = completedSynchronously;
              this.writeState = parentState;
          }

          /// <summary>The caller's state object from the current write arguments, or null when there are none.</summary>
          public object AsyncState
          {
              get
              {
                  WriteAsyncArgs args = this.writeState.Arguments;
                  if (args == null)
                  {
                      return null;
                  }

                  return args.AsyncState;
              }
          }

          public WaitHandle AsyncWaitHandle
          {
              get { throw FxTrace.Exception.AsError(new NotImplementedException()); }
          }

          public bool CompletedSynchronously
          {
              get { return this.completedSynchronously; }
          }

          public bool IsCompleted
          {
              get { throw FxTrace.Exception.AsError(new NotImplementedException()); }
          }
      }
  }
  859. }
  860. }