BufferedStream2.cs 29 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714
  1. // ==++==
  2. //
  3. // Copyright (c) Microsoft Corporation. All rights reserved.
  4. //
  5. // ==--==
  6. /*============================================================
  7. **
  8. ** Class: BufferedStream2
  9. **
  10. **
  11. ===========================================================*/
  12. using System;
  13. using System.Diagnostics;
  14. using System.Globalization;
  15. using System.Runtime.Versioning;
  16. using System.Threading;
  17. using System.Runtime.InteropServices;
  18. using System.Runtime.Remoting.Messaging;
  19. using System.Runtime.CompilerServices;
  20. using Microsoft.Win32;
  21. using Microsoft.Win32.SafeHandles;
  22. using System.Security.Permissions;
  23. namespace System.IO {
    // This abstract implementation adds thread-safe buffering on top
    // of the underlying stream. For most streams, having this intermediate
    // buffer translates to better performance because the underlying IO
    // (P/Invoke, disk IO, etc.) is costly. It also improves locking
    // efficiency when operating under heavy concurrency. The synchronization
    // technique used in this implementation is specifically optimized for IO.
    // The main differences between this implementation and the existing
    // System.IO.BufferedStream are:
    // - the design allows for inheritance as opposed to wrapping streams
    // - it is thread safe, though currently only synchronous Write is optimized
  33. [HostProtection(Synchronization=true)]
  34. internal abstract class BufferedStream2 : Stream
  35. {
  36. protected internal const int DefaultBufferSize = 32*1024; //32KB or 64KB seems to give the best throughput
  37. protected int bufferSize; // Length of internal buffer, if it's allocated.
  38. private byte[] _buffer; // Internal buffer. Alloc on first use.
  39. // At present only concurrent buffer writing is optimized implicitly
  40. // while reading relies on explicit locking.
  41. // Ideally we want these fields to be volatile
  42. private /*volatile*/ int _pendingBufferCopy; // How many buffer writes are pending.
  43. private /*volatile*/ int _writePos; // Write pointer within shared buffer.
  44. // Should we use a separate buffer for reading Vs writing?
  45. private /*volatile*/ int _readPos; // Read pointer within shared buffer.
  46. private /*volatile*/ int _readLen; // Number of bytes read in buffer from file.
  47. // Ideally we want this field to be volatile but Interlocked operations
  48. // on 64bit int is not guaranteed to be atomic especially on 32bit platforms
  49. protected long pos; // Cache current location in the underlying stream.
  50. // Derived streams should override CanRead/CanWrite/CanSeek to enable/disable functionality as desired
  51. [ResourceExposure(ResourceScope.None)]
  52. [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
  53. public override void Write(byte[] array, int offset, int count)
  54. {
  55. if (array==null)
  56. throw new ArgumentNullException("array", SR.GetString(SR.ArgumentNull_Buffer));
  57. if (offset < 0)
  58. throw new ArgumentOutOfRangeException("offset", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
  59. if (count < 0)
  60. throw new ArgumentOutOfRangeException("count", SR.GetString(SR.ArgumentOutOfRange_NeedNonNegNum));
  61. if (array.Length - offset < count)
  62. throw new ArgumentException(SR.GetString(SR.Argument_InvalidOffLen));
  63. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  64. if (_writePos==0) {
  65. // Ensure we can write to the stream, and ready buffer for writing.
  66. if (!CanWrite) __Error.WriteNotSupported();
  67. if (_readPos < _readLen) FlushRead();
  68. _readPos = 0;
  69. _readLen = 0;
  70. }
  71. // Optimization:
  72. // Don't allocate a buffer then call memcpy for 0 bytes.
  73. if (count == 0)
  74. return;
  75. do {
  76. // Avoid contention around spilling over the buffer, the locking mechanism here is bit unconventional.
  77. // Let's call this a YieldLock. It is closer to a spin lock than a semaphore but not quite a spin lock.
  78. // Forced thread context switching is better than a tight spin lock here for several reasons.
  79. // We utilize less CPU, yield to other threads (potentially the one doing the write, this is
  80. // especially important under heavy thread/processor contention environment) and also yield to
  81. // runtime thread aborts (important when run from a high pri thread like finalizer).
  82. if (_writePos > bufferSize) {
  83. Thread.Sleep(1);
  84. continue;
  85. }
  86. // Optimization:
  87. // For input chunk larger than internal buffer size, write directly
  88. // It is okay to have a ---- here with the _writePos check, which means
  89. // we have a loose order between flushing the intenal cache Vs writing
  90. // this larger chunk but that is fine. This step will nicely optimize
  91. // repeated writing of larger chunks by skipping the interlocked operation
  92. if ((_writePos == 0) && (count >= bufferSize)) {
  93. WriteCore(array, offset, count, true);
  94. return;
  95. }
  96. // We should review whether we need critical region markers for hosts.
  97. Thread.BeginCriticalRegion();
  98. Interlocked.Increment(ref _pendingBufferCopy);
  99. int newPos = Interlocked.Add(ref _writePos, count);
  100. int oldPos = (newPos - count);
  101. // Clear the buffer
  102. if (newPos > bufferSize) {
  103. Interlocked.Decrement(ref _pendingBufferCopy);
  104. Thread.EndCriticalRegion();
  105. // Though the lock below is not necessary for correctness, when operating in a heavy
  106. // thread contention environment, augmenting the YieldLock techinique with a critical
  107. // section around write seems to be giving slightly better performance while
  108. // not having noticable impact in the less contended situations.
  109. // Perhaps we can build a technique that keeps track of the contention?
  110. //lock (this)
  111. {
  112. // Make sure we didn't get pre-empted by another thread
  113. if (_writePos > bufferSize) {
  114. if ((oldPos <= bufferSize) && (oldPos > 0)) {
  115. while (_pendingBufferCopy != 0) {
  116. Thread.SpinWait(1);
  117. }
  118. WriteCore(_buffer, 0, oldPos, true);
  119. _writePos = 0;
  120. }
  121. }
  122. }
  123. }
  124. else {
  125. if (_buffer == null)
  126. Interlocked.CompareExchange(ref _buffer, new byte[bufferSize], null);
  127. // Copy user data into buffer, to write at a later date.
  128. Buffer.BlockCopy(array, offset, _buffer, oldPos, count);
  129. Interlocked.Decrement(ref _pendingBufferCopy);
  130. Thread.EndCriticalRegion();
  131. return;
  132. }
  133. } while (true);
  134. }
  135. #if _ENABLE_STREAM_FACTORING
  136. public override long Position
  137. {
  138. // Making the getter thread safe is not very useful anyways
  139. get {
  140. if (!CanSeek) __Error.SeekNotSupported();
  141. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  142. // Compensate for buffer that we read from the handle (_readLen) Vs what the user
  143. // read so far from the internel buffer (_readPos). Of course add any unwrittern
  144. // buffered data
  145. return pos + (_readPos - _readLen + _writePos);
  146. }
  147. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  148. set {
  149. if (value < 0) throw new ArgumentOutOfRangeException("value", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  150. if (_writePos > 0) FlushWrite(false);
  151. _readPos = 0;
  152. _readLen = 0;
  153. Seek(value, SeekOrigin.Begin);
  154. }
  155. }
  156. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  157. public override int Read(/*[In, Out]*/ byte[] array, int offset, int count)
  158. {
  159. if (array == null)
  160. throw new ArgumentNullException("array", Helper.GetResourceString("ArgumentNull_Buffer"));
  161. if (offset < 0)
  162. throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  163. if (count < 0)
  164. throw new ArgumentOutOfRangeException("count", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  165. if (array.Length - offset < count)
  166. throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
  167. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  168. bool isBlocked = false;
  169. int n = _readLen - _readPos;
  170. // If the read buffer is empty, read into either user's array or our
  171. // buffer, depending on number of bytes user asked for and buffer size.
  172. if (n == 0) {
  173. if (!CanRead) __Error.ReadNotSupported();
  174. if (_writePos > 0) FlushWrite(false);
  175. if (!CanSeek || (count >= bufferSize)) {
  176. n = ReadCore(array, offset, count);
  177. // Throw away read buffer.
  178. _readPos = 0;
  179. _readLen = 0;
  180. return n;
  181. }
  182. if (_buffer == null) _buffer = new byte[bufferSize];
  183. n = ReadCore(_buffer, 0, bufferSize);
  184. if (n == 0) return 0;
  185. isBlocked = n < bufferSize;
  186. _readPos = 0;
  187. _readLen = n;
  188. }
  189. // Now copy min of count or numBytesAvailable (ie, near EOF) to array.
  190. if (n > count) n = count;
  191. Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
  192. _readPos += n;
  193. // We may have read less than the number of bytes the user asked
  194. // for, but that is part of the Stream contract. Reading again for
  195. // more data may cause us to block if we're using a device with
  196. // no clear end of file, such as a serial port or pipe. If we
  197. // blocked here & this code was used with redirected pipes for a
  198. // process's standard output, this can lead to deadlocks involving
  199. // two processes. But leave this here for files to avoid what would
  200. // probably be a breaking change. --
  201. return n;
  202. }
  203. [HostProtection(ExternalThreading=true)]
  204. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  205. public override IAsyncResult BeginRead(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
  206. {
  207. if (array==null)
  208. throw new ArgumentNullException("array");
  209. if (offset < 0)
  210. throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  211. if (numBytes < 0)
  212. throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  213. if (array.Length - offset < numBytes)
  214. throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
  215. if (!CanRead) __Error.ReadNotSupported();
  216. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  217. if (_writePos > 0) FlushWrite(false);
  218. if (_readPos == _readLen) {
  219. // I can't see how to handle buffering of async requests when
  220. // filling the buffer asynchronously, without a lot of complexity.
  221. // The problems I see are issuing an async read, we do an async
  222. // read to fill the buffer, then someone issues another read
  223. // (either synchronously or asynchronously) before the first one
  224. // returns. This would involve some sort of complex buffer locking
  225. // that we probably don't want to get into, at least not in V1.
  226. // If we did a [....] read to fill the buffer, we could avoid the
  227. // problem, and any async read less than 64K gets turned into a
  228. // synchronous read by NT anyways... --
  229. if (numBytes < bufferSize) {
  230. if (_buffer == null) _buffer = new byte[bufferSize];
  231. IAsyncResult bufferRead = BeginReadCore(_buffer, 0, bufferSize, null, null, 0);
  232. _readLen = EndRead(bufferRead);
  233. int n = _readLen;
  234. if (n > numBytes) n = numBytes;
  235. Buffer.BlockCopy(_buffer, 0, array, offset, n);
  236. _readPos = n;
  237. // Fake async
  238. return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);
  239. }
  240. // Here we're making our position pointer inconsistent
  241. // with our read buffer. Throw away the read buffer's contents.
  242. _readPos = 0;
  243. _readLen = 0;
  244. return BeginReadCore(array, offset, numBytes, userCallback, stateObject, 0);
  245. }
  246. else {
  247. int n = _readLen - _readPos;
  248. if (n > numBytes) n = numBytes;
  249. Buffer.BlockCopy(_buffer, _readPos, array, offset, n);
  250. _readPos += n;
  251. if (n >= numBytes)
  252. return BufferedStreamAsyncResult.Complete(n, userCallback, stateObject, false);
  253. // For streams with no clear EOF like serial ports or pipes
  254. // we cannot read more data without causing an app to block
  255. // incorrectly. Pipes don't go down this path
  256. // though. This code needs to be fixed.
  257. // Throw away read buffer.
  258. _readPos = 0;
  259. _readLen = 0;
  260. // WARNING: all state on asyncResult objects must be set before
  261. // we call ReadFile in BeginReadCore, since the OS can run our
  262. // callback & the user's callback before ReadFile returns.
  263. return BeginReadCore(array, offset + n, numBytes - n, userCallback, stateObject, n);
  264. }
  265. }
  266. public unsafe override int EndRead(IAsyncResult asyncResult)
  267. {
  268. if (asyncResult == null)
  269. throw new ArgumentNullException("asyncResult");
  270. BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
  271. if (bsar != null) {
  272. if (bsar._isWrite)
  273. __Error.WrongAsyncResult();
  274. return bsar._numBytes;
  275. }
  276. else {
  277. return EndReadCore(asyncResult);
  278. }
  279. }
  280. [HostProtection(ExternalThreading=true)]
  281. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  282. public override IAsyncResult BeginWrite(byte[] array, int offset, int numBytes, AsyncCallback userCallback, Object stateObject)
  283. {
  284. if (array==null)
  285. throw new ArgumentNullException("array");
  286. if (offset < 0)
  287. throw new ArgumentOutOfRangeException("offset", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  288. if (numBytes < 0)
  289. throw new ArgumentOutOfRangeException("numBytes", Helper.GetResourceString("ArgumentOutOfRange_NeedNonNegNum"));
  290. if (array.Length - offset < numBytes)
  291. throw new ArgumentException(Helper.GetResourceString("Argument_InvalidOffLen"));
  292. if (!CanWrite) __Error.WriteNotSupported();
  293. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  294. if (_writePos==0) {
  295. if (_readPos < _readLen) FlushRead();
  296. _readPos = 0;
  297. _readLen = 0;
  298. }
  299. int n = bufferSize - _writePos;
  300. if (numBytes <= n) {
  301. if (_buffer == null) _buffer = new byte[bufferSize];
  302. Buffer.BlockCopy(array, offset, _buffer, _writePos, numBytes);
  303. _writePos += numBytes;
  304. return BufferedStreamAsyncResult.Complete(numBytes, userCallback, stateObject, true);
  305. }
  306. if (_writePos > 0) FlushWrite(false);
  307. return BeginWriteCore(array, offset, numBytes, userCallback, stateObject);
  308. }
  309. public unsafe override void EndWrite(IAsyncResult asyncResult)
  310. {
  311. if (asyncResult == null)
  312. throw new ArgumentNullException("asyncResult");
  313. BufferedStreamAsyncResult bsar = asyncResult as BufferedStreamAsyncResult;
  314. if (bsar == null)
  315. EndWriteCore(asyncResult);
  316. }
  317. // Reads a byte from the file stream. Returns the byte cast to an int
  318. // or -1 if reading from the end of the stream.
  319. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  320. public override int ReadByte()
  321. {
  322. if (_readLen == 0 && !CanRead)
  323. __Error.ReadNotSupported();
  324. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  325. if (_readPos == _readLen) {
  326. if (_writePos > 0) FlushWrite(false);
  327. Debug.Assert(bufferSize > 0, "bufferSize > 0");
  328. if (_buffer == null) _buffer = new byte[bufferSize];
  329. _readLen = ReadCore(_buffer, 0, bufferSize);
  330. _readPos = 0;
  331. }
  332. if (_readPos == _readLen)
  333. return -1;
  334. int result = _buffer[_readPos];
  335. _readPos++;
  336. return result;
  337. }
  338. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  339. public override void WriteByte(byte value)
  340. {
  341. if (_writePos == 0) {
  342. if (!CanWrite)
  343. __Error.WriteNotSupported();
  344. if (_readPos < _readLen)
  345. FlushRead();
  346. _readPos = 0;
  347. _readLen = 0;
  348. Debug.Assert(bufferSize > 0, "bufferSize > 0");
  349. if (_buffer==null)
  350. _buffer = new byte[bufferSize];
  351. }
  352. if (_writePos == bufferSize)
  353. FlushWrite(false);
  354. _buffer[_writePos] = value;
  355. _writePos++;
  356. }
  357. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  358. public override long Seek(long offset, SeekOrigin origin)
  359. {
  360. if (origin<SeekOrigin.Begin || origin>SeekOrigin.End)
  361. throw new ArgumentException(Helper.GetResourceString("Argument_InvalidSeekOrigin"));
  362. if (!CanSeek) __Error.SeekNotSupported();
  363. Debug.Assert((_readPos==0 && _readLen==0 && _writePos >= 0) || (_writePos==0 && _readPos <= _readLen), "We're either reading or writing, but not both.");
  364. // If we've got bytes in our buffer to write, write them out.
  365. // If we've read in and consumed some bytes, we'll have to adjust
  366. // our seek positions ONLY IF we're seeking relative to the current
  367. // position in the stream. This simulates doing a seek to the new
  368. // position, then a read for the number of bytes we have in our buffer.
  369. if (_writePos > 0) {
  370. FlushWrite(false);
  371. }
  372. else if (origin == SeekOrigin.Current) {
  373. // Don't call FlushRead here, which would have caused an infinite
  374. // loop. Simply adjust the seek origin. This isn't necessary
  375. // if we're seeking relative to the beginning or end of the stream.
  376. offset -= (_readLen - _readPos);
  377. }
  378. long oldPos = pos + (_readPos - _readLen);
  379. long curPos = SeekCore(offset, origin);
  380. // We now must update the read buffer. We can in some cases simply
  381. // update _readPos within the buffer, copy around the buffer so our
  382. // Position property is still correct, and avoid having to do more
  383. // reads from the disk. Otherwise, discard the buffer's contents.
  384. if (_readLen > 0) {
  385. // We can optimize the following condition:
  386. // oldPos - _readPos <= curPos < oldPos + _readLen - _readPos
  387. if (oldPos == curPos) {
  388. if (_readPos > 0) {
  389. Buffer.BlockCopy(_buffer, _readPos, _buffer, 0, _readLen - _readPos);
  390. _readLen -= _readPos;
  391. _readPos = 0;
  392. }
  393. // If we still have buffered data, we must update the stream's
  394. // position so our Position property is correct.
  395. if (_readLen > 0)
  396. SeekCore(_readLen, SeekOrigin.Current);
  397. }
  398. else if (oldPos - _readPos < curPos && curPos < oldPos + _readLen - _readPos) {
  399. int diff = (int)(curPos - oldPos);
  400. Buffer.BlockCopy(_buffer, _readPos+diff, _buffer, 0, _readLen - (_readPos + diff));
  401. _readLen -= (_readPos + diff);
  402. _readPos = 0;
  403. if (_readLen > 0)
  404. SeekCore(_readLen, SeekOrigin.Current);
  405. }
  406. else {
  407. // Lose the read buffer.
  408. _readPos = 0;
  409. _readLen = 0;
  410. }
  411. Debug.Assert(_readLen >= 0 && _readPos <= _readLen, "_readLen should be nonnegative, and _readPos should be less than or equal _readLen");
  412. Debug.Assert(curPos == Position, "Seek optimization: curPos != Position! Buffer math was mangled.");
  413. }
  414. return curPos;
  415. }
  416. #endif //_ENABLE_STREAM_FACTORING
  417. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  418. public override void Flush()
  419. {
  420. try {
  421. if (_writePos > 0)
  422. FlushWrite(false);
  423. else if (_readPos < _readLen)
  424. FlushRead();
  425. }
  426. finally {
  427. _writePos = 0;
  428. _readPos = 0;
  429. _readLen = 0;
  430. }
  431. }
  432. // Reading is done by blocks from the file, but someone could read
  433. // 1 byte from the buffer then write. At that point, the OS's file
  434. // pointer is out of [....] with the stream's position. All write
  435. // functions should call this function to preserve the position in the file.
  436. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  437. protected void FlushRead() {
  438. #if _ENABLE_STREAM_FACTORING
  439. Debug.Assert(_writePos == 0, "BufferedStream: Write buffer must be empty in FlushRead!");
  440. if (_readPos - _readLen != 0) {
  441. Debug.Assert(CanSeek, "BufferedStream will lose buffered read data now.");
  442. if (CanSeek)
  443. SeekCore(_readPos - _readLen, SeekOrigin.Current);
  444. }
  445. _readPos = 0;
  446. _readLen = 0;
  447. #endif //_ENABLE_STREAM_FACTORING
  448. }
  449. // Writes are buffered. Anytime the buffer fills up
  450. // (_writePos + delta > bufferSize) or the buffer switches to reading
  451. // and there is left over data (_writePos > 0), this function must be called.
  452. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  453. protected void FlushWrite(bool blockForWrite) {
  454. Debug.Assert(_readPos == 0 && _readLen == 0, "BufferedStream: Read buffer must be empty in FlushWrite!");
  455. if (_writePos > 0)
  456. WriteCore(_buffer, 0, _writePos, blockForWrite);
  457. _writePos = 0;
  458. }
  459. protected override void Dispose(bool disposing)
  460. {
  461. try {
  462. // Flush data to disk iff we were writing.
  463. if (_writePos > 0) {
  464. // With our Whidbey async IO & overlapped support for AD unloads,
  465. // we don't strictly need to block here to release resources
  466. // if the underlying IO is overlapped since that support
  467. // takes care of the pinning & freeing the
  468. // overlapped struct. We need to do this when called from
  469. // Close so that the handle is closed when Close returns, but
  470. // we do't need to call EndWrite from the finalizer.
  471. // Additionally, if we do call EndWrite, we block forever
  472. // because AD unloads prevent us from running the managed
  473. // callback from the IO completion port. Blocking here when
  474. // called from the finalizer during AD unload is clearly wrong,
  475. // but we can't use any sort of test for whether the AD is
  476. // unloading because if we weren't unloading, an AD unload
  477. // could happen on a separate thread before we call EndWrite.
  478. FlushWrite(disposing);
  479. }
  480. }
  481. finally {
  482. // Don't set the buffer to null, to avoid a NullReferenceException
  483. // when users have a race condition in their code (ie, they call
  484. // Close when calling another method on Stream like Read).
  485. //_buffer = null;
  486. _readPos = 0;
  487. _readLen = 0;
  488. _writePos = 0;
  489. base.Dispose(disposing);
  490. }
  491. }
  492. //
  493. // Helper methods
  494. //
  495. #if _ENABLE_STREAM_FACTORING
  496. protected int BufferedWritePosition
  497. {
  498. // Making the getter thread safe is not very useful anyways
  499. get {
  500. return _writePos;
  501. }
  502. //set {
  503. // Interlocked.Exchange(ref _writePos, value);
  504. //}
  505. }
  506. protected int BufferedReadPosition
  507. {
  508. // Making the getter thread safe is not very useful anyways
  509. get {
  510. return _readPos;
  511. }
  512. //set {
  513. // Interlocked.Exchange(ref _readPos, value);
  514. //}
  515. }
  516. #endif //_ENABLE_STREAM_FACTORING
  517. protected long UnderlyingStreamPosition
  518. {
  519. // Making the getter thread safe is not very useful anyways
  520. get {
  521. return pos;
  522. }
  523. set {
  524. Interlocked.Exchange(ref pos, value);
  525. }
  526. }
  527. protected long AddUnderlyingStreamPosition(long posDelta)
  528. {
  529. return Interlocked.Add(ref pos, posDelta);
  530. }
  531. [MethodImplAttribute(MethodImplOptions.Synchronized)]
  532. protected internal void DiscardBuffer()
  533. {
  534. _readPos = 0;
  535. _readLen = 0;
  536. _writePos = 0;
  537. }
  538. private void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite)
  539. {
  540. long streamPos;
  541. WriteCore(buffer, offset, count, blockForWrite, out streamPos);
  542. }
  543. protected abstract void WriteCore(byte[] buffer, int offset, int count, bool blockForWrite, out long streamPos);
  544. #if _ENABLE_STREAM_FACTORING
  545. private int ReadCore(byte[] buffer, int offset, int count)
  546. {
  547. long streamPos;
  548. return ReadCore(buffer, offset, count, out streamPos);
  549. }
  550. private int EndReadCore(IAsyncResult asyncResult)
  551. {
  552. long streamPos;
  553. return EndReadCore(asyncResult, out streamPos);
  554. }
  555. private void EndWriteCore(IAsyncResult asyncResult)
  556. {
  557. long streamPos;
  558. EndWriteCore(asyncResult, out streamPos);
  559. }
  560. // Derived streams should implement the following core methods
  561. protected abstract int ReadCore(byte[] buffer, int offset, int count, out long streamPos);
  562. [ResourceExposure(ResourceScope.None)]
  563. [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
  564. protected abstract IAsyncResult BeginReadCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject, int numBufferedBytesRead);
  565. protected abstract int EndReadCore(IAsyncResult asyncResult, out long streamPos);
  566. [ResourceExposure(ResourceScope.None)]
  567. [ResourceConsumption(ResourceScope.AppDomain, ResourceScope.AppDomain)]
  568. protected abstract IAsyncResult BeginWriteCore(byte[] bytes, int offset, int numBytes, AsyncCallback userCallback, Object stateObject);
  569. protected abstract void EndWriteCore(IAsyncResult asyncResult, out long streamPos);
  570. protected abstract long SeekCore(long offset, SeekOrigin origin);
  571. #endif //_ENABLE_STREAM_FACTORING
  572. }
  573. #if _ENABLE_STREAM_FACTORING
  574. // Fake async result
  575. unsafe internal sealed class BufferedStreamAsyncResult : IAsyncResult
  576. {
  577. // User code callback
  578. internal AsyncCallback _userCallback;
  579. internal Object _userStateObject;
  580. internal int _numBytes; // number of bytes read OR written
  581. //internal int _errorCode;
  582. internal bool _isWrite; // Whether this is a read or a write
  583. public Object AsyncState
  584. {
  585. get { return _userStateObject; }
  586. }
  587. public bool IsCompleted
  588. {
  589. get { return true; }
  590. }
  591. public WaitHandle AsyncWaitHandle
  592. {
  593. get { return null; }
  594. }
  595. public bool CompletedSynchronously
  596. {
  597. get { return true; }
  598. }
  599. internal static IAsyncResult Complete(int numBufferedBytes, AsyncCallback userCallback, Object userStateObject, bool isWrite)
  600. {
  601. // Fake async
  602. BufferedStreamAsyncResult asyncResult = new BufferedStreamAsyncResult();
  603. asyncResult._numBytes = numBufferedBytes;
  604. asyncResult._userCallback = userCallback;
  605. asyncResult._userStateObject = userStateObject;
  606. asyncResult._isWrite = isWrite;
  607. if (userCallback != null) {
  608. userCallback(asyncResult);
  609. }
  610. return asyncResult;
  611. }
  612. }
  613. #endif //_ENABLE_STREAM_FACTORING
  614. }