| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343 |
- using System;
- using System.Data.Common;
- using System.Diagnostics;
- using System.Threading;
- using System.Threading.Tasks;
- namespace System.Data.SqlClient
- {
- sealed internal class SqlSequentialStream : System.IO.Stream
- {
- private SqlDataReader _reader; // The SqlDataReader that we are reading data from
- private int _columnIndex; // The index of out column in the table
- private Task _currentTask; // Holds the current task being processed
- private int _readTimeout; // Read timeout for this stream in ms (for Stream.ReadTimeout)
- private CancellationTokenSource _disposalTokenSource; // Used to indicate that a cancellation is requested due to disposal
- internal SqlSequentialStream(SqlDataReader reader, int columnIndex)
- {
- Debug.Assert(reader != null, "Null reader when creating sequential stream");
- Debug.Assert(columnIndex >= 0, "Invalid column index when creating sequential stream");
- _reader = reader;
- _columnIndex = columnIndex;
- _currentTask = null;
- _disposalTokenSource = new CancellationTokenSource();
- // Safely safely convert the CommandTimeout from seconds to milliseconds
- if ((reader.Command != null) && (reader.Command.CommandTimeout != 0))
- {
- _readTimeout = (int)Math.Min((long)reader.Command.CommandTimeout * 1000L, (long)Int32.MaxValue);
- }
- else
- {
- _readTimeout = Timeout.Infinite;
- }
- }
- public override bool CanRead
- {
- get { return ((_reader != null) && (!_reader.IsClosed)); }
- }
- public override bool CanSeek
- {
- get { return false; }
- }
- public override bool CanTimeout
- {
- get { return true; }
- }
- public override bool CanWrite
- {
- get { return false; }
- }
- public override void Flush()
- { }
- public override long Length
- {
- get { throw ADP.NotSupported(); }
- }
- public override long Position
- {
- get { throw ADP.NotSupported(); }
- set { throw ADP.NotSupported(); }
- }
- public override int ReadTimeout
- {
- get { return _readTimeout; }
- set
- {
- if ((value > 0) || (value == Timeout.Infinite))
- {
- _readTimeout = value;
- }
- else
- {
- throw ADP.ArgumentOutOfRange("value");
- }
- }
- }
- internal int ColumnIndex
- {
- get { return _columnIndex; }
- }
- public override int Read(byte[] buffer, int offset, int count)
- {
- ValidateReadParameters(buffer, offset, count);
- if (!CanRead)
- {
- throw ADP.ObjectDisposed(this);
- }
- if (_currentTask != null)
- {
- throw ADP.AsyncOperationPending();
- }
- try
- {
- return _reader.GetBytesInternalSequential(_columnIndex, buffer, offset, count, _readTimeout);
- }
- catch (SqlException ex)
- {
- // Stream.Read() can't throw a SqlException - so wrap it in an IOException
- throw ADP.ErrorReadingFromStream(ex);
- }
- }
- public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- {
- if (!CanRead)
- {
- // This is checked in ReadAsync - but its a better for the user if it throw here instead of having to wait for EndRead
- throw ADP.ObjectDisposed(this);
- }
- Task readTask = ReadAsync(buffer, offset, count, CancellationToken.None);
- if (callback != null)
- {
- readTask.ContinueWith((t) => callback(t), TaskScheduler.Default);
- }
- return readTask;
- }
- public override int EndRead(IAsyncResult asyncResult)
- {
- if (asyncResult == null)
- {
- throw ADP.ArgumentNull("asyncResult");
- }
- // Wait for the task to complete - this will also cause any exceptions to be thrown
- Task<int> readTask = (Task<int>)asyncResult;
- try
- {
- readTask.Wait();
- }
- catch (AggregateException ex)
- {
- throw ex.InnerException;
- }
- return readTask.Result;
- }
- public override Task<int> ReadAsync(byte[] buffer, int offset, int count, CancellationToken cancellationToken)
- {
- ValidateReadParameters(buffer, offset, count);
- TaskCompletionSource<int> completion = new TaskCompletionSource<int>();
- if (!CanRead)
- {
- completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
- }
- else
- {
- try
- {
- Task original = Interlocked.CompareExchange<Task>(ref _currentTask, completion.Task, null);
- if (original != null)
- {
- completion.SetException(ADP.ExceptionWithStackTrace(ADP.AsyncOperationPending()));
- }
- else
- {
- // Set up a combined cancellation token for both the user's and our disposal tokens
- CancellationTokenSource combinedTokenSource;
- if (!cancellationToken.CanBeCanceled)
- {
- // Users token is not cancellable - just use ours
- combinedTokenSource = _disposalTokenSource;
- }
- else
- {
- // Setup registrations from user and disposal token to cancel the combined token
- combinedTokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, _disposalTokenSource.Token);
- }
- int bytesRead = 0;
- Task<int> getBytesTask = null;
- var reader = _reader;
- if ((reader != null) && (!cancellationToken.IsCancellationRequested) && (!_disposalTokenSource.Token.IsCancellationRequested))
- {
- getBytesTask = reader.GetBytesAsync(_columnIndex, buffer, offset, count, _readTimeout, combinedTokenSource.Token, out bytesRead);
- }
- if (getBytesTask == null)
- {
- _currentTask = null;
- if (cancellationToken.IsCancellationRequested)
- {
- completion.SetCanceled();
- }
- else if (!CanRead)
- {
- completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
- }
- else
- {
- completion.SetResult(bytesRead);
- }
- if (combinedTokenSource != _disposalTokenSource)
- {
- combinedTokenSource.Dispose();
- }
- }
- else
- {
- getBytesTask.ContinueWith((t) =>
- {
- _currentTask = null;
- // If we completed, but _reader is null (i.e. the stream is closed), then report cancellation
- if ((t.Status == TaskStatus.RanToCompletion) && (CanRead))
- {
- completion.SetResult((int)t.Result);
- }
- else if (t.Status == TaskStatus.Faulted)
- {
- if (t.Exception.InnerException is SqlException)
- {
- // Stream.ReadAsync() can't throw a SqlException - so wrap it in an IOException
- completion.SetException(ADP.ExceptionWithStackTrace(ADP.ErrorReadingFromStream(t.Exception.InnerException)));
- }
- else
- {
- completion.SetException(t.Exception.InnerException);
- }
- }
- else if (!CanRead)
- {
- completion.SetException(ADP.ExceptionWithStackTrace(ADP.ObjectDisposed(this)));
- }
- else
- {
- completion.SetCanceled();
- }
-
- if (combinedTokenSource != _disposalTokenSource)
- {
- combinedTokenSource.Dispose();
- }
- }, TaskScheduler.Default);
- }
- }
- }
- catch (Exception ex)
- {
- // In case of any errors, ensure that the completion is completed and the task is set back to null if we switched it
- completion.TrySetException(ex);
- Interlocked.CompareExchange(ref _currentTask, null, completion.Task);
- throw;
- }
- }
- return completion.Task;
- }
- public override long Seek(long offset, IO.SeekOrigin origin)
- {
- throw ADP.NotSupported();
- }
- public override void SetLength(long value)
- {
- throw ADP.NotSupported();
- }
- public override void Write(byte[] buffer, int offset, int count)
- {
- throw ADP.NotSupported();
- }
- /// <summary>
- /// Forces the stream to act as if it was closed (i.e. CanRead=false and Read() throws)
- /// This does not actually close the stream, read off the rest of the data or dispose this
- /// </summary>
- internal void SetClosed()
- {
- _disposalTokenSource.Cancel();
- _reader = null;
- // Wait for pending task
- var currentTask = _currentTask;
- if (currentTask != null)
- {
- ((IAsyncResult)currentTask).AsyncWaitHandle.WaitOne();
- }
- }
- protected override void Dispose(bool disposing)
- {
- if (disposing)
- {
- // Set the stream as closed
- SetClosed();
- }
- base.Dispose(disposing);
- }
- /// <summary>
- /// Checks the the parameters passed into a Read() method are valid
- /// </summary>
- /// <param name="buffer"></param>
- /// <param name="index"></param>
- /// <param name="count"></param>
- internal static void ValidateReadParameters(byte[] buffer, int offset, int count)
- {
- if (buffer == null)
- {
- throw ADP.ArgumentNull(ADP.ParameterBuffer);
- }
- if (offset < 0)
- {
- throw ADP.ArgumentOutOfRange(ADP.ParameterOffset);
- }
- if (count < 0)
- {
- throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
- }
- try
- {
- if (checked(offset + count) > buffer.Length)
- {
- throw ExceptionBuilder.InvalidOffsetLength();
- }
- }
- catch (OverflowException)
- {
- // If we've overflowed when adding offset and count, then they never would have fit into buffer anyway
- throw ExceptionBuilder.InvalidOffsetLength();
- }
- }
- }
- }
|