| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254 |
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.ServiceModel.Channels
- {
- using System.Collections.Generic;
- using System.Runtime;
- using System.ServiceModel;
- class InputChannel : InputQueueChannel<Message>, IInputChannel
- {
- EndpointAddress localAddress;
- public InputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
- : base(channelManager)
- {
- this.localAddress = localAddress;
- }
- public EndpointAddress LocalAddress
- {
- get { return localAddress; }
- }
- public override T GetProperty<T>()
- {
- if (typeof(T) == typeof(IInputChannel))
- {
- return (T)(object)this;
- }
- T baseProperty = base.GetProperty<T>();
- if (baseProperty != null)
- {
- return baseProperty;
- }
- return default(T);
- }
- protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new CompletedAsyncResult(callback, state);
- }
- protected override void OnEndOpen(IAsyncResult result)
- {
- CompletedAsyncResult.End(result);
- }
- protected override void OnOpen(TimeSpan timeout)
- {
- }
- public virtual Message Receive()
- {
- return this.Receive(this.DefaultReceiveTimeout);
- }
- public virtual Message Receive(TimeSpan timeout)
- {
- if (timeout < TimeSpan.Zero)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- this.ThrowPending();
- return InputChannel.HelpReceive(this, timeout);
- }
- public virtual IAsyncResult BeginReceive(AsyncCallback callback, object state)
- {
- return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
- }
- public virtual IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
- {
- if (timeout < TimeSpan.Zero)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- }
- this.ThrowPending();
- return InputChannel.HelpBeginReceive(this, timeout, callback, state);
- }
- public Message EndReceive(IAsyncResult result)
- {
- return InputChannel.HelpEndReceive(result);
- }
- public virtual bool TryReceive(TimeSpan timeout, out Message message)
- {
- if (timeout < TimeSpan.Zero)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- }
- this.ThrowPending();
- return base.Dequeue(timeout, out message);
- }
- public virtual IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
- {
- if (timeout < TimeSpan.Zero)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- }
- this.ThrowPending();
- return base.BeginDequeue(timeout, callback, state);
- }
- public virtual bool EndTryReceive(IAsyncResult result, out Message message)
- {
- return base.EndDequeue(result, out message);
- }
- public bool WaitForMessage(TimeSpan timeout)
- {
- if (timeout < TimeSpan.Zero)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- }
- this.ThrowPending();
- return base.WaitForItem(timeout);
- }
- public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
- {
- if (timeout < TimeSpan.Zero)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
- this.ThrowPending();
- return base.BeginWaitForItem(timeout, callback, state);
- }
- public bool EndWaitForMessage(IAsyncResult result)
- {
- return base.EndWaitForItem(result);
- }
- #region static Helpers to convert TryReceive to Receive
- internal static Message HelpReceive(IInputChannel channel, TimeSpan timeout)
- {
- Message message;
- if (channel.TryReceive(timeout, out message))
- {
- return message;
- }
- else
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(CreateReceiveTimedOutException(channel, timeout));
- }
- }
- internal static IAsyncResult HelpBeginReceive(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
- {
- return new HelpReceiveAsyncResult(channel, timeout, callback, state);
- }
- internal static Message HelpEndReceive(IAsyncResult result)
- {
- return HelpReceiveAsyncResult.End(result);
- }
- class HelpReceiveAsyncResult : AsyncResult
- {
- IInputChannel channel;
- TimeSpan timeout;
- static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
- Message message;
- public HelpReceiveAsyncResult(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
- : base(callback, state)
- {
- this.channel = channel;
- this.timeout = timeout;
- IAsyncResult result = channel.BeginTryReceive(timeout, onReceive, this);
- if (!result.CompletedSynchronously)
- {
- return;
- }
- HandleReceiveComplete(result);
- base.Complete(true);
- }
- public static Message End(IAsyncResult result)
- {
- HelpReceiveAsyncResult thisPtr = AsyncResult.End<HelpReceiveAsyncResult>(result);
- return thisPtr.message;
- }
- void HandleReceiveComplete(IAsyncResult result)
- {
- if (!this.channel.EndTryReceive(result, out this.message))
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- InputChannel.CreateReceiveTimedOutException(this.channel, this.timeout));
- }
- }
- static void OnReceive(IAsyncResult result)
- {
- if (result.CompletedSynchronously)
- {
- return;
- }
- HelpReceiveAsyncResult thisPtr = (HelpReceiveAsyncResult)result.AsyncState;
- Exception completionException = null;
- try
- {
- thisPtr.HandleReceiveComplete(result);
- }
- #pragma warning suppress 56500 // [....], transferring exception to another thread
- catch (Exception e)
- {
- if (Fx.IsFatal(e))
- {
- throw;
- }
- completionException = e;
- }
- thisPtr.Complete(false, completionException);
- }
- }
- static Exception CreateReceiveTimedOutException(IInputChannel channel, TimeSpan timeout)
- {
- if (channel.LocalAddress != null)
- {
- return new TimeoutException(SR.GetString(SR.ReceiveTimedOut, channel.LocalAddress.Uri.AbsoluteUri, timeout));
- }
- else
- {
- return new TimeoutException(SR.GetString(SR.ReceiveTimedOutNoLocalAddress, timeout));
- }
- }
- #endregion
- }
- }
|