| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130113111321133113411351136113711381139114011411142114311441145114611471148114911501151115211531154115511561157115811591160116111621163116411651166116711681169117011711172117311741175117611771178117911801181118211831184118511861187118811891190119111921193119411951196119711981199120012011202120312041205120612071208120912101211121212131214121512161217121812191220122112221223122412251226122712281229123012311232123312341235123612371238123912401241124212431244124512461247124812491250125112521253125412551256125712581259126012611262126312641265126612671268126912701271127212731274127512761277127812791280128112821283128412851286128712881289129012911292129312941295129612971298129913001301130213031304130513061307130813091310131113121313131413151316131713181319132013211322132313241325132613271328132913301331133213331334133513361337133813391340134113421343134413451346134713481349135013511352135313541355135613571358135913601361136213631364136513661367136813691370137113721373137413751376137713781379138013811382138313841385138613871388138913901391139213931394139513961397139813991400140114021403140414051406140714081409141014111412141314141415141614171418141914201421142214231424142514261427142814291430143114321433143414351436143714381439144014411442144314441445144614471448144914501451145214531454145514561457145814591460146114621463146414651466146714681469147014711472147314741475147614771478147914801481148214831484148514861487148814891490149114921493149414951496149714981499150015011502150315041505150615071508150915101511151215131514151515161517151815191520152115221523152415251526152715281529153015311532153315341535153615371538153915401541154215431544154515461547154815491550155115521553155415551556155715581559156015611562156315641565156615671568156915701571157215731574157515761577157815791580158115821583158415851586158715881589159015911592159315941595159615971598159916001601160216031604160516061607160816091610161116121613161416151616161716181619162016211622162316241625162616271628162916301631163216331634163516361637163816391640164116421643164416451646164716481649165016511652165316541655165616571658165916601661166216631664166516661667166816691670167116721673167416751676167716781679168016811682168316841685168616871688168916901691169216931694169516961697169816991700170117021703170417051706170717081709171017111712171317141715171617171718171917201721172217231724172517261727172817291730173117321733173417351736173717381739174017411742174317441745174617471748174917501751175217531754175517561757175817591760176117621763176417651766176717681769177017711772177317741775177617771778177917801781178217831784178517861787178817891790179117921793179417951796179717981799180018011802180318041805180618071808180918101811181218131814181518161817181818191820182118221823182418251826182718281829183018311832183318341835183618371838183918401841184218431844184518461847184818491850185118521853185418551856185718581859186018611862186318641865186618671868186918701871187218731874187518761877187818791880188118821883188418851886188718881889189018911892189318941895189618971898189919001901190219031904190519061907190819091910191119121913191419151916191719181919192019211922192319241925192619271928192919301931193219331934193519361937193819391940194119421943194419451946194719481949195019511952195319541955195619571958195919601961196219631964196519661967196819691970197119721973197419751976197719781979198019811982198319841985198619871988198919901991 |
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.ServiceModel.Channels
- {
- using System;
- using System.Diagnostics;
- using System.IO;
- using System.Net;
- using System.Runtime;
- using System.Runtime.CompilerServices;
- using System.Security.Authentication.ExtendedProtection;
- using System.ServiceModel;
- using System.ServiceModel.Activation;
- using System.ServiceModel.Description;
- using System.ServiceModel.Diagnostics;
- using System.ServiceModel.Dispatcher;
- using System.ServiceModel.Security;
- using System.Threading;
- using System.Xml;
- using System.ServiceModel.Diagnostics.Application;
- delegate void ServerSingletonPreambleCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
- delegate ISingletonChannelListener SingletonPreambleDemuxCallback(ServerSingletonPreambleConnectionReader serverSingletonPreambleReader);
- interface ISingletonChannelListener
- {
- TimeSpan ReceiveTimeout { get; }
- void ReceiveRequest(RequestContext requestContext, Action callback, bool canDispatchOnThisThread);
- }
- class ServerSingletonPreambleConnectionReader : InitialServerConnectionReader
- {
- ServerSingletonDecoder decoder;
- ServerSingletonPreambleCallback callback;
- WaitCallback onAsyncReadComplete;
- IConnectionOrientedTransportFactorySettings transportSettings;
- TransportSettingsCallback transportSettingsCallback;
- SecurityMessageProperty security;
- Uri via;
- IConnection rawConnection;
- byte[] connectionBuffer;
- bool isReadPending;
- int offset;
- int size;
- TimeoutHelper receiveTimeoutHelper;
- Action<Uri> viaDelegate;
- ChannelBinding channelBindingToken;
- static AsyncCallback onValidate;
- public ServerSingletonPreambleConnectionReader(IConnection connection, Action connectionDequeuedCallback,
- long streamPosition, int offset, int size, TransportSettingsCallback transportSettingsCallback,
- ConnectionClosedCallback closedCallback, ServerSingletonPreambleCallback callback)
- : base(connection, closedCallback)
- {
- this.decoder = new ServerSingletonDecoder(streamPosition, MaxViaSize, MaxContentTypeSize);
- this.offset = offset;
- this.size = size;
- this.callback = callback;
- this.transportSettingsCallback = transportSettingsCallback;
- this.rawConnection = connection;
- this.ConnectionDequeuedCallback = connectionDequeuedCallback;
- }
- public ChannelBinding ChannelBinding
- {
- get
- {
- return this.channelBindingToken;
- }
- }
- public int BufferOffset
- {
- get { return this.offset; }
- }
- public int BufferSize
- {
- get { return this.size; }
- }
- public ServerSingletonDecoder Decoder
- {
- get { return this.decoder; }
- }
- public IConnection RawConnection
- {
- get { return this.rawConnection; }
- }
- public Uri Via
- {
- get { return this.via; }
- }
- public IConnectionOrientedTransportFactorySettings TransportSettings
- {
- get { return this.transportSettings; }
- }
- public SecurityMessageProperty Security
- {
- get { return this.security; }
- }
- TimeSpan GetRemainingTimeout()
- {
- return this.receiveTimeoutHelper.RemainingTime();
- }
- void ReadAndDispatch()
- {
- bool success = false;
- try
- {
- while ((size > 0 || !isReadPending) && !IsClosed)
- {
- if (size == 0)
- {
- isReadPending = true;
- if (onAsyncReadComplete == null)
- {
- onAsyncReadComplete = new WaitCallback(OnAsyncReadComplete);
- }
- if (Connection.BeginRead(0, connectionBuffer.Length, GetRemainingTimeout(),
- onAsyncReadComplete, null) == AsyncCompletionResult.Queued)
- {
- break;
- }
- HandleReadComplete();
- }
- int bytesRead = decoder.Decode(connectionBuffer, offset, size);
- if (bytesRead > 0)
- {
- offset += bytesRead;
- size -= bytesRead;
- }
- if (decoder.CurrentState == ServerSingletonDecoder.State.PreUpgradeStart)
- {
- if (onValidate == null)
- {
- onValidate = Fx.ThunkCallback(new AsyncCallback(OnValidate));
- }
- this.via = decoder.Via;
- IAsyncResult result = this.Connection.BeginValidate(this.via, onValidate, this);
- if (result.CompletedSynchronously)
- {
- if (!VerifyValidationResult(result))
- {
- // This goes through the failure path (Abort) even though it doesn't throw.
- return;
- }
- }
- break; //exit loop, set success=true;
- }
- }
- success = true;
- }
- catch (CommunicationException exception)
- {
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (TimeoutException exception)
- {
- if (TD.ReceiveTimeoutIsEnabled())
- {
- TD.ReceiveTimeout(exception.Message);
- }
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- if (!ExceptionHandler.HandleTransportExceptionHelper(e))
- {
- throw;
- }
- // containment -- we abort ourselves for any error, no extra containment needed
- }
- finally
- {
- if (!success)
- {
- Abort();
- }
- }
- }
- //returns true if validation was successful
- bool VerifyValidationResult(IAsyncResult result)
- {
- return this.Connection.EndValidate(result) && this.ContinuePostValidationProcessing();
- }
- static void OnValidate(IAsyncResult result)
- {
- bool success = false;
- ServerSingletonPreambleConnectionReader thisPtr = (ServerSingletonPreambleConnectionReader)result.AsyncState;
- try
- {
- if (!result.CompletedSynchronously)
- {
- if (!thisPtr.VerifyValidationResult(result))
- {
- // This goes through the failure path (Abort) even though it doesn't throw.
- return;
- }
- }
- success = true;
- }
- catch (CommunicationException exception)
- {
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (TimeoutException exception)
- {
- if (TD.ReceiveTimeoutIsEnabled())
- {
- TD.ReceiveTimeout(exception.Message);
- }
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- }
- finally
- {
- if (!success)
- {
- thisPtr.Abort();
- }
- }
- }
- //returns false if the connection should be aborted
- bool ContinuePostValidationProcessing()
- {
- if (viaDelegate != null)
- {
- try
- {
- viaDelegate(via);
- }
- catch (ServiceActivationException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- // return fault and close connection
- SendFault(FramingEncodingString.ServiceActivationFailedFault);
- return true;
- }
- }
- this.transportSettings = transportSettingsCallback(via);
- if (transportSettings == null)
- {
- EndpointNotFoundException e = new EndpointNotFoundException(SR.GetString(SR.EndpointNotFound, decoder.Via));
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- // return fault and close connection
- SendFault(FramingEncodingString.EndpointNotFoundFault);
- return false;
- }
- // we have enough information to hand off to a channel. Our job is done
- callback(this);
- return true;
- }
- public void SendFault(string faultString)
- {
- SendFault(faultString, ref this.receiveTimeoutHelper);
- }
- void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
- {
- InitialServerConnectionReader.SendFault(Connection, faultString,
- connectionBuffer, timeoutHelper.RemainingTime(), TransportDefaults.MaxDrainSize);
- }
- public IAsyncResult BeginCompletePreamble(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new CompletePreambleAsyncResult(timeout, this, callback, state);
- }
- public IConnection EndCompletePreamble(IAsyncResult result)
- {
- return CompletePreambleAsyncResult.End(result);
- }
- class CompletePreambleAsyncResult : TypedAsyncResult<IConnection>
- {
- static WaitCallback onReadCompleted = new WaitCallback(OnReadCompleted);
- static WaitCallback onWriteCompleted = new WaitCallback(OnWriteCompleted);
- static AsyncCallback onUpgradeComplete = Fx.ThunkCallback(OnUpgradeComplete);
- TimeoutHelper timeoutHelper;
- ServerSingletonPreambleConnectionReader parent;
- StreamUpgradeAcceptor upgradeAcceptor;
- StreamUpgradeProvider upgrade;
- IStreamUpgradeChannelBindingProvider channelBindingProvider;
- IConnection currentConnection;
- UpgradeState upgradeState = UpgradeState.None;
-
- public CompletePreambleAsyncResult(TimeSpan timeout, ServerSingletonPreambleConnectionReader parent, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.timeoutHelper = new TimeoutHelper(timeout);
- this.parent = parent;
- Initialize();
- if (ContinueWork(null))
- {
- Complete(this.currentConnection, true);
- }
- }
- byte[] ConnectionBuffer
- {
- get
- {
- return this.parent.connectionBuffer;
- }
- set
- {
- this.parent.connectionBuffer = value;
- }
- }
- int Offset
- {
- get
- {
- return this.parent.offset;
- }
- set
- {
- this.parent.offset = value;
- }
- }
- int Size
- {
- get
- {
- return this.parent.size;
- }
- set
- {
- this.parent.size = value;
- }
- }
- bool CanReadAndDecode
- {
- get
- {
- //ok to read/decode before we start the upgrade
- //and between UpgradeComplete/WritingPreambleAck
- return this.upgradeState == UpgradeState.None
- || this.upgradeState == UpgradeState.UpgradeComplete;
- }
- }
- ServerSingletonDecoder Decoder
- {
- get
- {
- return this.parent.decoder;
- }
- }
- void Initialize()
- {
- if (!this.parent.transportSettings.MessageEncoderFactory.Encoder.IsContentTypeSupported(Decoder.ContentType))
- {
- SendFault(FramingEncodingString.ContentTypeInvalidFault, ref timeoutHelper);
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(
- SR.ContentTypeMismatch, Decoder.ContentType, parent.transportSettings.MessageEncoderFactory.Encoder.ContentType)));
- }
- upgrade = this.parent.transportSettings.Upgrade;
- if (upgrade != null)
- {
- channelBindingProvider = upgrade.GetProperty<IStreamUpgradeChannelBindingProvider>();
- upgradeAcceptor = upgrade.CreateUpgradeAcceptor();
- }
- this.currentConnection = this.parent.Connection;
- }
- void SendFault(string faultString, ref TimeoutHelper timeoutHelper)
- {
- this.parent.SendFault(faultString, ref timeoutHelper);
- }
- bool BeginRead()
- {
- this.Offset = 0;
- return this.currentConnection.BeginRead(0, this.ConnectionBuffer.Length, timeoutHelper.RemainingTime(), onReadCompleted, this) == AsyncCompletionResult.Completed;
- }
- void EndRead()
- {
- this.Size = currentConnection.EndRead();
- if (this.Size == 0)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(this.Decoder.CreatePrematureEOFException());
- }
- }
- bool ContinueWork(IAsyncResult upgradeAsyncResult)
- {
- if (upgradeAsyncResult != null)
- {
- Fx.AssertAndThrow(this.upgradeState == UpgradeState.EndUpgrade, "upgradeAsyncResult should only be passed in from OnUpgradeComplete callback");
- }
- for (;;)
- {
- if (Size == 0 && this.CanReadAndDecode)
- {
- if (BeginRead())
- {
- EndRead();
- }
- else
- {
- //when read completes, we will re-enter this loop.
- break;
- }
- }
- for (;;)
- {
- if (this.CanReadAndDecode)
- {
- int bytesRead = Decoder.Decode(ConnectionBuffer, Offset, Size);
- if (bytesRead > 0)
- {
- Offset += bytesRead;
- Size -= bytesRead;
- }
- }
- switch (Decoder.CurrentState)
- {
- case ServerSingletonDecoder.State.UpgradeRequest:
- switch (this.upgradeState)
- {
- case UpgradeState.None:
- //change the state so that we don't read/decode until it is safe
- ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
- break;
- case UpgradeState.VerifyingUpgradeRequest:
- if (this.upgradeAcceptor == null)
- {
- SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ProtocolException(SR.GetString(SR.UpgradeRequestToNonupgradableService, Decoder.Upgrade)));
- }
- if (!this.upgradeAcceptor.CanUpgrade(Decoder.Upgrade))
- {
- SendFault(FramingEncodingString.UpgradeInvalidFault, ref timeoutHelper);
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ProtocolException(SR.GetString(SR.UpgradeProtocolNotSupported, Decoder.Upgrade)));
- }
- ChangeUpgradeState(UpgradeState.WritingUpgradeAck);
- // accept upgrade
- if (this.currentConnection.BeginWrite(ServerSingletonEncoder.UpgradeResponseBytes, 0, ServerSingletonEncoder.UpgradeResponseBytes.Length,
- true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
- {
- //OnWriteCompleted will:
- // 1) set upgradeState to UpgradeAckSent
- // 2) call EndWrite
- return false;
- }
- else
- {
- this.currentConnection.EndWrite();
- }
- ChangeUpgradeState(UpgradeState.UpgradeAckSent);
- break;
- case UpgradeState.UpgradeAckSent:
- IConnection connectionToUpgrade = this.currentConnection;
- if (Size > 0)
- {
- connectionToUpgrade = new PreReadConnection(connectionToUpgrade, ConnectionBuffer, Offset, Size);
- }
- ChangeUpgradeState(UpgradeState.BeginUpgrade);
- break;
- case UpgradeState.BeginUpgrade:
- try
- {
- if (!BeginUpgrade(out upgradeAsyncResult))
- {
- //OnUpgradeComplete will set upgradeState to EndUpgrade
- return false;
- }
- ChangeUpgradeState(UpgradeState.EndUpgrade);
- }
- catch (Exception exception)
- {
- if (Fx.IsFatal(exception))
- throw;
-
- this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
- throw;
- }
- break;
- case UpgradeState.EndUpgrade://Must be a different state here than UpgradeComplete so that we don't try to read from the connection
- try
- {
- EndUpgrade(upgradeAsyncResult);
- ChangeUpgradeState(UpgradeState.UpgradeComplete);
- }
- catch (Exception exception)
- {
- if (Fx.IsFatal(exception))
- throw;
- this.parent.WriteAuditFailure(upgradeAcceptor as StreamSecurityUpgradeAcceptor, exception);
- throw;
- }
- break;
- case UpgradeState.UpgradeComplete:
- //Client is doing more than one upgrade, reset the state
- ChangeUpgradeState(UpgradeState.VerifyingUpgradeRequest);
- break;
- }
- break;
- case ServerSingletonDecoder.State.Start:
- this.parent.SetupSecurityIfNecessary(upgradeAcceptor);
- if (this.upgradeState == UpgradeState.UpgradeComplete //We have done at least one upgrade, but we are now done.
- || this.upgradeState == UpgradeState.None)//no upgrade, just send the preample end bytes
- {
- ChangeUpgradeState(UpgradeState.WritingPreambleEnd);
- // we've finished the preamble. Ack and return.
- if (this.currentConnection.BeginWrite(ServerSessionEncoder.AckResponseBytes, 0, ServerSessionEncoder.AckResponseBytes.Length,
- true, timeoutHelper.RemainingTime(), onWriteCompleted, this) == AsyncCompletionResult.Queued)
- {
- //OnWriteCompleted will:
- // 1) set upgradeState to PreambleEndSent
- // 2) call EndWrite
- return false;
- }
- else
- {
- this.currentConnection.EndWrite();
- }
-
- //terminal state
- ChangeUpgradeState(UpgradeState.PreambleEndSent);
- }
-
- //we are done, this.currentConnection is the upgraded connection
- return true;
- }
- if (Size == 0)
- {
- break;
- }
- }
- }
- return false;
- }
- bool BeginUpgrade(out IAsyncResult upgradeAsyncResult)
- {
- upgradeAsyncResult = InitialServerConnectionReader.BeginUpgradeConnection(this.currentConnection, upgradeAcceptor, this.parent.transportSettings, onUpgradeComplete, this);
- if (!upgradeAsyncResult.CompletedSynchronously)
- {
- upgradeAsyncResult = null; //caller shouldn't use this out param unless completed [....].
- return false;
- }
- return true;
- }
- void EndUpgrade(IAsyncResult upgradeAsyncResult)
- {
- this.currentConnection = InitialServerConnectionReader.EndUpgradeConnection(upgradeAsyncResult);
- this.ConnectionBuffer = this.currentConnection.AsyncReadBuffer;
- if (this.channelBindingProvider != null
- && this.channelBindingProvider.IsChannelBindingSupportEnabled
- && this.parent.channelBindingToken == null)//first one wins in the case of multiple upgrades.
- {
- this.parent.channelBindingToken = channelBindingProvider.GetChannelBinding(this.upgradeAcceptor, ChannelBindingKind.Endpoint);
- }
- }
- void ChangeUpgradeState(UpgradeState newState)
- {
- switch (newState)
- {
- case UpgradeState.None:
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- case UpgradeState.VerifyingUpgradeRequest:
- if (this.upgradeState != UpgradeState.None //starting first upgrade
- && this.upgradeState != UpgradeState.UpgradeComplete)//completing one upgrade and starting another
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.WritingUpgradeAck:
- if (this.upgradeState != UpgradeState.VerifyingUpgradeRequest)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.UpgradeAckSent:
- if (this.upgradeState != UpgradeState.WritingUpgradeAck)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.BeginUpgrade:
- if (this.upgradeState != UpgradeState.UpgradeAckSent)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.EndUpgrade:
- if (this.upgradeState != UpgradeState.BeginUpgrade)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.UpgradeComplete:
- if (this.upgradeState != UpgradeState.EndUpgrade)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.WritingPreambleEnd:
- if (this.upgradeState != UpgradeState.None //no upgrade being used
- && this.upgradeState != UpgradeState.UpgradeComplete)//upgrades are now complete, end the preamble handshake.
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- case UpgradeState.PreambleEndSent:
- if (this.upgradeState != UpgradeState.WritingPreambleEnd)
- {
- throw Fx.AssertAndThrow("Invalid State Transition: currentState=" + this.upgradeState + ", newState=" + newState);
- }
- break;
- default:
- throw Fx.AssertAndThrow("Unexpected Upgrade State: " + newState);
- }
- this.upgradeState = newState;
- }
- static void OnReadCompleted(object state)
- {
- CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- thisPtr.EndRead();
- completeSelf = thisPtr.ContinueWork(null);
- }
- catch (Exception ex)
- {
- if (Fx.IsFatal(ex))
- {
- throw;
- }
- completionException = ex;
- completeSelf = true;
- }
- if (completeSelf)
- {
- if (completionException != null)
- {
- thisPtr.Complete(false, completionException);
- }
- else
- {
- thisPtr.Complete(thisPtr.currentConnection, false);
- }
- }
- }
- static void OnWriteCompleted(object state)
- {
- CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)state;
- Exception completionException = null;
- bool completeSelf = false;
- try
- {
- thisPtr.currentConnection.EndWrite();
- switch (thisPtr.upgradeState)
- {
- case UpgradeState.WritingUpgradeAck:
- thisPtr.ChangeUpgradeState(UpgradeState.UpgradeAckSent);
- break;
- case UpgradeState.WritingPreambleEnd:
- thisPtr.ChangeUpgradeState(UpgradeState.PreambleEndSent);
- break;
- }
- completeSelf = thisPtr.ContinueWork(null);
- }
- catch (Exception ex)
- {
- if (Fx.IsFatal(ex))
- {
- throw;
- }
- completionException = ex;
- completeSelf = true;
- }
- if (completeSelf)
- {
- if (completionException != null)
- {
- thisPtr.Complete(false, completionException);
- }
- else
- {
- thisPtr.Complete(thisPtr.currentConnection, false);
- }
- }
- }
-
- static void OnUpgradeComplete(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- CompletePreambleAsyncResult thisPtr = (CompletePreambleAsyncResult)result.AsyncState;
- Exception completionException = null;
- bool completeSelf = false;
-
- try
- {
- thisPtr.ChangeUpgradeState(UpgradeState.EndUpgrade);
- completeSelf = thisPtr.ContinueWork(result);
- }
- catch (Exception ex)
- {
- if (Fx.IsFatal(ex))
- {
- throw;
- }
- completionException = ex;
- completeSelf = true;
- }
- if (completeSelf)
- {
- if (completionException != null)
- {
- thisPtr.Complete(false, completionException);
- }
- else
- {
- thisPtr.Complete(thisPtr.currentConnection, false);
- }
- }
- }
- enum UpgradeState
- {
- None,
- VerifyingUpgradeRequest,
- WritingUpgradeAck,
- UpgradeAckSent,
- BeginUpgrade,
- EndUpgrade,
- UpgradeComplete,
- WritingPreambleEnd,
- PreambleEndSent,
- }
- }
- void SetupSecurityIfNecessary(StreamUpgradeAcceptor upgradeAcceptor)
- {
- StreamSecurityUpgradeAcceptor securityUpgradeAcceptor = upgradeAcceptor as StreamSecurityUpgradeAcceptor;
- if (securityUpgradeAcceptor != null)
- {
- this.security = securityUpgradeAcceptor.GetRemoteSecurity();
- if (this.security == null)
- {
- Exception securityFailedException = new ProtocolException(
- SR.GetString(SR.RemoteSecurityNotNegotiatedOnStreamUpgrade, this.Via));
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(securityFailedException);
- }
- // Audit Authentication Success
- WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Success, null);
- }
- }
- #region Transport Security Auditing
- void WriteAuditFailure(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, Exception exception)
- {
- try
- {
- WriteAuditEvent(securityUpgradeAcceptor, AuditLevel.Failure, exception);
- }
- #pragma warning suppress 56500 // covered by FxCop
- catch (Exception auditException)
- {
- if (Fx.IsFatal(auditException))
- {
- throw;
- }
- DiagnosticUtility.TraceHandledException(auditException, TraceEventType.Error);
- }
- }
- void WriteAuditEvent(StreamSecurityUpgradeAcceptor securityUpgradeAcceptor, AuditLevel auditLevel, Exception exception)
- {
- if ((this.transportSettings.AuditBehavior.MessageAuthenticationAuditLevel & auditLevel) != auditLevel)
- {
- return;
- }
- if (securityUpgradeAcceptor == null)
- {
- return;
- }
- String primaryIdentity = String.Empty;
- SecurityMessageProperty clientSecurity = securityUpgradeAcceptor.GetRemoteSecurity();
- if (clientSecurity != null)
- {
- primaryIdentity = GetIdentityNameFromContext(clientSecurity);
- }
- ServiceSecurityAuditBehavior auditBehavior = this.transportSettings.AuditBehavior;
- if (auditLevel == AuditLevel.Success)
- {
- SecurityAuditHelper.WriteTransportAuthenticationSuccessEvent(auditBehavior.AuditLogLocation,
- auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity);
- }
- else
- {
- SecurityAuditHelper.WriteTransportAuthenticationFailureEvent(auditBehavior.AuditLogLocation,
- auditBehavior.SuppressAuditFailure, null, this.Via, primaryIdentity, exception);
- }
- }
- [MethodImpl(MethodImplOptions.NoInlining)]
- static string GetIdentityNameFromContext(SecurityMessageProperty clientSecurity)
- {
- return SecurityUtils.GetIdentityNamesFromContext(
- clientSecurity.ServiceSecurityContext.AuthorizationContext);
- }
- #endregion
- void HandleReadComplete()
- {
- offset = 0;
- size = Connection.EndRead();
- isReadPending = false;
- if (size == 0)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
- }
- }
- void OnAsyncReadComplete(object state)
- {
- bool success = false;
- try
- {
- HandleReadComplete();
- ReadAndDispatch();
- success = true;
- }
- catch (CommunicationException exception)
- {
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (TimeoutException exception)
- {
- if (TD.ReceiveTimeoutIsEnabled())
- {
- TD.ReceiveTimeout(exception.Message);
- }
- DiagnosticUtility.TraceHandledException(exception, TraceEventType.Information);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- if (!ExceptionHandler.HandleTransportExceptionHelper(e))
- {
- throw;
- }
- // containment -- we abort ourselves for any error, no extra containment needed
- }
- finally
- {
- if (!success)
- {
- Abort();
- }
- }
- }
- public void StartReading(Action<Uri> viaDelegate, TimeSpan timeout)
- {
- this.viaDelegate = viaDelegate;
- this.receiveTimeoutHelper = new TimeoutHelper(timeout);
- this.connectionBuffer = Connection.AsyncReadBuffer;
- ReadAndDispatch();
- }
- }
- class ServerSingletonConnectionReader : SingletonConnectionReader
- {
- ConnectionDemuxer connectionDemuxer;
- ServerSingletonDecoder decoder;
- IConnection rawConnection;
- string contentType;
- ChannelBinding channelBindingToken;
- public ServerSingletonConnectionReader(ServerSingletonPreambleConnectionReader preambleReader,
- IConnection upgradedConnection, ConnectionDemuxer connectionDemuxer)
- : base(upgradedConnection, preambleReader.BufferOffset, preambleReader.BufferSize,
- preambleReader.Security, preambleReader.TransportSettings, preambleReader.Via)
- {
- this.decoder = preambleReader.Decoder;
- this.contentType = this.decoder.ContentType;
- this.connectionDemuxer = connectionDemuxer;
- this.rawConnection = preambleReader.RawConnection;
- this.channelBindingToken = preambleReader.ChannelBinding;
- }
- protected override string ContentType
- {
- get { return this.contentType; }
- }
- protected override long StreamPosition
- {
- get { return this.decoder.StreamPosition; }
- }
- protected override bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof)
- {
- while (size > 0)
- {
- int bytesRead = decoder.Decode(buffer, offset, size);
- if (bytesRead > 0)
- {
- offset += bytesRead;
- size -= bytesRead;
- }
- switch (decoder.CurrentState)
- {
- case ServerSingletonDecoder.State.EnvelopeStart:
- // we're at the envelope
- return true;
- case ServerSingletonDecoder.State.End:
- isAtEof = true;
- return false;
- }
- }
- return false;
- }
- protected override void OnClose(TimeSpan timeout)
- {
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- // send back EOF and then recycle the connection
- this.Connection.Write(SingletonEncoder.EndBytes, 0, SingletonEncoder.EndBytes.Length, true, timeoutHelper.RemainingTime());
- this.connectionDemuxer.ReuseConnection(this.rawConnection, timeoutHelper.RemainingTime());
- ChannelBindingUtility.Dispose(ref this.channelBindingToken);
- }
- protected override void PrepareMessage(Message message)
- {
- base.PrepareMessage(message);
- IPEndPoint remoteEndPoint = this.rawConnection.RemoteIPEndPoint;
- // pipes will return null
- if (remoteEndPoint != null)
- {
- RemoteEndpointMessageProperty remoteEndpointProperty = new RemoteEndpointMessageProperty(remoteEndPoint);
- message.Properties.Add(RemoteEndpointMessageProperty.Name, remoteEndpointProperty);
- }
- if (this.channelBindingToken != null)
- {
- ChannelBindingMessageProperty property = new ChannelBindingMessageProperty(this.channelBindingToken, false);
- property.AddTo(message);
- property.Dispose(); //message.Properties.Add() creates a copy...
- }
- }
- }
- abstract class SingletonConnectionReader
- {
- IConnection connection;
- bool doneReceiving;
- bool doneSending;
- bool isAtEof;
- bool isClosed;
- SecurityMessageProperty security;
- object thisLock = new object();
- int offset;
- int size;
- IConnectionOrientedTransportFactorySettings transportSettings;
- Uri via;
- Stream inputStream;
- protected SingletonConnectionReader(IConnection connection, int offset, int size, SecurityMessageProperty security,
- IConnectionOrientedTransportFactorySettings transportSettings, Uri via)
- {
- this.connection = connection;
- this.offset = offset;
- this.size = size;
- this.security = security;
- this.transportSettings = transportSettings;
- this.via = via;
- }
- protected IConnection Connection
- {
- get
- {
- return this.connection;
- }
- }
- protected object ThisLock
- {
- get
- {
- return this.thisLock;
- }
- }
- protected virtual string ContentType
- {
- get { return null; }
- }
- protected abstract long StreamPosition { get; }
- public void Abort()
- {
- this.connection.Abort();
- }
- public void DoneReceiving(bool atEof)
- {
- DoneReceiving(atEof, this.transportSettings.CloseTimeout);
- }
- void DoneReceiving(bool atEof, TimeSpan timeout)
- {
- if (!this.doneReceiving)
- {
- this.isAtEof = atEof;
- this.doneReceiving = true;
- if (this.doneSending)
- {
- this.Close(timeout);
- }
- }
- }
- public void Close(TimeSpan timeout)
- {
- lock (ThisLock)
- {
- if (this.isClosed)
- {
- return;
- }
- this.isClosed = true;
- }
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- bool success = false;
- try
- {
- // first drain our stream if necessary
- if (this.inputStream != null)
- {
- byte[] dummy = DiagnosticUtility.Utility.AllocateByteArray(transportSettings.ConnectionBufferSize);
- while (!this.isAtEof)
- {
- this.inputStream.ReadTimeout = TimeoutHelper.ToMilliseconds(timeoutHelper.RemainingTime());
- int bytesRead = this.inputStream.Read(dummy, 0, dummy.Length);
- if (bytesRead == 0)
- {
- this.isAtEof = true;
- }
- }
- }
- OnClose(timeoutHelper.RemainingTime());
- success = true;
- }
- finally
- {
- if (!success)
- {
- this.Abort();
- }
- }
- }
- protected abstract void OnClose(TimeSpan timeout);
- public void DoneSending(TimeSpan timeout)
- {
- this.doneSending = true;
- if (this.doneReceiving)
- {
- this.Close(timeout);
- }
- }
- protected abstract bool DecodeBytes(byte[] buffer, ref int offset, ref int size, ref bool isAtEof);
- protected virtual void PrepareMessage(Message message)
- {
- message.Properties.Via = this.via;
- message.Properties.Security = (this.security != null) ? (SecurityMessageProperty)this.security.CreateCopy() : null;
- }
- public RequestContext ReceiveRequest(TimeSpan timeout)
- {
- Message requestMessage = Receive(timeout);
- return new StreamedFramingRequestContext(this, requestMessage);
- }
- public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new ReceiveAsyncResult(this, timeout, callback, state);
- }
- public virtual Message EndReceive(IAsyncResult result)
- {
- return ReceiveAsyncResult.End(result);
- }
- public Message Receive(TimeSpan timeout)
- {
- byte[] buffer = DiagnosticUtility.Utility.AllocateByteArray(connection.AsyncReadBufferSize);
- if (size > 0)
- {
- Buffer.BlockCopy(connection.AsyncReadBuffer, offset, buffer, offset, size);
- }
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- for (;;)
- {
- if (DecodeBytes(buffer, ref offset, ref size, ref isAtEof))
- {
- break;
- }
- if (this.isAtEof)
- {
- DoneReceiving(true, timeoutHelper.RemainingTime());
- return null;
- }
- if (size == 0)
- {
- offset = 0;
- size = connection.Read(buffer, 0, buffer.Length, timeoutHelper.RemainingTime());
- if (size == 0)
- {
- DoneReceiving(true, timeoutHelper.RemainingTime());
- return null;
- }
- }
- }
- // we're ready to read a message
- IConnection singletonConnection = this.connection;
- if (size > 0)
- {
- byte[] initialData = DiagnosticUtility.Utility.AllocateByteArray(size);
- Buffer.BlockCopy(buffer, offset, initialData, 0, size);
- singletonConnection = new PreReadConnection(singletonConnection, initialData);
- }
- Stream connectionStream = new SingletonInputConnectionStream(this, singletonConnection, this.transportSettings);
- this.inputStream = new MaxMessageSizeStream(connectionStream, transportSettings.MaxReceivedMessageSize);
- using (ServiceModelActivity activity = DiagnosticUtility.ShouldUseActivity ? ServiceModelActivity.CreateBoundedActivity(true) : null)
- {
- if (DiagnosticUtility.ShouldUseActivity)
- {
- ServiceModelActivity.Start(activity, SR.GetString(SR.ActivityProcessingMessage, TraceUtility.RetrieveMessageNumber()), ActivityType.ProcessMessage);
- }
- Message message = null;
- try
- {
- message = transportSettings.MessageEncoderFactory.Encoder.ReadMessage(
- this.inputStream, transportSettings.MaxBufferSize, this.ContentType);
- }
- catch (XmlException xmlException)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ProtocolException(SR.GetString(SR.MessageXmlProtocolError), xmlException));
- }
- if (DiagnosticUtility.ShouldUseActivity)
- {
- TraceUtility.TransferFromTransport(message);
- }
- PrepareMessage(message);
- return message;
- }
- }
- class ReceiveAsyncResult : AsyncResult
- {
- static Action<object> onReceiveScheduled = new Action<object>(OnReceiveScheduled);
- Message message;
- SingletonConnectionReader parent;
- TimeSpan timeout;
- public ReceiveAsyncResult(SingletonConnectionReader parent, TimeSpan timeout, AsyncCallback callback,
- object state)
- : base(callback, state)
- {
- this.parent = parent;
- this.timeout = timeout;
- //
- ActionItem.Schedule(onReceiveScheduled, this);
- }
- public static Message End(IAsyncResult result)
- {
- ReceiveAsyncResult receiveAsyncResult = AsyncResult.End<ReceiveAsyncResult>(result);
- return receiveAsyncResult.message;
- }
- static void OnReceiveScheduled(object state)
- {
- ReceiveAsyncResult thisPtr = (ReceiveAsyncResult)state;
- Exception completionException = null;
- try
- {
- thisPtr.message = thisPtr.parent.Receive(thisPtr.timeout);
- }
- #pragma warning suppress 56500 // [....], transferring exception to another thread
- catch (Exception exception)
- {
- if (Fx.IsFatal(exception))
- {
- throw;
- }
- completionException = exception;
- }
- thisPtr.Complete(false, completionException);
- }
- }
- class StreamedFramingRequestContext : RequestContextBase
- {
- IConnection connection;
- SingletonConnectionReader parent;
- IConnectionOrientedTransportFactorySettings settings;
- TimeoutHelper timeoutHelper;
- public StreamedFramingRequestContext(SingletonConnectionReader parent, Message requestMessage)
- : base(requestMessage, parent.transportSettings.CloseTimeout, parent.transportSettings.SendTimeout)
- {
- this.parent = parent;
- this.connection = parent.connection;
- this.settings = parent.transportSettings;
- }
- protected override void OnAbort()
- {
- this.parent.Abort();
- }
- protected override void OnClose(TimeSpan timeout)
- {
- this.parent.Close(timeout);
- }
- protected override void OnReply(Message message, TimeSpan timeout)
- {
- ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
- if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
- {
- compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
- }
- timeoutHelper = new TimeoutHelper(timeout);
- StreamingConnectionHelper.WriteMessage(message, this.connection, false, this.settings, ref timeoutHelper);
- parent.DoneSending(timeoutHelper.RemainingTime());
- }
- protected override IAsyncResult OnBeginReply(Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- ICompressedMessageEncoder compressedMessageEncoder = this.settings.MessageEncoderFactory.Encoder as ICompressedMessageEncoder;
- if (compressedMessageEncoder != null && compressedMessageEncoder.CompressionEnabled)
- {
- compressedMessageEncoder.AddCompressedMessageProperties(message, this.parent.ContentType);
- }
- timeoutHelper = new TimeoutHelper(timeout);
- return StreamingConnectionHelper.BeginWriteMessage(message, this.connection, false, this.settings,
- ref timeoutHelper, callback, state);
- }
- protected override void OnEndReply(IAsyncResult result)
- {
- StreamingConnectionHelper.EndWriteMessage(result);
- parent.DoneSending(timeoutHelper.RemainingTime());
- }
- }
- // ensures that the reader is notified at end-of-stream, and takes care of the framing chunk headers
- class SingletonInputConnectionStream : ConnectionStream
- {
- SingletonMessageDecoder decoder;
- SingletonConnectionReader reader;
- bool atEof;
- byte[] chunkBuffer; // used for when we have overflow
- int chunkBufferOffset;
- int chunkBufferSize;
- int chunkBytesRemaining;
- public SingletonInputConnectionStream(SingletonConnectionReader reader, IConnection connection,
- IDefaultCommunicationTimeouts defaultTimeouts)
- : base(connection, defaultTimeouts)
- {
- this.reader = reader;
- this.decoder = new SingletonMessageDecoder(reader.StreamPosition);
- this.chunkBytesRemaining = 0;
- this.chunkBuffer = new byte[IntEncoder.MaxEncodedSize];
- }
- void AbortReader()
- {
- this.reader.Abort();
- }
- public override void Close()
- {
- this.reader.DoneReceiving(this.atEof);
- }
- // run chunk data through the decoder
- void DecodeData(byte[] buffer, int offset, int size)
- {
- while (size > 0)
- {
- int bytesRead = decoder.Decode(buffer, offset, size);
- offset += bytesRead;
- size -= bytesRead;
- Fx.Assert(decoder.CurrentState == SingletonMessageDecoder.State.ReadingEnvelopeBytes || decoder.CurrentState == SingletonMessageDecoder.State.ChunkEnd, "");
- }
- }
- // run the current data through the decoder to get valid message bytes
- void DecodeSize(byte[] buffer, ref int offset, ref int size)
- {
- while (size > 0)
- {
- int bytesRead = decoder.Decode(buffer, offset, size);
- if (bytesRead > 0)
- {
- offset += bytesRead;
- size -= bytesRead;
- }
- switch (decoder.CurrentState)
- {
- case SingletonMessageDecoder.State.ChunkStart:
- this.chunkBytesRemaining = decoder.ChunkSize;
- // if we have overflow and we're not decoding out of our buffer, copy over
- if (size > 0 && !object.ReferenceEquals(buffer, this.chunkBuffer))
- {
- Fx.Assert(size <= this.chunkBuffer.Length, "");
- Buffer.BlockCopy(buffer, offset, this.chunkBuffer, 0, size);
- this.chunkBufferOffset = 0;
- this.chunkBufferSize = size;
- }
- return;
- case SingletonMessageDecoder.State.End:
- ProcessEof();
- return;
- }
- }
- }
- int ReadCore(byte[] buffer, int offset, int count)
- {
- int bytesRead = -1;
- try
- {
- bytesRead = base.Read(buffer, offset, count);
- if (bytesRead == 0)
- {
- ProcessEof();
- }
- }
- finally
- {
- if (bytesRead == -1) // there was an exception
- {
- AbortReader();
- }
- }
- return bytesRead;
- }
- public override int Read(byte[] buffer, int offset, int count)
- {
- int result = 0;
- while (true)
- {
- if (count == 0)
- {
- return result;
- }
- if (this.atEof)
- {
- return result;
- }
- // first deal with any residual carryover
- if (this.chunkBufferSize > 0)
- {
- int bytesToCopy = Math.Min(chunkBytesRemaining,
- Math.Min(this.chunkBufferSize, count));
- Buffer.BlockCopy(this.chunkBuffer, this.chunkBufferOffset, buffer, offset, bytesToCopy);
- // keep decoder up to date
- DecodeData(this.chunkBuffer, this.chunkBufferOffset, bytesToCopy);
- this.chunkBufferOffset += bytesToCopy;
- this.chunkBufferSize -= bytesToCopy;
- this.chunkBytesRemaining -= bytesToCopy;
- if (this.chunkBytesRemaining == 0 && this.chunkBufferSize > 0)
- {
- DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
- }
- result += bytesToCopy;
- offset += bytesToCopy;
- count -= bytesToCopy;
- }
- else if (chunkBytesRemaining > 0)
- {
- // We're in the middle of a chunk. Try and include the next chunk size as well
- int bytesToRead = count;
- if (int.MaxValue - chunkBytesRemaining >= IntEncoder.MaxEncodedSize)
- {
- bytesToRead = Math.Min(count, chunkBytesRemaining + IntEncoder.MaxEncodedSize);
- }
- int bytesRead = ReadCore(buffer, offset, bytesToRead);
- // keep decoder up to date
- DecodeData(buffer, offset, Math.Min(bytesRead, this.chunkBytesRemaining));
- if (bytesRead > chunkBytesRemaining)
- {
- result += this.chunkBytesRemaining;
- int overflowCount = bytesRead - chunkBytesRemaining;
- int overflowOffset = offset + chunkBytesRemaining;
- this.chunkBytesRemaining = 0;
- // read at least part of the next chunk, and put any overflow in this.chunkBuffer
- DecodeSize(buffer, ref overflowOffset, ref overflowCount);
- }
- else
- {
- result += bytesRead;
- this.chunkBytesRemaining -= bytesRead;
- }
- return result;
- }
- else
- {
- // Final case: we have a new chunk. Read the size, and loop around again
- if (count < IntEncoder.MaxEncodedSize)
- {
- // we don't have space for MaxEncodedSize, so it's worth the copy cost to read into a temp buffer
- this.chunkBufferOffset = 0;
- this.chunkBufferSize = ReadCore(this.chunkBuffer, 0, this.chunkBuffer.Length);
- DecodeSize(this.chunkBuffer, ref this.chunkBufferOffset, ref this.chunkBufferSize);
- }
- else
- {
- int bytesRead = ReadCore(buffer, offset, IntEncoder.MaxEncodedSize);
- int sizeOffset = offset;
- DecodeSize(buffer, ref sizeOffset, ref bytesRead);
- }
- }
- }
- }
- void ProcessEof()
- {
- if (!this.atEof)
- {
- this.atEof = true;
- if (this.chunkBufferSize > 0 || this.chunkBytesRemaining > 0
- || decoder.CurrentState != SingletonMessageDecoder.State.End)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(decoder.CreatePrematureEOFException());
- }
- this.reader.DoneReceiving(true);
- }
- }
- public override IAsyncResult BeginRead(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- {
- return new ReadAsyncResult(this, buffer, offset, count, callback, state);
- }
- public override int EndRead(IAsyncResult result)
- {
- return ReadAsyncResult.End(result);
- }
- public class ReadAsyncResult : AsyncResult
- {
- SingletonInputConnectionStream parent;
- int result;
- public ReadAsyncResult(SingletonInputConnectionStream parent,
- byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.parent = parent;
- //
- this.result = this.parent.Read(buffer, offset, count);
- base.Complete(true);
- }
- public static int End(IAsyncResult result)
- {
- ReadAsyncResult thisPtr = AsyncResult.End<ReadAsyncResult>(result);
- return thisPtr.result;
- }
- }
- }
- }
- static class StreamingConnectionHelper
- {
- public static void WriteMessage(Message message, IConnection connection, bool isRequest,
- IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper)
- {
- byte[] endBytes = null;
- if (message != null)
- {
- MessageEncoder messageEncoder = settings.MessageEncoderFactory.Encoder;
- byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
- bool writeStreamed;
- if (isRequest)
- {
- endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
- writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
- }
- else
- {
- endBytes = SingletonEncoder.EnvelopeEndBytes;
- writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
- }
- if (writeStreamed)
- {
- connection.Write(envelopeStartBytes, 0, envelopeStartBytes.Length, false, timeoutHelper.RemainingTime());
- Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
- Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
- messageEncoder.WriteMessage(message, writeTimeoutStream);
- }
- else
- {
- ArraySegment<byte> messageData = messageEncoder.WriteMessage(message,
- int.MaxValue, settings.BufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
- messageData = SingletonEncoder.EncodeMessageFrame(messageData);
- Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
- envelopeStartBytes.Length);
- connection.Write(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
- messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(), settings.BufferManager);
- }
- }
- else if (isRequest) // context handles response end bytes
- {
- endBytes = SingletonEncoder.EndBytes;
- }
- if (endBytes != null)
- {
- connection.Write(endBytes, 0, endBytes.Length,
- true, timeoutHelper.RemainingTime());
- }
- }
- public static IAsyncResult BeginWriteMessage(Message message, IConnection connection, bool isRequest,
- IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
- AsyncCallback callback, object state)
- {
- return new WriteMessageAsyncResult(message, connection, isRequest, settings, ref timeoutHelper, callback, state);
- }
- public static void EndWriteMessage(IAsyncResult result)
- {
- WriteMessageAsyncResult.End(result);
- }
- // overrides ConnectionStream to add a Framing int at the beginning of each record
- class StreamingOutputConnectionStream : ConnectionStream
- {
- byte[] encodedSize;
- public StreamingOutputConnectionStream(IConnection connection, IDefaultCommunicationTimeouts timeouts)
- : base(connection, timeouts)
- {
- this.encodedSize = new byte[IntEncoder.MaxEncodedSize];
- }
- void WriteChunkSize(int size)
- {
- if (size > 0)
- {
- int bytesEncoded = IntEncoder.Encode(size, encodedSize, 0);
- base.Connection.Write(encodedSize, 0, bytesEncoded, false, TimeSpan.FromMilliseconds(this.WriteTimeout));
- }
- }
- public override IAsyncResult BeginWrite(byte[] buffer, int offset, int count, AsyncCallback callback, object state)
- {
- WriteChunkSize(count);
- return base.BeginWrite(buffer, offset, count, callback, state);
- }
- public override void WriteByte(byte value)
- {
- WriteChunkSize(1);
- base.WriteByte(value);
- }
- public override void Write(byte[] buffer, int offset, int count)
- {
- WriteChunkSize(count);
- base.Write(buffer, offset, count);
- }
- }
- class WriteMessageAsyncResult : AsyncResult
- {
- IConnection connection;
- MessageEncoder encoder;
- BufferManager bufferManager;
- Message message;
- static WaitCallback onWriteBufferedMessage;
- static WaitCallback onWriteStartBytes;
- static Action<object> onWriteStartBytesScheduled;
- static WaitCallback onWriteEndBytes =
- Fx.ThunkCallback(new WaitCallback(OnWriteEndBytes));
- byte[] bufferToFree;
- IConnectionOrientedTransportFactorySettings settings;
- TimeoutHelper timeoutHelper;
- byte[] endBytes;
- public WriteMessageAsyncResult(Message message, IConnection connection, bool isRequest,
- IConnectionOrientedTransportFactorySettings settings, ref TimeoutHelper timeoutHelper,
- AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.connection = connection;
- this.encoder = settings.MessageEncoderFactory.Encoder;
- this.bufferManager = settings.BufferManager;
- this.timeoutHelper = timeoutHelper;
- this.message = message;
- this.settings = settings;
- bool throwing = true;
- bool completeSelf = false;
- if (message == null)
- {
- if (isRequest) // context takes care of the end bytes on Close/reader.EOF
- {
- this.endBytes = SingletonEncoder.EndBytes;
- }
- completeSelf = WriteEndBytes();
- }
- else
- {
- try
- {
- byte[] envelopeStartBytes = SingletonEncoder.EnvelopeStartBytes;
- bool writeStreamed;
- if (isRequest)
- {
- this.endBytes = SingletonEncoder.EnvelopeEndFramingEndBytes;
- writeStreamed = TransferModeHelper.IsRequestStreamed(settings.TransferMode);
- }
- else
- {
- this.endBytes = SingletonEncoder.EnvelopeEndBytes;
- writeStreamed = TransferModeHelper.IsResponseStreamed(settings.TransferMode);
- }
- if (writeStreamed)
- {
- if (onWriteStartBytes == null)
- {
- onWriteStartBytes = Fx.ThunkCallback(new WaitCallback(OnWriteStartBytes));
- }
- AsyncCompletionResult writeStartBytesResult = connection.BeginWrite(envelopeStartBytes, 0, envelopeStartBytes.Length, true,
- timeoutHelper.RemainingTime(), onWriteStartBytes, this);
- if (writeStartBytesResult == AsyncCompletionResult.Completed)
- {
- if (onWriteStartBytesScheduled == null)
- {
- onWriteStartBytesScheduled = new Action<object>(OnWriteStartBytes);
- }
- ActionItem.Schedule(onWriteStartBytesScheduled, this);
- }
- }
- else
- {
- ArraySegment<byte> messageData = settings.MessageEncoderFactory.Encoder.WriteMessage(message,
- int.MaxValue, this.bufferManager, envelopeStartBytes.Length + IntEncoder.MaxEncodedSize);
- messageData = SingletonEncoder.EncodeMessageFrame(messageData);
- this.bufferToFree = messageData.Array;
- Buffer.BlockCopy(envelopeStartBytes, 0, messageData.Array, messageData.Offset - envelopeStartBytes.Length,
- envelopeStartBytes.Length);
- if (onWriteBufferedMessage == null)
- {
- onWriteBufferedMessage = Fx.ThunkCallback(new WaitCallback(OnWriteBufferedMessage));
- }
- AsyncCompletionResult writeBufferedResult =
- connection.BeginWrite(messageData.Array, messageData.Offset - envelopeStartBytes.Length,
- messageData.Count + envelopeStartBytes.Length, true, timeoutHelper.RemainingTime(),
- onWriteBufferedMessage, this);
- if (writeBufferedResult == AsyncCompletionResult.Completed)
- {
- completeSelf = HandleWriteBufferedMessage();
- }
- }
- throwing = false;
- }
- finally
- {
- if (throwing)
- {
- Cleanup();
- }
- }
- }
- if (completeSelf)
- {
- base.Complete(true);
- }
- }
- public static void End(IAsyncResult result)
- {
- AsyncResult.End<WriteMessageAsyncResult>(result);
- }
- void Cleanup()
- {
- if (bufferToFree != null)
- {
- this.bufferManager.ReturnBuffer(bufferToFree);
- }
- }
- bool HandleWriteStartBytes()
- {
- connection.EndWrite();
- Stream connectionStream = new StreamingOutputConnectionStream(connection, settings);
- Stream writeTimeoutStream = new TimeoutStream(connectionStream, ref timeoutHelper);
- this.encoder.WriteMessage(message, writeTimeoutStream);
- return WriteEndBytes();
- }
- bool HandleWriteBufferedMessage()
- {
- this.connection.EndWrite();
- return WriteEndBytes();
- }
- bool WriteEndBytes()
- {
- if (this.endBytes == null)
- {
- Cleanup();
- return true;
- }
- AsyncCompletionResult result = connection.BeginWrite(endBytes, 0,
- endBytes.Length, true, timeoutHelper.RemainingTime(), onWriteEndBytes, this);
- if (result == AsyncCompletionResult.Queued)
- {
- return false;
- }
- return HandleWriteEndBytes();
- }
- bool HandleWriteEndBytes()
- {
- this.connection.EndWrite();
- Cleanup();
- return true;
- }
- static void OnWriteStartBytes(object asyncState)
- {
- OnWriteStartBytesCallbackHelper(asyncState);
- }
- static void OnWriteStartBytesCallbackHelper(object asyncState)
- {
- WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
- Exception completionException = null;
- bool completeSelf = false;
- bool throwing = true;
- try
- {
- completeSelf = thisPtr.HandleWriteStartBytes();
- throwing = false;
- }
- #pragma warning suppress 56500 // [....], transferring exception to another thread
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- completeSelf = true;
- completionException = e;
- }
- finally
- {
- if (throwing)
- {
- thisPtr.Cleanup();
- }
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- static void OnWriteBufferedMessage(object asyncState)
- {
- WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
- Exception completionException = null;
- bool completeSelf = false;
- bool throwing = true;
- try
- {
- completeSelf = thisPtr.HandleWriteBufferedMessage();
- throwing = false;
- }
- #pragma warning suppress 56500 // [....], transferring exception to another thread
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- completeSelf = true;
- completionException = e;
- }
- finally
- {
- if (throwing)
- {
- thisPtr.Cleanup();
- }
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- static void OnWriteEndBytes(object asyncState)
- {
- WriteMessageAsyncResult thisPtr = (WriteMessageAsyncResult)asyncState;
- Exception completionException = null;
- bool completeSelf = false;
- bool success = false;
- try
- {
- completeSelf = thisPtr.HandleWriteEndBytes();
- success = true;
- }
- #pragma warning suppress 56500 // [....], transferring exception to another thread
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- completeSelf = true;
- completionException = e;
- }
- finally
- {
- if (!success)
- {
- thisPtr.Cleanup();
- }
- }
- if (completeSelf)
- {
- thisPtr.Complete(false, completionException);
- }
- }
- }
- }
- }
|