2
0

MultipleReceiveBinder.cs 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386
  1. //-----------------------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //-----------------------------------------------------------------------------
  4. namespace System.ServiceModel.Dispatcher
  5. {
  6. using System;
  7. using System.Collections.Generic;
  8. using System.ServiceModel.Diagnostics;
  9. using System.Runtime;
  10. using System.ServiceModel.Channels;
  11. using System.Threading;
  12. class MultipleReceiveBinder : IChannelBinder
  13. {
  // Holds the default values used for multiple-receive configuration.
  internal static class MultipleReceiveDefaults
  {
      // Default number of receives that may be pending at once.
      internal const int MaxPendingReceives = 1;
  }
  // Callback handed to every inner BeginTryReceive call; created once and never replaced.
  static readonly AsyncCallback onInnerReceiveCompleted = Fx.ThunkCallback(OnInnerReceiveCompleted);

  // The outer result returned by the most recent BeginTryReceive that has not been
  // completed yet; null when no outer receive is outstanding. This is the only
  // mutable field of the binder.
  MultipleReceiveAsyncResult outstanding;

  // The wrapped binder that every member forwards to. Assigned only in the constructor.
  readonly IChannelBinder channelBinder;

  // Circular queue of gates, one per inner receive that has been started.
  // Assigned only in the constructor.
  readonly ReceiveScopeQueue pendingResults;

  // True when inner receives must be handed out in the order they were started.
  // Assigned only in the constructor.
  readonly bool ordered;
  // Wraps channelBinder so that up to 'size' inner receives can be kept pending.
  // 'ordered' selects whether completed receives are handed out strictly in the
  // order in which they were started.
  public MultipleReceiveBinder(IChannelBinder channelBinder, int size, bool ordered)
  {
      this.channelBinder = channelBinder;
      this.ordered = ordered;
      this.pendingResults = new ReceiveScopeQueue(size);
  }
  // The channel of the wrapped binder.
  public IChannel Channel
  {
      get
      {
          return this.channelBinder.Channel;
      }
  }
  // Reports whether the wrapped binder has a session.
  public bool HasSession
  {
      get
      {
          return this.channelBinder.HasSession;
      }
  }
  // The listen URI of the wrapped binder.
  public Uri ListenUri
  {
      get
      {
          return this.channelBinder.ListenUri;
      }
  }
  // The local endpoint address of the wrapped binder.
  public EndpointAddress LocalAddress
  {
      get
      {
          return this.channelBinder.LocalAddress;
      }
  }
  // The remote endpoint address of the wrapped binder.
  public EndpointAddress RemoteAddress
  {
      get
      {
          return this.channelBinder.RemoteAddress;
      }
  }
  // Aborts the wrapped binder.
  public void Abort()
  {
      IChannelBinder inner = this.channelBinder;
      inner.Abort();
  }
  // Forwards CloseAfterFault, with the same timeout, to the wrapped binder.
  public void CloseAfterFault(TimeSpan timeout)
  {
      IChannelBinder inner = this.channelBinder;
      inner.CloseAfterFault(timeout);
  }
  // Synchronous receive. This goes straight to the wrapped binder and does not
  // touch the queue of pending asynchronous receives.
  public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
  {
      bool received = this.channelBinder.TryReceive(timeout, out requestContext);
      return received;
  }
  // Starts an asynchronous receive. The returned result is completed by
  // HandleReceiveRequestComplete, either before this method returns (when the
  // inner receive at the head of the queue has already been signalled) or later
  // from the inner receive callback.
  public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
  {
      // Only one thread may be inside BeginTryReceive at any time, and the
      // previous outer result must have completed before the next call is made,
      // so nothing can be outstanding at this point.
      Fx.AssertAndThrow(this.outstanding == null, "BeginTryReceive should not have a pending result.");

      MultipleReceiveAsyncResult outerResult = new MultipleReceiveAsyncResult(callback, state);
      this.outstanding = outerResult;

      // Top the queue up with inner receives before looking at the head.
      EnsurePump(timeout);

      IAsyncResult completedInner;
      bool headReady = this.pendingResults.TryDequeueHead(out completedInner);
      if (headReady)
      {
          // The head had already been signalled: finish the outer result inline.
          HandleReceiveRequestComplete(completedInner, true);
      }

      return outerResult;
  }
  // Starts inner receives on the wrapped binder until the pending queue is full.
  void EnsurePump(TimeSpan timeout)
  {
      // Keep the pump running at full throttle. The inner BeginTryReceive calls
      // issued here will typically satisfy future BeginTryReceive calls made by
      // ChannelHandler; as a corollary, most of those future calls will complete
      // synchronously.
      while (!this.pendingResults.IsFull)
      {
          ReceiveScopeSignalGate gate = new ReceiveScopeSignalGate(this);

          // No lock is needed to enqueue: this is the pump, BeginTryReceive can
          // only be called from one thread, and the head has not been unlocked
          // yet, so no item can proceed.
          this.pendingResults.Enqueue(gate);

          IAsyncResult pendingReceive = this.channelBinder.BeginTryReceive(timeout, onInnerReceiveCompleted, gate);
          if (pendingReceive.CompletedSynchronously)
          {
              // The callback ignores synchronous completions, so record the
              // completion here. The return value is deliberately not used.
              this.SignalReceiveCompleted(pendingReceive);
          }
      }
  }
  // Callback for the inner BeginTryReceive calls issued by EnsurePump.
  // nestedResult.AsyncState is the ReceiveScopeSignalGate that EnsurePump passed
  // as the state object; it carries the binder that owns the receive.
  static void OnInnerReceiveCompleted(IAsyncResult nestedResult)
  {
      if (nestedResult.CompletedSynchronously)
      {
          // Synchronous completions are signalled inline by EnsurePump.
          return;
      }

      // Use a direct cast, as SignalReceiveCompleted does, rather than 'as':
      // a state object of the wrong type then fails here with an
      // InvalidCastException instead of a NullReferenceException on the next line.
      ReceiveScopeSignalGate gate = (ReceiveScopeSignalGate)nestedResult.AsyncState;
      gate.Binder.HandleReceiveAndSignalCompletion(nestedResult, false);
  }
  // Signals the completed inner receive and, only when the signal reports that
  // the receive may proceed, completes the outstanding outer result with it.
  void HandleReceiveAndSignalCompletion(IAsyncResult nestedResult, bool completedSynchronosly)
  {
      bool mayProceed = SignalReceiveCompleted(nestedResult);
      if (!mayProceed)
      {
          return;
      }

      HandleReceiveRequestComplete(nestedResult, completedSynchronosly);
  }
  // Records that an inner receive has completed. Returns true when the caller
  // should go on to complete the outstanding outer result with nestedResult.
  private bool SignalReceiveCompleted(IAsyncResult nestedResult)
  {
      if (!this.ordered)
      {
          // Unordered receives can proceed with any gate. If the head has not
          // been unlocked by BeginTryReceive, the result is stored on the next
          // pending gate.
          return this.pendingResults.TrySignalPending(nestedResult);
      }

      // Ordered receives can proceed only when their own gate has been
      // unlocked. The head is the only gate that gets unlocked, so only the
      // result that owns the gate at the head can proceed.
      ReceiveScopeSignalGate ownGate = (ReceiveScopeSignalGate)nestedResult.AsyncState;
      return this.pendingResults.TrySignal(ownGate, nestedResult);
  }
  // Completes the outstanding outer result using a finished inner receive.
  //   innerResult            - the completed result of the inner BeginTryReceive.
  //   completedSynchronously - value passed through to the outer result's Complete.
  // A non-fatal exception thrown by the inner EndTryReceive is not rethrown here;
  // it is handed to the outer result as its completion exception.
  void HandleReceiveRequestComplete(IAsyncResult innerResult, bool completedSynchronously)
  {
      MultipleReceiveAsyncResult receiveResult = this.outstanding;

      // Assert before entering the try block. Inside it, a non-fatal assertion
      // failure would be captured as the completion exception and then hidden by
      // a NullReferenceException when Complete is called on the null result.
      Fx.AssertAndThrow(receiveResult != null, "HandleReceive invoked without an outstanding result");

      // Clean up state so that the next BeginTryReceive is allowed in.
      this.outstanding = null;

      Exception completionException = null;
      try
      {
          // Set the context on the outer result for the ChannelHandler.
          RequestContext context;
          receiveResult.Valid = this.channelBinder.EndTryReceive(innerResult, out context);
          receiveResult.RequestContext = context;
      }
      catch (Exception ex)
      {
          if (Fx.IsFatal(ex))
          {
              throw;
          }

          completionException = ex;
      }

      receiveResult.Complete(completedSynchronously, completionException);
  }
  // Ends a receive started by BeginTryReceive, returning the Valid flag and the
  // request context stored on the outer result.
  public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
  {
      bool valid = MultipleReceiveAsyncResult.End(result, out requestContext);
      return valid;
  }
  // Forwards CreateRequestContext to the wrapped binder.
  public RequestContext CreateRequestContext(Message message)
  {
      RequestContext created = this.channelBinder.CreateRequestContext(message);
      return created;
  }
  // Forwards Send to the wrapped binder.
  public void Send(Message message, TimeSpan timeout)
  {
      IChannelBinder inner = this.channelBinder;
      inner.Send(message, timeout);
  }
  // Forwards BeginSend to the wrapped binder and returns its async result.
  public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult sendResult = this.channelBinder.BeginSend(message, timeout, callback, state);
      return sendResult;
  }
  // Forwards EndSend to the wrapped binder.
  public void EndSend(IAsyncResult result)
  {
      IChannelBinder inner = this.channelBinder;
      inner.EndSend(result);
  }
  // Forwards Request to the wrapped binder and returns the message it returns.
  public Message Request(Message message, TimeSpan timeout)
  {
      Message reply = this.channelBinder.Request(message, timeout);
      return reply;
  }
  // Forwards BeginRequest to the wrapped binder and returns its async result.
  public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult requestResult = this.channelBinder.BeginRequest(message, timeout, callback, state);
      return requestResult;
  }
  // Forwards EndRequest to the wrapped binder and returns the message it returns.
  public Message EndRequest(IAsyncResult result)
  {
      Message reply = this.channelBinder.EndRequest(result);
      return reply;
  }
  // Forwards WaitForMessage to the wrapped binder.
  public bool WaitForMessage(TimeSpan timeout)
  {
      bool waitResult = this.channelBinder.WaitForMessage(timeout);
      return waitResult;
  }
  // Forwards BeginWaitForMessage to the wrapped binder and returns its async result.
  public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
  {
      IAsyncResult waitResult = this.channelBinder.BeginWaitForMessage(timeout, callback, state);
      return waitResult;
  }
  // Forwards EndWaitForMessage to the wrapped binder.
  public bool EndWaitForMessage(IAsyncResult result)
  {
      bool waitResult = this.channelBinder.EndWaitForMessage(result);
      return waitResult;
  }
  // The outer async result handed back from BeginTryReceive. It carries the
  // outcome of the inner receive (Valid and RequestContext) until End is called.
  class MultipleReceiveAsyncResult : AsyncResult
  {
      public MultipleReceiveAsyncResult(AsyncCallback callback, object state)
          : base(callback, state)
      {
      }

      // Value returned by the inner EndTryReceive.
      public bool Valid { get; set; }

      // Request context produced by the inner EndTryReceive.
      public RequestContext RequestContext { get; set; }

      // Public wrapper that hides the base Complete so the binder can complete
      // this result directly.
      public new void Complete(bool completedSynchronously, Exception completionException)
      {
          base.Complete(completedSynchronously, completionException);
      }

      // Ends the operation through AsyncResult.End and hands back the stored
      // request context together with the Valid flag.
      public static bool End(IAsyncResult result, out RequestContext context)
      {
          MultipleReceiveAsyncResult completed = AsyncResult.End<MultipleReceiveAsyncResult>(result);
          context = completed.RequestContext;
          return completed.Valid;
      }
  }
  // A signal gate for one inner receive. It also remembers the binder that
  // started the receive so the static callback can find its way back to it.
  class ReceiveScopeSignalGate : SignalGate<IAsyncResult>
  {
      public ReceiveScopeSignalGate(MultipleReceiveBinder binder)
      {
          this.Binder = binder;
      }

      // The binder that created this gate; set once by the constructor.
      public MultipleReceiveBinder Binder { get; private set; }
  }
  // A fixed-size circular queue of gates with two cursors, 'head' and 'pending'.
  // Ordered receives:   the head is unlocked by BeginTryReceive. A completed
  //                     receive can signal only the gate it owns; if that gate
  //                     is the head, it proceeds.
  // Unordered receives: any pending item can be signalled. The 'pending' index
  //                     tracks results that have not been completed yet; if the
  //                     head is unlocked, the signalled result proceeds.
  class ReceiveScopeQueue
  {
      int pending;
      int head;
      int count;
      readonly int size;
      ReceiveScopeSignalGate[] items;

      public ReceiveScopeQueue(int size)
      {
          this.size = size;
          this.pending = 0;
          this.head = 0;
          this.count = 0;
          this.items = new ReceiveScopeSignalGate[size];
      }

      // True when every slot of the queue is occupied.
      internal bool IsFull
      {
          get
          {
              return this.size == this.count;
          }
      }

      // Adds a gate at the tail of the queue.
      internal void Enqueue(ReceiveScopeSignalGate receiveScope)
      {
          // Meant to be called only from EnsurePump, which in turn is called only
          // from BeginTryReceive; that is why no lock is taken to enqueue.
          Fx.AssertAndThrow(this.count < this.size, "Cannot Enqueue into a full queue.");

          int tail = (this.head + this.count) % this.size;
          this.items[tail] = receiveScope;
          this.count++;
      }

      // Removes the gate at the head of the queue.
      void Dequeue()
      {
          // Must not be called outside a signal/unlock boundary. No lock is
          // taken because that boundary ensures only one thread tries to
          // dequeue an item, either the unlocking thread or the signalling one.
          Fx.AssertAndThrow(this.count > 0, "Cannot Dequeue and empty queue.");

          this.items[this.head] = null;
          this.head = (this.head + 1) % this.size;
          this.count--;
      }

      // Unlocks the head gate. Succeeds, handing back the stored result and
      // removing the head, only when the head had already been signalled.
      internal bool TryDequeueHead(out IAsyncResult result)
      {
          // Invoked only from BeginTryReceive: only the main thread may dequeue
          // the head.
          Fx.AssertAndThrow(this.count > 0, "Cannot unlock item when queue is empty");

          bool alreadySignalled = this.items[this.head].Unlock(out result);
          if (!alreadySignalled)
          {
              return false;
          }

          this.Dequeue();
          return true;
      }

      // Ordered path: signals the gate owned by the completed receive. When the
      // signal succeeds (the gate had already been unlocked), the head is
      // dequeued and the caller may proceed.
      public bool TrySignal(ReceiveScopeSignalGate scope, IAsyncResult nestedResult)
      {
          bool alreadyUnlocked = scope.Signal(nestedResult);
          if (!alreadyUnlocked)
          {
              return false;
          }

          Dequeue();
          return true;
      }

      // Unordered path: stores the result on the next pending gate. Only the head
      // can proceed, since it is the gate BeginTryReceive unlocks; every other
      // completion just deposits its result.
      public bool TrySignalPending(IAsyncResult result)
      {
          // The pending index wraps around and always yields the next free slot.
          int slot = GetNextPending();
          bool alreadyUnlocked = this.items[slot].Signal(result);
          if (!alreadyUnlocked)
          {
              return false;
          }

          Dequeue();
          return true;
      }

      // Claims the current pending index and advances it (modulo size) with a
      // compare-exchange loop, retrying until the exchange succeeds.
      int GetNextPending()
      {
          int expected = this.pending;
          while (true)
          {
              int observed = Interlocked.CompareExchange(ref this.pending, (expected + 1) % this.size, expected);
              if (observed == expected)
              {
                  return observed;
              }

              // Another thread moved the index first; retry from the value it left.
              expected = observed;
          }
      }
  }
  326. }
  327. }