SessionConnectionReader.cs 63 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System;
  7. using System.Diagnostics;
  8. using System.Net;
  9. using System.Runtime;
  10. using System.Runtime.CompilerServices;
  11. using System.Security.Authentication.ExtendedProtection;
  12. using System.ServiceModel;
  13. using System.ServiceModel.Activation;
  14. using System.ServiceModel.Description;
  15. using System.ServiceModel.Diagnostics;
  16. using System.ServiceModel.Dispatcher;
  17. using System.ServiceModel.Security;
  18. using System.Threading;
  19. using System.Xml;
  20. using System.ServiceModel.Diagnostics.Application;
  // Callback that receives a ServerSessionPreambleConnectionReader. The reader class in this
  // file invokes one once transport settings have been resolved for the decoded via.
  delegate void ServerSessionPreambleCallback(
      ServerSessionPreambleConnectionReader serverSessionPreambleReader);
  // Callback that receives a preamble reader together with a ConnectionDemuxer.
  delegate void ServerSessionPreambleDemuxCallback(
      ServerSessionPreambleConnectionReader serverSessionPreambleReader,
      ConnectionDemuxer connectionDemuxer);
  // Contract for a component that accepts a server session preamble reader along with the
  // ConnectionDemuxer supplied with it.
  interface ISessionPreambleHandler
  {
      void HandleServerSessionPreamble(
          ServerSessionPreambleConnectionReader serverSessionPreambleReader,
          ConnectionDemuxer connectionDemuxer);
  }
  28. // reads everything we need in order to match a channel (i.e. up to the via)
  29. class ServerSessionPreambleConnectionReader : InitialServerConnectionReader
  30. {
  31. ServerSessionDecoder decoder;
  32. byte[] connectionBuffer;
  33. int offset;
  34. int size;
  35. TransportSettingsCallback transportSettingsCallback;
  36. ServerSessionPreambleCallback callback;
  37. static WaitCallback readCallback;
  38. IConnectionOrientedTransportFactorySettings settings;
  39. Uri via;
  40. Action<Uri> viaDelegate;
  41. TimeoutHelper receiveTimeoutHelper;
  42. IConnection rawConnection;
  43. static AsyncCallback onValidate;
  // Creates a preamble reader over an accepted connection.
  //   connection / closedCallback  - forwarded to the base reader; the connection is also kept
  //                                  as RawConnection.
  //   connectionDequeuedCallback   - stored in ConnectionDequeuedCallback.
  //   streamPosition               - starting position handed to the ServerSessionDecoder.
  //   offset / size                - window of bytes in the connection buffer still to decode.
  //   transportSettingsCallback    - resolves transport settings for the decoded via.
  //   callback                     - invoked once the reader can be handed off to a channel.
  public ServerSessionPreambleConnectionReader(
      IConnection connection,
      Action connectionDequeuedCallback,
      long streamPosition,
      int offset,
      int size,
      TransportSettingsCallback transportSettingsCallback,
      ConnectionClosedCallback closedCallback,
      ServerSessionPreambleCallback callback)
      : base(connection, closedCallback)
  {
      // Buffer window that has not been decoded yet.
      this.offset = offset;
      this.size = size;

      // Collaborators used once the via is known.
      this.callback = callback;
      this.transportSettingsCallback = transportSettingsCallback;

      this.rawConnection = connection;
      this.decoder = new ServerSessionDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
      this.ConnectionDequeuedCallback = connectionDequeuedCallback;
  }
  // Offset of the first undecoded byte in the connection buffer.
  public int BufferOffset
  {
      get
      {
          return this.offset;
      }
  }
  // Number of undecoded bytes remaining in the connection buffer.
  public int BufferSize
  {
      get
      {
          return this.size;
      }
  }
  // The session decoder created in the constructor; the duplex channel reuses it.
  public ServerSessionDecoder Decoder
  {
      get
      {
          return this.decoder;
      }
  }
  // The connection instance that was passed to the constructor.
  public IConnection RawConnection
  {
      get
      {
          return this.rawConnection;
      }
  }
  // The via copied from the decoder once it reaches PreUpgradeStart; not assigned before that.
  public Uri Via
  {
      get
      {
          return this.via;
      }
  }
  // Time left on the receive timeout that StartReading established.
  TimeSpan GetRemainingTimeout()
  {
      // Call through the field itself (not a copy) so the helper's own state is the one used.
      TimeSpan remaining = this.receiveTimeoutHelper.RemainingTime();
      return remaining;
  }
  // Completion callback for the asynchronous read issued by ContinueReading. Collects the read
  // result and resumes decoding; any failure that is not rethrown aborts the reader.
  static void ReadCallback(object state)
  {
      ServerSessionPreambleConnectionReader self = (ServerSessionPreambleConnectionReader)state;
      bool completed = false;

      try
      {
          self.GetReadResult();
          self.ContinueReading();
          completed = true;
      }
      catch (CommunicationException communicationFailure)
      {
          DiagnosticUtility.TraceHandledException(communicationFailure, TraceEventType.Information);
      }
      catch (TimeoutException timeoutFailure)
      {
          if (TD.ReceiveTimeoutIsEnabled())
          {
              TD.ReceiveTimeout(timeoutFailure.Message);
          }

          DiagnosticUtility.TraceHandledException(timeoutFailure, TraceEventType.Information);
      }
      catch (Exception e)
      {
          // Fatal exceptions, and those the transport handler declines, keep propagating.
          if (Fx.IsFatal(e) || !ExceptionHandler.HandleTransportExceptionHelper(e))
          {
              throw;
          }

          // containment -- all errors abort the reader, no additional containment action needed
      }
      finally
      {
          if (!completed)
          {
              self.Abort();
          }
      }
  }
  // Completes the pending read: resets the buffer window to start at 0 with the number of
  // bytes read, and throws the decoder's premature-EOF exception when nothing was read.
  void GetReadResult()
  {
      this.offset = 0;

      int bytesRead = Connection.EndRead();
      this.size = bytesRead;

      if (bytesRead == 0)
      {
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.decoder.CreatePrematureEOFException());
      }
  }
  // Drives the preamble decode loop. Reads more bytes whenever the buffer window is empty,
  // feeds them to the decoder, and starts connection validation once the decoder reaches
  // PreUpgradeStart. Leaves the loop when a read or the validation is pending asynchronously;
  // every exit that does not mark success aborts the reader.
  void ContinueReading()
  {
      bool finishedCleanly = false;
      try
      {
          while (true)
          {
              if (this.size == 0)
              {
                  if (readCallback == null)
                  {
                      readCallback = new WaitCallback(ReadCallback);
                  }

                  AsyncCompletionResult readResult = Connection.BeginRead(
                      0, this.connectionBuffer.Length, GetRemainingTimeout(), readCallback, this);
                  if (readResult == AsyncCompletionResult.Queued)
                  {
                      // ReadCallback resumes the loop when the read completes.
                      break;
                  }

                  GetReadResult();
              }

              int consumed = this.decoder.Decode(this.connectionBuffer, this.offset, this.size);
              if (consumed > 0)
              {
                  this.offset += consumed;
                  this.size -= consumed;
              }

              if (this.decoder.CurrentState != ServerSessionDecoder.State.PreUpgradeStart)
              {
                  continue;
              }

              if (onValidate == null)
              {
                  onValidate = Fx.ThunkCallback(new AsyncCallback(OnValidate));
              }

              this.via = this.decoder.Via;
              IAsyncResult validation = this.Connection.BeginValidate(this.via, onValidate, this);
              if (validation.CompletedSynchronously && !VerifyValidationResult(validation))
              {
                  // This goes through the failure path (Abort) even though it doesn't throw.
                  return;
              }

              break; //exit loop, set success=true;
          }

          finishedCleanly = true;
      }
      catch (CommunicationException communicationFailure)
      {
          DiagnosticUtility.TraceHandledException(communicationFailure, TraceEventType.Information);
      }
      catch (TimeoutException timeoutFailure)
      {
          if (TD.ReceiveTimeoutIsEnabled())
          {
              TD.ReceiveTimeout(timeoutFailure.Message);
          }

          DiagnosticUtility.TraceHandledException(timeoutFailure, TraceEventType.Information);
      }
      catch (Exception e)
      {
          if (Fx.IsFatal(e) || !ExceptionHandler.HandleTransportExceptionHelper(e))
          {
              throw;
          }

          // containment -- all exceptions abort the reader, no additional containment action necessary
      }
      finally
      {
          if (!finishedCleanly)
          {
              Abort();
          }
      }
  }
  // Ends the connection validation and, if it passed, runs the post-validation steps.
  // Returns true only when both succeed.
  bool VerifyValidationResult(IAsyncResult result)
  {
      if (!this.Connection.EndValidate(result))
      {
          return false;
      }

      return this.ContinuePostValidationProcessing();
  }
  // Callback for BeginValidate. Only does work for asynchronous completions (the synchronous
  // case is verified by the caller of BeginValidate). A failed verification or a handled
  // exception aborts the reader.
  static void OnValidate(IAsyncResult result)
  {
      bool validated = false;
      ServerSessionPreambleConnectionReader reader = (ServerSessionPreambleConnectionReader)result.AsyncState;

      try
      {
          if (!result.CompletedSynchronously && !reader.VerifyValidationResult(result))
          {
              // This goes through the failure path (Abort) even though it doesn't throw.
              return;
          }

          validated = true;
      }
      catch (CommunicationException communicationFailure)
      {
          DiagnosticUtility.TraceHandledException(communicationFailure, TraceEventType.Information);
      }
      catch (TimeoutException timeoutFailure)
      {
          if (TD.ReceiveTimeoutIsEnabled())
          {
              TD.ReceiveTimeout(timeoutFailure.Message);
          }

          DiagnosticUtility.TraceHandledException(timeoutFailure, TraceEventType.Information);
      }
      catch (Exception e)
      {
          if (Fx.IsFatal(e))
          {
              throw;
          }

          DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
      }
      finally
      {
          if (!validated)
          {
              reader.Abort();
          }
      }
  }
  // Runs after the connection has been validated: notifies the via delegate (if any), resolves
  // the transport settings for the via and hands this reader to the callback.
  // Returns false if the connection should be aborted. Note that a ServiceActivationException
  // from the via delegate sends a fault (which also closes this reader) and still returns true.
  bool ContinuePostValidationProcessing()
  {
      if (this.viaDelegate != null)
      {
          try
          {
              this.viaDelegate(this.via);
          }
          catch (ServiceActivationException activationFailure)
          {
              DiagnosticUtility.TraceHandledException(activationFailure, TraceEventType.Information);

              // return fault and close connection
              SendFault(FramingEncodingString.ServiceActivationFailedFault);
              return true;
          }
      }

      this.settings = this.transportSettingsCallback(this.via);
      if (this.settings != null)
      {
          // we have enough information to hand off to a channel. Our job is done
          this.callback(this);
          return true;
      }

      // No settings for this via: report the missing endpoint to the client.
      EndpointNotFoundException notFound = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, this.decoder.Via));
      DiagnosticUtility.TraceHandledException(notFound, TraceEventType.Information);
      SendFault(FramingEncodingString.EndpointNotFoundFault);
      return false;
  }
  // Sends the given framing fault over the connection and then closes this reader, each step
  // using whatever remains of the receive timeout at that moment.
  public void SendFault(string faultString)
  {
      IConnection faultTarget = this.Connection;
      byte[] drainBuffer = this.connectionBuffer;
      TimeSpan sendTimeout = GetRemainingTimeout();

      InitialServerConnectionReader.SendFault(
          faultTarget, faultString, drainBuffer, sendTimeout, TransportDefaults.MaxDrainSize);

      TimeSpan closeTimeout = GetRemainingTimeout();
      base.Close(closeTimeout);
  }
  // Entry point for preamble processing.
  //   viaDelegate    - optional; called with the decoded via after validation succeeds.
  //   receiveTimeout - overall budget used for reads, faults and close.
  public void StartReading(Action<Uri> viaDelegate, TimeSpan receiveTimeout)
  {
      this.viaDelegate = viaDelegate;
      this.receiveTimeoutHelper = new TimeoutHelper(receiveTimeout);

      // Decode out of the connection's own async read buffer.
      this.connectionBuffer = this.Connection.AsyncReadBuffer;

      this.ContinueReading();
  }
  // Builds the server-side framing duplex session channel that takes over from this reader.
  public IDuplexSessionChannel CreateDuplexSessionChannel(
      ConnectionOrientedTransportChannelListener channelListener,
      EndpointAddress localAddress,
      bool exposeConnectionProperty,
      ConnectionDemuxer connectionDemuxer)
  {
      ServerFramingDuplexSessionChannel sessionChannel = new ServerFramingDuplexSessionChannel(
          channelListener, this, localAddress, exposeConnectionProperty, connectionDemuxer);
      return sessionChannel;
  }
  307. class ServerFramingDuplexSessionChannel : FramingDuplexSessionChannel
  308. {
  309. ConnectionOrientedTransportChannelListener channelListener;
  310. ConnectionDemuxer connectionDemuxer;
  311. ServerSessionConnectionReader sessionReader;
  312. ServerSessionDecoder decoder;
  313. IConnection rawConnection;
  314. byte[] connectionBuffer;
  315. int offset;
  316. int size;
  317. StreamUpgradeAcceptor upgradeAcceptor;
  318. IStreamUpgradeChannelBindingProvider channelBindingProvider;
  // Takes over the connection, decoder and buffer window from the preamble reader, and
  // prepares an upgrade acceptor when the listener has a stream upgrade configured.
  public ServerFramingDuplexSessionChannel(
      ConnectionOrientedTransportChannelListener channelListener,
      ServerSessionPreambleConnectionReader preambleReader,
      EndpointAddress localAddress,
      bool exposeConnectionProperty,
      ConnectionDemuxer connectionDemuxer)
      : base(channelListener, localAddress, preambleReader.Via, exposeConnectionProperty)
  {
      this.channelListener = channelListener;
      this.connectionDemuxer = connectionDemuxer;

      // State inherited from the preamble reader.
      this.Connection = preambleReader.Connection;
      this.decoder = preambleReader.Decoder;
      this.connectionBuffer = preambleReader.connectionBuffer;
      this.offset = preambleReader.BufferOffset;
      this.size = preambleReader.BufferSize;
      this.rawConnection = preambleReader.RawConnection;

      StreamUpgradeProvider upgrade = channelListener.Upgrade;
      if (upgrade == null)
      {
          // No upgrade configured: acceptor and binding provider stay null.
          return;
      }

      this.channelBindingProvider = upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
      this.upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
  }
  // Takes the raw connection back from the session reader (if one exists and still has it)
  // and either aborts it or hands it to the demuxer for reuse. The demuxer reference is
  // dropped afterwards.
  protected override void ReturnConnectionIfNecessary(bool abort, TimeSpan timeout)
  {
      if (this.sessionReader == null)
      {
          return;
      }

      IConnection released;
      lock (ThisLock)
      {
          released = this.sessionReader.GetRawConnection();
      }

      if (released == null)
      {
          return;
      }

      if (!abort)
      {
          this.connectionDemuxer.ReuseConnection(released, timeout);
      }
      else
      {
          released.Abort();
      }

      this.connectionDemuxer = null;
  }
  // Answers IChannelBindingProvider requests with the upgrade's channel binding provider
  // (which may be null); every other type is delegated to the base channel.
  public override T GetProperty<T>()
  {
      if (typeof(T) != typeof(IChannelBindingProvider))
      {
          return base.GetProperty<T>();
      }

      object provider = this.channelBindingProvider;
      return (T)provider;
  }
  // Notifies the listener that a message was received before the base class prepares it.
  protected override void PrepareMessage(Message message)
  {
      this.channelListener.RaiseMessageReceived();

      base.PrepareMessage(message);
  }
  // perform security handshake and ACK connection
  //
  // Synchronous open: validates the content type, consumes the remainder of the preamble
  // (accepting any stream upgrade requests on the way), then writes the ACK and installs the
  // session reader. The connection is aborted if the method exits any other way.
  protected override void OnOpen(TimeSpan timeout)
  {
      bool opened = false;
      try
      {
          TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

          // first validate our content type
          ValidateContentType(ref timeoutHelper);

          // next read any potential upgrades and finish consuming the preamble
          while (true)
          {
              if (this.size == 0)
              {
                  this.offset = 0;
                  this.size = Connection.Read(
                      this.connectionBuffer, 0, this.connectionBuffer.Length, timeoutHelper.RemainingTime());
                  if (this.size == 0)
                  {
                      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.decoder.CreatePrematureEOFException());
                  }
              }

              // Decode until the buffered bytes run out, then go back for another read.
              do
              {
                  DecodeBytes();

                  switch (this.decoder.CurrentState)
                  {
                      case ServerSessionDecoder.State.UpgradeRequest:
                          ProcessUpgradeRequest(ref timeoutHelper);

                          // accept upgrade
                          Connection.Write(
                              ServerSessionEncoder.UpgradeResponseBytes,
                              0,
                              ServerSessionEncoder.UpgradeResponseBytes.Length,
                              true,
                              timeoutHelper.RemainingTime());

                          // Bytes already buffered belong to the upgraded stream; replay them.
                          IConnection pendingUpgrade = this.Connection;
                          if (this.size > 0)
                          {
                              pendingUpgrade = new PreReadConnection(pendingUpgrade, this.connectionBuffer, this.offset, this.size);
                          }

                          try
                          {
                              // Installs the upgraded connection, channel binding and read buffer.
                              AcceptUpgradedConnection(
                                  InitialServerConnectionReader.UpgradeConnection(pendingUpgrade, this.upgradeAcceptor, this));
                          }
  #pragma warning suppress 56500
                          catch (Exception upgradeFailure)
                          {
                              if (Fx.IsFatal(upgradeFailure))
                              {
                                  throw;
                              }

                              // Audit Authentication Failure
                              WriteAuditFailure(this.upgradeAcceptor as StreamSecurityUpgradeAcceptor, upgradeFailure);
                              throw;
                          }

                          break;

                      case ServerSessionDecoder.State.Start:
                          SetupSecurityIfNecessary();

                          // we've finished the preamble. Ack and return.
                          Connection.Write(
                              ServerSessionEncoder.AckResponseBytes,
                              0,
                              ServerSessionEncoder.AckResponseBytes.Length,
                              true,
                              timeoutHelper.RemainingTime());

                          SetupSessionReader();
                          opened = true;
                          return;
                  }
              }
              while (this.size != 0);
          }
      }
      finally
      {
          if (!opened)
          {
              Connection.Abort();
          }
      }
  }
  // Switches the channel over to the upgraded connection: records the endpoint channel
  // binding when the provider supports it, and adopts the new connection's read buffer.
  void AcceptUpgradedConnection(IConnection upgradedConnection)
  {
      this.Connection = upgradedConnection;

      IStreamUpgradeChannelBindingProvider bindingProvider = this.channelBindingProvider;
      if (bindingProvider != null && bindingProvider.IsChannelBindingSupportEnabled)
      {
          this.SetChannelBinding(
              bindingProvider.GetChannelBinding(this.upgradeAcceptor, ChannelBindingKind.Endpoint));
      }

      this.connectionBuffer = this.Connection.AsyncReadBuffer;
  }
  // Creates the session message encoder and checks it against the content type decoded from
  // the preamble. An unsupported content type is faulted back to the client and raised as a
  // ProtocolException. Compression-enabled encoders are told the session content type.
  void ValidateContentType(ref TimeoutHelper timeoutHelper)
  {
      this.MessageEncoder = this.channelListener.MessageEncoderFactory.CreateSessionEncoder();

      bool supported = this.MessageEncoder.IsContentTypeSupported(this.decoder.ContentType);
      if (!supported)
      {
          SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
              new ProtocolException(
                  SR.GetString(SR.ContentTypeMismatch, this.decoder.ContentType, this.MessageEncoder.ContentType)));
      }

      ICompressedMessageEncoder compressingEncoder = this.MessageEncoder as ICompressedMessageEncoder;
      if (compressingEncoder == null || !compressingEncoder.CompressionEnabled)
      {
          return;
      }

      compressingEncoder.SetSessionContentType(this.decoder.ContentType);
  }
  // Feeds the current buffer window to the decoder and advances the window past whatever
  // the decoder consumed.
  void DecodeBytes()
  {
      int consumed = this.decoder.Decode(this.connectionBuffer, this.offset, this.size);
      if (consumed <= 0)
      {
          return;
      }

      this.offset += consumed;
      this.size -= consumed;
  }
  // Rejects an upgrade request that cannot be honoured: either no upgrade acceptor exists, or
  // the acceptor cannot handle the requested upgrade. Both cases send the UpgradeInvalid fault
  // and throw a ProtocolException; otherwise the method returns normally.
  void ProcessUpgradeRequest(ref TimeoutHelper timeoutHelper)
  {
      StreamUpgradeAcceptor acceptor = this.upgradeAcceptor;

      if (acceptor == null)
      {
          SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
              new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, this.decoder.Upgrade)));
      }

      if (acceptor.CanUpgrade(this.decoder.Upgrade))
      {
          return;
      }

      SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
          new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, this.decoder.Upgrade)));
  }
  // Sends a framing fault on the channel's connection within the remaining open timeout.
  void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
  {
      IConnection faultTarget = this.Connection;
      byte[] drainBuffer = this.connectionBuffer;
      TimeSpan remaining = timeoutHelper.RemainingTime();

      InitialServerConnectionReader.SendFault(
          faultTarget, faultString, drainBuffer, remaining, TransportDefaults.MaxDrainSize);
  }
  // When the upgrade acceptor is a security acceptor, captures the negotiated remote security
  // and audits the outcome. Missing remote security is audited as a failure and thrown as a
  // ProtocolException. Does nothing for non-security (or absent) acceptors.
  void SetupSecurityIfNecessary()
  {
      StreamSecurityUpgradeAcceptor securityAcceptor = this.upgradeAcceptor as StreamSecurityUpgradeAcceptor;
      if (securityAcceptor == null)
      {
          return;
      }

      this.RemoteSecurity = securityAcceptor.GetRemoteSecurity();
      if (this.RemoteSecurity != null)
      {
          // Audit Authentication Success
          WriteAuditEvent(securityAcceptor, AuditLevel.Success, null);
          return;
      }

      Exception negotiationFailure = new ProtocolException(
          SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
      WriteAuditFailure(securityAcceptor, negotiationFailure);
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(negotiationFailure);
  }
  // Creates the session reader for this channel and registers it as the message source.
  void SetupSessionReader()
  {
      this.sessionReader = new ServerSessionConnectionReader(this);

      IMessageSource source = this.sessionReader;
      base.SetMessageSource(source);
  }
  // Asynchronous open: all the work happens inside OpenAsyncResult.
  protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  {
      OpenAsyncResult pendingOpen = new OpenAsyncResult(this, timeout, callback, state);
      return pendingOpen;
  }
  // Completes the asynchronous open started by OnBeginOpen.
  protected override void OnEndOpen(IAsyncResult result)
  {
      OpenAsyncResult.End(result);
  }
  536. #region Transport Security Auditing
  // Best-effort audit of an authentication failure: non-fatal exceptions raised while writing
  // the audit event are traced and swallowed so they do not mask the original failure.
  void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
  {
      try
      {
          WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
      }
  #pragma warning suppress 56500 // covered by FxCop
      catch (Exception auditError)
      {
          if (Fx.IsFatal(auditError))
          {
              throw;
          }

          DiagnosticUtility.TraceHandledException(auditError, TraceEventType.Error);
      }
  }
  // Writes a transport authentication success or failure audit event, provided the listener's
  // audit behavior enables the given level and a security upgrade acceptor is available.
  // The primary identity is taken from the acceptor's remote security when present, otherwise
  // it is the empty string.
  void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
  {
      bool levelEnabled =
          (this.channelListener.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) == auditLevel;
      if (!levelEnabled || securityUpgradeAcceptor == null)
      {
          return;
      }

      SecurityMessageProperty remoteSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
      String identityName = remoteSecurity != null
          ? GetIdentityNameFromContext(remoteSecurity)
          : String.Empty;

      ServiceSecurityAuditBehavior behavior = this.channelListener.AuditBehavior;
      if (auditLevel != AuditLevel.Success)
      {
          SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(
              behavior.AuditLogLocation, behavior.SuppressAuditFailure, null, this.LocalVia, identityName, exception);
      }
      else
      {
          SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(
              behavior.AuditLogLocation, behavior.SuppressAuditFailure, null, this.LocalVia, identityName);
      }
  }
  // Resolves the identity name(s) from the authorization context of the client's security
  // context. Deliberately marked NoInlining.
  [MethodImpl(MethodImplOptions.NoInlining)]
  static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
  {
      ServiceSecurityContext securityContext = clientSecurity.ServiceSecurityContext;
      return SecurityUtils.GetIdentityNamesFromContext(securityContext.AuthorizationContext);
  }
  587. #endregion
  588. class OpenAsyncResult : AsyncResult
  589. {
  590. ServerFramingDuplexSessionChannel channel;
  591. TimeoutHelper timeoutHelper;
  592. static WaitCallback readCallback;
  593. static WaitCallback onWriteAckResponse;
  594. static WaitCallback onWriteUpgradeResponse;
  595. static AsyncCallback onUpgradeConnection;
  // Starts the asynchronous open: validates the content type and begins consuming the
  // preamble. Completes synchronously if the whole handshake finishes inline; aborts the
  // channel's connection if start-up throws.
  public OpenAsyncResult(ServerFramingDuplexSessionChannel channel, TimeSpan timeout,
      AsyncCallback callback, object state)
      : base(callback, state)
  {
      this.channel = channel;
      this.timeoutHelper = new TimeoutHelper(timeout);

      bool finishedInline = false;
      bool started = false;
      try
      {
          channel.ValidateContentType(ref this.timeoutHelper);
          finishedInline = ContinueReading();
          started = true;
      }
      finally
      {
          if (!started)
          {
              CleanupOnError();
          }
      }

      if (finishedInline)
      {
          base.Complete(true);
      }
  }
  // Ends the asynchronous open via the shared AsyncResult.End helper.
  public static void End(IAsyncResult result)
  {
      AsyncResult.End<OpenAsyncResult>(result);
  }
  // Failure cleanup: aborts the channel's current connection.
  void CleanupOnError()
  {
      IConnection failedConnection = this.channel.Connection;
      failedConnection.Abort();
  }
  // Asynchronous counterpart of the channel's synchronous open loop. Reads and decodes the
  // rest of the preamble, answering upgrade requests and finally writing the ACK.
  // Returns true when the open has fully completed on this thread; returns false when an
  // operation is still pending and one of the static callbacks will resume the work.
  bool ContinueReading()
  {
      while (true)
      {
          if (channel.size == 0)
          {
              if (readCallback == null)
              {
                  readCallback = new WaitCallback(ReadCallback);
              }

              AsyncCompletionResult readResult = channel.Connection.BeginRead(
                  0, channel.connectionBuffer.Length, timeoutHelper.RemainingTime(), readCallback, this);
              if (readResult == AsyncCompletionResult.Queued)
              {
                  return false;
              }

              GetReadResult();
          }

          // Decode until the buffered bytes run out, then go back for another read.
          do
          {
              channel.DecodeBytes();

              switch (channel.decoder.CurrentState)
              {
                  case ServerSessionDecoder.State.UpgradeRequest:
                      channel.ProcessUpgradeRequest(ref this.timeoutHelper);

                      // accept upgrade
                      if (onWriteUpgradeResponse == null)
                      {
                          onWriteUpgradeResponse = Fx.ThunkCallback(new WaitCallback(OnWriteUpgradeResponse));
                      }

                      AsyncCompletionResult upgradeWrite = channel.Connection.BeginWrite(
                          ServerSessionEncoder.UpgradeResponseBytes,
                          0,
                          ServerSessionEncoder.UpgradeResponseBytes.Length,
                          true,
                          timeoutHelper.RemainingTime(),
                          onWriteUpgradeResponse,
                          this);

                      // Either the write or the subsequent upgrade may still be pending.
                      if (upgradeWrite == AsyncCompletionResult.Queued || !HandleWriteUpgradeResponseComplete())
                      {
                          return false;
                      }

                      break;

                  case ServerSessionDecoder.State.Start:
                      channel.SetupSecurityIfNecessary();

                      // we've finished the preamble. Ack and return.
                      if (onWriteAckResponse == null)
                      {
                          onWriteAckResponse = Fx.ThunkCallback(new WaitCallback(OnWriteAckResponse));
                      }

                      AsyncCompletionResult ackWrite = channel.Connection.BeginWrite(
                          ServerSessionEncoder.AckResponseBytes,
                          0,
                          ServerSessionEncoder.AckResponseBytes.Length,
                          true,
                          timeoutHelper.RemainingTime(),
                          onWriteAckResponse,
                          this);
                      if (ackWrite == AsyncCompletionResult.Queued)
                      {
                          return false;
                      }

                      return HandleWriteAckComplete();
              }
          }
          while (channel.size != 0);
      }
  }
  // Completes the pending read on the channel's connection: resets the channel's buffer
  // window to start at 0 with the number of bytes read, and throws the decoder's
  // premature-EOF exception when nothing was read.
  void GetReadResult()
  {
      channel.offset = 0;

      int bytesRead = channel.Connection.EndRead();
      channel.size = bytesRead;

      if (bytesRead == 0)
      {
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(channel.decoder.CreatePrematureEOFException());
      }
  }
  // Finishes the upgrade-response write and starts upgrading the connection. Returns false
  // when the upgrade is still pending (OnUpgradeConnection will continue); otherwise returns
  // the result of completing the upgrade inline. Non-fatal failures are audited and rethrown.
  bool HandleWriteUpgradeResponseComplete()
  {
      channel.Connection.EndWrite();

      // Bytes already buffered belong to the upgraded stream; replay them.
      IConnection pendingUpgrade = channel.Connection;
      if (channel.size > 0)
      {
          pendingUpgrade = new PreReadConnection(pendingUpgrade, channel.connectionBuffer, channel.offset, channel.size);
      }

      if (onUpgradeConnection == null)
      {
          onUpgradeConnection = Fx.ThunkCallback(new AsyncCallback(OnUpgradeConnection));
      }

      try
      {
          IAsyncResult upgradeResult = InitialServerConnectionReader.BeginUpgradeConnection(
              pendingUpgrade, channel.upgradeAcceptor, channel, onUpgradeConnection, this);

          if (upgradeResult.CompletedSynchronously)
          {
              return HandleUpgradeConnectionComplete(upgradeResult);
          }

          return false;
      }
  #pragma warning suppress 56500
      catch (Exception upgradeFailure)
      {
          if (Fx.IsFatal(upgradeFailure))
          {
              throw;
          }

          // Audit Authentication Failure
          this.channel.WriteAuditFailure(channel.upgradeAcceptor as StreamSecurityUpgradeAcceptor, upgradeFailure);
          throw;
      }
  }
  // Ends the connection upgrade and installs the upgraded connection on the channel.
  // Always returns true so callers go on to continue reading.
  bool HandleUpgradeConnectionComplete(IAsyncResult result)
  {
      IConnection upgraded = InitialServerConnectionReader.EndUpgradeConnection(result);
      channel.AcceptUpgradedConnection(upgraded);
      return true;
  }
  // Finishes the ACK write and installs the channel's session reader. Always returns true:
  // the open is complete at this point.
  bool HandleWriteAckComplete()
  {
      IConnection ackConnection = channel.Connection;
      ackConnection.EndWrite();

      channel.SetupSessionReader();
      return true;
  }
  // Completion callback for an asynchronous preamble read. Resumes the read loop and completes
  // the async result when the open finishes or a non-fatal exception occurs (after cleanup).
  static void ReadCallback(object state)
  {
      OpenAsyncResult self = (OpenAsyncResult)state;
      Exception failure = null;
      bool finished = false;

      try
      {
          self.GetReadResult();
          finished = self.ContinueReading();
      }
      catch (Exception e)
      {
          if (Fx.IsFatal(e))
          {
              throw;
          }

          finished = true;
          failure = e;
          self.CleanupOnError();
      }

      if (finished)
      {
          self.Complete(false, failure);
      }
  }
  // Callback invoked with the OpenAsyncResult as state after the upgrade response
  // write finishes. Finishes that step and, if it reports completion, resumes reading.
  // Non-fatal failures trigger cleanup plus an authentication-failure audit and are
  // delivered through Complete.
  static void OnWriteUpgradeResponse(object asyncState)
  {
      OpenAsyncResult self = (OpenAsyncResult)asyncState;
      Exception failure = null;
      bool finished;

      try
      {
          // Short-circuit: ContinueReading only runs when the write step returned true.
          finished = self.HandleWriteUpgradeResponseComplete() && self.ContinueReading();
      }
      catch (Exception ex)
      {
          if (Fx.IsFatal(ex))
          {
              throw;
          }

          failure = ex;
          finished = true;
          self.CleanupOnError();

          // Audit Authentication Failure
          StreamSecurityUpgradeAcceptor securityAcceptor = self.channel.upgradeAcceptor as StreamSecurityUpgradeAcceptor;
          self.channel.WriteAuditFailure(securityAcceptor, ex);
      }

      if (finished)
      {
          self.Complete(false, failure);
      }
  }
  // AsyncCallback for BeginUpgradeConnection; the OpenAsyncResult travels in AsyncState.
  // Synchronous completions are ignored here because the code that started the upgrade
  // handles them inline. Otherwise the upgrade is finished and reading resumes; non-fatal
  // failures trigger cleanup plus an authentication-failure audit and are delivered
  // through Complete.
  static void OnUpgradeConnection(IAsyncResult result)
  {
      if (result.CompletedSynchronously)
      {
          return;
      }

      OpenAsyncResult self = (OpenAsyncResult)result.AsyncState;
      Exception failure = null;
      bool finished;

      try
      {
          // Short-circuit: ContinueReading only runs when the upgrade step returned true.
          finished = self.HandleUpgradeConnectionComplete(result) && self.ContinueReading();
      }
      catch (Exception ex)
      {
          if (Fx.IsFatal(ex))
          {
              throw;
          }

          failure = ex;
          finished = true;
          self.CleanupOnError();

          // Audit Authentication Failure
          StreamSecurityUpgradeAcceptor securityAcceptor = self.channel.upgradeAcceptor as StreamSecurityUpgradeAcceptor;
          self.channel.WriteAuditFailure(securityAcceptor, ex);
      }

      if (finished)
      {
          self.Complete(false, failure);
      }
  }
  // Callback invoked with the OpenAsyncResult as state after the ack write finishes.
  // Runs HandleWriteAckComplete; a non-fatal exception triggers cleanup and is passed
  // to Complete instead of propagating.
  static void OnWriteAckResponse(object asyncState)
  {
      OpenAsyncResult self = (OpenAsyncResult)asyncState;
      Exception failure = null;
      bool finished;

      try
      {
          finished = self.HandleWriteAckComplete();
      }
      catch (Exception ex)
      {
          if (Fx.IsFatal(ex))
          {
              throw;
          }

          failure = ex;
          finished = true;
          self.CleanupOnError();
      }

      if (!finished)
      {
          return;
      }

      self.Complete(false, failure);
  }
  860. }
  // Server-side SessionConnectionReader. Feeds received bytes through the channel's
  // ServerSessionDecoder, accumulates each envelope in a buffer taken from the
  // BufferManager, and turns completed envelopes into Messages with the channel's encoder.
  class ServerSessionConnectionReader : SessionConnectionReader
  {
      ServerSessionDecoder decoder;
      int maxBufferSize;
      BufferManager bufferManager;
      MessageEncoder messageEncoder;
      string contentType;
      IConnection rawConnection;

      // Copies the decoder, encoder, buffer settings and connections from the channel.
      // The base reader is seeded with the channel's connection and its offset/size values.
      public ServerSessionConnectionReader(ServerFramingDuplexSessionChannel channel)
          : base(channel.Connection, channel.rawConnection, channel.offset, channel.size, channel.RemoteSecurity)
      {
          this.decoder = channel.decoder;
          this.contentType = this.decoder.ContentType;
          this.maxBufferSize = channel.channelListener.MaxBufferSize;
          this.bufferManager = channel.channelListener.BufferManager;
          this.messageEncoder = channel.MessageEncoder;
          this.rawConnection = channel.rawConnection;
      }

      // Throws the decoder's premature-EOF exception unless the decoder is in the
      // End or EnvelopeEnd state.
      protected override void EnsureDecoderAtEof()
      {
          if (decoder.CurrentState != ServerSessionDecoder.State.End &&
              decoder.CurrentState != ServerSessionDecoder.State.EnvelopeEnd)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
          }
      }

      // Decodes bytes from buffer[offset, offset + size), advancing offset and shrinking
      // size by what the decoder consumes. Returns a Message when an envelope completes,
      // otherwise null once the input is used up. Sets isAtEof when the decoder reaches
      // its End state. Faults and throws if an envelope is larger than maxBufferSize.
      protected override Message DecodeMessage(byte[] buffer, ref int offset, ref int size, ref bool isAtEof, TimeSpan timeout)
      {
          while (!isAtEof && size > 0)
          {
              int consumed = decoder.Decode(buffer, offset, size);
              if (consumed > 0)
              {
                  AppendToEnvelope(buffer, offset, consumed);
                  offset += consumed;
                  size -= consumed;
              }

              ServerSessionDecoder.State state = decoder.CurrentState;
              if (state == ServerSessionDecoder.State.EnvelopeStart)
              {
                  StartEnvelope(timeout);
              }
              else if (state == ServerSessionDecoder.State.EnvelopeEnd)
              {
                  if (EnvelopeBuffer != null)
                  {
                      return ReadEnvelopeMessage();
                  }
              }
              else if (state == ServerSessionDecoder.State.End)
              {
                  isAtEof = true;
              }
          }

          return null;
      }

      // While an envelope buffer is active, accounts for 'count' decoded bytes in it.
      // The bytes are copied only when they were not read directly into the envelope buffer.
      void AppendToEnvelope(byte[] source, int sourceOffset, int count)
      {
          if (EnvelopeBuffer == null)
          {
              return;
          }

          if (!object.ReferenceEquals(source, EnvelopeBuffer))
          {
              System.Buffer.BlockCopy(source, sourceOffset, EnvelopeBuffer, EnvelopeOffset, count);
          }

          EnvelopeOffset += count;
      }

      // Takes a buffer for the envelope the decoder just announced. If the announced
      // size exceeds maxBufferSize, sends the max-message-size fault and throws.
      void StartEnvelope(TimeSpan timeout)
      {
          int announcedSize = decoder.EnvelopeSize;
          if (announcedSize > maxBufferSize)
          {
              base.SendFault(FramingEncodingString.MaxMessageSizeExceededFault, timeout);
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                  MaxMessageSizeStream.CreateMaxReceivedMessageSizeExceededException(maxBufferSize));
          }

          EnvelopeBuffer = bufferManager.TakeBuffer(announcedSize);
          EnvelopeOffset = 0;
          EnvelopeSize = announcedSize;
      }

      // Reads a Message from the completed envelope buffer (inside a bounded activity when
      // activity tracing is on) and clears EnvelopeBuffer. An XmlException from the encoder
      // is rethrown wrapped in a ProtocolException.
      Message ReadEnvelopeMessage()
      {
          using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
          {
              if (DiagnosticUtility.ShouldUseActivity)
              {
                  ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
              }

              Message message;
              try
              {
                  ArraySegment<byte> envelope = new ArraySegment<byte>(EnvelopeBuffer, 0, EnvelopeSize);
                  message = messageEncoder.ReadMessage(envelope, bufferManager, this.contentType);
              }
              catch (XmlException xmlException)
              {
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                      new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
              }

              if (DiagnosticUtility.ShouldUseActivity)
              {
                  TraceUtility.TransferFromTransport(message);
              }

              EnvelopeBuffer = null;
              return message;
          }
      }

      // Runs the base preparation and then attaches the remote endpoint of the raw
      // connection as a RemoteEndpointMessageProperty when one is available.
      protected override void PrepareMessage(Message message)
      {
          base.PrepareMessage(message);

          IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
          if (remoteEndPoint == null)
          {
              // pipes will return null
              return;
          }

          message.Properties.Add(RemoteEndpointMessageProperty.Name, new RemoteEndpointMessageProperty(remoteEndPoint));
      }
  }
  965. }
  966. }
  // Base IMessageSource for session framing connections. Reads bytes from an IConnection
  // (synchronously or asynchronously), hands them to the subclass decoder, and exposes
  // decoded Messages through Receive / BeginReceive / WaitForMessage.
  abstract class SessionConnectionReader : IMessageSource
  {
      // Connection being read, plus optional security copied onto each message.
      IConnection connection;
      SecurityMessageProperty security;
      // Raw connection that we will revert to after end handshake
      IConnection rawConnection;

      // Undecoded input: 'size' bytes starting at 'offset' in 'buffer'.
      byte[] buffer;
      int offset;
      int size;
      bool isAtEOF;
      bool usingAsyncReadBuffer;

      // Envelope being assembled by the subclass decoder.
      byte[] envelopeBuffer;
      int envelopeOffset;
      int envelopeSize;
      bool readIntoEnvelopeBuffer;

      // State of the asynchronous receive in progress, and its outcome.
      WaitCallback onAsyncReadComplete;
      WaitCallback pendingCallback;
      object pendingCallbackState;
      Message pendingMessage;
      Exception pendingException;
      TimeoutHelper readTimeoutHelper;

      // Seeds the reader with a connection and any bytes (offset/size) already sitting in
      // the connection's AsyncReadBuffer. rawConnection may be null.
      protected SessionConnectionReader(IConnection connection, IConnection rawConnection,
          int offset, int size, SecurityMessageProperty security)
      {
          this.connection = connection;
          this.rawConnection = rawConnection;
          this.security = security;
          this.onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);

          this.offset = offset;
          this.size = size;
          if (size > 0)
          {
              this.buffer = connection.AsyncReadBuffer;
          }
      }

      // Decodes from whichever buffer the last read targeted: the regular read buffer,
      // or the envelope buffer when the read went straight into it.
      Message DecodeMessage(TimeSpan timeout)
      {
          if (DiagnosticUtility.ShouldUseActivity &&
              ServiceModelActivity.Current != null &&
              ServiceModelActivity.Current.ActivityType == ActivityType.ProcessAction)
          {
              ServiceModelActivity.Current.Resume();
          }

          if (readIntoEnvelopeBuffer)
          {
              // decode from the envelope buffer; the offset is a throwaway copy, so the
              // decoder's advance is not written back to envelopeOffset from here
              int scratchOffset = this.envelopeOffset;
              return DecodeMessage(envelopeBuffer, ref scratchOffset, ref size, ref isAtEOF, timeout);
          }

          return DecodeMessage(buffer, ref offset, ref size, ref isAtEOF, timeout);
      }

      // Implemented by subclasses: consume bytes from buffer[offset, offset + size),
      // updating offset/size/isAtEof, and return a Message when one is complete or null.
      protected abstract Message DecodeMessage(byte[] buffer, ref int offset, ref int size, ref bool isAtEof, TimeSpan timeout);

      // Buffer holding the envelope currently being assembled (null when none).
      protected byte[] EnvelopeBuffer
      {
          get { return envelopeBuffer; }
          set { envelopeBuffer = value; }
      }

      // Write position within EnvelopeBuffer.
      protected int EnvelopeOffset
      {
          get { return envelopeOffset; }
          set { envelopeOffset = value; }
      }

      // Total size of the envelope currently being assembled.
      protected int EnvelopeSize
      {
          get { return envelopeSize; }
          set { envelopeSize = value; }
      }

      // Hands back the raw connection once (later calls return null). Any bytes still
      // undecoded are attached as pre-read data so they are not lost.
      public IConnection GetRawConnection()
      {
          IConnection raw = this.rawConnection;
          if (raw == null)
          {
              return null;
          }

          this.rawConnection = null;
          if (size > 0)
          {
              PreReadConnection preRead = raw as PreReadConnection;
              if (preRead == null)
              {
                  raw = new PreReadConnection(raw, this.buffer, this.offset, this.size);
              }
              else
              {
                  // make sure we don't keep wrapping
                  preRead.AddPreReadData(this.buffer, this.offset, this.size);
              }
          }

          return raw;
      }

      // Starts an asynchronous receive. Returns Completed when a message, an exception or
      // EOF is already available (collect it with EndReceive); returns Pending when a read
      // was queued, in which case 'callback' is invoked with 'state' later.
      public AsyncReceiveResult BeginReceive(TimeSpan timeout, WaitCallback callback, object state)
      {
          if (pendingMessage != null || pendingException != null)
          {
              return AsyncReceiveResult.Completed;
          }

          this.readTimeoutHelper = new TimeoutHelper(timeout);
          while (!isAtEOF)
          {
              if (size > 0)
              {
                  pendingMessage = DecodeMessage(readTimeoutHelper.RemainingTime());
                  if (pendingMessage != null)
                  {
                      PrepareMessage(pendingMessage);
                      break;
                  }

                  if (isAtEOF) // could have read the END record under DecodeMessage
                  {
                      break;
                  }
              }

              if (size != 0)
              {
                  throw Fx.AssertAndThrow("BeginReceive: DecodeMessage() should consume the outstanding buffer or return a message.");
              }

              if (!usingAsyncReadBuffer)
              {
                  buffer = connection.AsyncReadBuffer;
                  usingAsyncReadBuffer = true;
              }

              // Publish the callback before starting the read; it is withdrawn again if
              // BeginRead throws or completes synchronously.
              pendingCallback = callback;
              pendingCallbackState = state;
              AsyncCompletionResult readResult;
              bool readIssued = false;
              try
              {
                  readResult = connection.BeginRead(0, buffer.Length, readTimeoutHelper.RemainingTime(), onAsyncReadComplete, null);
                  readIssued = true;
              }
              finally
              {
                  if (!readIssued)
                  {
                      pendingCallback = null;
                      pendingCallbackState = null;
                  }
              }

              if (readResult == AsyncCompletionResult.Queued)
              {
                  return AsyncReceiveResult.Pending;
              }

              pendingCallback = null;
              pendingCallbackState = null;
              HandleReadComplete(connection.EndRead(), false);
          }

          return AsyncReceiveResult.Completed;
      }

      // Synchronously receives the next message, or null at end of stream. A previously
      // stored pending message (or exception) is delivered first.
      public Message Receive(TimeSpan timeout)
      {
          Message message = GetPendingMessage();
          if (message != null)
          {
              return message;
          }

          TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
          while (!isAtEOF)
          {
              if (size > 0)
              {
                  message = DecodeMessage(timeoutHelper.RemainingTime());
                  if (message != null)
                  {
                      PrepareMessage(message);
                      return message;
                  }

                  if (isAtEOF) // could have read the END record under DecodeMessage
                  {
                      break;
                  }
              }

              if (size != 0)
              {
                  throw Fx.AssertAndThrow("Receive: DecodeMessage() should consume the outstanding buffer or return a message.");
              }

              if (buffer == null)
              {
                  buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);
              }

              // Read straight into the envelope buffer when it still has room for a full
              // read-buffer's worth of data; otherwise read into the regular buffer.
              bool intoEnvelope = EnvelopeBuffer != null &&
                  (EnvelopeSize - EnvelopeOffset) >= buffer.Length;
              int bytesRead = intoEnvelope
                  ? connection.Read(EnvelopeBuffer, EnvelopeOffset, buffer.Length, timeoutHelper.RemainingTime())
                  : connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
              HandleReadComplete(bytesRead, intoEnvelope);
          }

          return null;
      }

      // Completes an asynchronous receive: returns the pending message (null at EOF) or
      // throws the pending exception.
      public Message EndReceive()
      {
          return GetPendingMessage();
      }

      // Consumes the stored outcome: throws the pending exception if there is one,
      // otherwise returns and clears the pending message (null when there is none).
      Message GetPendingMessage()
      {
          Exception failure = pendingException;
          if (failure != null)
          {
              pendingException = null;
              throw TraceUtility.ThrowHelperError(failure, pendingMessage);
          }

          Message ready = pendingMessage;
          pendingMessage = null;
          return ready;
      }

      // Like BeginReceive, but a TimeoutException thrown while starting is stored as the
      // pending exception and reported as a completed operation.
      public AsyncReceiveResult BeginWaitForMessage(TimeSpan timeout, WaitCallback callback, object state)
      {
          try
          {
              return BeginReceive(timeout, callback, state);
          }
          catch (TimeoutException timeoutException)
          {
              pendingException = timeoutException;
              return AsyncReceiveResult.Completed;
          }
      }

      // Completes BeginWaitForMessage. Stores the received message back as pending and
      // returns true; a timeout is traced and reported as false.
      public bool EndWaitForMessage()
      {
          try
          {
              this.pendingMessage = EndReceive();
              return true;
          }
          catch (TimeoutException timeoutException)
          {
              TraceReceiveTimeout(timeoutException);
              return false;
          }
      }

      // Synchronous counterpart of Begin/EndWaitForMessage: receives, stores the message
      // as pending and returns true; a timeout is traced and reported as false.
      public bool WaitForMessage(TimeSpan timeout)
      {
          try
          {
              this.pendingMessage = Receive(timeout);
              return true;
          }
          catch (TimeoutException timeoutException)
          {
              TraceReceiveTimeout(timeoutException);
              return false;
          }
      }

      // Emits the receive-timeout event (when enabled) and traces the handled exception.
      static void TraceReceiveTimeout(TimeoutException timeoutException)
      {
          if (TD.ReceiveTimeoutIsEnabled())
          {
              TD.ReceiveTimeout(timeoutException.Message);
          }

          DiagnosticUtility.TraceHandledException(timeoutException, TraceEventType.Information);
      }

      // Implemented by subclasses: throw if the connection ended while the decoder was
      // not in a state where EOF is acceptable.
      protected abstract void EnsureDecoderAtEof();

      // Records the result of a read. Zero bytes means the peer closed: validate the
      // decoder state and mark EOF. Otherwise expose the new bytes for decoding.
      void HandleReadComplete(int bytesRead, bool readIntoEnvelopeBuffer)
      {
          this.readIntoEnvelopeBuffer = readIntoEnvelopeBuffer;
          if (bytesRead != 0)
          {
              this.offset = 0;
              this.size = bytesRead;
              return;
          }

          EnsureDecoderAtEof();
          isAtEOF = true;
      }

      // Completion routine for queued reads. Keeps decoding and re-issuing reads until a
      // message is ready, EOF is reached, an exception occurs, or another read is queued
      // (in which case this routine runs again later). Finally invokes the caller's callback.
      void OnAsyncReadComplete(object state)
      {
          try
          {
              while (true)
              {
                  HandleReadComplete(connection.EndRead(), false);
                  if (isAtEOF)
                  {
                      break;
                  }

                  Message decoded = DecodeMessage(this.readTimeoutHelper.RemainingTime());
                  if (decoded != null)
                  {
                      PrepareMessage(decoded);
                      this.pendingMessage = decoded;
                      break;
                  }

                  if (isAtEOF) // could have read the END record under DecodeMessage
                  {
                      break;
                  }

                  if (size != 0)
                  {
                      throw Fx.AssertAndThrow("OnAsyncReadComplete: DecodeMessage() should consume the outstanding buffer or return a message.");
                  }

                  AsyncCompletionResult readResult = connection.BeginRead(
                      0, buffer.Length, this.readTimeoutHelper.RemainingTime(), onAsyncReadComplete, null);
                  if (readResult == AsyncCompletionResult.Queued)
                  {
                      return;
                  }
              }
          }
#pragma warning suppress 56500 // transferring exception to caller
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              pendingException = e;
          }

          // Clear the stored callback before invoking it.
          WaitCallback callback = pendingCallback;
          object callbackState = pendingCallbackState;
          pendingCallback = null;
          pendingCallbackState = null;
          callback(callbackState);
      }

      // Stamps a copy of the connection's security property onto the message, if any.
      protected virtual void PrepareMessage(Message message)
      {
          if (security == null)
          {
              return;
          }

          message.Properties.Security = (SecurityMessageProperty)security.CreateCopy();
      }

      // Sends a framing fault on the connection through InitialServerConnectionReader.SendFault,
      // passing it a small scratch buffer and TransportDefaults.MaxDrainSize.
      protected void SendFault(string faultString, TimeSpan timeout)
      {
          byte[] drainBuffer = new byte[128];
          InitialServerConnectionReader.SendFault(
              connection,
              faultString,
              drainBuffer,
              timeout,
              TransportDefaults.MaxDrainSize);
      }
  }
  // Client-side SessionConnectionReader. Runs received bytes through a ClientDuplexDecoder,
  // assembles envelopes in buffers taken from the BufferManager, and decodes them into
  // Messages. A fault record from the peer closes the output session and is thrown.
  class ClientDuplexConnectionReader : SessionConnectionReader
  {
      ClientDuplexDecoder decoder;
      int maxBufferSize;
      BufferManager bufferManager;
      MessageEncoder messageEncoder;
      ClientFramingDuplexSessionChannel channel;

      // The base reader starts with no raw connection, no pre-read bytes and no security.
      public ClientDuplexConnectionReader(ClientFramingDuplexSessionChannel channel, IConnection connection, ClientDuplexDecoder decoder,
          IConnectionOrientedTransportFactorySettings settings, MessageEncoder messageEncoder)
          : base(connection, null, 0, 0, null)
      {
          this.channel = channel;
          this.decoder = decoder;
          this.messageEncoder = messageEncoder;
          this.maxBufferSize = settings.MaxBufferSize;
          this.bufferManager = settings.BufferManager;
      }

      // Throws the decoder's premature-EOF exception unless the decoder is in one of the
      // states where the connection may legitimately end: End, EnvelopeEnd,
      // ReadingUpgradeRecord or UpgradeResponse.
      protected override void EnsureDecoderAtEof()
      {
          if (decoder.CurrentState != ClientFramingDecoderState.End
              && decoder.CurrentState != ClientFramingDecoderState.EnvelopeEnd
              && decoder.CurrentState != ClientFramingDecoderState.ReadingUpgradeRecord
              && decoder.CurrentState != ClientFramingDecoderState.UpgradeResponse)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
          }
      }

      // Returns a disposable scope for processing a message, or null when activity tracing
      // is off or the current activity is already a ProcessAction activity. Otherwise it
      // re-binds to a previous ProcessAction activity or starts a new ProcessMessage one.
      static IDisposable CreateProcessActionActivity()
      {
          if (!DiagnosticUtility.ShouldUseActivity)
          {
              return null;
          }

          if ((ServiceModelActivity.Current != null) &&
              (ServiceModelActivity.Current.ActivityType == ActivityType.ProcessAction))
          {
              // Do nothing-- we are already OK
              return null;
          }

          if ((ServiceModelActivity.Current != null) &&
              (ServiceModelActivity.Current.PreviousActivity != null) &&
              (ServiceModelActivity.Current.PreviousActivity.ActivityType == ActivityType.ProcessAction))
          {
              return ServiceModelActivity.BoundOperation(ServiceModelActivity.Current.PreviousActivity);
          }

          ServiceModelActivity activity = ServiceModelActivity.CreateBoundedActivity(true);
          ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
          return activity;
      }

      // Decodes bytes from buffer[offset, offset + size), advancing offset and shrinking
      // size by what the decoder consumes. Returns a Message when an envelope completes,
      // or null when the input is used up or the End record is seen (isAtEOF is then set).
      // Throws on a fault record or when an envelope is larger than maxBufferSize.
      protected override Message DecodeMessage(byte[] buffer, ref int offset, ref int size, ref bool isAtEOF, TimeSpan timeout)
      {
          while (size > 0)
          {
              int consumed = decoder.Decode(buffer, offset, size);
              if (consumed > 0)
              {
                  AppendToEnvelope(buffer, offset, consumed);
                  offset += consumed;
                  size -= consumed;
              }

              ClientFramingDecoderState state = decoder.CurrentState;
              if (state == ClientFramingDecoderState.Fault)
              {
                  channel.Session.CloseOutputSession(channel.InternalCloseTimeout);
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(FaultStringDecoder.GetFaultException(decoder.Fault, channel.RemoteAddress.Uri.ToString(), messageEncoder.ContentType));
              }

              if (state == ClientFramingDecoderState.End)
              {
                  isAtEOF = true;
                  return null; // we're done
              }

              if (state == ClientFramingDecoderState.EnvelopeStart)
              {
                  StartEnvelope();
              }
              else if (state == ClientFramingDecoderState.EnvelopeEnd && EnvelopeBuffer != null)
              {
                  return ReadEnvelopeMessage();
              }
          }

          return null;
      }

      // While an envelope buffer is active, accounts for 'count' decoded bytes in it.
      // The bytes are copied only when they were not read directly into the envelope buffer.
      void AppendToEnvelope(byte[] source, int sourceOffset, int count)
      {
          if (EnvelopeBuffer == null)
          {
              return;
          }

          if (!object.ReferenceEquals(source, EnvelopeBuffer))
          {
              System.Buffer.BlockCopy(source, sourceOffset, EnvelopeBuffer, EnvelopeOffset, count);
          }

          EnvelopeOffset += count;
      }

      // Takes a buffer for the envelope the decoder just announced; throws when the
      // announced size exceeds maxBufferSize.
      void StartEnvelope()
      {
          int announcedSize = decoder.EnvelopeSize;
          if (announcedSize > maxBufferSize)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                  MaxMessageSizeStream.CreateMaxReceivedMessageSizeExceededException(maxBufferSize));
          }

          EnvelopeBuffer = bufferManager.TakeBuffer(announcedSize);
          EnvelopeOffset = 0;
          EnvelopeSize = announcedSize;
      }

      // Reads a Message from the completed envelope buffer inside the process-action
      // activity scope and clears EnvelopeBuffer. An XmlException from the encoder is
      // rethrown wrapped in a ProtocolException.
      Message ReadEnvelopeMessage()
      {
          Message message = null;
          try
          {
              using (IDisposable activity = ClientDuplexConnectionReader.CreateProcessActionActivity())
              {
                  ArraySegment<byte> envelope = new ArraySegment<byte>(EnvelopeBuffer, 0, EnvelopeSize);
                  message = messageEncoder.ReadMessage(envelope, bufferManager);
                  if (DiagnosticUtility.ShouldUseActivity)
                  {
                      TraceUtility.TransferFromTransport(message);
                  }
              }
          }
          catch (XmlException xmlException)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
                  new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
          }

          EnvelopeBuffer = null;
          return message;
      }
  }
  1435. }