ReliableRequestSessionChannel.cs 42 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113111411151116111711181119112011211122112311241125112611271128112911301131113211331134113511361137113811391140114111421143114411451146114711481149115011511152115311541155115611571158115911601161116211631164
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //-----------------------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Runtime;
  7. using System.Threading;
  8. using System.Xml;
  9. using System.ServiceModel.Diagnostics.Application;
  10. sealed class ReliableRequestSessionChannel : RequestChannel, IRequestSessionChannel
  11. {
  // Binder used to send requests over, and to open/close, the underlying channel.
  12. IClientReliableChannelBinder binder;
  // Returned from GetProperty<ChannelParameterCollection>(); bound to this channel in the constructor.
  13. ChannelParameterCollection channelParameters;
  // Requestor for the CloseSequence exchange; created by CreateCloseRequestor.
  14. ReliableRequestor closeRequestor;
  // Output connection; created in OnOpened, so it is null before the channel is opened.
  15. ReliableOutputConnection connection;
  // Passed to AddAcknowledgementHeader; set to true by (Begin)WaitForShutdown on the non-Feb2005 path.
  16. bool isLastKnown = false;
  // Handleable exception captured on the attempt whose RetryCount equals MaxRetryCount;
  // used as the inner exception of the "maximum retry count exceeded" fault.
  17. Exception maxRetryCountException = null;
  // Shared completion callback for the asynchronous AckRequested poll started by PollingCallback.
  18. static AsyncCallback onPollingComplete = Fx.ThunkCallback(new AsyncCallback(OnPollingComplete));
  // Sequence numbers of the replies received so far; merged in ProcessReply under ThisLock.
  19. SequenceRangeCollection ranges = SequenceRangeCollection.Empty;
  // Created only when the version is not WSReliableMessagingFebruary2005; entered around
  // acknowledgement processing in ProcessReply and closed by (Begin)WaitForShutdown.
  20. Guard replyAckConsistencyGuard;
  // Client-side reliable session created in the constructor.
  21. ClientReliableSession session;
  // Reliable messaging settings (versions, retry count, window size, flow control).
  22. IReliableFactorySettings settings;
  // Created only for WSReliableMessagingFebruary2005; set by ProcessReply when
  // connection.CheckForTermination() returns true, and waited on during close.
  23. InterruptibleWaitObject shutdownHandle;
  // Requestor for the TerminateSequence exchange; created by CreateTerminateRequestor.
  24. ReliableRequestor terminateRequestor;
  /// <summary>
  /// Builds the channel: creates the client reliable session, picks the shutdown
  /// primitive that matches the reliable messaging version, subscribes to binder
  /// events and binds the late-bound channel parameters to this channel.
  /// </summary>
  public ReliableRequestSessionChannel(
      ChannelManagerBase factory,
      IReliableFactorySettings settings,
      IClientReliableChannelBinder binder,
      FaultHelper faultHelper,
      LateBoundChannelParameterCollection channelParameters,
      UniqueId inputID)
      : base(factory, binder.RemoteAddress, binder.Via, true)
  {
      this.settings = settings;
      this.binder = binder;

      ClientReliableSession reliableSession = new ClientReliableSession(this, settings, binder, faultHelper, inputID);
      this.session = reliableSession;
      reliableSession.PollingCallback = this.PollingCallback;
      reliableSession.UnblockChannelCloseCallback = this.UnblockClose;

      // Feb2005 waits on a shutdown handle; every other version uses the ack consistency guard.
      bool isFeb2005 = this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
      if (!isFeb2005)
      {
          this.replyAckConsistencyGuard = new Guard(Int32.MaxValue);
      }
      else
      {
          this.shutdownHandle = new InterruptibleWaitObject(false);
      }

      this.binder.Faulted += this.OnBinderFaulted;
      this.binder.OnException += this.OnBinderException;

      this.channelParameters = channelParameters;
      channelParameters.SetChannel(this);
  }
  /// <summary>Gets the reliable session of this channel, exposed as an output session.</summary>
  public IOutputSession Session
  {
      get { return session; }
  }
  /// <summary>
  /// Adds an acknowledgement header for the reply ranges received so far to
  /// <paramref name="message"/>. Nothing is added while no ranges are recorded.
  /// </summary>
  /// <param name="message">Message that receives the header.</param>
  /// <param name="force">Not used by this implementation.</param>
  void AddAcknowledgementHeader(Message message, bool force)
  {
      if (ranges.Count != 0)
      {
          WsrmUtilities.AddAcknowledgementHeader(
              settings.ReliableMessagingVersion,
              message,
              session.InputID,
              ranges,
              isLastKnown);
      }
  }
  /// <summary>Starts an asynchronous close of the binder with <c>MaskingMode.Handled</c>.</summary>
  IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult pendingClose = binder.BeginClose(timeout, MaskingMode.Handled, callback, state);
      return pendingClose;
  }
  /// <summary>Creates the terminate requestor and starts its request asynchronously.</summary>
  IAsyncResult BeginTerminateSequence(TimeSpan timeout, AsyncCallback callback, object state)
  {
      CreateTerminateRequestor();
      IAsyncResult pendingTerminate = terminateRequestor.BeginRequest(timeout, callback, state);
      return pendingTerminate;
  }
  /// <summary>Synchronously sends the CloseSequence request and processes its reply.</summary>
  void CloseSequence(TimeSpan timeout)
  {
      CreateCloseRequestor();
      ProcessCloseOrTerminateReply(true, closeRequestor.Request(timeout));
  }
  /// <summary>Creates the close requestor and starts its request asynchronously.</summary>
  IAsyncResult BeginCloseSequence(TimeSpan timeout, AsyncCallback callback, object state)
  {
      CreateCloseRequestor();
      IAsyncResult pendingCloseSequence = closeRequestor.BeginRequest(timeout, callback, state);
      return pendingCloseSequence;
  }
  /// <summary>Completes the asynchronous CloseSequence request and processes its reply.</summary>
  void EndCloseSequence(IAsyncResult result)
  {
      ProcessCloseOrTerminateReply(true, closeRequestor.EndRequest(result));
  }
  /// <summary>
  /// Applies the settings shared by the close and terminate requestors: message
  /// version, binder, request/response pattern and an acknowledgement header
  /// built from the current reply ranges.
  /// </summary>
  void ConfigureRequestor(ReliableRequestor requestor)
  {
      ReliableMessagingVersion rmVersion = settings.ReliableMessagingVersion;

      requestor.MessageVersion = settings.MessageVersion;
      requestor.Binder = binder;
      requestor.SetRequestResponsePattern();

      WsrmAcknowledgmentHeader ackHeader = new WsrmAcknowledgmentHeader(rmVersion, session.InputID, ranges, true, -1);
      requestor.MessageHeader = ackHeader;
  }
  /// <summary>
  /// Creates an AckRequested message for the output sequence and attaches the
  /// current acknowledgement header to it.
  /// </summary>
  Message CreateAckRequestedMessage()
  {
      Message ackRequested = WsrmUtilities.CreateAckRequestedMessage(
          settings.MessageVersion,
          settings.ReliableMessagingVersion,
          session.OutputID);

      AddAcknowledgementHeader(ackRequested, true);
      return ackRequested;
  }
  /// <summary>Creates the asynchronous request object for this channel; <paramref name="message"/> is not used here.</summary>
  protected override IAsyncRequest CreateAsyncRequest(Message message, AsyncCallback callback, object state)
  {
      IAsyncRequest asyncRequest = new AsyncRequest(this, callback, state);
      return asyncRequest;
  }
  /// <summary>
  /// Builds the requestor that sends CloseSequence and publishes it in
  /// <c>closeRequestor</c> under the channel lock, after checking that the
  /// channel is not closed.
  /// </summary>
  void CreateCloseRequestor()
  {
      RequestReliableRequestor requestor = new RequestReliableRequestor();
      ConfigureRequestor(requestor);

      requestor.TimeoutString1Index = SR.TimeoutOnClose;
      requestor.MessageAction = WsrmIndex.GetCloseSequenceActionHeader(settings.MessageVersion.Addressing);
      requestor.MessageBody = new CloseSequence(session.OutputID, connection.Last);

      lock (ThisLock)
      {
          ThrowIfClosed();
          closeRequestor = requestor;
      }
  }
  /// <summary>Creates the synchronous request object for this channel; <paramref name="message"/> is not used here.</summary>
  protected override IRequest CreateRequest(Message message)
  {
      IRequest syncRequest = new SyncRequest(this);
      return syncRequest;
  }
  /// <summary>
  /// Builds the requestor that sends TerminateSequence. Under the channel lock it
  /// checks that the channel is not closed, publishes the requestor in
  /// <c>terminateRequestor</c> and closes the session.
  /// </summary>
  void CreateTerminateRequestor()
  {
      RequestReliableRequestor requestor = new RequestReliableRequestor();
      ConfigureRequestor(requestor);

      requestor.MessageAction = WsrmIndex.GetTerminateSequenceActionHeader(
          settings.MessageVersion.Addressing,
          settings.ReliableMessagingVersion);
      requestor.MessageBody = new TerminateSequence(
          settings.ReliableMessagingVersion,
          session.OutputID,
          connection.Last);

      lock (ThisLock)
      {
          ThrowIfClosed();
          terminateRequestor = requestor;
          session.CloseSession();
      }
  }
  /// <summary>Completes the asynchronous binder close started by BeginCloseBinder.</summary>
  void EndCloseBinder(IAsyncResult result)
  {
      binder.EndClose(result);
  }
  /// <summary>
  /// Completes the asynchronous TerminateSequence request; a non-null reply is
  /// handed to ProcessCloseOrTerminateReply.
  /// </summary>
  void EndTerminateSequence(IAsyncResult result)
  {
      Message terminateReply = terminateRequestor.EndRequest(result);
      if (terminateReply == null)
      {
          return;
      }

      ProcessCloseOrTerminateReply(false, terminateReply);
  }
  /// <summary>
  /// Returns the exception to throw when a message cannot be added to the
  /// connection: the terminal exception if the channel is faulted, otherwise a
  /// closed exception.
  /// </summary>
  Exception GetInvalidAddException()
  {
      if (State != CommunicationState.Faulted)
      {
          return CreateClosedException();
      }

      return GetTerminalException();
  }
  /// <summary>
  /// Resolves a property by type. Lookup order: this channel (for
  /// IRequestSessionChannel), the channel parameters, the base class, then the
  /// inner channel. When the inner channel has no FaultConverter, the default
  /// converter for the configured message version is returned.
  /// </summary>
  public override T GetProperty<T>()
  {
      Type requested = typeof(T);

      if (requested == typeof(IRequestSessionChannel))
      {
          return (T)(object)this;
      }

      if (requested == typeof(ChannelParameterCollection))
      {
          return (T)(object)channelParameters;
      }

      T fromBase = base.GetProperty<T>();
      if (fromBase != null)
      {
          return fromBase;
      }

      T fromInner = binder.Channel.GetProperty<T>();
      if ((fromInner != null) || (requested != typeof(FaultConverter)))
      {
          return fromInner;
      }

      return (T)(object)FaultConverter.GetDefaultFaultConverter(settings.MessageVersion);
  }
  /// <summary>
  /// Aborts, in order, the connection, the shutdown handle, the close and
  /// terminate requestors (each only if it exists), then the session and the base
  /// channel.
  /// </summary>
  protected override void OnAbort()
  {
      if (connection != null)
      {
          connection.Abort(this);
      }

      if (shutdownHandle != null)
      {
          shutdownHandle.Abort(this);
      }

      // Snapshot each requestor field before using it.
      ReliableRequestor pendingClose = closeRequestor;
      if (pendingClose != null)
      {
          pendingClose.Abort(this);
      }

      ReliableRequestor pendingTerminate = terminateRequestor;
      if (pendingTerminate != null)
      {
          pendingTerminate.Abort(this);
      }

      session.Abort();
      base.OnAbort();
  }
  /// <summary>
  /// Starts the asynchronous close as a composed chain: close the connection,
  /// wait for shutdown, send CloseSequence (WS-RM 1.1 only, otherwise a null
  /// slot), send TerminateSequence, close the session, close the binder.
  /// </summary>
  protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  {
      OperationWithTimeoutBeginCallback beginCloseSequence = null;
      OperationEndCallback endCloseSequence = null;

      if (settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          beginCloseSequence = BeginCloseSequence;
          endCloseSequence = EndCloseSequence;
      }

      OperationWithTimeoutBeginCallback[] beginSteps = new OperationWithTimeoutBeginCallback[]
      {
          connection.BeginClose,
          BeginWaitForShutdown,
          beginCloseSequence,
          BeginTerminateSequence,
          session.BeginClose,
          BeginCloseBinder
      };

      OperationEndCallback[] endSteps = new OperationEndCallback[]
      {
          connection.EndClose,
          EndWaitForShutdown,
          endCloseSequence,
          EndTerminateSequence,
          session.EndClose,
          EndCloseBinder
      };

      return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, beginSteps, endSteps, callback, state);
  }
  /// <summary>Starts the asynchronous open of the binder and session.</summary>
  protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult pendingOpen = new ReliableChannelOpenAsyncResult(binder, session, timeout, callback, state);
      return pendingOpen;
  }
  /// <summary>
  /// Handles an exception reported by the binder. A QuotaExceededException faults
  /// the session while the channel is opening, opened or closing; any other
  /// exception is recorded as a pending exception.
  /// </summary>
  void OnBinderException(IReliableChannelBinder sender, Exception exception)
  {
      if (!(exception is QuotaExceededException))
      {
          AddPendingException(exception);
          return;
      }

      if (State == CommunicationState.Opening ||
          State == CommunicationState.Opened ||
          State == CommunicationState.Closing)
      {
          session.OnLocalFault(
              exception,
              SequenceTerminatedFault.CreateQuotaExceededFault(session.OutputID),
              null);
      }
  }
  /// <summary>
  /// Aborts the binder when it faults. While the channel is opening, opened or
  /// closing, the cause is wrapped in a CommunicationException and the session is
  /// faulted locally.
  /// </summary>
  void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
  {
      binder.Abort();

      bool channelActive =
          State == CommunicationState.Opening ||
          State == CommunicationState.Opened ||
          State == CommunicationState.Closing;
      if (!channelActive)
      {
          return;
      }

      Exception wrapped = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
      session.OnLocalFault(wrapped, (Message)null, null);
  }
  /// <summary>
  /// Synchronous close under one shared timeout: close the connection, wait for
  /// shutdown, send CloseSequence (WS-RM 1.1 only), send TerminateSequence, close
  /// the session, then close the binder.
  /// </summary>
  protected override void OnClose(TimeSpan timeout)
  {
      TimeoutHelper budget = new TimeoutHelper(timeout);

      connection.Close(budget.RemainingTime());
      WaitForShutdown(budget.RemainingTime());

      bool wsrm11 = settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
      if (wsrm11)
      {
          CloseSequence(budget.RemainingTime());
      }

      TerminateSequence(budget.RemainingTime());
      session.Close(budget.RemainingTime());
      binder.Close(budget.RemainingTime(), MaskingMode.Handled);
  }
  /// <summary>Runs the base closed handling, then unsubscribes from the binder's Faulted event.</summary>
  protected override void OnClosed()
  {
      base.OnClosed();
      binder.Faulted -= OnBinderFaulted;
  }
  /// <summary>
  /// Asynchronous send handler for the output connection. Once the retry count
  /// exceeds the configured maximum the session is faulted and a completed result
  /// is returned. Otherwise the acknowledgement header is added and a binder
  /// request is started. Handled exceptions are masked on every attempt before
  /// the last one; on the last attempt they are saved instead.
  /// </summary>
  IAsyncResult OnConnectionBeginSend(MessageAttemptInfo attemptInfo, TimeSpan timeout,
      bool maskUnhandledException, AsyncCallback callback, object state)
  {
      if (attemptInfo.RetryCount > settings.MaxRetryCount)
      {
          if (TD.MaxRetryCyclesExceededIsEnabled())
          {
              TD.MaxRetryCyclesExceeded(SR.GetString(SR.MaximumRetryCountExceeded));
          }

          session.OnLocalFault(
              new CommunicationException(SR.GetString(SR.MaximumRetryCountExceeded), maxRetryCountException),
              SequenceTerminatedFault.CreateMaxRetryCountExceededFault(session.OutputID),
              null);
          return new CompletedAsyncResult(callback, state);
      }

      session.OnLocalActivity();
      AddAcknowledgementHeader(attemptInfo.Message, false);

      ReliableBinderRequestAsyncResult pending = new ReliableBinderRequestAsyncResult(callback, state);
      pending.Binder = binder;
      pending.MessageAttemptInfo = attemptInfo;
      pending.MaskingMode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;

      bool moreRetriesLeft = attemptInfo.RetryCount < settings.MaxRetryCount;
      if (moreRetriesLeft)
      {
          pending.MaskingMode |= MaskingMode.Handled;
      }
      pending.SaveHandledException = !moreRetriesLeft;

      pending.Begin(timeout);
      return pending;
  }
  /// <summary>
  /// Completes a send started by OnConnectionBeginSend. For a real binder request
  /// it records the handled exception of the attempt whose retry count equals the
  /// maximum, and processes the reply if there is one.
  /// </summary>
  void OnConnectionEndSend(IAsyncResult result)
  {
      if (result is CompletedAsyncResult)
      {
          CompletedAsyncResult.End(result);
          return;
      }

      Exception handledException;
      Message reply = ReliableBinderRequestAsyncResult.End(result, out handledException);
      ReliableBinderRequestAsyncResult binderResult = (ReliableBinderRequestAsyncResult)result;

      if (binderResult.MessageAttemptInfo.RetryCount == settings.MaxRetryCount)
      {
          maxRetryCountException = handledException;
      }

      if (reply == null)
      {
          return;
      }

      ProcessReply(
          reply,
          (IReliableRequest)binderResult.MessageAttemptInfo.State,
          binderResult.MessageAttemptInfo.GetSequenceNumber());
  }
  /// <summary>
  /// Synchronous send handler for the output connection. The attempt's message is
  /// disposed when the method exits. Once the retry count exceeds the maximum the
  /// session is faulted. Before the last attempt, handled exceptions are masked
  /// by the binder. On the last attempt a handleable exception is remembered in
  /// <c>maxRetryCountException</c>; any other exception propagates.
  /// </summary>
  void OnConnectionSend(MessageAttemptInfo attemptInfo, TimeSpan timeout, bool maskUnhandledException)
  {
      using (attemptInfo.Message)
      {
          if (attemptInfo.RetryCount > settings.MaxRetryCount)
          {
              if (TD.MaxRetryCyclesExceededIsEnabled())
              {
                  TD.MaxRetryCyclesExceeded(SR.GetString(SR.MaximumRetryCountExceeded));
              }

              session.OnLocalFault(
                  new CommunicationException(SR.GetString(SR.MaximumRetryCountExceeded), maxRetryCountException),
                  SequenceTerminatedFault.CreateMaxRetryCountExceededFault(session.OutputID),
                  null);
              return;
          }

          AddAcknowledgementHeader(attemptInfo.Message, false);
          session.OnLocalActivity();

          Message reply = null;
          MaskingMode maskingMode = maskUnhandledException ? MaskingMode.Unhandled : MaskingMode.None;

          bool lastAttempt = !(attemptInfo.RetryCount < settings.MaxRetryCount);
          if (lastAttempt)
          {
              try
              {
                  reply = binder.Request(attemptInfo.Message, timeout, maskingMode);
              }
              catch (Exception e)
              {
                  // Fatal and non-handleable exceptions propagate unchanged.
                  if (Fx.IsFatal(e) || !binder.IsHandleable(e))
                  {
                      throw;
                  }

                  maxRetryCountException = e;
              }
          }
          else
          {
              maskingMode |= MaskingMode.Handled;
              reply = binder.Request(attemptInfo.Message, timeout, maskingMode);
          }

          if (reply != null)
          {
              ProcessReply(reply, (IReliableRequest)attemptInfo.State, attemptInfo.GetSequenceNumber());
          }
      }
  }
  /// <summary>Synchronous AckRequested handler for the output connection; intentionally empty.</summary>
  void OnConnectionSendAckRequested(TimeSpan timeout)
  {
      // do nothing, only replies to sequence messages alter the state of the reliable output connection
  }
  /// <summary>Asynchronous AckRequested handler; sends nothing and returns a completed result.</summary>
  IAsyncResult OnConnectionBeginSendAckRequested(TimeSpan timeout, AsyncCallback callback, object state)
  {
      // do nothing, only replies to sequence messages alter the state of the reliable output connection
      IAsyncResult alreadyDone = new CompletedAsyncResult(callback, state);
      return alreadyDone;
  }
  /// <summary>Ends the completed result returned by OnConnectionBeginSendAckRequested.</summary>
  void OnConnectionEndSendAckRequested(IAsyncResult result)
  {
      CompletedAsyncResult.End(result);
  }
  /// <summary>Forwards a fault raised by the output connection to the session as a local fault.</summary>
  void OnComponentFaulted(Exception faultException, WsrmFault fault)
  {
      session.OnLocalFault(faultException, fault, null);
  }
  /// <summary>Forwards an exception raised by the output connection to the session.</summary>
  void OnComponentException(Exception exception)
  {
      session.OnUnknownException(exception);
  }
  /// <summary>Completes the composed asynchronous close started by OnBeginClose.</summary>
  protected override void OnEndClose(IAsyncResult result)
  {
      OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
  }
  /// <summary>Completes the asynchronous open started by OnBeginOpen.</summary>
  protected override void OnEndOpen(IAsyncResult result)
  {
      ReliableChannelOpenAsyncResult.End(result);
  }
  /// <summary>Notifies the session of the fault, unblocks any pending close, then runs the base handling.</summary>
  protected override void OnFaulted()
  {
      session.OnFaulted();
      UnblockClose();
      base.OnFaulted();
  }
  /// <summary>
  /// Opens the binder and then the session under one shared timeout. If either
  /// step throws, the binder is closed with the remaining time before the
  /// exception propagates.
  /// </summary>
  protected override void OnOpen(TimeSpan timeout)
  {
      TimeoutHelper budget = new TimeoutHelper(timeout);
      bool opened = false;

      try
      {
          binder.Open(budget.RemainingTime());
          session.Open(budget.RemainingTime());
          opened = true;
      }
      finally
      {
          if (!opened)
          {
              binder.Close(budget.RemainingTime());
          }
      }
  }
  /// <summary>
  /// After the base opened handling, creates the reliable output connection and
  /// wires its fault, exception, send and AckRequested handlers to this channel.
  /// </summary>
  protected override void OnOpened()
  {
      base.OnOpened();

      ReliableOutputConnection output = new ReliableOutputConnection(
          session.OutputID,
          settings.MaxTransferWindowSize,
          settings.MessageVersion,
          settings.ReliableMessagingVersion,
          session.InitiationTime,
          false,
          DefaultSendTimeout);

      // Publish the field first, as before, then wire the handlers on the same instance.
      connection = output;

      output.Faulted += OnComponentFaulted;
      output.OnException += OnComponentException;
      output.BeginSendHandler = OnConnectionBeginSend;
      output.EndSendHandler = OnConnectionEndSend;
      output.SendHandler = OnConnectionSend;
      output.BeginSendAckRequestedHandler = OnConnectionBeginSendAckRequested;
      output.EndSendAckRequestedHandler = OnConnectionEndSendAckRequested;
      output.SendAckRequestedHandler = OnConnectionSendAckRequested;
  }
  /// <summary>
  /// Completion callback for the AckRequested poll. Synchronous completions are
  /// finished by PollingCallback, so only asynchronous ones are ended here.
  /// </summary>
  static void OnPollingComplete(IAsyncResult result)
  {
      if (result.CompletedSynchronously)
      {
          return;
      }

      ReliableRequestSessionChannel owner = (ReliableRequestSessionChannel)result.AsyncState;
      owner.EndSendAckRequestedMessage(result);
  }
  /// <summary>
  /// Session polling callback: starts an AckRequested request with all exceptions
  /// masked, and ends it inline if it completed synchronously.
  /// </summary>
  void PollingCallback()
  {
      IAsyncResult pending = BeginSendAckRequestedMessage(
          DefaultSendTimeout,
          MaskingMode.All,
          onPollingComplete,
          this);

      if (!pending.CompletedSynchronously)
      {
          return;
      }

      EndSendAckRequestedMessage(pending);
  }
  /// <summary>
  /// Processes the reply to a CloseSequence (<paramref name="close"/> is true) or
  /// TerminateSequence (<paramref name="close"/> is false) request.
  /// </summary>
  /// <param name="close">True for a CloseSequence reply, false for a TerminateSequence reply.</param>
  /// <param name="reply">The reply message; must not be null.</param>
  /// <remarks>
  /// For WS-RM Feb2005 the reply is simply closed (Close does not exist there).
  /// For WS-RM 1.1, if ProcessReply has already stored info on the requestor that
  /// issued this request, nothing more is done. Otherwise the reply is validated,
  /// the session is faulted and an exception thrown on a validation fault, and
  /// the reply is always closed.
  /// </remarks>
  void ProcessCloseOrTerminateReply(bool close, Message reply)
  {
      if (reply == null)
      {
          // In the close case, the requestor is configured to throw TimeoutException instead of returning null.
          // In the terminate case, this value can be null, but the caller should not call this method.
          throw Fx.AssertAndThrow("Argument reply cannot be null.");
      }

      ReliableMessagingVersion reliableMessagingVersion = this.settings.ReliableMessagingVersion;

      if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          if (close)
          {
              throw Fx.AssertAndThrow("Close does not exist in Feb2005.");
          }

          reply.Close();
      }
      else if (reliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          // Use the requestor that issued THIS request. ProcessReply stores a valid final ack on
          // closeRequestor and an UnknownSequence fault on terminateRequestor. Reading closeRequestor
          // for a terminate reply would see the earlier close info and skip validating and closing
          // the terminate reply.
          ReliableRequestor requestor = close ? this.closeRequestor : this.terminateRequestor;
          WsrmMessageInfo info = requestor.GetInfo();

          // Close - Final ack made it.
          // Terminate - UnknownSequence.
          // Either way, message has been verified and does not belong to this thread.
          if (info != null)
          {
              return;
          }

          try
          {
              info = WsrmMessageInfo.Get(this.settings.MessageVersion, reliableMessagingVersion,
                  this.binder.Channel, this.binder.GetInnerSession(), reply);
              this.session.ProcessInfo(info, null, true);
              this.session.VerifyDuplexProtocolElements(info, null, true);

              WsrmFault fault = close
                  ? WsrmUtilities.ValidateCloseSequenceResponse(this.session, requestor.MessageId, info,
                      this.connection.Last)
                  : WsrmUtilities.ValidateTerminateSequenceResponse(this.session, requestor.MessageId,
                      info, this.connection.Last);

              if (fault != null)
              {
                  this.session.OnLocalFault(null, fault, null);
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
              }
          }
          finally
          {
              reply.Close();
          }
      }
      else
      {
          throw Fx.AssertAndThrow("Reliable messaging version not supported.");
      }
  }
  // Processes a reply received from the binder for a sequence message or an AckRequested poll.
  //   reply                 - the raw reply message.
  //   request               - the pending request to hand the reply to; null for AckRequested polls
  //                           (see EndSendAckRequestedMessage).
  //   requestSequenceNumber - sequence number of the request this reply answers.
  // Flow: parse and verify the WS-RM info, handle protocol-level outcomes (header fault, missing
  // acknowledgement, unexpected TerminateSequence, final ack), then record the transfer, merge the
  // reply's sequence number into the ack ranges, hand the reply to the request and finally signal
  // shutdown and complete the request.
  526. void ProcessReply(Message reply, IReliableRequest request, Int64 requestSequenceNumber)
  527. {
  528. WsrmMessageInfo messageInfo = WsrmMessageInfo.Get(this.settings.MessageVersion,
  529. this.settings.ReliableMessagingVersion, this.binder.Channel, this.binder.GetInnerSession(), reply);
  // Stop if the session reports (by returning false) that processing should not continue.
  530. if (!this.session.ProcessInfo(messageInfo, null))
  531. return;
  532. if (!this.session.VerifyDuplexProtocolElements(messageInfo, null))
  533. return;
  534. bool wsrm11 = this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
  // A WS-RM header fault must be UnknownSequence and is only expected once terminate has started;
  // it is recorded on the terminate requestor.
  535. if (messageInfo.WsrmHeaderFault != null)
  536. {
  537. // Close message now, going to stop processing in all cases.
  538. messageInfo.Message.Close();
  539. if (!(messageInfo.WsrmHeaderFault is UnknownSequenceFault))
  540. {
  541. throw Fx.AssertAndThrow("Fault must be UnknownSequence fault.");
  542. }
  543. if (this.terminateRequestor == null)
  544. {
  545. throw Fx.AssertAndThrow("If we start getting UnknownSequence, terminateRequestor cannot be null.");
  546. }
  547. this.terminateRequestor.SetInfo(messageInfo);
  548. return;
  549. }
  // A reply without acknowledgement info is a protocol violation: fault the session.
  550. if (messageInfo.AcknowledgementInfo == null)
  551. {
  552. WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
  553. SR.GetString(SR.SequenceTerminatedReplyMissingAcknowledgement),
  554. SR.GetString(SR.ReplyMissingAcknowledgement));
  555. messageInfo.Message.Close();
  556. this.session.OnLocalFault(fault.CreateException(), fault, null);
  557. return;
  558. }
  // WS-RM 1.1: a TerminateSequence inside a reply is not supported. The fault is raised against
  // the sequence other than the one named by the TerminateSequence identifier.
  559. if (wsrm11 && (messageInfo.TerminateSequenceInfo != null))
  560. {
  561. UniqueId faultId = (messageInfo.TerminateSequenceInfo.Identifier == this.session.OutputID)
  562. ? this.session.InputID
  563. : this.session.OutputID;
  564. WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(faultId,
  565. SR.GetString(SR.SequenceTerminatedUnsupportedTerminateSequence),
  566. SR.GetString(SR.UnsupportedTerminateSequenceExceptionString));
  567. messageInfo.Message.Close();
  568. this.session.OnLocalFault(fault.CreateException(), fault, null);
  569. return;
  570. }
  // WS-RM 1.1: a final acknowledgement is only valid after this side has sent CloseSequence.
  571. else if (wsrm11 && messageInfo.AcknowledgementInfo.Final)
  572. {
  573. // Close message now, going to stop processing in all cases.
  574. messageInfo.Message.Close();
  575. if (this.closeRequestor == null)
  576. {
  577. // Remote endpoint signaled Close, this is not allowed so we fault.
  578. string exceptionString = SR.GetString(SR.UnsupportedCloseExceptionString);
  579. string faultString = SR.GetString(SR.SequenceTerminatedUnsupportedClose);
  580. WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, faultString,
  581. exceptionString);
  582. this.session.OnLocalFault(fault.CreateException(), fault, null);
  583. }
  584. else
  585. {
  586. WsrmFault fault = WsrmUtilities.ValidateFinalAck(this.session, messageInfo, this.connection.Last);
  587. if (fault == null)
  588. {
  589. // Received valid final ack after sending Close, inform the close thread.
  590. this.closeRequestor.SetInfo(messageInfo);
  591. }
  592. else
  593. {
  594. // Received invalid final ack after sending Close, fault.
  595. this.session.OnLocalFault(fault.CreateException(), fault, null);
  596. }
  597. }
  598. return;
  599. }
  // -1 is passed as the remaining buffer unless flow control is enabled.
  600. int bufferRemaining = -1;
  601. if (this.settings.FlowControlEnabled)
  602. bufferRemaining = messageInfo.AcknowledgementInfo.BufferRemaining;
  603. // We accept no more than MaxSequenceRanges ranges to limit the serialized ack size and
  604. // the amount of memory taken up by the ack ranges. Since request reply uses the presence of
  605. // a reply as an acknowledgement we cannot call ProcessTransferred (which stops retrying the
  606. // request) if we intend to drop the message. This means the limit is not strict since we do
  607. // not check for the limit and merge the ranges atomically. The limit + the number of
  608. // concurrent threads is a sufficient mitigation.
  609. if ((messageInfo.SequencedMessageInfo != null) &&
  610. !ReliableInputConnection.CanMerge(messageInfo.SequencedMessageInfo.SequenceNumber, this.ranges))
  611. {
  612. messageInfo.Message.Close();
  613. return;
  614. }
  // The guard exists only for non-Feb2005 versions (see constructor). Exit is called only when
  // Enter returned true.
  615. bool exitGuard = this.replyAckConsistencyGuard != null ? this.replyAckConsistencyGuard.Enter() : false;
  616. try
  617. {
  618. this.connection.ProcessTransferred(requestSequenceNumber,
  619. messageInfo.AcknowledgementInfo.Ranges, bufferRemaining);
  620. this.session.OnRemoteActivity(this.connection.Strategy.QuotaRemaining == 0);
  621. if (messageInfo.SequencedMessageInfo != null)
  622. {
  623. lock (this.ThisLock)
  624. {
  625. this.ranges = this.ranges.MergeWith(messageInfo.SequencedMessageInfo.SequenceNumber);
  626. }
  627. }
  628. }
  629. finally
  630. {
  631. if (exitGuard)
  632. {
  633. this.replyAckConsistencyGuard.Exit();
  634. }
  635. }
  // Hand the reply to the waiting request. A WS-RM protocol message is closed and reported as null.
  636. if (request != null)
  637. {
  638. if (WsrmUtilities.IsWsrmAction(this.settings.ReliableMessagingVersion, messageInfo.Action))
  639. {
  640. messageInfo.Message.Close();
  641. request.Set(null);
  642. }
  643. else
  644. {
  645. request.Set(messageInfo.Message);
  646. }
  647. }
  648. // The termination mechanism in the TerminateSequence fails with RequestReply.
  649. // Since the ack ranges are updated after ProcessTransferred is called and
  650. // ProcessTransferred potentially signals the Termination process, this channel
  651. // winds up sending a message with the ack for last message missing.
  652. // Thus we send the termination after we update the ranges.
  653. if ((this.shutdownHandle != null) && this.connection.CheckForTermination())
  654. {
  655. this.shutdownHandle.Set();
  656. }
  // Complete the request only after the shutdown handle has had its chance to be signalled.
  657. if (request != null)
  658. request.Complete();
  659. }
  /// <summary>
  /// Notes local activity on the session, then starts an asynchronous binder
  /// request carrying a freshly created AckRequested message.
  /// </summary>
  IAsyncResult BeginSendAckRequestedMessage(TimeSpan timeout, MaskingMode maskingMode, AsyncCallback callback,
      object state)
  {
      session.OnLocalActivity();

      ReliableBinderRequestAsyncResult pending = new ReliableBinderRequestAsyncResult(callback, state);
      pending.Binder = binder;
      pending.MaskingMode = maskingMode;
      pending.Message = CreateAckRequestedMessage();

      pending.Begin(timeout);
      return pending;
  }
  /// <summary>
  /// Completes an AckRequested request. A non-null reply is processed with no
  /// associated request and sequence number 0.
  /// </summary>
  void EndSendAckRequestedMessage(IAsyncResult result)
  {
      Message reply = ReliableBinderRequestAsyncResult.End(result);
      if (reply == null)
      {
          return;
      }

      ProcessReply(reply, null, 0);
  }
  /// <summary>
  /// Synchronously sends TerminateSequence; a non-null reply is handed to
  /// ProcessCloseOrTerminateReply.
  /// </summary>
  void TerminateSequence(TimeSpan timeout)
  {
      CreateTerminateRequestor();

      Message terminateReply = terminateRequestor.Request(timeout);
      if (terminateReply == null)
      {
          return;
      }

      ProcessCloseOrTerminateReply(false, terminateReply);
  }
  /// <summary>
  /// Faults pending requests, then faults the connection, the shutdown handle and
  /// the close and terminate requestors (each only if it exists), so that a close
  /// in progress does not stay blocked on them.
  /// </summary>
  void UnblockClose()
  {
      FaultPendingRequests();

      if (connection != null)
      {
          connection.Fault(this);
      }

      if (shutdownHandle != null)
      {
          shutdownHandle.Fault(this);
      }

      // Snapshot each requestor field before using it.
      ReliableRequestor pendingClose = closeRequestor;
      if (pendingClose != null)
      {
          pendingClose.Fault(this);
      }

      ReliableRequestor pendingTerminate = terminateRequestor;
      if (pendingTerminate != null)
      {
          pendingTerminate.Fault(this);
      }
  }
  /// <summary>
  /// Blocks until shutdown may proceed. Feb2005 waits on the shutdown handle.
  /// Other versions mark the last message as known and close the reply-ack
  /// consistency guard.
  /// </summary>
  void WaitForShutdown(TimeSpan timeout)
  {
      TimeoutHelper budget = new TimeoutHelper(timeout);

      if (settings.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          isLastKnown = true;

          // We already closed the connection so we know everything was acknowledged.
          // Make sure the reply acknowledgement ranges are current.
          replyAckConsistencyGuard.Close(budget.RemainingTime());
          return;
      }

      shutdownHandle.Wait(budget.RemainingTime());
  }
  /// <summary>
  /// Asynchronous counterpart of WaitForShutdown. Feb2005 begins a wait on the
  /// shutdown handle. Other versions mark the last message as known and begin
  /// closing the reply-ack consistency guard.
  /// </summary>
  IAsyncResult BeginWaitForShutdown(TimeSpan timeout, AsyncCallback callback, object state)
  {
      if (settings.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          isLastKnown = true;

          // We already closed the connection so we know everything was acknowledged.
          // Make sure the reply acknowledgement ranges are current.
          return replyAckConsistencyGuard.BeginClose(timeout, callback, state);
      }

      return shutdownHandle.BeginWait(timeout, callback, state);
  }
  // Completes the operation started by BeginWaitForShutdown, using the same version test to
  // pick between the shutdown handle's EndWait and the consistency guard's EndClose.
  void EndWaitForShutdown(IAsyncResult result)
  {
      bool isFebruary2005 =
          this.settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;

      if (isFebruary2005)
      {
          this.shutdownHandle.EndWait(result);
          return;
      }

      this.replyAckConsistencyGuard.EndClose(result);
  }
  // Contract shared by the request objects of this channel (SyncRequest and AsyncRequest),
  // layered on top of IRequestBase.
  interface IReliableRequest : IRequestBase
  {
      // Signals completion of the request. SyncRequest implements this as a no-op;
      // AsyncRequest completes its async result if it has not been completed yet.
      void Complete();

      // Hands a reply message to the request. Both implementations in this file keep the
      // reply only if none was accepted before and otherwise close a non-null reply.
      void Set(Message reply);
  }
  // Request whose caller blocks in WaitForReply until a reply is set, the request is aborted
  // or faulted, or the wait times out. All state transitions happen under ThisLock.
  class SyncRequest : IReliableRequest, IRequest
  {
      bool aborted;
      bool completed;
      ManualResetEvent completedHandle;
      bool faulted;
      TimeSpan originalTimeout;
      Message reply;
      ReliableRequestSessionChannel parent;
      object thisLock = new object();

      public SyncRequest(ReliableRequestSessionChannel parent)
      {
          this.parent = parent;
      }

      object ThisLock
      {
          get { return this.thisLock; }
      }

      // Marks the request as aborted and completed (unless it already completed) and
      // wakes a waiter if one has registered a handle.
      public void Abort(RequestChannel channel)
      {
          lock (this.ThisLock)
          {
              if (this.completed)
              {
                  return;
              }

              this.aborted = true;
              this.completed = true;
              this.SignalWaiter();
          }
      }

      // Marks the request as faulted and completed (unless it already completed) and
      // wakes a waiter if one has registered a handle.
      public void Fault(RequestChannel channel)
      {
          lock (this.ThisLock)
          {
              if (this.completed)
              {
                  return;
              }

              this.faulted = true;
              this.completed = true;
              this.SignalWaiter();
          }
      }

      // Intentionally empty for the synchronous request.
      public void Complete()
      {
      }

      // Records the timeout (reported later if the wait expires) and adds the message to the
      // parent's connection; throws the parent's invalid-add exception when the add is refused.
      public void SendRequest(Message message, TimeSpan timeout)
      {
          this.originalTimeout = timeout;

          bool added = parent.connection.AddMessage(message, timeout, this);
          if (!added)
          {
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.GetInvalidAddException());
          }
      }

      // Stores the reply and completes the request if it is still pending; otherwise the
      // (non-null) reply is closed because nobody will consume it.
      public void Set(Message reply)
      {
          bool accepted = false;

          lock (this.ThisLock)
          {
              if (!this.completed)
              {
                  this.reply = reply;
                  this.completed = true;
                  this.SignalWaiter();
                  accepted = true;
              }
          }

          if (!accepted && reply != null)
          {
              reply.Close();
          }
      }

      // Waits for the request to complete and returns the stored reply. Throws the parent's
      // closed exception when aborted, its terminal exception when faulted, and a
      // TimeoutException when the wait expired. On any exception the session is faulted locally.
      public Message WaitForReply(TimeSpan timeout)
      {
          bool succeeded = false;

          try
          {
              bool timedOut = false;

              if (!this.completed)
              {
                  bool mustWait = false;

                  // Re-check under the lock; the handle is only created when a wait is needed.
                  lock (this.ThisLock)
                  {
                      if (!this.completed)
                      {
                          mustWait = true;
                          this.completedHandle = new ManualResetEvent(false);
                      }
                  }

                  if (mustWait)
                  {
                      bool signaled = TimeoutHelper.WaitOne(this.completedHandle, timeout);

                      lock (this.ThisLock)
                      {
                          if (this.completed)
                          {
                              // Someone completed the request, so this is not a timeout.
                              timedOut = false;
                          }
                          else
                          {
                              // Nobody completed it; claim completion ourselves.
                              this.completed = true;
                              timedOut = !signaled;
                          }
                      }

                      this.completedHandle.Close();
                  }
              }

              if (this.aborted)
              {
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.CreateClosedException());
              }

              if (this.faulted)
              {
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.parent.GetTerminalException());
              }

              if (timedOut)
              {
                  throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException(SR.GetString(SR.TimeoutOnRequest, this.originalTimeout)));
              }

              succeeded = true;
              return this.reply;
          }
          finally
          {
              if (!succeeded)
              {
                  // An exception is leaving this method: raise a local communication fault on
                  // the session, then release the wait handle if one was created.
                  WsrmFault fault = SequenceTerminatedFault.CreateCommunicationFault(
                      this.parent.session.InputID,
                      SR.GetString(SR.SequenceTerminatedReliableRequestThrew),
                      null);
                  this.parent.session.OnLocalFault(null, fault, null);

                  if (this.completedHandle != null)
                  {
                      this.completedHandle.Close();
                  }
              }
          }
      }

      // Intentionally empty.
      public void OnReleaseRequest()
      {
      }

      // Sets the wait handle when a waiter has created one. Callers hold ThisLock.
      void SignalWaiter()
      {
          if (this.completedHandle != null)
          {
              this.completedHandle.Set();
          }
      }
  }
  // Asynchronous request built on AsyncResult. ShouldComplete ensures the underlying
  // async result is completed at most once, whichever of Abort, Fault, Complete or a failed
  // add gets there first.
  class AsyncRequest : AsyncResult, IReliableRequest, IAsyncRequest
  {
      bool completed;
      ReliableRequestSessionChannel parent;
      Message reply;
      bool set;
      object thisLock = new object();

      public AsyncRequest(ReliableRequestSessionChannel parent, AsyncCallback callback, object state)
          : base(callback, state)
      {
          this.parent = parent;
      }

      object ThisLock
      {
          get { return this.thisLock; }
      }

      // Completes the async result with the parent's closed exception, if not yet completed.
      public void Abort(RequestChannel channel)
      {
          if (!this.ShouldComplete())
          {
              return;
          }

          this.Complete(false, parent.CreateClosedException());
      }

      // Completes the async result with the parent's terminal exception, if not yet completed.
      public void Fault(RequestChannel channel)
      {
          if (!this.ShouldComplete())
          {
              return;
          }

          this.Complete(false, parent.GetTerminalException());
      }

      // Callback for BeginAddMessage. A refused add or a non-fatal exception completes the
      // async result with that error; a successful add leaves the request pending.
      void AddCompleted(IAsyncResult result)
      {
          Exception failure = null;

          try
          {
              bool added = parent.connection.EndAddMessage(result);
              if (!added)
              {
                  failure = this.parent.GetInvalidAddException();
              }
          }
#pragma warning suppress 56500 // covered by FxCOP
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              failure = e;
          }

          if (failure == null)
          {
              return;
          }

          if (this.ShouldComplete())
          {
              this.Complete(result.CompletedSynchronously, failure);
          }
      }

      // Starts adding the message to the parent's connection; AddCompleted handles the outcome.
      public void BeginSendRequest(Message message, TimeSpan timeout)
      {
          AsyncCallback onAdded = Fx.ThunkCallback(new AsyncCallback(AddCompleted));
          parent.connection.BeginAddMessage(message, timeout, this, onAdded, null);
      }

      // Completes the async result without an exception, if not yet completed.
      public void Complete()
      {
          if (!this.ShouldComplete())
          {
              return;
          }

          this.Complete(false, null);
      }

      // Ends the async result (via AsyncResult.End) and returns the stored reply.
      public Message End()
      {
          AsyncResult.End<AsyncRequest>(this);
          return this.reply;
      }

      // Stores the first reply handed in; any later non-null reply is closed instead.
      public void Set(Message reply)
      {
          bool stored = false;

          lock (this.ThisLock)
          {
              if (!this.set)
              {
                  this.reply = reply;
                  this.set = true;
                  stored = true;
              }
          }

          if (!stored && reply != null)
          {
              reply.Close();
          }
      }

      // Returns true for exactly one caller: the first to find the request not yet completed.
      bool ShouldComplete()
      {
          lock (this.ThisLock)
          {
              bool firstCaller = !this.completed;
              this.completed = true;
              return firstCaller;
          }
      }

      // Intentionally empty.
      public void OnReleaseRequest()
      {
      }
  }
  996. }
  997. }