SqlStream.cs 23 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621
  1. //------------------------------------------------------------------------------
  2. // <copyright file="SqlStream.cs" company="Microsoft">
  3. // Copyright (c) Microsoft Corporation. All rights reserved.
  4. // </copyright>
  5. // <owner current="true" primary="true">[....]</owner>
  6. // <owner current="true" primary="false">[....]</owner>
  7. //------------------------------------------------------------------------------
  8. namespace System.Data.SqlClient {
  9. using System;
  10. using System.Collections;
  11. using System.Collections.Generic;
  12. using System.ComponentModel;
  13. using System.Data;
  14. using System.Data.Common;
  15. using System.Diagnostics;
  16. using System.Globalization;
  17. using System.IO;
  18. using System.Runtime.InteropServices;
  19. using System.Text;
  20. using System.Xml;
  21. using System.Reflection;
  22. using System.Runtime.CompilerServices;
  // Forward-only, read-only Stream over the bytes of one column of a SqlDataReader.
  // It can optionally prepend a two-byte byte order mark (written 0xFF then 0xFE) when the
  // column does not hold binary XML, and can optionally walk across rows and result sets,
  // concatenating the column value of each row.
  sealed internal class SqlStream : Stream {
      private SqlDataReader _reader; // reader we will stream off
      private int _columnOrdinal;    // ordinal of the column being streamed
      private long _bytesCol;        // data offset handed to GetBytesInternal for the current value
      int _bom;                      // byte order mark bytes still to emit, low byte first; 0 when none remain

      // Bytes consumed from the reader while probing for the binary XML signature that have
      // not been handed to the caller yet. They are kept in fields (not locals) so that they
      // survive a Read call whose buffer is too small to take them.
      private byte[] _bufferedData;
      private int _bufferedCount;    // number of valid bytes in _bufferedData
      private int _bufferedOffset;   // index of the next byte of _bufferedData to return
      private bool _bomProbeDone;    // true once the first two bytes have been inspected

      private bool _processAllRows;
      private bool _advanceReader;
      private bool _readFirstRow = false;
      private bool _endOfColumn = false;

      // Streams column 0 and advances the reader itself (advanceReader = true).
      internal SqlStream(SqlDataReader reader, bool addByteOrderMark, bool processAllRows) :
          this(0, reader, addByteOrderMark, processAllRows, true) {
      }

      // columnOrdinal:    column to stream.
      // reader:           reader to pull the bytes from.
      // addByteOrderMark: when true, a byte order mark is emitted unless the data turns out to be binary XML.
      // processAllRows:   when false, streaming stops after the first row that was read.
      // advanceReader:    when true, this stream calls Read()/NextResult() on the reader and closes it when done.
      internal SqlStream(int columnOrdinal, SqlDataReader reader, bool addByteOrderMark , bool processAllRows, bool advanceReader) {
          _columnOrdinal = columnOrdinal;
          _reader = reader;
          _bom = addByteOrderMark ? 0xfeff : 0;
          _processAllRows = processAllRows;
          _advanceReader = advanceReader;
      }

      override public bool CanRead {
          get {
              return true;
          }
      }

      override public bool CanSeek {
          get {
              return false;
          }
      }

      override public bool CanWrite {
          get {
              return false;
          }
      }

      // Not supported: the stream is forward-only.
      override public long Length {
          get {
              throw ADP.NotSupported();
          }
      }

      // Not supported: the stream is forward-only.
      override public long Position {
          get {
              throw ADP.NotSupported();
          }
          set {
              throw ADP.NotSupported();
          }
      }

      // Closes the underlying reader only when this stream is the one advancing it;
      // in every case the reference is dropped so later Read calls report a closed stream.
      override protected void Dispose(bool disposing) {
          try {
              if (disposing && _advanceReader && _reader != null && !_reader.IsClosed) {
                  _reader.Close();
              }
              _reader = null;
          }
          finally {
              base.Dispose(disposing);
          }
      }

      override public void Flush() {
          throw ADP.NotSupported();
      }

      // Reads up to count bytes into buffer starting at offset and returns the number of bytes written.
      // Output order is: pending byte order mark bytes, then any bytes consumed by the
      // binary XML probe, then data read directly from the reader.
      // Throws if the stream was disposed, buffer is null, offset/count are negative,
      // or the requested range does not fit in buffer.
      override public int Read(byte[] buffer, int offset, int count) {
          if (null == _reader) {
              throw ADP.StreamClosed(ADP.Read);
          }
          if (null == buffer) {
              throw ADP.ArgumentNull(ADP.ParameterBuffer);
          }
          if ((offset < 0) || (count < 0)) {
              throw ADP.ArgumentOutOfRange(String.Empty, (offset < 0 ? ADP.ParameterOffset : ADP.ParameterCount));
          }
          if (buffer.Length - offset < count) {
              throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
          }
          if (count == 0) {
              // Nothing can be delivered, so do not touch the reader (in particular do not probe).
              return 0;
          }

          int intCount = 0;

          // Need to find out if we should add byte order mark or not.
          // We need to add this if we are getting ntext xml, not if we are getting binary xml
          // Binary Xml always begins with the bytes 0xDF and 0xFF
          // If we aren't getting these, then we are getting unicode xml
          if ((_bom > 0) && !_bomProbeDone) {
              // Read and buffer the first two bytes - exactly once for the lifetime of the stream
              byte[] probe = new byte[2];
              int cProbe = ReadBytes(probe, 0, 2);
              _bomProbeDone = true;

              // Check to see if we should add the byte order mark
              if ((cProbe < 2) || ((probe[0] == 0xDF) && (probe[1] == 0xFF))) {
                  _bom = 0;
              }
              if (cProbe > 0) {
                  _bufferedData = probe;
                  _bufferedCount = cProbe;
                  _bufferedOffset = 0;
              }
          }

          // Emit whatever is left of the byte order mark; _bom keeps the remainder for the next call.
          while ((_bom > 0) && (count > 0)) {
              buffer[offset++] = (byte)_bom;
              _bom >>= 8;
              count--;
              intCount++;
          }

          // Hand back the probe bytes. Anything that does not fit stays buffered for the next call.
          while ((_bufferedData != null) && (count > 0)) {
              buffer[offset++] = _bufferedData[_bufferedOffset++];
              count--;
              intCount++;
              if (_bufferedOffset >= _bufferedCount) {
                  _bufferedData = null;
              }
          }

          intCount += ReadBytes(buffer, offset, count);
          return intCount;
      }

      // Positions the reader on the next available row, moving to following result sets
      // when the current one is exhausted. Returns false when no row is left.
      private static bool AdvanceToNextRow(SqlDataReader reader) {
          Debug.Assert(reader != null && !reader.IsClosed);

          // this method skips empty result sets
          do {
              if (reader.Read()) {
                  return true;
              }
          } while (reader.NextResult());

          // no more rows
          return false;
      }

      // Pulls up to count bytes of column data from the reader into buffer at offset and
      // returns how many bytes were stored. When this stream advances the reader, the reader
      // is closed once the data is exhausted or a catchable exception occurs.
      private int ReadBytes(byte[] buffer, int offset, int count) {
          bool gotData = true;
          int intCount = 0;
          int cb = 0;

          if (_reader.IsClosed || _endOfColumn) {
              return 0;
          }
          try {
              while (count > 0) {
                  // if I haven't read any bytes, get the next row
                  if (_advanceReader && (0 == _bytesCol)) {
                      gotData = false;

                      if (_readFirstRow && !_processAllRows) {
                          // for XML column, stop processing after the first row
                          // no op here - reader is closed after the end of this loop
                      }
                      else if (AdvanceToNextRow(_reader)) {
                          _readFirstRow = true;

                          if (_reader.IsDBNull(_columnOrdinal)) {
                              // VSTFDEVDIV 479659: handle row with DBNULL as empty data
                              // for XML column, processing is stopped on the next loop since _readFirstRow is true
                              continue;
                          }

                          // the value is not null, read it
                          gotData = true;
                      }
                      // else AdvanceToNextRow has returned false - no more rows or result sets remained, stop processing
                  }

                  if (gotData) {
                      cb = (int) _reader.GetBytesInternal(_columnOrdinal, _bytesCol, buffer, offset, count);

                      if (cb < count) {
                          // short read: the current value is exhausted
                          _bytesCol = 0;
                          gotData = false;
                          if (!_advanceReader) {
                              _endOfColumn = true;
                          }
                      }
                      else {
                          Debug.Assert(cb == count);
                          _bytesCol += cb;
                      }

                      // we are guaranteed that cb is < Int32.Max since we always pass in count which is of type Int32 to
                      // our getbytes interface
                      count -= (int)cb;
                      offset += (int)cb;
                      intCount += (int)cb;
                  }
                  else {
                      break; // no more data available, we are done
                  }
              }
              if (!gotData && _advanceReader) {
                  _reader.Close(); // Need to close the reader if we are done reading
              }
          }
          catch (Exception e) {
              if (_advanceReader && ADP.IsCatchableExceptionType(e)) {
                  _reader.Close();
              }
              throw;
          }

          return intCount;
      }

      internal XmlReader ToXmlReader() {
          // Dev11 Bug #315513: Exception type breaking change from 4.0 RTM when calling GetChars on null xml
          // We need to wrap all exceptions inside a TargetInvocationException to simulate calling CreateSqlReader via MethodInfo.Invoke
          return SqlTypes.SqlXml.CreateSqlXmlReader(this, closeInput: true, throwTargetInvocationExceptions: true);
      }

      override public long Seek(long offset, SeekOrigin origin) {
          throw ADP.NotSupported();
      }

      override public void SetLength(long value) {
          throw ADP.NotSupported();
      }

      override public void Write(byte[] buffer, int offset, int count) {
          throw ADP.NotSupported();
      }
  }
  // XmlTextReader does not read all the bytes off the network buffers, so we have to cache it here in the random access
  // case. This causes double buffering and is a perf hit, but this is not the high perf way for accessing this type of data.
  // In the case of sequential access, we do not have to do any buffering since the XmlTextReader we return can become
  // invalid as soon as we move off the current column.
  //
  // Read-only, seekable Stream over a list of byte[] chunks. The logical position is tracked as
  // (chunk index, offset inside that chunk).
  sealed internal class SqlCachedStream : Stream {
      int _currentPosition;      // Position within the current array byte
      int _currentArrayIndex;    // Index into the _cachedBytes ArrayList
      List<byte[]> _cachedBytes; // chunks being read; null once the stream is disposed
      long _totalLength;         // lazily computed sum of all chunk lengths (0 = not computed yet)

      // Reads off from the network buffer and caches bytes. Only reads one column value in the current row.
      internal SqlCachedStream(SqlCachedBuffer sqlBuf ) {
          _cachedBytes = sqlBuf.CachedBytes;
      }

      override public bool CanRead {
          get {
              return true;
          }
      }

      override public bool CanSeek {
          get {
              return true;
          }
      }

      override public bool CanWrite {
          get {
              return false;
          }
      }

      override public long Length {
          get {
              return TotalLength;
          }
      }

      // Absolute position: the lengths of all chunks before the current one plus the offset
      // inside the current chunk. Setting it throws when the stream is disposed or the value
      // is outside [0, total length].
      override public long Position {
          get {
              long pos = 0;
              for (int ii = 0 ; ii < _currentArrayIndex ; ii++) {
                  pos += _cachedBytes[ii].Length;
              }
              pos += _currentPosition;
              return pos;
          }
          set {
              if (null == _cachedBytes) {
                  throw ADP.StreamClosed(ADP.ParameterSetPosition);
              }
              SetInternalPosition(value, ADP.ParameterSetPosition);
          }
      }

      override protected void Dispose(bool disposing) {
          try {
              // NOTE(review): _cachedBytes is the list obtained from SqlCachedBuffer.CachedBytes; if that
              // property returns the buffer's own list, Clear() also empties the buffer - confirm intended.
              if (disposing && _cachedBytes != null) {
                  _cachedBytes.Clear();
              }
              _cachedBytes = null;
              _currentPosition = 0;
              _currentArrayIndex = 0;
              _totalLength = 0;
          }
          finally {
              base.Dispose(disposing);
          }
      }

      override public void Flush() {
          throw ADP.NotSupported();
      }

      // Copies up to count bytes from the cached chunks into buffer at offset, crossing chunk
      // boundaries as needed. Returns the number of bytes copied; 0 means end of data.
      override public int Read(byte[] buffer, int offset, int count) {
          int cb;
          int intCount = 0;

          if (null == _cachedBytes) {
              throw ADP.StreamClosed(ADP.Read);
          }
          if (null == buffer) {
              throw ADP.ArgumentNull(ADP.ParameterBuffer);
          }
          if ((offset < 0) || (count < 0)) {
              throw ADP.ArgumentOutOfRange(String.Empty, (offset < 0 ? ADP.ParameterOffset : ADP.ParameterCount));
          }
          if (buffer.Length - offset < count) {
              throw ADP.ArgumentOutOfRange(ADP.ParameterCount);
          }
          if (_cachedBytes.Count <= _currentArrayIndex) {
              return 0; // Everything is read!
          }

          while (count > 0) {
              if (_cachedBytes[_currentArrayIndex].Length <= _currentPosition) {
                  _currentArrayIndex++; // We are done reading this chunk, go to next
                  // Reset the in-chunk offset even when there is no next chunk; otherwise Position
                  // would add the last chunk's length on top of the full total.
                  _currentPosition = 0;
                  if (_cachedBytes.Count <= _currentArrayIndex) {
                      break;
                  }
              }
              cb = _cachedBytes[_currentArrayIndex].Length - _currentPosition;
              if (cb > count) {
                  cb = count;
              }
              Array.Copy(_cachedBytes[_currentArrayIndex], _currentPosition, buffer, offset, cb);

              _currentPosition += cb;
              count -= cb;
              offset += cb;
              intCount += cb;
          }

          return intCount;
      }

      // Moves the position relative to origin and returns the new absolute position.
      // Throws when the stream is disposed, the origin is unknown, or the target is
      // outside [0, total length].
      override public long Seek(long offset, SeekOrigin origin) {
          long pos = 0;

          if (null == _cachedBytes) {
              throw ADP.StreamClosed(ADP.Read);
          }

          switch(origin) {
              case SeekOrigin.Begin:
                  pos = offset; // the new position must be returned for Begin as well
                  break;

              case SeekOrigin.Current:
                  pos = offset + Position;
                  break;

              case SeekOrigin.End:
                  pos = TotalLength + offset;
                  break;

              default:
                  throw ADP.InvalidSeekOrigin(ADP.ParameterOffset);
          }

          SetInternalPosition(pos, ADP.ParameterOffset);
          return pos;
      }

      override public void SetLength(long value) {
          throw ADP.NotSupported();
      }

      override public void Write(byte[] buffer, int offset, int count) {
          throw ADP.NotSupported();
      }

      // Translates an absolute position into (chunk index, offset in chunk).
      // A position that falls exactly on a chunk boundary is stored as the end of the earlier
      // chunk; Read moves on to the next chunk by itself.
      // Throws ArgumentOutOfRangeException(argumentName) for negative positions or positions past the end.
      private void SetInternalPosition(long lPos, string argumentName) {
          long pos = lPos;

          if (pos < 0) {
              throw new ArgumentOutOfRangeException(argumentName);
          }
          for (int ii = 0 ; ii < _cachedBytes.Count ; ii++) {
              if (pos > _cachedBytes[ii].Length) {
                  pos -= _cachedBytes[ii].Length;
              }
              else {
                  _currentArrayIndex = ii;
                  _currentPosition = (int)pos;
                  return;
              }
          }
          if (pos > 0) {
              throw new ArgumentOutOfRangeException(argumentName);
          }
      }

      // Sum of all chunk lengths, computed on first use and cached while non-zero.
      private long TotalLength {
          get {
              if ((_totalLength == 0) && (_cachedBytes != null)) {
                  long pos = 0;
                  for (int ii = 0 ; ii < _cachedBytes.Count ; ii++) {
                      pos += _cachedBytes[ii].Length;
                  }
                  _totalLength = pos;
              }
              return _totalLength;
          }
      }
  }
  // Sequential character access to an XML column. The column bytes are parsed with an
  // XmlReader and re-serialized node by node through an XmlWriter into a StringWriter, so
  // that only as much XML as the caller asked for has to be read.
  sealed internal class SqlStreamingXml {
      int _columnOrdinal;
      SqlDataReader _reader;
      XmlReader _xmlReader;     // created lazily by the first GetChars call
      XmlWriter _xmlWriter;     // writes into _strWriter
      StringWriter _strWriter;  // holds serialized characters not yet returned or skipped
      long _charsRemoved;       // number of characters already dropped from the front of _strWriter

      public SqlStreamingXml(int i, SqlDataReader reader) {
          _columnOrdinal = i;
          _reader = reader;
      }

      // Disposes the writer and the reader, if they were ever created, and drops all references.
      // Safe to call when GetChars was never called or failed before both objects existed.
      public void Close() {
          try {
              if (_xmlWriter != null) {
                  ((IDisposable)_xmlWriter).Dispose();
              }
          }
          finally {
              try {
                  if (_xmlReader != null) {
                      ((IDisposable)_xmlReader).Dispose();
                  }
              }
              finally {
                  _reader = null;
                  _xmlReader = null;
                  _xmlWriter = null;
                  _strWriter = null;
              }
          }
      }

      public int ColumnOrdinal {
          get {
              return _columnOrdinal;
          }
      }

      // Copies up to length characters of the serialized XML, starting at character dataIndex,
      // into buffer at bufferIndex and returns the number of characters copied.
      // dataIndex may only move forward: an index before the characters already consumed throws
      // ADP.NonSeqByteAccess; an index ahead of them causes the gap to be skipped.
      // Returns -1 when buffer is null, 0 when no characters are left.
      public long GetChars(long dataIndex, char[] buffer, int bufferIndex, int length) {
          if (_xmlReader == null) {
              SqlStream sqlStream = new SqlStream( _columnOrdinal, _reader, true /* addByteOrderMark */, false /* processAllRows*/, false /*advanceReader*/);
              _xmlReader = sqlStream.ToXmlReader();
              _strWriter = new StringWriter((System.IFormatProvider)null);
              XmlWriterSettings writerSettings = new XmlWriterSettings();
              writerSettings.CloseOutput = true; // close the memory stream when done
              writerSettings.ConformanceLevel = ConformanceLevel.Fragment;
              _xmlWriter = XmlWriter.Create(_strWriter, writerSettings);
          }

          // Kept as long: the distance between dataIndex and the consumed count is not limited to Int32.
          long charsToSkip = 0;
          if (dataIndex < _charsRemoved) {
              throw ADP.NonSeqByteAccess(dataIndex, _charsRemoved, ADP.GetChars);
          }
          else if (dataIndex > _charsRemoved) {
              charsToSkip = dataIndex - _charsRemoved;
          }

          // If buffer parameter is null, we have to return -1 since there is no way for us to know the
          // total size up front without reading and converting the XML.
          if (buffer == null) {
              return (long)(-1);
          }

          StringBuilder strBldr = _strWriter.GetStringBuilder();
          while (!_xmlReader.EOF) {
              // long arithmetic: length + charsToSkip must not wrap around
              if (strBldr.Length >= ((long)length + charsToSkip)) {
                  break;
              }
              // Can't call _xmlWriter.WriteNode here, since it reads all of the data in before returning the first char.
              // Do own implementation of WriteNode instead that reads just enough data to return the required number of chars
              WriteXmlElement();
              if (charsToSkip > 0) {
                  // Aggressively remove the characters we want to skip to avoid growing StringBuilder size too much
                  charsToSkip = DiscardSkippedChars(strBldr, charsToSkip);
              }
          }

          if (charsToSkip > 0) {
              charsToSkip = DiscardSkippedChars(strBldr, charsToSkip);
          }

          if (strBldr.Length == 0) {
              return 0;
          }
          // At this point charsToSkip must be 0
          Debug.Assert(charsToSkip == 0);

          int cnt = strBldr.Length < length ? strBldr.Length : length;
          for (int i = 0 ; i < cnt ; i++) {
              buffer[bufferIndex + i] = strBldr[i];
          }
          // Remove the characters we have already returned
          strBldr.Remove(0, cnt);
          _charsRemoved += (long)cnt;
          return (long)cnt;
      }

      // Drops up to charsToSkip characters from the front of strBldr, counts them as consumed,
      // and returns how many characters still remain to be skipped.
      private long DiscardSkippedChars(StringBuilder strBldr, long charsToSkip) {
          int cnt = (int)Math.Min((long)strBldr.Length, charsToSkip);
          strBldr.Remove(0, cnt);
          _charsRemoved += (long)cnt;
          return charsToSkip - cnt;
      }

      // This method duplicates the work of XmlWriter.WriteNode except that it reads one element at a time
      // instead of reading the entire node like XmlWriter.
      private void WriteXmlElement() {
          if (_xmlReader.EOF) {
              return;
          }

          bool canReadChunk = _xmlReader.CanReadValueChunk;

          // Constants
          const int WriteNodeBufferSize = 1024;

          _xmlReader.Read();
          switch (_xmlReader.NodeType) {
              case XmlNodeType.Element:
                  _xmlWriter.WriteStartElement(_xmlReader.Prefix, _xmlReader.LocalName, _xmlReader.NamespaceURI);
                  _xmlWriter.WriteAttributes(_xmlReader, true);
                  if (_xmlReader.IsEmptyElement) {
                      // no EndElement node will follow for an empty element, so close it here
                      _xmlWriter.WriteEndElement();
                  }
                  break;

              case XmlNodeType.Text:
                  if (canReadChunk) {
                      // stream the text value through a fixed-size buffer
                      char[] writeNodeBuffer = new char[WriteNodeBufferSize];
                      int read;
                      while ((read = _xmlReader.ReadValueChunk(writeNodeBuffer, 0, WriteNodeBufferSize)) > 0) {
                          _xmlWriter.WriteChars(writeNodeBuffer, 0, read);
                      }
                  }
                  else {
                      _xmlWriter.WriteString(_xmlReader.Value);
                  }
                  break;

              case XmlNodeType.Whitespace:
              case XmlNodeType.SignificantWhitespace:
                  _xmlWriter.WriteWhitespace(_xmlReader.Value);
                  break;

              case XmlNodeType.CDATA:
                  _xmlWriter.WriteCData(_xmlReader.Value);
                  break;

              case XmlNodeType.EntityReference:
                  _xmlWriter.WriteEntityRef(_xmlReader.Name);
                  break;

              case XmlNodeType.XmlDeclaration:
              case XmlNodeType.ProcessingInstruction:
                  _xmlWriter.WriteProcessingInstruction(_xmlReader.Name, _xmlReader.Value);
                  break;

              case XmlNodeType.DocumentType:
                  _xmlWriter.WriteDocType(_xmlReader.Name, _xmlReader.GetAttribute("PUBLIC"), _xmlReader.GetAttribute("SYSTEM"), _xmlReader.Value);
                  break;

              case XmlNodeType.Comment:
                  _xmlWriter.WriteComment(_xmlReader.Value);
                  break;

              case XmlNodeType.EndElement:
                  _xmlWriter.WriteFullEndElement();
                  break;
          }
          // push what was written into the StringWriter so GetChars can see it
          _xmlWriter.Flush();
      }
  }
  541. }