- //------------------------------------------------------------------------------
- // <copyright file="SqlBulkCopy.cs" company="Microsoft">
- // Copyright (c) Microsoft Corporation. All rights reserved.
- // </copyright>
- // <owner current="true" primary="true">[....]</owner>
- // <owner current="true" primary="false">[....]</owner>
- //------------------------------------------------------------------------------
- // todo list:
- // * An ID column needs to be ignored - even if there is an association
- // * Spec: ID columns will be ignored - even if there is an association
- // * Spec: How do we publish CommandTimeout on the bcp operation?
- //
- namespace System.Data.SqlClient {
- using System;
- using System.Collections;
- using System.Collections.Generic;
- using System.ComponentModel;
- using System.Data;
- using System.Data.Common;
- using System.Data.Sql;
- using System.Data.SqlTypes;
- using System.Diagnostics;
- using System.Globalization;
- using System.Runtime.CompilerServices;
- using System.Runtime.ConstrainedExecution;
- using System.Text;
- using System.Threading;
- using System.Threading.Tasks;
- using System.Transactions;
- using System.Xml;
- using MSS = Microsoft.SqlServer.Server;
- // -------------------------------------------------------------------------------------------------
- // this internal class helps us to associate the metadata (from the target)
- // with column ordinals (from the source)
- //
- sealed internal class _ColumnMapping {
- internal int _sourceColumnOrdinal;
- internal _SqlMetaData _metadata;
- internal _ColumnMapping(int columnId, _SqlMetaData metadata) {
- _sourceColumnOrdinal = columnId;
- _metadata = metadata;
- }
- }
- sealed internal class Row {
- private object[] _dataFields;
- internal Row(int rowCount) {
- _dataFields = new object[rowCount];
- }
- internal object[] DataFields {
- get {
- return _dataFields;
- }
- }
- internal object this[int index] {
- get {
- return _dataFields[index];
- }
- }
- }
- // the controlling class for one result (metadata + rows)
- //
- sealed internal class Result {
- private _SqlMetaDataSet _metadata;
- private ArrayList _rowset;
- internal Result(_SqlMetaDataSet metadata) {
- this._metadata = metadata;
- this._rowset = new ArrayList();
- }
- internal int Count {
- get {
- return _rowset.Count;
- }
- }
- internal _SqlMetaDataSet MetaData {
- get {
- return _metadata;
- }
- }
- internal Row this[int index] {
- get {
- return (Row)_rowset[index];
- }
- }
- internal void AddRow(Row row) {
- _rowset.Add(row);
- }
- }
- // A wrapper object for metadata and rowsets returned by our initial queries
- //
- sealed internal class BulkCopySimpleResultSet {
- private ArrayList _results; // the list of results
- private Result resultSet; // the current result
- private int[] indexmap; // associates columnids with indexes in the rowarray
- // c-tor
- //
- internal BulkCopySimpleResultSet() {
- _results = new ArrayList();
- }
- // indexer
- //
- internal Result this[int idx] {
- get {
- return (Result)_results[idx];
- }
- }
- // callback function for the tdsparser
- // note that setting the metadata adds a resultset
- //
- internal void SetMetaData(_SqlMetaDataSet metadata) {
- resultSet = new Result(metadata);
- _results.Add(resultSet);
- indexmap = new int[resultSet.MetaData.Length];
- for(int i = 0; i < indexmap.Length; i++) {
- indexmap[i] = i;
- }
- }
- // callback function for the tdsparser
- // this will create an indexmap for the active resultset
- //
- internal int[] CreateIndexMap() {
- return indexmap;
- }
- // callback function for the tdsparser
- // this will return an array of rows to store the rowdata
- //
- internal object[] CreateRowBuffer() {
- Row row = new Row(resultSet.MetaData.Length);
- resultSet.AddRow(row);
- return row.DataFields;
- }
- }
- // -------------------------------------------------------------------------------------------------
- //
- //
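- // Lets callers bulk load a SQL Server table with data from another source
- // (IDataReader, DataTable or DataRow[]) via the TDS "insert bulk" protocol.
- //
- // Illustrative usage only (connectionString, sourceReader and the table name are placeholders):
- //
- //   using (SqlConnection connection = new SqlConnection(connectionString))
- //   using (SqlBulkCopy bulkCopy = new SqlBulkCopy(connection)) {
- //       connection.Open();
- //       bulkCopy.DestinationTableName = "dbo.TargetTable";
- //       bulkCopy.WriteToServer(sourceReader);
- //   }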
- public sealed class SqlBulkCopy : IDisposable {
- private enum TableNameComponents {
- Server = 0,
- Catalog,
- Owner,
- TableName,
- }
- private enum ValueSourceType {
- Unspecified = 0,
- IDataReader,
- DataTable,
- RowArray
- }
- // Enum for specifying SqlDataReader.Get method used
- private enum ValueMethod : byte {
- GetValue,
- SqlTypeSqlDecimal,
- SqlTypeSqlDouble,
- SqlTypeSqlSingle,
- DataFeedStream,
- DataFeedText,
- DataFeedXml
- }
- // Used to hold column metadata for SqlDataReader case
- private struct SourceColumnMetadata {
- public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) {
- Method = method;
- IsSqlType = isSqlType;
- IsDataFeed = isDataFeed;
- }
- public readonly ValueMethod Method;
- public readonly bool IsSqlType;
- public readonly bool IsDataFeed;
- }
- // The initial query will return three tables.
- // Transaction count has only one value in one column and one row
- // MetaData has n columns but no rows
- // Collation has 4 columns and n rows
- private const int TranCountResultId = 0;
- private const int TranCountRowId = 0;
- private const int TranCountValueId = 0;
- private const int MetaDataResultId = 1;
- private const int CollationResultId = 2;
- private const int ColIdId = 0;
- private const int NameId = 1;
- private const int Tds_CollationId = 2;
- private const int CollationId = 3;
- private const int MAX_LENGTH = 0x7FFFFFFF;
- private const int DefaultCommandTimeout = 30;
- private bool _enableStreaming = false;
- private int _batchSize;
- private bool _ownConnection;
- private SqlBulkCopyOptions _copyOptions;
- private int _timeout = DefaultCommandTimeout;
- private string _destinationTableName;
- private int _rowsCopied;
- private int _notifyAfter;
- private int _rowsUntilNotification;
- private bool _insideRowsCopiedEvent;
- private object _rowSource;
- private SqlDataReader _SqlDataReaderRowSource;
- private bool _rowSourceIsSqlDataReaderSmi;
- private DbDataReader _DbDataReaderRowSource;
- private DataTable _dataTableSource;
- private SqlBulkCopyColumnMappingCollection _columnMappings;
- private SqlBulkCopyColumnMappingCollection _localColumnMappings;
- private SqlConnection _connection;
- private SqlTransaction _internalTransaction;
- private SqlTransaction _externalTransaction;
- private ValueSourceType _rowSourceType = ValueSourceType.Unspecified;
- private DataRow _currentRow;
- private int _currentRowLength;
- private DataRowState _rowStateToSkip;
- private IEnumerator _rowEnumerator;
- private TdsParser _parser;
- private TdsParserStateObject _stateObj;
- private List<_ColumnMapping> _sortedColumnMappings;
- private SqlRowsCopiedEventHandler _rowsCopiedEventHandler;
- private static int _objectTypeCount; // Bid counter
- internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount);
- // newly added member variables for the async modification
- private int _savedBatchSize = 0; // save the batch size so that later changes to BatchSize do not unexpectedly affect the copy in progress
- private bool _hasMoreRowToCopy = false;
- private bool _isAsyncBulkCopy = false;
- private bool _isBulkCopyingInProgress = false;
- private SqlInternalConnectionTds.SyncAsyncLock _parserLock = null;
- private SourceColumnMetadata[] _currentRowMetadata;
- // for debug purposes only.
- //
- #if DEBUG
- internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task
- internal static bool SetAlwaysTaskOnWrite {
- set {
- _setAlwaysTaskOnWrite = value;
- }
- get{
- return _setAlwaysTaskOnWrite;
- }
- }
- #endif
- // ctor
- //
- public SqlBulkCopy(SqlConnection connection) {
- if(connection == null) {
- throw ADP.ArgumentNull("connection");
- }
- _connection = connection;
- _columnMappings = new SqlBulkCopyColumnMappingCollection();
- }
- public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
- : this (connection) {
- _copyOptions = copyOptions;
- if(externalTransaction != null && IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) {
- throw SQL.BulkLoadConflictingTransactionOption();
- }
- if(!IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) {
- _externalTransaction = externalTransaction;
- }
- }
- public SqlBulkCopy(string connectionString) : this (new SqlConnection(connectionString)) {
- if(connectionString == null) {
- throw ADP.ArgumentNull("connectionString");
- }
- _connection = new SqlConnection(connectionString);
- _columnMappings = new SqlBulkCopyColumnMappingCollection();
- _ownConnection = true;
- }
- public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions)
- : this (connectionString) {
- _copyOptions = copyOptions;
- }
- public int BatchSize {
- get {
- return _batchSize;
- }
- set {
- if(value >= 0) {
- _batchSize = value;
- }
- else {
- throw ADP.ArgumentOutOfRange("BatchSize");
- }
- }
- }
- public int BulkCopyTimeout {
- get {
- return _timeout;
- }
- set {
- if(value < 0) {
- throw SQL.BulkLoadInvalidTimeout(value);
- }
- _timeout = value;
- }
- }
- public bool EnableStreaming {
- get {
- return _enableStreaming;
- }
- set {
- _enableStreaming = value;
- }
- }
- public SqlBulkCopyColumnMappingCollection ColumnMappings {
- get {
- return _columnMappings;
- }
- }
- public string DestinationTableName {
- get {
- return _destinationTableName;
- }
- set {
- if(value == null) {
- throw ADP.ArgumentNull("DestinationTableName");
- }
- else if(value.Length == 0) {
- throw ADP.ArgumentOutOfRange("DestinationTableName");
- }
- _destinationTableName = value;
- }
- }
- public int NotifyAfter {
- get {
- return _notifyAfter;
- }
- set {
- if(value >= 0) {
- _notifyAfter = value;
- }
- else {
- throw ADP.ArgumentOutOfRange("NotifyAfter");
- }
- }
- }
- internal int ObjectID {
- get {
- return _objectID;
- }
- }
- public event SqlRowsCopiedEventHandler SqlRowsCopied {
- add {
- _rowsCopiedEventHandler += value;
- }
- remove {
- _rowsCopiedEventHandler -= value;
- }
- }
- internal SqlStatistics Statistics {
- get {
- if(null != _connection) {
- if(_connection.StatisticsEnabled) {
- return _connection.Statistics;
- }
- }
- return null;
- }
- }
- //================================================================
- // IDisposable
- //================================================================
- void IDisposable.Dispose() {
- this.Dispose(true);
- GC.SuppressFinalize(this);
- }
- private bool IsCopyOption(SqlBulkCopyOptions copyOption) {
- return (_copyOptions & copyOption) == copyOption;
- }
-
- //Creates the initial query string, but does not execute it.
- //
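- // For example, with DestinationTableName "dbo.Target" against a Katmai (2008) or later server,
- // the generated batch is roughly (illustrative only; the actual escaping is produced below):
- //   select @@trancount;
- //   SET FMTONLY ON select * from dbo.Target SET FMTONLY OFF
- //   exec ..sp_tablecollations_100 N'[dbo].[Target]'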
- private string CreateInitialQuery() {
- string[] parts;
- try {
- parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Res.SQL_BulkCopyDestinationTableName, true);
- }
- catch (Exception e) {
- throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, e);
- }
- if (ADP.IsEmpty(parts[MultipartIdentifier.TableIndex])) {
- throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, null);
- }
- string TDSCommand;
-
- TDSCommand = "select @@trancount; SET FMTONLY ON select * from " + this.DestinationTableName + " SET FMTONLY OFF ";
- if (_connection.IsShiloh) {
- // If it's a temp DB then try to connect
- string TableCollationsStoredProc;
- if (_connection.IsKatmaiOrNewer) {
- TableCollationsStoredProc = "sp_tablecollations_100";
- }
- else if (_connection.IsYukonOrNewer) {
- TableCollationsStoredProc = "sp_tablecollations_90";
- }
- else {
- TableCollationsStoredProc = "sp_tablecollations";
- }
- string TableName = parts[MultipartIdentifier.TableIndex];
- bool isTempTable = TableName.Length > 0 && '#' == TableName[0];
- if (!ADP.IsEmpty(TableName)) {
- // Escape table name to be put inside TSQL literal block (within N'').
- TableName = SqlServerEscapeHelper.EscapeStringAsLiteral(TableName);
- // VSDD 581951 - escape the table name
- TableName = SqlServerEscapeHelper.EscapeIdentifier(TableName);
- }
- string SchemaName = parts[MultipartIdentifier.SchemaIndex];
- if (!ADP.IsEmpty(SchemaName)) {
- // Escape schema name to be put inside TSQL literal block (within N'').
- SchemaName = SqlServerEscapeHelper.EscapeStringAsLiteral(SchemaName);
- // VSDD 581951 - escape the schema name
- SchemaName = SqlServerEscapeHelper.EscapeIdentifier(SchemaName);
- }
- string CatalogName = parts[MultipartIdentifier.CatalogIndex];
- if (isTempTable && ADP.IsEmpty(CatalogName)) {
- TDSCommand += String.Format((IFormatProvider)null, "exec tempdb..{0} N'{1}.{2}'",
- TableCollationsStoredProc,
- SchemaName,
- TableName
- );
- }
- else {
- // VSDD 581951 - escape the catalog name
- if (!ADP.IsEmpty(CatalogName)) {
- CatalogName = SqlServerEscapeHelper.EscapeIdentifier(CatalogName);
- }
- TDSCommand += String.Format((IFormatProvider)null, "exec {0}..{1} N'{2}.{3}'",
- CatalogName,
- TableCollationsStoredProc,
- SchemaName,
- TableName
- );
- }
- }
- return TDSCommand;
- }
- // Creates and then executes the initial query to get information about the target table
- // When _isAsyncBulkCopy == false (i.e. it is a sync copy): out result contains the result set. Returns null.
- // When _isAsyncBulkCopy == true (i.e. it is an async copy): This still uses the _parser.Run method synchronously and returns Task<BulkCopySimpleResultSet>.
- // We need to have a _parser.RunAsync to make it truly async.
- private Task<BulkCopySimpleResultSet> CreateAndExecuteInitialQueryAsync(out BulkCopySimpleResultSet result) {
- string TDSCommand = CreateInitialQuery();
- Bid.Trace("<sc.SqlBulkCopy.CreateAndExecuteInitialQueryAsync|INFO> Initial Query: '%ls' \n", TDSCommand);
- Bid.CorrelationTrace("<sc.SqlBulkCopy.CreateAndExecuteInitialQueryAsync|Info|Correlation> ObjectID%d#, ActivityID %ls\n", ObjectID);
- Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true);
- if (executeTask == null) {
- result = new BulkCopySimpleResultSet();
- RunParser(result);
- return null;
- }
- else {
- Debug.Assert(_isAsyncBulkCopy, "Execution pended when not doing async bulk copy");
- result = null;
- return executeTask.ContinueWith<BulkCopySimpleResultSet>(t => {
- Debug.Assert(!t.IsCanceled, "Execution task was canceled");
- if (t.IsFaulted) {
- throw t.Exception.InnerException;
- }
- else {
- var internalResult = new BulkCopySimpleResultSet();
- RunParserReliably(internalResult);
- return internalResult;
- }
- }, TaskScheduler.Default);
- }
- }
- // Matches associated columns with metadata from the initial query and
- // builds the update bulk command text (execution happens in SubmitUpdateBulkCommand)
- //
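- // The statement built here has the shape (illustrative only; names, types, collations and options vary):
- //   insert bulk dbo.Target ([Id] Int, [Name] NVarChar(50) COLLATE SQL_Latin1_General_CP1_CI_AS)
- //   with (TABLOCK, CHECK_CONSTRAINTS)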
- private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet internalResults) {
- StringBuilder updateBulkCommandText = new StringBuilder();
- if (_connection.IsShiloh && 0 == internalResults[CollationResultId].Count) {
- throw SQL.BulkLoadNoCollation();
- }
- Debug.Assert((internalResults != null), "Where are the results from the initial query?");
- updateBulkCommandText.AppendFormat("insert bulk {0} (", this.DestinationTableName);
- int nmatched = 0; // number of columns that match and are accepted
- int nrejected = 0; // number of columns that match but were rejected
- bool rejectColumn; // true if a column is rejected because of an excluded type
- bool isInTransaction;
- if(_parser.IsYukonOrNewer) {
- isInTransaction = _connection.HasLocalTransaction;
- }
- else {
- isInTransaction = (bool)(0 < (SqlInt32)(internalResults[TranCountResultId][TranCountRowId][TranCountValueId]));
- }
- // Throw if there is a transaction but no flag is set
- if(isInTransaction && null == _externalTransaction && null == _internalTransaction && (_connection.Parser != null && _connection.Parser.CurrentTransaction != null && _connection.Parser.CurrentTransaction.IsLocal)) {
- throw SQL.BulkLoadExistingTransaction();
- }
- // loop over the metadata for each column
- //
- _SqlMetaDataSet metaDataSet = internalResults[MetaDataResultId].MetaData;
- _sortedColumnMappings = new List<_ColumnMapping>(metaDataSet.Length);
- for(int i = 0; i < metaDataSet.Length; i++) {
- _SqlMetaData metadata = metaDataSet[i];
- rejectColumn = false;
- // Check for excluded types
- //
- if((metadata.type == SqlDbType.Timestamp)
- || ((metadata.isIdentity) && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity))) {
- // remove metadata for excluded columns
- metaDataSet[i] = null;
- rejectColumn = true;
- // we still need to find a matching column association
- }
- // find out if this column is associated
- int assocId;
- for(assocId = 0; assocId < _localColumnMappings.Count; assocId++) {
- if((_localColumnMappings[assocId]._destinationColumnOrdinal == metadata.ordinal) ||
- (UnquotedName(_localColumnMappings[assocId]._destinationColumnName) == metadata.column)) {
- if(rejectColumn) {
- nrejected++; // count matched columns only
- break;
- }
- _sortedColumnMappings.Add(new _ColumnMapping(_localColumnMappings[assocId]._internalSourceColumnOrdinal, metadata));
- nmatched++;
- if(nmatched > 1) {
- updateBulkCommandText.Append(", "); // a leading comma for all but the first one
- }
- // some datatypes need special handling ...
- //
- if(metadata.type == SqlDbType.Variant) {
- AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
- }
- else if(metadata.type == SqlDbType.Udt) {
- // UDTs are sent as varbinary
- AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
- }
- else {
- AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, typeof(SqlDbType).GetEnumName(metadata.type));
- }
- switch(metadata.metaType.NullableType) {
- case TdsEnums.SQLNUMERICN:
- case TdsEnums.SQLDECIMALN:
- // decimal and numeric need to include precision and scale
- //
- updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
- break;
- case TdsEnums.SQLUDT: {
- if (metadata.IsLargeUdt) {
- updateBulkCommandText.Append("(max)");
- } else {
- int size = metadata.length;
- updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
- }
- break;
- }
- case TdsEnums.SQLTIME:
- case TdsEnums.SQLDATETIME2:
- case TdsEnums.SQLDATETIMEOFFSET:
- // time, datetime2, and datetimeoffset need to include scale
- //
- updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
- break;
- default: {
- // for non-long non-fixed types we need to add the Size
- //
- if(!metadata.metaType.IsFixed && !metadata.metaType.IsLong) {
- int size = metadata.length;
- switch(metadata.metaType.NullableType) {
- case TdsEnums.SQLNCHAR:
- case TdsEnums.SQLNVARCHAR:
- case TdsEnums.SQLNTEXT:
- size /= 2;
- break;
- default:
- break;
- }
- updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
- }
- else if(metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml) {
- // Partial length column prefix (max)
- updateBulkCommandText.Append("(max)");
- }
- break;
- }
- }
- if(_connection.IsShiloh) {
- // Shiloh or above!
- // get collation for column i
- Result rowset = internalResults[CollationResultId];
- object rowvalue = rowset[i][CollationId];
- if(rowvalue != null) {
- Debug.Assert(rowvalue is SqlString);
- SqlString collation_name = (SqlString)rowvalue;
- if(!collation_name.IsNull) {
- updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
- // VSTFDEVDIV 461426: compare collations only if the collation value was set on the metadata
- if (null != _SqlDataReaderRowSource && metadata.collation != null) {
- // On SqlDataReader we can verify the source column collation!
- int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal;
- int destinationLcid = metadata.collation.LCID;
- int sourceLcid = _SqlDataReaderRowSource.GetLocaleId(sourceColumnId);
- if(sourceLcid != destinationLcid) {
- throw SQL.BulkLoadLcidMismatch(sourceLcid, _SqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
- }
- }
- }
- }
- }
- break;
- } // end if found
- } // end of (inner) for loop
- if(assocId == _localColumnMappings.Count) {
- // remove metadata for unmatched columns
- metaDataSet[i] = null;
- }
- } // end of (outer) for loop
- // all column mappings should have matched up
- if(nmatched + nrejected != _localColumnMappings.Count) {
- throw (SQL.BulkLoadNonMatchingColumnMapping());
- }
- updateBulkCommandText.Append(")");
- if((_copyOptions & (
- SqlBulkCopyOptions.KeepNulls
- | SqlBulkCopyOptions.TableLock
- | SqlBulkCopyOptions.CheckConstraints
- | SqlBulkCopyOptions.FireTriggers)) != SqlBulkCopyOptions.Default) {
- bool addSeparator = false; // insert a comma character if multiple options in list ...
- updateBulkCommandText.Append(" with (");
- if(IsCopyOption(SqlBulkCopyOptions.KeepNulls)) {
- updateBulkCommandText.Append("KEEP_NULLS");
- addSeparator = true;
- }
- if(IsCopyOption(SqlBulkCopyOptions.TableLock)) {
- updateBulkCommandText.Append((addSeparator ? ", " : "") + "TABLOCK");
- addSeparator = true;
- }
- if(IsCopyOption(SqlBulkCopyOptions.CheckConstraints)) {
- updateBulkCommandText.Append((addSeparator ? ", " : "") + "CHECK_CONSTRAINTS");
- addSeparator = true;
- }
- if(IsCopyOption(SqlBulkCopyOptions.FireTriggers)) {
- updateBulkCommandText.Append((addSeparator ? ", " : "") + "FIRE_TRIGGERS");
- addSeparator = true;
- }
- updateBulkCommandText.Append(")");
- }
- return (updateBulkCommandText.ToString());
- }
- // submits the update bulk command
- //
- private Task SubmitUpdateBulkCommand(string TDSCommand) {
- Bid.CorrelationTrace("<sc.SqlBulkCopy.SubmitUpdateBulkCommand|Info|Correlation> ObjectID%d#, ActivityID %ls\n", ObjectID);
- Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true);
- if (executeTask == null) {
- RunParser();
- return null;
- }
- else {
- Debug.Assert(_isAsyncBulkCopy, "Execution pended when not doing async bulk copy");
- return executeTask.ContinueWith(t => {
- Debug.Assert(!t.IsCanceled, "Execution task was canceled");
- if (t.IsFaulted) {
- throw t.Exception.InnerException;
- }
- else {
- RunParserReliably();
- }
- }, TaskScheduler.Default);
- }
- }
- // Starts writing the Bulkcopy data stream
- //
- private void WriteMetaData(BulkCopySimpleResultSet internalResults) {
- _stateObj.SetTimeoutSeconds(this.BulkCopyTimeout);
- _SqlMetaDataSet metadataCollection = internalResults[MetaDataResultId].MetaData;
- _stateObj._outputMessageType = TdsEnums.MT_BULK;
- _parser.WriteBulkCopyMetaData(metadataCollection, _sortedColumnMappings.Count, _stateObj);
- }
- //================================================================
- // Close()
- //
- // Terminates the bulk copy operation.
- // Must be called at the end of the bulk copy session.
- //================================================================
- public void Close() {
- if(_insideRowsCopiedEvent) {
- throw SQL.InvalidOperationInsideEvent();
- }
- Dispose(true);
- GC.SuppressFinalize(this);
- }
- private void Dispose(bool disposing) {
- if(disposing) {
- // dispose dependent objects
- _columnMappings = null;
- _parser = null;
- try {
- // Just in case there is a lingering transaction (which there shouldn't be)
- try {
- Debug.Assert(_internalTransaction == null, "Internal transaction exists during dispose");
- if (null != _internalTransaction) {
- _internalTransaction.Rollback();
- _internalTransaction.Dispose();
- _internalTransaction = null;
- }
- }
- catch(Exception e) {
- //
- if(!ADP.IsCatchableExceptionType(e)) {
- throw;
- }
- ADP.TraceExceptionWithoutRethrow(e);
- }
- }
- finally {
- if(_connection != null) {
- if(_ownConnection) {
- _connection.Dispose();
- }
- _connection = null;
- }
- }
- }
- // free unmanaged objects
- }
- // unified method to read a value from the current row
- //
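- // destRowIndex is an index into _sortedColumnMappings (a destination column); the value is read from the
- // corresponding source ordinal. The out flags indicate whether the value is a SqlType, a streaming
- // data feed (Stream/TextReader/XmlReader wrapper), or null.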
- private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out bool isDataFeed, out bool isNull) {
- _SqlMetaData metadata = _sortedColumnMappings[destRowIndex]._metadata;
- int sourceOrdinal = _sortedColumnMappings[destRowIndex]._sourceColumnOrdinal;
- switch(_rowSourceType) {
- case ValueSourceType.IDataReader:
- // Handle data feeds (common for both DbDataReader and SqlDataReader)
- if (_currentRowMetadata[destRowIndex].IsDataFeed) {
- if (_DbDataReaderRowSource.IsDBNull(sourceOrdinal)) {
- isSqlType = false;
- isDataFeed = false;
- isNull = true;
- return DBNull.Value;
- }
- else {
- isSqlType = false;
- isDataFeed = true;
- isNull = false;
- switch (_currentRowMetadata[destRowIndex].Method) {
- case ValueMethod.DataFeedStream:
- return new StreamDataFeed(_DbDataReaderRowSource.GetStream(sourceOrdinal));
- case ValueMethod.DataFeedText:
- return new TextDataFeed(_DbDataReaderRowSource.GetTextReader(sourceOrdinal));
- case ValueMethod.DataFeedXml:
- // Only SqlDataReader supports an XmlReader
- // There is no GetXmlReader on DbDataReader; however, if GetValue returns an XmlReader we will read it as a stream when it is assigned to an XML field
- Debug.Assert(_SqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader");
- return new XmlDataFeed(_SqlDataReaderRowSource.GetXmlReader(sourceOrdinal));
- default:
- Debug.Assert(false, string.Format("Current column is marked as being a DataFeed, but no DataFeed compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
- isDataFeed = false;
- object columnValue = _DbDataReaderRowSource.GetValue(sourceOrdinal);
- ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType);
- return columnValue;
- }
- }
- }
- // SqlDataReader-specific logic
- else if (null != _SqlDataReaderRowSource) {
- if (_currentRowMetadata[destRowIndex].IsSqlType) {
- INullable value;
- isSqlType = true;
- isDataFeed = false;
- switch (_currentRowMetadata[destRowIndex].Method) {
- case ValueMethod.SqlTypeSqlDecimal:
- value = _SqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal);
- break;
- case ValueMethod.SqlTypeSqlDouble:
- value = new SqlDecimal(_SqlDataReaderRowSource.GetSqlDouble(sourceOrdinal).Value);
- break;
- case ValueMethod.SqlTypeSqlSingle:
- value = new SqlDecimal(_SqlDataReaderRowSource.GetSqlSingle(sourceOrdinal).Value);
- break;
- default:
- Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
- value = (INullable)_SqlDataReaderRowSource.GetSqlValue(sourceOrdinal);
- break;
- }
- isNull = value.IsNull;
- return value;
- }
- else {
- isSqlType = false;
- isDataFeed = false;
- object value = _SqlDataReaderRowSource.GetValue(sourceOrdinal);
- isNull = ((value == null) || (value == DBNull.Value));
- if ((!isNull) && (metadata.type == SqlDbType.Udt)) {
- var columnAsINullable = value as INullable;
- isNull = (columnAsINullable != null) && columnAsINullable.IsNull;
- }
- #if DEBUG
- else if (!isNull) {
- Debug.Assert(!(value is INullable) || !((INullable)value).IsNull, "IsDBNull returned false, but GetValue returned a null INullable");
- }
- #endif
- return value;
- }
- }
- else {
- isDataFeed = false;
- IDataReader rowSourceAsIDataReader = (IDataReader)_rowSource;
- // Back-compat with 4.0 and 4.5 - only use IsDbNull when streaming is enabled and only for non-SqlDataReader
- if ((_enableStreaming) && (_SqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) {
- isSqlType = false;
- isNull = true;
- return DBNull.Value;
- }
- else {
- object columnValue = rowSourceAsIDataReader.GetValue(sourceOrdinal);
- ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType);
- return columnValue;
- }
- }
-
- case ValueSourceType.DataTable:
- case ValueSourceType.RowArray: {
- Debug.Assert(_currentRow != null, "uninitialized _currentRow");
- Debug.Assert(sourceOrdinal < _currentRowLength, "inconsistency of length of rows from rowsource!");
- isDataFeed = false;
- object currentRowValue = _currentRow[sourceOrdinal];
- ADP.IsNullOrSqlType(currentRowValue, out isNull, out isSqlType);
- // If this row is not null, and there are special storage types for this row, then handle the special storage types
- if ((!isNull) && (_currentRowMetadata[destRowIndex].IsSqlType)) {
- switch (_currentRowMetadata[destRowIndex].Method) {
- case ValueMethod.SqlTypeSqlSingle: {
- if (isSqlType) {
- return new SqlDecimal(((SqlSingle)currentRowValue).Value);
- }
- else {
- float f = (float)currentRowValue;
- if (!float.IsNaN(f)) {
- isSqlType = true;
- return new SqlDecimal(f);
- }
- break;
- }
- }
- case ValueMethod.SqlTypeSqlDouble: {
- if (isSqlType) {
- return new SqlDecimal(((SqlDouble)currentRowValue).Value);
- }
- else {
- double d = (double)currentRowValue;
- if (!double.IsNaN(d)) {
- isSqlType = true;
- return new SqlDecimal(d);
- }
- break;
- }
- }
- case ValueMethod.SqlTypeSqlDecimal: {
- if (isSqlType) {
- return (SqlDecimal)currentRowValue;
- }
- else {
- isSqlType = true;
- return new SqlDecimal((Decimal)currentRowValue);
- }
- }
- default: {
- Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
- break;
- }
- }
- }
-
- // If we are here then either the value is null, there was no special storage type for this column or the special storage type wasn't handled (e.g. if the currentRowValue is NaN)
- return currentRowValue;
- }
- default: {
- Debug.Assert(false, "ValueSourcType unspecified");
- throw ADP.NotSupported();
- }
- }
- }
-
- // unified method to read a row from the current rowsource
- // When _isAsyncBulkCopy == true (i.e. async copy): returns Task<bool> when the IDataReader is a DbDataReader, null for others.
- // When _isAsyncBulkCopy == false (i.e. sync copy): returns null. Uses ReadFromRowSource to get the boolean value.
- // "more" -- should be used by the caller only when the return value is null.
- private Task ReadFromRowSourceAsync(CancellationToken cts) {
- DbDataReader dbRowSource = _rowSource as DbDataReader;
- if (_isAsyncBulkCopy && _rowSourceType == ValueSourceType.IDataReader && (_rowSource as DbDataReader) != null) {
- // This will call ReadAsync for DbDataReader (for SqlDataReader it will be a truly async read; for non-SqlDataReader it may block.)
- return ((DbDataReader)_rowSource).ReadAsync(cts).ContinueWith((t) => {
- if (t.Status == TaskStatus.RanToCompletion) {
- _hasMoreRowToCopy = t.Result;
- }
- return t;
- }, TaskScheduler.Default).Unwrap();
- }
- else { //this will call Read for DataRows, DataTable and IDataReader (this includes all IDataReader except DbDataReader)
- _hasMoreRowToCopy = false;
- try {
- _hasMoreRowToCopy = ReadFromRowSource(); //Synchronous calls for DataRows and DataTable won't block. For IDataReader, it may block.
- }
- catch(Exception ex) {
- if (_isAsyncBulkCopy) {
- TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
- tcs.SetException(ex);
- return tcs.Task;
- }
- else {
- throw;
- }
- }
- return null;
- }
- }
-
- private bool ReadFromRowSource() {
- switch(_rowSourceType) {
- case ValueSourceType.IDataReader:
- return ((IDataReader)_rowSource).Read();
- // treatment for the RowArray case is the same as for DataTable, to prevent code duplication
- case ValueSourceType.RowArray:
- case ValueSourceType.DataTable:
- Debug.Assert(_rowEnumerator != null, "uninitialized _rowEnumerator");
- Debug.Assert((_rowStateToSkip & DataRowState.Deleted) != 0, "Deleted is a permitted rowstate?");
- // repeat until we get a row that is not deleted or there are no more rows ...
- do {
- if(!_rowEnumerator.MoveNext()) {
- return false;
- }
- _currentRow = (DataRow)_rowEnumerator.Current;
- } while ((_currentRow.RowState & _rowStateToSkip) != 0); // repeat if there is an unexpected rowstate
- // SQLBUVSTS01:36286 - keep this line out of the loop because
- // ItemArray raises an exception when used on a deleted row
- _currentRowLength = _currentRow.ItemArray.Length;
- return true;
-
- default:
- Debug.Assert(false, "ValueSourcType unspecified");
- throw ADP.NotSupported();
- }
- }
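- // Decides, per destination column, how values should be fetched from the current row source:
- // as a SqlType (SqlDecimal/SqlDouble/SqlSingle for decimal-typed destinations), as a streaming
- // data feed (MAX-length types when EnableStreaming is set), or via plain GetValue.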
- private SourceColumnMetadata GetColumnMetadata(int ordinal) {
- int sourceOrdinal = _sortedColumnMappings[ordinal]._sourceColumnOrdinal;
- _SqlMetaData metadata = _sortedColumnMappings[ordinal]._metadata;
- // Handle special Sql data types for SqlDataReader and DataTables
- ValueMethod method;
- bool isSqlType;
- bool isDataFeed;
- if (((_SqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) {
- isDataFeed = false;
- Type t;
- switch(_rowSourceType) {
- case ValueSourceType.IDataReader:
- t = _SqlDataReaderRowSource.GetFieldType(sourceOrdinal);
- break;
- case ValueSourceType.DataTable:
- case ValueSourceType.RowArray:
- t = _dataTableSource.Columns[sourceOrdinal].DataType;
- break;
- default:
- t = null;
- Debug.Assert(false, string.Format("Unknown value source: {0}", _rowSourceType));
- break;
- }
-
- if (typeof(SqlDecimal) == t || typeof(Decimal) == t) {
- isSqlType = true;
- method = ValueMethod.SqlTypeSqlDecimal; // Source Type Decimal
- }
- else if (typeof(SqlDouble) == t || typeof (double) == t) {
- isSqlType = true;
- method = ValueMethod.SqlTypeSqlDouble; // Source Type SqlDouble
- }
- else if (typeof(SqlSingle) == t || typeof (float) == t) {
- isSqlType = true;
- method = ValueMethod.SqlTypeSqlSingle; // Source Type SqlSingle
- }
- else {
- isSqlType = false;
- method = ValueMethod.GetValue;
- }
- }
- // Check for data streams
- else if ((_enableStreaming) && (metadata.length == MAX_LENGTH) && (!_rowSourceIsSqlDataReaderSmi)) {
- isSqlType = false;
- if (_SqlDataReaderRowSource != null) {
- // MetaData property is not set for SMI, but since streaming is disabled we do not need it
- MetaType mtSource = _SqlDataReaderRowSource.MetaData[sourceOrdinal].metaType;
- // There is no memory gain for non-sequential access for binary
- if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _SqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) {
- isDataFeed = true;
- method = ValueMethod.DataFeedStream;
- }
- // For text and XML there is memory gain from streaming on destination side even if reader is non-sequential
- else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml)) {
- isDataFeed = true;
- method = ValueMethod.DataFeedText;
- }
- else if ((metadata.type == SqlDbType.Xml) && (mtSource.SqlDbType == SqlDbType.Xml)) {
- isDataFeed = true;
- method = ValueMethod.DataFeedXml;
- }
- else {
- isDataFeed = false;
- method = ValueMethod.GetValue;
- }
- }
- else if (_DbDataReaderRowSource != null) {
- if (metadata.type == SqlDbType.VarBinary) {
- isDataFeed = true;
- method = ValueMethod.DataFeedStream;
- }
- else if ((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) {
- isDataFeed = true;
- method = ValueMethod.DataFeedText;
- }
- else {
- isDataFeed = false;
- method = ValueMethod.GetValue;
- }
- }
- else {
- isDataFeed = false;
- method = ValueMethod.GetValue;
- }
- }
- else {
- isSqlType = false;
- isDataFeed = false;
- method = ValueMethod.GetValue;
- }
- return new SourceColumnMetadata(method, isSqlType, isDataFeed);
- }
- //
- //
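- // Throws if no connection was supplied or if it is a context connection; opens the connection when
- // this instance owns it, validates that it can execute, and verifies that any external transaction
- // belongs to the same connection.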
- private void CreateOrValidateConnection(string method) {
- if(null == _connection) {
- throw ADP.ConnectionRequired(method);
- }
- if (_connection.IsContextConnection) {
- throw SQL.NotAvailableOnContextConnection();
- }
-
- if(_ownConnection && _connection.State != ConnectionState.Open) {
- _connection.Open();
- }
- // close any non MARS dead readers, if applicable, and then throw if still busy.
- _connection.ValidateConnectionForExecute(method, null);
- // if we have a transaction, check to ensure that the active
- // connection property matches the connection associated with
- // the transaction
- if(null != _externalTransaction && _connection != _externalTransaction.Connection) {
- throw ADP.TransactionConnectionMismatch();
- }
- }
- // Runs the _parser until it is done and ensures that ThreadHasParserLockForClose is correctly set and unset
- // Ensure that you only call this inside of a Reliability Section
- private void RunParser(BulkCopySimpleResultSet bulkCopyHandler = null) {
- // In case of error while reading, we should let the connection know that we already own the _parserLock
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- internalConnection.ThreadHasParserLockForClose = true;
- try {
- _parser.Run(RunBehavior.UntilDone, null, null, bulkCopyHandler, _stateObj);
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- }
- // Runs the _parser until it is done and ensures that ThreadHasParserLockForClose is correctly set and unset
- // This takes care of setting up the Reliability Section, and will doom the connection if there is a catastrophic (OOM, StackOverflow, ThreadAbort) error
- private void RunParserReliably(BulkCopySimpleResultSet bulkCopyHandler = null) {
- // In case of error while reading, we should let the connection know that we already own the _parserLock
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- internalConnection.ThreadHasParserLockForClose = true;
- try {
- _parser.RunReliably(RunBehavior.UntilDone, null, null, bulkCopyHandler, _stateObj);
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- }
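- // Commits and disposes the internal transaction, if one exists, while holding
- // ThreadHasParserLockForClose so the connection knows this thread already owns the parser lock.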
- private void CommitTransaction() {
- if (null != _internalTransaction) {
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock
- try {
- _internalTransaction.Commit(); //commit.
- _internalTransaction.Dispose();
- _internalTransaction = null;
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- }
- }
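- // Rolls back (unless it is already zombied) and disposes the internal transaction, if one exists.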
- private void AbortTransaction() {
- if (_internalTransaction != null) {
- if (!_internalTransaction.IsZombied) {
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock
- try {
- _internalTransaction.Rollback();
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- }
- _internalTransaction.Dispose();
- _internalTransaction = null;
- }
- }
- // Appends the column name in square brackets, a space, and the type name to the query.
- // Putting the name in quotes also requires doubling existing ']' so that they are not mistaken for
- // the closing quote
- // example: abc will become [abc] but abc[] will become [abc[]]]
- //
- private void AppendColumnNameAndTypeName(StringBuilder query, string columnName, string typeName) {
- SqlServerEscapeHelper.EscapeIdentifier(query, columnName);
- query.Append(" ");
- query.Append(typeName);
- }
- private string UnquotedName(string name) {
- if(ADP.IsEmpty(name)) return null;
- if(name[0] == '[') {
- int l = name.Length;
- Debug.Assert(name[l - 1] == ']', "Name starts with [ but does not end with ]");
- name = name.Substring(1, l - 2);
- }
- return name;
- }
- private object ValidateBulkCopyVariant(object value) {
- // from the spec:
- // "The only acceptable types are ..."
- // GUID, BIGVARBINARY, BIGBINARY, BIGVARCHAR, BIGCHAR, NVARCHAR, NCHAR, BIT, INT1, INT2, INT4, INT8,
- // MONEY4, MONEY, DECIMALN, NUMERICN, FLT4, FLT8, DATETIME4 and DATETIME
- //
- MetaType metatype = MetaType.GetMetaTypeFromValue(value);
- switch(metatype.TDSType) {
- case TdsEnums.SQLFLT4:
- case TdsEnums.SQLFLT8:
- case TdsEnums.SQLINT8:
- case TdsEnums.SQLINT4:
- case TdsEnums.SQLINT2:
- case TdsEnums.SQLINT1:
- case TdsEnums.SQLBIT:
- case TdsEnums.SQLBIGVARBINARY:
- case TdsEnums.SQLBIGVARCHAR:
- case TdsEnums.SQLUNIQUEID:
- case TdsEnums.SQLNVARCHAR:
- case TdsEnums.SQLDATETIME:
- case TdsEnums.SQLMONEY:
- case TdsEnums.SQLNUMERICN:
- case TdsEnums.SQLDATE:
- case TdsEnums.SQLTIME:
- case TdsEnums.SQLDATETIME2:
- case TdsEnums.SQLDATETIMEOFFSET:
- if (value is INullable) { // Current limitation in the SqlBulkCopy Variant code limits BulkCopy to CLR/COM Types.
- return MetaType.GetComValueFromSqlVariant (value);
- } else {
- return value;
- }
- default:
- throw SQL.BulkLoadInvalidVariantValue();
- }
- }
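- // Coerces a source value to the destination column's metadata type: adjusts decimal scale,
- // enforces precision and string length limits, validates sql_variant values, converts UDTs to
- // byte[], and wraps XmlReader values as XmlDataFeed. isSqlType and coercedToDataFeed are updated
- // to describe the returned value.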
- private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, ref bool isSqlType, out bool coercedToDataFeed) {
- coercedToDataFeed = false;
- if (isNull) {
- if(!metadata.isNullable) {
- throw SQL.BulkLoadBulkLoadNotAllowDBNull(metadata.column);
- }
- return value;
- }
- MetaType type = metadata.metaType;
- bool typeChanged = false;
- try {
- MetaType mt;
- switch(type.NullableType) {
- case TdsEnums.SQLNUMERICN:
- case TdsEnums.SQLDECIMALN:
- mt = MetaType.GetMetaTypeFromSqlDbType(type.SqlDbType, false);
- value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
-
- // Convert the source decimal precision and scale to the destination precision and scale
- // Fix Bug: 385971 sql decimal data could get corrupted on insert if the scale of
- // the source and destination weren't the same. The BCP protocol specifies the
- // scale of the incoming data in the insert statement; we just tell the server we
- // are inserting the same scale back. This then created a bug inside the BCP operation
- // if the scales didn't match. The fix is to do the same thing that SqlParameter does,
- // and adjust the scale before writing. In Orcas this scale adjustment should be removed from
- // SqlParameter and SqlBulkCopy and isolated inside SqlParameter.CoerceValue, but because of
- // where we are in the cycle, the changes must be kept to a minimum, so I'm just bringing the
- // code over to SqlBulkCopy.
-
- SqlDecimal sqlValue;
- if ((isSqlType) && (!typeChanged)) {
- sqlValue = (SqlDecimal)value;
- }
- else {
- sqlValue = new SqlDecimal((Decimal)value);
- }
-
- if (sqlValue.Scale != metadata.scale) {
- sqlValue = TdsParser.AdjustSqlDecimalScale(sqlValue, metadata.scale);
- }
-
- // Perf: It is more efficient to write a SqlDecimal than a decimal since we need to break it into its 'bits' when writing
- value = sqlValue;
- isSqlType = true;
- typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when converting to a CLR type
-
- if (sqlValue.Precision > metadata.precision) {
- throw SQL.BulkLoadCannotConvertValue(value.GetType(), mt, ADP.ParameterValueOutOfRange(sqlValue));
- }
- break;
- case TdsEnums.SQLINTN:
- case TdsEnums.SQLFLTN:
- case TdsEnums.SQLFLT4:
- case TdsEnums.SQLFLT8:
- case TdsEnums.SQLMONEYN:
- case TdsEnums.SQLDATETIM4:
- case TdsEnums.SQLDATETIME:
- case TdsEnums.SQLDATETIMN:
- case TdsEnums.SQLBIT:
- case TdsEnums.SQLBITN:
- case TdsEnums.SQLUNIQUEID:
- case TdsEnums.SQLBIGBINARY:
- case TdsEnums.SQLBIGVARBINARY:
- case TdsEnums.SQLIMAGE:
- case TdsEnums.SQLBIGCHAR:
- case TdsEnums.SQLBIGVARCHAR:
- case TdsEnums.SQLTEXT:
- case TdsEnums.SQLDATE:
- case TdsEnums.SQLTIME:
- case TdsEnums.SQLDATETIME2:
- case TdsEnums.SQLDATETIMEOFFSET:
- mt = MetaType.GetMetaTypeFromSqlDbType (type.SqlDbType, false);
- value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
- break;
- case TdsEnums.SQLNCHAR:
- case TdsEnums.SQLNVARCHAR:
- case TdsEnums.SQLNTEXT:
- mt = MetaType.GetMetaTypeFromSqlDbType(type.SqlDbType, false);
- value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
- if (!coercedToDataFeed) { // We do not need to test for TextDataFeed as it is only assigned to (N)VARCHAR(MAX)
- int len = ((isSqlType) && (!typeChanged)) ? ((SqlString)value).Value.Length : ((string)value).Length;
- if (len > metadata.length / 2) {
- throw SQL.BulkLoadStringTooLong();
- }
- }
- break;
- case TdsEnums.SQLVARIANT:
- value = ValidateBulkCopyVariant(value);
- typeChanged = true;
- break;
- case TdsEnums.SQLUDT:
- // UDTs are sent as varbinary so we need to get the raw bytes
- // unlike other types, the parser does not accept SQLUDT in the form of a SqlType,
- // so we cast to a CLR type.
- // Hack for type system version knob - only call GetBytes if the value is not already
- // in byte[] form.
- if (!(value is byte[])) {
- value = _connection.GetBytes(value);
- typeChanged = true;
- }
- break;
- case TdsEnums.SQLXMLTYPE:
- // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed
- Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype");
- if(value is XmlReader) {
- value = new XmlDataFeed((XmlReader)value);
- typeChanged = true;
- coercedToDataFeed = true;
- }
- break;
- default:
- Debug.Assert(false, "Unknown TdsType!" + type.NullableType.ToString("x2", (IFormatProvider)null));
- throw SQL.BulkLoadCannotConvertValue(value.GetType(), metadata.metaType, null);
- }
- if (typeChanged) {
- // All type changes change to CLR types
- isSqlType = false;
- }
- return value;
- }
- catch(Exception e) {
- if(!ADP.IsCatchableExceptionType(e)) {
- throw;
- }
- throw SQL.BulkLoadCannotConvertValue(value.GetType(), metadata.metaType, e);
- }
- }
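- // Synchronously copies all rows from the supplied IDataReader to the destination table.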
- public void WriteToServer(IDataReader reader) {
- SqlConnection.ExecutePermission.Demand();
- if (reader == null) {
- throw new ArgumentNullException("reader");
- }
-
- if (_isBulkCopyingInProgress) {
- throw SQL.BulkLoadPendingOperation();
- }
-
- SqlStatistics statistics = Statistics;
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
- _rowSource = reader;
- _SqlDataReaderRowSource = _rowSource as SqlDataReader;
- if (_SqlDataReaderRowSource != null) {
- _rowSourceIsSqlDataReaderSmi = _SqlDataReaderRowSource is SqlDataReaderSmi;
- }
- _DbDataReaderRowSource = _rowSource as DbDataReader;
- _dataTableSource = null;
- _rowSourceType = ValueSourceType.IDataReader;
- _isAsyncBulkCopy = false;
- WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
- }
- finally {
- SqlStatistics.StopTimer(statistics);
- }
- }
- public void WriteToServer(DataTable table) {
- WriteToServer(table, 0);
- }
- public void WriteToServer(DataTable table, DataRowState rowState) {
- SqlConnection.ExecutePermission.Demand();
- if (table == null) {
- throw new ArgumentNullException("table");
- }
- if (_isBulkCopyingInProgress) {
- throw SQL.BulkLoadPendingOperation();
- }
- SqlStatistics statistics = Statistics;
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
- _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
- _rowSource = table;
- _dataTableSource = table;
- _SqlDataReaderRowSource = null;
- _rowSourceType = ValueSourceType.DataTable;
- _rowEnumerator = table.Rows.GetEnumerator();
- _isAsyncBulkCopy = false;
- WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
- }
- finally {
- SqlStatistics.StopTimer(statistics);
- }
- }
- public void WriteToServer(DataRow[] rows) {
- SqlConnection.ExecutePermission.Demand();
- SqlStatistics statistics = Statistics;
- if (rows == null) {
- throw new ArgumentNullException("rows");
- }
- if (_isBulkCopyingInProgress) {
- throw SQL.BulkLoadPendingOperation();
- }
- if (rows.Length == 0) {
- return; // nothing to do. user passed us an empty array
- }
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
-
- DataTable table = rows[0].Table;
- Debug.Assert(null != table, "How can we have rows without a table?");
- _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
- _rowSource = rows;
- _dataTableSource = table;
- _SqlDataReaderRowSource = null;
- _rowSourceType = ValueSourceType.RowArray;
- _rowEnumerator = rows.GetEnumerator();
- _isAsyncBulkCopy = false;
- WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); //It returns null since _isAsyncBulkCopy = false;
- }
- finally {
- SqlStatistics.StopTimer(statistics);
- }
- }
- /*Async overloads start here*/
- public Task WriteToServerAsync(DataRow[] rows) {
- return WriteToServerAsync(rows, CancellationToken.None);
- }
- public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationToken) {
- Task resultTask = null;
- SqlConnection.ExecutePermission.Demand();
-
- if (rows == null) {
- throw new ArgumentNullException("rows");
- }
- if (_isBulkCopyingInProgress) {
- throw SQL.BulkLoadPendingOperation();
- }
-
- SqlStatistics statistics = Statistics;
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
-
- if (rows.Length == 0) {
- TaskCompletionSource<object> source = new TaskCompletionSource<object>();
- if (cancellationToken.IsCancellationRequested) {
- source.SetCanceled();
- }
- else{
- source.SetResult(null);
- }
- resultTask = source.Task;
- return resultTask; // nothing to do. user passed us an empty array. Return a completed Task.
- }
-
- DataTable table = rows[0].Table;
- Debug.Assert(null != table, "How can we have rows without a table?");
- _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
- _rowSource = rows;
- _dataTableSource = table;
- _SqlDataReaderRowSource = null;
- _rowSourceType = ValueSourceType.RowArray;
- _rowEnumerator = rows.GetEnumerator();
- _isAsyncBulkCopy = true;
- resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true;
- }
- finally{
- SqlStatistics.StopTimer(statistics);
- }
- return resultTask;
- }
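- // Illustrative usage sketch only (not product code, never called): the asynchronous DataRow[]
- // overload above with a CancellationToken, assuming a C# 5+ compiler for async/await.
- // The connection, rows and destination name are assumptions.
- private static async Task ExampleWriteToServerAsyncRows(SqlConnection destinationConnection, DataRow[] rows, CancellationToken cancellationToken) {
- using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection)) {
- bulkCopy.DestinationTableName = "dbo.TargetTable"; // hypothetical destination table
- await bulkCopy.WriteToServerAsync(rows, cancellationToken); // completes when all rows are sent, faults on error, or is canceled via the token
- }
- }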
- public Task WriteToServerAsync(IDataReader reader) {
- return WriteToServerAsync(reader, CancellationToken.None);
- }
- public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellationToken) {
- Task resultTask = null;
- SqlConnection.ExecutePermission.Demand();
-
- if (reader == null) {
- throw new ArgumentNullException("reader");
- }
- if (_isBulkCopyingInProgress) {
- throw SQL.BulkLoadPendingOperation();
- }
- SqlStatistics statistics = Statistics;
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
- _rowSource = reader;
- _SqlDataReaderRowSource = _rowSource as SqlDataReader;
- _DbDataReaderRowSource = _rowSource as DbDataReader;
- _dataTableSource = null;
- _rowSourceType = ValueSourceType.IDataReader;
- _isAsyncBulkCopy = true;
- resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); //It returns Task since _isAsyncBulkCopy = true;
- }
- finally {
- SqlStatistics.StopTimer(statistics);
- }
- return resultTask;
- }
- public Task WriteToServerAsync(DataTable table) {
- return WriteToServerAsync(table, 0, CancellationToken.None);
- }
- public Task WriteToServerAsync(DataTable table, CancellationToken cancellationToken) {
- return WriteToServerAsync(table, 0, cancellationToken);
- }
- public Task WriteToServerAsync(DataTable table, DataRowState rowState) {
- return WriteToServerAsync(table, rowState, CancellationToken.None);
- }
- public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) {
- Task resultTask = null;
- SqlConnection.ExecutePermission.Demand();
- if (table == null) {
- throw new ArgumentNullException("table");
- }
- if (_isBulkCopyingInProgress){
- throw SQL.BulkLoadPendingOperation();
- }
-
- SqlStatistics statistics = Statistics;
- try {
- statistics = SqlStatistics.StartTimer(Statistics);
- _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
- _rowSource = table;
- _SqlDataReaderRowSource = null;
- _dataTableSource = table;
- _rowSourceType = ValueSourceType.DataTable;
- _rowEnumerator = table.Rows.GetEnumerator();
- _isAsyncBulkCopy = true;
- resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); //It returns Task since _isAsyncBulkCopy = true;
- }
- finally {
- SqlStatistics.StopTimer(statistics);
- }
- return resultTask;
- }
- // Writes row source.
- //
- private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) {
- Task reconnectTask = _connection._currentReconnectionTask;
- if (reconnectTask != null && !reconnectTask.IsCompleted) {
- if (this._isAsyncBulkCopy) {
- TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
- reconnectTask.ContinueWith((t) => {
- Task writeTask = WriteRowSourceToServerAsync(columnCount, ctoken);
- if (writeTask == null) {
- tcs.SetResult(null);
- }
- else {
- AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
- }
- }, ctoken); // we do not need to propagate exception etc. from reconnect task, we just need to wait for it to finish
- return tcs.Task;
- }
- else {
- AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }, rethrowExceptions: false);
- }
- }
- bool finishedSynchronously = true;
- _isBulkCopyingInProgress = true;
- CreateOrValidateConnection(SQL.WriteToServer);
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- Debug.Assert(_parserLock == null, "Previous parser lock not cleaned");
- _parserLock = internalConnection._parserLock;
- _parserLock.Wait(canReleaseFromAnyThread: _isAsyncBulkCopy);
-
- TdsParser bestEffortCleanupTarget = null;
- RuntimeHelpers.PrepareConstrainedRegions();
- try {
- #if DEBUG
- TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
- RuntimeHelpers.PrepareConstrainedRegions();
- try {
- tdsReliabilitySection.Start();
- #else
- {
- #endif //DEBUG
- bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(_connection);
- WriteRowSourceToServerCommon(columnCount); //this is common in both sync and async
- Task resultTask = WriteToServerInternalAsync(ctoken); // resultTask is null for sync, but Task for async.
- if (resultTask != null) {
- finishedSynchronously = false;
- return resultTask.ContinueWith((t) => {
- AbortTransaction(); // if there is one; on success the transaction will already have been committed
- _isBulkCopyingInProgress = false;
- if (_parser != null) {
- _parser._asyncWrite = false;
- }
- if (_parserLock != null) {
- _parserLock.Release();
- _parserLock = null;
- }
- return t;
- }, TaskScheduler.Default).Unwrap();
- }
- return null;
- }
- #if DEBUG
- finally {
- tdsReliabilitySection.Stop();
- }
- #endif //DEBUG
- }
- catch (System.OutOfMemoryException e) {
- _connection.Abort(e);
- throw;
- }
- catch (System.StackOverflowException e) {
- _connection.Abort(e);
- throw;
- }
- catch (System.Threading.ThreadAbortException e) {
- _connection.Abort(e);
- SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
- throw;
- }
- finally {
- _columnMappings.ReadOnly = false;
- if (finishedSynchronously) {
- AbortTransaction(); // if there is one; on success the transaction will already have been committed
- _isBulkCopyingInProgress = false;
- if (_parser != null) {
- _parser._asyncWrite = false;
- }
- if (_parserLock != null) {
- _parserLock.Release();
- _parserLock = null;
- }
- }
- }
- }
-
- // Handles the column mapping.
- //
- private void WriteRowSourceToServerCommon(int columnCount) {
- bool unspecifiedColumnOrdinals = false;
- _columnMappings.ReadOnly = true;
- _localColumnMappings = _columnMappings;
- if (_localColumnMappings.Count > 0) {
- _localColumnMappings.ValidateCollection();
- foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
- if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
- unspecifiedColumnOrdinals = true;
- break;
- }
- }
- }
- else {
- _localColumnMappings = new SqlBulkCopyColumnMappingCollection();
- _localColumnMappings.CreateDefaultMapping(columnCount);
- }
- // perf: If the user specified all column ordinals we do not need to get a schema table
- //
- if (unspecifiedColumnOrdinals) {
- int index = -1;
- unspecifiedColumnOrdinals = false;
- // Match up sourceColumn names with sourceColumn ordinals
- //
- if (_localColumnMappings.Count > 0) {
- foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
- if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
- string unquotedColumnName = UnquotedName(bulkCopyColumn.SourceColumn);
- switch (this._rowSourceType) {
- case ValueSourceType.DataTable:
- index = ((DataTable)_rowSource).Columns.IndexOf(unquotedColumnName);
- break;
- case ValueSourceType.RowArray:
- index = ((DataRow[])_rowSource)[0].Table.Columns.IndexOf(unquotedColumnName);
- break;
- case ValueSourceType.IDataReader:
- try {
- index = ((IDataRecord)this._rowSource).GetOrdinal(unquotedColumnName);
- }
- catch (IndexOutOfRangeException e) {
- throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e));
- }
- break;
- }
- if (index == -1) {
- throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName));
- }
- bulkCopyColumn._internalSourceColumnOrdinal = index;
- }
- }
- }
- }
- }
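- // Illustrative sketch of the mapping resolution handled above (not product code, never called):
- // a mapping given only by source column name has its ordinal resolved in
- // WriteRowSourceToServerCommon against the row source. The column and table names are assumptions.
- private static void ExampleColumnMappingByName(SqlConnection destinationConnection, DataTable sourceTable) {
- using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destinationConnection)) {
- bulkCopy.DestinationTableName = "dbo.TargetTable"; // hypothetical destination table
- bulkCopy.ColumnMappings.Add("SourceName", "DestinationName"); // name-based mapping; source ordinal resolved in WriteRowSourceToServerCommon
- bulkCopy.ColumnMappings.Add("OtherSource", "OtherDestination");
- bulkCopy.WriteToServer(sourceTable);
- }
- }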
- internal void OnConnectionClosed() {
- TdsParserStateObject stateObj = _stateObj;
- if (stateObj != null) {
- stateObj.OnConnectionClosed();
- }
- }
- private void OnRowsCopied(SqlRowsCopiedEventArgs value) {
- SqlRowsCopiedEventHandler handler = _rowsCopiedEventHandler;
- if(handler != null) {
- handler(this, value);
- }
- }
- // fxcop:
- // Use the .Net Event System whenever appropriate.
- private bool FireRowsCopiedEvent(long rowsCopied) {
- // release lock to prevent possible deadlocks
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread;
- internalConnection._parserLock.Release();
- SqlRowsCopiedEventArgs eventArgs = new SqlRowsCopiedEventArgs(rowsCopied);
- try {
- _insideRowsCopiedEvent = true;
- this.OnRowsCopied(eventArgs);
- }
- finally {
- _insideRowsCopiedEvent = false;
- internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock);
- }
- return eventArgs.Abort;
- }
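- // Illustrative sketch of the notification hookup that ends up in FireRowsCopiedEvent above
- // (not product code, never called): NotifyAfter controls how often SqlRowsCopied fires, and
- // setting e.Abort from the handler aborts the copy. The threshold below is an assumption.
- private static void ExampleRowsCopiedNotification(SqlBulkCopy bulkCopy) {
- bulkCopy.NotifyAfter = 5000; // raise SqlRowsCopied every 5000 rows
- bulkCopy.SqlRowsCopied += (sender, e) => {
- Console.WriteLine("Rows copied so far: " + e.RowsCopied);
- // e.Abort = true; // uncomment to abort the bulk copy from inside the handler
- };
- }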
- // Reads a cell and then writes it.
- // Read may block here since there is no getValueAsync or downstream async support at this point.
- // When _isAsyncBulkCopy == true: Write will return a Task (when the async method runs asynchronously) or null (when the async call actually ran synchronously) for performance.
- // When _isAsyncBulkCopy == false: writes are purely sync. This method returns null at the end.
- //
- private Task ReadWriteColumnValueAsync(int col) {
- bool isSqlType;
- bool isDataFeed;
- bool isNull;
- Object value = GetValueFromSourceRow(col, out isSqlType, out isDataFeed, out isNull); //this will return Task/null in future: as rTask
- _SqlMetaData metadata = _sortedColumnMappings[col]._metadata;
- if (!isDataFeed) {
- value = ConvertValue(value, metadata, isNull, ref isSqlType, out isDataFeed);
- }
- //write part
- Task writeTask = null;
- if (metadata.type != SqlDbType.Variant) {
- //this is the most common path
- writeTask = _parser.WriteBulkCopyValue(value, metadata, _stateObj, isSqlType, isDataFeed, isNull); //returns Task/Null
- }
- else {
- SqlBuffer.StorageType variantInternalType = SqlBuffer.StorageType.Empty;
- if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) {
- variantInternalType = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal);
- }
- if (variantInternalType == SqlBuffer.StorageType.DateTime2) {
- _parser.WriteSqlVariantDateTime2(((DateTime)value), _stateObj);
- }
- else if (variantInternalType == SqlBuffer.StorageType.Date) {
- _parser.WriteSqlVariantDate(((DateTime)value), _stateObj);
- }
- else {
- writeTask = _parser.WriteSqlVariantDataRowValue(value, _stateObj); //returns Task/Null
- }
- }
- return writeTask;
- }
- private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask) {
- SqlConnection connection = _connection;
- if (connection == null) {
- // No connection
- throw ADP.ClosedConnectionError();
- }
- connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
- }
- // Runs a loop to copy all columns of a single row.
- // Maintains state by remembering the number of columns copied so far (int col).
- // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
- //
- private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = null) {
- Task resultTask = null, task = null;
- int i;
- try {
- for (i = col; i < _sortedColumnMappings.Count; i++) {
- task = ReadWriteColumnValueAsync(i); //First reads and then writes one cell value. Task 'task' is completed when reading task and writing task both are complete.
- if (task != null) break; //task != null means we have a pending read/write Task.
- }
- if (task != null) {
- if (source == null) {
- source = new TaskCompletionSource<object>();
- resultTask = source.Task;
- }
- CopyColumnsAsyncSetupContinuation(source, task, i);
- return resultTask; //the associated task will be completed when all columns (i.e. the entire row) have been written
- }
- if (source != null) {
- source.SetResult(null);
- }
- }
- catch(Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- }
- else {
- throw;
- }
- }
- return resultTask;
- }
- // This is in its own method to avoid always allocating the lambda in CopyColumnsAsync
- private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i) {
- AsyncHelper.ContinueTask(task, source, () => {
- if (i + 1 < _sortedColumnMappings.Count) {
- CopyColumnsAsync(i + 1, source); //continue from the next column
- }
- else {
- source.SetResult(null);
- }
- },
- _connection.GetOpenTdsConnection());
- }
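- // Illustrative sketch (not product code, never called) of the convention used by the
- // Copy*Async methods in this class: a null return means the step finished synchronously,
- // while a non-null Task means a continuation must re-enter the state machine.
- // The delegate parameters below are hypothetical stand-ins for a single async step.
- private static Task ExampleNullTaskMeansSyncCompletion(Func<Task> runStep, Action onCompleted) {
- Task pending = runStep(); // may complete synchronously and return null
- if (pending == null) {
- onCompleted(); // synchronous path: nothing to await
- return null;
- }
- return pending.ContinueWith((t) => onCompleted(), TaskScheduler.Default); // asynchronous path
- }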
- // The notification logic.
- //
- private void CheckAndRaiseNotification() {
- bool abortOperation = false; //set to true if the operation needs to be aborted.
- Exception exception = null;
- _rowsCopied++;
- // Fire event logic
- if (_notifyAfter > 0) { // no action if no value specified
- // (0=no notification)
- if (_rowsUntilNotification > 0) { // > 0?
- if (--_rowsUntilNotification == 0) { // decrement counter
- // Fire event during operation. This is the user's chance to abort the operation.
- try {
- // it's also the user's chance to cause an exception ...
- _stateObj.BcpLock = true;
- abortOperation = FireRowsCopiedEvent(_rowsCopied);
- Bid.Trace("<sc.SqlBulkCopy.WriteToServerInternal|INFO> \n");
- // just in case some pathological person closes the target connection ...
- if (ConnectionState.Open != _connection.State) {
- exception = ADP.OpenConnectionRequired("CheckAndRaiseNotification", _connection.State);
- }
- }
- catch (Exception e) {
- //
- if (!ADP.IsCatchableExceptionType(e)) {
- exception = e;
- }
- else {
- exception = OperationAbortedException.Aborted(e);
- }
- }
- finally {
- _stateObj.BcpLock = false;
- }
- if (!abortOperation) {
- _rowsUntilNotification = _notifyAfter;
- }
- }
- }
- }
- if (!abortOperation && _rowsUntilNotification > _notifyAfter) { // if NotifyAfter was decreased during the operation
- _rowsUntilNotification = _notifyAfter; // clamp the countdown to the new value; otherwise leave it
- }
- if (exception == null && abortOperation) {
- exception = OperationAbortedException.Aborted(null);
- }
- if (_connection.State != ConnectionState.Open) {
- throw ADP.OpenConnectionRequired(SQL.WriteToServer, _connection.State);
- }
- if (exception != null) {
- _parser._asyncWrite = false;
- Task writeTask = _parser.WriteBulkCopyDone(_stateObj); //We should complete the current batch up to this row.
- Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy");
- RunParser();
- AbortTransaction();
- throw exception; //this will be caught and put inside the Task's exception.
- }
- }
- // Checks for cancellation. If cancel requested, cancels the task and returns the cancelled task
- Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<object> tcs) {
- if (cts.IsCancellationRequested) {
- if (tcs == null) {
- tcs = new TaskCompletionSource<object>();
- }
- tcs.SetCanceled();
- return tcs.Task;
- }
- else {
- return null;
- }
- }
- private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action) {
- if (task == null) {
- return action();
- }
- else {
- Debug.Assert(source != null, "source should already be initialized if task is not null");
- AsyncHelper.ContinueTask(task, source, () => {
- TaskCompletionSource<object> newSource = action();
- Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists");
- });
- }
- return null;
- }
- // Copies all the rows in a batch
- // Maintains a state machine with the state variable rowsSoFar.
- // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
- //
- private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, TaskCompletionSource<object> source = null) {
- Task resultTask = null;
- Task task = null;
- int i;
- try {
- //totalRows is the batch size, which is 0 by default. In that case we keep copying till the end (until _hasMoreRowToCopy == false).
- for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) {
- if (_isAsyncBulkCopy == true) {
- resultTask = CheckForCancellation(cts, source);
- if (resultTask != null) {
- return resultTask; // task got cancelled!
- }
- }
- _stateObj.WriteByte(TdsEnums.SQLROW);
-
- task = CopyColumnsAsync(0); //copy 1 row
-
- if (task == null) { //task completed synchronously.
- CheckAndRaiseNotification(); //check notification logic after copying the row
- //now we will read the next row.
- Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: _hasMoreRowToCopy is only valid if this returns null; otherwise we wait for the task to complete.
- if (readTask != null) {
- if (source == null) {
- source = new TaskCompletionSource<object>();
- }
- resultTask = source.Task;
- AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
- return resultTask; //associated task will be completed when all rows are copied to server/exception/cancelled.
- }
- }
- else { //task != null; add a continuation for it.
- source = source ?? new TaskCompletionSource<object>();
- resultTask = source.Task;
- AsyncHelper.ContinueTask(task, source, onSuccess: () => {
- CheckAndRaiseNotification(); //check for notification now as the current row copy is done at this moment.
- Task readTask = ReadFromRowSourceAsync(cts);
- if (readTask == null) {
- CopyRowsAsync(i + 1, totalRows, cts, source);
- }
- else {
- AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
- }
- }, connectionToDoom: _connection.GetOpenTdsConnection());
- return resultTask;
- }
- }
- if (source != null) {
- source.TrySetResult(null); // this is set only on the last call of async copy. But may not be set if everything runs synchronously.
- }
- }
- catch (Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- }
- else {
- throw;
- }
- }
- return resultTask;
- }
- // Copies all the batches in a loop. One iteration for one batch.
- // A state variable is essentially not needed (although _hasMoreRowToCopy might be thought of as one).
- // Returned Task could be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
- //
- private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source = null) {
- Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
- try {
- while (_hasMoreRowToCopy) {
- //Before every batch: transaction, bulk command and metadata are set up.
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- if (IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) { //an internal transaction is started prior to each batch if the option is set.
- internalConnection.ThreadHasParserLockForClose = true; // In case of error, tell the connection we already have the parser lock
- try {
- _internalTransaction = _connection.BeginTransaction();
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- }
- Task commandTask = SubmitUpdateBulkCommand(updateBulkCommandText);
- if (commandTask == null) {
- Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
- if (continuedTask != null) {
- // Continuation will take care of re-calling CopyBatchesAsync
- return continuedTask;
- }
- }
- else {
- Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
- if (source == null) {
- source = new TaskCompletionSource<object>();
- }
- AsyncHelper.ContinueTask(commandTask, source, () => {
- Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
- if (continuedTask == null) {
- // Continuation finished synchronously; recall into CopyBatchesAsync to continue
- CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
- }
- }, _connection.GetOpenTdsConnection());
- return source.Task;
- }
- }
- }
- catch (Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- return source.Task;
- }
- else {
- throw;
- }
- }
- // If we are here, then we finished everything
- if (source != null) {
- source.SetResult(null);
- return source.Task;
- }
- else {
- return null;
- }
- }
- // Writes the metadata and a single batch.
- // If this returns null, the caller is responsible for starting the next batch; if it returns a Task, its continuation re-enters CopyBatchesAsync.
- private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
- Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
- try {
- WriteMetaData(internalResults);
- Task task = CopyRowsAsync(0, _savedBatchSize, cts); //this is copying 1 batch of rows and setting _hasMoreRowToCopy = true/false.
- //After every batch:
- if (task != null) {
- Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
- if (source == null) { //first time only
- source = new TaskCompletionSource<object>();
- }
- AsyncHelper.ContinueTask(task, source, () => {
- Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
- if (continuedTask == null) {
- // Continuation finished synchronously; recall into CopyBatchesAsync to continue
- CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
- }
- }, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));
- return source.Task;
- }
- else {
- return CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
- }
- }
- catch (Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- return source.Task;
- }
- else {
- throw;
- }
- }
- }
- // Takes care of finishing a single batch (write done, run parser, commit transaction)
- // If this returns null, the caller is responsible for starting the next batch; if it returns a Task, its continuation re-enters CopyBatchesAsync.
- private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
- Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
- try {
- Task writeTask = _parser.WriteBulkCopyDone(_stateObj);
- if (writeTask == null) {
- RunParser();
- CommitTransaction();
- return null;
- }
- else {
- Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
- if (source == null) {
- source = new TaskCompletionSource<object>();
- }
- AsyncHelper.ContinueTask(writeTask, source, () => {
- try {
- RunParser();
- CommitTransaction();
- }
- catch (Exception) {
- CopyBatchesAsyncContinuedOnError(cleanupParser: false);
- throw;
- }
- // Always call back into CopyBatchesAsync
- CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
- }, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
- return source.Task;
- }
- }
- catch (Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- return source.Task;
- }
- else {
- throw;
- }
- }
- }
-
- // Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails
- private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) {
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- RuntimeHelpers.PrepareConstrainedRegions();
- try {
- #if DEBUG
- TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
- RuntimeHelpers.PrepareConstrainedRegions();
- try {
- tdsReliabilitySection.Start();
- #endif //DEBUG
- if ((cleanupParser) && (_parser != null) && (_stateObj != null)) {
- _parser._asyncWrite = false;
- Task task = _parser.WriteBulkCopyDone(_stateObj);
- Debug.Assert(task == null, "Write should not pend when error occurs");
- RunParser();
- }
- if (_stateObj != null) {
- CleanUpStateObjectOnError();
- }
- #if DEBUG
- }
- finally {
- tdsReliabilitySection.Stop();
- }
- #endif //DEBUG
- }
- catch (OutOfMemoryException) {
- internalConnection.DoomThisConnection();
- throw;
- }
- catch (StackOverflowException) {
- internalConnection.DoomThisConnection();
- throw;
- }
- catch (ThreadAbortException) {
- internalConnection.DoomThisConnection();
- throw;
- }
- AbortTransaction();
- }
- //Cleans up the state object. Used in a number of places, especially on exceptions.
- //
- private void CleanUpStateObjectOnError() {
- if (_stateObj != null) {
- _parser.Connection.ThreadHasParserLockForClose = true;
- try {
- _stateObj.ResetBuffer();
- _stateObj._outputPacketNumber = 1;
- //If _parser is closed, sending attention will raise a debug assertion, so we avoid it by not calling CancelRequest.
- if (_parser.State == TdsParserState.OpenNotLoggedIn || _parser.State == TdsParserState.OpenLoggedIn) {
- _stateObj.CancelRequest();
- }
- _stateObj._internalTimeout = false;
- _stateObj.CloseSession();
- _stateObj._bulkCopyOpperationInProgress = false;
- _stateObj._bulkCopyWriteTimeout = false;
- _stateObj = null;
- }
- finally {
- _parser.Connection.ThreadHasParserLockForClose = false;
- }
- }
- }
- // The continuation part of WriteToServerInternalRestAsync. Executes when the initial query task is completed (see WriteToServerInternalRestAsync).
- // It carries on the source passed from WriteToServerInternalRestAsync and performs SetResult when the entire copy is done.
- // The carried-on source may be null in the case of a sync copy, so there is no SetResult to perform then.
- // It launches the copy operation.
- //
- private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource<object> source) {
- Task task = null;
- string updateBulkCommandText = null;
- try {
- updateBulkCommandText = AnalyzeTargetAndCreateUpdateBulkCommand(internalResults);
- if (_sortedColumnMappings.Count != 0) {
- _stateObj.SniContext = SniContext.Snix_SendRows;
- _savedBatchSize = _batchSize; // for safety: if someone changes the batch size during the copy we will still be using _savedBatchSize
- _rowsUntilNotification = _notifyAfter;
- _rowsCopied = 0;
- _currentRowMetadata = new SourceColumnMetadata[_sortedColumnMappings.Count];
- for (int i = 0; i < _currentRowMetadata.Length; i++) {
- _currentRowMetadata[i] = GetColumnMetadata(i);
- }
- task = CopyBatchesAsync(internalResults, updateBulkCommandText, cts); //launch the BulkCopy
- }
- if (task != null) {
- if (source == null) {
- source = new TaskCompletionSource<object>();
- }
- AsyncHelper.ContinueTask(task, source, () => {
- //Bulk copy task is completed at this moment.
- //Todo: The cases may be combined for code reuse.
- if (task.IsCanceled) {
- _localColumnMappings = null;
- try {
- CleanUpStateObjectOnError();
- }
- finally {
- source.SetCanceled();
- }
- }
- else if (task.Exception != null) {
- source.SetException(task.Exception.InnerException);
- }
- else {
- _localColumnMappings = null;
- try {
- CleanUpStateObjectOnError();
- }
- finally {
- if (source != null) {
- if (cts.IsCancellationRequested) { //We may get a cancellation request even after the entire copy.
- source.SetCanceled();
- }
- else {
- source.SetResult(null);
- }
- }
- }
- }
- }, _connection.GetOpenTdsConnection());
- return;
- }
- else {
- _localColumnMappings = null;
- try {
- CleanUpStateObjectOnError();
- } catch (Exception cleanupEx) {
- Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
- }
-
- if(source != null) {
- source.SetResult(null);
- }
- }
- }
- catch(Exception ex){
- _localColumnMappings = null;
- try {
- CleanUpStateObjectOnError();
- } catch (Exception cleanupEx) {
- Debug.Fail("Unexpected exception during CleanUpstateObjectOnError (ignored)", cleanupEx.ToString());
- }
- if (source != null) {
- source.TrySetException(ex);
- }
- else {
- throw;
- }
- }
- }
- // Rest of the WriteToServerInternalAsync method.
- // It carries on the source from its caller, WriteToServerInternalAsync.
- // source is null in the case of sync bcp, but valid in the case of async bcp.
- // It calls the WriteToServerInternalRestContinuedAsync as a continuation of the initial query task.
- //
- private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource<object> source) {
- Debug.Assert(_hasMoreRowToCopy, "first time it is true, otherwise this method would not have been called.");
- _hasMoreRowToCopy = true;
- Task<BulkCopySimpleResultSet> internalResultsTask = null;
- BulkCopySimpleResultSet internalResults = new BulkCopySimpleResultSet();
- SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
- try {
- _parser = _connection.Parser;
- _parser._asyncWrite = _isAsyncBulkCopy; //very important!
- Task reconnectTask;
- try {
- reconnectTask = _connection.ValidateAndReconnect(
- () => {
- if (_parserLock != null) {
- _parserLock.Release();
- _parserLock = null;
- }
- }, BulkCopyTimeout);
- }
- catch (SqlException ex) {
- throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
- }
- if (reconnectTask != null) {
- if (_isAsyncBulkCopy) {
- CancellationTokenRegistration regReconnectCancel = new CancellationTokenRegistration();
- TaskCompletionSource<object> cancellableReconnectTS = new TaskCompletionSource<object>();
- if (cts.CanBeCanceled) {
- regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled());
- }
- AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
- // no need to cancel timer since SqlBulkCopy creates specific task source for reconnection
- AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout,
- ()=>{return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout());}, CancellationToken.None);
- AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
- () => {
- regReconnectCancel.Dispose();
- if (_parserLock != null) {
- _parserLock.Release();
- _parserLock = null;
- }
- _parserLock = _connection.GetOpenTdsConnection()._parserLock;
- _parserLock.Wait(canReleaseFromAnyThread: true);
- WriteToServerInternalRestAsync(cts, source);
- },
- connectionToAbort: _connection,
- onFailure: (e) => { regReconnectCancel.Dispose(); },
- onCancellation: () => { regReconnectCancel.Dispose(); },
- exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
- return;
- }
- else {
- try {
- AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); } );
- }
- catch (SqlException ex) {
- throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // preserve behavior (throw InvalidOperationException on failure to connect)
- }
- _parserLock = _connection.GetOpenTdsConnection()._parserLock;
- _parserLock.Wait(canReleaseFromAnyThread: false);
- WriteToServerInternalRestAsync(cts, source);
- return;
- }
- }
- if (_isAsyncBulkCopy) {
- _connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag);
- }
- internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock
- try {
- _stateObj = _parser.GetSession(this);
- _stateObj._bulkCopyOpperationInProgress = true;
- _stateObj.StartSession(ObjectID);
- }
- finally {
- internalConnection.ThreadHasParserLockForClose = false;
- }
- try {
- internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); //Task/Null
- }
- catch (SqlException ex) {
- throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
- }
- if(internalResultsTask != null) {
- AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
- }
- else {
- Debug.Assert(internalResults != null, "Executing initial query finished synchronously, but there were no results");
- WriteToServerInternalRestContinuedAsync(internalResults, cts, source); //internalResults is valid here.
- }
- }
- catch (Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- }
- else {
- throw;
- }
- }
- }
-
- // This returns a Task for async, null for sync.
- //
- private Task WriteToServerInternalAsync(CancellationToken ctoken) {
- TaskCompletionSource<object> source = null;
- Task<object> resultTask = null;
-
- if (_isAsyncBulkCopy) {
- source = new TaskCompletionSource<object>(); //creating the completion source/Task that we pass to application
- resultTask = source.Task;
- RegisterForConnectionCloseNotification(ref resultTask);
- }
- if (_destinationTableName == null) {
- if(source != null) {
- source.SetException(SQL.BulkLoadMissingDestinationTable()); //no table to copy
- }
- else {
- throw SQL.BulkLoadMissingDestinationTable();
- }
- return resultTask;
- }
- try {
- Task readTask = ReadFromRowSourceAsync(ctoken); // This is the first read call; _hasMoreRowToCopy is valid only if readTask == null.
- if (readTask == null) { //synchronously finished reading.
- if (!_hasMoreRowToCopy) { //no rows in the source to copy!
- if(source != null) {
- source.SetResult(null);
- }
- return resultTask;
- }
- else { //true, we have more rows.
- WriteToServerInternalRestAsync(ctoken, source); //rest of the method, passing the same completion and returning the incomplete task (ret).
- return resultTask;
- }
- }
- else {
- Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
- AsyncHelper.ContinueTask(readTask, source, () => {
- if (!_hasMoreRowToCopy) {
- source.SetResult(null); //no rows to copy!
- }
- else {
- WriteToServerInternalRestAsync(ctoken, source); //passing the same completion which will be completed by the Callee.
- }
- }, _connection.GetOpenTdsConnection());
- return resultTask;
- }
- }
- catch(Exception ex) {
- if (source != null) {
- source.TrySetException(ex);
- }
- else {
- throw;
- }
- }
- return resultTask;
- }
- }//End of SqlBulkCopy Class
- }//End of namespace