// SqlSequentialStream.cs
  1. using System;
  2. using System.Data.Common;
  3. using System.Diagnostics;
  4. using System.Threading;
  5. using System.Threading.Tasks;
  6. namespace System.Data.SqlClient
  7. {
  8. sealed internal class SqlSequentialStream : System.IO.Stream
  9. {
  // Reader that supplies the column data; set to null by SetClosed()
  private SqlDataReader _reader;
  // Index of our column in the reader's row
  private int _columnIndex;
  // The asynchronous read currently in flight, or null when no read is pending
  private Task _currentTask;
  // Read timeout in milliseconds, exposed through Stream.ReadTimeout
  private int _readTimeout;
  // Cancelled by SetClosed() so that pending reads observe that the stream was closed
  private CancellationTokenSource _disposalTokenSource;

  /// <summary>
  /// Creates a read-only, forward-only stream over a single column of <paramref name="reader"/>
  /// </summary>
  /// <param name="reader">The reader to pull bytes from; expected to be non-null</param>
  /// <param name="columnIndex">The index of the column to read; expected to be non-negative</param>
  internal SqlSequentialStream(SqlDataReader reader, int columnIndex)
  {
      Debug.Assert(reader != null, "Null reader when creating sequential stream");
      Debug.Assert(columnIndex >= 0, "Invalid column index when creating sequential stream");

      _reader = reader;
      _columnIndex = columnIndex;
      _currentTask = null;
      _disposalTokenSource = new CancellationTokenSource();

      // Default to no timeout; this is overridden below when the command carries a non-zero CommandTimeout
      _readTimeout = Timeout.Infinite;

      var command = reader.Command;
      if (command != null)
      {
          int timeoutInSeconds = command.CommandTimeout;
          if (timeoutInSeconds != 0)
          {
              // Convert seconds to milliseconds in 64-bit arithmetic, then clamp to Int32.MaxValue so the cast cannot overflow
              long timeoutInMilliseconds = Math.Min(timeoutInSeconds * 1000L, (long)Int32.MaxValue);
              _readTimeout = (int)timeoutInMilliseconds;
          }
      }
  }
  /// <summary>
  /// True while this stream still has a reader attached and that reader is not closed
  /// </summary>
  public override bool CanRead
  {
      get
      {
          // Snapshot the field: SetClosed() may null _reader on another thread, and reading the field
          // twice (null check, then IsClosed) could otherwise throw a NullReferenceException
          SqlDataReader reader = _reader;
          return ((reader != null) && (!reader.IsClosed));
      }
  }
  /// <summary>
  /// Always false: this stream cannot seek
  /// </summary>
  public override bool CanSeek
  {
      get
      {
          return false;
      }
  }

  /// <summary>
  /// Always true: reads honour <see cref="ReadTimeout"/>
  /// </summary>
  public override bool CanTimeout
  {
      get
      {
          return true;
      }
  }

  /// <summary>
  /// Always false: this stream is read-only
  /// </summary>
  public override bool CanWrite
  {
      get
      {
          return false;
      }
  }
  /// <summary>
  /// Does nothing: the stream is read-only, so there is nothing to flush
  /// </summary>
  public override void Flush()
  {
  }

  /// <summary>
  /// Not supported; always throws
  /// </summary>
  public override long Length
  {
      get
      {
          throw ADP.NotSupported();
      }
  }

  /// <summary>
  /// Not supported; both the getter and the setter always throw
  /// </summary>
  public override long Position
  {
      get
      {
          throw ADP.NotSupported();
      }
      set
      {
          throw ADP.NotSupported();
      }
  }
  /// <summary>
  /// Read timeout in milliseconds. Accepts any positive value or Timeout.Infinite;
  /// any other value is rejected with an argument-out-of-range error
  /// </summary>
  public override int ReadTimeout
  {
      get
      {
          return _readTimeout;
      }
      set
      {
          bool isAcceptable = (value == Timeout.Infinite) || (value > 0);
          if (!isAcceptable)
          {
              throw ADP.ArgumentOutOfRange("value");
          }

          _readTimeout = value;
      }
  }
  /// <summary>
  /// The column index this stream was created for
  /// </summary>
  internal int ColumnIndex
  {
      get
      {
          return _columnIndex;
      }
  }
  /// <summary>
  /// Synchronously reads up to <paramref name="count"/> bytes of the column into <paramref name="buffer"/>
  /// </summary>
  /// <param name="buffer">Destination buffer</param>
  /// <param name="offset">Position in <paramref name="buffer"/> at which to start writing</param>
  /// <param name="count">Maximum number of bytes to read</param>
  /// <returns>The value returned by the reader's sequential GetBytes call</returns>
  public override int Read(byte[] buffer, int offset, int count)
  {
      ValidateReadParameters(buffer, offset, count);

      bool readable = CanRead;
      if (!readable)
      {
          throw ADP.ObjectDisposed(this);
      }

      // A synchronous read may not overlap an asynchronous one that is still running
      bool asyncReadPending = (_currentTask != null);
      if (asyncReadPending)
      {
          throw ADP.AsyncOperationPending();
      }

      int bytesRead;
      try
      {
          bytesRead = _reader.GetBytesInternalSequential(_columnIndex, buffer, offset, count, _readTimeout);
      }
      catch (SqlException sqlError)
      {
          // Stream.Read() can't throw a SqlException - so wrap it in an IOException
          throw ADP.ErrorReadingFromStream(sqlError);
      }

      return bytesRead;
  }
  /// <summary>
  /// Starts an asynchronous read using the Begin/End (APM) pattern, implemented on top of ReadAsync
  /// </summary>
  /// <param name="buffer">Destination buffer</param>
  /// <param name="offset">Position in <paramref name="buffer"/> at which to start writing</param>
  /// <param name="count">Maximum number of bytes to read</param>
  /// <param name="callback">Optional callback invoked (on the default task scheduler) once the read has finished</param>
  /// <param name="state">Caller-supplied object made available through IAsyncResult.AsyncState</param>
  /// <returns>A Task&lt;int&gt; (as IAsyncResult) that must be passed to EndRead</returns>
  public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  {
      if (!CanRead)
      {
          // This is also checked in ReadAsync - but it is better for the user if it throws here instead of having to wait for EndRead
          throw ADP.ObjectDisposed(this);
      }

      Task<int> readTask = ReadAsync(buffer, offset, count, CancellationToken.None);

      // The task returned by ReadAsync carries no AsyncState, so returning it directly would drop the
      // caller's state object. Mirror its outcome onto a task created with that state instead; the
      // result is still a Task<int>, which is what EndRead expects.
      TaskCompletionSource<int> apmCompletion = new TaskCompletionSource<int>(state);
      readTask.ContinueWith((t) =>
      {
          if (t.IsFaulted)
          {
              apmCompletion.TrySetException(t.Exception.InnerExceptions);
          }
          else if (t.IsCanceled)
          {
              apmCompletion.TrySetCanceled();
          }
          else
          {
              apmCompletion.TrySetResult(t.Result);
          }

          // Invoke the callback only after the returned task is complete, so EndRead will not block inside it
          if (callback != null)
          {
              callback(apmCompletion.Task);
          }
      }, TaskScheduler.Default);

      return apmCompletion.Task;
  }
  /// <summary>
  /// Completes a read started with BeginRead, blocking until it has finished
  /// </summary>
  /// <param name="asyncResult">The object returned by BeginRead; must be a Task&lt;int&gt;</param>
  /// <returns>The number of bytes read</returns>
  public override int EndRead(IAsyncResult asyncResult)
  {
      if (asyncResult == null)
      {
          throw ADP.ArgumentNull("asyncResult");
      }

      Task<int> readTask = (Task<int>)asyncResult;

      // GetAwaiter().GetResult() waits for completion and rethrows the task's first exception directly
      // (not wrapped in an AggregateException) while preserving its original stack trace, which a
      // manual "throw ex.InnerException" would overwrite
      return readTask.GetAwaiter().GetResult();
  }
  /// <summary>
  /// Asynchronously reads up to <paramref name="count"/> bytes of the column into <paramref name="buffer"/>.
  /// Only one asynchronous read may be in flight at a time; a second concurrent call gets a faulted task.
  /// </summary>
  /// <param name="buffer">Destination buffer</param>
  /// <param name="offset">Position in <paramref name="buffer"/> at which to start writing</param>
  /// <param name="count">Maximum number of bytes to read</param>
  /// <param name="cancellationToken">Token that cancels the read; the stream's own disposal token is observed as well</param>
  /// <returns>
  /// A task producing the number of bytes read. It is faulted if the stream is closed, another read is
  /// pending or the reader fails (a SqlException is wrapped via ADP.ErrorReadingFromStream), and
  /// cancelled if cancellation was requested.
  /// </returns>
  public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
  {
      ValidateReadParameters(buffer, offset, count);

      TaskCompletionSource<int> completion = new TaskCompletionSource<int>();
      if (!CanRead)
      {
          completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
          return completion.Task;
      }

      // Declared outside the try block so that the catch block can release a linked source if setup fails part-way
      CancellationTokenSource combinedTokenSource = null;
      try
      {
          // Atomically claim the "current task" slot; a non-null previous value means another read is still running
          Task original = Interlocked.CompareExchange<Task>(ref _currentTask, completion.Task, null);
          if (original != null)
          {
              completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
          }
          else
          {
              // Set up a combined cancellation token for both the user's and our disposal tokens
              if (!cancellationToken.CanBeCanceled)
              {
                  // User's token is not cancellable - just use ours
                  combinedTokenSource = _disposalTokenSource;
              }
              else
              {
                  // Link the user's token and the disposal token so that either one cancels the read
                  combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalTokenSource.Token);
              }

              int bytesRead = 0;
              Task<int> getBytesTask = null;
              var reader = _reader;
              if ((reader != null) && (!cancellationToken.IsCancellationRequested) && (!_disposalTokenSource.Token.IsCancellationRequested))
              {
                  getBytesTask = reader.GetBytesAsync(_columnIndex, buffer, offset, count, _readTimeout, combinedTokenSource.Token, out bytesRead);
              }

              if (getBytesTask == null)
              {
                  // Nothing to wait for: either the read was skipped above, or (presumably) GetBytesAsync
                  // finished synchronously and reported its count through bytesRead
                  _currentTask = null;
                  if (cancellationToken.IsCancellationRequested)
                  {
                      completion.SetCanceled();
                  }
                  else if (!CanRead)
                  {
                      completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
                  }
                  else
                  {
                      completion.SetResult(bytesRead);
                  }

                  if (combinedTokenSource != _disposalTokenSource)
                  {
                      combinedTokenSource.Dispose();
                  }
              }
              else
              {
                  getBytesTask.ContinueWith((t) =>
                  {
                      _currentTask = null;
                      // Only report a result if the read completed AND the stream is still open
                      if ((t.Status == TaskStatus.RanToCompletion) && (CanRead))
                      {
                          completion.SetResult(t.Result);
                      }
                      else if (t.Status == TaskStatus.Faulted)
                      {
                          if (t.Exception.InnerException is SqlException)
                          {
                              // Stream.ReadAsync() can't throw a SqlException - so wrap it in an IOException
                              completion.SetException(ADP.ExceptionWithStackTrace(ADP.ErrorReadingFromStream(t.Exception.InnerException)));
                          }
                          else
                          {
                              completion.SetException(t.Exception.InnerException);
                          }
                      }
                      else if (!CanRead)
                      {
                          completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
                      }
                      else
                      {
                          completion.SetCanceled();
                      }

                      if (combinedTokenSource != _disposalTokenSource)
                      {
                          combinedTokenSource.Dispose();
                      }
                  }, TaskScheduler.Default);
              }
          }
      }
      catch (Exception ex)
      {
          // In case of any errors, ensure that the completion is completed and the task is set back to null if we switched it
          completion.TrySetException(ex);
          Interlocked.CompareExchange(ref _currentTask, null, completion.Task);

          // If setup failed after a linked source was created, nothing else will dispose it; release it here so
          // that its registrations on the caller's token and the disposal token do not leak. Dispose is safe to
          // call again if the synchronous path above already disposed it.
          if ((combinedTokenSource != null) && (combinedTokenSource != _disposalTokenSource))
          {
              combinedTokenSource.Dispose();
          }
          throw;
      }

      return completion.Task;
  }
  /// <summary>
  /// Not supported; always throws
  /// </summary>
  public override long Seek(long offset, System.IO.SeekOrigin origin)
  {
      throw ADP.NotSupported();
  }

  /// <summary>
  /// Not supported; always throws
  /// </summary>
  public override void SetLength(long value)
  {
      throw ADP.NotSupported();
  }

  /// <summary>
  /// Not supported (the stream is read-only); always throws
  /// </summary>
  public override void Write(byte[] buffer, int offset, int count)
  {
      throw ADP.NotSupported();
  }
  /// <summary>
  /// Makes the stream behave as closed: cancels the disposal token, detaches the reader
  /// (so CanRead becomes false and Read() throws) and blocks until any pending asynchronous read has finished.
  /// This does not actually close the stream, read off the rest of the data or dispose this
  /// </summary>
  internal void SetClosed()
  {
      _disposalTokenSource.Cancel();
      _reader = null;

      // Take a snapshot of the pending read (if any) and block until it has finished
      Task pendingRead = _currentTask;
      if (pendingRead == null)
      {
          return;
      }

      IAsyncResult pendingResult = pendingRead;
      pendingResult.AsyncWaitHandle.WaitOne();
  }
  /// <summary>
  /// Marks the stream as closed when disposing is true, then runs the base Stream disposal
  /// </summary>
  /// <param name="disposing">True when called from Dispose(); false when called from a finalizer</param>
  protected override void Dispose(bool disposing)
  {
      if (disposing)
      {
          // Set the stream as closed (this also waits for any pending read)
          SetClosed();
      }

      base.Dispose(disposing);
  }
  /// <summary>
  /// Checks that the parameters passed into a Read() method are valid
  /// </summary>
  /// <param name="buffer">Destination buffer; must not be null</param>
  /// <param name="offset">Position in <paramref name="buffer"/> at which writing would start; must not be negative</param>
  /// <param name="count">Maximum number of bytes to read; must not be negative, and offset + count must fit within the buffer</param>
  internal static void ValidateReadParameters(byte[] buffer, int offset, int count)
  {
      if (buffer == null)
      {
          throw ADP.ArgumentNull(ADP.ParameterBuffer);
      }
      if (offset < 0)
      {
          throw ADP.ArgumentOutOfRange(ADP.ParameterOffset);
      }
      if (count < 0)
      {
          throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
      }

      // Equivalent to "offset + count > buffer.Length", but cannot overflow: offset and count are both
      // non-negative here, so the subtraction stays in range and no OverflowException has to be
      // thrown and caught to detect an oversized request
      if (count > buffer.Length - offset)
      {
          throw ExceptionBuilder.InvalidOffsetLength();
      }
  }
  307. }
  308. }