SqlBulkCopy.cs

  1. //------------------------------------------------------------------------------
  2. // <copyright file="SqlBulkCopy.cs" company="Microsoft">
  3. // Copyright (c) Microsoft Corporation. All rights reserved.
  4. // </copyright>
  5. // <owner current="true" primary="true">[....]</owner>
  6. // <owner current="true" primary="false">[....]</owner>
  7. //------------------------------------------------------------------------------
  8. // todo list:
  9. // * An ID column needs to be ignored - even if there is an association
  10. // * Spec: ID columns will be ignored - even if there is an association
  11. // * Spec: How do we publish CommandTimeout on the bcp operation?
  12. //
  13. namespace System.Data.SqlClient {
  14. using System;
  15. using System.Collections;
  16. using System.Collections.Generic;
  17. using System.ComponentModel;
  18. using System.Data;
  19. using System.Data.Common;
  20. using System.Data.Sql;
  21. using System.Data.SqlTypes;
  22. using System.Diagnostics;
  23. using System.Globalization;
  24. using System.Runtime.CompilerServices;
  25. using System.Runtime.ConstrainedExecution;
  26. using System.Text;
  27. using System.Threading;
  28. using System.Threading.Tasks;
  29. using System.Transactions;
  30. using System.Xml;
  31. using MSS = Microsoft.SqlServer.Server;
  32. // -------------------------------------------------------------------------------------------------
  33. // this internal class helps us to associate the metadata (from the target)
  34. // with column ordinals (from the source)
  35. //
  36. sealed internal class _ColumnMapping {
  37. internal int _sourceColumnOrdinal;
  38. internal _SqlMetaData _metadata;
  39. internal _ColumnMapping(int columnId, _SqlMetaData metadata) {
  40. _sourceColumnOrdinal = columnId;
  41. _metadata = metadata;
  42. }
  43. }
  44. sealed internal class Row {
  45. private object[] _dataFields;
  46. internal Row(int rowCount) {
  47. _dataFields = new object[rowCount];
  48. }
  49. internal object[] DataFields {
  50. get {
  51. return _dataFields;
  52. }
  53. }
  54. internal object this[int index] {
  55. get {
  56. return _dataFields[index];
  57. }
  58. }
  59. }
  60. // the controlling class for one result (metadata + rows)
  61. //
  62. sealed internal class Result {
  63. private _SqlMetaDataSet _metadata;
  64. private ArrayList _rowset;
  65. internal Result(_SqlMetaDataSet metadata) {
  66. this._metadata = metadata;
  67. this._rowset = new ArrayList();
  68. }
  69. internal int Count {
  70. get {
  71. return _rowset.Count;
  72. }
  73. }
  74. internal _SqlMetaDataSet MetaData {
  75. get {
  76. return _metadata;
  77. }
  78. }
  79. internal Row this[int index] {
  80. get {
  81. return (Row)_rowset[index];
  82. }
  83. }
  84. internal void AddRow(Row row) {
  85. _rowset.Add(row);
  86. }
  87. }
  88. // A wrapper object for metadata and rowsets returned by our initial queries
  89. //
  90. sealed internal class BulkCopySimpleResultSet {
  91. private ArrayList _results; // the list of results
  92. private Result resultSet; // the current result
  93. private int[] indexmap; // associates column ids with indexes in the row array
  94. // c-tor
  95. //
  96. internal BulkCopySimpleResultSet() {
  97. _results = new ArrayList();
  98. }
  99. // indexer
  100. //
  101. internal Result this[int idx] {
  102. get {
  103. return (Result)_results[idx];
  104. }
  105. }
  106. // callback function for the tdsparser
  107. // note that setting the metadata adds a resultset
  108. //
  109. internal void SetMetaData(_SqlMetaDataSet metadata) {
  110. resultSet = new Result(metadata);
  111. _results.Add(resultSet);
  112. indexmap = new int[resultSet.MetaData.Length];
  113. for(int i = 0; i < indexmap.Length; i++) {
  114. indexmap[i] = i;
  115. }
  116. }
  117. // callback function for the tdsparser
  118. // this will create an indexmap for the active resultset
  119. //
  120. internal int[] CreateIndexMap() {
  121. return indexmap;
  122. }
  123. // callback function for the tdsparser
  124. // this will return an array of rows to store the rowdata
  125. //
  126. internal object[] CreateRowBuffer() {
  127. Row row = new Row(resultSet.MetaData.Length);
  128. resultSet.AddRow(row);
  129. return row.DataFields;
  130. }
  131. }
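// Illustrative sketch (not part of the shipped code) of how the parser callbacks above are
// expected to be driven while consuming the initial query results; the metadata instance
// below is a hypothetical placeholder:
//
//   BulkCopySimpleResultSet results = new BulkCopySimpleResultSet();
//   // For each result set returned by the initial query the parser calls SetMetaData(...)
//   // once, which starts a new Result, then calls CreateRowBuffer() once per row and fills
//   // the returned object[] with column values.
//   results.SetMetaData(someMetaDataSet);            // hypothetical _SqlMetaDataSet
//   object[] rowBuffer = results.CreateRowBuffer();  // parser writes the row's values into this array
//   int[] map = results.CreateIndexMap();            // identity map: column i -> slot i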
  132. // -------------------------------------------------------------------------------------------------
  133. //
  134. //
  135. public sealed class SqlBulkCopy : IDisposable {
  136. private enum TableNameComponents {
  137. Server = 0,
  138. Catalog,
  139. Owner,
  140. TableName,
  141. }
  142. private enum ValueSourceType {
  143. Unspecified = 0,
  144. IDataReader,
  145. DataTable,
  146. RowArray
  147. }
  148. // Enum for specifying SqlDataReader.Get method used
  149. private enum ValueMethod : byte {
  150. GetValue,
  151. SqlTypeSqlDecimal,
  152. SqlTypeSqlDouble,
  153. SqlTypeSqlSingle,
  154. DataFeedStream,
  155. DataFeedText,
  156. DataFeedXml
  157. }
  158. // Used to hold column metadata for SqlDataReader case
  159. private struct SourceColumnMetadata {
  160. public SourceColumnMetadata(ValueMethod method, bool isSqlType, bool isDataFeed) {
  161. Method = method;
  162. IsSqlType = isSqlType;
  163. IsDataFeed = isDataFeed;
  164. }
  165. public readonly ValueMethod Method;
  166. public readonly bool IsSqlType;
  167. public readonly bool IsDataFeed;
  168. }
  169. // The initial query will return three result sets:
  170. // - Transaction count has one value (one column, one row)
  171. // - MetaData has n columns but no rows
  172. // - Collation has 4 columns and n rows
  173. private const int TranCountResultId = 0;
  174. private const int TranCountRowId = 0;
  175. private const int TranCountValueId = 0;
  176. private const int MetaDataResultId = 1;
  177. private const int CollationResultId = 2;
  178. private const int ColIdId = 0;
  179. private const int NameId = 1;
  180. private const int Tds_CollationId = 2;
  181. private const int CollationId = 3;
  182. private const int MAX_LENGTH = 0x7FFFFFFF;
  183. private const int DefaultCommandTimeout = 30;
  184. private bool _enableStreaming = false;
  185. private int _batchSize;
  186. private bool _ownConnection;
  187. private SqlBulkCopyOptions _copyOptions;
  188. private int _timeout = DefaultCommandTimeout;
  189. private string _destinationTableName;
  190. private int _rowsCopied;
  191. private int _notifyAfter;
  192. private int _rowsUntilNotification;
  193. private bool _insideRowsCopiedEvent;
  194. private object _rowSource;
  195. private SqlDataReader _SqlDataReaderRowSource;
  196. private bool _rowSourceIsSqlDataReaderSmi;
  197. private DbDataReader _DbDataReaderRowSource;
  198. private DataTable _dataTableSource;
  199. private SqlBulkCopyColumnMappingCollection _columnMappings;
  200. private SqlBulkCopyColumnMappingCollection _localColumnMappings;
  201. private SqlConnection _connection;
  202. private SqlTransaction _internalTransaction;
  203. private SqlTransaction _externalTransaction;
  204. private ValueSourceType _rowSourceType = ValueSourceType.Unspecified;
  205. private DataRow _currentRow;
  206. private int _currentRowLength;
  207. private DataRowState _rowStateToSkip;
  208. private IEnumerator _rowEnumerator;
  209. private TdsParser _parser;
  210. private TdsParserStateObject _stateObj;
  211. private List<_ColumnMapping> _sortedColumnMappings;
  212. private SqlRowsCopiedEventHandler _rowsCopiedEventHandler;
  213. private static int _objectTypeCount; // Bid counter
  214. internal readonly int _objectID = System.Threading.Interlocked.Increment(ref _objectTypeCount);
  215. // newly added member variables for the async bulk copy modification
  216. private int _savedBatchSize = 0; // save the batch size so that a mid-copy change to BatchSize does not take effect unexpectedly
  217. private bool _hasMoreRowToCopy = false;
  218. private bool _isAsyncBulkCopy = false;
  219. private bool _isBulkCopyingInProgress = false;
  220. private SqlInternalConnectionTds.SyncAsyncLock _parserLock = null;
  221. private SourceColumnMetadata[] _currentRowMetadata;
  222. // for debug purposes only.
  223. //
  224. #if DEBUG
  225. internal static bool _setAlwaysTaskOnWrite = false; //when set and in DEBUG mode, TdsParser::WriteBulkCopyValue will always return a task
  226. internal static bool SetAlwaysTaskOnWrite {
  227. set {
  228. _setAlwaysTaskOnWrite = value;
  229. }
  230. get{
  231. return _setAlwaysTaskOnWrite;
  232. }
  233. }
  234. #endif
  235. // ctor
  236. //
  237. public SqlBulkCopy(SqlConnection connection) {
  238. if(connection == null) {
  239. throw ADP.ArgumentNull("connection");
  240. }
  241. _connection = connection;
  242. _columnMappings = new SqlBulkCopyColumnMappingCollection();
  243. }
  244. public SqlBulkCopy(SqlConnection connection, SqlBulkCopyOptions copyOptions, SqlTransaction externalTransaction)
  245. : this (connection) {
  246. _copyOptions = copyOptions;
  247. if(externalTransaction != null && IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) {
  248. throw SQL.BulkLoadConflictingTransactionOption();
  249. }
  250. if(!IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) {
  251. _externalTransaction = externalTransaction;
  252. }
  253. }
  254. public SqlBulkCopy(string connectionString) : this (new SqlConnection(connectionString)) {
  255. if(connectionString == null) {
  256. throw ADP.ArgumentNull("connectionString");
  257. }
  258. _connection = new SqlConnection(connectionString);
  259. _columnMappings = new SqlBulkCopyColumnMappingCollection();
  260. _ownConnection = true;
  261. }
  262. public SqlBulkCopy(string connectionString, SqlBulkCopyOptions copyOptions)
  263. : this (connectionString) {
  264. _copyOptions = copyOptions;
  265. }
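// Typical usage sketch of the public surface declared in this class; WriteToServer is part of
// SqlBulkCopy but appears later in the file, and the table/column names and sourceReader below
// are hypothetical:
//
//   using (SqlConnection dest = new SqlConnection(connectionString))
//   {
//       dest.Open();
//       using (SqlBulkCopy bcp = new SqlBulkCopy(dest, SqlBulkCopyOptions.TableLock, externalTransaction: null))
//       {
//           bcp.DestinationTableName = "dbo.TargetTable";
//           bcp.BatchSize = 1000;
//           bcp.BulkCopyTimeout = 60;
//           bcp.ColumnMappings.Add("SourceId", "TargetId");
//           bcp.WriteToServer(sourceReader);   // IDataReader / DbDataReader / DataTable overloads exist
//       }
//   }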
  266. public int BatchSize {
  267. get {
  268. return _batchSize;
  269. }
  270. set {
  271. if(value >= 0) {
  272. _batchSize = value;
  273. }
  274. else {
  275. throw ADP.ArgumentOutOfRange("BatchSize");
  276. }
  277. }
  278. }
  279. public int BulkCopyTimeout {
  280. get {
  281. return _timeout;
  282. }
  283. set {
  284. if(value < 0) {
  285. throw SQL.BulkLoadInvalidTimeout(value);
  286. }
  287. _timeout = value;
  288. }
  289. }
  290. public bool EnableStreaming {
  291. get {
  292. return _enableStreaming;
  293. }
  294. set {
  295. _enableStreaming = value;
  296. }
  297. }
  298. public SqlBulkCopyColumnMappingCollection ColumnMappings {
  299. get {
  300. return _columnMappings;
  301. }
  302. }
  303. public string DestinationTableName {
  304. get {
  305. return _destinationTableName;
  306. }
  307. set {
  308. if(value == null) {
  309. throw ADP.ArgumentNull("DestinationTableName");
  310. }
  311. else if(value.Length == 0) {
  312. throw ADP.ArgumentOutOfRange("DestinationTableName");
  313. }
  314. _destinationTableName = value;
  315. }
  316. }
  317. public int NotifyAfter {
  318. get {
  319. return _notifyAfter;
  320. }
  321. set {
  322. if(value >= 0) {
  323. _notifyAfter = value;
  324. }
  325. else {
  326. throw ADP.ArgumentOutOfRange("NotifyAfter");
  327. }
  328. }
  329. }
  330. internal int ObjectID {
  331. get {
  332. return _objectID;
  333. }
  334. }
  335. public event SqlRowsCopiedEventHandler SqlRowsCopied {
  336. add {
  337. _rowsCopiedEventHandler += value;
  338. }
  339. remove {
  340. _rowsCopiedEventHandler -= value;
  341. }
  342. }
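// Sketch of how NotifyAfter and the SqlRowsCopied event work together (handler body is illustrative):
//
//   bcp.NotifyAfter = 5000;
//   bcp.SqlRowsCopied += (object sender, SqlRowsCopiedEventArgs e) =>
//   {
//       Console.WriteLine("Copied {0} rows so far", e.RowsCopied);
//       // e.Abort = true;   // setting Abort cancels the remainder of the bulk copy
//   };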
  343. internal SqlStatistics Statistics {
  344. get {
  345. if(null != _connection) {
  346. if(_connection.StatisticsEnabled) {
  347. return _connection.Statistics;
  348. }
  349. }
  350. return null;
  351. }
  352. }
  353. //================================================================
  354. // IDisposable
  355. //================================================================
  356. void IDisposable.Dispose() {
  357. this.Dispose(true);
  358. GC.SuppressFinalize(this);
  359. }
  360. private bool IsCopyOption(SqlBulkCopyOptions copyOption) {
  361. return (_copyOptions & copyOption) == copyOption;
  362. }
  363. //Creates the initial query string, but does not execute it.
  364. //
  365. private string CreateInitialQuery() {
  366. string[] parts;
  367. try {
  368. parts = MultipartIdentifier.ParseMultipartIdentifier(this.DestinationTableName, "[\"", "]\"", Res.SQL_BulkCopyDestinationTableName, true);
  369. }
  370. catch (Exception e) {
  371. throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, e);
  372. }
  373. if (ADP.IsEmpty(parts[MultipartIdentifier.TableIndex])) {
  374. throw SQL.BulkLoadInvalidDestinationTable(this.DestinationTableName, null);
  375. }
  376. string TDSCommand;
  377. TDSCommand = "select @@trancount; SET FMTONLY ON select * from " + this.DestinationTableName + " SET FMTONLY OFF ";
  378. if (_connection.IsShiloh) {
  379. // If it's a temp table then the collations query must run against tempdb (see below)
  380. string TableCollationsStoredProc;
  381. if (_connection.IsKatmaiOrNewer) {
  382. TableCollationsStoredProc = "sp_tablecollations_100";
  383. }
  384. else if (_connection.IsYukonOrNewer) {
  385. TableCollationsStoredProc = "sp_tablecollations_90";
  386. }
  387. else {
  388. TableCollationsStoredProc = "sp_tablecollations";
  389. }
  390. string TableName = parts[MultipartIdentifier.TableIndex];
  391. bool isTempTable = TableName.Length > 0 && '#' == TableName[0];
  392. if (!ADP.IsEmpty(TableName)) {
  393. // Escape table name to be put inside TSQL literal block (within N'').
  394. TableName = SqlServerEscapeHelper.EscapeStringAsLiteral(TableName);
  395. // VSDD 581951 - escape the table name
  396. TableName = SqlServerEscapeHelper.EscapeIdentifier(TableName);
  397. }
  398. string SchemaName = parts[MultipartIdentifier.SchemaIndex];
  399. if (!ADP.IsEmpty(SchemaName)) {
  400. // Escape schema name to be put inside TSQL literal block (within N'').
  401. SchemaName = SqlServerEscapeHelper.EscapeStringAsLiteral(SchemaName);
  402. // VSDD 581951 - escape the schema name
  403. SchemaName = SqlServerEscapeHelper.EscapeIdentifier(SchemaName);
  404. }
  405. string CatalogName = parts[MultipartIdentifier.CatalogIndex];
  406. if (isTempTable && ADP.IsEmpty(CatalogName)) {
  407. TDSCommand += String.Format((IFormatProvider)null, "exec tempdb..{0} N'{1}.{2}'",
  408. TableCollationsStoredProc,
  409. SchemaName,
  410. TableName
  411. );
  412. }
  413. else {
  414. // VSDD 581951 - escape the catalog name
  415. if (!ADP.IsEmpty(CatalogName)) {
  416. CatalogName = SqlServerEscapeHelper.EscapeIdentifier(CatalogName);
  417. }
  418. TDSCommand += String.Format((IFormatProvider)null, "exec {0}..{1} N'{2}.{3}'",
  419. CatalogName,
  420. TableCollationsStoredProc,
  421. SchemaName,
  422. TableName
  423. );
  424. }
  425. }
  426. return TDSCommand;
  427. }
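// Shape of the text produced above (assuming a Katmai-or-newer server and a non-temp
// destination; names are placeholders):
//
//   select @@trancount;
//   SET FMTONLY ON select * from <DestinationTableName> SET FMTONLY OFF
//   exec <catalog>..sp_tablecollations_100 N'<escaped schema>.<escaped table>'
//
// For a temp table with no catalog specified, the last statement runs against tempdb:
//   exec tempdb..sp_tablecollations_100 N'<escaped schema>.<escaped table>'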
  428. // Creates and then executes the initial query to get information about the target table
  429. // When _isAsyncBulkCopy == false (i.e. it is a sync copy): the out parameter 'result' contains the result set. Returns null.
  430. // When _isAsyncBulkCopy == true (i.e. it is an async copy): this still uses the _parser.Run method synchronously and returns a Task<BulkCopySimpleResultSet>.
  431. // We would need a _parser.RunAsync to make it truly async.
  432. private Task<BulkCopySimpleResultSet> CreateAndExecuteInitialQueryAsync(out BulkCopySimpleResultSet result) {
  433. string TDSCommand = CreateInitialQuery();
  434. Bid.Trace("<sc.SqlBulkCopy.CreateAndExecuteInitialQueryAsync|INFO> Initial Query: '%ls' \n", TDSCommand);
  435. Bid.CorrelationTrace("<sc.SqlBulkCopy.CreateAndExecuteInitialQueryAsync|Info|Correlation> ObjectID%d#, ActivityID %ls\n", ObjectID);
  436. Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true);
  437. if (executeTask == null) {
  438. result = new BulkCopySimpleResultSet();
  439. RunParser(result);
  440. return null;
  441. }
  442. else {
  443. Debug.Assert(_isAsyncBulkCopy, "Execution pended when not doing async bulk copy");
  444. result = null;
  445. return executeTask.ContinueWith<BulkCopySimpleResultSet>(t => {
  446. Debug.Assert(!t.IsCanceled, "Execution task was canceled");
  447. if (t.IsFaulted) {
  448. throw t.Exception.InnerException;
  449. }
  450. else {
  451. var internalResult = new BulkCopySimpleResultSet();
  452. RunParserReliably(internalResult);
  453. return internalResult;
  454. }
  455. }, TaskScheduler.Default);
  456. }
  457. }
  458. // Matches associated columns with metadata from the initial query and
  459. // builds the update bulk command text (executed later by SubmitUpdateBulkCommand)
  460. //
  461. private string AnalyzeTargetAndCreateUpdateBulkCommand(BulkCopySimpleResultSet internalResults) {
  462. StringBuilder updateBulkCommandText = new StringBuilder();
  463. if (_connection.IsShiloh && 0 == internalResults[CollationResultId].Count) {
  464. throw SQL.BulkLoadNoCollation();
  465. }
  466. Debug.Assert((internalResults != null), "Where are the results from the initial query?");
  467. updateBulkCommandText.AppendFormat("insert bulk {0} (", this.DestinationTableName);
  468. int nmatched = 0; // number of columns that match and are accepted
  469. int nrejected = 0; // number of columns that match but were rejected
  470. bool rejectColumn; // true if a column is rejected because of an excluded type
  471. bool isInTransaction;
  472. if(_parser.IsYukonOrNewer) {
  473. isInTransaction = _connection.HasLocalTransaction;
  474. }
  475. else {
  476. isInTransaction = (bool)(0 < (SqlInt32)(internalResults[TranCountResultId][TranCountRowId][TranCountValueId]));
  477. }
  478. // Throw if there is a transaction but no flag is set
  479. if(isInTransaction && null == _externalTransaction && null == _internalTransaction && (_connection.Parser != null && _connection.Parser.CurrentTransaction != null && _connection.Parser.CurrentTransaction.IsLocal)) {
  480. throw SQL.BulkLoadExistingTransaction();
  481. }
  482. // loop over the metadata for each column
  483. //
  484. _SqlMetaDataSet metaDataSet = internalResults[MetaDataResultId].MetaData;
  485. _sortedColumnMappings = new List<_ColumnMapping>(metaDataSet.Length);
  486. for(int i = 0; i < metaDataSet.Length; i++) {
  487. _SqlMetaData metadata = metaDataSet[i];
  488. rejectColumn = false;
  489. // Check for excluded types
  490. //
  491. if((metadata.type == SqlDbType.Timestamp)
  492. || ((metadata.isIdentity) && !IsCopyOption(SqlBulkCopyOptions.KeepIdentity))) {
  493. // remove metadata for excluded columns
  494. metaDataSet[i] = null;
  495. rejectColumn = true;
  496. // we still need to find a matching column association
  497. }
  498. // find out if this column is associated
  499. int assocId;
  500. for(assocId = 0; assocId < _localColumnMappings.Count; assocId++) {
  501. if((_localColumnMappings[assocId]._destinationColumnOrdinal == metadata.ordinal) ||
  502. (UnquotedName(_localColumnMappings[assocId]._destinationColumnName) == metadata.column)) {
  503. if(rejectColumn) {
  504. nrejected++; // count matched columns only
  505. break;
  506. }
  507. _sortedColumnMappings.Add(new _ColumnMapping(_localColumnMappings[assocId]._internalSourceColumnOrdinal, metadata));
  508. nmatched++;
  509. if(nmatched > 1) {
  510. updateBulkCommandText.Append(", "); // a leading comma for all but the first one
  511. }
  512. // some datatypes need special handling ...
  513. //
  514. if(metadata.type == SqlDbType.Variant) {
  515. AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "sql_variant");
  516. }
  517. else if(metadata.type == SqlDbType.Udt) {
  518. // UDTs are sent as varbinary
  519. AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, "varbinary");
  520. }
  521. else {
  522. AppendColumnNameAndTypeName(updateBulkCommandText, metadata.column, typeof(SqlDbType).GetEnumName(metadata.type));
  523. }
  524. switch(metadata.metaType.NullableType) {
  525. case TdsEnums.SQLNUMERICN:
  526. case TdsEnums.SQLDECIMALN:
  527. // decimal and numeric need to include precision and scale
  528. //
  529. updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0},{1})", metadata.precision, metadata.scale);
  530. break;
  531. case TdsEnums.SQLUDT: {
  532. if (metadata.IsLargeUdt) {
  533. updateBulkCommandText.Append("(max)");
  534. } else {
  535. int size = metadata.length;
  536. updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
  537. }
  538. break;
  539. }
  540. case TdsEnums.SQLTIME:
  541. case TdsEnums.SQLDATETIME2:
  542. case TdsEnums.SQLDATETIMEOFFSET:
  543. // time, datetime2, and datetimeoffset need to include scale
  544. //
  545. updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", metadata.scale);
  546. break;
  547. default: {
  548. // for non-long non-fixed types we need to add the Size
  549. //
  550. if(!metadata.metaType.IsFixed && !metadata.metaType.IsLong) {
  551. int size = metadata.length;
  552. switch(metadata.metaType.NullableType) {
  553. case TdsEnums.SQLNCHAR:
  554. case TdsEnums.SQLNVARCHAR:
  555. case TdsEnums.SQLNTEXT:
  556. size /= 2;
  557. break;
  558. default:
  559. break;
  560. }
  561. updateBulkCommandText.AppendFormat((IFormatProvider)null, "({0})", size);
  562. }
  563. else if(metadata.metaType.IsPlp && metadata.metaType.SqlDbType != SqlDbType.Xml) {
  564. // Partial length column prefix (max)
  565. updateBulkCommandText.Append("(max)");
  566. }
  567. break;
  568. }
  569. }
  570. if(_connection.IsShiloh) {
  571. // Shiloh or above!
  572. // get collation for column i
  573. Result rowset = internalResults[CollationResultId];
  574. object rowvalue = rowset[i][CollationId];
  575. if(rowvalue != null) {
  576. Debug.Assert(rowvalue is SqlString);
  577. SqlString collation_name = (SqlString)rowvalue;
  578. if(!collation_name.IsNull) {
  579. updateBulkCommandText.Append(" COLLATE " + collation_name.Value);
  580. // VSTFDEVDIV 461426: compare collations only if the collation value was set on the metadata
  581. if (null != _SqlDataReaderRowSource && metadata.collation != null) {
  582. // On SqlDataReader we can verify the sourcecolumn collation!
  583. int sourceColumnId = _localColumnMappings[assocId]._internalSourceColumnOrdinal;
  584. int destinationLcid = metadata.collation.LCID;
  585. int sourceLcid = _SqlDataReaderRowSource.GetLocaleId(sourceColumnId);
  586. if(sourceLcid != destinationLcid) {
  587. throw SQL.BulkLoadLcidMismatch(sourceLcid, _SqlDataReaderRowSource.GetName(sourceColumnId), destinationLcid, metadata.column);
  588. }
  589. }
  590. }
  591. }
  592. }
  593. break;
  594. } // end if found
  595. } // end of (inner) for loop
  596. if(assocId == _localColumnMappings.Count) {
  597. // remove metadata for unmatched columns
  598. metaDataSet[i] = null;
  599. }
  600. } // end of (outer) for loop
  601. // all column mappings should have matched up
  602. if(nmatched + nrejected != _localColumnMappings.Count) {
  603. throw (SQL.BulkLoadNonMatchingColumnMapping());
  604. }
  605. updateBulkCommandText.Append(")");
  606. if((_copyOptions & (
  607. SqlBulkCopyOptions.KeepNulls
  608. | SqlBulkCopyOptions.TableLock
  609. | SqlBulkCopyOptions.CheckConstraints
  610. | SqlBulkCopyOptions.FireTriggers)) != SqlBulkCopyOptions.Default) {
  611. bool addSeparator = false; // insert a comma character if multiple options in list ...
  612. updateBulkCommandText.Append(" with (");
  613. if(IsCopyOption(SqlBulkCopyOptions.KeepNulls)) {
  614. updateBulkCommandText.Append("KEEP_NULLS");
  615. addSeparator = true;
  616. }
  617. if(IsCopyOption(SqlBulkCopyOptions.TableLock)) {
  618. updateBulkCommandText.Append((addSeparator ? ", " : "") + "TABLOCK");
  619. addSeparator = true;
  620. }
  621. if(IsCopyOption(SqlBulkCopyOptions.CheckConstraints)) {
  622. updateBulkCommandText.Append((addSeparator ? ", " : "") + "CHECK_CONSTRAINTS");
  623. addSeparator = true;
  624. }
  625. if(IsCopyOption(SqlBulkCopyOptions.FireTriggers)) {
  626. updateBulkCommandText.Append((addSeparator ? ", " : "") + "FIRE_TRIGGERS");
  627. addSeparator = true;
  628. }
  629. updateBulkCommandText.Append(")");
  630. }
  631. return (updateBulkCommandText.ToString());
  632. }
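// Shape of the command text built above (the column list and options depend on the mappings
// and copy options; names, types and the collation here are placeholders):
//
//   insert bulk <DestinationTableName> ([Col1] int, [Col2] nvarchar(50) COLLATE <collation>, ...)
//       with (TABLOCK, CHECK_CONSTRAINTS)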
  633. // submits the update bulk command
  634. //
  635. private Task SubmitUpdateBulkCommand(string TDSCommand) {
  636. Bid.CorrelationTrace("<sc.SqlBulkCopy.SubmitUpdateBulkCommand|Info|Correlation> ObjectID%d#, ActivityID %ls\n", ObjectID);
  637. Task executeTask = _parser.TdsExecuteSQLBatch(TDSCommand, this.BulkCopyTimeout, null, _stateObj, sync: !_isAsyncBulkCopy, callerHasConnectionLock: true);
  638. if (executeTask == null) {
  639. RunParser();
  640. return null;
  641. }
  642. else {
  643. Debug.Assert(_isAsyncBulkCopy, "Execution pended when not doing async bulk copy");
  644. return executeTask.ContinueWith(t => {
  645. Debug.Assert(!t.IsCanceled, "Execution task was canceled");
  646. if (t.IsFaulted) {
  647. throw t.Exception.InnerException;
  648. }
  649. else {
  650. RunParserReliably();
  651. }
  652. }, TaskScheduler.Default);
  653. }
  654. }
  655. // Starts writing the bulk copy data stream
  656. //
  657. private void WriteMetaData(BulkCopySimpleResultSet internalResults) {
  658. _stateObj.SetTimeoutSeconds(this.BulkCopyTimeout);
  659. _SqlMetaDataSet metadataCollection = internalResults[MetaDataResultId].MetaData;
  660. _stateObj._outputMessageType = TdsEnums.MT_BULK;
  661. _parser.WriteBulkCopyMetaData(metadataCollection, _sortedColumnMappings.Count, _stateObj);
  662. }
  663. //================================================================
  664. // Close()
  665. //
  666. // Terminates the bulk copy operation.
  667. // Must be called at the end of the bulk copy session.
  668. //================================================================
  669. public void Close() {
  670. if(_insideRowsCopiedEvent) {
  671. throw SQL.InvalidOperationInsideEvent();
  672. }
  673. Dispose(true);
  674. GC.SuppressFinalize(this);
  675. }
  676. private void Dispose(bool disposing) {
  677. if(disposing) {
  678. // dispose dependent objects
  679. _columnMappings = null;
  680. _parser = null;
  681. try {
  682. // Just in case there is a lingering transaction (which there shouldn't be)
  683. try {
  684. Debug.Assert(_internalTransaction == null, "Internal transaction exists during dispose");
  685. if (null != _internalTransaction) {
  686. _internalTransaction.Rollback();
  687. _internalTransaction.Dispose();
  688. _internalTransaction = null;
  689. }
  690. }
  691. catch(Exception e) {
  692. //
  693. if(!ADP.IsCatchableExceptionType(e)) {
  694. throw;
  695. }
  696. ADP.TraceExceptionWithoutRethrow(e);
  697. }
  698. }
  699. finally {
  700. if(_connection != null) {
  701. if(_ownConnection) {
  702. _connection.Dispose();
  703. }
  704. _connection = null;
  705. }
  706. }
  707. }
  708. // free unmanaged objects
  709. }
  710. // unified method to read a value from the current row
  711. //
  712. private object GetValueFromSourceRow(int destRowIndex, out bool isSqlType, out bool isDataFeed, out bool isNull) {
  713. _SqlMetaData metadata = _sortedColumnMappings[destRowIndex]._metadata;
  714. int sourceOrdinal = _sortedColumnMappings[destRowIndex]._sourceColumnOrdinal;
  715. switch(_rowSourceType) {
  716. case ValueSourceType.IDataReader:
  717. // Handle data feeds (common for both DbDataReader and SqlDataReader)
  718. if (_currentRowMetadata[destRowIndex].IsDataFeed) {
  719. if (_DbDataReaderRowSource.IsDBNull(sourceOrdinal)) {
  720. isSqlType = false;
  721. isDataFeed = false;
  722. isNull = true;
  723. return DBNull.Value;
  724. }
  725. else {
  726. isSqlType = false;
  727. isDataFeed = true;
  728. isNull = false;
  729. switch (_currentRowMetadata[destRowIndex].Method) {
  730. case ValueMethod.DataFeedStream:
  731. return new StreamDataFeed(_DbDataReaderRowSource.GetStream(sourceOrdinal));
  732. case ValueMethod.DataFeedText:
  733. return new TextDataFeed(_DbDataReaderRowSource.GetTextReader(sourceOrdinal));
  734. case ValueMethod.DataFeedXml:
  735. // Only SqlDataReader supports an XmlReader
  736. // There is no GetXmlReader on DbDataReader; however, if GetValue returns an XmlReader, we will read it as a stream when it is assigned to an XML field
  737. Debug.Assert(_SqlDataReaderRowSource != null, "Should not be reading row as an XmlReader if bulk copy source is not a SqlDataReader");
  738. return new XmlDataFeed(_SqlDataReaderRowSource.GetXmlReader(sourceOrdinal));
  739. default:
  740. Debug.Assert(false, string.Format("Current column is marked as being a DataFeed, but no DataFeed compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
  741. isDataFeed = false;
  742. object columnValue = _DbDataReaderRowSource.GetValue(sourceOrdinal);
  743. ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType);
  744. return columnValue;
  745. }
  746. }
  747. }
  748. // SqlDataReader-specific logic
  749. else if (null != _SqlDataReaderRowSource) {
  750. if (_currentRowMetadata[destRowIndex].IsSqlType) {
  751. INullable value;
  752. isSqlType = true;
  753. isDataFeed = false;
  754. switch (_currentRowMetadata[destRowIndex].Method) {
  755. case ValueMethod.SqlTypeSqlDecimal:
  756. value = _SqlDataReaderRowSource.GetSqlDecimal(sourceOrdinal);
  757. break;
  758. case ValueMethod.SqlTypeSqlDouble:
  759. value = new SqlDecimal(_SqlDataReaderRowSource.GetSqlDouble(sourceOrdinal).Value);
  760. break;
  761. case ValueMethod.SqlTypeSqlSingle:
  762. value = new SqlDecimal(_SqlDataReaderRowSource.GetSqlSingle(sourceOrdinal).Value);
  763. break;
  764. default:
  765. Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
  766. value = (INullable)_SqlDataReaderRowSource.GetSqlValue(sourceOrdinal);
  767. break;
  768. }
  769. isNull = value.IsNull;
  770. return value;
  771. }
  772. else {
  773. isSqlType = false;
  774. isDataFeed = false;
  775. object value = _SqlDataReaderRowSource.GetValue(sourceOrdinal);
  776. isNull = ((value == null) || (value == DBNull.Value));
  777. if ((!isNull) && (metadata.type == SqlDbType.Udt)) {
  778. var columnAsINullable = value as INullable;
  779. isNull = (columnAsINullable != null) && columnAsINullable.IsNull;
  780. }
  781. #if DEBUG
  782. else if (!isNull) {
  783. Debug.Assert(!(value is INullable) || !((INullable)value).IsNull, "IsDBNull returned false, but GetValue returned a null INullable");
  784. }
  785. #endif
  786. return value;
  787. }
  788. }
  789. else {
  790. isDataFeed = false;
  791. IDataReader rowSourceAsIDataReader = (IDataReader)_rowSource;
  792. // Back-compat with 4.0 and 4.5 - only use IsDbNull when streaming is enabled and only for non-SqlDataReader
  793. if ((_enableStreaming) && (_SqlDataReaderRowSource == null) && (rowSourceAsIDataReader.IsDBNull(sourceOrdinal))) {
  794. isSqlType = false;
  795. isNull = true;
  796. return DBNull.Value;
  797. }
  798. else {
  799. object columnValue = rowSourceAsIDataReader.GetValue(sourceOrdinal);
  800. ADP.IsNullOrSqlType(columnValue, out isNull, out isSqlType);
  801. return columnValue;
  802. }
  803. }
  804. case ValueSourceType.DataTable:
  805. case ValueSourceType.RowArray: {
  806. Debug.Assert(_currentRow != null, "uninitialized _currentRow");
  807. Debug.Assert(sourceOrdinal < _currentRowLength, "inconsistency of length of rows from rowsource!");
  808. isDataFeed = false;
  809. object currentRowValue = _currentRow[sourceOrdinal];
  810. ADP.IsNullOrSqlType(currentRowValue, out isNull, out isSqlType);
  811. // If this row is not null, and there are special storage types for this row, then handle the special storage types
  812. if ((!isNull) && (_currentRowMetadata[destRowIndex].IsSqlType)) {
  813. switch (_currentRowMetadata[destRowIndex].Method) {
  814. case ValueMethod.SqlTypeSqlSingle: {
  815. if (isSqlType) {
  816. return new SqlDecimal(((SqlSingle)currentRowValue).Value);
  817. }
  818. else {
  819. float f = (float)currentRowValue;
  820. if (!float.IsNaN(f)) {
  821. isSqlType = true;
  822. return new SqlDecimal(f);
  823. }
  824. break;
  825. }
  826. }
  827. case ValueMethod.SqlTypeSqlDouble: {
  828. if (isSqlType) {
  829. return new SqlDecimal(((SqlDouble)currentRowValue).Value);
  830. }
  831. else {
  832. double d = (double)currentRowValue;
  833. if (!double.IsNaN(d)) {
  834. isSqlType = true;
  835. return new SqlDecimal(d);
  836. }
  837. break;
  838. }
  839. }
  840. case ValueMethod.SqlTypeSqlDecimal: {
  841. if (isSqlType) {
  842. return (SqlDecimal)currentRowValue;
  843. }
  844. else {
  845. isSqlType = true;
  846. return new SqlDecimal((Decimal)currentRowValue);
  847. }
  848. }
  849. default: {
  850. Debug.Assert(false, string.Format("Current column is marked as being a SqlType, but no SqlType compatible method was provided. Method: {0}", _currentRowMetadata[destRowIndex].Method));
  851. break;
  852. }
  853. }
  854. }
  855. // If we are here then either the value is null, there was no special storage type for this column or the special storage type wasn't handled (e.g. if the currentRowValue is NaN)
  856. return currentRowValue;
  857. }
  858. default: {
  859. Debug.Assert(false, "ValueSourceType unspecified");
  860. throw ADP.NotSupported();
  861. }
  862. }
  863. }
  864. // unified method to read a row from the current row source
  865. // When _isAsyncBulkCopy == true (i.e. async copy): returns a Task<bool> when the IDataReader is a DbDataReader, null for others.
  866. // When _isAsyncBulkCopy == false (i.e. sync copy): returns null. Uses ReadFromRowSource to get the boolean value.
  867. // "more" (_hasMoreRowToCopy) should be used by the caller only when the return value is null.
  868. private Task ReadFromRowSourceAsync(CancellationToken cts) {
  869. DbDataReader dbRowSource = _rowSource as DbDataReader;
  870. if (_isAsyncBulkCopy && _rowSourceType == ValueSourceType.IDataReader && (_rowSource as DbDataReader) != null) {
  871. // This will call ReadAsync for DbDataReader (for SqlDataReader it will be a truly async read; for non-SqlDataReader it may block.)
  872. return ((DbDataReader)_rowSource).ReadAsync(cts).ContinueWith((t) => {
  873. if (t.Status == TaskStatus.RanToCompletion) {
  874. _hasMoreRowToCopy = t.Result;
  875. }
  876. return t;
  877. }, TaskScheduler.Default).Unwrap();
  878. }
  879. else { //this will call Read for DataRows, DataTable and IDataReader (this includes all IDataReader except DbDataReader)
  880. _hasMoreRowToCopy = false;
  881. try {
  882. _hasMoreRowToCopy = ReadFromRowSource(); //Synchronous calls for DataRows and DataTable won't block. For IDataReader, it may block.
  883. }
  884. catch(Exception ex) {
  885. if (_isAsyncBulkCopy) {
  886. TaskCompletionSource<bool> tcs = new TaskCompletionSource<bool>();
  887. tcs.SetException(ex);
  888. return tcs.Task;
  889. }
  890. else {
  891. throw;
  892. }
  893. }
  894. return null;
  895. }
  896. }
  897. private bool ReadFromRowSource() {
  898. switch(_rowSourceType) {
  899. case ValueSourceType.IDataReader:
  900. return ((IDataReader)_rowSource).Read();
  901. // treatment of the RowArray case is the same as for DataTable, to prevent code duplication
  902. case ValueSourceType.RowArray:
  903. case ValueSourceType.DataTable:
  904. Debug.Assert(_rowEnumerator != null, "uninitialized _rowEnumerator");
  905. Debug.Assert((_rowStateToSkip & DataRowState.Deleted) != 0, "Deleted is a permitted rowstate?");
  906. // repeat until we get a row that is not deleted or there are no more rows ...
  907. do {
  908. if(!_rowEnumerator.MoveNext()) {
  909. return false;
  910. }
  911. _currentRow = (DataRow)_rowEnumerator.Current;
  912. } while ((_currentRow.RowState & _rowStateToSkip) != 0); // repeat if there is an unexpected rowstate
  913. // SQLBUVSTS01:36286 - moved this line out of the loop because
  914. // ItemArray raises an exception when used on a deleted row
  915. _currentRowLength = _currentRow.ItemArray.Length;
  916. return true;
  917. default:
  918. Debug.Assert(false, "ValueSourceType unspecified");
  919. throw ADP.NotSupported();
  920. }
  921. }
  922. private SourceColumnMetadata GetColumnMetadata(int ordinal) {
  923. int sourceOrdinal = _sortedColumnMappings[ordinal]._sourceColumnOrdinal;
  924. _SqlMetaData metadata = _sortedColumnMappings[ordinal]._metadata;
  925. // Handle special Sql data types for SqlDataReader and DataTables
  926. ValueMethod method;
  927. bool isSqlType;
  928. bool isDataFeed;
  929. if (((_SqlDataReaderRowSource != null) || (_dataTableSource != null)) && ((metadata.metaType.NullableType == TdsEnums.SQLDECIMALN) || (metadata.metaType.NullableType == TdsEnums.SQLNUMERICN))) {
  930. isDataFeed = false;
  931. Type t;
  932. switch(_rowSourceType) {
  933. case ValueSourceType.IDataReader:
  934. t = _SqlDataReaderRowSource.GetFieldType(sourceOrdinal);
  935. break;
  936. case ValueSourceType.DataTable:
  937. case ValueSourceType.RowArray:
  938. t = _dataTableSource.Columns[sourceOrdinal].DataType;
  939. break;
  940. default:
  941. t = null;
  942. Debug.Assert(false, string.Format("Unknown value source: {0}", _rowSourceType));
  943. break;
  944. }
  945. if (typeof(SqlDecimal) == t || typeof(Decimal) == t) {
  946. isSqlType = true;
  947. method = ValueMethod.SqlTypeSqlDecimal; // Source Type Decimal
  948. }
  949. else if (typeof(SqlDouble) == t || typeof (double) == t) {
  950. isSqlType = true;
  951. method = ValueMethod.SqlTypeSqlDouble; // Source Type SqlDouble
  952. }
  953. else if (typeof(SqlSingle) == t || typeof (float) == t) {
  954. isSqlType = true;
  955. method = ValueMethod.SqlTypeSqlSingle; // Source Type SqlSingle
  956. }
  957. else {
  958. isSqlType = false;
  959. method = ValueMethod.GetValue;
  960. }
  961. }
  962. // Check for data streams
  963. else if ((_enableStreaming) && (metadata.length == MAX_LENGTH) && (!_rowSourceIsSqlDataReaderSmi)) {
  964. isSqlType = false;
  965. if (_SqlDataReaderRowSource != null) {
  966. // MetaData property is not set for SMI, but since streaming is disabled we do not need it
  967. MetaType mtSource = _SqlDataReaderRowSource.MetaData[sourceOrdinal].metaType;
  968. // There is no memory gain for non-sequential access for binary
  969. if ((metadata.type == SqlDbType.VarBinary) && (mtSource.IsBinType) && (mtSource.SqlDbType != SqlDbType.Timestamp) && _SqlDataReaderRowSource.IsCommandBehavior(CommandBehavior.SequentialAccess)) {
  970. isDataFeed = true;
  971. method = ValueMethod.DataFeedStream;
  972. }
  973. // For text and XML there is memory gain from streaming on destination side even if reader is non-sequential
  974. else if (((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) && (mtSource.IsCharType) && (mtSource.SqlDbType != SqlDbType.Xml)) {
  975. isDataFeed = true;
  976. method = ValueMethod.DataFeedText;
  977. }
  978. else if ((metadata.type == SqlDbType.Xml) && (mtSource.SqlDbType == SqlDbType.Xml)) {
  979. isDataFeed = true;
  980. method = ValueMethod.DataFeedXml;
  981. }
  982. else {
  983. isDataFeed = false;
  984. method = ValueMethod.GetValue;
  985. }
  986. }
  987. else if (_DbDataReaderRowSource != null) {
  988. if (metadata.type == SqlDbType.VarBinary) {
  989. isDataFeed = true;
  990. method = ValueMethod.DataFeedStream;
  991. }
  992. else if ((metadata.type == SqlDbType.VarChar) || (metadata.type == SqlDbType.NVarChar)) {
  993. isDataFeed = true;
  994. method = ValueMethod.DataFeedText;
  995. }
  996. else {
  997. isDataFeed = false;
  998. method = ValueMethod.GetValue;
  999. }
  1000. }
  1001. else {
  1002. isDataFeed = false;
  1003. method = ValueMethod.GetValue;
  1004. }
  1005. }
  1006. else {
  1007. isSqlType = false;
  1008. isDataFeed = false;
  1009. method = ValueMethod.GetValue;
  1010. }
  1011. return new SourceColumnMetadata(method, isSqlType, isDataFeed);
  1012. }
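// Summary of the selection above (descriptive only):
//  - decimal/numeric destinations (SqlDataReader or DataTable sources) are read via the
//    SqlType methods: SqlTypeSqlDecimal, SqlTypeSqlDouble or SqlTypeSqlSingle;
//  - with EnableStreaming and a (max)-length destination: varbinary -> DataFeedStream
//    (for a SqlDataReader source only under CommandBehavior.SequentialAccess),
//    [n]varchar -> DataFeedText, xml -> DataFeedXml (SqlDataReader sources only);
//  - everything else uses ValueMethod.GetValue.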
  1013. //
  1014. //
  1015. private void CreateOrValidateConnection(string method) {
  1016. if(null == _connection) {
  1017. throw ADP.ConnectionRequired(method);
  1018. }
  1019. if (_connection.IsContextConnection) {
  1020. throw SQL.NotAvailableOnContextConnection();
  1021. }
  1022. if(_ownConnection && _connection.State != ConnectionState.Open) {
  1023. _connection.Open();
  1024. }
  1025. // close any non MARS dead readers, if applicable, and then throw if still busy.
  1026. _connection.ValidateConnectionForExecute(method, null);
  1027. // if we have a transaction, check to ensure that the active
  1028. // connection property matches the connection associated with
  1029. // the transaction
  1030. if(null != _externalTransaction && _connection != _externalTransaction.Connection) {
  1031. throw ADP.TransactionConnectionMismatch();
  1032. }
  1033. }
  1034. // Runs the _parser until it is done and ensures that ThreadHasParserLockForClose is correctly set and unset
  1035. // Ensure that you only call this inside of a Reliability Section
  1036. private void RunParser(BulkCopySimpleResultSet bulkCopyHandler = null) {
  1037. // In case of error while reading, we should let the connection know that we already own the _parserLock
  1038. SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
  1039. internalConnection.ThreadHasParserLockForClose = true;
  1040. try {
  1041. _parser.Run(RunBehavior.UntilDone, null, null, bulkCopyHandler, _stateObj);
  1042. }
  1043. finally {
  1044. internalConnection.ThreadHasParserLockForClose = false;
  1045. }
  1046. }
  1047. // Runs the _parser until it is done and ensures that ThreadHasParserLockForClose is correctly set and unset
  1048. // This takes care of setting up the Reliability Section, and will doom the connection if there is a catastrophic (OOM, StackOverflow, ThreadAbort) error
  1049. private void RunParserReliably(BulkCopySimpleResultSet bulkCopyHandler = null) {
  1050. // In case of error while reading, we should let the connection know that we already own the _parserLock
  1051. SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
  1052. internalConnection.ThreadHasParserLockForClose = true;
  1053. try {
  1054. _parser.RunReliably(RunBehavior.UntilDone, null, null, bulkCopyHandler, _stateObj);
  1055. }
  1056. finally {
  1057. internalConnection.ThreadHasParserLockForClose = false;
  1058. }
  1059. }
  1060. private void CommitTransaction() {
  1061. if (null != _internalTransaction) {
  1062. SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
  1063. internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock
  1064. try {
  1065. _internalTransaction.Commit(); //commit.
  1066. _internalTransaction.Dispose();
  1067. _internalTransaction = null;
  1068. }
  1069. finally {
  1070. internalConnection.ThreadHasParserLockForClose = false;
  1071. }
  1072. }
  1073. }
  1074. private void AbortTransaction() {
  1075. if (_internalTransaction != null) {
  1076. if (!_internalTransaction.IsZombied) {
  1077. SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
  1078. internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we have the lock
  1079. try {
  1080. _internalTransaction.Rollback();
  1081. }
  1082. finally {
  1083. internalConnection.ThreadHasParserLockForClose = false;
  1084. }
  1085. }
  1086. _internalTransaction.Dispose();
  1087. _internalTransaction = null;
  1088. }
  1089. }
  1090. // Appends the column name in square brackets, a space and the type name to the query.
  1091. // Putting the name in brackets also requires doubling any existing ']' so that it is not mistaken for
  1092. // the closing quote.
  1093. // example: abc will become [abc] but abc[] will become [abc[]]]
  1094. //
  1095. private void AppendColumnNameAndTypeName(StringBuilder query, string columnName, string typeName) {
  1096. SqlServerEscapeHelper.EscapeIdentifier(query, columnName);
  1097. query.Append(" ");
  1098. query.Append(typeName);
  1099. }
  1100. private string UnquotedName(string name) {
  1101. if(ADP.IsEmpty(name)) return null;
  1102. if(name[0] == '[') {
  1103. int l = name.Length;
  1104. Debug.Assert(name[l - 1] == ']', "Name starts with [ but does not end with ]");
  1105. name = name.Substring(1, l - 2);
  1106. }
  1107. return name;
  1108. }
  1109. private object ValidateBulkCopyVariant(object value) {
  1110. // from the spec:
  1111. // "The only acceptable types are ..."
  1112. // GUID, BIGVARBINARY, BIGBINARY, BIGVARCHAR, BIGCHAR, NVARCHAR, NCHAR, BIT, INT1, INT2, INT4, INT8,
  1113. // MONEY4, MONEY, DECIMALN, NUMERICN, FLT4, FLT8, DATETIME4 and DATETIME
  1114. //
  1115. MetaType metatype = MetaType.GetMetaTypeFromValue(value);
  1116. switch(metatype.TDSType) {
  1117. case TdsEnums.SQLFLT4:
  1118. case TdsEnums.SQLFLT8:
  1119. case TdsEnums.SQLINT8:
  1120. case TdsEnums.SQLINT4:
  1121. case TdsEnums.SQLINT2:
  1122. case TdsEnums.SQLINT1:
  1123. case TdsEnums.SQLBIT:
  1124. case TdsEnums.SQLBIGVARBINARY:
  1125. case TdsEnums.SQLBIGVARCHAR:
  1126. case TdsEnums.SQLUNIQUEID:
  1127. case TdsEnums.SQLNVARCHAR:
  1128. case TdsEnums.SQLDATETIME:
  1129. case TdsEnums.SQLMONEY:
  1130. case TdsEnums.SQLNUMERICN:
  1131. case TdsEnums.SQLDATE:
  1132. case TdsEnums.SQLTIME:
  1133. case TdsEnums.SQLDATETIME2:
  1134. case TdsEnums.SQLDATETIMEOFFSET:
  1135. if (value is INullable) { // Current limitation in the SqlBulkCopy Variant code limits BulkCopy to CLR/COM Types.
  1136. return MetaType.GetComValueFromSqlVariant (value);
  1137. } else {
  1138. return value;
  1139. }
  1140. default:
  1141. throw SQL.BulkLoadInvalidVariantValue();
  1142. }
  1143. }
  1144. private object ConvertValue(object value, _SqlMetaData metadata, bool isNull, ref bool isSqlType, out bool coercedToDataFeed) {
  1145. coercedToDataFeed = false;
  1146. if (isNull) {
  1147. if(!metadata.isNullable) {
  1148. throw SQL.BulkLoadBulkLoadNotAllowDBNull(metadata.column);
  1149. }
  1150. return value;
  1151. }
  1152. MetaType type = metadata.metaType;
  1153. bool typeChanged = false;
  1154. try {
  1155. MetaType mt;
  1156. switch(type.NullableType) {
  1157. case TdsEnums.SQLNUMERICN:
  1158. case TdsEnums.SQLDECIMALN:
  1159. mt = MetaType.GetMetaTypeFromSqlDbType(type.SqlDbType, false);
  1160. value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
  1161. // Convert the source decimal precision and scale to the destination precision and scale.
  1162. // Fix Bug: 385971 - sql decimal data could get corrupted on insert if the scale of
  1163. // the source and destination weren't the same. The BCP protocol specifies the
  1164. // scale of the incoming data in the insert statement; we just tell the server we
  1165. // are inserting the same scale back. This then created a bug inside the BCP operation
  1166. // if the scales didn't match. The fix is to do the same thing that SqlParameter does,
  1167. // and adjust the scale before writing. In Orcas this scale adjustment should be removed from
  1168. // SqlParameter and SqlBulkCopy and isolated inside SqlParameter.CoerceValue, but because of
  1169. // where we are in the cycle, the changes must be kept to a minimum, so I'm just bringing the
  1170. // code over to SqlBulkCopy.
  1171. SqlDecimal sqlValue;
  1172. if ((isSqlType) && (!typeChanged)) {
  1173. sqlValue = (SqlDecimal)value;
  1174. }
  1175. else {
  1176. sqlValue = new SqlDecimal((Decimal)value);
  1177. }
  1178. if (sqlValue.Scale != metadata.scale) {
  1179. sqlValue = TdsParser.AdjustSqlDecimalScale(sqlValue, metadata.scale);
  1180. }
  1181. // Perf: It is more effecient to write a SqlDecimal than a decimal since we need to break it into its 'bits' when writing
  1182. value = sqlValue;
  1183. isSqlType = true;
  1184. typeChanged = false; // Setting this to false as SqlParameter.CoerceValue will only set it to true when coverting to a CLR type
  1185. if (sqlValue.Precision > metadata.precision) {
  1186. throw SQL.BulkLoadCannotConvertValue(value.GetType(), mt, ADP.ParameterValueOutOfRange(sqlValue));
  1187. }
  1188. break;
  1189. case TdsEnums.SQLINTN:
  1190. case TdsEnums.SQLFLTN:
  1191. case TdsEnums.SQLFLT4:
  1192. case TdsEnums.SQLFLT8:
  1193. case TdsEnums.SQLMONEYN:
  1194. case TdsEnums.SQLDATETIM4:
  1195. case TdsEnums.SQLDATETIME:
  1196. case TdsEnums.SQLDATETIMN:
  1197. case TdsEnums.SQLBIT:
  1198. case TdsEnums.SQLBITN:
  1199. case TdsEnums.SQLUNIQUEID:
  1200. case TdsEnums.SQLBIGBINARY:
  1201. case TdsEnums.SQLBIGVARBINARY:
  1202. case TdsEnums.SQLIMAGE:
  1203. case TdsEnums.SQLBIGCHAR:
  1204. case TdsEnums.SQLBIGVARCHAR:
  1205. case TdsEnums.SQLTEXT:
  1206. case TdsEnums.SQLDATE:
  1207. case TdsEnums.SQLTIME:
  1208. case TdsEnums.SQLDATETIME2:
  1209. case TdsEnums.SQLDATETIMEOFFSET:
  1210. mt = MetaType.GetMetaTypeFromSqlDbType (type.SqlDbType, false);
  1211. value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
  1212. break;
  1213. case TdsEnums.SQLNCHAR:
  1214. case TdsEnums.SQLNVARCHAR:
  1215. case TdsEnums.SQLNTEXT:
  1216. mt = MetaType.GetMetaTypeFromSqlDbType(type.SqlDbType, false);
  1217. value = SqlParameter.CoerceValue(value, mt, out coercedToDataFeed, out typeChanged, false);
  1218. if (!coercedToDataFeed) { // We do not need to test for TextDataFeed as it is only assigned to (N)VARCHAR(MAX)
  1219. int len = ((isSqlType) && (!typeChanged)) ? ((SqlString)value).Value.Length : ((string)value).Length;
  1220. if (len > metadata.length / 2) {
  1221. throw SQL.BulkLoadStringTooLong();
  1222. }
  1223. }
  1224. break;
  1225. case TdsEnums.SQLVARIANT:
  1226. value = ValidateBulkCopyVariant(value);
  1227. typeChanged = true;
  1228. break;
  1229. case TdsEnums.SQLUDT:
  1230. // UDTs are sent as varbinary so we need to get the raw bytes
  1231. // unlike other types the parser does not like SQLUDT in form of SqlType
  1232. // so we cast to a CLR type.
  1233. // Hack for type system version knob - only call GetBytes if the value is not already
  1234. // in byte[] form.
  1235. if (!(value is byte[])) {
  1236. value = _connection.GetBytes(value);
  1237. typeChanged = true;
  1238. }
  1239. break;
  1240. case TdsEnums.SQLXMLTYPE:
  1241. // Could be either string, SqlCachedBuffer, XmlReader or XmlDataFeed
  1242. Debug.Assert((value is XmlReader) || (value is SqlCachedBuffer) || (value is string) || (value is SqlString) || (value is XmlDataFeed), "Invalid value type of Xml datatype");
  1243. if(value is XmlReader) {
  1244. value = new XmlDataFeed((XmlReader)value);
  1245. typeChanged = true;
  1246. coercedToDataFeed = true;
  1247. }
  1248. break;
  1249. default:
  1250. Debug.Assert(false, "Unknown TdsType!" + type.NullableType.ToString("x2", (IFormatProvider)null));
  1251. throw SQL.BulkLoadCannotConvertValue(value.GetType(), metadata.metaType, null);
  1252. }
  1253. if (typeChanged) {
  1254. // All type changes change to CLR types
  1255. isSqlType = false;
  1256. }
  1257. return value;
  1258. }
  1259. catch(Exception e) {
  1260. if(!ADP.IsCatchableExceptionType(e)) {
  1261. throw;
  1262. }
  1263. throw SQL.BulkLoadCannotConvertValue(value.GetType(), metadata.metaType, e);
  1264. }
  1265. }
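        // Illustrative caller-side sketch (not part of this file): typical use of the synchronous
        // WriteToServer overloads; the connection string, table and reader names below are assumptions.
        //
        //   using (SqlConnection destination = new SqlConnection(connectionString)) {
        //       destination.Open();
        //       using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destination)) {
        //           bulkCopy.DestinationTableName = "dbo.TargetTable";   // hypothetical table
        //           bulkCopy.BatchSize = 1000;
        //           bulkCopy.WriteToServer(sourceReader);                // any open IDataReader
        //       }
        //   }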
        public void WriteToServer(IDataReader reader) {
            SqlConnection.ExecutePermission.Demand();
            if (reader == null) {
                throw new ArgumentNullException("reader");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                _rowSource = reader;
                _SqlDataReaderRowSource = _rowSource as SqlDataReader;
                if (_SqlDataReaderRowSource != null) {
                    _rowSourceIsSqlDataReaderSmi = _SqlDataReaderRowSource is SqlDataReaderSmi;
                }
                _DbDataReaderRowSource = _rowSource as DbDataReader;
                _dataTableSource = null;
                _rowSourceType = ValueSourceType.IDataReader;
                _isAsyncBulkCopy = false;
                WriteRowSourceToServerAsync(reader.FieldCount, CancellationToken.None); // It returns null since _isAsyncBulkCopy = false
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
        }
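        // Copies every non-deleted row of the supplied DataTable (a rowState of 0 means "all row states").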
        public void WriteToServer(DataTable table) {
            WriteToServer(table, 0);
        }
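        // Copies only the rows whose RowState matches rowState; deleted rows are always skipped,
        // and a rowState of 0 copies all non-deleted rows.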
        public void WriteToServer(DataTable table, DataRowState rowState) {
            SqlConnection.ExecutePermission.Demand();
            if (table == null) {
                throw new ArgumentNullException("table");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
                _rowSource = table;
                _dataTableSource = table;
                _SqlDataReaderRowSource = null;
                _rowSourceType = ValueSourceType.DataTable;
                _rowEnumerator = table.Rows.GetEnumerator();
                _isAsyncBulkCopy = false;
                WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); // It returns null since _isAsyncBulkCopy = false
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
        }
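        // Copies the supplied DataRow array; deleted rows are skipped and an empty array is a no-op.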
        public void WriteToServer(DataRow[] rows) {
            SqlConnection.ExecutePermission.Demand();
            SqlStatistics statistics = Statistics;
            if (rows == null) {
                throw new ArgumentNullException("rows");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            if (rows.Length == 0) {
                return; // nothing to do. user passed us an empty array
            }
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                DataTable table = rows[0].Table;
                Debug.Assert(null != table, "How can we have rows without a table?");
                _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
                _rowSource = rows;
                _dataTableSource = table;
                _SqlDataReaderRowSource = null;
                _rowSourceType = ValueSourceType.RowArray;
                _rowEnumerator = rows.GetEnumerator();
                _isAsyncBulkCopy = false;
                WriteRowSourceToServerAsync(table.Columns.Count, CancellationToken.None); // It returns null since _isAsyncBulkCopy = false
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
        }
        /*Async overloads start here*/
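        // Illustrative caller-side sketch (not part of this file): awaiting one of the async
        // overloads; the table name and token below are assumptions.
        //
        //   using (SqlBulkCopy bulkCopy = new SqlBulkCopy(destination)) {
        //       bulkCopy.DestinationTableName = "dbo.TargetTable";                // hypothetical table
        //       await bulkCopy.WriteToServerAsync(sourceTable, cancellationToken);
        //   }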
        public Task WriteToServerAsync(DataRow[] rows) {
            return WriteToServerAsync(rows, CancellationToken.None);
        }
        public Task WriteToServerAsync(DataRow[] rows, CancellationToken cancellationToken) {
            Task resultTask = null;
            SqlConnection.ExecutePermission.Demand();
            if (rows == null) {
                throw new ArgumentNullException("rows");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                if (rows.Length == 0) {
                    TaskCompletionSource<object> source = new TaskCompletionSource<object>();
                    if (cancellationToken.IsCancellationRequested) {
                        source.SetCanceled();
                    }
                    else {
                        source.SetResult(null);
                    }
                    resultTask = source.Task;
                    return resultTask; // nothing to do. user passed us an empty array. Return a completed Task.
                }
                DataTable table = rows[0].Table;
                Debug.Assert(null != table, "How can we have rows without a table?");
                _rowStateToSkip = DataRowState.Deleted; // Don't allow deleted rows
                _rowSource = rows;
                _dataTableSource = table;
                _SqlDataReaderRowSource = null;
                _rowSourceType = ValueSourceType.RowArray;
                _rowEnumerator = rows.GetEnumerator();
                _isAsyncBulkCopy = true;
                resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); // It returns a Task since _isAsyncBulkCopy = true
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
            return resultTask;
        }
        public Task WriteToServerAsync(IDataReader reader) {
            return WriteToServerAsync(reader, CancellationToken.None);
        }
        public Task WriteToServerAsync(IDataReader reader, CancellationToken cancellationToken) {
            Task resultTask = null;
            SqlConnection.ExecutePermission.Demand();
            if (reader == null) {
                throw new ArgumentNullException("reader");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                _rowSource = reader;
                _SqlDataReaderRowSource = _rowSource as SqlDataReader;
                _DbDataReaderRowSource = _rowSource as DbDataReader;
                _dataTableSource = null;
                _rowSourceType = ValueSourceType.IDataReader;
                _isAsyncBulkCopy = true;
                resultTask = WriteRowSourceToServerAsync(reader.FieldCount, cancellationToken); // It returns a Task since _isAsyncBulkCopy = true
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
            return resultTask;
        }
        public Task WriteToServerAsync(DataTable table) {
            return WriteToServerAsync(table, 0, CancellationToken.None);
        }
        public Task WriteToServerAsync(DataTable table, CancellationToken cancellationToken) {
            return WriteToServerAsync(table, 0, cancellationToken);
        }
        public Task WriteToServerAsync(DataTable table, DataRowState rowState) {
            return WriteToServerAsync(table, rowState, CancellationToken.None);
        }
        public Task WriteToServerAsync(DataTable table, DataRowState rowState, CancellationToken cancellationToken) {
            Task resultTask = null;
            SqlConnection.ExecutePermission.Demand();
            if (table == null) {
                throw new ArgumentNullException("table");
            }
            if (_isBulkCopyingInProgress) {
                throw SQL.BulkLoadPendingOperation();
            }
            SqlStatistics statistics = Statistics;
            try {
                statistics = SqlStatistics.StartTimer(Statistics);
                _rowStateToSkip = ((rowState == 0) || (rowState == DataRowState.Deleted)) ? DataRowState.Deleted : ~rowState | DataRowState.Deleted;
                _rowSource = table;
                _SqlDataReaderRowSource = null;
                _dataTableSource = table;
                _rowSourceType = ValueSourceType.DataTable;
                _rowEnumerator = table.Rows.GetEnumerator();
                _isAsyncBulkCopy = true;
                resultTask = WriteRowSourceToServerAsync(table.Columns.Count, cancellationToken); // It returns a Task since _isAsyncBulkCopy = true
            }
            finally {
                SqlStatistics.StopTimer(statistics);
            }
            return resultTask;
        }
        // Writes the row source to the server (common entry point for all WriteToServer* overloads);
        // waits for any pending reconnection, then dispatches to the sync or async pipeline.
        //
        private Task WriteRowSourceToServerAsync(int columnCount, CancellationToken ctoken) {
            Task reconnectTask = _connection._currentReconnectionTask;
            if (reconnectTask != null && !reconnectTask.IsCompleted) {
                if (this._isAsyncBulkCopy) {
                    TaskCompletionSource<object> tcs = new TaskCompletionSource<object>();
                    reconnectTask.ContinueWith((t) => {
                        Task writeTask = WriteRowSourceToServerAsync(columnCount, ctoken);
                        if (writeTask == null) {
                            tcs.SetResult(null);
                        }
                        else {
                            AsyncHelper.ContinueTask(writeTask, tcs, () => tcs.SetResult(null));
                        }
                    }, ctoken); // we do not need to propagate exceptions etc. from the reconnect task, we just need to wait for it to finish
                    return tcs.Task;
                }
                else {
                    AsyncHelper.WaitForCompletion(reconnectTask, BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); }, rethrowExceptions: false);
                }
            }
            bool finishedSynchronously = true;
            _isBulkCopyingInProgress = true;
            CreateOrValidateConnection(SQL.WriteToServer);
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            Debug.Assert(_parserLock == null, "Previous parser lock not cleaned");
            _parserLock = internalConnection._parserLock;
            _parserLock.Wait(canReleaseFromAnyThread: _isAsyncBulkCopy);
            TdsParser bestEffortCleanupTarget = null;
            RuntimeHelpers.PrepareConstrainedRegions();
            try {
#if DEBUG
                TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
                RuntimeHelpers.PrepareConstrainedRegions();
                try {
                    tdsReliabilitySection.Start();
#else
                {
#endif //DEBUG
                    bestEffortCleanupTarget = SqlInternalConnection.GetBestEffortCleanupTarget(_connection);
                    WriteRowSourceToServerCommon(columnCount); // this is common to both sync and async
                    Task resultTask = WriteToServerInternalAsync(ctoken); // resultTask is null for sync, but a Task for async
                    if (resultTask != null) {
                        finishedSynchronously = false;
                        return resultTask.ContinueWith((t) => {
                            AbortTransaction(); // if there is one; on success transactions will be committed
                            _isBulkCopyingInProgress = false;
                            if (_parser != null) {
                                _parser._asyncWrite = false;
                            }
                            if (_parserLock != null) {
                                _parserLock.Release();
                                _parserLock = null;
                            }
                            return t;
                        }, TaskScheduler.Default).Unwrap();
                    }
                    return null;
                }
#if DEBUG
                finally {
                    tdsReliabilitySection.Stop();
                }
#endif //DEBUG
            }
            catch (System.OutOfMemoryException e) {
                _connection.Abort(e);
                throw;
            }
            catch (System.StackOverflowException e) {
                _connection.Abort(e);
                throw;
            }
            catch (System.Threading.ThreadAbortException e) {
                _connection.Abort(e);
                SqlInternalConnection.BestEffortCleanup(bestEffortCleanupTarget);
                throw;
            }
            finally {
                _columnMappings.ReadOnly = false;
                if (finishedSynchronously) {
                    AbortTransaction(); // if there is one; on success transactions will be committed
                    _isBulkCopyingInProgress = false;
                    if (_parser != null) {
                        _parser._asyncWrite = false;
                    }
                    if (_parserLock != null) {
                        _parserLock.Release();
                        _parserLock = null;
                    }
                }
            }
        }
        // Handles the column mappings.
        //
        private void WriteRowSourceToServerCommon(int columnCount) {
            bool unspecifiedColumnOrdinals = false;
            _columnMappings.ReadOnly = true;
            _localColumnMappings = _columnMappings;
            if (_localColumnMappings.Count > 0) {
                _localColumnMappings.ValidateCollection();
                foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
                    if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
                        unspecifiedColumnOrdinals = true;
                        break;
                    }
                }
            }
            else {
                _localColumnMappings = new SqlBulkCopyColumnMappingCollection();
                _localColumnMappings.CreateDefaultMapping(columnCount);
            }
            // perf: if the user specified all column ordinals we do not need to get a schema table
            //
            if (unspecifiedColumnOrdinals) {
                int index = -1;
                unspecifiedColumnOrdinals = false;
                // Match up sourceColumn names with sourceColumn ordinals
                //
                if (_localColumnMappings.Count > 0) {
                    foreach (SqlBulkCopyColumnMapping bulkCopyColumn in _localColumnMappings) {
                        if (bulkCopyColumn._internalSourceColumnOrdinal == -1) {
                            string unquotedColumnName = UnquotedName(bulkCopyColumn.SourceColumn);
                            switch (this._rowSourceType) {
                                case ValueSourceType.DataTable:
                                    index = ((DataTable)_rowSource).Columns.IndexOf(unquotedColumnName);
                                    break;
                                case ValueSourceType.RowArray:
                                    index = ((DataRow[])_rowSource)[0].Table.Columns.IndexOf(unquotedColumnName);
                                    break;
                                case ValueSourceType.IDataReader:
                                    try {
                                        index = ((IDataRecord)this._rowSource).GetOrdinal(unquotedColumnName);
                                    }
                                    catch (IndexOutOfRangeException e) {
                                        throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName, e));
                                    }
                                    break;
                            }
                            if (index == -1) {
                                throw (SQL.BulkLoadNonMatchingColumnName(unquotedColumnName));
                            }
                            bulkCopyColumn._internalSourceColumnOrdinal = index;
                        }
                    }
                }
            }
        }
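        // Called when the owning connection is closed; forwards the notification to the active
        // TDS session object, if any.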
        internal void OnConnectionClosed() {
            TdsParserStateObject stateObj = _stateObj;
            if (stateObj != null) {
                stateObj.OnConnectionClosed();
            }
        }
        private void OnRowsCopied(SqlRowsCopiedEventArgs value) {
            SqlRowsCopiedEventHandler handler = _rowsCopiedEventHandler;
            if (handler != null) {
                handler(this, value);
            }
        }
        // fxcop:
        // Use the .Net Event System whenever appropriate.
        private bool FireRowsCopiedEvent(long rowsCopied) {
            // release lock to prevent possible deadlocks
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            bool semaphoreLock = internalConnection._parserLock.CanBeReleasedFromAnyThread;
            internalConnection._parserLock.Release();
            SqlRowsCopiedEventArgs eventArgs = new SqlRowsCopiedEventArgs(rowsCopied);
            try {
                _insideRowsCopiedEvent = true;
                this.OnRowsCopied(eventArgs);
            }
            finally {
                _insideRowsCopiedEvent = false;
                internalConnection._parserLock.Wait(canReleaseFromAnyThread: semaphoreLock);
            }
            return eventArgs.Abort;
        }
        // Reads a cell and then writes it.
        // Read may block at this moment since there is no GetValueAsync or downstream async support yet.
        // When _isAsyncBulkCopy == true: Write will return a Task (when the async method runs asynchronously) or null (when the async call actually ran synchronously) for performance.
        // When _isAsyncBulkCopy == false: Writes are purely sync. This method returns null at the end.
        //
        private Task ReadWriteColumnValueAsync(int col) {
            bool isSqlType;
            bool isDataFeed;
            bool isNull;
            Object value = GetValueFromSourceRow(col, out isSqlType, out isDataFeed, out isNull); // this will return Task/null in future: as rTask
            _SqlMetaData metadata = _sortedColumnMappings[col]._metadata;
            if (!isDataFeed) {
                value = ConvertValue(value, metadata, isNull, ref isSqlType, out isDataFeed);
            }
            // write part
            Task writeTask = null;
            if (metadata.type != SqlDbType.Variant) {
                // this is the most common path
                writeTask = _parser.WriteBulkCopyValue(value, metadata, _stateObj, isSqlType, isDataFeed, isNull); // returns Task/null
            }
            else {
                SqlBuffer.StorageType variantInternalType = SqlBuffer.StorageType.Empty;
                if ((_SqlDataReaderRowSource != null) && (_connection.IsKatmaiOrNewer)) {
                    variantInternalType = _SqlDataReaderRowSource.GetVariantInternalStorageType(_sortedColumnMappings[col]._sourceColumnOrdinal);
                }
                if (variantInternalType == SqlBuffer.StorageType.DateTime2) {
                    _parser.WriteSqlVariantDateTime2(((DateTime)value), _stateObj);
                }
                else if (variantInternalType == SqlBuffer.StorageType.Date) {
                    _parser.WriteSqlVariantDate(((DateTime)value), _stateObj);
                }
                else {
                    writeTask = _parser.WriteSqlVariantDataRowValue(value, _stateObj); // returns Task/null
                }
            }
            return writeTask;
        }
        private void RegisterForConnectionCloseNotification<T>(ref Task<T> outterTask) {
            SqlConnection connection = _connection;
            if (connection == null) {
                // No connection
                throw ADP.ClosedConnectionError();
            }
            connection.RegisterForConnectionCloseNotification<T>(ref outterTask, this, SqlReferenceCollection.BulkCopyTag);
        }
        // Runs a loop to copy all columns of a single row.
        // Maintains state by remembering the number of columns copied so far (int col).
        // The returned Task can be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyColumnsAsync(int col, TaskCompletionSource<object> source = null) {
            Task resultTask = null, task = null;
            int i;
            try {
                for (i = col; i < _sortedColumnMappings.Count; i++) {
                    task = ReadWriteColumnValueAsync(i); // First reads and then writes one cell value. Task 'task' is completed when both the reading and the writing are complete.
                    if (task != null) break; // task != null means we have a pending read/write Task.
                }
                if (task != null) {
                    if (source == null) {
                        source = new TaskCompletionSource<object>();
                        resultTask = source.Task;
                    }
                    CopyColumnsAsyncSetupContinuation(source, task, i);
                    return resultTask; // the associated task will be completed when all columns (i.e. the entire row) are written
                }
                if (source != null) {
                    source.SetResult(null);
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
            return resultTask;
        }
        // This is in its own method to avoid always allocating the lambda in CopyColumnsAsync
        private void CopyColumnsAsyncSetupContinuation(TaskCompletionSource<object> source, Task task, int i) {
            AsyncHelper.ContinueTask(task, source, () => {
                if (i + 1 < _sortedColumnMappings.Count) {
                    CopyColumnsAsync(i + 1, source); // continue from the next column
                }
                else {
                    source.SetResult(null);
                }
            },
            _connection.GetOpenTdsConnection());
        }
        // The notification logic.
        //
        private void CheckAndRaiseNotification() {
            bool abortOperation = false; // whether the operation needs to be aborted
            Exception exception = null;
            _rowsCopied++;
            // Fire event logic
            if (_notifyAfter > 0) {                          // no action if no value specified (0 = no notification)
                if (_rowsUntilNotification > 0) {            // > 0?
                    if (--_rowsUntilNotification == 0) {     // decrement counter
                        // Fire event during operation. This is the user's chance to abort the operation.
                        try {
                            // it's also the user's chance to cause an exception ...
                            _stateObj.BcpLock = true;
                            abortOperation = FireRowsCopiedEvent(_rowsCopied);
                            Bid.Trace("<sc.SqlBulkCopy.WriteToServerInternal|INFO> \n");
                            // just in case some pathological person closes the target connection ...
                            if (ConnectionState.Open != _connection.State) {
                                exception = ADP.OpenConnectionRequired("CheckAndRaiseNotification", _connection.State);
                            }
                        }
                        catch (Exception e) {
                            if (!ADP.IsCatchableExceptionType(e)) {
                                exception = e;
                            }
                            else {
                                exception = OperationAbortedException.Aborted(e);
                            }
                        }
                        finally {
                            _stateObj.BcpLock = false;
                        }
                        if (!abortOperation) {
                            _rowsUntilNotification = _notifyAfter;
                        }
                    }
                }
            }
            if (!abortOperation && _rowsUntilNotification > _notifyAfter) { // if NotifyAfter was decreased mid-operation,
                _rowsUntilNotification = _notifyAfter;                      // update the countdown; otherwise leave it alone
            }
            if (exception == null && abortOperation) {
                exception = OperationAbortedException.Aborted(null);
            }
            if (_connection.State != ConnectionState.Open) {
                throw ADP.OpenConnectionRequired(SQL.WriteToServer, _connection.State);
            }
            if (exception != null) {
                _parser._asyncWrite = false;
                Task writeTask = _parser.WriteBulkCopyDone(_stateObj); // We should complete the current batch up to this row.
                Debug.Assert(writeTask == null, "Task should not pend while doing sync bulk copy");
                RunParser();
                AbortTransaction();
                throw exception; // this will be caught and put inside the Task's exception.
            }
        }
        // Checks for cancellation. If cancellation is requested, cancels the task and returns the cancelled task.
        Task CheckForCancellation(CancellationToken cts, TaskCompletionSource<object> tcs) {
            if (cts.IsCancellationRequested) {
                if (tcs == null) {
                    tcs = new TaskCompletionSource<object>();
                }
                tcs.SetCanceled();
                return tcs.Task;
            }
            else {
                return null;
            }
        }
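        // Runs 'action' immediately when 'task' is null (the synchronous path) and returns its result;
        // otherwise chains 'action' as a continuation of 'task' and returns null.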
        private TaskCompletionSource<object> ContinueTaskPend(Task task, TaskCompletionSource<object> source, Func<TaskCompletionSource<object>> action) {
            if (task == null) {
                return action();
            }
            else {
                Debug.Assert(source != null, "source should already be initialized if task is not null");
                AsyncHelper.ContinueTask(task, source, () => {
                    TaskCompletionSource<object> newSource = action();
                    Debug.Assert(newSource == null, "Shouldn't create a new source when one already exists");
                });
            }
            return null;
        }
        // Copies all the rows in a batch.
        // Maintains a state machine with the state variable rowsSoFar.
        // The returned Task can be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyRowsAsync(int rowsSoFar, int totalRows, CancellationToken cts, TaskCompletionSource<object> source = null) {
            Task resultTask = null;
            Task task = null;
            int i;
            try {
                // totalRows is the batch size, which is 0 by default. In that case, we keep copying till the end (until _hasMoreRowToCopy == false).
                for (i = rowsSoFar; (totalRows <= 0 || i < totalRows) && _hasMoreRowToCopy == true; i++) {
                    if (_isAsyncBulkCopy == true) {
                        resultTask = CheckForCancellation(cts, source);
                        if (resultTask != null) {
                            return resultTask; // task got cancelled!
                        }
                    }
                    _stateObj.WriteByte(TdsEnums.SQLROW);
                    task = CopyColumnsAsync(0); // copy 1 row
                    if (task == null) { // task is done.
                        CheckAndRaiseNotification(); // check notification logic after copying the row
                        // now we will read the next row.
                        Task readTask = ReadFromRowSourceAsync(cts); // read the next row. Caution: "more" is only valid if the task returns null. Otherwise, we wait for Task.Result.
                        if (readTask != null) {
                            if (source == null) {
                                source = new TaskCompletionSource<object>();
                            }
                            resultTask = source.Task;
                            AsyncHelper.ContinueTask(readTask, source, () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
                            return resultTask; // the associated task will be completed when all rows are copied to the server, or on exception/cancellation.
                        }
                    }
                    else { // task != null, so we add a continuation for it.
                        source = source ?? new TaskCompletionSource<object>();
                        resultTask = source.Task;
                        AsyncHelper.ContinueTask(task, source, onSuccess: () => {
                            CheckAndRaiseNotification(); // check for notification now as the current row copy is done at this moment.
                            Task readTask = ReadFromRowSourceAsync(cts);
                            if (readTask == null) {
                                CopyRowsAsync(i + 1, totalRows, cts, source);
                            }
                            else {
                                AsyncHelper.ContinueTask(readTask, source, onSuccess: () => CopyRowsAsync(i + 1, totalRows, cts, source), connectionToDoom: _connection.GetOpenTdsConnection());
                            }
                        }, connectionToDoom: _connection.GetOpenTdsConnection());
                        return resultTask;
                    }
                }
                if (source != null) {
                    source.TrySetResult(null); // this is set only on the last call of async copy. It may not be set if everything runs synchronously.
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
            return resultTask;
        }
        // Copies all the batches in a loop, one iteration per batch.
        // A state variable is essentially not needed (however, _hasMoreRowToCopy can be thought of as the state variable).
        // The returned Task can be null in two cases: (1) _isAsyncBulkCopy == false, or (2) _isAsyncBulkCopy == true but all async writes finished synchronously.
        //
        private Task CopyBatchesAsync(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source = null) {
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                while (_hasMoreRowToCopy) {
                    // pre -> before every batch: transaction, bulk command and metadata are done.
                    SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
                    if (IsCopyOption(SqlBulkCopyOptions.UseInternalTransaction)) { // an internal transaction is started prior to each batch if the option is set.
                        internalConnection.ThreadHasParserLockForClose = true;     // In case of error, tell the connection we already have the parser lock
                        try {
                            _internalTransaction = _connection.BeginTransaction();
                        }
                        finally {
                            internalConnection.ThreadHasParserLockForClose = false;
                        }
                    }
                    Task commandTask = SubmitUpdateBulkCommand(updateBulkCommandText);
                    if (commandTask == null) {
                        Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
                        if (continuedTask != null) {
                            // Continuation will take care of re-calling CopyBatchesAsync
                            return continuedTask;
                        }
                    }
                    else {
                        Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                        if (source == null) {
                            source = new TaskCompletionSource<object>();
                        }
                        AsyncHelper.ContinueTask(commandTask, source, () => {
                            Task continuedTask = CopyBatchesAsyncContinued(internalResults, updateBulkCommandText, cts, source);
                            if (continuedTask == null) {
                                // Continuation finished synchronously, recall into CopyBatchesAsync to continue
                                CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
                            }
                        }, _connection.GetOpenTdsConnection());
                        return source.Task;
                    }
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                    return source.Task;
                }
                else {
                    throw;
                }
            }
            // If we are here, then we finished everything
            if (source != null) {
                source.SetResult(null);
                return source.Task;
            }
            else {
                return null;
            }
        }
        // Writes the metadata and a single batch.
        // If this returns null, the batch completed synchronously and the caller is responsible for starting the next stage.
        private Task CopyBatchesAsyncContinued(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                WriteMetaData(internalResults);
                Task task = CopyRowsAsync(0, _savedBatchSize, cts); // this copies one batch of rows and sets _hasMoreRowToCopy = true/false.
                // post -> after every batch
                if (task != null) {
                    Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                    if (source == null) { // first time only
                        source = new TaskCompletionSource<object>();
                    }
                    AsyncHelper.ContinueTask(task, source, () => {
                        Task continuedTask = CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
                        if (continuedTask == null) {
                            // Continuation finished synchronously, recall into CopyBatchesAsync to continue
                            CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
                        }
                    }, _connection.GetOpenTdsConnection(), _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false), () => CopyBatchesAsyncContinuedOnError(cleanupParser: true));
                    return source.Task;
                }
                else {
                    return CopyBatchesAsyncContinuedOnSuccess(internalResults, updateBulkCommandText, cts, source);
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                    return source.Task;
                }
                else {
                    throw;
                }
            }
        }
        // Takes care of finishing a single batch (write done, run parser, commit transaction).
        // If this returns null, the batch finished synchronously and the caller is responsible for starting the next stage.
        private Task CopyBatchesAsyncContinuedOnSuccess(BulkCopySimpleResultSet internalResults, string updateBulkCommandText, CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(source == null || !source.Task.IsCompleted, "Called into CopyBatchesAsync with a completed task!");
            try {
                Task writeTask = _parser.WriteBulkCopyDone(_stateObj);
                if (writeTask == null) {
                    RunParser();
                    CommitTransaction();
                    return null;
                }
                else {
                    Debug.Assert(_isAsyncBulkCopy, "Task should not pend while doing sync bulk copy");
                    if (source == null) {
                        source = new TaskCompletionSource<object>();
                    }
                    AsyncHelper.ContinueTask(writeTask, source, () => {
                        try {
                            RunParser();
                            CommitTransaction();
                        }
                        catch (Exception) {
                            CopyBatchesAsyncContinuedOnError(cleanupParser: false);
                            throw;
                        }
                        // Always call back into CopyBatchesAsync
                        CopyBatchesAsync(internalResults, updateBulkCommandText, cts, source);
                    }, connectionToDoom: _connection.GetOpenTdsConnection(), onFailure: _ => CopyBatchesAsyncContinuedOnError(cleanupParser: false));
                    return source.Task;
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                    return source.Task;
                }
                else {
                    throw;
                }
            }
        }
        // Takes care of cleaning up the parser, stateObj and transaction when CopyBatchesAsync fails
        private void CopyBatchesAsyncContinuedOnError(bool cleanupParser) {
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            RuntimeHelpers.PrepareConstrainedRegions();
            try {
#if DEBUG
                TdsParser.ReliabilitySection tdsReliabilitySection = new TdsParser.ReliabilitySection();
                RuntimeHelpers.PrepareConstrainedRegions();
                try {
                    tdsReliabilitySection.Start();
#endif //DEBUG
                    if ((cleanupParser) && (_parser != null) && (_stateObj != null)) {
                        _parser._asyncWrite = false;
                        Task task = _parser.WriteBulkCopyDone(_stateObj);
                        Debug.Assert(task == null, "Write should not pend when error occurs");
                        RunParser();
                    }
                    if (_stateObj != null) {
                        CleanUpStateObjectOnError();
                    }
#if DEBUG
                }
                finally {
                    tdsReliabilitySection.Stop();
                }
#endif //DEBUG
            }
            catch (OutOfMemoryException) {
                internalConnection.DoomThisConnection();
                throw;
            }
            catch (StackOverflowException) {
                internalConnection.DoomThisConnection();
                throw;
            }
            catch (ThreadAbortException) {
                internalConnection.DoomThisConnection();
                throw;
            }
            AbortTransaction();
        }
        // Cleans up the state object. Used in a number of places, especially on exception paths.
        //
        private void CleanUpStateObjectOnError() {
            if (_stateObj != null) {
                _parser.Connection.ThreadHasParserLockForClose = true;
                try {
                    _stateObj.ResetBuffer();
                    _stateObj._outputPacketNumber = 1;
                    // If _parser is closed, sending attention will raise a debug assertion, so we avoid it by not calling CancelRequest.
                    if (_parser.State == TdsParserState.OpenNotLoggedIn || _parser.State == TdsParserState.OpenLoggedIn) {
                        _stateObj.CancelRequest();
                    }
                    _stateObj._internalTimeout = false;
                    _stateObj.CloseSession();
                    _stateObj._bulkCopyOpperationInProgress = false;
                    _stateObj._bulkCopyWriteTimeout = false;
                    _stateObj = null;
                }
                finally {
                    _parser.Connection.ThreadHasParserLockForClose = false;
                }
            }
        }
        // The continuation part of WriteToServerInternalRestAsync. Executes when the initial query task is completed (see WriteToServerInternalRestAsync).
        // It carries on the source which is passed from WriteToServerInternalRestAsync and performs SetResult when the entire copy is done.
        // The carried-on source may be null in the case of a sync copy, so there is no need to SetResult at that time.
        // It launches the copy operation.
        //
        private void WriteToServerInternalRestContinuedAsync(BulkCopySimpleResultSet internalResults, CancellationToken cts, TaskCompletionSource<object> source) {
            Task task = null;
            string updateBulkCommandText = null;
            try {
                updateBulkCommandText = AnalyzeTargetAndCreateUpdateBulkCommand(internalResults);
                if (_sortedColumnMappings.Count != 0) {
                    _stateObj.SniContext = SniContext.Snix_SendRows;
                    _savedBatchSize = _batchSize; // for safety. If someone changes the batch size during the copy we will still be using _savedBatchSize.
                    _rowsUntilNotification = _notifyAfter;
                    _rowsCopied = 0;
                    _currentRowMetadata = new SourceColumnMetadata[_sortedColumnMappings.Count];
                    for (int i = 0; i < _currentRowMetadata.Length; i++) {
                        _currentRowMetadata[i] = GetColumnMetadata(i);
                    }
                    task = CopyBatchesAsync(internalResults, updateBulkCommandText, cts); // launch the bulk copy
                }
                if (task != null) {
                    if (source == null) {
                        source = new TaskCompletionSource<object>();
                    }
                    AsyncHelper.ContinueTask(task, source, () => {
                        // The bulk copy task is completed at this moment.
                        // Todo: The cases may be combined for code reuse.
                        if (task.IsCanceled) {
                            _localColumnMappings = null;
                            try {
                                CleanUpStateObjectOnError();
                            }
                            finally {
                                source.SetCanceled();
                            }
                        }
                        else if (task.Exception != null) {
                            source.SetException(task.Exception.InnerException);
                        }
                        else {
                            _localColumnMappings = null;
                            try {
                                CleanUpStateObjectOnError();
                            }
                            finally {
                                if (source != null) {
                                    if (cts.IsCancellationRequested) { // We may get a cancellation request even after the entire copy.
                                        source.SetCanceled();
                                    }
                                    else {
                                        source.SetResult(null);
                                    }
                                }
                            }
                        }
                    }, _connection.GetOpenTdsConnection());
                    return;
                }
                else {
                    _localColumnMappings = null;
                    try {
                        CleanUpStateObjectOnError();
                    }
                    catch (Exception cleanupEx) {
                        Debug.Fail("Unexpected exception during CleanUpStateObjectOnError (ignored)", cleanupEx.ToString());
                    }
                    if (source != null) {
                        source.SetResult(null);
                    }
                }
            }
            catch (Exception ex) {
                _localColumnMappings = null;
                try {
                    CleanUpStateObjectOnError();
                }
                catch (Exception cleanupEx) {
                    Debug.Fail("Unexpected exception during CleanUpStateObjectOnError (ignored)", cleanupEx.ToString());
                }
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
        }
        // Rest of the WriteToServerInternalAsync method.
        // It carries on the source from its caller, WriteToServerInternalAsync.
        // source is null in the case of sync bcp, but valid in the case of async bcp.
        // It calls WriteToServerInternalRestContinuedAsync as a continuation of the initial query task.
        //
        private void WriteToServerInternalRestAsync(CancellationToken cts, TaskCompletionSource<object> source) {
            Debug.Assert(_hasMoreRowToCopy, "first time it is true, otherwise this method would not have been called.");
            _hasMoreRowToCopy = true;
            Task<BulkCopySimpleResultSet> internalResultsTask = null;
            BulkCopySimpleResultSet internalResults = new BulkCopySimpleResultSet();
            SqlInternalConnectionTds internalConnection = _connection.GetOpenTdsConnection();
            try {
                _parser = _connection.Parser;
                _parser._asyncWrite = _isAsyncBulkCopy; // very important!
                Task reconnectTask;
                try {
                    reconnectTask = _connection.ValidateAndReconnect(
                        () => {
                            if (_parserLock != null) {
                                _parserLock.Release();
                                _parserLock = null;
                            }
                        }, BulkCopyTimeout);
                }
                catch (SqlException ex) {
                    throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
                }
                if (reconnectTask != null) {
                    if (_isAsyncBulkCopy) {
                        CancellationTokenRegistration regReconnectCancel = new CancellationTokenRegistration();
                        TaskCompletionSource<object> cancellableReconnectTS = new TaskCompletionSource<object>();
                        if (cts.CanBeCanceled) {
                            regReconnectCancel = cts.Register(() => cancellableReconnectTS.TrySetCanceled());
                        }
                        AsyncHelper.ContinueTask(reconnectTask, cancellableReconnectTS, () => { cancellableReconnectTS.SetResult(null); });
                        // no need to cancel the timer since SqlBulkCopy creates a specific task source for reconnection
                        AsyncHelper.SetTimeoutException(cancellableReconnectTS, BulkCopyTimeout,
                            () => { return SQL.BulkLoadInvalidDestinationTable(_destinationTableName, SQL.CR_ReconnectTimeout()); }, CancellationToken.None);
                        AsyncHelper.ContinueTask(cancellableReconnectTS.Task, source,
                            () => {
                                regReconnectCancel.Dispose();
                                if (_parserLock != null) {
                                    _parserLock.Release();
                                    _parserLock = null;
                                }
                                _parserLock = _connection.GetOpenTdsConnection()._parserLock;
                                _parserLock.Wait(canReleaseFromAnyThread: true);
                                WriteToServerInternalRestAsync(cts, source);
                            },
                            connectionToAbort: _connection,
                            onFailure: (e) => { regReconnectCancel.Dispose(); },
                            onCancellation: () => { regReconnectCancel.Dispose(); },
                            exceptionConverter: (ex) => SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex));
                        return;
                    }
                    else {
                        try {
                            AsyncHelper.WaitForCompletion(reconnectTask, this.BulkCopyTimeout, () => { throw SQL.CR_ReconnectTimeout(); });
                        }
                        catch (SqlException ex) {
                            throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex); // preserve behavior (throw InvalidOperationException on failure to connect)
                        }
                        _parserLock = _connection.GetOpenTdsConnection()._parserLock;
                        _parserLock.Wait(canReleaseFromAnyThread: false);
                        WriteToServerInternalRestAsync(cts, source);
                        return;
                    }
                }
                if (_isAsyncBulkCopy) {
                    _connection.AddWeakReference(this, SqlReferenceCollection.BulkCopyTag);
                }
                internalConnection.ThreadHasParserLockForClose = true; // In case of error, let the connection know that we already have the parser lock
                try {
                    _stateObj = _parser.GetSession(this);
                    _stateObj._bulkCopyOpperationInProgress = true;
                    _stateObj.StartSession(ObjectID);
                }
                finally {
                    internalConnection.ThreadHasParserLockForClose = false;
                }
                try {
                    internalResultsTask = CreateAndExecuteInitialQueryAsync(out internalResults); // Task/null
                }
                catch (SqlException ex) {
                    throw SQL.BulkLoadInvalidDestinationTable(_destinationTableName, ex);
                }
                if (internalResultsTask != null) {
                    AsyncHelper.ContinueTask(internalResultsTask, source, () => WriteToServerInternalRestContinuedAsync(internalResultsTask.Result, cts, source), _connection.GetOpenTdsConnection());
                }
                else {
                    Debug.Assert(internalResults != null, "Executing initial query finished synchronously, but there were no results");
                    WriteToServerInternalRestContinuedAsync(internalResults, cts, source); // internalResults is valid here.
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
        }
        // This returns a Task for async, null for sync.
        //
        private Task WriteToServerInternalAsync(CancellationToken ctoken) {
            TaskCompletionSource<object> source = null;
            Task<object> resultTask = null;
            if (_isAsyncBulkCopy) {
                source = new TaskCompletionSource<object>(); // creating the completion source/Task that we pass to the application
                resultTask = source.Task;
                RegisterForConnectionCloseNotification(ref resultTask);
            }
            if (_destinationTableName == null) {
                if (source != null) {
                    source.SetException(SQL.BulkLoadMissingDestinationTable()); // no table to copy to
                }
                else {
                    throw SQL.BulkLoadMissingDestinationTable();
                }
                return resultTask;
            }
            try {
                Task readTask = ReadFromRowSourceAsync(ctoken); // readTask == reading task. This is the first read call. "more" is valid only if readTask == null.
                if (readTask == null) { // finished reading synchronously.
                    if (!_hasMoreRowToCopy) { // no rows in the source to copy!
                        if (source != null) {
                            source.SetResult(null);
                        }
                        return resultTask;
                    }
                    else { // true, we have more rows.
                        WriteToServerInternalRestAsync(ctoken, source); // rest of the method, passing the same completion and returning the incomplete task (ret).
                        return resultTask;
                    }
                }
                else {
                    Debug.Assert(_isAsyncBulkCopy, "Read must not return a Task in the Sync mode");
                    AsyncHelper.ContinueTask(readTask, source, () => {
                        if (!_hasMoreRowToCopy) {
                            source.SetResult(null); // no rows to copy!
                        }
                        else {
                            WriteToServerInternalRestAsync(ctoken, source); // passing the same completion, which will be completed by the callee.
                        }
                    }, _connection.GetOpenTdsConnection());
                    return resultTask;
                }
            }
            catch (Exception ex) {
                if (source != null) {
                    source.TrySetException(ex);
                }
                else {
                    throw;
                }
            }
            return resultTask;
        }
    } //End of SqlBulkCopy class
} //End of namespace