||
- //------------------------------------------------------------
- // Copyright (c) Microsoft Corporation. All rights reserved.
- //------------------------------------------------------------
- namespace System.ServiceModel.Channels
- {
- using System.Collections;
- using System.Collections.Generic;
- using System.Diagnostics;
- using System.IO;
- using System.Net;
- using System.Runtime;
- using System.Runtime.Serialization;
- using System.ServiceModel;
- using System.ServiceModel.Diagnostics;
- using System.ServiceModel.Dispatcher;
- using System.ServiceModel.Security;
- using System.Text;
- using System.Threading;
- using System.Xml;
- partial class PeerNodeImplementation : IPeerNodeMessageHandling
- {
- const int maxViaSize = 4096;
- public delegate void MessageAvailableCallback(Message message);
- // configuration
- int connectTimeout;
- IPAddress listenIPAddress;
- Uri listenUri;
- int port;
- long maxReceivedMessageSize;
- int minNeighbors;
- int idealNeighbors;
- int maxNeighbors;
- int maxReferrals;
- string meshId;
- PeerMessagePropagationFilter messagePropagationFilter;
- SynchronizationContext messagePropagationFilterContext;
- int maintainerInterval = PeerTransportConstants.MaintainerInterval; // milliseconds before a maintainer kicks in
- PeerResolver resolver;
- PeerNodeConfig config;
- PeerSecurityManager securityManager;
- internal MessageEncodingBindingElement EncodingElement;
- // internal state
- ManualResetEvent connectCompletedEvent; // raised when maintainer has connected or given up
- MessageEncoder encoder; // used for encoding internal messages
- // Double-checked locking pattern requires volatile for read/write synchronization
- volatile bool isOpen;
- Exception openException; // exception to be thrown from Open
- Dictionary<object, MessageFilterRegistration> messageFilters;
- int refCount; // number of factories/channels that are using this instance
- SimpleStateManager stateManager; // manages open/close operations
- object thisLock = new Object();
- PeerNodeTraceRecord traceRecord;
- PeerNodeTraceRecord completeTraceRecord; // contains address info as well
- // primary infrastructure components
- internal PeerConnector connector; // Purely for testing do not take a internal dependency on this
- PeerMaintainer maintainer;
- internal PeerFlooder flooder; // Purely for testing do not take an internal dependency on this
- PeerNeighborManager neighborManager;
- PeerIPHelper ipHelper;
- PeerService service;
- object resolverRegistrationId;
- bool registered;
- public event EventHandler Offline;
- public event EventHandler Online;
- Dictionary<Uri, RefCountedSecurityProtocol> uri2SecurityProtocol;
- Dictionary<Type, object> serviceHandlers;
- BufferManager bufferManager = null;
- internal static byte[] DefaultId = new byte[0];
- XmlDictionaryReaderQuotas readerQuotas;
- long maxBufferPoolSize;
- internal int MaxSendQueue = 128, MaxReceiveQueue = 128;
- public PeerNodeImplementation()
- {
- // intialize default configuration
- connectTimeout = PeerTransportConstants.ConnectTimeout;
- maxReceivedMessageSize = TransportDefaults.MaxReceivedMessageSize;
- minNeighbors = PeerTransportConstants.MinNeighbors;
- idealNeighbors = PeerTransportConstants.IdealNeighbors;
- maxNeighbors = PeerTransportConstants.MaxNeighbors;
- maxReferrals = PeerTransportConstants.MaxReferrals;
- port = PeerTransportDefaults.Port;
- // initialize internal state
- connectCompletedEvent = new ManualResetEvent(false);
- encoder = new BinaryMessageEncodingBindingElement().CreateMessageEncoderFactory().Encoder;
- messageFilters = new Dictionary<object, MessageFilterRegistration>();
- stateManager = new SimpleStateManager(this);
- uri2SecurityProtocol = new Dictionary<Uri, RefCountedSecurityProtocol>();
- readerQuotas = new XmlDictionaryReaderQuotas();
- this.maxBufferPoolSize = TransportDefaults.MaxBufferPoolSize;
- }
- // To facilitate testing
- public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosed;
- public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosing;
- public event EventHandler NeighborConnected;
- public event EventHandler NeighborOpened;
- public event EventHandler Aborted;
- public PeerNodeConfig Config
- {
- get
- {
- return this.config;
- }
- private set
- {
- Fx.Assert(value != null, "PeerNodeImplementation.Config can not be set to null");
- this.config = value;
- }
- }
- public bool IsOnline
- {
- get
- {
- lock (ThisLock)
- {
- if (isOpen)
- return neighborManager.IsOnline;
- else
- return false;
- }
- }
- }
- internal bool IsOpen
- {
- get { return isOpen; }
- }
- public IPAddress ListenIPAddress
- {
- get { return listenIPAddress; }
- set
- {
- // No validation necessary at this point. When the service is opened, it will throw if the IP address is invalid
- lock (ThisLock)
- {
- ThrowIfOpen();
- listenIPAddress = value;
- }
- }
- }
- public Uri ListenUri
- {
- get { return listenUri; }
- set
- {
- if (value == null)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value");
- if (value.Scheme != PeerStrings.Scheme)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("value", SR.GetString(SR.InvalidUriScheme,
- value.Scheme, PeerStrings.Scheme));
- }
- Fx.Assert(value.PathAndQuery == "/", "PeerUriCannotContainPath");
- lock (ThisLock)
- {
- ThrowIfOpen();
- listenUri = value;
- }
- }
- }
- public long MaxBufferPoolSize
- {
- get { return maxBufferPoolSize; }
- set
- {
- lock (ThisLock)
- {
- ThrowIfOpen();
- maxBufferPoolSize = value;
- }
- }
- }
- public long MaxReceivedMessageSize
- {
- get { return maxReceivedMessageSize; }
- set
- {
- if (!(value >= PeerTransportConstants.MinMessageSize))
- {
- throw Fx.AssertAndThrow("invalid MaxReceivedMessageSize");
- }
- lock (ThisLock)
- {
- ThrowIfOpen();
- maxReceivedMessageSize = value;
- }
- }
- }
- public string MeshId
- {
- get
- {
- lock (ThisLock)
- {
- ThrowIfNotOpen();
- return meshId;
- }
- }
- }
- public PeerMessagePropagationFilter MessagePropagationFilter
- {
- get { return messagePropagationFilter; }
- set
- {
- lock (ThisLock)
- {
- // null is ok and causes optimised flooding codepath
- messagePropagationFilter = value;
- messagePropagationFilterContext = ThreadBehavior.GetCurrentSynchronizationContext();
- }
- }
- }
- // Made internal to facilitate testing
- public PeerNeighborManager NeighborManager
- {
- get { return neighborManager; }
- }
- public ulong NodeId
- {
- get
- {
- ThrowIfNotOpen();
- return config.NodeId;
- }
- }
- public int Port
- {
- get { return port; }
- set
- {
- lock (ThisLock)
- {
- ThrowIfOpen();
- port = value;
- }
- }
- }
- public int ListenerPort
- {
- get
- {
- ThrowIfNotOpen();
- return config.ListenerPort;
- }
- }
- public XmlDictionaryReaderQuotas ReaderQuotas
- {
- get
- {
- return this.readerQuotas;
- }
- }
- public PeerResolver Resolver
- {
- get { return resolver; }
- set
- {
- Fx.Assert(value != null, "null Resolver");
- lock (ThisLock)
- {
- ThrowIfOpen();
- resolver = value;
- }
- }
- }
- public PeerSecurityManager SecurityManager
- {
- get { return this.securityManager; }
- set { this.securityManager = value; }
- }
- internal PeerService Service
- {
- get
- {
- return this.service;
- }
- set
- {
- lock (ThisLock)
- {
- ThrowIfNotOpen();
- this.service = value;
- }
- }
- }
- object ThisLock
- {
- get { return thisLock; }
- }
- public void Abort()
- {
- stateManager.Abort();
- }
- public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
- {
- return stateManager.BeginClose(timeout, callback, state);
- }
- public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state, bool waitForOnline)
- {
- return stateManager.BeginOpen(timeout, callback, state, waitForOnline);
- }
- public Guid ProcessOutgoingMessage(Message message, Uri via)
- {
- Guid result = Guid.NewGuid();
- System.Xml.UniqueId messageId = new System.Xml.UniqueId(result);
- if (-1 != message.Headers.FindHeader(PeerStrings.MessageId, PeerStrings.Namespace))
- PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerStrings.MessageId);
- if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace))
- PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerTo);
- if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerVia, PeerStrings.Namespace))
- PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerVia);
- if (-1 != message.Headers.FindHeader(PeerOperationNames.Flood, PeerStrings.Namespace, PeerOperationNames.Demuxer))
- PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.Flood);
- message.Headers.Add(PeerDictionaryHeader.CreateMessageIdHeader(messageId));
- message.Properties.Via = via;
- message.Headers.Add(MessageHeader.CreateHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace, message.Headers.To));
- message.Headers.Add(PeerDictionaryHeader.CreateViaHeader(via));
- message.Headers.Add(PeerDictionaryHeader.CreateFloodRole());
- return result;
- }
- public void SecureOutgoingMessage(ref Message message, Uri via, TimeSpan timeout, SecurityProtocol securityProtocol)
- {
- if (securityProtocol != null)
- {
- securityProtocol.SecureOutgoingMessage(ref message, timeout);
- }
- }
- public IAsyncResult BeginSend(object registrant, Message message, Uri via,
- ITransportFactorySettings settings, TimeSpan timeout, AsyncCallback callback, object state, SecurityProtocol securityProtocol)
- {
- PeerFlooder localFlooder;
- int factoryMaxReceivedMessageSize;
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- MessageBuffer messageBuffer = null;
- Message securedMessage = null;
- ulong hopcount = PeerTransportConstants.MaxHopCount;
- PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
- int messageSize = (int)-1;
- byte[] id;
- SendAsyncResult result = new SendAsyncResult(callback, state);
- AsyncCallback onFloodComplete = Fx.ThunkCallback(new AsyncCallback(result.OnFloodComplete));
- try
- {
- lock (ThisLock)
- {
- ThrowIfNotOpen();
- localFlooder = flooder;
- }
- // we know this will fit in an int because of our MaxReceivedMessageSize restrictions
- factoryMaxReceivedMessageSize = (int)Math.Min(maxReceivedMessageSize, settings.MaxReceivedMessageSize);
- Guid guid = ProcessOutgoingMessage(message, via);
- SecureOutgoingMessage(ref message, via, timeout, securityProtocol);
- if ((message is SecurityAppliedMessage))
- {
- ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
- securedMessage = encoder.ReadMessage(buffer, bufferManager);
- id = (message as SecurityAppliedMessage).PrimarySignatureValue;
- messageSize = (int)buffer.Count;
- }
- else
- {
- securedMessage = message;
- id = guid.ToByteArray();
- }
- messageBuffer = securedMessage.CreateBufferedCopy(factoryMaxReceivedMessageSize);
- string contentType = settings.MessageEncoderFactory.Encoder.ContentType;
- if (this.messagePropagationFilter != null)
- {
- using (Message filterMessage = messageBuffer.CreateMessage())
- {
- propagateFlags = ((IPeerNodeMessageHandling)this).DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Local);
- }
- }
- if ((propagateFlags & PeerMessagePropagation.Remote) != PeerMessagePropagation.None)
- {
- if (hopcount == 0)
- propagateFlags &= ~PeerMessagePropagation.Remote;
- }
- // flood it out
- IAsyncResult ar = null;
- if ((propagateFlags & PeerMessagePropagation.Remote) != 0)
- {
- ar = localFlooder.BeginFloodEncodedMessage(id, messageBuffer, timeoutHelper.RemainingTime(), onFloodComplete, null);
- if (DiagnosticUtility.ShouldTraceVerbose)
- {
- TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerChannelMessageSent, SR.GetString(SR.TraceCodePeerChannelMessageSent), this, message);
- }
- }
- else
- {
- ar = new CompletedAsyncResult(onFloodComplete, null);
- }
- if (ar == null)
- {
- Fx.Assert("SendAsyncResult must have an Async Result for onFloodComplete");
- }
- // queue up the pre-encoded message for local channels
- if ((propagateFlags & PeerMessagePropagation.Local) != 0)
- {
- using (Message msg = messageBuffer.CreateMessage())
- {
- int i = msg.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
- if (i >= 0)
- {
- msg.Headers.AddUnderstood(i);
- }
- using (MessageBuffer clientBuffer = msg.CreateBufferedCopy(factoryMaxReceivedMessageSize))
- {
- DeliverMessageToClientChannels(registrant, clientBuffer, via, message.Headers.To, contentType, messageSize, -1, null);
- }
- }
- }
- result.OnLocalDispatchComplete(result);
- }
- finally
- {
- message.Close();
- if (securedMessage != null)
- securedMessage.Close();
- if (messageBuffer != null)
- messageBuffer.Close();
- }
- return result;
- }
- public void Close(TimeSpan timeout)
- {
- stateManager.Close(timeout);
- }
- void CloseCore(TimeSpan timeout, bool graceful)
- {
- PeerService lclService;
- PeerMaintainer lclMaintainer;
- PeerNeighborManager lclNeighborManager;
- PeerConnector lclConnector;
- PeerIPHelper lclIPHelper;
- PeerNodeConfig lclConfig;
- PeerFlooder lclFlooder;
- Exception exception = null;
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- if (DiagnosticUtility.ShouldTraceInformation)
- {
- TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosing, SR.GetString(SR.TraceCodePeerNodeClosing), this.traceRecord, this, null);
- }
- lock (ThisLock)
- {
- isOpen = false;
- lclMaintainer = maintainer;
- lclNeighborManager = neighborManager;
- lclConnector = connector;
- lclIPHelper = ipHelper;
- lclService = service;
- lclConfig = config;
- lclFlooder = flooder;
- }
- // only unregister if we are doing a g----ful shutdown
- try
- {
- if (graceful)
- {
- UnregisterAddress(timeout);
- }
- else
- {
- if (lclConfig != null)
- {
- ActionItem.Schedule(new Action<object>(UnregisterAddress), lclConfig.UnregisterTimeout);
- }
- }
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- try
- {
- if (lclConnector != null)
- lclConnector.Closing();
- if (lclService != null)
- {
- try
- {
- lclService.Abort();
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- if (lclMaintainer != null)
- {
- try
- {
- lclMaintainer.Close();
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- if (lclIPHelper != null)
- {
- try
- {
- lclIPHelper.Close();
- lclIPHelper.AddressChanged -= new EventHandler(stateManager.OnIPAddressesChanged);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- if (lclNeighborManager != null)
- {
- lclNeighborManager.NeighborConnected -= new EventHandler(OnNeighborConnected);
- lclNeighborManager.NeighborOpened -= new EventHandler(this.securityManager.OnNeighborOpened);
- this.securityManager.OnNeighborAuthenticated -= new EventHandler(this.OnNeighborAuthenticated);
- lclNeighborManager.Online -= new EventHandler(FireOnline);
- lclNeighborManager.Offline -= new EventHandler(FireOffline);
- try
- {
- lclNeighborManager.Shutdown(graceful, timeoutHelper.RemainingTime());
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- // unregister for neighbor close events once shutdown has completed
- lclNeighborManager.NeighborClosed -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
- lclNeighborManager.NeighborClosing -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
- lclNeighborManager.Close();
- }
- if (lclConnector != null)
- {
- try
- {
- lclConnector.Close();
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- if (lclFlooder != null)
- {
- try
- {
- lclFlooder.Close();
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- if (exception == null) exception = e;
- }
- // reset object for next call to open
- EventHandler abortedHandler = null;
- lock (ThisLock)
- {
- // clear out old components (so they can be garbage collected)
- neighborManager = null;
- connector = null;
- maintainer = null;
- flooder = null;
- ipHelper = null;
- service = null;
- // reset generated config
- config = null;
- meshId = null;
- abortedHandler = Aborted;
- }
- // Notify anyone who is interested that abort has occured
- if (!graceful && abortedHandler != null)
- {
- try
- {
- abortedHandler(this, EventArgs.Empty);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- if (exception == null) exception = e;
- }
- }
- if (DiagnosticUtility.ShouldTraceInformation)
- {
- TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosed, SR.GetString(SR.TraceCodePeerNodeClosed), this.traceRecord, this, null);
- }
- if (exception != null && graceful == true) // Swallows all non fatal exceptions during Abort
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(exception);
- }
- }
- // Performs case-insensitive comparison of two vias
- bool CompareVia(Uri via1, Uri via2)
- {
- return (Uri.Compare(via1, via2,
- (UriComponents.Scheme | UriComponents.UserInfo | UriComponents.Host | UriComponents.Port | UriComponents.Path),
- UriFormat.SafeUnescaped, StringComparison.OrdinalIgnoreCase) == 0);
- }
- public static void EndClose(IAsyncResult result)
- {
- if (result == null)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
- SimpleStateManager.EndClose(result);
- }
- public static void EndOpen(IAsyncResult result)
- {
- if (result == null)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
- SimpleStateManager.EndOpen(result);
- }
- public static void EndSend(IAsyncResult result)
- {
- if (result == null)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
- SendAsyncResult.End(result);
- }
- // Necessary to allow access of the EventHandlers which can only be done from inside the class
- void FireOffline(object sender, EventArgs e)
- {
- if (!isOpen)
- {
- return;
- }
- EventHandler handler = Offline;
- if (handler != null)
- {
- handler(this, EventArgs.Empty);
- }
- }
- // Necessary to allow access of the EventHandlers which can only be done from inside the class
- void FireOnline(object sender, EventArgs e)
- {
- if (!isOpen)
- {
- return;
- }
- EventHandler handler = Online;
- if (handler != null)
- {
- handler(this, EventArgs.Empty);
- }
- }
- // static Uri -> PeerNode mapping
- static internal Dictionary<Uri, PeerNodeImplementation> peerNodes = new Dictionary<Uri, PeerNodeImplementation>();
- internal static PeerNodeImplementation Get(Uri listenUri)
- {
- PeerNodeImplementation node = null;
- if (!TryGet(listenUri, out node))
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new InvalidOperationException(SR.GetString(SR.NoTransportManagerForUri, listenUri)));
- }
- return node;
- }
- internal protected static bool TryGet(Uri listenUri, out PeerNodeImplementation result)
- {
- if (listenUri == null)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
- }
- if (listenUri.Scheme != PeerStrings.Scheme)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
- listenUri.Scheme, PeerStrings.Scheme));
- }
- result = null;
- bool success = false;
- // build base uri
- Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
- lock (peerNodes)
- {
- if (peerNodes.ContainsKey(baseUri))
- {
- result = peerNodes[baseUri];
- success = true;
- }
- }
- return success;
- }
- public static bool TryGet(string meshId, out PeerNodeImplementation result)
- {
- UriBuilder uriBuilder = new UriBuilder();
- uriBuilder.Host = meshId;
- uriBuilder.Scheme = PeerStrings.Scheme;
- bool success = PeerNodeImplementation.TryGet(uriBuilder.Uri, out result);
- return success;
- }
- // internal method to return an existing PeerNode or create a new one with the given settings
- public static PeerNodeImplementation Get(Uri listenUri, Registration registration)
- {
- if (listenUri == null)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
- if (listenUri.Scheme != PeerStrings.Scheme)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
- listenUri.Scheme, PeerStrings.Scheme));
- }
- // build base uri
- Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
- lock (peerNodes)
- {
- PeerNodeImplementation peerNodeImpl = null;
- PeerNodeImplementation peerNode = null;
- if (peerNodes.TryGetValue(baseUri, out peerNode))
- {
- peerNodeImpl = (PeerNodeImplementation)peerNode;
- // ensure that the PeerNode is compatible
- registration.CheckIfCompatible(peerNodeImpl, listenUri);
- peerNodeImpl.refCount++;
- return peerNodeImpl;
- }
- // create a new PeerNode, and add it to the dictionary
- peerNodeImpl = registration.CreatePeerNode();
- peerNodes[baseUri] = peerNodeImpl;
- peerNodeImpl.refCount = 1;
- return peerNodeImpl;
- }
- }
- // SimpleStateManager callback - Called on final release of PeerNode.
- void InternalClose(TimeSpan timeout, bool graceful)
- {
- CloseCore(timeout, graceful);
- lock (ThisLock)
- {
- messageFilters.Clear();
- }
- }
- protected void OnAbort()
- {
- InternalClose(TimeSpan.FromTicks(0), false);
- }
- protected void OnClose(TimeSpan timeout)
- {
- InternalClose(timeout, true);
- }
- // called when the maintainer has completed the connection attempt (successful or not)
- void OnConnectionAttemptCompleted(Exception e)
- {
- // store the exception if one occured when trying to connect, so that it can be rethrown from Open
- Fx.Assert(openException == null, "OnConnectionAttemptCompleted twice");
- openException = e;
- if (openException == null && DiagnosticUtility.ShouldTraceInformation)
- {
- TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpened, SR.GetString(SR.TraceCodePeerNodeOpened), this.completeTraceRecord, this, null);
- }
- else if (openException != null && DiagnosticUtility.ShouldTraceError)
- {
- TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PeerNodeOpenFailed, SR.GetString(SR.TraceCodePeerNodeOpenFailed), this.completeTraceRecord, this, e);
- }
- connectCompletedEvent.Set();
- }
- bool IPeerNodeMessageHandling.ValidateIncomingMessage(ref Message message, Uri via)
- {
- SecurityProtocol protocol = null;
- if (via == null)
- {
- Fx.Assert("FloodMessage doesn't contain Via header!");
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
- }
- if (TryGetSecurityProtocol(via, out protocol))
- {
- protocol.VerifyIncomingMessage(ref message, ServiceDefaults.SendTimeout, null);
- return true;
- }
- return false;
- }
- internal bool TryGetSecurityProtocol(Uri via, out SecurityProtocol protocol)
- {
- lock (ThisLock)
- {
- RefCountedSecurityProtocol wrapper = null;
- bool result = false;
- protocol = null;
- if (uri2SecurityProtocol.TryGetValue(via, out wrapper))
- {
- protocol = wrapper.Protocol;
- result = true;
- }
- return result;
- }
- }
- void IPeerNodeMessageHandling.HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags,
- int index, MessageHeader hopHeader, Uri via, Uri to)
- {
- if (DiagnosticUtility.ShouldTraceVerbose)
- {
- TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageReceived, SR.GetString(SR.TraceCodePeerFloodedMessageReceived), this.traceRecord, this, null);
- }
- if (via == null)
- {
- Fx.Assert("No VIA in the forwarded message!");
- using (Message message = messageBuffer.CreateMessage())
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
- }
- }
- if ((propagateFlags & PeerMessagePropagation.Local) != 0)
- {
- DeliverMessageToClientChannels(null, messageBuffer, via, to, messageBuffer.MessageContentType, (int)maxReceivedMessageSize, index, hopHeader);
- messageBuffer = null;
- }
- else
- {
- if (DiagnosticUtility.ShouldTraceVerbose)
- {
- using (Message traceMessage = messageBuffer.CreateMessage())
- {
- TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageNotPropagated, SR.GetString(SR.TraceCodePeerFloodedMessageNotPropagated), this.traceRecord, this, null, traceMessage);
- }
- }
- }
- }
- PeerMessagePropagation IPeerNodeMessageHandling.DetermineMessagePropagation(Message message, PeerMessageOrigination origination)
- {
- PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
- PeerMessagePropagationFilter filter = MessagePropagationFilter;
- if (filter != null)
- {
- try
- {
- SynchronizationContext context = messagePropagationFilterContext;
- if (context != null)
- {
- context.Send(delegate(object state) { propagateFlags = filter.ShouldMessagePropagate(message, origination); }, null);
- }
- else
- {
- propagateFlags = filter.ShouldMessagePropagate(message, origination);
- }
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperCallback(SR.GetString(SR.MessagePropagationException), e);
- }
- }
- // Don't flood if the Node is closed
- if (!isOpen)
- {
- propagateFlags = PeerMessagePropagation.None;
- }
- return propagateFlags;
- }
- // Queued callback to actually process the address change
- // The design is such that any address change notifications are queued just like Open/Close operations.
- // So, we need not worry about address changes racing with other address changes or Open/Close operations.
- // Abort can happen at any time. However, Abort skips unregistering addresses, so this method doesn't have
- // to worry about undoing its work if Abort happens.
- void OnIPAddressChange()
- {
- string lclMeshId = null;
- PeerNodeAddress nodeAddress = null;
- object lclResolverRegistrationId = null;
- bool lclRegistered = false;
- PeerIPHelper lclIPHelper = ipHelper;
- PeerNodeConfig lclconfig = config;
- bool processChange = false;
- TimeoutHelper timeoutHelper = new TimeoutHelper(ServiceDefaults.SendTimeout);
- // Determine if IP addresses have really changed before notifying the resolver
- // since it is possible that another change notification ahead of this one in the queue
- // may have already completed notifying the resolver of the most current change.
- if (lclIPHelper != null && config != null)
- {
- nodeAddress = lclconfig.GetListenAddress(false);
- processChange = lclIPHelper.AddressesChanged(nodeAddress.IPAddresses);
- if (processChange)
- {
- // Build the nodeAddress with the updated IP addresses
- nodeAddress = new PeerNodeAddress(
- nodeAddress.EndpointAddress, lclIPHelper.GetLocalAddresses());
- }
- }
- lock (ThisLock)
- {
- // Skip processing if the node isn't open anymore or if addresses haven't changed
- if (processChange && isOpen)
- {
- lclMeshId = meshId;
- lclResolverRegistrationId = resolverRegistrationId;
- lclRegistered = registered;
- config.SetListenAddress(nodeAddress);
- completeTraceRecord = new PeerNodeTraceRecord(config.NodeId, meshId, nodeAddress);
- }
- else
- {
- return;
- }
- }
- //#57954 - log and ---- non-critical exceptions during network change event notifications
- try
- {
- // Do we have any addresses? If so, update or re-register. Otherwise, unregister.
- if (nodeAddress.IPAddresses.Count > 0)
- {
- if (lclRegistered)
- {
- resolver.Update(lclResolverRegistrationId, nodeAddress, timeoutHelper.RemainingTime());
- }
- else
- {
- RegisterAddress(lclMeshId, nodeAddress, timeoutHelper.RemainingTime());
- }
- }
- else
- {
- UnregisterAddress(timeoutHelper.RemainingTime());
- }
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
- }
- PingConnections();
- if (DiagnosticUtility.ShouldTraceInformation)
- {
- TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeAddressChanged, SR.GetString(SR.TraceCodePeerNodeAddressChanged), this.completeTraceRecord, this, null);
- }
- }
- // Register with the resolver
- void RegisterAddress(string lclMeshId, PeerNodeAddress nodeAddress, TimeSpan timeout)
- {
- // Register only if we have any addresses
- if (nodeAddress.IPAddresses.Count > 0)
- {
- object lclResolverRegistrationId = null;
- try
- {
- lclResolverRegistrationId = resolver.Register(lclMeshId, nodeAddress, timeout);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
- }
- lock (ThisLock)
- {
- if (!(!registered))
- {
- throw Fx.AssertAndThrow("registered expected to be false");
- }
- registered = true;
- resolverRegistrationId = lclResolverRegistrationId;
- }
- }
- }
- // Unregister that should only be called from non-user threads.
- //since this is invoked on background threads, we log and ---- all non-critical exceptions
- //#57972
- void UnregisterAddress(object timeout)
- {
- try
- {
- UnregisterAddress((TimeSpan)timeout);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
- }
- }
- void UnregisterAddress(TimeSpan timeout)
- {
- bool needToUnregister = false;
- object lclResolverRegistrationId = null;
- lock (ThisLock)
- {
- if (registered)
- {
- needToUnregister = true;
- lclResolverRegistrationId = resolverRegistrationId;
- registered = false; // this ensures that the current thread will do unregistration
- }
- resolverRegistrationId = null;
- }
- if (needToUnregister)
- {
- try
- {
- resolver.Unregister(lclResolverRegistrationId, timeout);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
- }
- }
- }
- void OnNeighborClosed(object sender, PeerNeighborCloseEventArgs e)
- {
- IPeerNeighbor neighbor = (IPeerNeighbor)sender;
- PeerConnector localConnector;
- PeerMaintainer localMaintainer;
- PeerFlooder localFlooder;
- localConnector = connector;
- localMaintainer = maintainer;
- localFlooder = flooder;
- UtilityExtension.OnNeighborClosed(neighbor);
- PeerChannelAuthenticatorExtension.OnNeighborClosed(neighbor);
- if (localConnector != null)
- localConnector.OnNeighborClosed(neighbor);
- if (localMaintainer != null)
- localMaintainer.OnNeighborClosed(neighbor);
- if (localFlooder != null)
- localFlooder.OnNeighborClosed(neighbor);
- // Finally notify any Peernode client
- EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosed;
- if (handler != null)
- {
- handler(this, e);
- }
- }
- void OnNeighborClosing(object sender, PeerNeighborCloseEventArgs e)
- {
- IPeerNeighbor neighbor = (IPeerNeighbor)sender;
- PeerConnector localConnector;
- localConnector = connector;
- if (localConnector != null)
- localConnector.OnNeighborClosing(neighbor, e.Reason);
- // Finally notify any Peernode client
- EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosing;
- if (handler != null)
- {
- handler(this, e);
- }
- }
- void OnNeighborConnected(object sender, EventArgs e)
- {
- IPeerNeighbor neighbor = (IPeerNeighbor)sender;
- PeerMaintainer localMaintainer = maintainer;
- PeerFlooder localFlooder = flooder;
- if (localFlooder != null)
- localFlooder.OnNeighborConnected(neighbor);
- if (localMaintainer != null)
- localMaintainer.OnNeighborConnected(neighbor);
- UtilityExtension.OnNeighborConnected(neighbor);
- // Finally notify any Peernode client
- EventHandler handler = NeighborConnected;
- if (handler != null)
- {
- handler(this, EventArgs.Empty);
- }
- }
- // raised by the neighbor manager when any connection has reached the opened state
- void OnNeighborAuthenticated(object sender, EventArgs e)
- {
- IPeerNeighbor n = (IPeerNeighbor)sender;
- //hand the authenticated neighbor over to connector.
- //If neighbor is aborted before
- PeerConnector localConnector = connector;
- if (localConnector != null)
- connector.OnNeighborAuthenticated(n);
- // Finally notify any Peernode client
- EventHandler handler = NeighborOpened;
- if (handler != null)
- {
- handler(this, EventArgs.Empty);
- }
- }
- // Open blocks the thread until either Online happens or Open times out.
- void OnOpen(TimeSpan timeout, bool waitForOnline)
- {
- bool aborted = false;
- EventHandler connectedHandler = delegate(object source, EventArgs args) { connectCompletedEvent.Set(); };
- EventHandler abortHandler = delegate(object source, EventArgs args) { aborted = true; connectCompletedEvent.Set(); };
- openException = null; // clear out the open exception from the last Open attempt
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- try
- {
- NeighborConnected += connectedHandler;
- Aborted += abortHandler;
- OpenCore(timeout);
- if (waitForOnline)
- {
- if (!TimeoutHelper.WaitOne(connectCompletedEvent, timeoutHelper.RemainingTime()))
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
- }
- }
- if (aborted)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.PeerNodeAborted)));
- }
- // retrieve listen addresses and register with the resolver
- if (isOpen)
- {
- if (openException != null)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(openException);
- }
- else
- {
- string lclMeshId = null;
- PeerNodeConfig lclConfig = null;
- lock (ThisLock)
- {
- lclMeshId = meshId;
- lclConfig = config;
- }
- // The design is such that any address change notifications are queued behind Open operation
- // So, we need not worry about address changes racing with the initial registration.
- RegisterAddress(lclMeshId, lclConfig.GetListenAddress(false), timeoutHelper.RemainingTime());
- }
- }
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- CloseCore(TimeSpan.FromTicks(0), false);
- throw;
- }
- finally
- {
- NeighborConnected -= connectedHandler;
- Aborted -= abortHandler;
- }
- }
- internal void Open(TimeSpan timeout, bool waitForOnline)
- {
- stateManager.Open(timeout, waitForOnline);
- }
- // the core functionality of open (all but waiting for a connection)
- void OpenCore(TimeSpan timeout)
- {
- TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
- PeerMaintainer lclMaintainer;
- PeerNodeConfig lclConfig;
- string lclMeshId;
- lock (ThisLock)
- {
- if (ListenUri == null)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.ListenUriNotSet, this.GetType())));
- }
- // extract mesh id from listen uri
- meshId = ListenUri.Host;
- // generate the node id
- byte[] bytes = new byte[sizeof(ulong)];
- ulong nodeId = 0;
- do
- {
- System.ServiceModel.Security.CryptoHelper.FillRandomBytes(bytes);
- for (int i = 0; i < sizeof(ulong); i++)
- nodeId |= ((ulong)bytes[i]) << i * 8;
- }
- while (nodeId == PeerTransportConstants.InvalidNodeId);
- // now that the node id has been generated, create the trace record that describes this
- traceRecord = new PeerNodeTraceRecord(nodeId, meshId);
- if (DiagnosticUtility.ShouldTraceInformation)
- {
- TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpening, SR.GetString(SR.TraceCodePeerNodeOpening), this.traceRecord, this, null);
- }
- // create the node configuration
- config = new PeerNodeConfig(meshId,
- nodeId,
- resolver,
- messagePropagationFilter,
- encoder,
- ListenUri, listenIPAddress, port,
- maxReceivedMessageSize, minNeighbors, idealNeighbors, maxNeighbors, maxReferrals,
- connectTimeout, maintainerInterval,
- securityManager,
- this.readerQuotas,
- this.maxBufferPoolSize,
- this.MaxSendQueue,
- this.MaxReceiveQueue);
- // create components
- if (listenIPAddress != null)
- ipHelper = new PeerIPHelper(listenIPAddress);
- else
- ipHelper = new PeerIPHelper();
- bufferManager = BufferManager.CreateBufferManager(64 * config.MaxReceivedMessageSize, (int)config.MaxReceivedMessageSize);
- neighborManager = new PeerNeighborManager(ipHelper,
- config,
- this);
- flooder = PeerFlooder.CreateFlooder(config, neighborManager, this);
- maintainer = new PeerMaintainer(config, neighborManager, flooder);
- connector = new PeerConnector(config, neighborManager, maintainer);
- Dictionary<Type, object> services = serviceHandlers;
- if (services == null)
- {
- services = new Dictionary<Type, object>();
- services.Add(typeof(IPeerConnectorContract), connector);
- services.Add(typeof(IPeerFlooderContract<Message, UtilityInfo>), flooder);
- }
- service = new PeerService(this.config,
- neighborManager.ProcessIncomingChannel,
- neighborManager.GetNeighborFromProxy,
- services,
- this);
- this.securityManager.MeshId = this.meshId;
- service.Open(timeoutHelper.RemainingTime());
- // register for events
- neighborManager.NeighborClosed += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
- neighborManager.NeighborClosing += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
- neighborManager.NeighborConnected += new EventHandler(OnNeighborConnected);
- neighborManager.NeighborOpened += new EventHandler(this.SecurityManager.OnNeighborOpened);
- this.securityManager.OnNeighborAuthenticated += new EventHandler(this.OnNeighborAuthenticated);
- neighborManager.Online += new EventHandler(FireOnline);
- neighborManager.Offline += new EventHandler(FireOffline);
- ipHelper.AddressChanged += new EventHandler(stateManager.OnIPAddressesChanged);
- // open components
- ipHelper.Open();
- // Set the listen address before opening any more components
- PeerNodeAddress nodeAddress = new PeerNodeAddress(service.GetListenAddress(), ipHelper.GetLocalAddresses());
- config.SetListenAddress(nodeAddress);
- neighborManager.Open(service.Binding, service);
- connector.Open();
- maintainer.Open();
- flooder.Open();
- isOpen = true;
- completeTraceRecord = new PeerNodeTraceRecord(nodeId, meshId, nodeAddress);
- // Set these locals inside the lock (Abort may occur whilst Opening)
- lclMaintainer = maintainer;
- lclMeshId = meshId;
- lclConfig = config;
- openException = null;
- }
- // retrieve listen addresses and register with the resolver
- if (isOpen)
- {
- // attempt to connect to the mesh
- lclMaintainer.ScheduleConnect(new PeerMaintainer.ConnectCallback(OnConnectionAttemptCompleted));
- }
- }
- void DeliverMessageToClientChannels(
- object registrant,
- MessageBuffer messageBuffer,
- Uri via,
- Uri peerTo,
- string contentType,
- int messageSize,
- int index,
- MessageHeader hopHeader)
- {
- Message message = null;
- try
- {
- // create a list of callbacks so they can each be called outside the lock
- ArrayList callbacks = new ArrayList();
- Uri to = peerTo;
- Fx.Assert(peerTo != null, "Invalid To header value!");
- if (isOpen)
- {
- lock (ThisLock)
- {
- if (isOpen)
- {
- foreach (MessageFilterRegistration mfr in messageFilters.Values)
- {
- // first, the via's must match
- bool match = CompareVia(via, mfr.via);
- if (messageSize < 0)
- {
- //messageSize <0 indicates that this message is coming from BeginSend
- //and the size is not computed yet.
- if (message == null)
- {
- message = messageBuffer.CreateMessage();
- Fx.Assert(message.Headers.To == to, "To Header is inconsistent in Send() case!");
- Fx.Assert(message.Properties.Via == via, "Via property is inconsistent in Send() case!");
- }
- //incoming message need not be verified MaxReceivedSize
- //only do this for local channels
- if (registrant != null)
- {
- ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
- messageSize = (int)buffer.Count;
- }
- }
- // only queue the message for registrants expecting this size
- match = match && (messageSize <= mfr.settings.MaxReceivedMessageSize);
- // if a filter is specified, it must match as well
- if (match && mfr.filters != null)
- {
- for (int i = 0; match && i < mfr.filters.Length; i++)
- {
- match = mfr.filters[i].Match(via, to);
- }
- }
- if (match)
- {
- callbacks.Add(mfr.callback);
- }
- }
- }
- }
- }
- foreach (MessageAvailableCallback callback in callbacks)
- {
- Message localCopy;
- try
- {
- //this copy is free'd by SFx.
- localCopy = messageBuffer.CreateMessage();
- localCopy.Properties.Via = via;
- localCopy.Headers.To = to;
- //mark security header as understood.
- try
- {
- int i = localCopy.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
- if (i >= 0)
- {
- localCopy.Headers.AddUnderstood(i);
- }
- }
- catch (MessageHeaderException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
- }
- catch (SerializationException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
- }
- catch (XmlException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
- }
- if (index != -1)
- {
- localCopy.Headers.ReplaceAt(index, hopHeader);
- }
- callback(localCopy);
- }
- catch (ObjectDisposedException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- }
- catch (CommunicationObjectAbortedException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- }
- catch (CommunicationObjectFaultedException e)
- {
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- }
- }
- }
- finally
- {
- if (message != null)
- message.Close();
- }
- }
- public void RefreshConnection()
- {
- PeerMaintainer lclMaintainer = null;
- lock (ThisLock)
- {
- ThrowIfNotOpen();
- lclMaintainer = maintainer;
- }
- if (lclMaintainer != null)
- {
- lclMaintainer.RefreshConnection();
- }
- }
- public void PingConnections()
- {
- PeerMaintainer lclMaintainer = null;
- lock (ThisLock)
- {
- lclMaintainer = maintainer;
- }
- if (lclMaintainer != null)
- {
- lclMaintainer.PingConnections();
- }
- }
- //always call methods from inside a lock (of the container)
- class RefCountedSecurityProtocol
- {
- int refCount;
- public SecurityProtocol Protocol;
- public RefCountedSecurityProtocol(SecurityProtocol securityProtocol)
- {
- this.Protocol = securityProtocol;
- this.refCount = 1;
- }
- public int AddRef()
- {
- return ++refCount;
- }
- public int Release()
- {
- return --refCount;
- }
- }
- // internal message filtering
- internal void RegisterMessageFilter(object registrant, Uri via, PeerMessageFilter[] filters,
- ITransportFactorySettings settings, MessageAvailableCallback callback, SecurityProtocol securityProtocol)
- {
- MessageFilterRegistration registration = new MessageFilterRegistration();
- registration.registrant = registrant;
- registration.via = via;
- registration.filters = filters;
- registration.settings = settings;
- registration.callback = callback;
- registration.securityProtocol = securityProtocol;
- lock (ThisLock)
- {
- messageFilters.Add(registrant, registration);
- RefCountedSecurityProtocol protocolWrapper = null;
- if (!this.uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
- {
- protocolWrapper = new RefCountedSecurityProtocol(securityProtocol);
- this.uri2SecurityProtocol.Add(via, protocolWrapper);
- }
- else
- protocolWrapper.AddRef();
- }
- }
- // internal method to release the reference on an existing PeerNode
- internal void Release()
- {
- lock (peerNodes)
- {
- if (peerNodes.ContainsValue(this))
- {
- if (--refCount == 0)
- {
- // no factories/channels are using this instance (although the application may still be
- // referring to it directly). either way, we remove this from the registry
- peerNodes.Remove(listenUri);
- }
- }
- }
- }
- // Call with null to reset to our implementation
- public void SetServiceHandlers(Dictionary<Type, object> services)
- {
- lock (ThisLock)
- {
- serviceHandlers = services;
- }
- }
- void ThrowIfNotOpen()
- {
- if (!isOpen)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.TransportManagerNotOpen)));
- }
- }
- void ThrowIfOpen()
- {
- if (isOpen)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(
- SR.TransportManagerOpen)));
- }
- }
- public override string ToString()
- {
- lock (ThisLock)
- {
- // if open return the mesh id, otherwise return the type
- if (isOpen)
- return string.Format(System.Globalization.CultureInfo.InvariantCulture,
- "{0} ({1})", MeshId, NodeId);
- else
- return this.GetType().ToString();
- }
- }
- internal void UnregisterMessageFilter(object registrant, Uri via)
- {
- lock (ThisLock)
- {
- messageFilters.Remove(registrant);
- RefCountedSecurityProtocol protocolWrapper = null;
- if (uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
- {
- if (protocolWrapper.Release() == 0)
- uri2SecurityProtocol.Remove(via);
- }
- else
- Fx.Assert(false, "Corresponding SecurityProtocol is not Found!");
- }
- }
- internal static void ValidateVia(Uri uri)
- {
- int viaSize = Encoding.UTF8.GetByteCount(uri.OriginalString);
- if (viaSize > maxViaSize)
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidDataException(SR.GetString(
- SR.PeerChannelViaTooLong, uri, viaSize, maxViaSize)));
- }
- internal class ChannelRegistration
- {
- public object registrant;
- public Uri via;
- public ITransportFactorySettings settings;
- public SecurityProtocol securityProtocol;
- public Type channelType;
- }
- // holds the registration information passed in by channels and listeners. This informtaion is used
- // to determine which channels and listeners will receive an incoming message
- class MessageFilterRegistration : ChannelRegistration
- {
- public PeerMessageFilter[] filters;
- public MessageAvailableCallback callback;
- }
- // represents the settings of a PeerListenerFactory or PeerChannelFactory, used to create a new
- // PeerNode or compare settings to an existing PeerNode
- internal class Registration
- {
- IPAddress listenIPAddress;
- Uri listenUri;
- long maxReceivedMessageSize;
- int port;
- PeerResolver resolver;
- PeerSecurityManager securityManager;
- XmlDictionaryReaderQuotas readerQuotas;
- long maxBufferPoolSize;
- public Registration(Uri listenUri, IPeerFactory factory)
- {
- if (factory.Resolver == null)
- {
- throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
- new InvalidOperationException(SR.GetString(SR.PeerResolverRequired)));
- }
- if (factory.ListenIPAddress != null)
- {
- listenIPAddress = factory.ListenIPAddress;
- }
- this.listenUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
- this.port = factory.Port;
- this.maxReceivedMessageSize = factory.MaxReceivedMessageSize;
- this.resolver = factory.Resolver;
- this.securityManager = factory.SecurityManager;
- this.readerQuotas = new XmlDictionaryReaderQuotas();
- factory.ReaderQuotas.CopyTo(this.readerQuotas);
- this.maxBufferPoolSize = factory.MaxBufferPoolSize;
- }
- bool HasMismatchedReaderQuotas(XmlDictionaryReaderQuotas existingOne, XmlDictionaryReaderQuotas newOne, out string result)
- {
- //check for properties that affect the message
- result = null;
- if (existingOne.MaxArrayLength != newOne.MaxArrayLength)
- result = PeerBindingPropertyNames.ReaderQuotasDotArrayLength;
- else if (existingOne.MaxStringContentLength != newOne.MaxStringContentLength)
- result = PeerBindingPropertyNames.ReaderQuotasDotStringLength;
- else if (existingOne.MaxDepth != newOne.MaxDepth)
- result = PeerBindingPropertyNames.ReaderQuotasDotMaxDepth;
- else if (existingOne.MaxNameTableCharCount != newOne.MaxNameTableCharCount)
- result = PeerBindingPropertyNames.ReaderQuotasDotMaxCharCount;
- else if (existingOne.MaxBytesPerRead != newOne.MaxBytesPerRead)
- result = PeerBindingPropertyNames.ReaderQuotasDotMaxBytesPerRead;
- return result != null;
- }
- public void CheckIfCompatible(PeerNodeImplementation peerNode, Uri via)
- {
- string mismatch = null;
- // test the settings that must be identical
- if (listenUri != peerNode.ListenUri)
- mismatch = PeerBindingPropertyNames.ListenUri;
- else if (port != peerNode.Port)
- mismatch = PeerBindingPropertyNames.Port;
- else if (maxReceivedMessageSize != peerNode.MaxReceivedMessageSize)
- mismatch = PeerBindingPropertyNames.MaxReceivedMessageSize;
- else if (maxBufferPoolSize != peerNode.MaxBufferPoolSize)
- mismatch = PeerBindingPropertyNames.MaxBufferPoolSize;
- else if (HasMismatchedReaderQuotas(peerNode.ReaderQuotas, readerQuotas, out mismatch))
- { }
- else if (resolver.GetType() != peerNode.Resolver.GetType())
- mismatch = PeerBindingPropertyNames.Resolver;
- else if (!resolver.Equals(peerNode.Resolver))
- mismatch = PeerBindingPropertyNames.ResolverSettings;
- else if (listenIPAddress != peerNode.ListenIPAddress)
- {
- if ((listenIPAddress == null || peerNode.ListenIPAddress == null)
- ||
- (!listenIPAddress.Equals(peerNode.ListenIPAddress)))
- mismatch = PeerBindingPropertyNames.ListenIPAddress;
- }
- else if ((securityManager == null) && (peerNode.SecurityManager != null))
- mismatch = PeerBindingPropertyNames.Security;
- if (mismatch != null)
- PeerExceptionHelper.ThrowInvalidOperation_PeerConflictingPeerNodeSettings(mismatch);
- securityManager.CheckIfCompatibleNodeSettings(peerNode.SecurityManager);
- }
- public PeerNodeImplementation CreatePeerNode()
- {
- PeerNodeImplementation peerNode = new PeerNodeImplementation();
- peerNode.ListenIPAddress = listenIPAddress;
- peerNode.ListenUri = listenUri;
- peerNode.MaxReceivedMessageSize = maxReceivedMessageSize;
- peerNode.Port = port;
- peerNode.Resolver = resolver;
- peerNode.SecurityManager = securityManager;
- this.readerQuotas.CopyTo(peerNode.readerQuotas);
- peerNode.MaxBufferPoolSize = maxBufferPoolSize;
- return peerNode;
- }
- }
- class SendAsyncResult : AsyncResult
- {
- bool floodComplete = false;
- bool localDispatchComplete = false;
- object thisLock = new object();
- object ThisLock { get { return thisLock; } }
- Exception floodException = null;
- public SendAsyncResult(AsyncCallback callback, object state) : base(callback, state) { }
- public void OnFloodComplete(IAsyncResult result)
- {
- if (this.floodComplete || this.IsCompleted)
- return;
- bool complete = false;
- lock (this.ThisLock)
- {
- if (this.localDispatchComplete)
- complete = true;
- this.floodComplete = true;
- }
- try
- {
- PeerFlooder.EndFloodEncodedMessage(result);
- }
- catch (Exception e)
- {
- if (Fx.IsFatal(e)) throw;
- DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
- floodException = e;
- }
- if (complete)
- {
- this.Complete(result.CompletedSynchronously, floodException);
- }
- }
- public void OnLocalDispatchComplete(IAsyncResult result)
- {
- SendAsyncResult sr = (SendAsyncResult)result;
- if (this.localDispatchComplete || this.IsCompleted)
- return;
- bool complete = false;
- lock (this.ThisLock)
- {
- if (this.floodComplete)
- complete = true;
- this.localDispatchComplete = true;
- }
- if (complete)
- {
- this.Complete(true, floodException);
- }
- }
- public static void End(IAsyncResult result)
- {
- AsyncResult.End<SendAsyncResult>(result);
- }
- }
- bool IPeerNodeMessageHandling.HasMessagePropagation
- {
- get
- {
- return this.messagePropagationFilter != null;
- }
- }
- bool IPeerNodeMessageHandling.IsKnownVia(Uri via)
- {
- bool result = false;
- lock (ThisLock)
- {
- result = uri2SecurityProtocol.ContainsKey(via);
- }
- return result;
- }
- bool IPeerNodeMessageHandling.IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss)
- {
- PeerFlooder lclFlooder = flooder;
- id = DefaultId;
- cacheMiss = -1;
- return (lclFlooder != null && lclFlooder.IsNotSeenBefore(message, out id, out cacheMiss));
- }
- public MessageEncodingBindingElement EncodingBindingElement
- {
- get
- {
- return this.EncodingElement;
- }
- }
- }
- interface IPeerNodeMessageHandling
- {
- void HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags, int index, MessageHeader header, Uri via, Uri to);
- PeerMessagePropagation DetermineMessagePropagation(Message message, PeerMessageOrigination origination);
- bool HasMessagePropagation { get; }
- bool ValidateIncomingMessage(ref Message data, Uri via);
- bool IsKnownVia(Uri via);
- bool IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss);
- MessageEncodingBindingElement EncodingBindingElement { get; }
- }
- }
|