// ReliableOutputSessionChannel.cs (41 KB)
  1. //----------------------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //----------------------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Runtime;
  7. using System.ServiceModel;
  8. using System.Threading;
  9. using System.ServiceModel.Diagnostics.Application;
  10. abstract class ReliableOutputSessionChannel : OutputChannel, IOutputSessionChannel
  11. {
  12. IClientReliableChannelBinder binder;
  13. ChannelParameterCollection channelParameters;
  14. ReliableRequestor closeRequestor;
  15. ReliableOutputConnection connection;
  16. Exception maxRetryCountException = null;
  17. ClientReliableSession session;
  18. IReliableFactorySettings settings;
  19. ReliableRequestor terminateRequestor;
  /// <summary>
  /// Initializes the channel: stores the settings and binder, creates the client reliable
  /// session with its polling and unblock-close callbacks, subscribes to the binder's
  /// Faulted and OnException events, and attaches this channel to the channel parameters.
  /// </summary>
  /// <param name="factory">Channel manager handed to the base channel.</param>
  /// <param name="settings">Reliable messaging settings used by this channel.</param>
  /// <param name="binder">Binder over the inner channel.</param>
  /// <param name="faultHelper">Fault helper forwarded to the reliable session.</param>
  /// <param name="channelParameters">Late-bound parameters exposed through GetProperty.</param>
  protected ReliableOutputSessionChannel(
      ChannelManagerBase factory,
      IReliableFactorySettings settings,
      IClientReliableChannelBinder binder,
      FaultHelper faultHelper,
      LateBoundChannelParameterCollection channelParameters)
      : base(factory)
  {
      this.settings = settings;
      this.binder = binder;

      // The session field is published before its callbacks are attached, as before.
      this.session = new ClientReliableSession(this, settings, binder, faultHelper, null);
      ClientReliableSession reliableSession = this.session;
      reliableSession.PollingCallback = this.PollingCallback;
      reliableSession.UnblockChannelCloseCallback = this.UnblockClose;

      IClientReliableChannelBinder channelBinder = this.binder;
      channelBinder.Faulted += this.OnBinderFaulted;
      channelBinder.OnException += this.OnBinderException;

      this.channelParameters = channelParameters;
      channelParameters.SetChannel(this);
  }
  /// <summary>Gets the binder supplied to the constructor.</summary>
  protected IReliableChannelBinder Binder
  {
      get { return this.binder; }
  }
  /// <summary>
  /// Gets the output connection. The field is assigned in OnOpened, so it is null until
  /// the channel has been opened.
  /// </summary>
  protected ReliableOutputConnection Connection
  {
      get { return this.connection; }
  }
  /// <summary>
  /// Sets the exception that is used as the inner exception of the CommunicationException
  /// raised when the maximum retry count is exceeded.
  /// </summary>
  protected Exception MaxRetryCountException
  {
      set { this.maxRetryCountException = value; }
  }
  /// <summary>Gets the client reliable session, typed as its ChannelReliableSession base.</summary>
  protected ChannelReliableSession ReliableSession
  {
      get { return this.session; }
  }
  /// <summary>Gets the remote address reported by the binder.</summary>
  public override EndpointAddress RemoteAddress
  {
      get { return this.binder.RemoteAddress; }
  }
  /// <summary>
  /// Gets the flag that OnOpened passes to the ReliableOutputConnection constructor;
  /// each concrete channel supplies its own value.
  /// </summary>
  protected abstract bool RequestAcks { get; }
  /// <summary>Gets the reliable session exposed as the channel's output session.</summary>
  public IOutputSession Session
  {
      get { return this.session; }
  }
  /// <summary>Gets the Via URI reported by the binder.</summary>
  public override Uri Via
  {
      get { return this.binder.Via; }
  }
  /// <summary>Gets the reliable factory settings supplied to the constructor.</summary>
  protected IReliableFactorySettings Settings
  {
      get { return this.settings; }
  }
  /// <summary>
  /// Synchronously sends the CloseSequence request and validates the reply.
  /// </summary>
  /// <param name="timeout">Time allowed for the request.</param>
  void CloseSequence(TimeSpan timeout)
  {
      this.CreateCloseRequestor();

      // The requestor is created by the call above; its reply is validated as a close reply.
      this.ProcessCloseOrTerminateReply(true, this.closeRequestor.Request(timeout));
  }
  /// <summary>
  /// Starts the asynchronous CloseSequence request.
  /// </summary>
  /// <returns>The async result of the close requestor's BeginRequest call.</returns>
  IAsyncResult BeginCloseSequence(TimeSpan timeout, AsyncCallback callback, object state)
  {
      this.CreateCloseRequestor();
      ReliableRequestor requestor = this.closeRequestor;
      return requestor.BeginRequest(timeout, callback, state);
  }
  /// <summary>
  /// Completes the asynchronous CloseSequence request and validates the reply.
  /// </summary>
  /// <param name="result">Async result returned by BeginCloseSequence.</param>
  void EndCloseSequence(IAsyncResult result)
  {
      this.ProcessCloseOrTerminateReply(true, this.closeRequestor.EndRequest(result));
  }
  /// <summary>
  /// Applies the settings shared by the close and terminate requestors: message version,
  /// binder, and the request/response pattern.
  /// </summary>
  /// <param name="requestor">Requestor to configure.</param>
  void ConfigureRequestor(ReliableRequestor requestor)
  {
      requestor.MessageVersion = settings.MessageVersion;
      requestor.Binder = binder;
      requestor.SetRequestResponsePattern();
  }
  /// <summary>
  /// Builds the requestor for the CloseSequence message and stores it in closeRequestor.
  /// Throws (via ThrowIfClosed) if the channel is already closed.
  /// </summary>
  void CreateCloseRequestor()
  {
      ReliableRequestor requestor = this.CreateRequestor();
      this.ConfigureRequestor(requestor);

      requestor.TimeoutString1Index = SR.TimeoutOnClose;
      requestor.MessageAction =
          WsrmIndex.GetCloseSequenceActionHeader(this.settings.MessageVersion.Addressing);
      requestor.MessageBody = new CloseSequence(this.session.OutputID, this.connection.Last);

      // Publish the requestor only while the channel is known not to be closed.
      lock (this.ThisLock)
      {
          this.ThrowIfClosed();
          this.closeRequestor = requestor;
      }
  }
  /// <summary>
  /// Creates the requestor used for the CloseSequence and TerminateSequence exchanges.
  /// </summary>
  /// <returns>A new, unconfigured requestor.</returns>
  protected abstract ReliableRequestor CreateRequestor();
  /// <summary>
  /// Builds the requestor for the TerminateSequence message, stores it in
  /// terminateRequestor and closes the session, all under the channel lock.
  /// Throws (via ThrowIfClosed) if the channel is already closed.
  /// </summary>
  void CreateTerminateRequestor()
  {
      ReliableRequestor requestor = this.CreateRequestor();
      this.ConfigureRequestor(requestor);

      ReliableMessagingVersion rmVersion = this.settings.ReliableMessagingVersion;
      requestor.MessageAction = WsrmIndex.GetTerminateSequenceActionHeader(
          this.settings.MessageVersion.Addressing,
          rmVersion);
      requestor.MessageBody = new TerminateSequence(
          rmVersion,
          this.session.OutputID,
          this.connection.Last);

      lock (this.ThisLock)
      {
          this.ThrowIfClosed();
          this.terminateRequestor = requestor;
          this.session.CloseSession();
      }
  }
  /// <summary>
  /// Resolves a channel property. Lookup order: this channel (IOutputSessionChannel),
  /// the channel parameter collection, the base channel, then the binder's inner channel.
  /// When a FaultConverter is requested and nothing supplies one, the default converter
  /// for the configured message version is returned.
  /// </summary>
  /// <typeparam name="T">Type of the requested property.</typeparam>
  /// <returns>The property, or null when no layer provides it.</returns>
  public override T GetProperty<T>()
  {
      Type requested = typeof(T);

      if (requested == typeof(IOutputSessionChannel))
      {
          return (T)(object)this;
      }

      if (requested == typeof(ChannelParameterCollection))
      {
          return (T)(object)this.channelParameters;
      }

      T fromBase = base.GetProperty<T>();
      if (fromBase != null)
      {
          return fromBase;
      }

      T fromInner = this.binder.Channel.GetProperty<T>();
      if (fromInner != null || requested != typeof(FaultConverter))
      {
          return fromInner;
      }

      return (T)(object)FaultConverter.GetDefaultFaultConverter(this.settings.MessageVersion);
  }
  /// <summary>
  /// Aborts the connection and any close/terminate requestor that exists, then aborts
  /// the session. Each component may not have been created yet, hence the null checks.
  /// </summary>
  protected override void OnAbort()
  {
      if (this.connection != null)
      {
          this.connection.Abort(this);
      }

      ReliableRequestor pendingClose = this.closeRequestor;
      if (pendingClose != null)
      {
          pendingClose.Abort(this);
      }

      ReliableRequestor pendingTerminate = this.terminateRequestor;
      if (pendingTerminate != null)
      {
          pendingTerminate.Abort(this);
      }

      this.session.Abort();
  }
  /// <summary>
  /// Starts the asynchronous close. The operations handed to the close async result are,
  /// in order: close the connection, close the sequence (WS-RM 1.1 only, otherwise a null
  /// entry), terminate the sequence, and close the session.
  /// </summary>
  /// <returns>The ReliableChannelCloseAsyncResult driving the close.</returns>
  protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  {
      // The close-sequence step only exists for WS-ReliableMessaging 1.1.
      OperationWithTimeoutBeginCallback beginCloseSequence = null;
      OperationEndCallback endCloseSequence = null;
      if (this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          beginCloseSequence = this.BeginCloseSequence;
          endCloseSequence = this.EndCloseSequence;
      }

      OperationWithTimeoutBeginCallback[] begins =
      {
          this.connection.BeginClose,
          beginCloseSequence,
          this.BeginTerminateSequence,
          this.session.BeginClose
      };

      OperationEndCallback[] ends =
      {
          this.connection.EndClose,
          endCloseSequence,
          this.EndTerminateSequence,
          this.session.EndClose
      };

      return new ReliableChannelCloseAsyncResult(begins, ends, this.binder, timeout, callback, state);
  }
  /// <summary>
  /// Starts the asynchronous open of the binder and the session.
  /// </summary>
  /// <returns>The ReliableChannelOpenAsyncResult driving the open.</returns>
  protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult openResult =
          new ReliableChannelOpenAsyncResult(this.binder, this.session, timeout, callback, state);
      return openResult;
  }
  /// <summary>
  /// Starts adding an application message to the output connection.
  /// </summary>
  /// <returns>The async result of the connection's BeginAddMessage call.</returns>
  protected override IAsyncResult OnBeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  {
      ReliableOutputConnection outputConnection = this.connection;
      return outputConnection.BeginAddMessage(message, timeout, null, callback, state);
  }
  /// <summary>
  /// Handles an exception reported by the binder. A QuotaExceededException faults the
  /// session with a quota-exceeded fault while the channel is opening, opened or closing;
  /// any other exception is recorded as a pending exception.
  /// </summary>
  void OnBinderException(IReliableChannelBinder sender, Exception exception)
  {
      if (!(exception is QuotaExceededException))
      {
          this.AddPendingException(exception);
          return;
      }

      bool channelActive = this.State == CommunicationState.Opening
          || this.State == CommunicationState.Opened
          || this.State == CommunicationState.Closing;

      if (channelActive)
      {
          WsrmFault quotaFault = SequenceTerminatedFault.CreateQuotaExceededFault(this.session.OutputID);
          this.session.OnLocalFault(exception, quotaFault, null);
      }
  }
  /// <summary>
  /// Handles a binder fault: aborts the binder and, while the channel is opening, opened
  /// or closing, faults the session with a CommunicationException wrapping the cause.
  /// </summary>
  void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
  {
      this.binder.Abort();

      bool channelActive = this.State == CommunicationState.Opening
          || this.State == CommunicationState.Opened
          || this.State == CommunicationState.Closing;

      if (!channelActive)
      {
          return;
      }

      Exception wrapped = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
      this.session.OnLocalFault(wrapped, (Message)null, null);
  }
  /// <summary>
  /// Synchronously closes the channel within one shared timeout budget: connection,
  /// CloseSequence (WS-RM 1.1 only), TerminateSequence, session, and finally the binder.
  /// </summary>
  /// <param name="timeout">Total time allowed for all close steps.</param>
  protected override void OnClose(TimeSpan timeout)
  {
      TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

      this.connection.Close(timeoutHelper.RemainingTime());

      bool wsrm11 =
          this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
      if (wsrm11)
      {
          this.CloseSequence(timeoutHelper.RemainingTime());
      }

      this.TerminateSequence(timeoutHelper.RemainingTime());
      this.session.Close(timeoutHelper.RemainingTime());
      this.binder.Close(timeoutHelper.RemainingTime(), MaskingMode.Handled);
  }
  /// <summary>
  /// Runs the base closed processing, then detaches this channel's handler from the
  /// binder's Faulted event.
  /// </summary>
  protected override void OnClosed()
  {
      base.OnClosed();

      IClientReliableChannelBinder channelBinder = this.binder;
      channelBinder.Faulted -= this.OnBinderFaulted;
  }
  /// <summary>
  /// Sends a message on behalf of the output connection.
  /// </summary>
  /// <param name="message">Message to send.</param>
  /// <param name="timeout">Time allowed for the send.</param>
  /// <param name="saveHandledException">
  /// Passed as true by OnConnectionSendHandler when the attempt is the last permitted retry.
  /// </param>
  /// <param name="maskUnhandledException">Whether unhandled exceptions should be masked.</param>
  protected abstract void OnConnectionSend(
      Message message,
      TimeSpan timeout,
      bool saveHandledException,
      bool maskUnhandledException);

  /// <summary>
  /// Starts the asynchronous send of a message attempt for the output connection.
  /// </summary>
  protected abstract IAsyncResult OnConnectionBeginSend(
      MessageAttemptInfo attemptInfo,
      TimeSpan timeout,
      bool maskUnhandledException,
      AsyncCallback callback,
      object state);

  /// <summary>
  /// Completes a send started with OnConnectionBeginSend.
  /// </summary>
  protected abstract void OnConnectionEndSend(IAsyncResult result);
  /// <summary>
  /// Connection callback that synchronously sends an AckRequested message. Local
  /// activity is recorded on the session first, and the message is closed afterwards.
  /// </summary>
  /// <param name="timeout">Time allowed for the send.</param>
  void OnConnectionSendAckRequestedHandler(TimeSpan timeout)
  {
      this.session.OnLocalActivity();

      using (Message ackRequested = WsrmUtilities.CreateAckRequestedMessage(
          this.settings.MessageVersion,
          this.settings.ReliableMessagingVersion,
          this.ReliableSession.OutputID))
      {
          this.OnConnectionSend(ackRequested, timeout, false, true);
      }
  }
  /// <summary>
  /// Connection callback that starts the asynchronous send of an AckRequested message
  /// after recording local activity on the session.
  /// </summary>
  /// <returns>The async result of OnConnectionBeginSendMessage.</returns>
  IAsyncResult OnConnectionBeginSendAckRequestedHandler(TimeSpan timeout, AsyncCallback callback, object state)
  {
      this.session.OnLocalActivity();

      Message ackRequested = WsrmUtilities.CreateAckRequestedMessage(
          this.settings.MessageVersion,
          this.settings.ReliableMessagingVersion,
          this.ReliableSession.OutputID);

      return this.OnConnectionBeginSendMessage(ackRequested, timeout, callback, state);
  }
  /// <summary>
  /// Connection callback that completes the asynchronous AckRequested send.
  /// </summary>
  /// <param name="result">Async result from OnConnectionBeginSendAckRequestedHandler.</param>
  void OnConnectionEndSendAckRequestedHandler(IAsyncResult result)
  {
      OnConnectionEndSendMessage(result);
  }
  /// <summary>
  /// Connection callback that synchronously sends one message attempt. When the attempt's
  /// retry count exceeds the configured maximum the session is faulted instead of sending.
  /// The attempt's message is closed when the handler exits, on either path.
  /// </summary>
  void OnConnectionSendHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException)
  {
      using (attemptInfo.Message)
      {
          if (attemptInfo.RetryCount <= this.settings.MaxRetryCount)
          {
              this.session.OnLocalActivity();

              // On the last permitted attempt a handled exception is saved rather than masked.
              bool lastAttempt = attemptInfo.RetryCount == this.settings.MaxRetryCount;
              this.OnConnectionSend(attemptInfo.Message, timeout, lastAttempt, maskUnhandledException);
              return;
          }

          if (TD.MaxRetryCyclesExceededIsEnabled())
          {
              TD.MaxRetryCyclesExceeded(SR.GetString(SR.MaximumRetryCountExceeded));
          }

          Exception retryFailure = new CommunicationException(
              SR.GetString(SR.MaximumRetryCountExceeded),
              this.maxRetryCountException);
          this.session.OnLocalFault(
              retryFailure,
              SequenceTerminatedFault.CreateMaxRetryCountExceededFault(this.session.OutputID),
              null);
      }
  }
  /// <summary>
  /// Connection callback that starts the asynchronous send of one message attempt.
  /// When the attempt's retry count exceeds the configured maximum the session is faulted
  /// and an already-completed result is returned instead of sending.
  /// </summary>
  /// <param name="attemptInfo">The message attempt to send.</param>
  /// <param name="timeout">Time allowed for the send.</param>
  /// <param name="maskUnhandledException">Whether unhandled exceptions should be masked.</param>
  /// <param name="callback">Completion callback.</param>
  /// <param name="state">Caller state.</param>
  /// <returns>
  /// A CompletedAsyncResult when the retry limit was exceeded; otherwise the result of
  /// OnConnectionBeginSend.
  /// </returns>
  IAsyncResult OnConnectionBeginSendHandler(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException, AsyncCallback callback, object state)
  {
      if (attemptInfo.RetryCount > this.settings.MaxRetryCount)
      {
          // The message is not handed to a send operation on this path, so close it here,
          // mirroring the synchronous handler, which closes the attempt's message on every path.
          using (attemptInfo.Message)
          {
              if (TD.MaxRetryCyclesExceededIsEnabled())
              {
                  TD.MaxRetryCyclesExceeded(SR.GetString(SR.MaximumRetryCountExceeded));
              }

              this.session.OnLocalFault(
                  new CommunicationException(SR.GetString(SR.MaximumRetryCountExceeded), this.maxRetryCountException),
                  SequenceTerminatedFault.CreateMaxRetryCountExceededFault(this.session.OutputID),
                  null);
          }

          return new CompletedAsyncResult(callback, state);
      }

      this.session.OnLocalActivity();
      return this.OnConnectionBeginSend(attemptInfo, timeout, maskUnhandledException, callback, state);
  }
  /// <summary>
  /// Connection callback that completes an asynchronous send. A CompletedAsyncResult is
  /// what OnConnectionBeginSendHandler returns when it faulted the session instead of
  /// sending, so it is ended directly; any other result goes to OnConnectionEndSend.
  /// </summary>
  void OnConnectionEndSendHandler(IAsyncResult result)
  {
      if (!(result is CompletedAsyncResult))
      {
          this.OnConnectionEndSend(result);
          return;
      }

      CompletedAsyncResult.End(result);
  }
  /// <summary>
  /// Synchronously sends a protocol message (for example AckRequested or TerminateSequence)
  /// with the given masking mode.
  /// </summary>
  protected abstract void OnConnectionSendMessage(
      Message message,
      TimeSpan timeout,
      MaskingMode maskingMode);

  /// <summary>
  /// Starts the asynchronous send of a protocol message.
  /// </summary>
  protected abstract IAsyncResult OnConnectionBeginSendMessage(
      Message message,
      TimeSpan timeout,
      AsyncCallback callback,
      object state);

  /// <summary>
  /// Completes a send started with OnConnectionBeginSendMessage.
  /// </summary>
  protected abstract void OnConnectionEndSendMessage(IAsyncResult result);
  /// <summary>
  /// Handler for the connection's Faulted event: forwards the fault to the session as a
  /// local fault.
  /// </summary>
  void OnComponentFaulted(Exception faultException, WsrmFault fault)
  {
      ClientReliableSession reliableSession = this.session;
      reliableSession.OnLocalFault(faultException, fault, null);
  }
  /// <summary>
  /// Handler for the connection's OnException event: reports the exception to the
  /// session as an unknown exception.
  /// </summary>
  void OnComponentException(Exception exception)
  {
      ChannelReliableSession reliableSession = this.ReliableSession;
      reliableSession.OnUnknownException(exception);
  }
  /// <summary>
  /// Completes the asynchronous close started by OnBeginClose.
  /// </summary>
  /// <param name="result">Async result returned by OnBeginClose.</param>
  protected override void OnEndClose(IAsyncResult result)
  {
      ReliableChannelCloseAsyncResult.End(result);
  }
  /// <summary>
  /// Completes the asynchronous open started by OnBeginOpen.
  /// </summary>
  /// <param name="result">Async result returned by OnBeginOpen.</param>
  protected override void OnEndOpen(IAsyncResult result)
  {
      ReliableChannelOpenAsyncResult.End(result);
  }
  /// <summary>
  /// Completes the asynchronous send started by OnBeginSend. Throws through
  /// ThrowInvalidAddException when the connection reports the message was not added.
  /// </summary>
  protected override void OnEndSend(IAsyncResult result)
  {
      bool added = this.connection.EndAddMessage(result);
      if (!added)
      {
          this.ThrowInvalidAddException();
      }
  }
  /// <summary>
  /// Notifies the session of the fault, faults the connection and any pending
  /// close/terminate requestor (via UnblockClose), then runs the base processing.
  /// </summary>
  protected override void OnFaulted()
  {
      ClientReliableSession reliableSession = this.session;
      reliableSession.OnFaulted();

      UnblockClose();

      base.OnFaulted();
  }
  /// <summary>
  /// Synchronously opens the binder and then the session within one timeout budget.
  /// If either step throws, the binder is closed before the exception propagates.
  /// </summary>
  /// <param name="timeout">Total time allowed for the open.</param>
  protected override void OnOpen(TimeSpan timeout)
  {
      TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
      bool opened = false;

      try
      {
          this.binder.Open(timeoutHelper.RemainingTime());
          this.session.Open(timeoutHelper.RemainingTime());
          opened = true;
      }
      finally
      {
          // Only reached with opened == false when one of the Open calls threw.
          if (!opened)
          {
              this.Binder.Close(timeoutHelper.RemainingTime());
          }
      }
  }
  /// <summary>
  /// Synchronously adds an application message to the output connection. Throws through
  /// ThrowInvalidAddException when the connection reports the message was not added.
  /// </summary>
  protected override void OnSend(Message message, TimeSpan timeout)
  {
      bool added = this.connection.AddMessage(message, timeout, null);
      if (!added)
      {
          this.ThrowInvalidAddException();
      }
  }
  /// <summary>
  /// After the base opened processing, creates the output connection from the session and
  /// settings, and wires its fault/exception events and send handlers to this channel.
  /// </summary>
  protected override void OnOpened()
  {
      base.OnOpened();

      this.connection = new ReliableOutputConnection(
          this.session.OutputID,
          this.Settings.MaxTransferWindowSize,
          this.Settings.MessageVersion,
          this.Settings.ReliableMessagingVersion,
          this.session.InitiationTime,
          this.RequestAcks,
          this.DefaultSendTimeout);

      ReliableOutputConnection outputConnection = this.connection;

      // Fault and exception notifications.
      outputConnection.Faulted += this.OnComponentFaulted;
      outputConnection.OnException += this.OnComponentException;

      // Message send handlers.
      outputConnection.BeginSendHandler = this.OnConnectionBeginSendHandler;
      outputConnection.EndSendHandler = this.OnConnectionEndSendHandler;
      outputConnection.SendHandler = this.OnConnectionSendHandler;

      // AckRequested send handlers.
      outputConnection.BeginSendAckRequestedHandler = this.OnConnectionBeginSendAckRequestedHandler;
      outputConnection.EndSendAckRequestedHandler = this.OnConnectionEndSendAckRequestedHandler;
      outputConnection.SendAckRequestedHandler = this.OnConnectionSendAckRequestedHandler;
  }
  /// <summary>
  /// Session polling callback: sends an AckRequested message with the default send
  /// timeout and MaskingMode.All, closing the message afterwards.
  /// </summary>
  void PollingCallback()
  {
      IReliableFactorySettings factorySettings = this.Settings;

      using (Message ackRequested = WsrmUtilities.CreateAckRequestedMessage(
          factorySettings.MessageVersion,
          factorySettings.ReliableMessagingVersion,
          this.ReliableSession.OutputID))
      {
          this.OnConnectionSendMessage(ackRequested, this.DefaultSendTimeout, MaskingMode.All);
      }
  }
  /// <summary>
  /// Validates the reply to a CloseSequence or TerminateSequence request. On a validation
  /// fault the session is faulted and the fault's exception is thrown. The reply is closed
  /// once validation has been attempted.
  /// </summary>
  /// <param name="close">True for a CloseSequence reply, false for a TerminateSequence reply.</param>
  /// <param name="reply">The reply message; must not be null.</param>
  void ProcessCloseOrTerminateReply(bool close, Message reply)
  {
      if (reply == null)
      {
          // In the close case, the requestor is configured to throw TimeoutException instead of returning null.
          // In the terminate case, this value can be null, but the caller should not call this method.
          throw Fx.AssertAndThrow("Argument reply cannot be null.");
      }

      ReliableRequestor requestor;
      if (close)
      {
          requestor = this.closeRequestor;
      }
      else
      {
          requestor = this.terminateRequestor;
      }

      // Some other thread has verified and cleaned up the reply, no more work to do.
      if (requestor.GetInfo() != null)
      {
          return;
      }

      try
      {
          WsrmMessageInfo replyInfo = WsrmMessageInfo.Get(
              this.Settings.MessageVersion,
              this.Settings.ReliableMessagingVersion,
              this.binder.Channel,
              this.binder.GetInnerSession(),
              reply);

          this.ReliableSession.ProcessInfo(replyInfo, null, true);
          this.ReliableSession.VerifyDuplexProtocolElements(replyInfo, null, true);

          WsrmFault fault;
          if (close)
          {
              fault = WsrmUtilities.ValidateCloseSequenceResponse(
                  this.session, requestor.MessageId, replyInfo, this.connection.Last);
          }
          else
          {
              fault = WsrmUtilities.ValidateTerminateSequenceResponse(
                  this.session, requestor.MessageId, replyInfo, this.connection.Last);
          }

          if (fault == null)
          {
              return;
          }

          this.ReliableSession.OnLocalFault(null, fault, null);
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
      }
      finally
      {
          reply.Close();
      }
  }
  /// <summary>
  /// Processes a message received from the remote endpoint: runs session-level checks,
  /// applies acknowledgement ranges to the connection, and, for WS-ReliableMessaging 1.1,
  /// handles TerminateSequenceResponse, CloseSequenceResponse, TerminateSequence, final
  /// acknowledgements and UnknownSequence header faults.
  /// </summary>
  /// <remarks>
  /// The message is closed on exit unless a session check (ProcessInfo or
  /// VerifySimplexProtocolElements) returned false. A response that arrives before the
  /// matching close/terminate requestor exists is treated as a protocol fault instead of
  /// dereferencing the missing requestor.
  /// </remarks>
  /// <param name="message">The received message.</param>
  protected void ProcessMessage(Message message)
  {
      bool closeMessage = true;
      WsrmMessageInfo messageInfo = WsrmMessageInfo.Get(this.settings.MessageVersion,
          this.settings.ReliableMessagingVersion, this.binder.Channel, this.binder.GetInnerSession(), message);
      bool wsrm11 = this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;

      try
      {
          if (!this.session.ProcessInfo(messageInfo, null))
          {
              closeMessage = false;
              return;
          }

          if (!this.ReliableSession.VerifySimplexProtocolElements(messageInfo, null))
          {
              closeMessage = false;
              return;
          }

          bool final = false;

          if (messageInfo.AcknowledgementInfo != null)
          {
              final = wsrm11 && messageInfo.AcknowledgementInfo.Final;

              // -1 is passed as the remaining buffer when flow control is disabled.
              int bufferRemaining = -1;
              if (this.settings.FlowControlEnabled)
              {
                  bufferRemaining = messageInfo.AcknowledgementInfo.BufferRemaining;
              }

              this.connection.ProcessTransferred(messageInfo.AcknowledgementInfo.Ranges, bufferRemaining);
          }

          if (wsrm11)
          {
              WsrmFault fault = null;

              if (messageInfo.TerminateSequenceResponseInfo != null)
              {
                  ReliableRequestor requestor = this.terminateRequestor;
                  if (requestor == null)
                  {
                      // No TerminateSequence has been sent yet, so there is nothing to correlate with.
                      fault = this.CreateResponseBeforeRequestFault(WsrmFeb2005Strings.TerminateSequence);
                  }
                  else
                  {
                      fault = WsrmUtilities.ValidateTerminateSequenceResponse(this.session,
                          requestor.MessageId, messageInfo, this.connection.Last);

                      if (fault == null)
                      {
                          fault = this.ProcessRequestorResponse(requestor, WsrmFeb2005Strings.TerminateSequence, messageInfo);
                      }
                  }
              }
              else if (messageInfo.CloseSequenceResponseInfo != null)
              {
                  ReliableRequestor requestor = this.closeRequestor;
                  if (requestor == null)
                  {
                      // No CloseSequence has been sent yet, so there is nothing to correlate with.
                      fault = this.CreateResponseBeforeRequestFault(Wsrm11Strings.CloseSequence);
                  }
                  else
                  {
                      fault = WsrmUtilities.ValidateCloseSequenceResponse(this.session,
                          requestor.MessageId, messageInfo, this.connection.Last);

                      if (fault == null)
                      {
                          fault = this.ProcessRequestorResponse(requestor, Wsrm11Strings.CloseSequence, messageInfo);
                      }
                  }
              }
              else if (messageInfo.TerminateSequenceInfo != null)
              {
                  if (!WsrmUtilities.ValidateWsrmRequest(this.session, messageInfo.TerminateSequenceInfo, this.binder, null))
                  {
                      return;
                  }

                  WsrmAcknowledgmentInfo ackInfo = messageInfo.AcknowledgementInfo;
                  fault = WsrmUtilities.ValidateFinalAckExists(this.session, ackInfo);

                  if ((fault == null) && !this.connection.IsFinalAckConsistent(ackInfo.Ranges))
                  {
                      fault = new InvalidAcknowledgementFault(this.session.OutputID, ackInfo.Ranges);
                  }

                  if (fault == null)
                  {
                      // Answer the remote TerminateSequence, then surface it as a remote fault.
                      Message response = WsrmUtilities.CreateTerminateResponseMessage(
                          this.settings.MessageVersion,
                          messageInfo.TerminateSequenceInfo.MessageId,
                          this.session.OutputID);

                      try
                      {
                          this.OnConnectionSend(response, this.DefaultSendTimeout, false, true);
                      }
                      finally
                      {
                          response.Close();
                      }

                      this.session.OnRemoteFault(new ProtocolException(SR.GetString(SR.UnsupportedTerminateSequenceExceptionString)));
                      return;
                  }
              }
              else if (final)
              {
                  if (this.closeRequestor == null)
                  {
                      string exceptionString = SR.GetString(SR.UnsupportedCloseExceptionString);
                      string faultString = SR.GetString(SR.SequenceTerminatedUnsupportedClose);
                      fault = SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, faultString,
                          exceptionString);
                  }
                  else
                  {
                      fault = WsrmUtilities.ValidateFinalAck(this.session, messageInfo, this.connection.Last);

                      if (fault == null)
                      {
                          this.closeRequestor.SetInfo(messageInfo);
                      }
                  }
              }
              else if (messageInfo.WsrmHeaderFault != null)
              {
                  if (!(messageInfo.WsrmHeaderFault is UnknownSequenceFault))
                  {
                      throw Fx.AssertAndThrow("Fault must be UnknownSequence fault.");
                  }

                  if (this.terminateRequestor == null)
                  {
                      throw Fx.AssertAndThrow("In wsrm11, if we start getting UnknownSequence, terminateRequestor cannot be null.");
                  }

                  this.terminateRequestor.SetInfo(messageInfo);
              }

              if (fault != null)
              {
                  this.session.OnLocalFault(fault.CreateException(), fault, null);
                  return;
              }
          }

          this.session.OnRemoteActivity(this.connection.Strategy.QuotaRemaining == 0);
      }
      finally
      {
          if (closeMessage)
          {
              messageInfo.Message.Close();
          }
      }
  }

  /// <summary>
  /// Builds the protocol fault used when a close/terminate response is received although
  /// the corresponding request has not been issued by this channel.
  /// </summary>
  /// <param name="requestName">Name of the request the response claims to answer.</param>
  WsrmFault CreateResponseBeforeRequestFault(string requestName)
  {
      string faultString = SR.GetString(SR.ReceivedResponseBeforeRequestFaultString, requestName);
      string exceptionString = SR.GetString(SR.ReceivedResponseBeforeRequestExceptionString, requestName);
      return SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, faultString, exceptionString);
  }
  /// <summary>
  /// Called by ProcessMessage when a CloseSequenceResponse or TerminateSequenceResponse
  /// has passed validation.
  /// </summary>
  /// <param name="requestor">The requestor the response belongs to.</param>
  /// <param name="requestName">Name of the request, used in fault text.</param>
  /// <param name="info">Parsed information about the received message.</param>
  /// <returns>A fault to raise on the session, or null when there is none.</returns>
  protected abstract WsrmFault ProcessRequestorResponse(
      ReliableRequestor requestor,
      string requestName,
      WsrmMessageInfo info);
  /// <summary>
  /// Synchronously terminates the sequence. For the February 2005 protocol the session is
  /// closed and a one-way TerminateSequence message is sent; for WS-ReliableMessaging 1.1
  /// a terminate request is issued and any reply is validated.
  /// </summary>
  /// <param name="timeout">Time allowed for the send or request.</param>
  void TerminateSequence(TimeSpan timeout)
  {
      ReliableMessagingVersion reliableMessagingVersion = this.settings.ReliableMessagingVersion;

      if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          this.session.CloseSession();

          // Close the message once the synchronous send returns, as PollingCallback does
          // for the same OnConnectionSendMessage call; previously it was never closed.
          using (Message message = WsrmUtilities.CreateTerminateMessage(this.settings.MessageVersion,
              reliableMessagingVersion, this.session.OutputID))
          {
              this.OnConnectionSendMessage(message, timeout, MaskingMode.Handled);
          }
      }
      else if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          this.CreateTerminateRequestor();
          Message terminateReply = this.terminateRequestor.Request(timeout);

          // A null reply is tolerated for terminate; only a real reply is validated.
          if (terminateReply != null)
          {
              this.ProcessCloseOrTerminateReply(false, terminateReply);
          }
      }
      else
      {
          throw Fx.AssertAndThrow("Reliable messaging version not supported.");
      }
  }
  /// <summary>
  /// Starts the asynchronous sequence termination. For the February 2005 protocol the
  /// session is closed and a TerminateSequence message send is started; for
  /// WS-ReliableMessaging 1.1 a terminate request is started.
  /// </summary>
  /// <returns>The async result of the started send or request.</returns>
  IAsyncResult BeginTerminateSequence(TimeSpan timeout, AsyncCallback callback, object state)
  {
      ReliableMessagingVersion rmVersion = this.settings.ReliableMessagingVersion;

      if (rmVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          this.session.CloseSession();
          Message terminate = WsrmUtilities.CreateTerminateMessage(
              this.settings.MessageVersion,
              rmVersion,
              this.session.OutputID);
          return this.OnConnectionBeginSendMessage(terminate, timeout, callback, state);
      }

      if (rmVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          this.CreateTerminateRequestor();
          return this.terminateRequestor.BeginRequest(timeout, callback, state);
      }

      throw Fx.AssertAndThrow("Reliable messaging version not supported.");
  }
  /// <summary>
  /// Completes the asynchronous sequence termination. For the February 2005 protocol this
  /// ends the message send; otherwise it ends the terminate request and validates a
  /// non-null reply.
  /// </summary>
  /// <param name="result">Async result returned by BeginTerminateSequence.</param>
  void EndTerminateSequence(IAsyncResult result)
  {
      bool feb2005 =
          this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;

      if (feb2005)
      {
          this.OnConnectionEndSendMessage(result);
          return;
      }

      Message reply = this.terminateRequestor.EndRequest(result);
      if (reply != null)
      {
          this.ProcessCloseOrTerminateReply(false, reply);
      }
  }
  /// <summary>
  /// Throws the exception for a message that could not be added to the connection: the
  /// terminal exception when the channel is faulted, otherwise the closed exception.
  /// </summary>
  void ThrowInvalidAddException()
  {
      if (this.State != CommunicationState.Faulted)
      {
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.CreateClosedException());
      }

      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.GetTerminalException());
  }
  /// <summary>
  /// Faults the connection and any close/terminate requestor that exists. Registered as
  /// the session's UnblockChannelCloseCallback and also called from OnFaulted.
  /// </summary>
  void UnblockClose()
  {
      if (this.connection != null)
      {
          this.connection.Fault(this);
      }

      ReliableRequestor pendingClose = this.closeRequestor;
      if (pendingClose != null)
      {
          pendingClose.Fault(this);
      }

      ReliableRequestor pendingTerminate = this.terminateRequestor;
      if (pendingTerminate != null)
      {
          pendingTerminate.Fault(this);
      }
  }
  658. }
  659. sealed class ReliableOutputSessionChannelOverRequest : ReliableOutputSessionChannel
  660. {
  661. IClientReliableChannelBinder binder;
  662. public ReliableOutputSessionChannelOverRequest(ChannelManagerBase factory, IReliableFactorySettings settings,
  663. IClientReliableChannelBinder binder, FaultHelper faultHelper,
  664. LateBoundChannelParameterCollection channelParameters)
  665. : base(factory, settings, binder, faultHelper, channelParameters)
  666. {
  667. this.binder = binder;
  668. }
  669. protected override bool RequestAcks
  670. {
  671. get
  672. {
  673. return false;
  674. }
  675. }
  676. protected override ReliableRequestor CreateRequestor()
  677. {
  678. return new RequestReliableRequestor();
  679. }
  680. protected override void OnConnectionSend(Message message, TimeSpan timeout,
  681. bool saveHandledException, bool maskUnhandledException)
  682. {
  683. MaskingMode maskingMode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;
  684. Message reply = null;
  685. if (saveHandledException)
  686. {
  687. try
  688. {
  689. reply = this.binder.Request(message, timeout, maskingMode);
  690. }
  691. catch (Exception e)
  692. {
  693. if (Fx.IsFatal(e))
  694. throw;
  695. if (this.Binder.IsHandleable(e))
  696. {
  697. this.MaxRetryCountException = e;
  698. }
  699. else
  700. {
  701. throw;
  702. }
  703. }
  704. }
  705. else
  706. {
  707. maskingMode |= MaskingMode.Handled;
  708. reply = this.binder.Request(message, timeout, maskingMode);
  709. if (reply != null)
  710. ProcessMessage(reply);
  711. }
  712. }
  713. protected override IAsyncResult OnConnectionBeginSend(MessageAttemptInfo attemptInfo,
  714. TimeSpan timeout, bool maskUnhandledException, AsyncCallback callback, object state)
  715. {
  716. ReliableBinderRequestAsyncResult result = new ReliableBinderRequestAsyncResult(callback, state);
  717. result.Binder = this.binder;
  718. result.MessageAttemptInfo = attemptInfo;
  719. result.MaskingMode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;
  720. if (attemptInfo.RetryCount < this.Settings.MaxRetryCount)
  721. {
  722. result.MaskingMode |= MaskingMode.Handled;
  723. result.SaveHandledException = false;
  724. }
  725. else
  726. {
  727. result.SaveHandledException = true;
  728. }
  729. result.Begin(timeout);
  730. return result;
  731. }
  732. protected override void OnConnectionEndSend(IAsyncResult result)
  733. {
  734. Exception handledException;
  735. Message reply = ReliableBinderRequestAsyncResult.End(result, out handledException);
  736. ReliableBinderRequestAsyncResult requestResult = (ReliableBinderRequestAsyncResult)result;
  737. if (requestResult.MessageAttemptInfo.RetryCount == this.Settings.MaxRetryCount)
  738. {
  739. this.MaxRetryCountException = handledException;
  740. }
  741. if (reply != null)
  742. ProcessMessage(reply);
  743. }
  744. protected override void OnConnectionSendMessage(Message message, TimeSpan timeout, MaskingMode maskingMode)
  745. {
  746. Message reply = this.binder.Request(message, timeout, maskingMode);
  747. if (reply != null)
  748. {
  749. ProcessMessage(reply);
  750. }
  751. }
  752. protected override IAsyncResult OnConnectionBeginSendMessage(Message message, TimeSpan timeout,
  753. AsyncCallback callback, object state)
  754. {
  755. ReliableBinderRequestAsyncResult requestResult = new ReliableBinderRequestAsyncResult(callback, state);
  756. requestResult.Binder = this.binder;
  757. requestResult.MaskingMode = MaskingMode.Handled;
  758. requestResult.Message = message;
  759. requestResult.Begin(timeout);
  760. return requestResult;
  761. }
  // Completes the asynchronous request begun by OnConnectionBeginSendMessage
  // and forwards a non-null reply to ProcessMessage.
  protected override void OnConnectionEndSendMessage(IAsyncResult result)
  {
      Message response = ReliableBinderRequestAsyncResult.End(result);
      if (response == null)
      {
          return;
      }

      this.ProcessMessage(response);
  }
  // Always answers with a protocol fault for the session's OutputID, built from
  // the "received response before request" fault and exception strings
  // formatted with requestName. The requestor and info arguments are not used.
  protected override WsrmFault ProcessRequestorResponse(ReliableRequestor requestor, string requestName, WsrmMessageInfo info)
  {
      string faultText = SR.GetString(SR.ReceivedResponseBeforeRequestFaultString, requestName);
      string exceptionText = SR.GetString(SR.ReceivedResponseBeforeRequestExceptionString, requestName);

      return SequenceTerminatedFault.CreateProtocolFault(
          this.ReliableSession.OutputID,
          faultText,
          exceptionText);
  }
  776. }
  // Output session channel variant that sends with Binder.Send (no reply is
  // read from the send itself) and keeps a BeginTryReceive/EndTryReceive loop
  // running on the binder to pick up incoming messages.
  sealed class ReliableOutputSessionChannelOverDuplex : ReliableOutputSessionChannel
  {
      // Single thunked callback shared by every BeginTryReceive issued below.
      static AsyncCallback onReceiveCompleted = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompletedStatic));

      public ReliableOutputSessionChannelOverDuplex(ChannelManagerBase factory, IReliableFactorySettings settings,
          IClientReliableChannelBinder binder, FaultHelper faultHelper,
          LateBoundChannelParameterCollection channelParameters)
          : base(factory, settings, binder, faultHelper, channelParameters)
      {
      }

      // This variant always reports true.
      protected override bool RequestAcks
      {
          get { return true; }
      }

      // Requestors for this variant are SendWaitReliableRequestor instances.
      protected override ReliableRequestor CreateRequestor()
      {
          return new SendWaitReliableRequestor();
      }

      // Synchronous send. When saveHandledException is false, handled
      // exceptions are masked by the binder (MaskingMode.Handled). When it is
      // true, a non-fatal exception the binder calls handleable is captured
      // in MaxRetryCountException instead of propagating.
      protected override void OnConnectionSend(Message message, TimeSpan timeout, bool saveHandledException, bool maskUnhandledException)
      {
          MaskingMode mode = MaskingMode.None;
          if (maskUnhandledException)
          {
              mode = MaskingMode.Unhandled;
          }

          if (!saveHandledException)
          {
              mode |= MaskingMode.Handled;
              this.Binder.Send(message, timeout, mode);
              return;
          }

          try
          {
              this.Binder.Send(message, timeout, mode);
          }
          catch (Exception e)
          {
              // Fatal exceptions are rethrown without consulting the binder;
              // so is anything the binder does not consider handleable.
              if (Fx.IsFatal(e) || !this.Binder.IsHandleable(e))
              {
                  throw;
              }

              this.MaxRetryCountException = e;
          }
      }

      // Asynchronous counterpart of OnConnectionSend. While the attempt's
      // RetryCount is below Settings.MaxRetryCount, handled exceptions are
      // masked and not saved; otherwise the async result is told to save them.
      protected override IAsyncResult OnConnectionBeginSend(MessageAttemptInfo attemptInfo,
          TimeSpan timeout, bool maskUnhandledException, AsyncCallback callback, object state)
      {
          ReliableBinderSendAsyncResult pending = new ReliableBinderSendAsyncResult(callback, state);
          pending.Binder = this.Binder;
          pending.MessageAttemptInfo = attemptInfo;

          bool belowMaxRetryCount = attemptInfo.RetryCount < this.Settings.MaxRetryCount;

          MaskingMode mode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;
          if (belowMaxRetryCount)
          {
              mode |= MaskingMode.Handled;
          }

          pending.MaskingMode = mode;
          pending.SaveHandledException = !belowMaxRetryCount;

          pending.Begin(timeout);
          return pending;
      }

      // Completes OnConnectionBeginSend. The handled exception reported by
      // End is stored in MaxRetryCountException only when the attempt's
      // RetryCount equals Settings.MaxRetryCount.
      protected override void OnConnectionEndSend(IAsyncResult result)
      {
          Exception savedException;
          ReliableBinderSendAsyncResult.End(result, out savedException);

          ReliableBinderSendAsyncResult completed = (ReliableBinderSendAsyncResult)result;
          bool atMaxRetryCount = completed.MessageAttemptInfo.RetryCount == this.Settings.MaxRetryCount;
          if (atMaxRetryCount)
          {
              this.MaxRetryCountException = savedException;
          }
      }

      // Sends the message through the binder with the caller's masking mode.
      protected override void OnConnectionSendMessage(Message message, TimeSpan timeout, MaskingMode maskingMode)
      {
          this.Binder.Send(message, timeout, maskingMode);
      }

      // Starts an asynchronous binder send configured with
      // MaskingMode.Unhandled and returns the async result.
      protected override IAsyncResult OnConnectionBeginSendMessage(Message message, TimeSpan timeout,
          AsyncCallback callback, object state)
      {
          ReliableBinderSendAsyncResult pending = new ReliableBinderSendAsyncResult(callback, state);

          pending.Binder = this.Binder;
          pending.MaskingMode = MaskingMode.Unhandled;
          pending.Message = message;

          pending.Begin(timeout);
          return pending;
      }

      // Completes the send begun by OnConnectionBeginSendMessage.
      protected override void OnConnectionEndSendMessage(IAsyncResult result)
      {
          ReliableBinderSendAsyncResult.End(result);
      }

      // After the base class has handled the open, starts the receive loop:
      // directly when already on a thread-pool thread, otherwise via a
      // scheduled ActionItem. Both paths go through the static
      // StartReceiving(object) wrapper, which routes non-fatal exceptions to
      // ReliableSession.OnUnknownException.
      protected override void OnOpened()
      {
          base.OnOpened();

          if (!Thread.CurrentThread.IsThreadPoolThread)
          {
              ActionItem.Schedule(new Action<object>(StartReceiving), this);
              return;
          }

          StartReceiving((object)this);
      }

      // Static entry point for receive completions; the channel instance is
      // carried in AsyncState. Non-fatal exceptions are reported to the
      // session rather than escaping the callback.
      static void OnReceiveCompletedStatic(IAsyncResult result)
      {
          ReliableOutputSessionChannelOverDuplex self = (ReliableOutputSessionChannelOverDuplex)result.AsyncState;

          try
          {
              self.OnReceiveCompleted(result);
          }
#pragma warning suppress 56500 // covered by FxCOP
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              self.ReliableSession.OnUnknownException(e);
          }
      }

      // Handles one completed TryReceive and, unless the receive succeeded
      // with no context, issues the next BeginTryReceive.
      void OnReceiveCompleted(IAsyncResult result)
      {
          RequestContext context;
          bool received = this.Binder.EndTryReceive(result, out context);

          if (received && context == null)
          {
              // Successful receive with no context: the loop is not re-armed.
              // If the connection is not closed and the binder is still
              // Opened, this is raised as a local fault.
              if (!this.Connection.Closed && (this.Binder.State == CommunicationState.Opened))
              {
                  Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
                  this.ReliableSession.OnLocalFault(e, (Message)null, null);
              }

              return;
          }

          if (received)
          {
              using (context)
              {
                  Message incoming = context.RequestMessage;
                  ProcessMessage(incoming);
                  context.Close(this.DefaultCloseTimeout);
              }
          }

          // Either a message was processed or the try-receive returned false;
          // in both cases keep listening.
          this.Binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);
      }

      // With a requestor present, hands it the message info and reports no
      // fault. Without one, returns a protocol fault for the session's
      // OutputID built from the "received response before request" strings.
      protected override WsrmFault ProcessRequestorResponse(ReliableRequestor requestor, string requestName, WsrmMessageInfo info)
      {
          if (requestor == null)
          {
              string faultText = SR.GetString(SR.ReceivedResponseBeforeRequestFaultString, requestName);
              string exceptionText = SR.GetString(SR.ReceivedResponseBeforeRequestExceptionString, requestName);
              return SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.OutputID, faultText, exceptionText);
          }

          requestor.SetInfo(info);
          return null;
      }

      // Issues the first BeginTryReceive of the receive loop.
      void StartReceiving()
      {
          this.Binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);
      }

      // Action<object>-compatible wrapper around StartReceiving(); state is
      // the channel. Non-fatal exceptions go to the session's
      // OnUnknownException.
      static void StartReceiving(object state)
      {
          ReliableOutputSessionChannelOverDuplex self = (ReliableOutputSessionChannelOverDuplex)state;

          try
          {
              self.StartReceiving();
          }
#pragma warning suppress 56500 // covered by FxCOP
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              self.ReliableSession.OnUnknownException(e);
          }
      }
  }
  974. }