| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761
27712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411141214131414141514161417141814191420142114221423142414251426142714281429143014311432143314341435143614371438143914401441144214431444144514461447144814491450145114521453145414551456145714581459146014611462146314641465146614671468146914701471147214731474147514761477147814791480148114821483148414851486148714881489149014911492149314941495149614971498149915001501150215031504150515061507150815091510151115121513151415151516151715181519152015211522152315241525152615271528152915301531153215331534153515361537153815391540154115421543154415451546154715481549155015511552155315541555155615571558155915601561156215631564156515661567156815691570157115721573157415751576157715781579158015811582158315841585158615871588158915901591159215931594159515961597159815991600160116021603160416051606160716081609161016111612161316141615161616171618161916201621162216231624162516261627162816291630163116321633163416351636163716381639164016411642164316441645164616471648164916501651165216531654165516561657165816591660166116621663166416651666166716681669167016711672167316741675167616771678167916801681168216831684168516861687168816891690169116921693169416951696169716981699170017011702170317041705170617071708170917101711171217131714171517161717171817191720172117221723172417251726172717281729173017311732173317341735173617371738173917401741174217431744174517461747174817491750175117521753175417551756175717581759176017611762176317641765176617671768176917701771177217731774177517761
77717781779178017811782178317841785178617871788178917901791179217931794179517961797179817991800180118021803180418051806180718081809181018111812181318141815181618171818181918201821182218231824182518261827182818291830183118321833183418351836183718381839184018411842184318441845184618471848184918501851185218531854185518561857185818591860186118621863186418651866186718681869187018711872187318741875187618771878187918801881188218831884188518861887188818891890189118921893189418951896189718981899190019011902190319041905190619071908190919101911191219131914191519161917191819191920192119221923192419251926192719281929193019311932193319341935193619371938193919401941194219431944194519461947194819491950195119521953195419551956195719581959196019611962196319641965196619671968196919701971197219731974197519761977197819791980198119821983198419851986198719881989199019911992199319941995199619971998199920002001200220032004200520062007200820092010201120122013 |
- //-----------------------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //-----------------------------------------------------------------------------
- namespace System.ServiceModel.Channels
- {
- using System.Collections.Generic;
- using System.Runtime;
- using System.ServiceModel;
- using System.ServiceModel.Diagnostics;
- using System.Threading;
- using System.Xml;
- // Note on locking:
- // The following rule must be followed in order to avoid deadlocks: ReliableRequestContext
- // locks MUST NOT be taken while under the ReliableReplySessionChannel lock.
- //
- // lock(context)-->lock(channel) ok.
- // lock(channel)-->lock(context) BAD!
- //
- sealed class ReliableReplySessionChannel : ReplyChannel, IReplySessionChannel
- {
- // Static callbacks: created once and shared by all channel instances.
- static Action<object> asyncReceiveComplete = new Action<object>(AsyncReceiveCompleteStatic);
- static AsyncCallback onReceiveCompleted = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompletedStatic));
- // Collaborators assigned in the constructor.
- IServerReliableChannelBinder binder;
- ReliableInputConnection connection;
- DeliveryStrategy<RequestContext> deliveryStrategy;
- ReliableChannelListenerBase<IReplySessionChannel> listener;
- ServerReliableSession session;
- // Created by the constructor only for WSReliableMessagingFebruary2005.
- InterruptibleWaitObject messagingCompleteWaitObject;
- // Upper-cased listener URI; assigned only when performance counters are enabled.
- string perfCounterId;
- // Created on demand by CreateCloseSequenceReplyHelper / CreateTerminateSequenceReplyHelper.
- ReplyHelper closeSequenceReplyHelper;
- ReplyHelper terminateSequenceReplyHelper;
- // Request bookkeeping, keyed by request and by reply sequence number.
- Dictionary<Int64, ReliableRequestContext> requestsByRequestSequenceNumber = new Dictionary<Int64, ReliableRequestContext>();
- Dictionary<Int64, ReliableRequestContext> requestsByReplySequenceNumber = new Dictionary<Int64, ReliableRequestContext>();
- // Scratch list reused by ProcessAcknowledgment to collect acknowledged reply numbers.
- List<Int64> acked = new List<Int64>();
- bool contextAborted;
- Int64 nextReplySequenceNumber;
- // Last-reply tracking; lastReplySequenceNumber stays Int64.MinValue until PrepareReply assigns one.
- ReliableRequestContext lastReply;
- bool lastReplyAcked;
- Int64 lastReplySequenceNumber = Int64.MinValue;
- // Builds the channel over the supplied binder: creates the input connection and the server
- // session, selects the ordered or unordered delivery strategy, hooks the binder events, opens
- // the session and, when the binder has a session, starts receiving.
- public ReliableReplySessionChannel(
-     ReliableChannelListenerBase<IReplySessionChannel> listener,
-     IServerReliableChannelBinder binder,
-     FaultHelper faultHelper,
-     UniqueId inputID,
-     UniqueId outputID)
-     : base(listener, binder.LocalAddress)
- {
-     this.listener = listener;
-     this.connection = new ReliableInputConnection();
-     this.connection.ReliableMessagingVersion = listener.ReliableMessagingVersion;
-     this.binder = binder;
-     this.session = new ServerReliableSession(this, listener, binder, faultHelper, inputID, outputID);
-     this.session.UnblockChannelCloseCallback = this.UnblockClose;
-     if (!listener.Ordered)
-     {
-         this.deliveryStrategy = new UnorderedDeliveryStrategy<RequestContext>(this, listener.MaxTransferWindowSize);
-     }
-     else
-     {
-         this.deliveryStrategy = new OrderedDeliveryStrategy<RequestContext>(this, listener.MaxTransferWindowSize, true);
-     }
-     binder.Faulted += this.OnBinderFaulted;
-     binder.OnException += this.OnBinderException;
-     // The wait object is only used by the February 2005 close path.
-     if (listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-     {
-         this.messagingCompleteWaitObject = new InterruptibleWaitObject(false);
-     }
-     this.session.Open(TimeSpan.Zero);
-     if (PerformanceCounters.PerformanceCountersEnabled)
-     {
-         this.perfCounterId = listener.Uri.ToString().ToUpperInvariant();
-     }
-     if (!binder.HasSession)
-     {
-         return;
-     }
-     try
-     {
-         this.StartReceiving(false);
-     }
- #pragma warning suppress 56500 // covered by FxCOP
-     catch (Exception error)
-     {
-         if (Fx.IsFatal(error))
-         {
-             throw;
-         }
-         // Non-fatal failures are handed to the session instead of escaping the constructor.
-         this.session.OnUnknownException(error);
-     }
- }
- // The binder passed to the constructor.
- public IServerReliableChannelBinder Binder
- {
-     get { return this.binder; }
- }
- // True when, under the channel lock, the connection reports AllAdded, no entry remains in
- // requestsByRequestSequenceNumber, and the last reply has been acknowledged.
- bool IsMessagingCompleted
- {
-     get
-     {
-         lock (this.ThisLock)
-         {
-             if (!this.connection.AllAdded)
-             {
-                 return false;
-             }
-             if (this.requestsByRequestSequenceNumber.Count != 0)
-             {
-                 return false;
-             }
-             return this.lastReplyAcked;
-         }
-     }
- }
- // Message version taken from the listener.
- MessageVersion MessageVersion
- {
-     get { return this.listener.MessageVersion; }
- }
- // Number of tracked requests minus the number of tracked replies, read under the channel lock.
- int PendingRequestContexts
- {
-     get
-     {
-         lock (this.ThisLock)
-         {
-             int trackedRequests = this.requestsByRequestSequenceNumber.Count;
-             int trackedReplies = this.requestsByReplySequenceNumber.Count;
-             return trackedRequests - trackedReplies;
-         }
-     }
- }
- // The server reliable session created in the constructor, exposed as IInputSession.
- public IInputSession Session
- {
-     get { return this.session; }
- }
- // Aborts every tracked request context once; later calls return immediately.
- // The contextAborted flag is flipped under the channel lock, but the contexts themselves are
- // aborted outside it.
- void AbortContexts()
- {
-     bool alreadyAborted;
-     lock (this.ThisLock)
-     {
-         alreadyAborted = this.contextAborted;
-         this.contextAborted = true;
-     }
-     if (alreadyAborted)
-     {
-         return;
-     }
-     foreach (ReliableRequestContext pending in this.requestsByRequestSequenceNumber.Values)
-     {
-         pending.Abort();
-     }
-     this.requestsByRequestSequenceNumber.Clear();
-     this.requestsByReplySequenceNumber.Clear();
-     bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
-     if (wsrmFeb2005 && (this.lastReply != null))
-     {
-         this.lastReply.Abort();
-     }
- }
- // Adds an acknowledgement header to the message via WsrmUtilities, using the connection's
- // current ranges and the window space left (MaxTransferWindowSize minus the enqueued count).
- void AddAcknowledgementHeader(Message message)
- {
-     ReliableMessagingVersion rmVersion = this.listener.ReliableMessagingVersion;
-     WsrmUtilities.AddAcknowledgementHeader(
-         rmVersion,
-         message,
-         this.session.InputID,
-         this.connection.Ranges,
-         this.connection.IsLastKnown,
-         this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount);
- }
- // Action<object> entry point: state is the IAsyncResult of a receive whose AsyncState is the
- // channel. Completes the receive and restarts receiving when HandleReceiveComplete asks for it.
- static void AsyncReceiveCompleteStatic(object state)
- {
-     IAsyncResult receiveResult = (IAsyncResult)state;
-     ReliableReplySessionChannel self = (ReliableReplySessionChannel)receiveResult.AsyncState;
-     try
-     {
-         bool restart = self.HandleReceiveComplete(receiveResult);
-         if (restart)
-         {
-             self.StartReceiving(true);
-         }
-     }
- #pragma warning suppress 56500 // covered by FxCOP
-     catch (Exception error)
-     {
-         if (Fx.IsFatal(error))
-         {
-             throw;
-         }
-         self.session.OnUnknownException(error);
-     }
- }
- // Begins closing the binder, passing MaskingMode.Handled.
- IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state)
- {
-     IAsyncResult closeResult = this.binder.BeginClose(timeout, MaskingMode.Handled, callback, state);
-     return closeResult;
- }
- // Asynchronous counterpart of CloseOutput.
- // February 2005: sends the pending last reply with a null message, or returns an already
- // completed result when there is none. Other versions: makes sure the close-sequence reply
- // helper exists (under the channel lock) and begins its wait-and-reply.
- IAsyncResult BeginCloseOutput(TimeSpan timeout, AsyncCallback callback, object state)
- {
-     if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-     {
-         lock (this.ThisLock)
-         {
-             this.ThrowIfClosed();
-             this.CreateCloseSequenceReplyHelper();
-         }
-         return this.closeSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state);
-     }
-     ReliableRequestContext pendingLastReply = this.lastReply;
-     if (pendingLastReply != null)
-     {
-         return pendingLastReply.BeginReplyInternal(null, timeout, callback, state);
-     }
-     return new CloseOutputCompletedAsyncResult(callback, state);
- }
- // Begins the listener's close notification for this channel's input/output sequence ids.
- IAsyncResult BeginUnregisterChannel(TimeSpan timeout, AsyncCallback callback, object state)
- {
-     IAsyncResult unregisterResult = this.listener.OnReliableChannelBeginClose(
-         this.session.InputID,
-         this.session.OutputID,
-         timeout,
-         callback,
-         state);
-     return unregisterResult;
- }
- // Creates a standalone acknowledgement message for the given ranges, advertising the window
- // space left (MaxTransferWindowSize minus the enqueued count).
- Message CreateAcknowledgement(SequenceRangeCollection ranges)
- {
-     return WsrmUtilities.CreateAcknowledgmentMessage(
-         this.MessageVersion,
-         this.listener.ReliableMessagingVersion,
-         this.session.InputID,
-         ranges,
-         this.connection.IsLastKnown,
-         this.listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount);
- }
- // Creates a SequenceClosedFault message for the input sequence and adds an acknowledgement
- // header to it.
- Message CreateSequenceClosedFault()
- {
-     SequenceClosedFault closedFault = new SequenceClosedFault(this.session.InputID);
-     Message faultMessage = closedFault.CreateMessage(
-         this.listener.MessageVersion,
-         this.listener.ReliableMessagingVersion);
-     this.AddAcknowledgementHeader(faultMessage);
-     return faultMessage;
- }
- // Ensures closeSequenceReplyHelper exists. Returns false, creating nothing, when the channel is
- // faulted or aborted; returns true otherwise.
- bool CreateCloseSequenceReplyHelper()
- {
-     bool unusable = (this.State == CommunicationState.Faulted) || this.Aborted;
-     if (unusable)
-     {
-         return false;
-     }
-     if (this.closeSequenceReplyHelper != null)
-     {
-         return true;
-     }
-     this.closeSequenceReplyHelper = new ReplyHelper(this, CloseSequenceReplyProvider.Instance, true);
-     return true;
- }
- // Ensures terminateSequenceReplyHelper exists. Returns false, creating nothing, when the channel
- // is faulted or aborted; returns true otherwise.
- bool CreateTerminateSequenceReplyHelper()
- {
-     bool unusable = (this.State == CommunicationState.Faulted) || this.Aborted;
-     if (unusable)
-     {
-         return false;
-     }
-     if (this.terminateSequenceReplyHelper != null)
-     {
-         return true;
-     }
-     this.terminateSequenceReplyHelper = new ReplyHelper(this, TerminateSequenceReplyProvider.Instance, false);
-     return true;
- }
- // Synchronous counterpart of BeginCloseOutput.
- // February 2005: sends the pending last reply (if any) with a null message. Other versions:
- // makes sure the close-sequence reply helper exists (under the channel lock) and runs its
- // wait-and-reply.
- void CloseOutput(TimeSpan timeout)
- {
-     if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-     {
-         lock (this.ThisLock)
-         {
-             this.ThrowIfClosed();
-             this.CreateCloseSequenceReplyHelper();
-         }
-         this.closeSequenceReplyHelper.WaitAndReply(timeout);
-         return;
-     }
-     ReliableRequestContext pendingLastReply = this.lastReply;
-     if (pendingLastReply != null)
-     {
-         pendingLastReply.ReplyInternal(null, timeout);
-     }
- }
- // Reports whether a request with this sequence number is still tracked. For February 2005 the
- // not-yet-acknowledged last reply also counts, because PrepareReply removes it from the table.
- bool ContainsRequest(Int64 requestSeqNum)
- {
-     lock (this.ThisLock)
-     {
-         if (this.requestsByRequestSequenceNumber.ContainsKey(requestSeqNum))
-         {
-             return true;
-         }
-         if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-         {
-             return false;
-         }
-         return (this.lastReply != null)
-             && (this.lastReply.RequestSequenceNumber == requestSeqNum)
-             && (!this.lastReplyAcked);
-     }
- }
- // Completes the binder close started by BeginCloseBinder.
- void EndCloseBinder(IAsyncResult result)
- {
-     IServerReliableChannelBinder target = this.binder;
-     target.EndClose(result);
- }
- // Completes BeginCloseOutput, dispatching on which kind of result BeginCloseOutput returned.
- void EndCloseOutput(IAsyncResult result)
- {
-     if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-     {
-         this.closeSequenceReplyHelper.EndWaitAndReply(result);
-         return;
-     }
-     if (result is CloseOutputCompletedAsyncResult)
-     {
-         // There was no last reply to send; the operation completed immediately.
-         CloseOutputCompletedAsyncResult.End(result);
-         return;
-     }
-     this.lastReply.EndReplyInternal(result);
- }
- // Completes the listener notification started by BeginUnregisterChannel.
- void EndUnregisterChannel(IAsyncResult result)
- {
-     ReliableChannelListenerBase<IReplySessionChannel> owner = this.listener;
-     owner.OnReliableChannelEndClose(result);
- }
- // Resolves a property in this order: the channel itself for IReplySessionChannel, the base
- // class, the binder's inner channel, and finally the default FaultConverter for the listener's
- // message version when a FaultConverter was requested and nothing else supplied one.
- public override T GetProperty<T>()
- {
-     if (typeof(T) == typeof(IReplySessionChannel))
-     {
-         return (T)(object)this;
-     }
-     T fromBase = base.GetProperty<T>();
-     if (fromBase != null)
-     {
-         return fromBase;
-     }
-     T fromInnerChannel = this.binder.Channel.GetProperty<T>();
-     if (fromInnerChannel != null)
-     {
-         return fromInnerChannel;
-     }
-     if (typeof(T) == typeof(FaultConverter))
-     {
-         return (T)(object)FaultConverter.GetDefaultFaultConverter(this.listener.MessageVersion);
-     }
-     return fromInnerChannel;
- }
- // Completes a binder receive. Returns true only when EndTryReceive reports failure, which the
- // callers use as the signal to call StartReceiving again. On success either handles the null
- // context (end of input) or restarts receiving itself and processes the request.
- bool HandleReceiveComplete(IAsyncResult result)
- {
-     RequestContext received;
-     if (!this.Binder.EndTryReceive(result, out received))
-     {
-         return true;
-     }
-     if (received == null)
-     {
-         bool terminated = false;
-         lock (this.ThisLock)
-         {
-             terminated = this.connection.Terminate();
-         }
-         // Input ended while the connection could not terminate and the binder is still open:
-         // fault the session.
-         if (!terminated && (this.Binder.State == CommunicationState.Opened))
-         {
-             Exception earlyClose = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
-             this.session.OnLocalFault(earlyClose, (Message)null, null);
-         }
-         return false;
-     }
-     WsrmMessageInfo messageInfo = WsrmMessageInfo.Get(
-         this.listener.MessageVersion,
-         this.listener.ReliableMessagingVersion,
-         this.binder.Channel,
-         this.binder.GetInnerSession(),
-         received.RequestMessage);
-     // The next receive is started before this request is processed.
-     this.StartReceiving(false);
-     this.ProcessRequest(received, messageInfo);
-     return false;
- }
- // Aborts, in order: the close-sequence helper, the connection, the terminate-sequence helper,
- // the session, the tracked request contexts and (February 2005 only) the messaging-complete
- // wait object; then notifies the listener and the base class.
- protected override void OnAbort()
- {
-     ReplyHelper closeHelper = this.closeSequenceReplyHelper;
-     if (closeHelper != null)
-     {
-         closeHelper.Abort();
-     }
-     this.connection.Abort(this);
-     ReplyHelper terminateHelper = this.terminateSequenceReplyHelper;
-     if (terminateHelper != null)
-     {
-         terminateHelper.Abort();
-     }
-     this.session.Abort();
-     this.AbortContexts();
-     bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
-     if (wsrmFeb2005)
-     {
-         this.messagingCompleteWaitObject.Abort(this);
-     }
-     this.listener.OnReliableChannelAbort(this.session.InputID, this.session.OutputID);
-     base.OnAbort();
- }
- // Starts the asynchronous close as a composition of seven begin/end pairs, mirroring OnClose.
- // Steps two and three depend on the protocol version:
- //   February 2005: close the connection, then wait for messaging to complete.
- //   otherwise:     terminate the sequence, then close the connection.
- protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
- {
-     this.ThrowIfCloseInvalid();
-     bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
-     OperationWithTimeoutBeginCallback secondBegin;
-     OperationWithTimeoutBeginCallback thirdBegin;
-     if (wsrmFeb2005)
-     {
-         secondBegin = new OperationWithTimeoutBeginCallback(this.connection.BeginClose);
-         thirdBegin = new OperationWithTimeoutBeginCallback(this.messagingCompleteWaitObject.BeginWait);
-     }
-     else
-     {
-         secondBegin = new OperationWithTimeoutBeginCallback(this.BeginTerminateSequence);
-         thirdBegin = new OperationWithTimeoutBeginCallback(this.connection.BeginClose);
-     }
-     OperationWithTimeoutBeginCallback[] beginSteps =
-         new OperationWithTimeoutBeginCallback[] {
-             new OperationWithTimeoutBeginCallback(this.BeginCloseOutput),
-             secondBegin,
-             thirdBegin,
-             new OperationWithTimeoutBeginCallback(this.session.BeginClose),
-             new OperationWithTimeoutBeginCallback(this.BeginCloseBinder),
-             new OperationWithTimeoutBeginCallback(this.BeginUnregisterChannel),
-             new OperationWithTimeoutBeginCallback(base.OnBeginClose)
-         };
-     OperationEndCallback secondEnd;
-     OperationEndCallback thirdEnd;
-     if (wsrmFeb2005)
-     {
-         secondEnd = new OperationEndCallback(this.connection.EndClose);
-         thirdEnd = new OperationEndCallback(this.messagingCompleteWaitObject.EndWait);
-     }
-     else
-     {
-         secondEnd = new OperationEndCallback(this.EndTerminateSequence);
-         thirdEnd = new OperationEndCallback(this.connection.EndClose);
-     }
-     OperationEndCallback[] endSteps =
-         new OperationEndCallback[] {
-             new OperationEndCallback(this.EndCloseOutput),
-             secondEnd,
-             thirdEnd,
-             new OperationEndCallback(this.session.EndClose),
-             new OperationEndCallback(this.EndCloseBinder),
-             new OperationEndCallback(this.EndUnregisterChannel),
-             new OperationEndCallback(base.OnEndClose)
-         };
-     return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout,
-         beginSteps, endSteps, callback, state);
- }
- // Binder exception handler: a QuotaExceededException faults the session locally; any other
- // exception is enqueued and dispatched to the channel's consumer.
- void OnBinderException(IReliableChannelBinder sender, Exception exception)
- {
-     if (!(exception is QuotaExceededException))
-     {
-         this.EnqueueAndDispatch(exception, null, false);
-         return;
-     }
-     this.session.OnLocalFault(exception, (Message)null, null);
- }
- // Binder fault handler: aborts the binder and faults the session with a CommunicationException
- // that wraps the original exception.
- void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
- {
-     this.binder.Abort();
-     Exception wrapped = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
-     this.session.OnLocalFault(wrapped, (Message)null, null);
- }
- // Synchronous close. Every step draws on one shared timeout budget:
- // close output, version-specific connection shutdown, session, binder, listener, base.
- protected override void OnClose(TimeSpan timeout)
- {
-     this.ThrowIfCloseInvalid();
-     TimeoutHelper closeTimer = new TimeoutHelper(timeout);
-     this.CloseOutput(closeTimer.RemainingTime());
-     if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-     {
-         this.TerminateSequence(closeTimer.RemainingTime());
-         this.connection.Close(closeTimer.RemainingTime());
-     }
-     else
-     {
-         this.connection.Close(closeTimer.RemainingTime());
-         this.messagingCompleteWaitObject.Wait(closeTimer.RemainingTime());
-     }
-     this.session.Close(closeTimer.RemainingTime());
-     this.binder.Close(closeTimer.RemainingTime(), MaskingMode.Handled);
-     this.listener.OnReliableChannelClose(
-         this.session.InputID,
-         this.session.OutputID,
-         closeTimer.RemainingTime());
-     base.OnClose(closeTimer.RemainingTime());
- }
- // Final cleanup after close: disposes the delivery strategy, detaches the binder Faulted
- // handler and, for February 2005, aborts the retained last reply.
- protected override void OnClosed()
- {
-     this.deliveryStrategy.Dispose();
-     this.binder.Faulted -= this.OnBinderFaulted;
-     bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
-     if (wsrmFeb2005 && (this.lastReply != null))
-     {
-         this.lastReply.Abort();
-     }
-     base.OnClosed();
- }
- // Completes the composed asynchronous close started by OnBeginClose.
- protected override void OnEndClose(IAsyncResult result)
- {
-     IAsyncResult composedClose = result;
-     OperationWithTimeoutComposer.EndComposeAsyncOperations(composedClose);
- }
- // Fault handling: notifies the session, unblocks any pending close, runs the base handler and
- // records the fault in the performance counters when they are enabled.
- protected override void OnFaulted()
- {
-     this.session.OnFaulted();
-     this.UnblockClose();
-     base.OnFaulted();
-     if (!PerformanceCounters.PerformanceCountersEnabled)
-     {
-         return;
-     }
-     PerformanceCounters.SessionFaulted(this.perfCounterId);
- }
- // AsyncCallback for binder receives. Synchronous completions are ignored here; otherwise the
- // channel is recovered from AsyncState, the receive is completed, and receiving is restarted
- // when HandleReceiveComplete asks for it.
- static void OnReceiveCompletedStatic(IAsyncResult result)
- {
-     if (!result.CompletedSynchronously)
-     {
-         ReliableReplySessionChannel self = (ReliableReplySessionChannel)result.AsyncState;
-         try
-         {
-             bool restart = self.HandleReceiveComplete(result);
-             if (restart)
-             {
-                 self.StartReceiving(true);
-             }
-         }
- #pragma warning suppress 56500 // covered by FxCOP
-         catch (Exception error)
-         {
-             if (Fx.IsFatal(error))
-             {
-                 throw;
-             }
-             self.session.OnUnknownException(error);
-         }
-     }
- }
- // After a TerminateSequence reply: for WSReliableMessaging11, when the connection reports the
- // sequence as closed, terminates the connection under the channel lock.
- void OnTerminateSequenceCompleted()
- {
-     if (this.session.Settings.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessaging11)
-     {
-         return;
-     }
-     if (!this.connection.IsSequenceClosed)
-     {
-         return;
-     }
-     lock (this.ThisLock)
-     {
-         this.connection.Terminate();
-     }
- }
- // Decides, under the channel lock, whether the context may send its reply now, and assigns the
- // reply sequence number when one is needed. Returns false when the reply must not be sent
- // (channel aborted/faulted/closed, channel closing for an ordinary request, or a February 2005
- // last reply that has to wait until all messages were added and the channel is closing).
- // Throws the MessageNumberRolloverFault exception when the reply numbers are exhausted.
- bool PrepareReply(ReliableRequestContext context)
- {
-     lock (this.ThisLock)
-     {
-         CommunicationState current = this.State;
-         if (this.Aborted || current == CommunicationState.Faulted || current == CommunicationState.Closed)
-         {
-             return false;
-         }
-         long requestNumber = context.RequestSequenceNumber;
-         bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
-         if (!wsrmFeb2005 || (this.connection.Last != requestNumber))
-         {
-             // Ordinary request.
-             if (this.State == CommunicationState.Closing)
-             {
-                 return false;
-             }
-             if (!context.HasReply)
-             {
-                 // Nothing to sequence; the request is done.
-                 this.requestsByRequestSequenceNumber.Remove(requestNumber);
-                 return true;
-             }
-         }
-         else
-         {
-             // Reply to the request marked as last: remember the first context seen for it and
-             // hold it back until the channel is closing with all messages added.
-             if (this.lastReply == null)
-             {
-                 this.lastReply = context;
-             }
-             this.requestsByRequestSequenceNumber.Remove(requestNumber);
-             bool readyForLastReply = this.connection.AllAdded && (this.State == CommunicationState.Closing);
-             if (!readyForLastReply)
-             {
-                 return false;
-             }
-         }
-         // won't throw if you do not need next sequence number
-         if (this.nextReplySequenceNumber == Int64.MaxValue)
-         {
-             MessageNumberRolloverFault rollover = new MessageNumberRolloverFault(this.session.OutputID);
-             throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(rollover.CreateException());
-         }
-         context.SetReplySequenceNumber(++this.nextReplySequenceNumber);
-         if (wsrmFeb2005 && (this.connection.Last == requestNumber))
-         {
-             if (!context.HasReply)
-             {
-                 // A last reply without user data does not need an ack; treat it as acknowledged.
-                 this.lastReplyAcked = true;
-             }
-             this.lastReplySequenceNumber = this.nextReplySequenceNumber;
-             context.SetLastReply(this.lastReplySequenceNumber);
-         }
-         else if (context.HasReply)
-         {
-             this.requestsByReplySequenceNumber.Add(this.nextReplySequenceNumber, context);
-         }
-         return true;
-     }
- }
- // Stamps an outgoing reply with an acknowledgement header followed by a sequence header for the
- // output sequence, and returns the same message instance. The ranges parameter is not used by
- // this method.
- Message PrepareReplyMessage(Int64 replySequenceNumber, bool isLast, SequenceRangeCollection ranges, Message reply)
- {
-     this.AddAcknowledgementHeader(reply);
-     ReliableMessagingVersion rmVersion = this.listener.ReliableMessagingVersion;
-     WsrmUtilities.AddSequenceHeader(rmVersion, reply, this.session.OutputID, replySequenceNumber, isLast);
-     return reply;
- }
- // Applies received acknowledgement ranges to the outstanding replies.
- // Every reply number in requestsByReplySequenceNumber that info.Ranges contains is removed from
- // both request tables. For WSReliableMessagingFebruary2005 the last reply is tracked separately
- // (PrepareReply never adds it to requestsByReplySequenceNumber), so its acknowledgement is
- // checked even when that table is empty; otherwise an ack arriving after all other replies were
- // acknowledged would never set lastReplyAcked.
- // Does nothing once the channel is aborted, faulted or closed.
- void ProcessAcknowledgment(WsrmAcknowledgmentInfo info)
- {
-     lock (this.ThisLock)
-     {
-         if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
-         {
-             return;
-         }
-         if (this.requestsByReplySequenceNumber.Count > 0)
-         {
-             // Collect first, remove afterwards: the dictionary cannot be modified while it is
-             // being enumerated.
-             this.acked.Clear();
-             foreach (KeyValuePair<Int64, ReliableRequestContext> pair in this.requestsByReplySequenceNumber)
-             {
-                 if (info.Ranges.Contains(pair.Key))
-                 {
-                     this.acked.Add(pair.Key);
-                 }
-             }
-             for (int i = 0; i < this.acked.Count; i++)
-             {
-                 Int64 reply = this.acked[i];
-                 this.requestsByRequestSequenceNumber.Remove(
-                     this.requestsByReplySequenceNumber[reply].RequestSequenceNumber);
-                 this.requestsByReplySequenceNumber.Remove(reply);
-             }
-         }
-         if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-         {
-             // Int64.MinValue means no last reply number has been assigned yet.
-             if (!this.lastReplyAcked && (this.lastReplySequenceNumber != Int64.MinValue))
-             {
-                 this.lastReplyAcked = info.Ranges.Contains(this.lastReplySequenceNumber);
-             }
-         }
-     }
- }
- // Answers an AckRequested with an acknowledgement of the connection's current ranges. The
- // request message and the context are always closed, even when replying fails.
- void ProcessAckRequested(RequestContext context)
- {
-     try
-     {
-         using (Message acknowledgement = this.CreateAcknowledgement(this.connection.Ranges))
-         {
-             context.Reply(acknowledgement);
-         }
-     }
-     finally
-     {
-         context.RequestMessage.Close();
-         context.Close();
-     }
- }
- // Handles a CloseSequence or TerminateSequence for the input sequence (callers use this for the
- // non-February-2005 protocol). Validates the request, records the final message number under
- // the channel lock, faults on an inconsistent or premature shutdown, and otherwise replies
- // through the matching reply helper. The request message and context are closed on exit unless
- // ownership was handed to the validation, fault or helper path.
- void ProcessShutdown11(RequestContext context, WsrmMessageInfo info)
- {
-     bool closeOnExit = true;
-     try
-     {
-         bool isTerminate = (info.TerminateSequenceInfo != null);
-         WsrmRequestInfo requestInfo;
-         Int64 lastMsgNumber;
-         if (isTerminate)
-         {
-             requestInfo = (WsrmRequestInfo)info.TerminateSequenceInfo;
-             lastMsgNumber = info.TerminateSequenceInfo.LastMsgNumber;
-         }
-         else
-         {
-             requestInfo = (WsrmRequestInfo)info.CloseSequenceInfo;
-             lastMsgNumber = info.CloseSequenceInfo.LastMsgNumber;
-         }
-         if (!WsrmUtilities.ValidateWsrmRequest(this.session, requestInfo, this.binder, context))
-         {
-             closeOnExit = false;
-             return;
-         }
-         bool shutdownNeeded = false;
-         Exception remoteFault = null;
-         ReplyHelper closeHelperToUnblock = null;
-         bool allRepliesAcked = true;
-         bool lastLargeEnough = true;
-         bool lastConsistent = true;
-         lock (this.ThisLock)
-         {
-             if (this.connection.IsLastKnown)
-             {
-                 // The last number was already recorded; this request must agree with it.
-                 lastConsistent = (lastMsgNumber == this.connection.Last);
-             }
-             else if (this.requestsByRequestSequenceNumber.Count != 0)
-             {
-                 // All requests and replies must be acknowledged.
-                 allRepliesAcked = false;
-             }
-             else
-             {
-                 if (isTerminate)
-                 {
-                     if (this.connection.SetTerminateSequenceLast(lastMsgNumber, out lastLargeEnough))
-                     {
-                         shutdownNeeded = true;
-                     }
-                     else if (lastLargeEnough)
-                     {
-                         remoteFault = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
-                     }
-                 }
-                 else
-                 {
-                     shutdownNeeded = this.connection.SetCloseSequenceLast(lastMsgNumber);
-                     lastLargeEnough = shutdownNeeded;
-                 }
-                 if (shutdownNeeded)
-                 {
-                     // (1) CloseSequence arrived before TerminateSequence: the helper is needed
-                     //     to delay the reply until Close.
-                     // (2) TerminateSequence arrived before CloseSequence: Close is not
-                     //     required, but the helper must exist so it can be bypassed.
-                     if (!this.CreateCloseSequenceReplyHelper())
-                     {
-                         return;
-                     }
-                     // Capture the helper in order to unblock it.
-                     if (isTerminate)
-                     {
-                         closeHelperToUnblock = this.closeSequenceReplyHelper;
-                     }
-                     this.session.SetFinalAck(this.connection.Ranges);
-                     this.deliveryStrategy.Dispose();
-                 }
-             }
-         }
-         WsrmFault fault = null;
-         if (!lastLargeEnough)
-         {
-             string reason = SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber);
-             string exceptionText = SR.GetString(SR.SmallLastMsgNumberExceptionString);
-             fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, reason, exceptionText);
-         }
-         else if (!allRepliesAcked)
-         {
-             string reason = SR.GetString(SR.SequenceTerminatedNotAllRepliesAcknowledged);
-             string exceptionText = SR.GetString(SR.NotAllRepliesAcknowledgedExceptionString);
-             fault = SequenceTerminatedFault.CreateProtocolFault(this.session.OutputID, reason, exceptionText);
-         }
-         else if (!lastConsistent)
-         {
-             string reason = SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber);
-             string exceptionText = SR.GetString(SR.InconsistentLastMsgNumberExceptionString);
-             fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID, reason, exceptionText);
-         }
-         else if (remoteFault != null)
-         {
-             // Premature TerminateSequence: answer with a terminate message for the output
-             // sequence, then fault the session as a remote fault.
-             Message terminate = WsrmUtilities.CreateTerminateMessage(this.MessageVersion,
-                 this.listener.ReliableMessagingVersion, this.session.OutputID);
-             this.AddAcknowledgementHeader(terminate);
-             using (terminate)
-             {
-                 context.Reply(terminate);
-             }
-             this.session.OnRemoteFault(remoteFault);
-             return;
-         }
-         if (fault != null)
-         {
-             this.session.OnLocalFault(fault.CreateException(), fault, context);
-             closeOnExit = false;
-             return;
-         }
-         if (isTerminate)
-         {
-             if (closeHelperToUnblock != null)
-             {
-                 closeHelperToUnblock.UnblockWaiter();
-             }
-             lock (this.ThisLock)
-             {
-                 if (!this.CreateTerminateSequenceReplyHelper())
-                 {
-                     return;
-                 }
-             }
-         }
-         ReplyHelper replyHelper = isTerminate ? this.terminateSequenceReplyHelper : this.closeSequenceReplyHelper;
-         if (replyHelper.TransferRequestContext(context, info))
-         {
-             // The helper now owns the context.
-             closeOnExit = false;
-         }
-         else
-         {
-             replyHelper.Reply(context, info, this.DefaultSendTimeout, MaskingMode.All);
-             if (isTerminate)
-             {
-                 this.OnTerminateSequenceCompleted();
-             }
-         }
-         if (shutdownNeeded)
-         {
-             ActionItem.Schedule(this.ShutdownCallback, null);
-         }
-     }
-     finally
-     {
-         if (closeOnExit)
-         {
-             context.RequestMessage.Close();
-             context.Close();
-         }
-     }
- }
- // Public entry point for requests routed to this channel: runs ProcessRequest and hands any
- // non-fatal exception to the session instead of letting it escape.
- public void ProcessDemuxedRequest(RequestContext context, WsrmMessageInfo info)
- {
-     try
-     {
-         this.ProcessRequest(context, info);
-     }
- #pragma warning suppress 56500 // covered by FxCOP
-     catch (Exception error)
-     {
-         if (Fx.IsFatal(error))
-         {
-             throw;
-         }
-         this.session.OnUnknownException(error);
-     }
- }
- // Central dispatcher for an incoming reliable-messaging request. After the session has accepted
- // the message it handles, in turn: CreateSequence, piggybacked acknowledgements, and then one
- // of sequenced message / TerminateSequence / CloseSequence / AckRequested.
- // disposeMessage and disposeContext record whether this method still owns the message and the
- // context; the finally block closes whatever it still owns, including when an exception escapes.
- void ProcessRequest(RequestContext context, WsrmMessageInfo info)
- {
-     bool disposeMessage = true;
-     bool disposeContext = true;
-     try
-     {
-         if (!this.session.ProcessInfo(info, context)
-             || !this.session.VerifyDuplexProtocolElements(info, context))
-         {
-             disposeMessage = false;
-             disposeContext = false;
-             return;
-         }
-         this.session.OnRemoteActivity(false);
-         if (info.CreateSequenceInfo != null)
-         {
-             EndpointAddress acksTo;
-             if (!WsrmUtilities.ValidateCreateSequence<IReplySessionChannel>(info, this.listener, this.binder.Channel, out acksTo))
-             {
-                 this.session.OnLocalFault(info.FaultException, info.FaultReply, context);
-             }
-             else
-             {
-                 Message createResponse = WsrmUtilities.CreateCreateSequenceResponse(this.listener.MessageVersion,
-                     this.listener.ReliableMessagingVersion, true, info.CreateSequenceInfo,
-                     this.listener.Ordered, this.session.InputID, acksTo);
-                 using (context)
-                 {
-                     using (createResponse)
-                     {
-                         if (this.Binder.AddressResponse(info.Message, createResponse))
-                         {
-                             context.Reply(createResponse, this.DefaultSendTimeout);
-                         }
-                     }
-                 }
-             }
-             // The context was consumed above; the message is still closed in finally.
-             disposeContext = false;
-             return;
-         }
-         disposeContext = false;
-         if (info.AcknowledgementInfo != null)
-         {
-             this.ProcessAcknowledgment(info.AcknowledgementInfo);
-             // A message whose action is the acknowledgement action carries nothing else.
-             disposeContext = (info.Action == WsrmIndex.GetSequenceAcknowledgementActionString(this.listener.ReliableMessagingVersion));
-         }
-         if (!disposeContext)
-         {
-             disposeMessage = false;
-             if (info.SequencedMessageInfo != null)
-             {
-                 this.ProcessSequencedMessage(context, info.Action, info.SequencedMessageInfo);
-             }
-             else if (info.TerminateSequenceInfo != null)
-             {
-                 if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-                 {
-                     this.ProcessTerminateSequenceFeb2005(context, info);
-                 }
-                 else if (info.TerminateSequenceInfo.Identifier == this.session.InputID)
-                 {
-                     this.ProcessShutdown11(context, info);
-                 }
-                 else // Identifier == OutputID
-                 {
-                     WsrmFault unsupported = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
-                         SR.GetString(SR.SequenceTerminatedUnsupportedTerminateSequence),
-                         SR.GetString(SR.UnsupportedTerminateSequenceExceptionString));
-                     this.session.OnLocalFault(unsupported.CreateException(), unsupported, context);
-                     disposeMessage = false;
-                     disposeContext = false;
-                     return;
-                 }
-             }
-             else if (info.CloseSequenceInfo != null)
-             {
-                 this.ProcessShutdown11(context, info);
-             }
-             else if (info.AckRequestedInfo != null)
-             {
-                 this.ProcessAckRequested(context);
-             }
-         }
-         // February 2005 close waits on this signal.
-         if ((this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
-             && this.IsMessagingCompleted)
-         {
-             this.messagingCompleteWaitObject.Set();
-         }
-     }
-     finally
-     {
-         if (disposeMessage)
-         {
-             info.Message.Close();
-         }
-         if (disposeContext)
-         {
-             context.Close();
-         }
-     }
- }
// A given reliable request can be in one of three states:
// 1. Known and Processing: A ReliableRequestContext exists in requestTable but the outcome
//    for the request is unknown. Any transport request referencing this reliable request
//    (by means of the sequence number) must be held until the outcome becomes known.
// 2. Known and Processed: A ReliableRequestContext exists in the requestTable and the outcome
//    for the request is known. The ReliableRequestContext holds that outcome. Any transport
//    requests referencing this reliable request must send the response dictated by the outcome.
// 3. Unknown: No ReliableRequestContext exists in the requestTable for the referenced reliable
//    request. In this case a new ReliableRequestContext is added to the requestTable to await
//    some outcome.
//
// There are 4 possible outcomes for a reliable request:
// a. It is captured and the user replies. Transport replies are then copies of the user's reply.
// b. It is captured and the user closes the context. Transport replies are then acknowledgments
//    that include the sequence number of the reliable request.
// c. It is captured and the user aborts the context. Transport contexts are then aborted.
// d. It is not captured. In this case an acknowledgment that includes all sequence numbers
//    previously captured is sent. Note two sub-cases here:
//    1. It is not captured because it is dropped (e.g. it doesn't fit in the buffer). In this
//       case the reliable request's sequence number is not in the acknowledgment.
//    2. It is not captured because it is a duplicate. In this case the reliable request's
//       sequence number is included in the acknowledgment.
//
// By following these rules it is possible to support one-way and two-way operations without
// having knowledge of them (the user drives using the request context we give them) and at the
// same time it is possible to forget about past replies once acknowledgments for them are
// received.
//
// Parameters:
//   context - transport request context that carried the sequenced message.
//   action  - action of the message; compared against the Feb2005 LastMessage action.
//   info    - sequence header data (sequence number, last-message flag).
void ProcessSequencedMessage(RequestContext context, string action, WsrmSequencedMessageInfo info)
{
    ReliableRequestContext reliableContext = null;
    WsrmFault fault = null;
    bool needDispatch = false;
    bool scheduleShutdown = false;
    bool wsrmFeb2005 = this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
    Int64 requestSequenceNumber = info.SequenceNumber;
    bool isLast = wsrmFeb2005 && info.LastMessage;
    bool isLastOnly = wsrmFeb2005 && (action == WsrmFeb2005Strings.LastMessageAction);
    bool isDupe;
    Message message = null;

    // The decision about what to do with the message is made under the channel
    // lock; the resulting replies, faults and dispatches happen after it is released.
    lock (this.ThisLock)
    {
        if (this.Aborted || this.State == CommunicationState.Faulted || this.State == CommunicationState.Closed)
        {
            context.RequestMessage.Close();
            context.Abort();
            return;
        }

        isDupe = this.connection.Ranges.Contains(requestSequenceNumber);

        if (!this.connection.IsValid(requestSequenceNumber, isLast))
        {
            // Feb2005 answers with a fault; otherwise a SequenceClosed fault
            // message is replied and the message is counted as dropped.
            if (wsrmFeb2005)
            {
                fault = new LastMessageNumberExceededFault(this.session.InputID);
            }
            else
            {
                message = this.CreateSequenceClosedFault();

                if (PerformanceCounters.PerformanceCountersEnabled)
                {
                    PerformanceCounters.MessageDropped(this.perfCounterId);
                }
            }
        }
        else if (isDupe)
        {
            if (PerformanceCounters.PerformanceCountersEnabled)
            {
                PerformanceCounters.MessageDropped(this.perfCounterId);
            }

            // Reuse the tracked context for this sequence number if there is one,
            // then the remembered last reply; otherwise create a context whose
            // outcome is already known so that only an acknowledgement is sent.
            if (!this.requestsByRequestSequenceNumber.TryGetValue(requestSequenceNumber, out reliableContext))
            {
                if ((this.lastReply != null) && (this.lastReply.RequestSequenceNumber == requestSequenceNumber))
                {
                    reliableContext = this.lastReply;
                }
                else
                {
                    reliableContext = new ReliableRequestContext(context, requestSequenceNumber, this, true);
                }
            }

            reliableContext.SetAckRanges(this.connection.Ranges);
        }
        else if ((this.State == CommunicationState.Closing) && !isLastOnly)
        {
            if (wsrmFeb2005)
            {
                fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                    SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
                    SR.GetString(SR.SessionClosedBeforeDone));
            }
            else
            {
                message = this.CreateSequenceClosedFault();

                if (PerformanceCounters.PerformanceCountersEnabled)
                {
                    PerformanceCounters.MessageDropped(this.perfCounterId);
                }
            }
        }
        // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
        // serialized ack size and the amount of memory taken by the ack ranges. In the
        // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
        // threat.
        else if (this.deliveryStrategy.CanEnqueue(requestSequenceNumber)
            && (this.requestsByReplySequenceNumber.Count < this.listener.MaxTransferWindowSize)
            && (this.listener.Ordered || this.connection.CanMerge(requestSequenceNumber)))
        {
            this.connection.Merge(requestSequenceNumber, isLast);
            reliableContext = new ReliableRequestContext(context, requestSequenceNumber, this, false);
            reliableContext.SetAckRanges(this.connection.Ranges);

            if (!isLastOnly)
            {
                needDispatch = this.deliveryStrategy.Enqueue(reliableContext, requestSequenceNumber);
                this.requestsByRequestSequenceNumber.Add(requestSequenceNumber, reliableContext);
            }
            else
            {
                // A bare LastMessage is not enqueued for the user; it is only
                // remembered so a retransmission can be answered.
                this.lastReply = reliableContext;
            }

            scheduleShutdown = this.connection.AllAdded;
        }
        else
        {
            // Not accepted (no room or cannot merge): dropped without being captured.
            if (PerformanceCounters.PerformanceCountersEnabled)
            {
                PerformanceCounters.MessageDropped(this.perfCounterId);
            }
        }
    }

    if (fault != null)
    {
        this.session.OnLocalFault(fault.CreateException(), fault, context);
        return;
    }

    if (reliableContext == null)
    {
        // Nothing captured: optionally send the prepared fault message, then
        // release the transport request.
        if (message != null)
        {
            using (message)
            {
                context.Reply(message);
            }
        }

        context.RequestMessage.Close();
        context.Close();
        return;
    }

    // Duplicate of a request whose outcome is known: answer right away.
    // Otherwise the transport context has been parked on the reliable context.
    if (isDupe && reliableContext.CheckForReplyOrAddInnerContext(context))
    {
        reliableContext.SendReply(context, MaskingMode.All);
        return;
    }

    if (!isDupe && isLastOnly)
    {
        reliableContext.Close();
    }

    if (needDispatch)
    {
        this.Dispatch();
    }

    if (scheduleShutdown)
    {
        ActionItem.Schedule(this.ShutdownCallback, null);
    }
}
// Handles a TerminateSequence message under the February 2005 protocol version.
//
// The sequence is faulted when connection.Terminate() returns false (treated
// as an early terminate) or when requests are still being tracked in
// requestsByRequestSequenceNumber. Otherwise a TerminateSequence message for
// the output sequence, carrying an acknowledgement header, is sent as the reply.
//
// Unless the context was handed to the session via OnLocalFault, the request
// message and the context are closed on exit. The info parameter is not read here.
void ProcessTerminateSequenceFeb2005(RequestContext context, WsrmMessageInfo info)
{
    bool cleanup = true;

    try
    {
        bool isTerminateEarly;
        bool haveAllReplyAcks;

        lock (this.ThisLock)
        {
            isTerminateEarly = !this.connection.Terminate();
            haveAllReplyAcks = this.requestsByRequestSequenceNumber.Count == 0;
        }

        WsrmFault fault = null;

        if (isTerminateEarly)
        {
            fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
                SR.GetString(SR.EarlyTerminateSequence));
        }
        else if (!haveAllReplyAcks)
        {
            fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
                SR.GetString(SR.SequenceTerminatedBeforeReplySequenceAcked),
                SR.GetString(SR.EarlyRequestTerminateSequence));
        }

        if (fault != null)
        {
            this.session.OnLocalFault(fault.CreateException(), fault, context);

            // Cleared only after OnLocalFault returned normally, so a throw
            // from it still closes the message and context below.
            cleanup = false;
            return;
        }

        // The using covers AddAcknowledgementHeader as well, so the message is
        // closed even if adding the header throws.
        using (Message message = WsrmUtilities.CreateTerminateMessage(this.MessageVersion,
            this.listener.ReliableMessagingVersion, this.session.OutputID))
        {
            this.AddAcknowledgementHeader(message);
            context.Reply(message);
        }
    }
    finally
    {
        if (cleanup)
        {
            context.RequestMessage.Close();
            context.Close();
        }
    }
}
// Receive pump. Repeatedly starts a TryReceive on the binder:
//   - if it does not complete synchronously, the method returns and
//     completion is left to the onReceiveCompleted callback passed in;
//   - if it completes synchronously but the caller may not block, handling is
//     scheduled through ActionItem and the method returns;
//   - otherwise the result is handled inline, and the loop continues for as
//     long as HandleReceiveComplete returns true.
void StartReceiving(bool canBlock)
{
    for (;;)
    {
        IAsyncResult receive = this.binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);

        if (!receive.CompletedSynchronously)
        {
            return;
        }

        if (!canBlock)
        {
            ActionItem.Schedule(asyncReceiveComplete, receive);
            return;
        }

        bool keepReceiving = this.HandleReceiveComplete(receive);
        if (!keepReceiving)
        {
            return;
        }
    }
}
// Callback-shaped wrapper around Shutdown(); the state argument is ignored.
void ShutdownCallback(object state)
{
    Shutdown();
}
// Synchronous terminate: under the channel lock, verifies the channel is not
// closed and creates the terminate-sequence reply helper; then waits on the
// helper (which replies) and finally signals completion via
// OnTerminateSequenceCompleted.
void TerminateSequence(TimeSpan timeout)
{
    lock (this.ThisLock)
    {
        ThrowIfClosed();
        CreateTerminateSequenceReplyHelper();
    }

    this.terminateSequenceReplyHelper.WaitAndReply(timeout);
    OnTerminateSequenceCompleted();
}
// Asynchronous counterpart of TerminateSequence: performs the same locked
// setup, then starts the helper's wait-and-reply operation and returns its
// IAsyncResult. Must be paired with EndTerminateSequence.
IAsyncResult BeginTerminateSequence(TimeSpan timeout, AsyncCallback callback, object state)
{
    lock (this.ThisLock)
    {
        ThrowIfClosed();
        CreateTerminateSequenceReplyHelper();
    }

    IAsyncResult pending = this.terminateSequenceReplyHelper.BeginWaitAndReply(timeout, callback, state);
    return pending;
}
// Completes the operation started by BeginTerminateSequence and then signals
// completion via OnTerminateSequenceCompleted.
void EndTerminateSequence(IAsyncResult result)
{
    this.terminateSequenceReplyHelper.EndWaitAndReply(result);
    OnTerminateSequenceCompleted();
}
// Faults the session and throws when the channel may not be closed yet:
//   - February 2005: request contexts are still pending, or the connection
//     holds more than one acknowledgement range;
//   - WS-RM 1.1: request contexts are still pending.
// For any other version, or when the condition does not hold, this is a no-op.
void ThrowIfCloseInvalid()
{
    ReliableMessagingVersion version = this.listener.ReliableMessagingVersion;
    bool closeIsInvalid;

    if (version == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
    {
        closeIsInvalid = this.PendingRequestContexts != 0 || this.connection.Ranges.Count > 1;
    }
    else if (version == ReliableMessagingVersion.WSReliableMessaging11)
    {
        closeIsInvalid = this.PendingRequestContexts != 0;
    }
    else
    {
        closeIsInvalid = false;
    }

    if (!closeIsInvalid)
    {
        return;
    }

    WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(
        this.session.InputID,
        SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
        SR.GetString(SR.SessionClosedBeforeDone));

    this.session.OnLocalFault(null, fault, null);
    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
}
// Releases anything a pending close could be waiting on: aborts the request
// contexts, faults the version-specific wait primitive(s), then faults the
// connection.
void UnblockClose()
{
    this.AbortContexts();

    if (this.listener.ReliableMessagingVersion != ReliableMessagingVersion.WSReliableMessagingFebruary2005)
    {
        // The reply helpers may not exist; fault only the ones that do.
        if (this.closeSequenceReplyHelper != null)
        {
            this.closeSequenceReplyHelper.Fault();
        }

        if (this.terminateSequenceReplyHelper != null)
        {
            this.terminateSequenceReplyHelper.Fault();
        }
    }
    else
    {
        this.messagingCompleteWaitObject.Fault(this);
    }

    this.connection.Fault(this);
}
// Distinct CompletedAsyncResult subtype; it adds no members of its own and
// simply forwards callback and state to the base constructor.
class CloseOutputCompletedAsyncResult : CompletedAsyncResult
{
    public CloseOutputCompletedAsyncResult(AsyncCallback callback, object state)
        : base(callback, state)
    {
    }
}
// Request context handed to the user for one reliable request, identified by
// its request sequence number. It collects the transport-level contexts
// ("inner contexts") that arrive for that request until the outcome is known,
// and then answers each of them with either a copy of the buffered reply or
// an acknowledgement built from the stored ack ranges.
class ReliableRequestContext : RequestContextBase
{
    // Buffered copy of the reply; when null an acknowledgement is sent instead.
    MessageBuffer bufferedReply;
    ReliableReplySessionChannel channel;
    // Transport contexts waiting for the outcome of this reliable request.
    List<RequestContext> innerContexts = new List<RequestContext>();
    bool isLastReply;
    // Set once a reply has been prepared, the context was aborted, or the
    // context was created with a known outcome. Guarded by ThisLock on write.
    bool outcomeKnown;
    SequenceRangeCollection ranges;
    Int64 requestSequenceNumber;
    Int64 replySequenceNumber;

    // outcome == true creates a context whose outcome is already known; in
    // that case the transport context is not retained as an inner context.
    public ReliableRequestContext(RequestContext context, Int64 requestSequenceNumber, ReliableReplySessionChannel channel, bool outcome)
        : base(context.RequestMessage, channel.DefaultCloseTimeout, channel.DefaultSendTimeout)
    {
        this.channel = channel;
        this.requestSequenceNumber = requestSequenceNumber;
        this.outcomeKnown = outcome;

        if (!outcome)
        {
            this.innerContexts.Add(context);
        }
    }

    // Returns true when the outcome is already known (the caller should reply
    // on innerContext immediately). Otherwise parks innerContext for a later
    // reply and returns false.
    public bool CheckForReplyOrAddInnerContext(RequestContext innerContext)
    {
        lock (this.ThisLock)
        {
            if (this.outcomeKnown)
            {
                return true;
            }

            this.innerContexts.Add(innerContext);
            return false;
        }
    }

    public bool HasReply
    {
        get
        {
            return (this.bufferedReply != null);
        }
    }

    public Int64 RequestSequenceNumber
    {
        get
        {
            return this.requestSequenceNumber;
        }
    }

    // Aborts every parked transport context, closes its request message and
    // empties the list.
    void AbortInnerContexts()
    {
        for (int i = 0; i < this.innerContexts.Count; i++)
        {
            this.innerContexts[i].Abort();
            this.innerContexts[i].RequestMessage.Close();
        }

        this.innerContexts.Clear();
    }

    // Builds the message to send on one transport context: a fresh copy of the
    // buffered reply stamped by the channel, or an acknowledgement when no
    // reply was buffered. The caller owns (and must close) the returned
    // message. If stamping fails, the copy is closed before the exception
    // propagates.
    Message CreateReplyMessage()
    {
        if (this.bufferedReply == null)
        {
            return this.channel.CreateAcknowledgement(this.ranges);
        }

        Message reply = this.bufferedReply.CreateMessage();
        bool prepared = false;

        try
        {
            this.channel.PrepareReplyMessage(this.replySequenceNumber, this.isLastReply, this.ranges, reply);
            prepared = true;
            return reply;
        }
        finally
        {
            if (!prepared)
            {
                reply.Close();
            }
        }
    }

    // Asynchronous reply. Returns an already-completed result when there is
    // nothing to send (outcome already known, or the channel's PrepareReply
    // returned false); otherwise starts replying on every inner context.
    // On an unexpected failure the inner contexts and this context are aborted.
    internal IAsyncResult BeginReplyInternal(Message reply, TimeSpan timeout, AsyncCallback callback, object state)
    {
        bool needAbort = true;
        bool needReply = true;

        try
        {
            lock (this.ThisLock)
            {
                if (this.ranges == null)
                {
                    throw Fx.AssertAndThrow("this.ranges != null");
                }

                if (this.Aborted)
                {
                    needAbort = false;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted)));
                }

                if (this.outcomeKnown)
                {
                    needAbort = false;
                    needReply = false;
                }
                else
                {
                    if ((reply != null) && (this.bufferedReply == null))
                    {
                        this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue);
                    }

                    if (!this.channel.PrepareReply(this))
                    {
                        needAbort = false;
                        needReply = false;
                    }
                    else
                    {
                        this.outcomeKnown = true;
                    }
                }
            }

            if (!needReply)
            {
                return new ReplyCompletedAsyncResult(callback, state);
            }

            IAsyncResult result = new ReplyAsyncResult(this, timeout, callback, state);
            needAbort = false;
            return result;
        }
        finally
        {
            if (needAbort)
            {
                this.AbortInnerContexts();
                this.Abort();
            }
        }
    }

    // Completes BeginReplyInternal. If the asynchronous reply failed, the
    // inner contexts and this context are aborted before the exception
    // propagates.
    internal void EndReplyInternal(IAsyncResult result)
    {
        if (result is ReplyCompletedAsyncResult)
        {
            ReplyCompletedAsyncResult.End(result);
            return;
        }

        bool throwing = true;

        try
        {
            ReplyAsyncResult.End(result);
            this.innerContexts.Clear();
            throwing = false;
        }
        finally
        {
            if (throwing)
            {
                this.AbortInnerContexts();
                this.Abort();
            }
        }
    }

    // Marks the outcome as known, aborts parked transport contexts if the
    // outcome was not known before, and faults the session when the channel
    // still tracks this request.
    protected override void OnAbort()
    {
        bool outcome;

        lock (this.ThisLock)
        {
            outcome = this.outcomeKnown;
            this.outcomeKnown = true;
        }

        if (!outcome)
        {
            this.AbortInnerContexts();
        }

        if (this.channel.ContainsRequest(this.requestSequenceNumber))
        {
            Exception e = new ProtocolException(SR.GetString(SR.ReliableRequestContextAborted));
            this.channel.session.OnLocalFault(e, (Message)null, null);
        }
    }

    protected override IAsyncResult OnBeginReply(Message reply, TimeSpan timeout, AsyncCallback callback, object state)
    {
        return this.BeginReplyInternal(reply, timeout, callback, state);
    }

    protected override void OnClose(TimeSpan timeout)
    {
        // ReliableRequestContext.Close() relies on base.Close() to call reply if reply is not initiated.
        if (!this.ReplyInitiated)
        {
            this.OnReply(null, timeout);
        }
    }

    protected override void OnEndReply(IAsyncResult result)
    {
        this.EndReplyInternal(result);
    }

    protected override void OnReply(Message reply, TimeSpan timeout)
    {
        this.ReplyInternal(reply, timeout);
    }

    // Synchronous reply. Does nothing when the outcome is already known or the
    // channel's PrepareReply returns false; otherwise replies on every inner
    // context within the given timeout. On an unexpected failure the inner
    // contexts and this context are aborted.
    internal void ReplyInternal(Message reply, TimeSpan timeout)
    {
        bool needAbort = true;

        try
        {
            lock (this.ThisLock)
            {
                if (this.ranges == null)
                {
                    throw Fx.AssertAndThrow("this.ranges != null");
                }

                if (this.Aborted)
                {
                    needAbort = false;
                    throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.RequestContextAborted)));
                }

                if (this.outcomeKnown)
                {
                    needAbort = false;
                    return;
                }

                if ((reply != null) && (this.bufferedReply == null))
                {
                    this.bufferedReply = reply.CreateBufferedCopy(int.MaxValue);
                }

                if (!this.channel.PrepareReply(this))
                {
                    needAbort = false;
                    return;
                }

                this.outcomeKnown = true;
            }

            TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

            for (int i = 0; i < this.innerContexts.Count; i++)
            {
                SendReply(this.innerContexts[i], MaskingMode.Handled, ref timeoutHelper);
            }

            this.innerContexts.Clear();
            needAbort = false;
        }
        finally
        {
            if (needAbort)
            {
                this.AbortInnerContexts();
                this.Abort();
            }
        }
    }

    // Stores the ack ranges the first time it is called; later calls are ignored.
    public void SetAckRanges(SequenceRangeCollection ranges)
    {
        if (this.ranges == null)
        {
            this.ranges = ranges;
        }
    }

    // Marks this context as carrying the last reply. If no reply has been
    // buffered, an empty message with the Feb2005 LastMessage action is
    // buffered as the reply.
    public void SetLastReply(Int64 sequenceNumber)
    {
        this.replySequenceNumber = sequenceNumber;
        this.isLastReply = true;

        if (this.bufferedReply == null)
        {
            this.bufferedReply = Message.CreateMessage(this.channel.MessageVersion, WsrmFeb2005Strings.LastMessageAction).CreateBufferedCopy(int.MaxValue);
        }
    }

    // Replies on a single transport context using the default send timeout.
    public void SendReply(RequestContext context, MaskingMode maskingMode)
    {
        TimeoutHelper timeoutHelper = new TimeoutHelper(this.DefaultSendTimeout);
        SendReply(context, maskingMode, ref timeoutHelper);
    }

    // Sends the reply (or acknowledgement) on one transport context and closes
    // that context. Requires the outcome to be known.
    void SendReply(RequestContext context, MaskingMode maskingMode, ref TimeoutHelper timeoutHelper)
    {
        if (!this.outcomeKnown)
        {
            throw Fx.AssertAndThrow("this.outcomeKnown");
        }

        // The using starts before SetMaskingMode so the message is closed on
        // every path once it has been created.
        using (Message reply = this.CreateReplyMessage())
        {
            this.channel.binder.SetMaskingMode(context, maskingMode);
            context.Reply(reply, timeoutHelper.RemainingTime());
        }

        context.Close(timeoutHelper.RemainingTime());
    }

    public void SetReplySequenceNumber(Int64 sequenceNumber)
    {
        this.replySequenceNumber = sequenceNumber;
    }

    // Result returned by BeginReplyInternal when there is nothing to send.
    class ReplyCompletedAsyncResult : CompletedAsyncResult
    {
        public ReplyCompletedAsyncResult(AsyncCallback callback, object state)
            : base(callback, state)
        {
        }
    }

    // Replies on each inner context of the owning ReliableRequestContext in
    // turn, one asynchronous BeginReply/EndReply pair per context.
    class ReplyAsyncResult : AsyncResult
    {
        ReliableRequestContext context;
        // Index of the inner context currently being replied on.
        int currentContext;
        // Message for the reply in flight; closed in HandleReplyComplete.
        Message reply;
        TimeoutHelper timeoutHelper;
        static AsyncCallback replyCompleteStatic = Fx.ThunkCallback(new AsyncCallback(ReplyCompleteStatic));

        public ReplyAsyncResult(ReliableRequestContext thisContext, TimeSpan timeout, AsyncCallback callback, object state)
            : base(callback, state)
        {
            this.timeoutHelper = new TimeoutHelper(timeout);
            this.context = thisContext;

            if (this.SendReplies())
            {
                this.Complete(true);
            }
        }

        public static void End(IAsyncResult result)
        {
            AsyncResult.End<ReplyAsyncResult>(result);
        }

        // Finishes the reply on the current inner context, closes that context
        // and advances to the next one. The in-flight message is always closed.
        void HandleReplyComplete(IAsyncResult result)
        {
            RequestContext thisInnerContext = this.context.innerContexts[this.currentContext];

            try
            {
                thisInnerContext.EndReply(result);
                thisInnerContext.Close(this.timeoutHelper.RemainingTime());
                this.currentContext++;
            }
            finally
            {
                this.reply.Close();
                this.reply = null;
            }
        }

        // Callback for replies that completed asynchronously: finishes the
        // current reply and continues with the remaining inner contexts.
        // Non-fatal exceptions complete this result with the exception.
        static void ReplyCompleteStatic(IAsyncResult result)
        {
            if (result.CompletedSynchronously)
            {
                return;
            }

            Exception ex = null;
            ReplyAsyncResult thisPtr = null;
            bool complete = false;

            try
            {
                thisPtr = (ReplyAsyncResult)result.AsyncState;
                thisPtr.HandleReplyComplete(result);
                complete = thisPtr.SendReplies();
            }
            catch (Exception e)
            {
                if (Fx.IsFatal(e))
                {
                    throw;
                }

                ex = e;
                complete = true;
            }

            if (complete)
            {
                thisPtr.Complete(false, ex);
            }
        }

        // Replies on the remaining inner contexts. Returns true when all of
        // them have been answered, or false when a reply went asynchronous
        // (ReplyCompleteStatic then resumes the loop).
        bool SendReplies()
        {
            while (this.currentContext < this.context.innerContexts.Count)
            {
                this.reply = this.context.CreateReplyMessage();

                RequestContext thisInnerContext = this.context.innerContexts[this.currentContext];
                IAsyncResult result;
                bool started = false;

                try
                {
                    this.context.channel.binder.SetMaskingMode(thisInnerContext, MaskingMode.Handled);
                    result = thisInnerContext.BeginReply(this.reply, this.timeoutHelper.RemainingTime(), replyCompleteStatic, this);
                    started = true;
                }
                finally
                {
                    // HandleReplyComplete will never run for this message if
                    // the reply could not be started, so close it here.
                    if (!started)
                    {
                        this.reply.Close();
                        this.reply = null;
                    }
                }

                if (!result.CompletedSynchronously)
                {
                    return false;
                }

                this.HandleReplyComplete(result);
            }

            return true;
        }
    }
}
// Couples "wait until a matching request context shows up" with "reply on
// it". A request context and its message info are handed over through
// TransferRequestContext; WaitAndReply (or the Begin/End pair) waits for that
// hand-over and then sends the message produced by the ReplyProvider.
// Field state is guarded by the owning channel's lock.
class ReplyHelper
{
    // Reply message of an in-flight asynchronous reply.
    Message asyncMessage;
    // False once replying has started or the helper was aborted/faulted;
    // further TransferRequestContext calls are then refused.
    bool canTransfer = true;
    ReliableReplySessionChannel channel;
    WsrmMessageInfo info;
    ReplyProvider replyProvider;
    RequestContext requestContext;
    bool throwTimeoutOnWait;
    InterruptibleWaitObject waitHandle;

    internal ReplyHelper(ReliableReplySessionChannel channel, ReplyProvider replyProvider,
        bool throwTimeoutOnWait)
    {
        this.channel = channel;
        this.replyProvider = replyProvider;
        this.throwTimeoutOnWait = throwTimeoutOnWait;
        this.waitHandle = new InterruptibleWaitObject(false, throwTimeoutOnWait);
    }

    object ThisLock
    {
        get { return this.channel.ThisLock; }
    }

    // Stops accepting transfers and aborts the wait handle.
    internal void Abort()
    {
        this.Cleanup(true);
    }

    // Stops accepting transfers and faults the wait handle.
    internal void Fault()
    {
        this.Cleanup(false);
    }

    void Cleanup(bool abort)
    {
        lock (this.ThisLock)
        {
            this.canTransfer = false;
        }

        if (!abort)
        {
            this.waitHandle.Fault(this.channel);
        }
        else
        {
            this.waitHandle.Abort(this.channel);
        }
    }

    // Sends the provider's reply for the given request on the given context.
    // The reply message is disposed afterwards.
    internal void Reply(RequestContext context, WsrmMessageInfo info, TimeSpan timeout, MaskingMode maskingMode)
    {
        using (Message replyMessage = this.replyProvider.Provide(this.channel, info))
        {
            this.channel.binder.SetMaskingMode(context, maskingMode);
            context.Reply(replyMessage, timeout);
        }
    }

    // Second stage of BeginWaitAndReply. Completes immediately when no
    // request context was transferred; otherwise starts an asynchronous reply
    // on it. The reply message is closed here only if starting fails.
    IAsyncResult BeginReply(TimeSpan timeout, AsyncCallback callback, object state)
    {
        lock (this.ThisLock)
        {
            this.canTransfer = false;
        }

        if (this.requestContext == null)
        {
            return new ReplyCompletedAsyncResult(callback, state);
        }

        this.asyncMessage = this.replyProvider.Provide(this.channel, this.info);
        bool started = false;

        try
        {
            this.channel.binder.SetMaskingMode(this.requestContext, MaskingMode.Handled);
            IAsyncResult pending = this.requestContext.BeginReply(this.asyncMessage, timeout,
                callback, state);
            started = true;
            return pending;
        }
        finally
        {
            if (!started)
            {
                this.asyncMessage.Close();
                this.asyncMessage = null;
            }
        }
    }

    // Completes BeginReply; closes the reply message of a real (non
    // pre-completed) reply regardless of the outcome.
    void EndReply(IAsyncResult result)
    {
        ReplyCompletedAsyncResult alreadyCompleted = result as ReplyCompletedAsyncResult;

        if (alreadyCompleted == null)
        {
            try
            {
                this.requestContext.EndReply(result);
            }
            finally
            {
                if (this.asyncMessage != null)
                {
                    this.asyncMessage.Close();
                }
            }

            return;
        }

        alreadyCompleted.End();
    }

    // Hands a request context (and its info) to the helper and wakes the
    // waiter. Returns false, taking nothing, once transfers are no longer
    // accepted. A previously transferred context is replaced: its message and
    // the context itself are closed.
    internal bool TransferRequestContext(RequestContext requestContext, WsrmMessageInfo info)
    {
        RequestContext displacedContext;
        WsrmMessageInfo displacedInfo;

        lock (this.ThisLock)
        {
            if (!this.canTransfer)
            {
                return false;
            }

            displacedContext = this.requestContext;
            displacedInfo = this.info;
            this.requestContext = requestContext;
            this.info = info;
        }

        this.waitHandle.Set();

        if (displacedContext != null)
        {
            displacedInfo.Message.Close();
            displacedContext.Close();
        }

        return true;
    }

    // Wakes the waiter without supplying a request context.
    internal void UnblockWaiter()
    {
        this.TransferRequestContext(null, null);
    }

    // Waits for a transfer (or an unblock), stops accepting transfers, and
    // replies if a request context was supplied. One timeout budget covers
    // both the wait and the reply.
    internal void WaitAndReply(TimeSpan timeout)
    {
        TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
        this.waitHandle.Wait(timeoutHelper.RemainingTime());

        RequestContext pendingContext;
        WsrmMessageInfo pendingInfo;

        lock (this.ThisLock)
        {
            this.canTransfer = false;
            pendingContext = this.requestContext;
            pendingInfo = this.info;
        }

        if (pendingContext == null)
        {
            return;
        }

        this.Reply(pendingContext, pendingInfo, timeoutHelper.RemainingTime(),
            MaskingMode.Handled);
    }

    // Asynchronous WaitAndReply: composes the wait-handle wait followed by
    // BeginReply/EndReply under a single timeout.
    internal IAsyncResult BeginWaitAndReply(TimeSpan timeout, AsyncCallback callback, object state)
    {
        OperationWithTimeoutBeginCallback[] beginSteps = new OperationWithTimeoutBeginCallback[]
        {
            this.waitHandle.BeginWait,
            this.BeginReply,
        };

        OperationEndCallback[] endSteps = new OperationEndCallback[]
        {
            this.waitHandle.EndWait,
            this.EndReply,
        };

        return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout, beginSteps,
            endSteps, callback, state);
    }

    internal void EndWaitAndReply(IAsyncResult result)
    {
        OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
    }

    // Result returned by BeginReply when there is no request context to reply on.
    class ReplyCompletedAsyncResult : CompletedAsyncResult
    {
        internal ReplyCompletedAsyncResult(AsyncCallback callback, object state)
            : base(callback, state)
        {
        }

        public void End()
        {
            AsyncResult.End<ReplyCompletedAsyncResult>(this);
        }
    }
}
// Strategy used by ReplyHelper to build the reply message for a request.
abstract class ReplyProvider
{
    // Creates the reply message for the given request info on the given channel.
    internal abstract Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo info);
}
// ReplyProvider that answers a CloseSequence request with a
// CloseSequenceResponse carrying an acknowledgement header.
// Stateless; a single shared instance is exposed through Instance.
class CloseSequenceReplyProvider : ReplyProvider
{
    // Created once by the static initializer, so Instance never needs to
    // check for null or create it lazily.
    static readonly CloseSequenceReplyProvider instance = new CloseSequenceReplyProvider();

    CloseSequenceReplyProvider()
    {
    }

    internal static ReplyProvider Instance
    {
        get
        {
            return instance;
        }
    }

    // Builds the CloseSequenceResponse for requestInfo.CloseSequenceInfo and
    // adds the channel's acknowledgement header. The caller owns the message.
    internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo)
    {
        Message message = WsrmUtilities.CreateCloseSequenceResponse(channel.MessageVersion,
            requestInfo.CloseSequenceInfo.MessageId, channel.session.InputID);
        channel.AddAcknowledgementHeader(message);
        return message;
    }
}
// ReplyProvider that answers a TerminateSequence request with a
// TerminateSequenceResponse carrying an acknowledgement header.
// Stateless; a single shared instance is exposed through Instance.
class TerminateSequenceReplyProvider : ReplyProvider
{
    // Created once by the static initializer, so Instance never needs to
    // check for null or create it lazily.
    static readonly TerminateSequenceReplyProvider instance = new TerminateSequenceReplyProvider();

    TerminateSequenceReplyProvider()
    {
    }

    internal static ReplyProvider Instance
    {
        get
        {
            return instance;
        }
    }

    // Builds the terminate response for requestInfo.TerminateSequenceInfo and
    // adds the channel's acknowledgement header. The caller owns the message.
    internal override Message Provide(ReliableReplySessionChannel channel, WsrmMessageInfo requestInfo)
    {
        Message message = WsrmUtilities.CreateTerminateResponseMessage(channel.MessageVersion,
            requestInfo.TerminateSequenceInfo.MessageId, channel.session.InputID);
        channel.AddAcknowledgementHeader(message);
        return message;
    }
}
- }
- }
|