| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495969798991001011021031041051061071081091101111121131141151161171181191201211221231241251261271281291301311321331341351361371381391401411421431441451461471481491501511521531541551561571581591601611621631641651661671681691701711721731741751761771781791801811821831841851861871881891901911921931941951961971981992002012022032042052062072082092102112122132142152162172182192202212222232242252262272282292302312322332342352362372382392402412422432442452462472482492502512522532542552562572582592602612622632642652662672682692702712722732742752762772782792802812822832842852862872882892902912922932942952962972982993003013023033043053063073083093103113123133143153163173183193203213223233243253263273283293303313323333343353363373383393403413423433443453463473483493503513523533543553563573583593603613623633643653663673683693703713723733743753763773783793803813823833843853863873883893903913923933943953963973983994004014024034044054064074084094104114124134144154164174184194204214224234244254264274284294304314324334344354364374384394404414424434444454464474484494504514524534544554564574584594604614624634644654664674684694704714724734744754764774784794804814824834844854864874884894904914924934944954964974984995005015025035045055065075085095105115125135145155165175185195205215225235245255265275285295305315325335345355365375385395405415425435445455465475485495505515525535545555565575585595605615625635645655665675685695705715725735745755765775785795805815825835845855865875885895905915925935945955965975985996006016026036046056066076086096106116126136146156166176186196206216226236246256266276286296306316326336346356366376386396406416426436446456466476486496506516526536546556566576586596606616626636646656666676686696706716726736746756766776786796806816826836846856866876886896906916926936946956966976986997007017027037047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270 |
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.ServiceModel.Channels
- {
- using System.Collections.Generic;
- using System.Runtime;
- using System.ServiceModel;
- using System.ServiceModel.Diagnostics;
- using System.Threading;
- using System.Xml;
- abstract class ReliableInputSessionChannel : InputChannel, IInputSessionChannel
- {
- bool advertisedZero = false;
- IServerReliableChannelBinder binder;
- ReliableInputConnection connection;
- DeliveryStrategy<Message> deliveryStrategy;
- ReliableChannelListenerBase<IInputSessionChannel> listener;
- ServerReliableSession session;
- protected string perfCounterId;
- static Action<object> asyncReceiveComplete = new Action<object>(AsyncReceiveCompleteStatic);
- static AsyncCallback onReceiveCompleted = Fx.ThunkCallback(new AsyncCallback(OnReceiveCompletedStatic));
- protected ReliableInputSessionChannel(
- ReliableChannelListenerBase<IInputSessionChannel> listener,
- IServerReliableChannelBinder binder,
- FaultHelper faultHelper,
- UniqueId inputID)
- : base(listener, binder.LocalAddress)
- {
- this.binder = binder;
- this.listener = listener;
- this.connection = new ReliableInputConnection();
- this.connection.ReliableMessagingVersion = listener.ReliableMessagingVersion;
- this.session = new ServerReliableSession(this, listener, binder, faultHelper, inputID, null);
- this.session.UnblockChannelCloseCallback = this.UnblockClose;
- if (listener.Ordered)
- this.deliveryStrategy = new OrderedDeliveryStrategy<Message>(this, listener.MaxTransferWindowSize, false);
- else
- this.deliveryStrategy = new UnorderedDeliveryStrategy<Message>(this, listener.MaxTransferWindowSize);
- this.binder.Faulted += OnBinderFaulted;
- this.binder.OnException += OnBinderException;
- this.session.Open(TimeSpan.Zero);
- if (PerformanceCounters.PerformanceCountersEnabled)
- this.perfCounterId = this.listener.Uri.ToString().ToUpperInvariant();
- }
- protected bool AdvertisedZero
- {
- get
- {
- return this.advertisedZero;
- }
- set
- {
- this.advertisedZero = value;
- }
- }
- public IServerReliableChannelBinder Binder
- {
- get
- {
- return this.binder;
- }
- }
- protected ReliableInputConnection Connection
- {
- get
- {
- return this.connection;
- }
- }
- protected DeliveryStrategy<Message> DeliveryStrategy
- {
- get
- {
- return this.deliveryStrategy;
- }
- }
- protected ReliableChannelListenerBase<IInputSessionChannel> Listener
- {
- get
- {
- return this.listener;
- }
- }
- protected ChannelReliableSession ReliableSession
- {
- get
- {
- return this.session;
- }
- }
- public IInputSession Session
- {
- get
- {
- return this.session;
- }
- }
- protected virtual void AggregateAsyncCloseOperations(List<OperationWithTimeoutBeginCallback> beginOperations, List<OperationEndCallback> endOperations)
- {
- beginOperations.Add(new OperationWithTimeoutBeginCallback(this.session.BeginClose));
- endOperations.Add(new OperationEndCallback(this.session.EndClose));
- }
- static void AsyncReceiveCompleteStatic(object state)
- {
- IAsyncResult result = (IAsyncResult)state;
- ReliableInputSessionChannel channel = (ReliableInputSessionChannel)(result.AsyncState);
- try
- {
- if (channel.HandleReceiveComplete(result))
- {
- channel.StartReceiving(true);
- }
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- throw;
- channel.ReliableSession.OnUnknownException(e);
- }
- }
- static void OnReceiveCompletedStatic(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- return;
- ReliableInputSessionChannel channel = (ReliableInputSessionChannel)(result.AsyncState);
- try
- {
- if (channel.HandleReceiveComplete(result))
- {
- channel.StartReceiving(true);
- }
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- throw;
- channel.ReliableSession.OnUnknownException(e);
- }
- }
- protected abstract bool HandleReceiveComplete(IAsyncResult result);
- protected virtual void AbortGuards()
- {
- }
- protected void AddAcknowledgementHeader(Message message)
- {
- int bufferRemaining = -1;
- if (this.Listener.FlowControlEnabled)
- {
- bufferRemaining = this.Listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount;
- this.AdvertisedZero = (bufferRemaining == 0);
- }
- WsrmUtilities.AddAcknowledgementHeader(this.listener.ReliableMessagingVersion, message,
- this.session.InputID, this.connection.Ranges, this.connection.IsLastKnown, bufferRemaining);
- }
- IAsyncResult BeginCloseBinder(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.binder.BeginClose(timeout, MaskingMode.Handled, callback, state);
- }
- protected virtual IAsyncResult BeginCloseGuards(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new CompletedAsyncResult(callback, state);
- }
- IAsyncResult BeginUnregisterChannel(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.listener.OnReliableChannelBeginClose(this.ReliableSession.InputID, null,
- timeout, callback, state);
- }
- protected override void OnClosed()
- {
- base.OnClosed();
- this.binder.Faulted -= this.OnBinderFaulted;
- this.deliveryStrategy.Dispose();
- }
- protected virtual void CloseGuards(TimeSpan timeout)
- {
- }
- protected Message CreateAcknowledgmentMessage()
- {
- int bufferRemaining = -1;
- if (this.Listener.FlowControlEnabled)
- {
- bufferRemaining = this.Listener.MaxTransferWindowSize - this.deliveryStrategy.EnqueuedCount;
- this.AdvertisedZero = (bufferRemaining == 0);
- }
- Message message = WsrmUtilities.CreateAcknowledgmentMessage(
- this.listener.MessageVersion,
- this.listener.ReliableMessagingVersion,
- this.session.InputID,
- this.connection.Ranges,
- this.connection.IsLastKnown,
- bufferRemaining);
- return message;
- }
- void EndCloseBinder(IAsyncResult result)
- {
- this.binder.EndClose(result);
- }
- protected virtual void EndCloseGuards(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
- void EndUnregisterChannel(IAsyncResult result)
- {
- this.listener.OnReliableChannelEndClose(result);
- }
- public override T GetProperty<T>()
- {
- if (typeof(T) == typeof(IInputSessionChannel))
- {
- return (T)(object)this;
- }
- T baseProperty = base.GetProperty<T>();
- if (baseProperty != null)
- {
- return baseProperty;
- }
- T innerProperty = this.binder.Channel.GetProperty<T>();
- if ((innerProperty == null) && (typeof(T) == typeof(FaultConverter)))
- {
- return (T)(object)FaultConverter.GetDefaultFaultConverter(this.listener.MessageVersion);
- }
- else
- {
- return innerProperty;
- }
- }
- protected override void OnAbort()
- {
- this.connection.Abort(this);
- this.AbortGuards();
- this.session.Abort();
- this.listener.OnReliableChannelAbort(this.ReliableSession.InputID, null);
- base.OnAbort();
- }
- protected override IAsyncResult OnBeginClose(TimeSpan timeout, AsyncCallback callback, object state)
- {
- this.ThrowIfCloseInvalid();
- OperationWithTimeoutBeginCallback[] beginOperations = new OperationWithTimeoutBeginCallback[]
- {
- this.connection.BeginClose,
- this.session.BeginClose,
- this.BeginCloseGuards,
- this.BeginCloseBinder,
- this.BeginUnregisterChannel,
- base.OnBeginClose
- };
- OperationEndCallback[] endOperations = new OperationEndCallback[]
- {
- this.connection.EndClose,
- this.session.EndClose,
- this.EndCloseGuards,
- this.EndCloseBinder,
- this.EndUnregisterChannel,
- base.OnEndClose
- };
- return OperationWithTimeoutComposer.BeginComposeAsyncOperations(timeout,
- beginOperations, endOperations, callback, state);
- }
- void OnBinderException(IReliableChannelBinder sender, Exception exception)
- {
- if (exception is QuotaExceededException)
- this.session.OnLocalFault(exception, SequenceTerminatedFault.CreateQuotaExceededFault(this.session.OutputID), null);
- else
- this.EnqueueAndDispatch(exception, null, false);
- }
- void OnBinderFaulted(IReliableChannelBinder sender, Exception exception)
- {
- this.binder.Abort();
- exception = new CommunicationException(SR.GetString(SR.EarlySecurityFaulted), exception);
- this.session.OnLocalFault(exception, (Message)null, null);
- }
- protected override void OnClose(TimeSpan timeout)
- {
- this.ThrowIfCloseInvalid();
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- this.connection.Close(timeoutHelper.RemainingTime());
- this.session.Close(timeoutHelper.RemainingTime());
- this.CloseGuards(timeoutHelper.RemainingTime());
- this.binder.Close(timeoutHelper.RemainingTime(), MaskingMode.Handled);
- this.listener.OnReliableChannelClose(this.ReliableSession.InputID, null,
- timeoutHelper.RemainingTime());
- base.OnClose(timeoutHelper.RemainingTime());
- }
- protected override void OnEndClose(IAsyncResult result)
- {
- OperationWithTimeoutComposer.EndComposeAsyncOperations(result);
- }
- protected override void OnFaulted()
- {
- this.session.OnFaulted();
- this.UnblockClose();
- base.OnFaulted();
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.SessionFaulted(this.perfCounterId);
- }
- protected virtual void OnQuotaAvailable()
- {
- }
- protected void ShutdownCallback(object state)
- {
- this.Shutdown();
- }
- protected void StartReceiving(bool canBlock)
- {
- while (true)
- {
- IAsyncResult result = this.Binder.BeginTryReceive(TimeSpan.MaxValue, onReceiveCompleted, this);
- if (!result.CompletedSynchronously)
- return;
- if (!canBlock)
- {
- ActionItem.Schedule(asyncReceiveComplete, result);
- return;
- }
- if (!this.HandleReceiveComplete(result))
- break;
- }
- }
- void ThrowIfCloseInvalid()
- {
- bool shouldFault = false;
- if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005)
- {
- if (this.DeliveryStrategy.EnqueuedCount > 0 || this.Connection.Ranges.Count > 1)
- {
- shouldFault = true;
- }
- }
- else if (this.listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11)
- {
- if (this.DeliveryStrategy.EnqueuedCount > 0)
- {
- shouldFault = true;
- }
- }
- if (shouldFault)
- {
- WsrmFault fault = SequenceTerminatedFault.CreateProtocolFault(this.session.InputID,
- SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone), SR.GetString(SR.SessionClosedBeforeDone));
- this.session.OnLocalFault(null, fault, null);
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(fault.CreateException());
- }
- }
- void UnblockClose()
- {
- this.connection.Fault(this);
- }
- }
- sealed class ReliableInputSessionChannelOverDuplex : ReliableInputSessionChannel
- {
- TimeSpan acknowledgementInterval;
- bool acknowledgementScheduled = false;
- IOThreadTimer acknowledgementTimer;
- Guard guard = new Guard(Int32.MaxValue);
- int pendingAcknowledgements = 0;
- public ReliableInputSessionChannelOverDuplex(
- ReliableChannelListenerBase<IInputSessionChannel> listener,
- IServerReliableChannelBinder binder, FaultHelper faultHelper,
- UniqueId inputID)
- : base(listener, binder, faultHelper, inputID)
- {
- this.acknowledgementInterval = listener.AcknowledgementInterval;
- this.acknowledgementTimer = new IOThreadTimer(new Action<object>(this.OnAcknowledgementTimeoutElapsed), null, true);
- this.DeliveryStrategy.DequeueCallback = this.OnDeliveryStrategyItemDequeued;
- if (binder.HasSession)
- {
- try
- {
- this.StartReceiving(false);
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- this.ReliableSession.OnUnknownException(e);
- }
- }
- }
- protected override void AbortGuards()
- {
- this.guard.Abort();
- }
- protected override IAsyncResult BeginCloseGuards(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.guard.BeginClose(timeout, callback, state);
- }
- protected override void CloseGuards(TimeSpan timeout)
- {
- this.guard.Close(timeout);
- }
- protected override void EndCloseGuards(IAsyncResult result)
- {
- this.guard.EndClose(result);
- }
- protected override bool HandleReceiveComplete(IAsyncResult result)
- {
- RequestContext context;
- if (this.Binder.EndTryReceive(result, out context))
- {
- if (context == null)
- {
- bool terminated = false;
- lock (this.ThisLock)
- {
- terminated = this.Connection.Terminate();
- }
- if (!terminated && (this.Binder.State == CommunicationState.Opened))
- {
- Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
- this.ReliableSession.OnLocalFault(e, (Message)null, null);
- }
- return false;
- }
- Message message = context.RequestMessage;
- context.Close();
- WsrmMessageInfo info = WsrmMessageInfo.Get(this.Listener.MessageVersion,
- this.Listener.ReliableMessagingVersion, this.Binder.Channel, this.Binder.GetInnerSession(),
- message);
- this.StartReceiving(false);
- this.ProcessMessage(info);
- return false;
- }
- return true;
- }
- void OnAcknowledgementTimeoutElapsed(object state)
- {
- lock (this.ThisLock)
- {
- this.acknowledgementScheduled = false;
- this.pendingAcknowledgements = 0;
- if (this.State == CommunicationState.Closing
- || this.State == CommunicationState.Closed
- || this.State == CommunicationState.Faulted)
- return;
- }
- if (this.guard.Enter())
- {
- try
- {
- using (Message message = CreateAcknowledgmentMessage())
- {
- this.Binder.Send(message, this.DefaultSendTimeout);
- }
- }
- finally
- {
- this.guard.Exit();
- }
- }
- }
- void OnDeliveryStrategyItemDequeued()
- {
- if (this.AdvertisedZero)
- this.OnAcknowledgementTimeoutElapsed(null);
- }
- protected override void OnClosing()
- {
- base.OnClosing();
- this.acknowledgementTimer.Cancel();
- }
- protected override void OnQuotaAvailable()
- {
- this.OnAcknowledgementTimeoutElapsed(null);
- }
- public void ProcessDemuxedMessage(WsrmMessageInfo info)
- {
- try
- {
- this.ProcessMessage(info);
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- throw;
- this.ReliableSession.OnUnknownException(e);
- }
- }
- void ProcessMessage(WsrmMessageInfo info)
- {
- bool closeMessage = true;
- try
- {
- if (!this.ReliableSession.ProcessInfo(info, null))
- {
- closeMessage = false;
- return;
- }
- if (!this.ReliableSession.VerifySimplexProtocolElements(info, null))
- {
- closeMessage = false;
- return;
- }
- this.ReliableSession.OnRemoteActivity(false);
- if (info.CreateSequenceInfo != null)
- {
- EndpointAddress acksTo;
- if (WsrmUtilities.ValidateCreateSequence<IInputSessionChannel>(info, this.Listener, this.Binder.Channel, out acksTo))
- {
- Message response = WsrmUtilities.CreateCreateSequenceResponse(this.Listener.MessageVersion,
- this.Listener.ReliableMessagingVersion, false, info.CreateSequenceInfo,
- this.Listener.Ordered, this.ReliableSession.InputID, acksTo);
- using (response)
- {
- if (this.Binder.AddressResponse(info.Message, response))
- this.Binder.Send(response, this.DefaultSendTimeout);
- }
- }
- else
- {
- this.ReliableSession.OnLocalFault(info.FaultException, info.FaultReply, null);
- }
- return;
- }
- bool needDispatch = false;
- bool scheduleShutdown = false;
- bool tryAckNow = (info.AckRequestedInfo != null);
- bool terminate = false;
- Message message = null;
- WsrmFault fault = null;
- Exception remoteFaultException = null;
- bool wsrmFeb2005 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
- bool wsrm11 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
- if (info.SequencedMessageInfo != null)
- {
- lock (this.ThisLock)
- {
- if (this.Aborted || this.State == CommunicationState.Faulted)
- {
- return;
- }
- Int64 sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
- bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
- if (!this.Connection.IsValid(sequenceNumber, isLast))
- {
- if (wsrmFeb2005)
- {
- fault = new LastMessageNumberExceededFault(this.ReliableSession.InputID);
- }
- else
- {
- message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
- this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
- tryAckNow = true;
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- }
- else if (this.Connection.Ranges.Contains(sequenceNumber))
- {
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- tryAckNow = true;
- }
- else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
- {
- this.Connection.Merge(sequenceNumber, isLast);
- if (this.Connection.AllAdded)
- {
- scheduleShutdown = true;
- this.ReliableSession.CloseSession();
- }
- }
- else if (this.State == CommunicationState.Closing)
- {
- if (wsrmFeb2005)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
- SR.GetString(SR.SessionClosedBeforeDone));
- }
- else
- {
- message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
- this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
- tryAckNow = true;
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- }
- // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
- // serialized ack size and the amount of memory taken by the ack ranges. In the
- // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
- // threat.
- else if (this.DeliveryStrategy.CanEnqueue(sequenceNumber)
- && (this.Listener.Ordered || this.Connection.CanMerge(sequenceNumber)))
- {
- this.Connection.Merge(sequenceNumber, isLast);
- needDispatch = this.DeliveryStrategy.Enqueue(info.Message, sequenceNumber);
- closeMessage = false;
- this.pendingAcknowledgements++;
- if (this.pendingAcknowledgements == this.Listener.MaxTransferWindowSize)
- tryAckNow = true;
- if (this.Connection.AllAdded)
- {
- scheduleShutdown = true;
- this.ReliableSession.CloseSession();
- }
- }
- else
- {
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- if (this.Connection.IsLastKnown)
- tryAckNow = true;
- if (!tryAckNow && this.pendingAcknowledgements > 0 && !this.acknowledgementScheduled && fault == null)
- {
- this.acknowledgementScheduled = true;
- this.acknowledgementTimer.Set(this.acknowledgementInterval);
- }
- }
- }
- else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
- {
- bool isTerminateEarly;
- lock (this.ThisLock)
- {
- isTerminateEarly = !this.Connection.Terminate();
- }
- if (isTerminateEarly)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
- SR.GetString(SR.EarlyTerminateSequence));
- }
- }
- else if (wsrm11 && ((info.TerminateSequenceInfo != null) || info.CloseSequenceInfo != null))
- {
- bool isTerminate = info.TerminateSequenceInfo != null;
- WsrmRequestInfo requestInfo = isTerminate
- ? (WsrmRequestInfo)info.TerminateSequenceInfo
- : (WsrmRequestInfo)info.CloseSequenceInfo;
- Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
- if (!WsrmUtilities.ValidateWsrmRequest(this.ReliableSession, requestInfo, this.Binder, null))
- {
- return;
- }
- bool isLastLargeEnough = true;
- bool isLastConsistent = true;
- lock (this.ThisLock)
- {
- if (!this.Connection.IsLastKnown)
- {
- if (isTerminate)
- {
- if (this.Connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
- {
- scheduleShutdown = true;
- }
- else if (isLastLargeEnough)
- {
- remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
- }
- }
- else
- {
- scheduleShutdown = this.Connection.SetCloseSequenceLast(last);
- isLastLargeEnough = scheduleShutdown;
- }
- if (scheduleShutdown)
- {
- this.ReliableSession.SetFinalAck(this.Connection.Ranges);
- this.DeliveryStrategy.Dispose();
- }
- }
- else
- {
- isLastConsistent = (last == this.Connection.Last);
- // Have seen CloseSequence already, TerminateSequence means cleanup.
- if (isTerminate && isLastConsistent && this.Connection.IsSequenceClosed)
- {
- terminate = true;
- }
- }
- }
- if (!isLastLargeEnough)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber),
- SR.GetString(SR.SmallLastMsgNumberExceptionString));
- }
- else if (!isLastConsistent)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber),
- SR.GetString(SR.InconsistentLastMsgNumberExceptionString));
- }
- else
- {
- message = isTerminate
- ? WsrmUtilities.CreateTerminateResponseMessage(this.Listener.MessageVersion,
- requestInfo.MessageId, this.ReliableSession.InputID)
- : WsrmUtilities.CreateCloseSequenceResponse(this.Listener.MessageVersion,
- requestInfo.MessageId, this.ReliableSession.InputID);
- tryAckNow = true;
- }
- }
- if (fault != null)
- {
- this.ReliableSession.OnLocalFault(fault.CreateException(), fault, null);
- }
- else
- {
- if (tryAckNow)
- {
- lock (this.ThisLock)
- {
- if (this.acknowledgementScheduled)
- {
- this.acknowledgementTimer.Cancel();
- this.acknowledgementScheduled = false;
- }
- this.pendingAcknowledgements = 0;
- }
- if (message != null)
- {
- this.AddAcknowledgementHeader(message);
- }
- else
- {
- message = this.CreateAcknowledgmentMessage();
- }
- }
- if (message != null)
- {
- using (message)
- {
- if (this.guard.Enter())
- {
- try
- {
- this.Binder.Send(message, this.DefaultSendTimeout);
- }
- finally
- {
- this.guard.Exit();
- }
- }
- }
- }
- if (terminate)
- {
- lock (this.ThisLock)
- {
- this.Connection.Terminate();
- }
- }
- if (remoteFaultException != null)
- {
- this.ReliableSession.OnRemoteFault(remoteFaultException);
- return;
- }
- if (needDispatch)
- {
- this.Dispatch();
- }
- if (scheduleShutdown)
- {
- ActionItem.Schedule(this.ShutdownCallback, null);
- }
- }
- }
- finally
- {
- if (closeMessage)
- {
- info.Message.Close();
- }
- }
- }
- }
- sealed class ReliableInputSessionChannelOverReply : ReliableInputSessionChannel
- {
- public ReliableInputSessionChannelOverReply(
- ReliableChannelListenerBase<IInputSessionChannel> listener,
- IServerReliableChannelBinder binder, FaultHelper faultHelper,
- UniqueId inputID)
- : base(listener, binder, faultHelper, inputID)
- {
- if (binder.HasSession)
- {
- try
- {
- this.StartReceiving(false);
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- this.ReliableSession.OnUnknownException(e);
- }
- }
- }
- protected override bool HandleReceiveComplete(IAsyncResult result)
- {
- RequestContext context;
- bool timeoutOkay = this.Binder.EndTryReceive(result, out context);
- if (timeoutOkay)
- {
- if (context == null)
- {
- bool terminated = false;
- lock (this.ThisLock)
- {
- terminated = this.Connection.Terminate();
- }
- if (!terminated && (this.Binder.State == CommunicationState.Opened))
- {
- Exception e = new CommunicationException(SR.GetString(SR.EarlySecurityClose));
- this.ReliableSession.OnLocalFault(e, (Message)null, null);
- }
- return false;
- }
- WsrmMessageInfo info = WsrmMessageInfo.Get(this.Listener.MessageVersion,
- this.Listener.ReliableMessagingVersion, this.Binder.Channel, this.Binder.GetInnerSession(),
- context.RequestMessage);
- this.StartReceiving(false);
- this.ProcessRequest(context, info);
- return false;
- }
- return true;
- }
- public void ProcessDemuxedRequest(RequestContext context, WsrmMessageInfo info)
- {
- try
- {
- this.ProcessRequest(context, info);
- }
- #pragma warning suppress 56500 // covered by FxCOP
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- throw;
- this.ReliableSession.OnUnknownException(e);
- }
- }
- void ProcessRequest(RequestContext context, WsrmMessageInfo info)
- {
- bool closeContext = true;
- bool closeMessage = true;
- try
- {
- if (!this.ReliableSession.ProcessInfo(info, context))
- {
- closeContext = false;
- closeMessage = false;
- return;
- }
- if (!this.ReliableSession.VerifySimplexProtocolElements(info, context))
- {
- closeContext = false;
- closeMessage = false;
- return;
- }
- this.ReliableSession.OnRemoteActivity(false);
- if (info.CreateSequenceInfo != null)
- {
- EndpointAddress acksTo;
- if (WsrmUtilities.ValidateCreateSequence<IInputSessionChannel>(info, this.Listener, this.Binder.Channel, out acksTo))
- {
- Message response = WsrmUtilities.CreateCreateSequenceResponse(this.Listener.MessageVersion,
- this.Listener.ReliableMessagingVersion, false, info.CreateSequenceInfo,
- this.Listener.Ordered, this.ReliableSession.InputID, acksTo);
- using (context)
- {
- using (response)
- {
- if (this.Binder.AddressResponse(info.Message, response))
- context.Reply(response, this.DefaultSendTimeout);
- }
- }
- }
- else
- {
- this.ReliableSession.OnLocalFault(info.FaultException, info.FaultReply, context);
- }
- closeContext = false;
- closeMessage = false;
- return;
- }
- bool needDispatch = false;
- bool scheduleShutdown = false;
- bool terminate = false;
- WsrmFault fault = null;
- Message message = null;
- Exception remoteFaultException = null;
- bool wsrmFeb2005 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessagingFebruary2005;
- bool wsrm11 = this.Listener.ReliableMessagingVersion == ReliableMessagingVersion.WSReliableMessaging11;
- bool addAck = info.AckRequestedInfo != null;
- if (info.SequencedMessageInfo != null)
- {
- lock (this.ThisLock)
- {
- if (this.Aborted || (this.State == CommunicationState.Faulted))
- {
- return;
- }
- Int64 sequenceNumber = info.SequencedMessageInfo.SequenceNumber;
- bool isLast = wsrmFeb2005 && info.SequencedMessageInfo.LastMessage;
- if (!this.Connection.IsValid(sequenceNumber, isLast))
- {
- if (wsrmFeb2005)
- {
- fault = new LastMessageNumberExceededFault(this.ReliableSession.InputID);
- }
- else
- {
- message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
- this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- }
- else if (this.Connection.Ranges.Contains(sequenceNumber))
- {
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- else if (wsrmFeb2005 && info.Action == WsrmFeb2005Strings.LastMessageAction)
- {
- this.Connection.Merge(sequenceNumber, isLast);
- scheduleShutdown = this.Connection.AllAdded;
- }
- else if (this.State == CommunicationState.Closing)
- {
- if (wsrmFeb2005)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedSessionClosedBeforeDone),
- SR.GetString(SR.SessionClosedBeforeDone));
- }
- else
- {
- message = new SequenceClosedFault(this.ReliableSession.InputID).CreateMessage(
- this.Listener.MessageVersion, this.Listener.ReliableMessagingVersion);
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- }
- // In the unordered case we accept no more than MaxSequenceRanges ranges to limit the
- // serialized ack size and the amount of memory taken by the ack ranges. In the
- // ordered case, the delivery strategy MaxTransferWindowSize quota mitigates this
- // threat.
- else if (this.DeliveryStrategy.CanEnqueue(sequenceNumber)
- && (this.Listener.Ordered || this.Connection.CanMerge(sequenceNumber)))
- {
- this.Connection.Merge(sequenceNumber, isLast);
- needDispatch = this.DeliveryStrategy.Enqueue(info.Message, sequenceNumber);
- scheduleShutdown = this.Connection.AllAdded;
- closeMessage = false;
- }
- else
- {
- if (PerformanceCounters.PerformanceCountersEnabled)
- PerformanceCounters.MessageDropped(this.perfCounterId);
- }
- }
- }
- else if (wsrmFeb2005 && info.TerminateSequenceInfo != null)
- {
- bool isTerminateEarly;
- lock (this.ThisLock)
- {
- isTerminateEarly = !this.Connection.Terminate();
- }
- if (isTerminateEarly)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedEarlyTerminateSequence),
- SR.GetString(SR.EarlyTerminateSequence));
- }
- else
- {
- // In the normal case, TerminateSequence is a one-way operation, returning (the finally
- // block will close the context).
- return;
- }
- }
- else if (wsrm11 && ((info.TerminateSequenceInfo != null) || (info.CloseSequenceInfo != null)))
- {
- bool isTerminate = (info.TerminateSequenceInfo != null);
- WsrmRequestInfo requestInfo = isTerminate
- ? (WsrmRequestInfo)info.TerminateSequenceInfo
- : (WsrmRequestInfo)info.CloseSequenceInfo;
- Int64 last = isTerminate ? info.TerminateSequenceInfo.LastMsgNumber : info.CloseSequenceInfo.LastMsgNumber;
- if (!WsrmUtilities.ValidateWsrmRequest(this.ReliableSession, requestInfo, this.Binder, context))
- {
- closeMessage = false;
- closeContext = false;
- return;
- }
- bool isLastLargeEnough = true;
- bool isLastConsistent = true;
- lock (this.ThisLock)
- {
- if (!this.Connection.IsLastKnown)
- {
- if (isTerminate)
- {
- if (this.Connection.SetTerminateSequenceLast(last, out isLastLargeEnough))
- {
- scheduleShutdown = true;
- }
- else if (isLastLargeEnough)
- {
- remoteFaultException = new ProtocolException(SR.GetString(SR.EarlyTerminateSequence));
- }
- }
- else
- {
- scheduleShutdown = this.Connection.SetCloseSequenceLast(last);
- isLastLargeEnough = scheduleShutdown;
- }
- if (scheduleShutdown)
- {
- this.ReliableSession.SetFinalAck(this.Connection.Ranges);
- this.DeliveryStrategy.Dispose();
- }
- }
- else
- {
- isLastConsistent = (last == this.Connection.Last);
- // Have seen CloseSequence already, TerminateSequence means cleanup.
- if (isTerminate && isLastConsistent && this.Connection.IsSequenceClosed)
- {
- terminate = true;
- }
- }
- }
- if (!isLastLargeEnough)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedSmallLastMsgNumber),
- SR.GetString(SR.SmallLastMsgNumberExceptionString));
- }
- else if (!isLastConsistent)
- {
- fault = SequenceTerminatedFault.CreateProtocolFault(this.ReliableSession.InputID,
- SR.GetString(SR.SequenceTerminatedInconsistentLastMsgNumber),
- SR.GetString(SR.InconsistentLastMsgNumberExceptionString));
- }
- else
- {
- message = isTerminate
- ? WsrmUtilities.CreateTerminateResponseMessage(this.Listener.MessageVersion,
- requestInfo.MessageId, this.ReliableSession.InputID)
- : WsrmUtilities.CreateCloseSequenceResponse(this.Listener.MessageVersion,
- requestInfo.MessageId, this.ReliableSession.InputID);
- addAck = true;
- }
- }
- if (fault != null)
- {
- this.ReliableSession.OnLocalFault(fault.CreateException(), fault, context);
- closeMessage = false;
- closeContext = false;
- return;
- }
- if (message != null && addAck)
- {
- this.AddAcknowledgementHeader(message);
- }
- else if (message == null)
- {
- message = this.CreateAcknowledgmentMessage();
- }
- using (message)
- {
- context.Reply(message);
- }
- if (terminate)
- {
- lock (this.ThisLock)
- {
- this.Connection.Terminate();
- }
- }
- if (remoteFaultException != null)
- {
- this.ReliableSession.OnRemoteFault(remoteFaultException);
- return;
- }
- if (needDispatch)
- {
- this.Dispatch();
- }
- if (scheduleShutdown)
- {
- ActionItem.Schedule(this.ShutdownCallback, null);
- }
- }
- finally
- {
- if (closeMessage)
- info.Message.Close();
- if (closeContext)
- context.Close();
- }
- }
- }
- }
|