SynchronizedMessageSource.cs 9.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Runtime;
  7. using System.ServiceModel;
  8. using System.Threading;
  9. class SynchronizedMessageSource
  10. {
  // The wrapped message source. Every call this class makes into it is
  // bracketed by entering and exiting sourceLock. Assigned once, in the constructor.
  readonly IMessageSource source;

  // Gate around source, constructed with an argument of 1.
  // Assigned once, in the constructor.
  readonly ThreadNeutralSemaphore sourceLock;

  /// <summary>
  /// Wraps <paramref name="source"/> so that the receive and wait operations
  /// exposed by this class go through a shared lock.
  /// </summary>
  /// <param name="source">The message source to wrap. It is stored as-is; no null check is performed.</param>
  public SynchronizedMessageSource(IMessageSource source)
  {
      this.source = source;
      this.sourceLock = new ThreadNeutralSemaphore(1);
  }
  /// <summary>
  /// Starts an asynchronous wait for a message by creating a
  /// <c>WaitForMessageAsyncResult</c> bound to this instance.
  /// </summary>
  /// <param name="timeout">Time budget handed to the async result.</param>
  /// <param name="callback">Callback handed to the async result.</param>
  /// <param name="state">Caller state handed to the async result.</param>
  /// <returns>The async result to pass to <c>EndWaitForMessage</c>.</returns>
  public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult pendingWait = new WaitForMessageAsyncResult(this, timeout, callback, state);
      return pendingWait;
  }
  /// <summary>
  /// Finishes an operation started by <c>BeginWaitForMessage</c>.
  /// </summary>
  /// <param name="result">The async result returned by <c>BeginWaitForMessage</c>.</param>
  /// <returns>The value recorded on the async result.</returns>
  public bool EndWaitForMessage(IAsyncResult result)
  {
      bool waitOutcome = WaitForMessageAsyncResult.End(result);
      return waitOutcome;
  }
  /// <summary>
  /// Synchronously enters the source lock and then asks the wrapped source
  /// to wait for a message, releasing the lock afterwards.
  /// </summary>
  /// <param name="timeout">Overall time budget; the lock wait and the source call each get the helper's remaining time.</param>
  /// <returns>Whatever the wrapped source's <c>WaitForMessage</c> returns.</returns>
  /// <exception cref="TimeoutException">The lock could not be entered in time.</exception>
  public bool WaitForMessage(TimeSpan timeout)
  {
      TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

      bool lockEntered = this.sourceLock.TryEnter(timeoutHelper.RemainingTime());
      if (!lockEntered)
      {
          // Build the message first, then the inner exception, matching the
          // left-to-right argument evaluation of a single constructor call.
          string timedOutMessage = SR.GetString(SR.WaitForMessageTimedOut, timeout);
          Exception enterTimedOut = ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout);
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
              new TimeoutException(timedOutMessage, enterTimedOut));
      }

      try
      {
          bool waitOutcome = this.source.WaitForMessage(timeoutHelper.RemainingTime());
          return waitOutcome;
      }
      finally
      {
          // Always release, whether the source returned or threw.
          this.sourceLock.Exit();
      }
  }
  /// <summary>
  /// Starts an asynchronous receive by creating a <c>ReceiveAsyncResult</c>
  /// bound to this instance.
  /// </summary>
  /// <param name="timeout">Time budget handed to the async result.</param>
  /// <param name="callback">Callback handed to the async result.</param>
  /// <param name="state">Caller state handed to the async result.</param>
  /// <returns>The async result to pass to <c>EndReceive</c>.</returns>
  public IAsyncResult BeginReceive(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult pendingReceive = new ReceiveAsyncResult(this, timeout, callback, state);
      return pendingReceive;
  }
  /// <summary>
  /// Finishes an operation started by <c>BeginReceive</c>.
  /// </summary>
  /// <param name="result">The async result returned by <c>BeginReceive</c>.</param>
  /// <returns>The message recorded on the async result.</returns>
  public Message EndReceive(IAsyncResult result)
  {
      Message received = ReceiveAsyncResult.End(result);
      return received;
  }
  /// <summary>
  /// Synchronously enters the source lock and then receives from the wrapped
  /// source, releasing the lock afterwards.
  /// </summary>
  /// <param name="timeout">Overall time budget; the lock wait and the source call each get the helper's remaining time.</param>
  /// <returns>Whatever the wrapped source's <c>Receive</c> returns.</returns>
  /// <exception cref="TimeoutException">The lock could not be entered in time.</exception>
  public Message Receive(TimeSpan timeout)
  {
      TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);

      bool lockEntered = this.sourceLock.TryEnter(timeoutHelper.RemainingTime());
      if (!lockEntered)
      {
          // Build the message first, then the inner exception, matching the
          // left-to-right argument evaluation of a single constructor call.
          string timedOutMessage = SR.GetString(SR.ReceiveTimedOut2, timeout);
          Exception enterTimedOut = ThreadNeutralSemaphore.CreateEnterTimedOutException(timeout);
          throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
              new TimeoutException(timedOutMessage, enterTimedOut));
      }

      try
      {
          Message received = this.source.Receive(timeoutHelper.RemainingTime());
          return received;
      }
      finally
      {
          // Always release, whether the source returned or threw.
          this.sourceLock.Exit();
      }
  }
  // Base async result for operations that must hold the owning
  // SynchronizedMessageSource's lock while they run. The constructor requests
  // the lock; once it is held, the derived class's PerformOperation is invoked.
  // The lock is released when the result completes, or when PerformOperation
  // throws on the constructor path.
  abstract class SynchronizedAsyncResult<T> : AsyncResult
  {
      // Callback handed to EnterAsync for the case where the lock is not entered immediately.
      static FastAsyncCallback onEnterComplete = new FastAsyncCallback(OnEnterComplete);

      SynchronizedMessageSource syncSource;
      TimeoutHelper timeoutHelper;

      // True while this result is responsible for calling Exit on the lock.
      bool exitLock;

      // Value handed back from End.
      T returnValue;

      public SynchronizedAsyncResult(SynchronizedMessageSource syncSource, TimeSpan timeout,
          AsyncCallback callback, object state)
          : base(callback, state)
      {
          this.syncSource = syncSource;
          this.timeoutHelper = new TimeoutHelper(timeout);

          bool enteredImmediately = syncSource.sourceLock.EnterAsync(
              this.timeoutHelper.RemainingTime(), onEnterComplete, this);

          if (enteredImmediately)
          {
              this.exitLock = true;

              bool operationFinished;
              bool operationThrew = true;
              try
              {
                  operationFinished = this.PerformOperation(this.timeoutHelper.RemainingTime());
                  operationThrew = false;
              }
              finally
              {
                  // The exception propagates to the caller, so give the lock back here.
                  if (operationThrew)
                  {
                      this.ExitLock();
                  }
              }

              if (operationFinished)
              {
                  this.CompleteWithUnlock(true);
              }
          }
          // Otherwise OnEnterComplete continues the work; nothing more is done here.
      }

      // The message source wrapped by the owning SynchronizedMessageSource.
      protected IMessageSource Source
      {
          get { return this.syncSource.source; }
      }

      // Records the value that End will return.
      protected void SetReturnValue(T returnValue)
      {
          this.returnValue = returnValue;
      }

      // Starts the underlying operation. A return of true makes the caller
      // complete this result right away; false leaves completion to the
      // derived class.
      protected abstract bool PerformOperation(TimeSpan timeout);

      // Releases the lock if this result currently owns it, then clears the flag.
      void ExitLock()
      {
          if (!this.exitLock)
          {
              return;
          }

          this.syncSource.sourceLock.Exit();
          this.exitLock = false;
      }

      // Releases the lock (if held) and completes without an exception.
      protected void CompleteWithUnlock(bool synchronous)
      {
          this.CompleteWithUnlock(synchronous, null);
      }

      // Releases the lock (if held), then completes the result with the given exception.
      protected void CompleteWithUnlock(bool synchronous, Exception exception)
      {
          this.ExitLock();
          base.Complete(synchronous, exception);
      }

      // Ends the async result and returns the recorded value.
      public static T End(IAsyncResult result)
      {
          SynchronizedAsyncResult<T> completed = AsyncResult.End<SynchronizedAsyncResult<T>>(result);
          return completed.returnValue;
      }

      // Invoked through onEnterComplete when EnterAsync did not enter immediately.
      // A non-null asyncException completes the result with that exception without
      // marking the lock as held; otherwise the operation is started with the lock
      // marked as held.
      static void OnEnterComplete(object state, Exception asyncException)
      {
          SynchronizedAsyncResult<T> self = (SynchronizedAsyncResult<T>)state;
          Exception failure = asyncException;
          bool shouldComplete;

          if (failure == null)
          {
              try
              {
                  self.exitLock = true;
                  shouldComplete = self.PerformOperation(self.timeoutHelper.RemainingTime());
              }
#pragma warning suppress 56500 // transferring exception to another thread
              catch (Exception e)
              {
                  if (Fx.IsFatal(e))
                  {
                      throw;
                  }

                  failure = e;
                  shouldComplete = true;
              }
          }
          else
          {
              shouldComplete = true;
          }

          if (shouldComplete)
          {
              self.CompleteWithUnlock(false, failure);
          }
      }
  }
  // Async result for a receive against the wrapped source.
  class ReceiveAsyncResult : SynchronizedAsyncResult<Message>
  {
      // Callback passed to the source's BeginReceive.
      static WaitCallback onReceiveComplete = new WaitCallback(OnReceiveComplete);

      public ReceiveAsyncResult(SynchronizedMessageSource syncSource, TimeSpan timeout,
          AsyncCallback callback, object state)
          : base(syncSource, timeout, callback, state)
      {
      }

      // Begins the receive on the source. If the source reports Completed,
      // the message is collected immediately and true is returned; otherwise
      // false is returned and OnReceiveComplete finishes the work.
      protected override bool PerformOperation(TimeSpan timeout)
      {
          bool stillPending =
              this.Source.BeginReceive(timeout, onReceiveComplete, this) != AsyncReceiveResult.Completed;
          if (stillPending)
          {
              return false;
          }

          this.SetReturnValue(this.Source.EndReceive());
          return true;
      }

      // Collects the received message (or the exception thrown while ending
      // the receive) and completes the result, releasing the lock.
      static void OnReceiveComplete(object state)
      {
          ReceiveAsyncResult self = (ReceiveAsyncResult)state;
          Exception failure = null;

          try
          {
              Message received = self.Source.EndReceive();
              self.SetReturnValue(received);
          }
#pragma warning suppress 56500 // transferring exception to another thread
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              failure = e;
          }

          self.CompleteWithUnlock(false, failure);
      }
  }
  // Async result for a wait-for-message against the wrapped source.
  class WaitForMessageAsyncResult : SynchronizedAsyncResult<bool>
  {
      // Callback passed to the source's BeginWaitForMessage.
      static WaitCallback onWaitForMessageComplete = new WaitCallback(OnWaitForMessageComplete);

      public WaitForMessageAsyncResult(SynchronizedMessageSource syncSource, TimeSpan timeout,
          AsyncCallback callback, object state)
          : base(syncSource, timeout, callback, state)
      {
      }

      // Begins the wait on the source. If the source reports Completed, the
      // outcome is collected immediately and true is returned; otherwise false
      // is returned and OnWaitForMessageComplete finishes the work.
      protected override bool PerformOperation(TimeSpan timeout)
      {
          bool stillPending =
              this.Source.BeginWaitForMessage(timeout, onWaitForMessageComplete, this) != AsyncReceiveResult.Completed;
          if (stillPending)
          {
              return false;
          }

          this.SetReturnValue(this.Source.EndWaitForMessage());
          return true;
      }

      // Collects the wait outcome (or the exception thrown while ending the
      // wait) and completes the result, releasing the lock.
      static void OnWaitForMessageComplete(object state)
      {
          WaitForMessageAsyncResult self = (WaitForMessageAsyncResult)state;
          Exception failure = null;

          try
          {
              bool waitOutcome = self.Source.EndWaitForMessage();
              self.SetReturnValue(waitOutcome);
          }
#pragma warning suppress 56500 // transferring exception to another thread
          catch (Exception e)
          {
              if (Fx.IsFatal(e))
              {
                  throw;
              }

              failure = e;
          }

          self.CompleteWithUnlock(false, failure);
      }
  }
  245. }
  246. }