InputChannel.cs 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Runtime;
  8. using System.ServiceModel;
  9. class InputChannel : InputQueueChannel<Message>, IInputChannel
  10. {
  11. EndpointAddress localAddress;
  12. public InputChannel(ChannelManagerBase channelManager, EndpointAddress localAddress)
  13. : base(channelManager)
  14. {
  15. this.localAddress = localAddress;
  16. }
  17. public EndpointAddress LocalAddress
  18. {
  19. get { return localAddress; }
  20. }
  21. public override T GetProperty<T>()
  22. {
  23. if (typeof(T) == typeof(IInputChannel))
  24. {
  25. return (T)(object)this;
  26. }
  27. T baseProperty = base.GetProperty<T>();
  28. if (baseProperty != null)
  29. {
  30. return baseProperty;
  31. }
  32. return default(T);
  33. }
  34. protected override IAsyncResult OnBeginOpen(TimeSpan timeout, AsyncCallback callback, object state)
  35. {
  36. return new CompletedAsyncResult(callback, state);
  37. }
  38. protected override void OnEndOpen(IAsyncResult result)
  39. {
  40. CompletedAsyncResult.End(result);
  41. }
  42. protected override void OnOpen(TimeSpan timeout)
  43. {
  44. }
  45. public virtual Message Receive()
  46. {
  47. return this.Receive(this.DefaultReceiveTimeout);
  48. }
  49. public virtual Message Receive(TimeSpan timeout)
  50. {
  51. if (timeout < TimeSpan.Zero)
  52. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  53. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  54. this.ThrowPending();
  55. return InputChannel.HelpReceive(this, timeout);
  56. }
  57. public virtual IAsyncResult BeginReceive(AsyncCallback callback, object state)
  58. {
  59. return this.BeginReceive(this.DefaultReceiveTimeout, callback, state);
  60. }
  61. public virtual IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
  62. {
  63. if (timeout < TimeSpan.Zero)
  64. {
  65. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  66. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  67. }
  68. this.ThrowPending();
  69. return InputChannel.HelpBeginReceive(this, timeout, callback, state);
  70. }
  71. public Message EndReceive(IAsyncResult result)
  72. {
  73. return InputChannel.HelpEndReceive(result);
  74. }
  75. public virtual bool TryReceive(TimeSpan timeout, out Message message)
  76. {
  77. if (timeout < TimeSpan.Zero)
  78. {
  79. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  80. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  81. }
  82. this.ThrowPending();
  83. return base.Dequeue(timeout, out message);
  84. }
  85. public virtual IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
  86. {
  87. if (timeout < TimeSpan.Zero)
  88. {
  89. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  90. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  91. }
  92. this.ThrowPending();
  93. return base.BeginDequeue(timeout, callback, state);
  94. }
  95. public virtual bool EndTryReceive(IAsyncResult result, out Message message)
  96. {
  97. return base.EndDequeue(result, out message);
  98. }
  99. public bool WaitForMessage(TimeSpan timeout)
  100. {
  101. if (timeout < TimeSpan.Zero)
  102. {
  103. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  104. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  105. }
  106. this.ThrowPending();
  107. return base.WaitForItem(timeout);
  108. }
  109. public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
  110. {
  111. if (timeout < TimeSpan.Zero)
  112. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  113. new ArgumentOutOfRangeException("timeout", timeout, SR.GetString(SR.SFxTimeoutOutOfRange0)));
  114. this.ThrowPending();
  115. return base.BeginWaitForItem(timeout, callback, state);
  116. }
  117. public bool EndWaitForMessage(IAsyncResult result)
  118. {
  119. return base.EndWaitForItem(result);
  120. }
  121. #region static Helpers to convert TryReceive to Receive
  122. internal static Message HelpReceive(IInputChannel channel, TimeSpan timeout)
  123. {
  124. Message message;
  125. if (channel.TryReceive(timeout, out message))
  126. {
  127. return message;
  128. }
  129. else
  130. {
  131. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(CreateReceiveTimedOutException(channel, timeout));
  132. }
  133. }
  134. internal static IAsyncResult HelpBeginReceive(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
  135. {
  136. return new HelpReceiveAsyncResult(channel, timeout, callback, state);
  137. }
  138. internal static Message HelpEndReceive(IAsyncResult result)
  139. {
  140. return HelpReceiveAsyncResult.End(result);
  141. }
  142. class HelpReceiveAsyncResult : AsyncResult
  143. {
  144. IInputChannel channel;
  145. TimeSpan timeout;
  146. static AsyncCallback onReceive = Fx.ThunkCallback(new AsyncCallback(OnReceive));
  147. Message message;
  148. public HelpReceiveAsyncResult(IInputChannel channel, TimeSpan timeout, AsyncCallback callback, object state)
  149. : base(callback, state)
  150. {
  151. this.channel = channel;
  152. this.timeout = timeout;
  153. IAsyncResult result = channel.BeginTryReceive(timeout, onReceive, this);
  154. if (!result.CompletedSynchronously)
  155. {
  156. return;
  157. }
  158. HandleReceiveComplete(result);
  159. base.Complete(true);
  160. }
  161. public static Message End(IAsyncResult result)
  162. {
  163. HelpReceiveAsyncResult thisPtr = AsyncResult.End<HelpReceiveAsyncResult>(result);
  164. return thisPtr.message;
  165. }
  166. void HandleReceiveComplete(IAsyncResult result)
  167. {
  168. if (!this.channel.EndTryReceive(result, out this.message))
  169. {
  170. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  171. InputChannel.CreateReceiveTimedOutException(this.channel, this.timeout));
  172. }
  173. }
  174. static void OnReceive(IAsyncResult result)
  175. {
  176. if (result.CompletedSynchronously)
  177. {
  178. return;
  179. }
  180. HelpReceiveAsyncResult thisPtr = (HelpReceiveAsyncResult)result.AsyncState;
  181. Exception completionException = null;
  182. try
  183. {
  184. thisPtr.HandleReceiveComplete(result);
  185. }
  186. #pragma warning suppress 56500 // [....], transferring exception to another thread
  187. catch (Exception e)
  188. {
  189. if (Fx.IsFatal(e))
  190. {
  191. throw;
  192. }
  193. completionException = e;
  194. }
  195. thisPtr.Complete(false, completionException);
  196. }
  197. }
  198. static Exception CreateReceiveTimedOutException(IInputChannel channel, TimeSpan timeout)
  199. {
  200. if (channel.LocalAddress != null)
  201. {
  202. return new TimeoutException(SR.GetString(SR.ReceiveTimedOut, channel.LocalAddress.Uri.AbsoluteUri, timeout));
  203. }
  204. else
  205. {
  206. return new TimeoutException(SR.GetString(SR.ReceiveTimedOutNoLocalAddress, timeout));
  207. }
  208. }
  209. #endregion
  210. }
  211. }