// ReliableReplySessionChannel.cs
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //-----------------------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Runtime;
  8. using System.ServiceModel;
  9. using System.ServiceModel.Diagnostics;
  10. using System.Threading;
  11. using System.Xml;
  12. // Note on locking:
  13. // The following rule must be followed in order to avoid deadlocks: ReliableRequestContext
  14. // locks MUST NOT be taken while under the ReliableReplySessionChannel lock.
  15. //
  16. // lock(context)-->lock(channel) ok.
  17. // lock(channel)-->lock(context) BAD!
  18. //
  19. sealed class ReliableReplySessionChannel : ReplyChannel, IReplySessionChannel
  20. {
  // Static callbacks used to complete binder receives for any channel instance.
  static Action<object> asyncReceiveComplete = new Action<object>(AsyncReceiveCompleteStatic);
  static AsyncCallback onReceiveCompleted = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompletedStatic));

  // Collaborators assigned by the constructor.
  IServerReliableChannelBinder binder;
  ReliableInputConnection connection;
  DeliveryStrategy<RequestContext> deliveryStrategy;
  ReliableChannelListenerBase<IReplySessionChannel> listener;
  ServerReliableSession session;

  // Created only when the listener uses WSReliableMessagingFebruary2005.
  InterruptibleWaitObject messagingCompleteWaitObject;

  // Upper-cased listener Uri; set only when performance counters are enabled.
  string perfCounterId;

  // Reply helpers created lazily by CreateCloseSequenceReplyHelper / CreateTerminateSequenceReplyHelper.
  ReplyHelper closeSequenceReplyHelper;
  ReplyHelper terminateSequenceReplyHelper;

  // Scratch list reused by ProcessAcknowledgment to collect acknowledged reply numbers.
  List<long> acked = new List<long>();

  // Set by AbortContexts so that the contexts are aborted only once.
  bool contextAborted;

  // Tracking for the February 2005 "last" request and its reply (see PrepareReply).
  ReliableRequestContext lastReply;
  bool lastReplyAcked;
  long lastReplySequenceNumber = long.MinValue; // MinValue means "not assigned yet".

  // Incremented in PrepareReply each time a reply sequence number is handed out.
  long nextReplySequenceNumber;

  // Request contexts indexed by request sequence number and by assigned reply sequence number.
  Dictionary<long, ReliableRequestContext> requestsByRequestSequenceNumber = new Dictionary<long, ReliableRequestContext>();
  Dictionary<long, ReliableRequestContext> requestsByReplySequenceNumber = new Dictionary<long, ReliableRequestContext>();
  // Wires the channel to its listener and binder, creates and opens the reliable session,
  // picks the delivery strategy, and starts receiving when the binder has a session.
  public ReliableReplySessionChannel(
      ReliableChannelListenerBase<IReplySessionChannel> listener,
      IServerReliableChannelBinder binder,
      FaultHelper faultHelper,
      UniqueId inputID,
      UniqueId outputID)
      : base(listener, binder.LocalAddress)
  {
      this.listener = listener;
      this.connection = new ReliableInputConnection();
      this.connection.ReliableMessagingVersion = listener.ReliableMessagingVersion;
      this.binder = binder;

      this.session = new ServerReliableSession(this, listener, binder, faultHelper, inputID, outputID);
      this.session.UnblockChannelCloseCallback = this.UnblockClose;

      if (listener.Ordered)
      {
          this.deliveryStrategy = new OrderedDeliveryStrategy<RequestContext>(this, listener.MaxTransferWindowSize, true);
      }
      else
      {
          this.deliveryStrategy = new UnorderedDeliveryStrategy<RequestContext>(this, listener.MaxTransferWindowSize);
      }

      binder.Faulted += OnBinderFaulted;
      binder.OnException += OnBinderException;

      // The messaging-complete wait object exists only for the February 2005 protocol version.
      if (listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          this.messagingCompleteWaitObject = new InterruptibleWaitObject(false);
      }

      this.session.Open(TimeSpan.Zero);

      if (PerformanceCounters.PerformanceCountersEnabled)
      {
          this.perfCounterId = listener.Uri.ToString().ToUpperInvariant();
      }

      if (!binder.HasSession)
      {
          return;
      }

      try
      {
          this.StartReceiving(false);
      }
#pragma warning suppress 56500 // covered by FxCOP
      catch (Exception e)
      {
          if (Fx.IsFatal(e))
          {
              throw;
          }
          // Non-fatal failures are handed to the session instead of escaping the constructor.
          this.session.OnUnknownException(e);
      }
  }
  // The reliable channel binder supplied to the constructor.
  public IServerReliableChannelBinder Binder
  {
      get { return this.binder; }
  }
  // True when the connection reports AllAdded, no request contexts remain in
  // requestsByRequestSequenceNumber, and the last reply has been acknowledged.
  // Evaluated under ThisLock.
  bool IsMessagingCompleted
  {
      get
      {
          lock (this.ThisLock)
          {
              if (!this.connection.AllAdded)
              {
                  return false;
              }

              if (this.requestsByRequestSequenceNumber.Count != 0)
              {
                  return false;
              }

              return this.lastReplyAcked;
          }
      }
  }
  // The message version, as reported by the listener.
  MessageVersion MessageVersion
  {
      get { return this.listener.MessageVersion; }
  }
  // Number of tracked requests that have not been given a reply sequence number:
  // the size of the by-request table minus the size of the by-reply table, read under ThisLock.
  int PendingRequestContexts
  {
      get
      {
          lock (this.ThisLock)
          {
              int tracked = this.requestsByRequestSequenceNumber.Count;
              int replied = this.requestsByReplySequenceNumber.Count;
              return tracked - replied;
          }
      }
  }
  // The server reliable session, exposed as an IInputSession.
  public IInputSession Session
  {
      get { return this.session; }
  }
  // Aborts every tracked request context and clears both lookup tables.
  // Only the first call does any work; later calls return immediately.
  void AbortContexts()
  {
      lock (this.ThisLock)
      {
          if (this.contextAborted)
          {
              return;
          }

          this.contextAborted = true;
      }

      // NOTE(review): the contexts are aborted after ThisLock is released, presumably because
      // ReliableRequestContext locks must not be taken under the channel lock -- confirm.
      foreach (ReliableRequestContext pending in this.requestsByRequestSequenceNumber.Values)
      {
          pending.Abort();
      }

      this.requestsByRequestSequenceNumber.Clear();
      this.requestsByReplySequenceNumber.Clear();

      // For February 2005 the last reply is tracked separately from the tables, so abort it as well.
      if ((this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
          && (this.lastReply != null))
      {
          this.lastReply.Abort();
      }
  }
  // Adds an acknowledgement header for the input sequence to the given message.
  // The last argument is the transfer window size minus the delivery strategy's enqueued count.
  void AddAcknowledgementHeader(Message message)
  {
      WsrmUtilities.AddAcknowledgementHeader(
          this.listener.ReliableMessagingVersion,
          message,
          this.session.InputID,
          this.connection.Ranges,
          this.connection.IsLastKnown,
          this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount);
  }
  // Completes a receive for the channel stored in the async result's AsyncState.
  // The state argument is the IAsyncResult of the receive. Non-fatal exceptions are
  // reported to the channel's session.
  static void AsyncReceiveCompleteStatic(object state)
  {
      IAsyncResult receiveResult = (IAsyncResult)state;
      ReliableReplySessionChannel owner = (ReliableReplySessionChannel)(receiveResult.AsyncState);

      try
      {
          bool receiveAgain = owner.HandleReceiveComplete(receiveResult);
          if (receiveAgain)
          {
              owner.StartReceiving(true);
          }
      }
#pragma warning suppress 56500 // covered by FxCOP
      catch (Exception e)
      {
          if (Fx.IsFatal(e))
          {
              throw;
          }

          owner.session.OnUnknownException(e);
      }
  }
  // Begins closing the binder with MaskingMode.Handled.
  IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state)
  {
      return this.binder.BeginClose(timeout, MaskingMode.Handled, callback, state);
  }
  // Async counterpart of CloseOutput.
  // February 2005: sends the last reply if there is one, otherwise returns an
  // already-constructed CloseOutputCompletedAsyncResult.
  // Other versions: ensures the close-sequence reply helper exists and waits/replies through it.
  IAsyncResult BeginCloseOutput(TimeSpan timeout, AsyncCallback callback, object state)
  {
      if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          lock (this.ThisLock)
          {
              this.ThrowIfClosed();
              this.CreateCloseSequenceReplyHelper();
          }

          return this.closeSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state);
      }

      ReliableRequestContext finalReply = this.lastReply;
      if (finalReply != null)
      {
          return finalReply.BeginReplyInternal(null, timeout, callback, state);
      }

      return new CloseOutputCompletedAsyncResult(callback, state);
  }
  // Begins notifying the listener that this channel (identified by its input and output IDs) is closing.
  IAsyncResult BeginUnregisterChannel(TimeSpan timeout, AsyncCallback callback, object state)
  {
      return this.listener.OnReliableChannelBeginClose(
          this.session.InputID,
          this.session.OutputID,
          timeout,
          callback,
          state);
  }
  // Builds a standalone acknowledgement message for the input sequence covering the given ranges.
  // The caller owns the returned message.
  Message CreateAcknowledgement(SequenceRangeCollection ranges)
  {
      return WsrmUtilities.CreateAcknowledgmentMessage(
          this.MessageVersion,
          this.listener.ReliableMessagingVersion,
          this.session.InputID,
          ranges,
          this.connection.IsLastKnown,
          this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount);
  }
  // Creates a SequenceClosed fault message for the input sequence and adds an
  // acknowledgement header to it before returning it.
  Message CreateSequenceClosedFault()
  {
      SequenceClosedFault closedFault = new SequenceClosedFault(this.session.InputID);
      Message faultMessage = closedFault.CreateMessage(
          this.listener.MessageVersion,
          this.listener.ReliableMessagingVersion);

      this.AddAcknowledgementHeader(faultMessage);
      return faultMessage;
  }
  // Ensures closeSequenceReplyHelper exists.
  // Returns false, creating nothing, when the channel is faulted or aborted; true otherwise.
  bool CreateCloseSequenceReplyHelper()
  {
      bool unusable = this.State == CommunicationState.Faulted || this.Aborted;
      if (unusable)
      {
          return false;
      }

      if (this.closeSequenceReplyHelper == null)
      {
          this.closeSequenceReplyHelper = new ReplyHelper(this, CloseSequenceReplyProvider.Instance, true);
      }

      return true;
  }
  // Ensures terminateSequenceReplyHelper exists.
  // Returns false, creating nothing, when the channel is faulted or aborted; true otherwise.
  bool CreateTerminateSequenceReplyHelper()
  {
      bool unusable = this.State == CommunicationState.Faulted || this.Aborted;
      if (unusable)
      {
          return false;
      }

      if (this.terminateSequenceReplyHelper == null)
      {
          this.terminateSequenceReplyHelper = new ReplyHelper(this, TerminateSequenceReplyProvider.Instance, false);
      }

      return true;
  }
  // Synchronous counterpart of BeginCloseOutput.
  // February 2005: sends the last reply if one was captured.
  // Other versions: ensures the close-sequence reply helper exists and waits/replies through it.
  void CloseOutput(TimeSpan timeout)
  {
      if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          lock (this.ThisLock)
          {
              this.ThrowIfClosed();
              this.CreateCloseSequenceReplyHelper();
          }

          this.closeSequenceReplyHelper.WaitAndReply(timeout);
          return;
      }

      ReliableRequestContext finalReply = this.lastReply;
      if (finalReply != null)
      {
          finalReply.ReplyInternal(null, timeout);
      }
  }
  // Reports whether a request with the given sequence number is still being tracked.
  // For February 2005 the unacknowledged last reply counts as tracked even though it is
  // no longer in the by-request table.
  bool ContainsRequest(Int64 requestSeqNum)
  {
      lock (this.ThisLock)
      {
          if (this.requestsByRequestSequenceNumber.ContainsKey(requestSeqNum))
          {
              return true;
          }

          if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
          {
              return false;
          }

          return (this.lastReply != null)
              && (this.lastReply.RequestSequenceNumber == requestSeqNum)
              && (!this.lastReplyAcked);
      }
  }
  // Completes the binder close started by BeginCloseBinder.
  void EndCloseBinder(IAsyncResult result)
  {
      this.binder.EndClose(result);
  }
  // Completes the operation started by BeginCloseOutput, mirroring its branches.
  void EndCloseOutput(IAsyncResult result)
  {
      if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          this.closeSequenceReplyHelper.EndWaitAndReply(result);
          return;
      }

      // February 2005: the result is either the "nothing to send" placeholder or the
      // result of replying through the last reply context.
      if (result is CloseOutputCompletedAsyncResult)
      {
          CloseOutputCompletedAsyncResult.End(result);
      }
      else
      {
          this.lastReply.EndReplyInternal(result);
      }
  }
  // Completes the listener notification started by BeginUnregisterChannel.
  void EndUnregisterChannel(IAsyncResult result)
  {
      this.listener.OnReliableChannelEndClose(result);
  }
  // Property lookup order: this channel (for IReplySessionChannel), then the base class,
  // then the binder's inner channel. When nothing supplies a FaultConverter, the default
  // converter for the listener's message version is returned.
  public override T GetProperty<T>()
  {
      if (typeof(T) == typeof(IReplySessionChannel))
      {
          return (T)(object)this;
      }

      T fromBase = base.GetProperty<T>();
      if (fromBase != null)
      {
          return fromBase;
      }

      T fromInner = this.binder.Channel.GetProperty<T>();
      if (fromInner != null)
      {
          return fromInner;
      }

      if (typeof(T) == typeof(FaultConverter))
      {
          return (T)(object)FaultConverter.GetDefaultFaultConverter(this.listener.MessageVersion);
      }

      // Nothing found; fromInner is null here.
      return fromInner;
  }
  // Finishes a binder receive.
  // Returns true when EndTryReceive reports false, in which case the callers start
  // another receive. Returns false once a result was obtained: either the binder
  // returned no context, or the request was parsed, a new receive was started and
  // the request was processed here.
  bool HandleReceiveComplete(IAsyncResult result)
  {
      RequestContext received;
      if (!this.Binder.EndTryReceive(result, out received))
      {
          return true;
      }

      if (received == null)
      {
          bool terminated = false;
          lock (this.ThisLock)
          {
              terminated = this.connection.Terminate();
          }

          // A null context while the binder is still open and the connection could not
          // be terminated is reported as a local fault.
          if (!terminated && (this.Binder.State == CommunicationState.Opened))
          {
              Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
              this.session.OnLocalFault(e, (Message)null, null);
          }

          return false;
      }

      WsrmMessageInfo messageInfo = WsrmMessageInfo.Get(
          this.listener.MessageVersion,
          this.listener.ReliableMessagingVersion,
          this.binder.Channel,
          this.binder.GetInnerSession(),
          received.RequestMessage);

      // Start the next receive before processing the current request.
      this.StartReceiving(false);
      this.ProcessRequest(received, messageInfo);
      return false;
  }
  // Aborts the reply helpers, the connection, the session and all tracked contexts,
  // then notifies the listener and the base class.
  protected override void OnAbort()
  {
      ReplyHelper closeHelper = this.closeSequenceReplyHelper;
      if (closeHelper != null)
      {
          closeHelper.Abort();
      }

      this.connection.Abort(this);

      ReplyHelper terminateHelper = this.terminateSequenceReplyHelper;
      if (terminateHelper != null)
      {
          terminateHelper.Abort();
      }

      this.session.Abort();
      this.AbortContexts();

      // The wait object is only created for the February 2005 version (see the constructor).
      if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          this.messagingCompleteWaitObject.Abort(this);
      }

      this.listener.OnReliableChannelAbort(this.session.InputID, this.session.OutputID);
      base.OnAbort();
  }
  // Async close. Composes the same steps as OnClose into one operation sharing a timeout:
  //   1. close output
  //   2. Feb2005: close connection          | otherwise: terminate sequence
  //   3. Feb2005: wait for messaging done   | otherwise: close connection
  //   4. close session, 5. close binder, 6. unregister from listener, 7. base close.
  protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  {
      this.ThrowIfCloseInvalid();

      OperationWithTimeoutBeginCallback beginSecond;
      OperationEndCallback endSecond;
      OperationWithTimeoutBeginCallback beginThird;
      OperationEndCallback endThird;

      // Only the delegates for the active protocol version are created;
      // messagingCompleteWaitObject does not exist for other versions.
      if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          beginSecond = new OperationWithTimeoutBeginCallback(this.connection.BeginClose);
          endSecond = new OperationEndCallback(this.connection.EndClose);
          beginThird = new OperationWithTimeoutBeginCallback(this.messagingCompleteWaitObject.BeginWait);
          endThird = new OperationEndCallback(this.messagingCompleteWaitObject.EndWait);
      }
      else
      {
          beginSecond = new OperationWithTimeoutBeginCallback(this.BeginTerminateSequence);
          endSecond = new OperationEndCallback(this.EndTerminateSequence);
          beginThird = new OperationWithTimeoutBeginCallback(this.connection.BeginClose);
          endThird = new OperationEndCallback(this.connection.EndClose);
      }

      OperationWithTimeoutBeginCallback[] beginSteps = new OperationWithTimeoutBeginCallback[]
      {
          new OperationWithTimeoutBeginCallback(this.BeginCloseOutput),
          beginSecond,
          beginThird,
          new OperationWithTimeoutBeginCallback(this.session.BeginClose),
          new OperationWithTimeoutBeginCallback(this.BeginCloseBinder),
          new OperationWithTimeoutBeginCallback(this.BeginUnregisterChannel),
          new OperationWithTimeoutBeginCallback(base.OnBeginClose)
      };

      OperationEndCallback[] endSteps = new OperationEndCallback[]
      {
          new OperationEndCallback(this.EndCloseOutput),
          endSecond,
          endThird,
          new OperationEndCallback(this.session.EndClose),
          new OperationEndCallback(this.EndCloseBinder),
          new OperationEndCallback(this.EndUnregisterChannel),
          new OperationEndCallback(base.OnEndClose)
      };

      return OperationWithTimeoutComposer.BeginComposeAsyncOperations(
          timeout, beginSteps, endSteps, callback, state);
  }
  // Binder exception handler: a QuotaExceededException faults the session locally;
  // every other exception is enqueued for dispatch on this channel.
  void OnBinderException(IReliableChannelBinder sender, Exception exception)
  {
      if (exception is QuotaExceededException)
      {
          this.session.OnLocalFault(exception, (Message)null, null);
      }
      else
      {
          this.EnqueueAndDispatch(exception, null, false);
      }
  }
  // Binder faulted handler: aborts the binder and faults the session with a
  // CommunicationException that wraps the original exception.
  void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
  {
      this.binder.Abort();

      Exception wrapped = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
      this.session.OnLocalFault(wrapped, (Message)null, null);
  }
  // Synchronous close. Every step gets whatever time remains of the overall timeout.
  protected override void OnClose(TimeSpan timeout)
  {
      this.ThrowIfCloseInvalid();

      TimeoutHelper remaining = new TimeoutHelper(timeout);
      this.CloseOutput(remaining.RemainingTime());

      bool isFeb2005 =
          this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;

      if (isFeb2005)
      {
          // February 2005: close the connection, then wait for messaging to complete.
          this.connection.Close(remaining.RemainingTime());
          this.messagingCompleteWaitObject.Wait(remaining.RemainingTime());
      }
      else
      {
          // Other versions: terminate the sequence, then close the connection.
          this.TerminateSequence(remaining.RemainingTime());
          this.connection.Close(remaining.RemainingTime());
      }

      this.session.Close(remaining.RemainingTime());
      this.binder.Close(remaining.RemainingTime(), MaskingMode.Handled);
      this.listener.OnReliableChannelClose(
          this.session.InputID,
          this.session.OutputID,
          remaining.RemainingTime());
      base.OnClose(remaining.RemainingTime());
  }
  // Final cleanup once the channel is closed: disposes the delivery strategy,
  // detaches the Faulted handler and, for February 2005, aborts the last reply context.
  protected override void OnClosed()
  {
      this.deliveryStrategy.Dispose();

      // NOTE(review): the constructor also subscribes OnBinderException to binder.OnException,
      // but only the Faulted handler is removed here -- confirm whether that is intentional.
      this.binder.Faulted -= this.OnBinderFaulted;

      if ((this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
          && (this.lastReply != null))
      {
          this.lastReply.Abort();
      }

      base.OnClosed();
  }
  // Completes the composed close operation started by OnBeginClose.
  protected override void OnEndClose(IAsyncResult result)
  {
      OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
  }
  // Propagates the fault to the session, unblocks any pending close, runs the base
  // handling, and records the faulted session when performance counters are enabled.
  protected override void OnFaulted()
  {
      this.session.OnFaulted();
      this.UnblockClose();
      base.OnFaulted();

      if (PerformanceCounters.PerformanceCountersEnabled)
      {
          PerformanceCounters.SessionFaulted(this.perfCounterId);
      }
  }
  // AsyncCallback for binder receives. Does nothing for synchronous completions.
  // Asynchronous completions run the same steps as AsyncReceiveCompleteStatic:
  // handle the result, start another receive if asked to, and report non-fatal
  // exceptions to the session.
  static void OnReceiveCompletedStatic(IAsyncResult result)
  {
      if (!result.CompletedSynchronously)
      {
          AsyncReceiveCompleteStatic(result);
      }
  }
  // For WS-ReliableMessaging 1.1, terminates the connection (under ThisLock) once the
  // sequence has been closed. Does nothing otherwise.
  void OnTerminateSequenceCompleted()
  {
      bool isWsrm11 =
          this.session.Settings.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;

      if (!isWsrm11 || !this.connection.IsSequenceClosed)
      {
          return;
      }

      lock (this.ThisLock)
      {
          this.connection.Terminate();
      }
  }
  // Decides, under ThisLock, whether the given context may send its reply now and
  // updates the tracking tables accordingly.
  // Returns false when the channel is aborted, faulted or closed, when a non-final
  // reply is attempted while closing, or when the February 2005 final reply must wait.
  // Throws the MessageNumberRolloverFault exception when no further reply sequence
  // number is available.
  bool PrepareReply(ReliableRequestContext context)
  {
      lock (this.ThisLock)
      {
          if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
          {
              return false;
          }

          long requestNumber = context.RequestSequenceNumber;

          // The "final" request is the one carrying the connection's Last number (February 2005 only).
          bool isFinalRequest =
              (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
              && (this.connection.Last == requestNumber);

          if (isFinalRequest)
          {
              if (this.lastReply == null)
              {
                  this.lastReply = context;
              }

              this.requestsByRequestSequenceNumber.Remove(requestNumber);

              // The final reply is held back until everything was added and the channel is closing.
              if (!this.connection.AllAdded || (this.State != CommunicationState.Closing))
              {
                  return false;
              }
          }
          else
          {
              if (this.State == CommunicationState.Closing)
              {
                  return false;
              }

              // A context without a reply needs no sequence number; just stop tracking it.
              if (!context.HasReply)
              {
                  this.requestsByRequestSequenceNumber.Remove(requestNumber);
                  return true;
              }
          }

          // won't throw if you do not need next sequence number
          if (this.nextReplySequenceNumber == Int64.MaxValue)
          {
              MessageNumberRolloverFault fault = new MessageNumberRolloverFault(this.session.OutputID);
              throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
          }

          this.nextReplySequenceNumber++;
          context.SetReplySequenceNumber(this.nextReplySequenceNumber);

          if (isFinalRequest)
          {
              if (!context.HasReply)
              {
                  // If Last Reply has no user data, it does not need to be acked. Treat it as already acked.
                  this.lastReplyAcked = true;
              }

              this.lastReplySequenceNumber = this.nextReplySequenceNumber;
              context.SetLastReply(this.lastReplySequenceNumber);
          }
          else if (context.HasReply)
          {
              this.requestsByReplySequenceNumber.Add(this.nextReplySequenceNumber, context);
          }

          return true;
      }
  }
  // Adds the acknowledgement header and the output-sequence header (with the given reply
  // number and last flag) to the reply, then returns that same message.
  // The ranges parameter is not used by this method.
  Message PrepareReplyMessage(Int64 replySequenceNumber, bool isLast, SequenceRangeCollection ranges, Message reply)
  {
      this.AddAcknowledgementHeader(reply);

      WsrmUtilities.AddSequenceHeader(
          this.listener.ReliableMessagingVersion,
          reply,
          this.session.OutputID,
          replySequenceNumber,
          isLast);

      return reply;
  }
  // Applies an incoming acknowledgement to the reply tracking state, under ThisLock.
  // Ignored when the channel is aborted, faulted or closed.
  //
  // Every reply in requestsByReplySequenceNumber whose number is covered by info.Ranges
  // is removed from both tables. For February 2005 the acknowledgement is also checked
  // against the last reply's sequence number.
  //
  // Fix: the last-reply check used to run only when requestsByReplySequenceNumber was
  // non-empty. PrepareReply never adds the last reply to that table, so an acknowledgement
  // arriving after all other replies had been acknowledged was ignored and lastReplyAcked
  // stayed false. The check now runs regardless of the table's size.
  void ProcessAcknowledgment(WsrmAcknowledgmentInfo info)
  {
      lock (this.ThisLock)
      {
          if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
          {
              return;
          }

          if (this.requestsByReplySequenceNumber.Count > 0)
          {
              // Collect first, remove afterwards: the table cannot be modified while it is enumerated.
              this.acked.Clear();

              foreach (KeyValuePair<Int64, ReliableRequestContext> pair in this.requestsByReplySequenceNumber)
              {
                  if (info.Ranges.Contains(pair.Key))
                  {
                      this.acked.Add(pair.Key);
                  }
              }

              for (int i = 0; i < this.acked.Count; i++)
              {
                  Int64 reply = this.acked[i];
                  this.requestsByRequestSequenceNumber.Remove(
                      this.requestsByReplySequenceNumber[reply].RequestSequenceNumber);
                  this.requestsByReplySequenceNumber.Remove(reply);
              }
          }

          if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
          {
              // Int64.MinValue means no last reply sequence number has been assigned yet.
              if (!this.lastReplyAcked && (this.lastReplySequenceNumber != Int64.MinValue))
              {
                  this.lastReplyAcked = info.Ranges.Contains(this.lastReplySequenceNumber);
              }
          }
      }
  }
  // Answers an AckRequested message with an acknowledgement of the connection's current
  // ranges. The request message and the context are closed whether or not the reply succeeds.
  void ProcessAckRequested(RequestContext context)
  {
      try
      {
          using (Message acknowledgement = this.CreateAcknowledgement(this.connection.Ranges))
          {
              context.Reply(acknowledgement);
          }
      }
      finally
      {
          context.RequestMessage.Close();
          context.Close();
      }
  }
  // Handles a CloseSequence or TerminateSequence request (whichever of
  // info.CloseSequenceInfo / info.TerminateSequenceInfo is present).
  //
  // Steps:
  //   1. Validate the request; if validation fails the context is left to the validator.
  //   2. Under ThisLock, record the sequence's last message number when it is not known
  //      yet, or check the supplied number against the known one.
  //   3. Outside the lock, translate a failed check into a fault, or reply through the
  //      matching ReplyHelper.
  //   4. Schedule ShutdownCallback when this request established the last number.
  //
  // The finally block closes the request message and context unless ownership was
  // handed off (cleanup == false).
  //
  // Fix: in the early-terminate branch the terminate message is now created directly
  // in the using scope, so it is disposed even if adding the acknowledgement header throws.
  void ProcessShutdown11(RequestContext context, WsrmMessageInfo info)
  {
      bool cleanup = true;

      try
      {
          bool isTerminate = (info.TerminateSequenceInfo != null);
          WsrmRequestInfo requestInfo = isTerminate
              ? (WsrmRequestInfo)info.TerminateSequenceInfo
              : (WsrmRequestInfo)info.CloseSequenceInfo;
          Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;

          if (!WsrmUtilities.ValidateWsrmRequest(this.session, requestInfo, this.binder, context))
          {
              cleanup = false;
              return;
          }

          bool scheduleShutdown = false;
          Exception remoteFaultException = null;
          ReplyHelper closeHelper = null;
          bool haveAllReplyAcks = true;
          bool isLastLargeEnough = true;
          bool isLastConsistent = true;

          lock (this.ThisLock)
          {
              if (!this.connection.IsLastKnown)
              {
                  // All requests and replies must be acknowledged.
                  if (this.requestsByRequestSequenceNumber.Count == 0)
                  {
                      if (isTerminate)
                      {
                          if (this.connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
                          {
                              scheduleShutdown = true;
                          }
                          else if (isLastLargeEnough)
                          {
                              remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
                          }
                      }
                      else
                      {
                          scheduleShutdown = this.connection.SetCloseSequenceLast(last);
                          isLastLargeEnough = scheduleShutdown;
                      }

                      if (scheduleShutdown)
                      {
                          // (1) !isTerminate && !IsLastKnown, CloseSequence received before TerminateSequence.
                          // - Need to ensure helper to delay the reply until Close.
                          // (2) isTerminate && !IsLastKnown, TerminateSequence received before CloseSequence.
                          // - Close not required, ensure it is created so we can bypass it.
                          if (!this.CreateCloseSequenceReplyHelper())
                          {
                              return;
                          }

                          // Capture the helper in order to unblock it.
                          if (isTerminate)
                          {
                              closeHelper = this.closeSequenceReplyHelper;
                          }

                          this.session.SetFinalAck(this.connection.Ranges);
                          this.deliveryStrategy.Dispose();
                      }
                  }
                  else
                  {
                      haveAllReplyAcks = false;
                  }
              }
              else
              {
                  isLastConsistent = (last == this.connection.Last);
              }
          }

          // Faults are created and sent outside the lock.
          WsrmFault fault = null;

          if (!isLastLargeEnough)
          {
              string faultString = SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber);
              string exceptionString = SR.GetString(SR.SmallLastMsgNumberExceptionString);
              fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, faultString,
                  exceptionString);
          }
          else if (!haveAllReplyAcks)
          {
              string faultString = SR.GetString(SR.SequenceTerminatedNotAllRepliesAcknowledged);
              string exceptionString = SR.GetString(SR.NotAllRepliesAcknowledgedExceptionString);
              fault = SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, faultString,
                  exceptionString);
          }
          else if (!isLastConsistent)
          {
              string faultString = SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber);
              string exceptionString = SR.GetString(SR.InconsistentLastMsgNumberExceptionString);
              fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                  faultString, exceptionString);
          }
          else if (remoteFaultException != null)
          {
              // Early TerminateSequence: reply with a terminate message for the output
              // sequence, then treat the condition as a remote fault.
              using (Message message = WsrmUtilities.CreateTerminateMessage(this.MessageVersion,
                  this.listener.ReliableMessagingVersion, this.session.OutputID))
              {
                  this.AddAcknowledgementHeader(message);
                  context.Reply(message);
              }

              this.session.OnRemoteFault(remoteFaultException);
              return;
          }

          if (fault != null)
          {
              // The context is passed to OnLocalFault, so it must not be closed here.
              this.session.OnLocalFault(fault.CreateException(), fault, context);
              cleanup = false;
              return;
          }

          if (isTerminate)
          {
              if (closeHelper != null)
              {
                  closeHelper.UnblockWaiter();
              }

              lock (this.ThisLock)
              {
                  if (!this.CreateTerminateSequenceReplyHelper())
                  {
                      return;
                  }
              }
          }

          ReplyHelper replyHelper = isTerminate ? this.terminateSequenceReplyHelper : this.closeSequenceReplyHelper;

          if (!replyHelper.TransferRequestContext(context, info))
          {
              // The helper did not take the context, so reply right away.
              replyHelper.Reply(context, info, this.DefaultSendTimeout, MaskingMode.All);

              if (isTerminate)
              {
                  this.OnTerminateSequenceCompleted();
              }
          }
          else
          {
              // The helper now holds the context; do not close it here.
              cleanup = false;
          }

          if (scheduleShutdown)
          {
              ActionItem.Schedule(this.ShutdownCallback, null);
          }
      }
      finally
      {
          if (cleanup)
          {
              context.RequestMessage.Close();
              context.Close();
          }
      }
  }
  // Public entry point for a request routed to this channel. Delegates to
  // ProcessRequest; any non-fatal exception is reported to the session through
  // OnUnknownException rather than propagated to the caller. Fatal exceptions
  // (as classified by Fx.IsFatal) are rethrown untouched.
  public void ProcessDemuxedRequest(RequestContext context, WsrmMessageInfo info)
  {
      try
      {
          this.ProcessRequest(context, info);
      }
#pragma warning suppress 56500 // covered by FxCOP
      catch (Exception unexpected)
      {
          if (!Fx.IsFatal(unexpected))
          {
              this.session.OnUnknownException(unexpected);
              return;
          }

          throw;
      }
  }
  // Routes one incoming WS-RM message to the matching handler: create sequence,
  // acknowledgement, sequenced message, terminate/close sequence or ack requested.
  //
  // ownsMessage / ownsContext record whether the finally block must still close
  // info.Message and the transport context. They are cleared on every path where
  // the context has been passed on to the session or to a handler, or has already
  // been disposed here.
  void ProcessRequest(RequestContext context, WsrmMessageInfo info)
  {
      bool ownsMessage = true;
      bool ownsContext = true;

      try
      {
          // Either session check returning false means nothing is left to close here.
          if (!this.session.ProcessInfo(info, context)
              || !this.session.VerifyDuplexProtocolElements(info, context))
          {
              ownsMessage = false;
              ownsContext = false;
              return;
          }

          this.session.OnRemoteActivity(false);

          if (info.CreateSequenceInfo != null)
          {
              EndpointAddress acksTo;
              bool createSequenceIsValid = WsrmUtilities.ValidateCreateSequence<IReplySessionChannel>(
                  info, this.listener, this.binder.Channel, out acksTo);

              if (!createSequenceIsValid)
              {
                  this.session.OnLocalFault(info.FaultException, info.FaultReply, context);
              }
              else
              {
                  Message response = WsrmUtilities.CreateCreateSequenceResponse(
                      this.listener.MessageVersion,
                      this.listener.ReliableMessagingVersion,
                      true,
                      info.CreateSequenceInfo,
                      this.listener.Ordered,
                      this.session.InputID,
                      acksTo);

                  // Both the context and the response are disposed when this scope exits.
                  using (context)
                  using (response)
                  {
                      if (this.Binder.AddressResponse(info.Message, response))
                      {
                          context.Reply(response, this.DefaultSendTimeout);
                      }
                  }
              }

              // The context was either disposed above or given to OnLocalFault;
              // the message is still closed by the finally block.
              ownsContext = false;
              return;
          }

          ownsContext = false;

          if (info.AcknowledgementInfo != null)
          {
              ProcessAcknowledgment(info.AcknowledgementInfo);

              // When the action is the plain SequenceAcknowledgement action, the
              // context is closed here and no further handler runs.
              ownsContext = (info.Action == WsrmIndex.GetSequenceAcknowledgementActionString(this.listener.ReliableMessagingVersion));
          }

          if (!ownsContext)
          {
              // From here on the selected handler receives the context.
              ownsMessage = false;

              if (info.SequencedMessageInfo != null)
              {
                  ProcessSequencedMessage(context, info.Action, info.SequencedMessageInfo);
              }
              else if (info.TerminateSequenceInfo != null)
              {
                  if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
                  {
                      ProcessTerminateSequenceFeb2005(context, info);
                  }
                  else if (info.TerminateSequenceInfo.Identifier == this.session.InputID)
                  {
                      ProcessShutdown11(context, info);
                  }
                  else // Identifier == OutputID
                  {
                      WsrmFault unsupported = SequenceTerminatedFault.CreateProtocolFault(
                          this.session.InputID,
                          SR.GetString(SR.SequenceTerminatedUnsupportedTerminateSequence),
                          SR.GetString(SR.UnsupportedTerminateSequenceExceptionString));

                      this.session.OnLocalFault(unsupported.CreateException(), unsupported, context);
                      ownsMessage = false;
                      ownsContext = false;
                      return;
                  }
              }
              else if (info.CloseSequenceInfo != null)
              {
                  ProcessShutdown11(context, info);
              }
              else if (info.AckRequestedInfo != null)
              {
                  ProcessAckRequested(context);
              }
          }

          // Feb2005 only: signal the wait object once messaging is complete.
          if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005
              && this.IsMessagingCompleted)
          {
              this.messagingCompleteWaitObject.Set();
          }
      }
      finally
      {
          if (ownsMessage)
          {
              info.Message.Close();
          }

          if (ownsContext)
          {
              context.Close();
          }
      }
  }
  // A given reliable request can be in one of three states:
  // 1. Known and Processing: A ReliableRequestContext exists in requestTable but the outcome
  // for the request is unknown. Any transport request referencing this reliable request
  // (by means of the sequence number) must be held until the outcome becomes known.
  // 2. Known and Processed: A ReliableRequestContext exists in the requestTable and the outcome
  // for the request is known. The ReliableRequestContext holds that outcome. Any transport requests
  // referencing this reliable request must send the response dictated by the outcome.
  // 3. Unknown: No ReliableRequestContext exists in the requestTable for the referenced reliable request.
  // In this case a new ReliableRequestContext is added to the requestTable to await some outcome.
  //
  // There are 4 possible outcomes for a reliable request:
  // a. It is captured and the user replies. Transport replies are then copies of the user's reply.
  // b. It is captured and the user closes the context. Transport replies are then acknowledgments
  // that include the sequence number of the reliable request.
  // c. It is captured and the user aborts the context. Transport contexts are then aborted.
  // d. It is not captured. In this case an acknowledgment that includes all sequence numbers
  // previously captured is sent. Note two sub-cases here:
  // 1. It is not captured because it is dropped (e.g. it doesn't fit in the buffer). In this
  // case the reliable request's sequence number is not in the acknowledgment.
  // 2. It is not captured because it is a duplicate. In this case the reliable request's
  // sequence number is included in the acknowledgment.
  //
  // By following these rules it is possible to support one-way and two-way operations without having
  // knowledge of them (the user drives using the request context we give them) and at the same time
  // it is possible to forget about past replies once acknowledgments for them are received.
  // Handles one sequenced message. Under the channel lock it classifies the
  // message and records the decision in locals; the resulting faults, replies,
  // dispatch and shutdown scheduling are all performed after the lock is released.
  //
  //   context - transport request context that carried the message.
  //   action  - action of the message; used to detect the Feb2005 LastMessage action.
  //   info    - sequence header information (sequence number, LastMessage flag).
  //
  // Classification, in order:
  //   * channel aborted/faulted/closed -> the request is closed and the context aborted;
  //   * sequence number not valid      -> Feb2005: LastMessageNumberExceeded fault,
  //                                       otherwise a SequenceClosed fault reply;
  //   * duplicate                      -> reuse (or create) the matching ReliableRequestContext;
  //   * channel Closing and not the bare LastMessage -> fault (Feb2005) or SequenceClosed reply;
  //   * room to accept                 -> merge into the ranges and enqueue for delivery;
  //   * otherwise                      -> dropped.
  void ProcessSequencedMessage(RequestContext context, string action, WsrmSequencedMessageInfo info)
  {
      ReliableRequestContext reliableContext = null;
      WsrmFault fault = null;
      Message message = null;
      bool needDispatch = false;
      bool scheduleShutdown = false;
      bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
      Int64 requestSequenceNumber = info.SequenceNumber;
      bool isLast = wsrmFeb2005 && info.LastMessage;
      bool isLastOnly = wsrmFeb2005 && (action == WsrmFeb2005Strings.LastMessageAction);
      bool isDupe;

      lock (this.ThisLock)
      {
          if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
          {
              context.RequestMessage.Close();
              context.Abort();
              return;
          }

          isDupe = this.connection.Ranges.Contains(requestSequenceNumber);

          if (!this.connection.IsValid(requestSequenceNumber, isLast))
          {
              if (wsrmFeb2005)
              {
                  fault = new LastMessageNumberExceededFault(this.session.InputID);
              }
              else
              {
                  message = this.CreateSequenceClosedFault();

                  if (PerformanceCounters.PerformanceCountersEnabled)
                  {
                      PerformanceCounters.MessageDropped(this.perfCounterId);
                  }
              }
          }
          else if (isDupe)
          {
              if (PerformanceCounters.PerformanceCountersEnabled)
              {
                  PerformanceCounters.MessageDropped(this.perfCounterId);
              }

              if (!this.requestsByRequestSequenceNumber.TryGetValue(requestSequenceNumber, out reliableContext))
              {
                  if ((this.lastReply != null) && (this.lastReply.RequestSequenceNumber == requestSequenceNumber))
                  {
                      reliableContext = this.lastReply;
                  }
                  else
                  {
                      // No tracked request: create a context whose outcome is already
                      // known (last constructor argument), so the transport context is
                      // answered right away after the lock.
                      reliableContext = new ReliableRequestContext(context, requestSequenceNumber, this, true);
                  }
              }

              reliableContext.SetAckRanges(this.connection.Ranges);
          }
          else if ((this.State == CommunicationState.Closing) && !isLastOnly)
          {
              if (wsrmFeb2005)
              {
                  fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                      SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
                      SR.GetString(SR.SessionClosedBeforeDone));
              }
              else
              {
                  message = this.CreateSequenceClosedFault();

                  if (PerformanceCounters.PerformanceCountersEnabled)
                  {
                      PerformanceCounters.MessageDropped(this.perfCounterId);
                  }
              }
          }
          // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
          // serialized ack size and the amount of memory taken by the ack ranges. In the
          // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
          // threat.
          else if (this.deliveryStrategy.CanEnqueue(requestSequenceNumber)
              && (this.requestsByReplySequenceNumber.Count < this.listener.MaxTransferWindowSize)
              && (this.listener.Ordered || this.connection.CanMerge(requestSequenceNumber)))
          {
              this.connection.Merge(requestSequenceNumber, isLast);
              reliableContext = new ReliableRequestContext(context, requestSequenceNumber, this, false);
              reliableContext.SetAckRanges(this.connection.Ranges);

              if (!isLastOnly)
              {
                  needDispatch = this.deliveryStrategy.Enqueue(reliableContext, requestSequenceNumber);
                  this.requestsByRequestSequenceNumber.Add(requestSequenceNumber, reliableContext);
              }
              else
              {
                  // A bare LastMessage is not enqueued for delivery; it is remembered
                  // as lastReply and closed below, outside the lock.
                  this.lastReply = reliableContext;
              }

              scheduleShutdown = this.connection.AllAdded;
          }
          else
          {
              if (PerformanceCounters.PerformanceCountersEnabled)
              {
                  PerformanceCounters.MessageDropped(this.perfCounterId);
              }
          }
      }

      if (fault != null)
      {
          this.session.OnLocalFault(fault.CreateException(), fault, context);
          return;
      }

      if (reliableContext == null)
      {
          // Not captured: send the SequenceClosed fault if one was prepared, then
          // release the transport request.
          if (message != null)
          {
              using (message)
              {
                  context.Reply(message);
              }
          }

          context.RequestMessage.Close();
          context.Close();
          return;
      }

      // Duplicate whose outcome is already known: answer immediately. Otherwise
      // CheckForReplyOrAddInnerContext has parked the context until the outcome is known.
      if (isDupe && reliableContext.CheckForReplyOrAddInnerContext(context))
      {
          reliableContext.SendReply(context, MaskingMode.All);
          return;
      }

      if (!isDupe && isLastOnly)
      {
          reliableContext.Close();
      }

      if (needDispatch)
      {
          this.Dispatch();
      }

      if (scheduleShutdown)
      {
          ActionItem.Schedule(this.ShutdownCallback, null);
      }
  }
  // Handles a TerminateSequence message for the February 2005 protocol version.
  //
  // Under the channel lock it asks the connection to terminate and checks whether
  // any request is still tracked in requestsByRequestSequenceNumber. A failed
  // Terminate() or an outstanding request raises a SequenceTerminated fault through
  // the session; in that case the context is given to OnLocalFault and is not
  // closed here. Otherwise a TerminateSequence message carrying an acknowledgement
  // header is sent as the reply, and the request message and context are closed.
  void ProcessTerminateSequenceFeb2005(RequestContext context, WsrmMessageInfo info)
  {
      bool cleanup = true;

      try
      {
          bool isTerminateEarly;
          bool haveAllReplyAcks;

          lock (this.ThisLock)
          {
              isTerminateEarly = !this.connection.Terminate();
              haveAllReplyAcks = this.requestsByRequestSequenceNumber.Count == 0;
          }

          WsrmFault fault = null;

          if (isTerminateEarly)
          {
              fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                  SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
                  SR.GetString(SR.EarlyTerminateSequence));
          }
          else if (!haveAllReplyAcks)
          {
              fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                  SR.GetString(SR.SequenceTerminatedBeforeReplySequenceAcked),
                  SR.GetString(SR.EarlyRequestTerminateSequence));
          }

          if (fault != null)
          {
              this.session.OnLocalFault(fault.CreateException(), fault, context);
              cleanup = false;
              return;
          }

          // Create the message inside the using so that it is closed even when
          // adding the acknowledgement header throws.
          using (Message message = WsrmUtilities.CreateTerminateMessage(this.MessageVersion,
              this.listener.ReliableMessagingVersion, this.session.OutputID))
          {
              this.AddAcknowledgementHeader(message);
              context.Reply(message);
          }
      }
      finally
      {
          if (cleanup)
          {
              context.RequestMessage.Close();
              context.Close();
          }
      }
  }
  // Receive pump. Starts an asynchronous TryReceive on the binder and, while
  // receives keep completing synchronously, processes them inline.
  //
  //   canBlock - when false, a synchronously completed receive is not processed
  //              on this thread; it is scheduled through ActionItem and the
  //              method returns.
  //
  // The loop ends when a receive goes asynchronous (the callback takes over),
  // when work has been scheduled, or when HandleReceiveComplete returns false.
  void StartReceiving(bool canBlock)
  {
      bool keepReceiving;

      do
      {
          IAsyncResult pending = this.binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);

          if (!pending.CompletedSynchronously)
          {
              return;
          }

          if (!canBlock)
          {
              ActionItem.Schedule(asyncReceiveComplete, pending);
              return;
          }

          keepReceiving = this.HandleReceiveComplete(pending);
      }
      while (keepReceiving);
  }
  // Callback adapter (passed to ActionItem.Schedule elsewhere in this class)
  // that runs Shutdown. The state argument is ignored.
  void ShutdownCallback(object state)
  {
      Shutdown();
  }
  // Synchronous terminate step: under the channel lock, verifies the channel is
  // not closed and makes sure the terminate-sequence reply helper has been
  // created; then waits on that helper and replies (bounded by timeout), and
  // finally signals that terminate-sequence processing has completed.
  void TerminateSequence(TimeSpan timeout)
  {
      lock (this.ThisLock)
      {
          ThrowIfClosed();
          CreateTerminateSequenceReplyHelper();
      }

      // The wait happens outside the lock.
      this.terminateSequenceReplyHelper.WaitAndReply(timeout);
      OnTerminateSequenceCompleted();
  }
  // Asynchronous counterpart of TerminateSequence: performs the same locked
  // checks, then starts the helper's wait-and-reply operation and returns its
  // IAsyncResult. Completion is observed through EndTerminateSequence.
  IAsyncResult BeginTerminateSequence(TimeSpan timeout, AsyncCallback callback, object state)
  {
      lock (this.ThisLock)
      {
          ThrowIfClosed();
          CreateTerminateSequenceReplyHelper();
      }

      IAsyncResult pending = this.terminateSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state);
      return pending;
  }
  // Completes the operation started by BeginTerminateSequence and then signals
  // that terminate-sequence processing has completed. If ending the helper's
  // operation throws, the completion notification is not raised.
  void EndTerminateSequence(IAsyncResult result)
  {
      ReplyHelper helper = this.terminateSequenceReplyHelper;
      helper.EndWaitAndReply(result);
      OnTerminateSequenceCompleted();
  }
  // Verifies that closing is permissible and faults the session otherwise.
  //
  // Closing is rejected when:
  //   * Feb2005: request contexts are still pending, or the received ranges
  //     collection holds more than one range;
  //   * WS-RM 1.1: request contexts are still pending.
  //
  // On rejection the session is faulted with a SequenceTerminated protocol fault
  // and the exception created from that fault is thrown.
  void ThrowIfCloseInvalid()
  {
      bool closeRejected = false;

      if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          closeRejected = this.PendingRequestContexts != 0 || this.connection.Ranges.Count > 1;
      }
      else if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
      {
          closeRejected = this.PendingRequestContexts != 0;
      }

      if (!closeRejected)
      {
          return;
      }

      WsrmFault closedBeforeDone = SequenceTerminatedFault.CreateProtocolFault(
          this.session.InputID,
          SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
          SR.GetString(SR.SessionClosedBeforeDone));

      this.session.OnLocalFault(null, closedBeforeDone, null);
      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(closedBeforeDone.CreateException());
  }
  // Releases anything a pending close could be waiting on: aborts the request
  // contexts, faults the version-specific wait primitive (the messaging-complete
  // wait object for Feb2005, otherwise whichever close/terminate reply helpers
  // exist), and finally faults the connection.
  void UnblockClose()
  {
      this.AbortContexts();

      if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
      {
          // The helpers are created on demand elsewhere, so either may be absent.
          if (this.closeSequenceReplyHelper != null)
          {
              this.closeSequenceReplyHelper.Fault();
          }

          if (this.terminateSequenceReplyHelper != null)
          {
              this.terminateSequenceReplyHelper.Fault();
          }
      }
      else
      {
          this.messagingCompleteWaitObject.Fault(this);
      }

      this.connection.Fault(this);
  }
  // Already-completed async result with its own distinct type; it adds no state
  // or behavior beyond CompletedAsyncResult.
  class CloseOutputCompletedAsyncResult : CompletedAsyncResult
  {
      public CloseOutputCompletedAsyncResult(AsyncCallback callback, object state) : base(callback, state)
      {
      }
  }
  // Request context handed out for one reliable request (identified by its
  // request sequence number). It collects the transport-level request contexts
  // (innerContexts) that referenced the reliable request and, once the outcome
  // is known, answers each of them either with a copy of the buffered reply or
  // with an acknowledgement built from the captured ack ranges.
  class ReliableRequestContext : RequestContextBase
  {
      // Buffered copy of the reply; null means "reply with an acknowledgement".
      MessageBuffer bufferedReply;
      ReliableReplySessionChannel channel;
      // Transport contexts still waiting for the outcome of this request.
      List<RequestContext> innerContexts = new List<RequestContext>();
      bool isLastReply;
      // Set once a reply/close/abort has been decided; guarded by ThisLock on write.
      bool outcomeKnown;
      SequenceRangeCollection ranges;
      Int64 requestSequenceNumber;
      Int64 replySequenceNumber;

      // context  - transport context that carried the request.
      // outcome  - true when the outcome is already known; the transport context
      //            is then not added to innerContexts.
      public ReliableRequestContext(RequestContext context, Int64 requestSequenceNumber, ReliableReplySessionChannel channel, bool outcome)
          : base(context.RequestMessage, channel.DefaultCloseTimeout, channel.DefaultSendTimeout)
      {
          this.channel = channel;
          this.requestSequenceNumber = requestSequenceNumber;
          this.outcomeKnown = outcome;

          if (!outcome)
          {
              this.innerContexts.Add(context);
          }
      }

      // Returns true when the outcome is already known (the caller should reply
      // on innerContext itself); otherwise queues innerContext and returns false.
      public bool CheckForReplyOrAddInnerContext(RequestContext innerContext)
      {
          lock (this.ThisLock)
          {
              if (this.outcomeKnown)
              {
                  return true;
              }

              this.innerContexts.Add(innerContext);
              return false;
          }
      }

      public bool HasReply
      {
          get
          {
              return (this.bufferedReply != null);
          }
      }

      public Int64 RequestSequenceNumber
      {
          get
          {
              return this.requestSequenceNumber;
          }
      }

      // Aborts every queued transport context, closes its request message and
      // empties the list.
      void AbortInnerContexts()
      {
          for (int i = 0; i < this.innerContexts.Count; i++)
          {
              this.innerContexts[i].Abort();
              this.innerContexts[i].RequestMessage.Close();
          }

          this.innerContexts.Clear();
      }

      // Starts replying to all queued transport contexts. Returns an already
      // completed result when the outcome was known beforehand or when the channel
      // declines the reply (PrepareReply returns false). Any exception other than
      // "already aborted" aborts the inner contexts and this context.
      internal IAsyncResult BeginReplyInternal(Message reply, TimeSpan timeout, AsyncCallback callback, object state)
      {
          bool needAbort = true;
          bool needReply = true;

          try
          {
              lock (this.ThisLock)
              {
                  if (this.ranges == null)
                  {
                      throw Fx.AssertAndThrow("this.ranges != null");
                  }

                  if (this.Aborted)
                  {
                      needAbort = false;
                      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted)));
                  }

                  if (this.outcomeKnown)
                  {
                      needAbort = false;
                      needReply = false;
                  }
                  else
                  {
                      if ((reply != null) && (this.bufferedReply == null))
                      {
                          this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue);
                      }

                      if (!this.channel.PrepareReply(this))
                      {
                          needAbort = false;
                          needReply = false;
                      }
                      else
                      {
                          this.outcomeKnown = true;
                      }
                  }
              }

              if (!needReply)
              {
                  return new ReplyCompletedAsyncResult(callback, state);
              }

              IAsyncResult result = new ReplyAsyncResult(this, timeout, callback, state);
              needAbort = false;
              return result;
          }
          finally
          {
              if (needAbort)
              {
                  this.AbortInnerContexts();
                  this.Abort();
              }
          }
      }

      // Completes BeginReplyInternal. If ending the reply throws, the inner
      // contexts and this context are aborted before the exception propagates.
      internal void EndReplyInternal(IAsyncResult result)
      {
          if (result is ReplyCompletedAsyncResult)
          {
              ReplyCompletedAsyncResult.End(result);
              return;
          }

          bool throwing = true;

          try
          {
              ReplyAsyncResult.End(result);
              this.innerContexts.Clear();
              throwing = false;
          }
          finally
          {
              if (throwing)
              {
                  this.AbortInnerContexts();
                  this.Abort();
              }
          }
      }

      // Marks the outcome as known, aborts the queued transport contexts if the
      // outcome had not been decided yet, and faults the session when the channel
      // still tracks this request.
      protected override void OnAbort()
      {
          bool outcome;

          lock (this.ThisLock)
          {
              outcome = this.outcomeKnown;
              this.outcomeKnown = true;
          }

          if (!outcome)
          {
              this.AbortInnerContexts();
          }

          if (this.channel.ContainsRequest(this.requestSequenceNumber))
          {
              Exception e = new ProtocolException(SR.GetString(SR.ReliableRequestContextAborted));
              this.channel.session.OnLocalFault(e, (Message)null, null);
          }
      }

      protected override IAsyncResult OnBeginReply(Message reply, TimeSpan timeout, AsyncCallback callback, object state)
      {
          return this.BeginReplyInternal(reply, timeout, callback, state);
      }

      protected override void OnClose(TimeSpan timeout)
      {
          // ReliableRequestContext.Close() relies on base.Close() to call reply if reply is not initiated.
          if (!this.ReplyInitiated)
          {
              this.OnReply(null, timeout);
          }
      }

      protected override void OnEndReply(IAsyncResult result)
      {
          this.EndReplyInternal(result);
      }

      protected override void OnReply(Message reply, TimeSpan timeout)
      {
          this.ReplyInternal(reply, timeout);
      }

      // Synchronous reply: decides the outcome under the lock, then answers every
      // queued transport context within the given timeout. A null reply results
      // in acknowledgements being sent (see SendReply).
      internal void ReplyInternal(Message reply, TimeSpan timeout)
      {
          bool needAbort = true;

          try
          {
              lock (this.ThisLock)
              {
                  if (this.ranges == null)
                  {
                      throw Fx.AssertAndThrow("this.ranges != null");
                  }

                  if (this.Aborted)
                  {
                      needAbort = false;
                      throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted)));
                  }

                  if (this.outcomeKnown)
                  {
                      needAbort = false;
                      return;
                  }

                  if ((reply != null) && (this.bufferedReply == null))
                  {
                      this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue);
                  }

                  if (!this.channel.PrepareReply(this))
                  {
                      needAbort = false;
                      return;
                  }

                  this.outcomeKnown = true;
              }

              TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

              for (int i = 0; i < this.innerContexts.Count; i++)
              {
                  SendReply(this.innerContexts[i], MaskingMode.Handled, ref timeoutHelper);
              }

              this.innerContexts.Clear();
              needAbort = false;
          }
          finally
          {
              if (needAbort)
              {
                  this.AbortInnerContexts();
                  this.Abort();
              }
          }
      }

      // Captures the ack ranges; only the first call has an effect.
      public void SetAckRanges(SequenceRangeCollection ranges)
      {
          if (this.ranges == null)
          {
              this.ranges = ranges;
          }
      }

      // Marks this context as carrying the last reply. If no reply has been
      // buffered, an empty message with the Feb2005 LastMessage action is buffered.
      public void SetLastReply(Int64 sequenceNumber)
      {
          this.replySequenceNumber = sequenceNumber;
          this.isLastReply = true;

          if (this.bufferedReply == null)
          {
              this.bufferedReply = Message.CreateMessage(this.channel.MessageVersion, WsrmFeb2005Strings.LastMessageAction).CreateBufferedCopy(int.MaxValue);
          }
      }

      // Replies on a single transport context using the default send timeout.
      public void SendReply(RequestContext context, MaskingMode maskingMode)
      {
          TimeoutHelper timeoutHelper = new TimeoutHelper(this.DefaultSendTimeout);
          SendReply(context, maskingMode, ref timeoutHelper);
      }

      // Sends the known outcome on one transport context (a copy of the buffered
      // reply, or an acknowledgement when there is none) and then closes it.
      void SendReply(RequestContext context, MaskingMode maskingMode, ref TimeoutHelper timeoutHelper)
      {
          Message reply;

          if (!this.outcomeKnown)
          {
              throw Fx.AssertAndThrow("this.outcomeKnown");
          }

          if (this.bufferedReply != null)
          {
              reply = this.bufferedReply.CreateMessage();
              this.channel.PrepareReplyMessage(this.replySequenceNumber, this.isLastReply, this.ranges, reply);
          }
          else
          {
              reply = this.channel.CreateAcknowledgement(this.ranges);
          }

          this.channel.binder.SetMaskingMode(context, maskingMode);

          using (reply)
          {
              context.Reply(reply, timeoutHelper.RemainingTime());
          }

          context.Close(timeoutHelper.RemainingTime());
      }

      public void SetReplySequenceNumber(Int64 sequenceNumber)
      {
          this.replySequenceNumber = sequenceNumber;
      }

      // Result returned when there is nothing to send.
      class ReplyCompletedAsyncResult : CompletedAsyncResult
      {
          public ReplyCompletedAsyncResult(AsyncCallback callback, object state)
              : base(callback, state)
          {
          }
      }

      // Replies to the queued transport contexts one after another, continuing
      // from the BeginReply callback whenever a reply completes asynchronously.
      class ReplyAsyncResult : AsyncResult
      {
          ReliableRequestContext context;
          // Index into context.innerContexts of the reply currently in flight.
          int currentContext;
          // Reply message currently in flight; null between replies.
          Message reply;
          TimeoutHelper timeoutHelper;
          static AsyncCallback replyCompleteStatic = Fx.ThunkCallback(new AsyncCallback(ReplyCompleteStatic));

          public ReplyAsyncResult(ReliableRequestContext thisContext, TimeSpan timeout, AsyncCallback callback, object state)
              : base(callback, state)
          {
              this.timeoutHelper = new TimeoutHelper(timeout);
              this.context = thisContext;

              if (this.SendReplies())
              {
                  this.Complete(true);
              }
          }

          public static void End(IAsyncResult result)
          {
              AsyncResult.End<ReplyAsyncResult>(result);
          }

          // Finishes the reply in flight, closes its transport context and advances
          // to the next one. The reply message is always closed.
          void HandleReplyComplete(IAsyncResult result)
          {
              RequestContext thisInnerContext = this.context.innerContexts[this.currentContext];

              try
              {
                  thisInnerContext.EndReply(result);
                  thisInnerContext.Close(this.timeoutHelper.RemainingTime());
                  this.currentContext++;
              }
              finally
              {
                  this.reply.Close();
                  this.reply = null;
              }
          }

          static void ReplyCompleteStatic(IAsyncResult result)
          {
              // Synchronous completions are handled inline by SendReplies.
              if (result.CompletedSynchronously)
              {
                  return;
              }

              Exception ex = null;
              ReplyAsyncResult thisPtr = null;
              bool complete = false;

              try
              {
                  thisPtr = (ReplyAsyncResult)result.AsyncState;
                  thisPtr.HandleReplyComplete(result);
                  complete = thisPtr.SendReplies();
              }
              catch (Exception e)
              {
                  if (Fx.IsFatal(e))
                  {
                      throw;
                  }

                  ex = e;
                  complete = true;
              }

              if (complete)
              {
                  thisPtr.Complete(false, ex);
              }
          }

          // Sends replies until one completes asynchronously (returns false) or all
          // inner contexts have been answered (returns true).
          bool SendReplies()
          {
              while (this.currentContext < this.context.innerContexts.Count)
              {
                  IAsyncResult result;
                  bool throwing = true;

                  try
                  {
                      if (this.context.bufferedReply != null)
                      {
                          this.reply = this.context.bufferedReply.CreateMessage();
                          this.context.channel.PrepareReplyMessage(
                              this.context.replySequenceNumber, this.context.isLastReply,
                              this.context.ranges, this.reply);
                      }
                      else
                      {
                          this.reply = this.context.channel.CreateAcknowledgement(this.context.ranges);
                      }

                      RequestContext thisInnerContext = this.context.innerContexts[this.currentContext];
                      this.context.channel.binder.SetMaskingMode(thisInnerContext, MaskingMode.Handled);
                      result = thisInnerContext.BeginReply(this.reply, this.timeoutHelper.RemainingTime(), replyCompleteStatic, this);
                      throwing = false;
                  }
                  finally
                  {
                      // If starting the reply failed, no completion callback will run
                      // to close the message, so release it here.
                      if (throwing && this.reply != null)
                      {
                          this.reply.Close();
                          this.reply = null;
                      }
                  }

                  if (!result.CompletedSynchronously)
                  {
                      return false;
                  }

                  this.HandleReplyComplete(result);
              }

              return true;
          }
      }
  }
  1524. class ReplyHelper
  1525. {
  1526. Message asyncMessage;
  1527. bool canTransfer = true;
  1528. ReliableReplySessionChannel channel;
  1529. WsrmMessageInfo info;
  1530. ReplyProvider replyProvider;
  1531. RequestContext requestContext;
  1532. bool throwTimeoutOnWait;
  1533. InterruptibleWaitObject waitHandle;
  1534. internal ReplyHelper(ReliableReplySessionChannel channel, ReplyProvider replyProvider,
  1535. bool throwTimeoutOnWait)
  1536. {
  1537. this.channel = channel;
  1538. this.replyProvider = replyProvider;
  1539. this.throwTimeoutOnWait = throwTimeoutOnWait;
  1540. this.waitHandle = new InterruptibleWaitObject(false, this.throwTimeoutOnWait);
  1541. }
  1542. object ThisLock
  1543. {
  1544. get { return this.channel.ThisLock; }
  1545. }
  1546. internal void Abort()
  1547. {
  1548. this.Cleanup(true);
  1549. }
  1550. void Cleanup(bool abort)
  1551. {
  1552. lock (this.ThisLock)
  1553. {
  1554. this.canTransfer = false;
  1555. }
  1556. if (abort)
  1557. {
  1558. this.waitHandle.Abort(this.channel);
  1559. }
  1560. else
  1561. {
  1562. this.waitHandle.Fault(this.channel);
  1563. }
  1564. }
  1565. internal void Fault()
  1566. {
  1567. this.Cleanup(false);
  1568. }
  1569. internal void Reply(RequestContext context, WsrmMessageInfo info, TimeSpan timeout, MaskingMode maskingMode)
  1570. {
  1571. using (Message message = this.replyProvider.Provide(this.channel, info))
  1572. {
  1573. this.channel.binder.SetMaskingMode(context, maskingMode);
  1574. context.Reply(message, timeout);
  1575. }
  1576. }
  1577. IAsyncResult BeginReply(TimeSpan timeout, AsyncCallback callback, object state)
  1578. {
  1579. lock (this.ThisLock)
  1580. {
  1581. this.canTransfer = false;
  1582. }
  1583. if (this.requestContext == null)
  1584. {
  1585. return new ReplyCompletedAsyncResult(callback, state);
  1586. }
  1587. this.asyncMessage = this.replyProvider.Provide(this.channel, info);
  1588. bool throwing = true;
  1589. try
  1590. {
  1591. this.channel.binder.SetMaskingMode(this.requestContext, MaskingMode.Handled);
  1592. IAsyncResult result = this.requestContext.BeginReply(this.asyncMessage, timeout,
  1593. callback, state);
  1594. throwing = false;
  1595. return result;
  1596. }
  1597. finally
  1598. {
  1599. if (throwing)
  1600. {
  1601. this.asyncMessage.Close();
  1602. this.asyncMessage = null;
  1603. }
  1604. }
  1605. }
  1606. void EndReply(IAsyncResult result)
  1607. {
  1608. ReplyCompletedAsyncResult completedResult = result as ReplyCompletedAsyncResult;
  1609. if (completedResult != null)
  1610. {
  1611. completedResult.End();
  1612. return;
  1613. }
  1614. try
  1615. {
  1616. this.requestContext.EndReply(result);
  1617. }
  1618. finally
  1619. {
  1620. if (this.asyncMessage != null)
  1621. {
  1622. this.asyncMessage.Close();
  1623. }
  1624. }
  1625. }
  1626. internal bool TransferRequestContext(RequestContext requestContext, WsrmMessageInfo info)
  1627. {
  1628. RequestContext oldContext = null;
  1629. WsrmMessageInfo oldInfo = null;
  1630. lock (this.ThisLock)
  1631. {
  1632. if (!this.canTransfer)
  1633. {
  1634. return false;
  1635. }
  1636. else
  1637. {
  1638. oldContext = this.requestContext;
  1639. oldInfo = this.info;
  1640. this.requestContext = requestContext;
  1641. this.info = info;
  1642. }
  1643. }
  1644. this.waitHandle.Set();
  1645. if (oldContext != null)
  1646. {
  1647. oldInfo.Message.Close();
  1648. oldContext.Close();
  1649. }
  1650. return true;
  1651. }
  1652. internal void UnblockWaiter()
  1653. {
  1654. this.TransferRequestContext(null, null);
  1655. }
  /// <summary>
  /// Waits on the wait handle, disables further request-context transfers, and then
  /// replies on the stored request context if there is one.
  /// </summary>
  /// <param name="timeout">Total time budget shared by the wait and the reply.</param>
  internal void WaitAndReply(TimeSpan timeout)
  {
      TimeoutHelper helper = new TimeoutHelper(timeout);

      // The outcome of the wait is not inspected here.
      this.waitHandle.Wait(helper.RemainingTime());

      lock (this.ThisLock)
      {
          // From here on TransferRequestContext returns false.
          this.canTransfer = false;

          if (this.requestContext == null)
          {
              return;
          }
      }

      this.Reply(this.requestContext, this.info, helper.RemainingTime(), MaskingMode.Handled);
  }
  /// <summary>
  /// Starts the asynchronous counterpart of the wait-then-reply sequence by composing
  /// two operations under a single timeout: waiting on the wait handle, then replying.
  /// </summary>
  /// <param name="timeout">Timeout passed to the operation composer.</param>
  /// <param name="callback">Callback passed to the operation composer.</param>
  /// <param name="state">Caller state passed to the operation composer.</param>
  /// <returns>The async result produced by the operation composer.</returns>
  internal IAsyncResult BeginWaitAndReply(TimeSpan timeout, AsyncCallback callback, object state)
  {
      // Begin/end entries are paired by index: [0] wait, [1] reply.
      OperationWithTimeoutBeginCallback[] beginSteps = new OperationWithTimeoutBeginCallback[]
      {
          this.waitHandle.BeginWait,
          this.BeginReply,
      };

      OperationEndCallback[] endSteps = new OperationEndCallback[]
      {
          this.waitHandle.EndWait,
          this.EndReply,
      };

      return OperationWithTimeoutComposer.BeginComposeAsyncOperations(
          timeout, beginSteps, endSteps, callback, state);
  }
  /// <summary>
  /// Completes an operation started by <c>BeginWaitAndReply</c> by delegating to the
  /// operation composer.
  /// </summary>
  /// <param name="result">The async result returned by <c>BeginWaitAndReply</c>.</param>
  internal void EndWaitAndReply(IAsyncResult result)
  {
      OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
  }
  /// <summary>
  /// A <see cref="CompletedAsyncResult"/> subtype that <c>EndReply</c> recognizes by
  /// type and ends directly instead of forwarding to the request context.
  /// </summary>
  class ReplyCompletedAsyncResult : CompletedAsyncResult
  {
      /// <summary>Creates the result, forwarding the callback and state to the base class.</summary>
      internal ReplyCompletedAsyncResult(AsyncCallback callback, object state)
          : base(callback, state)
      {
      }

      /// <summary>Ends this result through <c>AsyncResult.End</c>.</summary>
      public void End()
      {
          AsyncResult.End<ReplyCompletedAsyncResult>(this);
      }
  }
  1699. }
  /// <summary>
  /// Base type for objects that build a reply <see cref="Message"/> for a channel
  /// from information about a received message.
  /// </summary>
  abstract class ReplyProvider
  {
      /// <summary>Builds the reply message.</summary>
      /// <param name="channel">The channel the reply is being built for.</param>
      /// <param name="info">Information about the message being replied to.</param>
      /// <returns>The reply message.</returns>
      internal abstract Message Provide(
          ReliableReplySessionChannel channel,
          WsrmMessageInfo info);
  }
  /// <summary>
  /// Singleton <see cref="ReplyProvider"/> that builds the response to a
  /// CloseSequence request.
  /// </summary>
  class CloseSequenceReplyProvider : ReplyProvider
  {
      // Created once by the static initializer and never reassigned, so no lazy
      // null check is needed (the previous one was unreachable and unsynchronized).
      static readonly CloseSequenceReplyProvider instance = new CloseSequenceReplyProvider();

      // Private: instances are only obtained through Instance.
      CloseSequenceReplyProvider()
      {
      }

      /// <summary>Gets the shared provider instance.</summary>
      internal static ReplyProvider Instance
      {
          get { return instance; }
      }

      /// <summary>
      /// Creates a CloseSequence response for the request described by
      /// <paramref name="requestInfo"/> and adds the channel's acknowledgement header to it.
      /// </summary>
      /// <param name="channel">Channel supplying the message version, session input ID and acknowledgement header.</param>
      /// <param name="requestInfo">Request info whose <c>CloseSequenceInfo.MessageId</c> identifies the request.</param>
      /// <returns>The response message.</returns>
      internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo)
      {
          Message message = WsrmUtilities.CreateCloseSequenceResponse(channel.MessageVersion,
              requestInfo.CloseSequenceInfo.MessageId, channel.session.InputID);
          channel.AddAcknowledgementHeader(message);
          return message;
      }
  }
  /// <summary>
  /// Singleton <see cref="ReplyProvider"/> that builds the response to a
  /// TerminateSequence request.
  /// </summary>
  class TerminateSequenceReplyProvider : ReplyProvider
  {
      // Created once by the static initializer and never reassigned, so no lazy
      // null check is needed (the previous one was unreachable and unsynchronized).
      static readonly TerminateSequenceReplyProvider instance = new TerminateSequenceReplyProvider();

      // Private: instances are only obtained through Instance.
      TerminateSequenceReplyProvider()
      {
      }

      /// <summary>Gets the shared provider instance.</summary>
      internal static ReplyProvider Instance
      {
          get { return instance; }
      }

      /// <summary>
      /// Creates a TerminateSequence response for the request described by
      /// <paramref name="requestInfo"/> and adds the channel's acknowledgement header to it.
      /// </summary>
      /// <param name="channel">Channel supplying the message version, session input ID and acknowledgement header.</param>
      /// <param name="requestInfo">Request info whose <c>TerminateSequenceInfo.MessageId</c> identifies the request.</param>
      /// <returns>The response message.</returns>
      internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo)
      {
          Message message = WsrmUtilities.CreateTerminateResponseMessage(channel.MessageVersion,
              requestInfo.TerminateSequenceInfo.MessageId, channel.session.InputID);
          channel.AddAcknowledgementHeader(message);
          return message;
      }
  }
  1754. }
  1755. }