Connection.cs 31 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702703704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Diagnostics;
  7. using System.IO;
  8. using System.Net;
  9. using System.Runtime;
  10. using System.ServiceModel;
  11. using System.Threading;
  12. using System.ServiceModel.Diagnostics.Application;
  13. // Low level abstraction for a socket/pipe
  14. interface IConnection
  15. {
  16. byte[] AsyncReadBuffer { get; }
  17. int AsyncReadBufferSize { get; }
  18. TraceEventType ExceptionEventType { get; set; }
  19. IPEndPoint RemoteIPEndPoint { get; }
  20. void Abort();
  21. void Close(TimeSpan timeout, bool asyncAndLinger);
  22. void Shutdown(TimeSpan timeout);
  23. AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
  24. WaitCallback callback, object state);
  25. void EndWrite();
  26. void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout);
  27. void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager);
  28. int Read(byte[] buffer, int offset, int size, TimeSpan timeout);
  29. AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state);
  30. int EndRead();
  31. // very ugly listener stuff
  32. object DuplicateAndClose(int targetProcessId);
  33. object GetCoreTransport();
  34. IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state);
  35. bool EndValidate(IAsyncResult result);
  36. }
  37. // Low level abstraction for connecting a socket/pipe
  38. interface IConnectionInitiator
  39. {
  40. IConnection Connect(Uri uri, TimeSpan timeout);
  41. IAsyncResult BeginConnect(Uri uri, TimeSpan timeout, AsyncCallback callback, object state);
  42. IConnection EndConnect(IAsyncResult result);
  43. }
  44. // Low level abstraction for listening for sockets/pipes
  45. interface IConnectionListener : IDisposable
  46. {
  47. void Listen();
  48. IAsyncResult BeginAccept(AsyncCallback callback, object state);
  49. IConnection EndAccept(IAsyncResult result);
  50. }
  51. abstract class DelegatingConnection : IConnection
  52. {
  53. IConnection connection;
  54. protected DelegatingConnection(IConnection connection)
  55. {
  56. this.connection = connection;
  57. }
  58. public virtual byte[] AsyncReadBuffer
  59. {
  60. get { return connection.AsyncReadBuffer; }
  61. }
  62. public virtual int AsyncReadBufferSize
  63. {
  64. get { return connection.AsyncReadBufferSize; }
  65. }
  66. public TraceEventType ExceptionEventType
  67. {
  68. get { return connection.ExceptionEventType; }
  69. set { connection.ExceptionEventType = value; }
  70. }
  71. protected IConnection Connection
  72. {
  73. get { return connection; }
  74. }
  75. public IPEndPoint RemoteIPEndPoint
  76. {
  77. get { return connection.RemoteIPEndPoint; }
  78. }
  79. public virtual void Abort()
  80. {
  81. connection.Abort();
  82. }
  83. public virtual void Close(TimeSpan timeout, bool asyncAndLinger)
  84. {
  85. connection.Close(timeout, asyncAndLinger);
  86. }
  87. public virtual void Shutdown(TimeSpan timeout)
  88. {
  89. connection.Shutdown(timeout);
  90. }
  91. public virtual object DuplicateAndClose(int targetProcessId)
  92. {
  93. return connection.DuplicateAndClose(targetProcessId);
  94. }
  95. public virtual object GetCoreTransport()
  96. {
  97. return connection.GetCoreTransport();
  98. }
  99. public virtual IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
  100. {
  101. return connection.BeginValidate(uri, callback, state);
  102. }
  103. public virtual bool EndValidate(IAsyncResult result)
  104. {
  105. return connection.EndValidate(result);
  106. }
  107. public virtual AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
  108. WaitCallback callback, object state)
  109. {
  110. return connection.BeginWrite(buffer, offset, size, immediate, timeout, callback, state);
  111. }
  112. public virtual void EndWrite()
  113. {
  114. connection.EndWrite();
  115. }
  116. public virtual void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout)
  117. {
  118. connection.Write(buffer, offset, size, immediate, timeout);
  119. }
  120. public virtual void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager)
  121. {
  122. connection.Write(buffer, offset, size, immediate, timeout, bufferManager);
  123. }
  124. public virtual int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
  125. {
  126. return connection.Read(buffer, offset, size, timeout);
  127. }
  128. public virtual AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout,
  129. WaitCallback callback, object state)
  130. {
  131. return connection.BeginRead(offset, size, timeout, callback, state);
  132. }
  133. public virtual int EndRead()
  134. {
  135. return connection.EndRead();
  136. }
  137. }
  138. class PreReadConnection : DelegatingConnection
  139. {
  140. int asyncBytesRead;
  141. byte[] preReadData;
  142. int preReadOffset;
  143. int preReadCount;
  144. public PreReadConnection(IConnection innerConnection, byte[] initialData)
  145. : this(innerConnection, initialData, 0, initialData.Length)
  146. {
  147. }
  148. public PreReadConnection(IConnection innerConnection, byte[] initialData, int initialOffset, int initialSize)
  149. : base(innerConnection)
  150. {
  151. this.preReadData = initialData;
  152. this.preReadOffset = initialOffset;
  153. this.preReadCount = initialSize;
  154. }
  155. public void AddPreReadData(byte[] initialData, int initialOffset, int initialSize)
  156. {
  157. if (this.preReadCount > 0)
  158. {
  159. byte[] tempBuffer = this.preReadData;
  160. this.preReadData = DiagnosticUtility.Utility.AllocateByteArray(initialSize + this.preReadCount);
  161. Buffer.BlockCopy(tempBuffer, this.preReadOffset, this.preReadData, 0, this.preReadCount);
  162. Buffer.BlockCopy(initialData, initialOffset, this.preReadData, this.preReadCount, initialSize);
  163. this.preReadOffset = 0;
  164. this.preReadCount += initialSize;
  165. }
  166. else
  167. {
  168. this.preReadData = initialData;
  169. this.preReadOffset = initialOffset;
  170. this.preReadCount = initialSize;
  171. }
  172. }
  173. public override int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
  174. {
  175. ConnectionUtilities.ValidateBufferBounds(buffer, offset, size);
  176. if (this.preReadCount > 0)
  177. {
  178. int bytesToCopy = Math.Min(size, this.preReadCount);
  179. Buffer.BlockCopy(this.preReadData, this.preReadOffset, buffer, offset, bytesToCopy);
  180. this.preReadOffset += bytesToCopy;
  181. this.preReadCount -= bytesToCopy;
  182. return bytesToCopy;
  183. }
  184. return base.Read(buffer, offset, size, timeout);
  185. }
  186. public override AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state)
  187. {
  188. ConnectionUtilities.ValidateBufferBounds(AsyncReadBufferSize, offset, size);
  189. if (this.preReadCount > 0)
  190. {
  191. int bytesToCopy = Math.Min(size, this.preReadCount);
  192. Buffer.BlockCopy(this.preReadData, this.preReadOffset, AsyncReadBuffer, offset, bytesToCopy);
  193. this.preReadOffset += bytesToCopy;
  194. this.preReadCount -= bytesToCopy;
  195. this.asyncBytesRead = bytesToCopy;
  196. return AsyncCompletionResult.Completed;
  197. }
  198. return base.BeginRead(offset, size, timeout, callback, state);
  199. }
  200. public override int EndRead()
  201. {
  202. if (this.asyncBytesRead > 0)
  203. {
  204. int retValue = this.asyncBytesRead;
  205. this.asyncBytesRead = 0;
  206. return retValue;
  207. }
  208. return base.EndRead();
  209. }
  210. }
  211. class ConnectionStream : Stream
  212. {
  213. TimeSpan closeTimeout;
  214. int readTimeout;
  215. int writeTimeout;
  216. IConnection connection;
  217. bool immediate;
  218. public ConnectionStream(IConnection connection, IDefaultCommunicationTimeouts defaultTimeouts)
  219. {
  220. this.connection = connection;
  221. this.closeTimeout = defaultTimeouts.CloseTimeout;
  222. this.ReadTimeout = TimeoutHelper.ToMilliseconds(defaultTimeouts.ReceiveTimeout);
  223. this.WriteTimeout = TimeoutHelper.ToMilliseconds(defaultTimeouts.SendTimeout);
  224. immediate = true;
  225. }
  226. public IConnection Connection
  227. {
  228. get { return connection; }
  229. }
  230. public override bool CanRead
  231. {
  232. get { return true; }
  233. }
  234. public override bool CanSeek
  235. {
  236. get { return false; }
  237. }
  238. public override bool CanTimeout
  239. {
  240. get { return true; }
  241. }
  242. public override bool CanWrite
  243. {
  244. get { return true; }
  245. }
  246. public TimeSpan CloseTimeout
  247. {
  248. get { return closeTimeout; }
  249. set { this.closeTimeout = value; }
  250. }
  251. public override int ReadTimeout
  252. {
  253. get { return this.readTimeout; }
  254. set
  255. {
  256. if (value < -1)
  257. {
  258. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  259. SR.GetString(SR.ValueMustBeInRange, -1, int.MaxValue)));
  260. }
  261. this.readTimeout = value;
  262. }
  263. }
  264. public override int WriteTimeout
  265. {
  266. get { return this.writeTimeout; }
  267. set
  268. {
  269. if (value < -1)
  270. {
  271. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  272. SR.GetString(SR.ValueMustBeInRange, -1, int.MaxValue)));
  273. }
  274. this.writeTimeout = value;
  275. }
  276. }
  277. public bool Immediate
  278. {
  279. get { return immediate; }
  280. set { immediate = value; }
  281. }
  282. public override long Length
  283. {
  284. get
  285. {
  286. #pragma warning suppress 56503 // [....], required by the Stream.Length contract
  287. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
  288. }
  289. }
  290. public override long Position
  291. {
  292. get
  293. {
  294. #pragma warning suppress 56503 // [....], required by the Stream.Position contract
  295. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
  296. }
  297. set
  298. {
  299. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
  300. }
  301. }
  302. public TraceEventType ExceptionEventType
  303. {
  304. get { return connection.ExceptionEventType; }
  305. set { connection.ExceptionEventType = value; }
  306. }
  307. public void Abort()
  308. {
  309. connection.Abort();
  310. }
  311. public override void Close()
  312. {
  313. connection.Close(this.CloseTimeout, false);
  314. }
  315. public override void Flush()
  316. {
  317. // NOP
  318. }
  319. public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  320. {
  321. return new WriteAsyncResult(this.connection, buffer, offset, count, this.Immediate, TimeoutHelper.FromMilliseconds(this.WriteTimeout), callback, state);
  322. }
  323. public override void EndWrite(IAsyncResult asyncResult)
  324. {
  325. WriteAsyncResult.End(asyncResult);
  326. }
  327. public override void Write(byte[] buffer, int offset, int count)
  328. {
  329. connection.Write(buffer, offset, count, this.Immediate, TimeoutHelper.FromMilliseconds(this.WriteTimeout));
  330. }
  331. public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
  332. {
  333. return new ReadAsyncResult(connection, buffer, offset, count, TimeoutHelper.FromMilliseconds(this.ReadTimeout), callback, state);
  334. }
  335. public override int EndRead(IAsyncResult asyncResult)
  336. {
  337. return ReadAsyncResult.End(asyncResult);
  338. }
  339. public override int Read(byte[] buffer, int offset, int count)
  340. {
  341. return this.Read(buffer, offset, count, TimeoutHelper.FromMilliseconds(this.ReadTimeout));
  342. }
  343. protected int Read(byte[] buffer, int offset, int count, TimeSpan timeout)
  344. {
  345. return connection.Read(buffer, offset, count, timeout);
  346. }
  347. public override long Seek(long offset, SeekOrigin origin)
  348. {
  349. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
  350. }
  351. public override void SetLength(long value)
  352. {
  353. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotSupportedException(SR.GetString(SR.SeekNotSupported)));
  354. }
  355. public void Shutdown(TimeSpan timeout)
  356. {
  357. connection.Shutdown(timeout);
  358. }
  359. public IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
  360. {
  361. return this.connection.BeginValidate(uri, callback, state);
  362. }
  363. public bool EndValidate(IAsyncResult result)
  364. {
  365. return this.connection.EndValidate(result);
  366. }
  367. abstract class IOAsyncResult : AsyncResult
  368. {
  369. static WaitCallback onAsyncIOComplete;
  370. IConnection connection;
  371. protected IOAsyncResult(IConnection connection, AsyncCallback callback, object state)
  372. : base(callback, state)
  373. {
  374. this.connection = connection;
  375. }
  376. protected WaitCallback GetWaitCompletion()
  377. {
  378. if (onAsyncIOComplete == null)
  379. {
  380. onAsyncIOComplete = new WaitCallback(OnAsyncIOComplete);
  381. }
  382. return onAsyncIOComplete;
  383. }
  384. protected abstract void HandleIO(IConnection connection);
  385. static void OnAsyncIOComplete(object state)
  386. {
  387. IOAsyncResult thisPtr = (IOAsyncResult)state;
  388. Exception completionException = null;
  389. try
  390. {
  391. thisPtr.HandleIO(thisPtr.connection);
  392. }
  393. #pragma warning suppress 56500 // [....], transferring exception to another thread
  394. catch (Exception e)
  395. {
  396. if (Fx.IsFatal(e))
  397. {
  398. throw;
  399. }
  400. completionException = e;
  401. }
  402. thisPtr.Complete(false, completionException);
  403. }
  404. }
  405. sealed class ReadAsyncResult : IOAsyncResult
  406. {
  407. int bytesRead;
  408. byte[] buffer;
  409. int offset;
  410. public ReadAsyncResult(IConnection connection, byte[] buffer, int offset, int count, TimeSpan timeout,
  411. AsyncCallback callback, object state)
  412. : base(connection, callback, state)
  413. {
  414. this.buffer = buffer;
  415. this.offset = offset;
  416. AsyncCompletionResult readResult = connection.BeginRead(0, Math.Min(count, connection.AsyncReadBufferSize),
  417. timeout, GetWaitCompletion(), this);
  418. if (readResult == AsyncCompletionResult.Completed)
  419. {
  420. HandleIO(connection);
  421. base.Complete(true);
  422. }
  423. }
  424. protected override void HandleIO(IConnection connection)
  425. {
  426. bytesRead = connection.EndRead();
  427. Buffer.BlockCopy(connection.AsyncReadBuffer, 0, buffer, offset, bytesRead);
  428. }
  429. public static int End(IAsyncResult result)
  430. {
  431. ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
  432. return thisPtr.bytesRead;
  433. }
  434. }
  435. sealed class WriteAsyncResult : IOAsyncResult
  436. {
  437. public WriteAsyncResult(IConnection connection, byte[] buffer, int offset, int count, bool immediate, TimeSpan timeout, AsyncCallback callback, object state)
  438. : base(connection, callback, state)
  439. {
  440. AsyncCompletionResult writeResult = connection.BeginWrite(buffer, offset, count, immediate, timeout, GetWaitCompletion(), this);
  441. if (writeResult == AsyncCompletionResult.Completed)
  442. {
  443. HandleIO(connection);
  444. base.Complete(true);
  445. }
  446. }
  447. protected override void HandleIO(IConnection connection)
  448. {
  449. connection.EndWrite();
  450. }
  451. public static void End(IAsyncResult result)
  452. {
  453. AsyncResult.End<WriteAsyncResult>(result);
  454. }
  455. }
  456. }
  457. class StreamConnection : IConnection
  458. {
  459. byte[] asyncReadBuffer;
  460. int bytesRead;
  461. ConnectionStream innerStream;
  462. AsyncCallback onRead;
  463. AsyncCallback onWrite;
  464. IAsyncResult readResult;
  465. IAsyncResult writeResult;
  466. WaitCallback readCallback;
  467. WaitCallback writeCallback;
  468. Stream stream;
  469. public StreamConnection(Stream stream, ConnectionStream innerStream)
  470. {
  471. Fx.Assert(stream != null, "StreamConnection: Stream cannot be null.");
  472. Fx.Assert(innerStream != null, "StreamConnection: Inner stream cannot be null.");
  473. this.stream = stream;
  474. this.innerStream = innerStream;
  475. onRead = Fx.ThunkCallback(new AsyncCallback(OnRead));
  476. onWrite = Fx.ThunkCallback(new AsyncCallback(OnWrite));
  477. }
  478. public byte[] AsyncReadBuffer
  479. {
  480. get
  481. {
  482. if (this.asyncReadBuffer == null)
  483. {
  484. lock (ThisLock)
  485. {
  486. if (this.asyncReadBuffer == null)
  487. {
  488. this.asyncReadBuffer = DiagnosticUtility.Utility.AllocateByteArray(innerStream.Connection.AsyncReadBufferSize);
  489. }
  490. }
  491. }
  492. return this.asyncReadBuffer;
  493. }
  494. }
  495. public int AsyncReadBufferSize
  496. {
  497. get { return innerStream.Connection.AsyncReadBufferSize; }
  498. }
  499. public Stream Stream
  500. {
  501. get { return this.stream; }
  502. }
  503. public object ThisLock
  504. {
  505. get { return this; }
  506. }
  507. public TraceEventType ExceptionEventType
  508. {
  509. get { return innerStream.ExceptionEventType; }
  510. set { innerStream.ExceptionEventType = value; }
  511. }
  512. public IPEndPoint RemoteIPEndPoint
  513. {
  514. get
  515. {
  516. #pragma warning suppress 56503 // Not publicly accessible and this should never be called.
  517. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
  518. }
  519. }
  520. public void Abort()
  521. {
  522. innerStream.Abort();
  523. }
  524. Exception ConvertIOException(IOException ioException)
  525. {
  526. if (ioException.InnerException is TimeoutException)
  527. {
  528. return new TimeoutException(ioException.InnerException.Message, ioException);
  529. }
  530. else if (ioException.InnerException is CommunicationObjectAbortedException)
  531. {
  532. return new CommunicationObjectAbortedException(ioException.InnerException.Message, ioException);
  533. }
  534. else if (ioException.InnerException is CommunicationException)
  535. {
  536. return new CommunicationException(ioException.InnerException.Message, ioException);
  537. }
  538. else
  539. {
  540. return new CommunicationException(SR.GetString(SR.StreamError), ioException);
  541. }
  542. }
  543. public void Close(TimeSpan timeout, bool asyncAndLinger)
  544. {
  545. innerStream.CloseTimeout = timeout;
  546. try
  547. {
  548. stream.Close();
  549. }
  550. catch (IOException ioException)
  551. {
  552. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  553. }
  554. }
  555. public void Shutdown(TimeSpan timeout)
  556. {
  557. innerStream.Shutdown(timeout);
  558. }
  559. public object DuplicateAndClose(int targetProcessId)
  560. {
  561. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
  562. }
  563. public virtual object GetCoreTransport()
  564. {
  565. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new NotImplementedException());
  566. }
  567. public IAsyncResult BeginValidate(Uri uri, AsyncCallback callback, object state)
  568. {
  569. return this.innerStream.BeginValidate(uri, callback, state);
  570. }
  571. public bool EndValidate(IAsyncResult result)
  572. {
  573. return this.innerStream.EndValidate(result);
  574. }
  575. public AsyncCompletionResult BeginWrite(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout,
  576. WaitCallback callback, object state)
  577. {
  578. if (callback == null)
  579. {
  580. Fx.AssertAndThrow("Cannot call BeginWrite without a callback");
  581. }
  582. if (this.writeCallback != null)
  583. {
  584. Fx.AssertAndThrow("BeginWrite cannot be called twice");
  585. }
  586. this.writeCallback = callback;
  587. bool throwing = true;
  588. try
  589. {
  590. innerStream.Immediate = immediate;
  591. SetWriteTimeout(timeout);
  592. IAsyncResult localResult = stream.BeginWrite(buffer, offset, size, this.onWrite, state);
  593. if (!localResult.CompletedSynchronously)
  594. {
  595. throwing = false;
  596. return AsyncCompletionResult.Queued;
  597. }
  598. throwing = false;
  599. stream.EndWrite(localResult);
  600. }
  601. catch (IOException ioException)
  602. {
  603. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  604. }
  605. finally
  606. {
  607. if (throwing)
  608. {
  609. this.writeCallback = null;
  610. }
  611. }
  612. return AsyncCompletionResult.Completed;
  613. }
  614. public void EndWrite()
  615. {
  616. IAsyncResult localResult = this.writeResult;
  617. this.writeResult = null;
  618. this.writeCallback = null;
  619. if (localResult != null)
  620. {
  621. try
  622. {
  623. stream.EndWrite(localResult);
  624. }
  625. catch (IOException ioException)
  626. {
  627. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  628. }
  629. }
  630. }
  631. void OnWrite(IAsyncResult result)
  632. {
  633. if (result.CompletedSynchronously)
  634. {
  635. return;
  636. }
  637. if (this.writeResult != null)
  638. {
  639. throw Fx.AssertAndThrow("StreamConnection: OnWrite called twice.");
  640. }
  641. this.writeResult = result;
  642. this.writeCallback(result.AsyncState);
  643. }
  644. public void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout)
  645. {
  646. try
  647. {
  648. innerStream.Immediate = immediate;
  649. SetWriteTimeout(timeout);
  650. stream.Write(buffer, offset, size);
  651. }
  652. catch (IOException ioException)
  653. {
  654. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  655. }
  656. }
  657. public void Write(byte[] buffer, int offset, int size, bool immediate, TimeSpan timeout, BufferManager bufferManager)
  658. {
  659. Write(buffer, offset, size, immediate, timeout);
  660. bufferManager.ReturnBuffer(buffer);
  661. }
  662. void SetReadTimeout(TimeSpan timeout)
  663. {
  664. int timeoutInMilliseconds = TimeoutHelper.ToMilliseconds(timeout);
  665. if (stream.CanTimeout)
  666. {
  667. stream.ReadTimeout = timeoutInMilliseconds;
  668. }
  669. innerStream.ReadTimeout = timeoutInMilliseconds;
  670. }
  671. void SetWriteTimeout(TimeSpan timeout)
  672. {
  673. int timeoutInMilliseconds = TimeoutHelper.ToMilliseconds(timeout);
  674. if (stream.CanTimeout)
  675. {
  676. stream.WriteTimeout = timeoutInMilliseconds;
  677. }
  678. innerStream.WriteTimeout = timeoutInMilliseconds;
  679. }
  680. public int Read(byte[] buffer, int offset, int size, TimeSpan timeout)
  681. {
  682. try
  683. {
  684. SetReadTimeout(timeout);
  685. return stream.Read(buffer, offset, size);
  686. }
  687. catch (IOException ioException)
  688. {
  689. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  690. }
  691. }
  692. public AsyncCompletionResult BeginRead(int offset, int size, TimeSpan timeout, WaitCallback callback, object state)
  693. {
  694. ConnectionUtilities.ValidateBufferBounds(AsyncReadBufferSize, offset, size);
  695. readCallback = callback;
  696. try
  697. {
  698. SetReadTimeout(timeout);
  699. IAsyncResult localResult = stream.BeginRead(AsyncReadBuffer, offset, size, onRead, state);
  700. if (!localResult.CompletedSynchronously)
  701. {
  702. return AsyncCompletionResult.Queued;
  703. }
  704. bytesRead = stream.EndRead(localResult);
  705. }
  706. catch (IOException ioException)
  707. {
  708. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  709. }
  710. return AsyncCompletionResult.Completed;
  711. }
  712. public int EndRead()
  713. {
  714. IAsyncResult localResult = this.readResult;
  715. this.readResult = null;
  716. if (localResult != null)
  717. {
  718. try
  719. {
  720. bytesRead = stream.EndRead(localResult);
  721. }
  722. catch (IOException ioException)
  723. {
  724. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(ConvertIOException(ioException));
  725. }
  726. }
  727. return bytesRead;
  728. }
  729. void OnRead(IAsyncResult result)
  730. {
  731. if (result.CompletedSynchronously)
  732. {
  733. return;
  734. }
  735. if (this.readResult != null)
  736. {
  737. throw Fx.AssertAndThrow("StreamConnection: OnRead called twice.");
  738. }
  739. this.readResult = result;
  740. readCallback(result.AsyncState);
  741. }
  742. }
  743. class ConnectionMessageProperty
  744. {
  745. IConnection connection;
  746. public ConnectionMessageProperty(IConnection connection)
  747. {
  748. this.connection = connection;
  749. }
  750. public static string Name
  751. {
  752. get { return "iconnection"; }
  753. }
  754. public IConnection Connection
  755. {
  756. get { return this.connection; }
  757. }
  758. }
  759. static class ConnectionUtilities
  760. {
  761. internal static void CloseNoThrow(IConnection connection, TimeSpan timeout)
  762. {
  763. bool success = false;
  764. try
  765. {
  766. connection.Close(timeout, false);
  767. success = true;
  768. }
  769. catch (TimeoutException e)
  770. {
  771. if (TD.CloseTimeoutIsEnabled())
  772. {
  773. TD.CloseTimeout(e.Message);
  774. }
  775. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  776. }
  777. catch (CommunicationException e)
  778. {
  779. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  780. }
  781. finally
  782. {
  783. if (!success)
  784. {
  785. connection.Abort();
  786. }
  787. }
  788. }
  789. internal static void ValidateBufferBounds(ArraySegment<byte> buffer)
  790. {
  791. ValidateBufferBounds(buffer.Array, buffer.Offset, buffer.Count);
  792. }
  793. internal static void ValidateBufferBounds(byte[] buffer, int offset, int size)
  794. {
  795. if (buffer == null)
  796. {
  797. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("buffer");
  798. }
  799. ValidateBufferBounds(buffer.Length, offset, size);
  800. }
  801. internal static void ValidateBufferBounds(int bufferSize, int offset, int size)
  802. {
  803. if (offset < 0)
  804. {
  805. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("offset", offset, SR.GetString(
  806. SR.ValueMustBeNonNegative)));
  807. }
  808. if (offset > bufferSize)
  809. {
  810. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("offset", offset, SR.GetString(
  811. SR.OffsetExceedsBufferSize, bufferSize)));
  812. }
  813. if (size <= 0)
  814. {
  815. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("size", size, SR.GetString(
  816. SR.ValueMustBePositive)));
  817. }
  818. int remainingBufferSpace = bufferSize - offset;
  819. if (size > remainingBufferSpace)
  820. {
  821. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("size", size, SR.GetString(
  822. SR.SizeExceedsRemainingBufferSpace, remainingBufferSpace)));
  823. }
  824. }
  825. }
  826. }