- //-----------------------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //-----------------------------------------------------------------------------
- namespace System.ServiceModel.Dispatcher
- {
- using System;
- using System.Collections.Generic;
- using System.ServiceModel.Diagnostics;
- using System.Runtime;
- using System.ServiceModel.Channels;
- using System.Threading;
- class MultipleReceiveBinder : IChannelBinder
- {
- internal static class MultipleReceiveDefaults
- {
- internal const int MaxPendingReceives = 1;
- }
- static AsyncCallback onInnerReceiveCompleted = Fx.ThunkCallback(OnInnerReceiveCompleted);
- MultipleReceiveAsyncResult outstanding;
- IChannelBinder channelBinder;
- ReceiveScopeQueue pendingResults;
- bool ordered;
- public MultipleReceiveBinder(IChannelBinder channelBinder, int size, bool ordered)
- {
- this.ordered = ordered;
- this.channelBinder = channelBinder;
- this.pendingResults = new ReceiveScopeQueue(size);
- }
- public IChannel Channel
- {
- get { return this.channelBinder.Channel; }
- }
- public bool HasSession
- {
- get { return this.channelBinder.HasSession; }
- }
- public Uri ListenUri
- {
- get { return this.channelBinder.ListenUri; }
- }
- public EndpointAddress LocalAddress
- {
- get { return this.channelBinder.LocalAddress; }
- }
- public EndpointAddress RemoteAddress
- {
- get { return this.channelBinder.RemoteAddress; }
- }
- public void Abort()
- {
- this.channelBinder.Abort();
- }
- public void CloseAfterFault(TimeSpan timeout)
- {
- this.channelBinder.CloseAfterFault(timeout);
- }
- public bool TryReceive(TimeSpan timeout, out RequestContext requestContext)
- {
- return this.channelBinder.TryReceive(timeout, out requestContext);
- }
- public IAsyncResult BeginTryReceive(TimeSpan timeout, AsyncCallback callback, object state)
- {
- // At any time there can be only one thread in BeginTryReceive, and the
- // outstanding AsyncResult should have completed before the next call.
- // There should be no pending outstanding result here.
- Fx.AssertAndThrow(this.outstanding == null, "BeginTryReceive should not have a pending result.");
- MultipleReceiveAsyncResult multipleReceiveResult = new MultipleReceiveAsyncResult(callback, state);
- this.outstanding = multipleReceiveResult;
- EnsurePump(timeout);
- IAsyncResult innerResult;
- if (this.pendingResults.TryDequeueHead(out innerResult))
- {
- HandleReceiveRequestComplete(innerResult, true);
- }
- return multipleReceiveResult;
- }
- void EnsurePump(TimeSpan timeout)
- {
- // Ensure we're running at full throttle. The BeginTryReceive calls we make below on the
- // IChannelBinder will typically complete future calls to BeginTryReceive made by ChannelHandler.
- // A corollary is that most of the time those calls will complete synchronously.
- while (!this.pendingResults.IsFull)
- {
- ReceiveScopeSignalGate receiveScope = new ReceiveScopeSignalGate(this);
- // Enqueue the result without locks since this is the pump.
- // BeginTryReceive can be called only from one thread and
- // the head is not yet unlocked so no items can proceed.
- this.pendingResults.Enqueue(receiveScope);
- IAsyncResult result = this.channelBinder.BeginTryReceive(timeout, onInnerReceiveCompleted, receiveScope);
- if (result.CompletedSynchronously)
- {
- this.SignalReceiveCompleted(result);
- }
- }
- }
- static void OnInnerReceiveCompleted(IAsyncResult nestedResult)
- {
- if (nestedResult.CompletedSynchronously)
- {
- return;
- }
- ReceiveScopeSignalGate thisPtr = nestedResult.AsyncState as ReceiveScopeSignalGate;
- thisPtr.Binder.HandleReceiveAndSignalCompletion(nestedResult, false);
- }
- void HandleReceiveAndSignalCompletion(IAsyncResult nestedResult, bool completedSynchronously)
- {
- if (SignalReceiveCompleted(nestedResult))
- {
- HandleReceiveRequestComplete(nestedResult, completedSynchronously);
- }
- }
- private bool SignalReceiveCompleted(IAsyncResult nestedResult)
- {
- if (this.ordered)
- {
- // Ordered receives can proceed only if their own gate has
- // been unlocked. The head is the only gate unlocked, so only the
- // result that owns the gate at the head can proceed.
- return this.pendingResults.TrySignal((ReceiveScopeSignalGate)nestedResult.AsyncState, nestedResult);
- }
- else
- {
- // Unordered receives can proceed with any gate. If the head
- // is not unlocked by BeginTryReceive then the result will
- // be put on the last pending gate.
- return this.pendingResults.TrySignalPending(nestedResult);
- }
- }
- void HandleReceiveRequestComplete(IAsyncResult innerResult, bool completedSynchronously)
- {
- MultipleReceiveAsyncResult receiveResult = this.outstanding;
- Exception completionException = null;
- try
- {
- Fx.AssertAndThrow(receiveResult != null, "HandleReceive invoked without an outstanding result");
- // Clean up state.
- this.outstanding = null;
- // Set the context on the outer result for the ChannelHandler.
- RequestContext context;
- receiveResult.Valid = this.channelBinder.EndTryReceive(innerResult, out context);
- receiveResult.RequestContext = context;
- }
- catch (Exception ex)
- {
- if (Fx.IsFatal(ex))
- {
- throw;
- }
- completionException = ex;
- }
- receiveResult.Complete(completedSynchronously, completionException);
- }
- public bool EndTryReceive(IAsyncResult result, out RequestContext requestContext)
- {
- return MultipleReceiveAsyncResult.End(result, out requestContext);
- }
- public RequestContext CreateRequestContext(Message message)
- {
- return this.channelBinder.CreateRequestContext(message);
- }
- public void Send(Message message, TimeSpan timeout)
- {
- this.channelBinder.Send(message, timeout);
- }
- public IAsyncResult BeginSend(Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.channelBinder.BeginSend(message, timeout, callback, state);
- }
- public void EndSend(IAsyncResult result)
- {
- this.channelBinder.EndSend(result);
- }
- public Message Request(Message message, TimeSpan timeout)
- {
- return this.channelBinder.Request(message, timeout);
- }
- public IAsyncResult BeginRequest(Message message, TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.channelBinder.BeginRequest(message, timeout, callback, state);
- }
- public Message EndRequest(IAsyncResult result)
- {
- return this.channelBinder.EndRequest(result);
- }
- public bool WaitForMessage(TimeSpan timeout)
- {
- return this.channelBinder.WaitForMessage(timeout);
- }
- public IAsyncResult BeginWaitForMessage(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return this.channelBinder.BeginWaitForMessage(timeout, callback, state);
- }
- public bool EndWaitForMessage(IAsyncResult result)
- {
- return this.channelBinder.EndWaitForMessage(result);
- }
- class MultipleReceiveAsyncResult : AsyncResult
- {
- public MultipleReceiveAsyncResult(AsyncCallback callback, object state)
- : base(callback, state)
- {
- }
- public bool Valid
- {
- get;
- set;
- }
- public RequestContext RequestContext
- {
- get;
- set;
- }
- public new void Complete(bool completedSynchronously, Exception completionException)
- {
- base.Complete(completedSynchronously, completionException);
- }
- public static bool End(IAsyncResult result, out RequestContext context)
- {
- MultipleReceiveAsyncResult thisPtr = AsyncResult.End<MultipleReceiveAsyncResult>(result);
- context = thisPtr.RequestContext;
- return thisPtr.Valid;
- }
- }
- class ReceiveScopeSignalGate : SignalGate<IAsyncResult>
- {
- public ReceiveScopeSignalGate(MultipleReceiveBinder binder)
- {
- this.Binder = binder;
- }
- public MultipleReceiveBinder Binder
- {
- get;
- private set;
- }
- }
- class ReceiveScopeQueue
- {
- // This class is a circular queue with two indices: one for pending items and one for the head.
- // Ordered receives: the head is unlocked by BeginTryReceive. A receive result can signal only
- // the gate that it owns. If that gate is the head then it will proceed.
- // Unordered receives: any pending item can be signaled. The pending index keeps track
- // of results that haven't been completed. If the head is unlocked then it will proceed.
- int pending;
- int head;
- int count;
- readonly int size;
- ReceiveScopeSignalGate[] items;
- public ReceiveScopeQueue(int size)
- {
- this.size = size;
- this.head = 0;
- this.count = 0;
- this.pending = 0;
- items = new ReceiveScopeSignalGate[size];
- }
- internal bool IsFull
- {
- get { return this.count == this.size; }
- }
- internal void Enqueue(ReceiveScopeSignalGate receiveScope)
- {
- // This should only be called from EnsurePump, which itself should only be called from
- // BeginTryReceive. This ensures that we don't need locks to enqueue an item.
- Fx.AssertAndThrow(this.count < this.size, "Cannot Enqueue into a full queue.");
- this.items[(this.head + this.count) % this.size] = receiveScope;
- count++;
- }
- void Dequeue()
- {
- // Dequeue should not be called outside a signal/unlock boundary.
- // There are no locks, as this boundary ensures that only one thread
- // tries to dequeue an item, either in the unlock thread or in the signal thread.
- Fx.AssertAndThrow(this.count > 0, "Cannot dequeue from an empty queue.");
- this.items[head] = null;
- this.head = (head + 1) % this.size;
- this.count--;
- }
- internal bool TryDequeueHead(out IAsyncResult result)
- {
- // Invoked only from BeginTryReceive, as only the main thread can
- // dequeue the head. It succeeds only if the head has already been signaled and completed.
- Fx.AssertAndThrow(this.count > 0, "Cannot unlock item when queue is empty");
- if (this.items[head].Unlock(out result))
- {
- this.Dequeue();
- return true;
- }
- return false;
- }
- public bool TrySignal(ReceiveScopeSignalGate scope, IAsyncResult nestedResult)
- {
- // Ordered receives can only signal the gate that the AsyncResult owns.
- // If the head has already been unlocked then it can proceed.
- if (scope.Signal(nestedResult))
- {
- Dequeue();
- return true;
- }
- return false;
- }
- public bool TrySignalPending(IAsyncResult result)
- {
- // The free index wraps around, so this always returns the next free index.
- // Only the head of the queue can proceed, as the head would be unlocked by
- // BeginTryReceive. All other requests will just submit their completed result.
- int nextPending = GetNextPending();
- if (this.items[nextPending].Signal(result))
- {
- Dequeue();
- return true;
- }
- return false;
- }
- int GetNextPending()
- {
- int slot = this.pending;
- while (true)
- {
- if (slot == (slot = Interlocked.CompareExchange(ref this.pending, (slot + 1) % this.size, slot)))
- {
- return slot;
- }
- }
- }
- }
- }
- }