ReliableInputSessionChannel.cs 48 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Runtime;
  8. using System.ServiceModel;
  9. using System.ServiceModel.Diagnostics;
  10. using System.Threading;
  11. using System.Xml;
  12. abstract class ReliableInputSessionChannel : InputChannel, IInputSessionChannel
  13. {
  14. bool advertisedZero = false;
  15. IServerReliableChannelBinder binder;
  16. ReliableInputConnection connection;
  17. DeliveryStrategy<Message> deliveryStrategy;
  18. ReliableChannelListenerBase<IInputSessionChannel> listener;
  19. ServerReliableSession session;
  20. protected string perfCounterId;
  21. static Action<object> asyncReceiveComplete = new Action<object>(AsyncReceiveCompleteStatic);
  22. static AsyncCallback onReceiveCompleted = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompletedStatic));
  23. protected ReliableInputSessionChannel(
  24. ReliableChannelListenerBase<IInputSessionChannel> listener,
  25. IServerReliableChannelBinder binder,
  26. FaultHelper faultHelper,
  27. UniqueId inputID)
  28. : base(listener, binder.LocalAddress)
  29. {
  30. this.binder = binder;
  31. this.listener = listener;
  32. this.connection = new ReliableInputConnection();
  33. this.connection.ReliableMessagingVersion = listener.ReliableMessagingVersion;
  34. this.session = new ServerReliableSession(this, listener, binder, faultHelper, inputID, null);
  35. this.session.UnblockChannelCloseCallback = this.UnblockClose;
  36. if (listener.Ordered)
  37. this.deliveryStrategy = new OrderedDeliveryStrategy<Message>(this, listener.MaxTransferWindowSize, false);
  38. else
  39. this.deliveryStrategy = new UnorderedDeliveryStrategy<Message>(this, listener.MaxTransferWindowSize);
  40. this.binder.Faulted += OnBinderFaulted;
  41. this.binder.OnException += OnBinderException;
  42. this.session.Open(TimeSpan.Zero);
  43. if (PerformanceCounters.PerformanceCountersEnabled)
  44. this.perfCounterId = this.listener.Uri.ToString().ToUpperInvariant();
  45. }
  46. protected bool AdvertisedZero
  47. {
  48. get
  49. {
  50. return this.advertisedZero;
  51. }
  52. set
  53. {
  54. this.advertisedZero = value;
  55. }
  56. }
  57. public IServerReliableChannelBinder Binder
  58. {
  59. get
  60. {
  61. return this.binder;
  62. }
  63. }
  64. protected ReliableInputConnection Connection
  65. {
  66. get
  67. {
  68. return this.connection;
  69. }
  70. }
  71. protected DeliveryStrategy<Message> DeliveryStrategy
  72. {
  73. get
  74. {
  75. return this.deliveryStrategy;
  76. }
  77. }
  78. protected ReliableChannelListenerBase<IInputSessionChannel> Listener
  79. {
  80. get
  81. {
  82. return this.listener;
  83. }
  84. }
  85. protected ChannelReliableSession ReliableSession
  86. {
  87. get
  88. {
  89. return this.session;
  90. }
  91. }
  92. public IInputSession Session
  93. {
  94. get
  95. {
  96. return this.session;
  97. }
  98. }
  99. protected virtual void AggregateAsyncCloseOperations(List<OperationWithTimeoutBeginCallback> beginOperations, List<OperationEndCallback> endOperations)
  100. {
  101. beginOperations.Add(new OperationWithTimeoutBeginCallback(this.session.BeginClose));
  102. endOperations.Add(new OperationEndCallback(this.session.EndClose));
  103. }
  104. static void AsyncReceiveCompleteStatic(object state)
  105. {
  106. IAsyncResult result = (IAsyncResult)state;
  107. ReliableInputSessionChannel channel = (ReliableInputSessionChannel)(result.AsyncState);
  108. try
  109. {
  110. if (channel.HandleReceiveComplete(result))
  111. {
  112. channel.StartReceiving(true);
  113. }
  114. }
  115. #pragma warning suppress 56500 // covered by FxCOP
  116. catch (Exception e)
  117. {
  118. if (Fx.IsFatal(e))
  119. throw;
  120. channel.ReliableSession.OnUnknownException(e);
  121. }
  122. }
  123. static void OnReceiveCompletedStatic(IAsyncResult result)
  124. {
  125. if (result.CompletedSynchronously)
  126. return;
  127. ReliableInputSessionChannel channel = (ReliableInputSessionChannel)(result.AsyncState);
  128. try
  129. {
  130. if (channel.HandleReceiveComplete(result))
  131. {
  132. channel.StartReceiving(true);
  133. }
  134. }
  135. #pragma warning suppress 56500 // covered by FxCOP
  136. catch (Exception e)
  137. {
  138. if (Fx.IsFatal(e))
  139. throw;
  140. channel.ReliableSession.OnUnknownException(e);
  141. }
  142. }
  143. protected abstract bool HandleReceiveComplete(IAsyncResult result);
  144. protected virtual void AbortGuards()
  145. {
  146. }
  147. protected void AddAcknowledgementHeader(Message message)
  148. {
  149. int bufferRemaining = -1;
  150. if (this.Listener.FlowControlEnabled)
  151. {
  152. bufferRemaining = this.Listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount;
  153. this.AdvertisedZero = (bufferRemaining == 0);
  154. }
  155. WsrmUtilities.AddAcknowledgementHeader(this.listener.ReliableMessagingVersion, message,
  156. this.session.InputID, this.connection.Ranges, this.connection.IsLastKnown, bufferRemaining);
  157. }
  158. IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state)
  159. {
  160. return this.binder.BeginClose(timeout, MaskingMode.Handled, callback, state);
  161. }
  162. protected virtual IAsyncResult BeginCloseGuards(TimeSpan timeout, AsyncCallback callback, object state)
  163. {
  164. return new CompletedAsyncResult(callback, state);
  165. }
  166. IAsyncResult BeginUnregisterChannel(TimeSpan timeout, AsyncCallback callback, object state)
  167. {
  168. return this.listener.OnReliableChannelBeginClose(this.ReliableSession.InputID, null,
  169. timeout, callback, state);
  170. }
  171. protected override void OnClosed()
  172. {
  173. base.OnClosed();
  174. this.binder.Faulted -= this.OnBinderFaulted;
  175. this.deliveryStrategy.Dispose();
  176. }
  177. protected virtual void CloseGuards(TimeSpan timeout)
  178. {
  179. }
  180. protected Message CreateAcknowledgmentMessage()
  181. {
  182. int bufferRemaining = -1;
  183. if (this.Listener.FlowControlEnabled)
  184. {
  185. bufferRemaining = this.Listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount;
  186. this.AdvertisedZero = (bufferRemaining == 0);
  187. }
  188. Message message = WsrmUtilities.CreateAcknowledgmentMessage(
  189. this.listener.MessageVersion,
  190. this.listener.ReliableMessagingVersion,
  191. this.session.InputID,
  192. this.connection.Ranges,
  193. this.connection.IsLastKnown,
  194. bufferRemaining);
  195. return message;
  196. }
  197. void EndCloseBinder(IAsyncResult result)
  198. {
  199. this.binder.EndClose(result);
  200. }
  201. protected virtual void EndCloseGuards(IAsyncResult result)
  202. {
  203. CompletedAsyncResult.End(result);
  204. }
  205. void EndUnregisterChannel(IAsyncResult result)
  206. {
  207. this.listener.OnReliableChannelEndClose(result);
  208. }
  209. public override T GetProperty<T>()
  210. {
  211. if (typeof(T) == typeof(IInputSessionChannel))
  212. {
  213. return (T)(object)this;
  214. }
  215. T baseProperty = base.GetProperty<T>();
  216. if (baseProperty != null)
  217. {
  218. return baseProperty;
  219. }
  220. T innerProperty = this.binder.Channel.GetProperty<T>();
  221. if ((innerProperty == null) && (typeof(T) == typeof(FaultConverter)))
  222. {
  223. return (T)(object)FaultConverter.GetDefaultFaultConverter(this.listener.MessageVersion);
  224. }
  225. else
  226. {
  227. return innerProperty;
  228. }
  229. }
  230. protected override void OnAbort()
  231. {
  232. this.connection.Abort(this);
  233. this.AbortGuards();
  234. this.session.Abort();
  235. this.listener.OnReliableChannelAbort(this.ReliableSession.InputID, null);
  236. base.OnAbort();
  237. }
  238. protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  239. {
  240. this.ThrowIfCloseInvalid();
  241. OperationWithTimeoutBeginCallback[] beginOperations = new OperationWithTimeoutBeginCallback[]
  242. {
  243. this.connection.BeginClose,
  244. this.session.BeginClose,
  245. this.BeginCloseGuards,
  246. this.BeginCloseBinder,
  247. this.BeginUnregisterChannel,
  248. base.OnBeginClose
  249. };
  250. OperationEndCallback[] endOperations = new OperationEndCallback[]
  251. {
  252. this.connection.EndClose,
  253. this.session.EndClose,
  254. this.EndCloseGuards,
  255. this.EndCloseBinder,
  256. this.EndUnregisterChannel,
  257. base.OnEndClose
  258. };
  259. return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout,
  260. beginOperations, endOperations, callback, state);
  261. }
  262. void OnBinderException(IReliableChannelBinder sender, Exception exception)
  263. {
  264. if (exception is QuotaExceededException)
  265. this.session.OnLocalFault(exception, SequenceTerminatedFault.CreateQuotaExceededFault(this.session.OutputID), null);
  266. else
  267. this.EnqueueAndDispatch(exception, null, false);
  268. }
  269. void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
  270. {
  271. this.binder.Abort();
  272. exception = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
  273. this.session.OnLocalFault(exception, (Message)null, null);
  274. }
  275. protected override void OnClose(TimeSpan timeout)
  276. {
  277. this.ThrowIfCloseInvalid();
  278. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  279. this.connection.Close(timeoutHelper.RemainingTime());
  280. this.session.Close(timeoutHelper.RemainingTime());
  281. this.CloseGuards(timeoutHelper.RemainingTime());
  282. this.binder.Close(timeoutHelper.RemainingTime(), MaskingMode.Handled);
  283. this.listener.OnReliableChannelClose(this.ReliableSession.InputID, null,
  284. timeoutHelper.RemainingTime());
  285. base.OnClose(timeoutHelper.RemainingTime());
  286. }
  287. protected override void OnEndClose(IAsyncResult result)
  288. {
  289. OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
  290. }
  291. protected override void OnFaulted()
  292. {
  293. this.session.OnFaulted();
  294. this.UnblockClose();
  295. base.OnFaulted();
  296. if (PerformanceCounters.PerformanceCountersEnabled)
  297. PerformanceCounters.SessionFaulted(this.perfCounterId);
  298. }
  299. protected virtual void OnQuotaAvailable()
  300. {
  301. }
  302. protected void ShutdownCallback(object state)
  303. {
  304. this.Shutdown();
  305. }
  306. protected void StartReceiving(bool canBlock)
  307. {
  308. while (true)
  309. {
  310. IAsyncResult result = this.Binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);
  311. if (!result.CompletedSynchronously)
  312. return;
  313. if (!canBlock)
  314. {
  315. ActionItem.Schedule(asyncReceiveComplete, result);
  316. return;
  317. }
  318. if (!this.HandleReceiveComplete(result))
  319. break;
  320. }
  321. }
  322. void ThrowIfCloseInvalid()
  323. {
  324. bool shouldFault = false;
  325. if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
  326. {
  327. if (this.DeliveryStrategy.EnqueuedCount > 0 || this.Connection.Ranges.Count > 1)
  328. {
  329. shouldFault = true;
  330. }
  331. }
  332. else if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
  333. {
  334. if (this.DeliveryStrategy.EnqueuedCount > 0)
  335. {
  336. shouldFault = true;
  337. }
  338. }
  339. if (shouldFault)
  340. {
  341. WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
  342. SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone), SR.GetString(SR.SessionClosedBeforeDone));
  343. this.session.OnLocalFault(null, fault, null);
  344. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
  345. }
  346. }
  347. void UnblockClose()
  348. {
  349. this.connection.Fault(this);
  350. }
  351. }
  352. sealed class ReliableInputSessionChannelOverDuplex : ReliableInputSessionChannel
  353. {
  354. TimeSpan acknowledgementInterval;
  355. bool acknowledgementScheduled = false;
  356. IOThreadTimer acknowledgementTimer;
  357. Guard guard = new Guard(Int32.MaxValue);
  358. int pendingAcknowledgements = 0;
  359. public ReliableInputSessionChannelOverDuplex(
  360. ReliableChannelListenerBase<IInputSessionChannel> listener,
  361. IServerReliableChannelBinder binder, FaultHelper faultHelper,
  362. UniqueId inputID)
  363. : base(listener, binder, faultHelper, inputID)
  364. {
  365. this.acknowledgementInterval = listener.AcknowledgementInterval;
  366. this.acknowledgementTimer = new IOThreadTimer(new Action<object>(this.OnAcknowledgementTimeoutElapsed), null, true);
  367. this.DeliveryStrategy.DequeueCallback = this.OnDeliveryStrategyItemDequeued;
  368. if (binder.HasSession)
  369. {
  370. try
  371. {
  372. this.StartReceiving(false);
  373. }
  374. #pragma warning suppress 56500 // covered by FxCOP
  375. catch (Exception e)
  376. {
  377. if (Fx.IsFatal(e))
  378. {
  379. throw;
  380. }
  381. this.ReliableSession.OnUnknownException(e);
  382. }
  383. }
  384. }
  385. protected override void AbortGuards()
  386. {
  387. this.guard.Abort();
  388. }
  389. protected override IAsyncResult BeginCloseGuards(TimeSpan timeout, AsyncCallback callback, object state)
  390. {
  391. return this.guard.BeginClose(timeout, callback, state);
  392. }
  393. protected override void CloseGuards(TimeSpan timeout)
  394. {
  395. this.guard.Close(timeout);
  396. }
  397. protected override void EndCloseGuards(IAsyncResult result)
  398. {
  399. this.guard.EndClose(result);
  400. }
  401. protected override bool HandleReceiveComplete(IAsyncResult result)
  402. {
  403. RequestContext context;
  404. if (this.Binder.EndTryReceive(result, out context))
  405. {
  406. if (context == null)
  407. {
  408. bool terminated = false;
  409. lock (this.ThisLock)
  410. {
  411. terminated = this.Connection.Terminate();
  412. }
  413. if (!terminated && (this.Binder.State == CommunicationState.Opened))
  414. {
  415. Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
  416. this.ReliableSession.OnLocalFault(e, (Message)null, null);
  417. }
  418. return false;
  419. }
  420. Message message = context.RequestMessage;
  421. context.Close();
  422. WsrmMessageInfo info = WsrmMessageInfo.Get(this.Listener.MessageVersion,
  423. this.Listener.ReliableMessagingVersion, this.Binder.Channel, this.Binder.GetInnerSession(),
  424. message);
  425. this.StartReceiving(false);
  426. this.ProcessMessage(info);
  427. return false;
  428. }
  429. return true;
  430. }
  431. void OnAcknowledgementTimeoutElapsed(object state)
  432. {
  433. lock (this.ThisLock)
  434. {
  435. this.acknowledgementScheduled = false;
  436. this.pendingAcknowledgements = 0;
  437. if (this.State == CommunicationState.Closing
  438. || this.State == CommunicationState.Closed
  439. || this.State == CommunicationState.Faulted)
  440. return;
  441. }
  442. if (this.guard.Enter())
  443. {
  444. try
  445. {
  446. using (Message message = CreateAcknowledgmentMessage())
  447. {
  448. this.Binder.Send(message, this.DefaultSendTimeout);
  449. }
  450. }
  451. finally
  452. {
  453. this.guard.Exit();
  454. }
  455. }
  456. }
  457. void OnDeliveryStrategyItemDequeued()
  458. {
  459. if (this.AdvertisedZero)
  460. this.OnAcknowledgementTimeoutElapsed(null);
  461. }
  462. protected override void OnClosing()
  463. {
  464. base.OnClosing();
  465. this.acknowledgementTimer.Cancel();
  466. }
  467. protected override void OnQuotaAvailable()
  468. {
  469. this.OnAcknowledgementTimeoutElapsed(null);
  470. }
  471. public void ProcessDemuxedMessage(WsrmMessageInfo info)
  472. {
  473. try
  474. {
  475. this.ProcessMessage(info);
  476. }
  477. #pragma warning suppress 56500 // covered by FxCOP
  478. catch (Exception e)
  479. {
  480. if (Fx.IsFatal(e))
  481. throw;
  482. this.ReliableSession.OnUnknownException(e);
  483. }
  484. }
  485. void ProcessMessage(WsrmMessageInfo info)
  486. {
  487. bool closeMessage = true;
  488. try
  489. {
  490. if (!this.ReliableSession.ProcessInfo(info, null))
  491. {
  492. closeMessage = false;
  493. return;
  494. }
  495. if (!this.ReliableSession.VerifySimplexProtocolElements(info, null))
  496. {
  497. closeMessage = false;
  498. return;
  499. }
  500. this.ReliableSession.OnRemoteActivity(false);
  501. if (info.CreateSequenceInfo != null)
  502. {
  503. EndpointAddress acksTo;
  504. if (WsrmUtilities.ValidateCreateSequence<IInputSessionChannel>(info, this.Listener, this.Binder.Channel, out acksTo))
  505. {
  506. Message response = WsrmUtilities.CreateCreateSequenceResponse(this.Listener.MessageVersion,
  507. this.Listener.ReliableMessagingVersion, false, info.CreateSequenceInfo,
  508. this.Listener.Ordered, this.ReliableSession.InputID, acksTo);
  509. using (response)
  510. {
  511. if (this.Binder.AddressResponse(info.Message, response))
  512. this.Binder.Send(response, this.DefaultSendTimeout);
  513. }
  514. }
  515. else
  516. {
  517. this.ReliableSession.OnLocalFault(info.FaultException, info.FaultReply, null);
  518. }
  519. return;
  520. }
  521. bool needDispatch = false;
  522. bool scheduleShutdown = false;
  523. bool tryAckNow = (info.AckRequestedInfo != null);
  524. bool terminate = false;
  525. Message message = null;
  526. WsrmFault fault = null;
  527. Exception remoteFaultException = null;
  528. bool wsrmFeb2005 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
  529. bool wsrm11 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
  530. if (info.SequencedMessageInfo != null)
  531. {
  532. lock (this.ThisLock)
  533. {
  534. if (this.Aborted || this.State == CommunicationState.Faulted)
  535. {
  536. return;
  537. }
  538. Int64 sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
  539. bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
  540. if (!this.Connection.IsValid(sequenceNumber, isLast))
  541. {
  542. if (wsrmFeb2005)
  543. {
  544. fault = new LastMessageNumberExceededFault(this.ReliableSession.InputID);
  545. }
  546. else
  547. {
  548. message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
  549. this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
  550. tryAckNow = true;
  551. if (PerformanceCounters.PerformanceCountersEnabled)
  552. PerformanceCounters.MessageDropped(this.perfCounterId);
  553. }
  554. }
  555. else if (this.Connection.Ranges.Contains(sequenceNumber))
  556. {
  557. if (PerformanceCounters.PerformanceCountersEnabled)
  558. PerformanceCounters.MessageDropped(this.perfCounterId);
  559. tryAckNow = true;
  560. }
  561. else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
  562. {
  563. this.Connection.Merge(sequenceNumber, isLast);
  564. if (this.Connection.AllAdded)
  565. {
  566. scheduleShutdown = true;
  567. this.ReliableSession.CloseSession();
  568. }
  569. }
  570. else if (this.State == CommunicationState.Closing)
  571. {
  572. if (wsrmFeb2005)
  573. {
  574. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  575. SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
  576. SR.GetString(SR.SessionClosedBeforeDone));
  577. }
  578. else
  579. {
  580. message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
  581. this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
  582. tryAckNow = true;
  583. if (PerformanceCounters.PerformanceCountersEnabled)
  584. PerformanceCounters.MessageDropped(this.perfCounterId);
  585. }
  586. }
  587. // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
  588. // serialized ack size and the amount of memory taken by the ack ranges. In the
  589. // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
  590. // threat.
  591. else if (this.DeliveryStrategy.CanEnqueue(sequenceNumber)
  592. && (this.Listener.Ordered || this.Connection.CanMerge(sequenceNumber)))
  593. {
  594. this.Connection.Merge(sequenceNumber, isLast);
  595. needDispatch = this.DeliveryStrategy.Enqueue(info.Message, sequenceNumber);
  596. closeMessage = false;
  597. this.pendingAcknowledgements++;
  598. if (this.pendingAcknowledgements == this.Listener.MaxTransferWindowSize)
  599. tryAckNow = true;
  600. if (this.Connection.AllAdded)
  601. {
  602. scheduleShutdown = true;
  603. this.ReliableSession.CloseSession();
  604. }
  605. }
  606. else
  607. {
  608. if (PerformanceCounters.PerformanceCountersEnabled)
  609. PerformanceCounters.MessageDropped(this.perfCounterId);
  610. }
  611. if (this.Connection.IsLastKnown)
  612. tryAckNow = true;
  613. if (!tryAckNow && this.pendingAcknowledgements > 0 && !this.acknowledgementScheduled && fault == null)
  614. {
  615. this.acknowledgementScheduled = true;
  616. this.acknowledgementTimer.Set(this.acknowledgementInterval);
  617. }
  618. }
  619. }
  620. else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
  621. {
  622. bool isTerminateEarly;
  623. lock (this.ThisLock)
  624. {
  625. isTerminateEarly = !this.Connection.Terminate();
  626. }
  627. if (isTerminateEarly)
  628. {
  629. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  630. SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
  631. SR.GetString(SR.EarlyTerminateSequence));
  632. }
  633. }
  634. else if (wsrm11 && ((info.TerminateSequenceInfo != null) || info.CloseSequenceInfo != null))
  635. {
  636. bool isTerminate = info.TerminateSequenceInfo != null;
  637. WsrmRequestInfo requestInfo = isTerminate
  638. ? (WsrmRequestInfo)info.TerminateSequenceInfo
  639. : (WsrmRequestInfo)info.CloseSequenceInfo;
  640. Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
  641. if (!WsrmUtilities.ValidateWsrmRequest(this.ReliableSession, requestInfo, this.Binder, null))
  642. {
  643. return;
  644. }
  645. bool isLastLargeEnough = true;
  646. bool isLastConsistent = true;
  647. lock (this.ThisLock)
  648. {
  649. if (!this.Connection.IsLastKnown)
  650. {
  651. if (isTerminate)
  652. {
  653. if (this.Connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
  654. {
  655. scheduleShutdown = true;
  656. }
  657. else if (isLastLargeEnough)
  658. {
  659. remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
  660. }
  661. }
  662. else
  663. {
  664. scheduleShutdown = this.Connection.SetCloseSequenceLast(last);
  665. isLastLargeEnough = scheduleShutdown;
  666. }
  667. if (scheduleShutdown)
  668. {
  669. this.ReliableSession.SetFinalAck(this.Connection.Ranges);
  670. this.DeliveryStrategy.Dispose();
  671. }
  672. }
  673. else
  674. {
  675. isLastConsistent = (last == this.Connection.Last);
  676. // Have seen CloseSequence already, TerminateSequence means cleanup.
  677. if (isTerminate && isLastConsistent && this.Connection.IsSequenceClosed)
  678. {
  679. terminate = true;
  680. }
  681. }
  682. }
  683. if (!isLastLargeEnough)
  684. {
  685. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  686. SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber),
  687. SR.GetString(SR.SmallLastMsgNumberExceptionString));
  688. }
  689. else if (!isLastConsistent)
  690. {
  691. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  692. SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber),
  693. SR.GetString(SR.InconsistentLastMsgNumberExceptionString));
  694. }
  695. else
  696. {
  697. message = isTerminate
  698. ? WsrmUtilities.CreateTerminateResponseMessage(this.Listener.MessageVersion,
  699. requestInfo.MessageId, this.ReliableSession.InputID)
  700. : WsrmUtilities.CreateCloseSequenceResponse(this.Listener.MessageVersion,
  701. requestInfo.MessageId, this.ReliableSession.InputID);
  702. tryAckNow = true;
  703. }
  704. }
  705. if (fault != null)
  706. {
  707. this.ReliableSession.OnLocalFault(fault.CreateException(), fault, null);
  708. }
  709. else
  710. {
  711. if (tryAckNow)
  712. {
  713. lock (this.ThisLock)
  714. {
  715. if (this.acknowledgementScheduled)
  716. {
  717. this.acknowledgementTimer.Cancel();
  718. this.acknowledgementScheduled = false;
  719. }
  720. this.pendingAcknowledgements = 0;
  721. }
  722. if (message != null)
  723. {
  724. this.AddAcknowledgementHeader(message);
  725. }
  726. else
  727. {
  728. message = this.CreateAcknowledgmentMessage();
  729. }
  730. }
  731. if (message != null)
  732. {
  733. using (message)
  734. {
  735. if (this.guard.Enter())
  736. {
  737. try
  738. {
  739. this.Binder.Send(message, this.DefaultSendTimeout);
  740. }
  741. finally
  742. {
  743. this.guard.Exit();
  744. }
  745. }
  746. }
  747. }
  748. if (terminate)
  749. {
  750. lock (this.ThisLock)
  751. {
  752. this.Connection.Terminate();
  753. }
  754. }
  755. if (remoteFaultException != null)
  756. {
  757. this.ReliableSession.OnRemoteFault(remoteFaultException);
  758. return;
  759. }
  760. if (needDispatch)
  761. {
  762. this.Dispatch();
  763. }
  764. if (scheduleShutdown)
  765. {
  766. ActionItem.Schedule(this.ShutdownCallback, null);
  767. }
  768. }
  769. }
  770. finally
  771. {
  772. if (closeMessage)
  773. {
  774. info.Message.Close();
  775. }
  776. }
  777. }
  778. }
  779. sealed class ReliableInputSessionChannelOverReply : ReliableInputSessionChannel
  780. {
  781. public ReliableInputSessionChannelOverReply(
  782. ReliableChannelListenerBase<IInputSessionChannel> listener,
  783. IServerReliableChannelBinder binder, FaultHelper faultHelper,
  784. UniqueId inputID)
  785. : base(listener, binder, faultHelper, inputID)
  786. {
  787. if (binder.HasSession)
  788. {
  789. try
  790. {
  791. this.StartReceiving(false);
  792. }
  793. #pragma warning suppress 56500 // covered by FxCOP
  794. catch (Exception e)
  795. {
  796. if (Fx.IsFatal(e))
  797. {
  798. throw;
  799. }
  800. this.ReliableSession.OnUnknownException(e);
  801. }
  802. }
  803. }
  804. protected override bool HandleReceiveComplete(IAsyncResult result)
  805. {
  806. RequestContext context;
  807. bool timeoutOkay = this.Binder.EndTryReceive(result, out context);
  808. if (timeoutOkay)
  809. {
  810. if (context == null)
  811. {
  812. bool terminated = false;
  813. lock (this.ThisLock)
  814. {
  815. terminated = this.Connection.Terminate();
  816. }
  817. if (!terminated && (this.Binder.State == CommunicationState.Opened))
  818. {
  819. Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
  820. this.ReliableSession.OnLocalFault(e, (Message)null, null);
  821. }
  822. return false;
  823. }
  824. WsrmMessageInfo info = WsrmMessageInfo.Get(this.Listener.MessageVersion,
  825. this.Listener.ReliableMessagingVersion, this.Binder.Channel, this.Binder.GetInnerSession(),
  826. context.RequestMessage);
  827. this.StartReceiving(false);
  828. this.ProcessRequest(context, info);
  829. return false;
  830. }
  831. return true;
  832. }
  833. public void ProcessDemuxedRequest(RequestContext context, WsrmMessageInfo info)
  834. {
  835. try
  836. {
  837. this.ProcessRequest(context, info);
  838. }
  839. #pragma warning suppress 56500 // covered by FxCOP
  840. catch (Exception e)
  841. {
  842. if (Fx.IsFatal(e))
  843. throw;
  844. this.ReliableSession.OnUnknownException(e);
  845. }
  846. }
  847. void ProcessRequest(RequestContext context, WsrmMessageInfo info)
  848. {
  849. bool closeContext = true;
  850. bool closeMessage = true;
  851. try
  852. {
  853. if (!this.ReliableSession.ProcessInfo(info, context))
  854. {
  855. closeContext = false;
  856. closeMessage = false;
  857. return;
  858. }
  859. if (!this.ReliableSession.VerifySimplexProtocolElements(info, context))
  860. {
  861. closeContext = false;
  862. closeMessage = false;
  863. return;
  864. }
  865. this.ReliableSession.OnRemoteActivity(false);
  866. if (info.CreateSequenceInfo != null)
  867. {
  868. EndpointAddress acksTo;
  869. if (WsrmUtilities.ValidateCreateSequence<IInputSessionChannel>(info, this.Listener, this.Binder.Channel, out acksTo))
  870. {
  871. Message response = WsrmUtilities.CreateCreateSequenceResponse(this.Listener.MessageVersion,
  872. this.Listener.ReliableMessagingVersion, false, info.CreateSequenceInfo,
  873. this.Listener.Ordered, this.ReliableSession.InputID, acksTo);
  874. using (context)
  875. {
  876. using (response)
  877. {
  878. if (this.Binder.AddressResponse(info.Message, response))
  879. context.Reply(response, this.DefaultSendTimeout);
  880. }
  881. }
  882. }
  883. else
  884. {
  885. this.ReliableSession.OnLocalFault(info.FaultException, info.FaultReply, context);
  886. }
  887. closeContext = false;
  888. closeMessage = false;
  889. return;
  890. }
  891. bool needDispatch = false;
  892. bool scheduleShutdown = false;
  893. bool terminate = false;
  894. WsrmFault fault = null;
  895. Message message = null;
  896. Exception remoteFaultException = null;
  897. bool wsrmFeb2005 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
  898. bool wsrm11 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
  899. bool addAck = info.AckRequestedInfo != null;
  900. if (info.SequencedMessageInfo != null)
  901. {
  902. lock (this.ThisLock)
  903. {
  904. if (this.Aborted || (this.State == CommunicationState.Faulted))
  905. {
  906. return;
  907. }
  908. Int64 sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
  909. bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
  910. if (!this.Connection.IsValid(sequenceNumber, isLast))
  911. {
  912. if (wsrmFeb2005)
  913. {
  914. fault = new LastMessageNumberExceededFault(this.ReliableSession.InputID);
  915. }
  916. else
  917. {
  918. message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
  919. this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
  920. if (PerformanceCounters.PerformanceCountersEnabled)
  921. PerformanceCounters.MessageDropped(this.perfCounterId);
  922. }
  923. }
  924. else if (this.Connection.Ranges.Contains(sequenceNumber))
  925. {
  926. if (PerformanceCounters.PerformanceCountersEnabled)
  927. PerformanceCounters.MessageDropped(this.perfCounterId);
  928. }
  929. else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
  930. {
  931. this.Connection.Merge(sequenceNumber, isLast);
  932. scheduleShutdown = this.Connection.AllAdded;
  933. }
  934. else if (this.State == CommunicationState.Closing)
  935. {
  936. if (wsrmFeb2005)
  937. {
  938. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  939. SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
  940. SR.GetString(SR.SessionClosedBeforeDone));
  941. }
  942. else
  943. {
  944. message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
  945. this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
  946. if (PerformanceCounters.PerformanceCountersEnabled)
  947. PerformanceCounters.MessageDropped(this.perfCounterId);
  948. }
  949. }
  950. // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
  951. // serialized ack size and the amount of memory taken by the ack ranges. In the
  952. // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
  953. // threat.
  954. else if (this.DeliveryStrategy.CanEnqueue(sequenceNumber)
  955. && (this.Listener.Ordered || this.Connection.CanMerge(sequenceNumber)))
  956. {
  957. this.Connection.Merge(sequenceNumber, isLast);
  958. needDispatch = this.DeliveryStrategy.Enqueue(info.Message, sequenceNumber);
  959. scheduleShutdown = this.Connection.AllAdded;
  960. closeMessage = false;
  961. }
  962. else
  963. {
  964. if (PerformanceCounters.PerformanceCountersEnabled)
  965. PerformanceCounters.MessageDropped(this.perfCounterId);
  966. }
  967. }
  968. }
  969. else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
  970. {
  971. bool isTerminateEarly;
  972. lock (this.ThisLock)
  973. {
  974. isTerminateEarly = !this.Connection.Terminate();
  975. }
  976. if (isTerminateEarly)
  977. {
  978. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  979. SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
  980. SR.GetString(SR.EarlyTerminateSequence));
  981. }
  982. else
  983. {
  984. // In the normal case, TerminateSequence is a one-way operation, returning (the finally
  985. // block will close the context).
  986. return;
  987. }
  988. }
  989. else if (wsrm11 && ((info.TerminateSequenceInfo != null) || (info.CloseSequenceInfo != null)))
  990. {
  991. bool isTerminate = (info.TerminateSequenceInfo != null);
  992. WsrmRequestInfo requestInfo = isTerminate
  993. ? (WsrmRequestInfo)info.TerminateSequenceInfo
  994. : (WsrmRequestInfo)info.CloseSequenceInfo;
  995. Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
  996. if (!WsrmUtilities.ValidateWsrmRequest(this.ReliableSession, requestInfo, this.Binder, context))
  997. {
  998. closeMessage = false;
  999. closeContext = false;
  1000. return;
  1001. }
  1002. bool isLastLargeEnough = true;
  1003. bool isLastConsistent = true;
  1004. lock (this.ThisLock)
  1005. {
  1006. if (!this.Connection.IsLastKnown)
  1007. {
  1008. if (isTerminate)
  1009. {
  1010. if (this.Connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
  1011. {
  1012. scheduleShutdown = true;
  1013. }
  1014. else if (isLastLargeEnough)
  1015. {
  1016. remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
  1017. }
  1018. }
  1019. else
  1020. {
  1021. scheduleShutdown = this.Connection.SetCloseSequenceLast(last);
  1022. isLastLargeEnough = scheduleShutdown;
  1023. }
  1024. if (scheduleShutdown)
  1025. {
  1026. this.ReliableSession.SetFinalAck(this.Connection.Ranges);
  1027. this.DeliveryStrategy.Dispose();
  1028. }
  1029. }
  1030. else
  1031. {
  1032. isLastConsistent = (last == this.Connection.Last);
  1033. // Have seen CloseSequence already, TerminateSequence means cleanup.
  1034. if (isTerminate && isLastConsistent && this.Connection.IsSequenceClosed)
  1035. {
  1036. terminate = true;
  1037. }
  1038. }
  1039. }
  1040. if (!isLastLargeEnough)
  1041. {
  1042. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  1043. SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber),
  1044. SR.GetString(SR.SmallLastMsgNumberExceptionString));
  1045. }
  1046. else if (!isLastConsistent)
  1047. {
  1048. fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
  1049. SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber),
  1050. SR.GetString(SR.InconsistentLastMsgNumberExceptionString));
  1051. }
  1052. else
  1053. {
  1054. message = isTerminate
  1055. ? WsrmUtilities.CreateTerminateResponseMessage(this.Listener.MessageVersion,
  1056. requestInfo.MessageId, this.ReliableSession.InputID)
  1057. : WsrmUtilities.CreateCloseSequenceResponse(this.Listener.MessageVersion,
  1058. requestInfo.MessageId, this.ReliableSession.InputID);
  1059. addAck = true;
  1060. }
  1061. }
  1062. if (fault != null)
  1063. {
  1064. this.ReliableSession.OnLocalFault(fault.CreateException(), fault, context);
  1065. closeMessage = false;
  1066. closeContext = false;
  1067. return;
  1068. }
  1069. if (message != null && addAck)
  1070. {
  1071. this.AddAcknowledgementHeader(message);
  1072. }
  1073. else if (message == null)
  1074. {
  1075. message = this.CreateAcknowledgmentMessage();
  1076. }
  1077. using (message)
  1078. {
  1079. context.Reply(message);
  1080. }
  1081. if (terminate)
  1082. {
  1083. lock (this.ThisLock)
  1084. {
  1085. this.Connection.Terminate();
  1086. }
  1087. }
  1088. if (remoteFaultException != null)
  1089. {
  1090. this.ReliableSession.OnRemoteFault(remoteFaultException);
  1091. return;
  1092. }
  1093. if (needDispatch)
  1094. {
  1095. this.Dispatch();
  1096. }
  1097. if (scheduleShutdown)
  1098. {
  1099. ActionItem.Schedule(this.ShutdownCallback, null);
  1100. }
  1101. }
  1102. finally
  1103. {
  1104. if (closeMessage)
  1105. info.Message.Close();
  1106. if (closeContext)
  1107. context.Close();
  1108. }
  1109. }
  1110. }
  1111. }