PeerNodeImplementation.cs 74 KB


  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections;
  7. using System.Collections.Generic;
  8. using System.Diagnostics;
  9. using System.IO;
  10. using System.Net;
  11. using System.Runtime;
  12. using System.Runtime.Serialization;
  13. using System.ServiceModel;
  14. using System.ServiceModel.Diagnostics;
  15. using System.ServiceModel.Dispatcher;
  16. using System.ServiceModel.Security;
  17. using System.Text;
  18. using System.Threading;
  19. using System.Xml;
  20. partial class PeerNodeImplementation : IPeerNodeMessageHandling
  21. {
  22. const int maxViaSize = 4096;
  23. public delegate void MessageAvailableCallback(Message message);
  24. // configuration
  25. int connectTimeout;
  26. IPAddress listenIPAddress;
  27. Uri listenUri;
  28. int port;
  29. long maxReceivedMessageSize;
  30. int minNeighbors;
  31. int idealNeighbors;
  32. int maxNeighbors;
  33. int maxReferrals;
  34. string meshId;
  35. PeerMessagePropagationFilter messagePropagationFilter;
  36. SynchronizationContext messagePropagationFilterContext;
  37. int maintainerInterval = PeerTransportConstants.MaintainerInterval; // milliseconds before a maintainer kicks in
  38. PeerResolver resolver;
  39. PeerNodeConfig config;
  40. PeerSecurityManager securityManager;
  41. internal MessageEncodingBindingElement EncodingElement;
  42. // internal state
  43. ManualResetEvent connectCompletedEvent; // raised when maintainer has connected or given up
  44. MessageEncoder encoder; // used for encoding internal messages
  45. // Double-checked locking pattern requires volatile for read/write synchronization
  46. volatile bool isOpen;
  47. Exception openException; // exception to be thrown from Open
  48. Dictionary<object, MessageFilterRegistration> messageFilters;
  49. int refCount; // number of factories/channels that are using this instance
  50. SimpleStateManager stateManager; // manages open/close operations
  51. object thisLock = new Object();
  52. PeerNodeTraceRecord traceRecord;
  53. PeerNodeTraceRecord completeTraceRecord; // contains address info as well
  54. // primary infrastructure components
  55. internal PeerConnector connector; // Purely for testing do not take a internal dependency on this
  56. PeerMaintainer maintainer;
  57. internal PeerFlooder flooder; // Purely for testing do not take an internal dependency on this
  58. PeerNeighborManager neighborManager;
  59. PeerIPHelper ipHelper;
  60. PeerService service;
  61. object resolverRegistrationId;
  62. bool registered;
  63. public event EventHandler Offline;
  64. public event EventHandler Online;
  65. Dictionary<Uri, RefCountedSecurityProtocol> uri2SecurityProtocol;
  66. Dictionary<Type, object> serviceHandlers;
  67. BufferManager bufferManager = null;
  68. internal static byte[] DefaultId = new byte[0];
  69. XmlDictionaryReaderQuotas readerQuotas;
  70. long maxBufferPoolSize;
  71. internal int MaxSendQueue = 128, MaxReceiveQueue = 128;
  72. public PeerNodeImplementation()
  73. {
  74. // intialize default configuration
  75. connectTimeout = PeerTransportConstants.ConnectTimeout;
  76. maxReceivedMessageSize = TransportDefaults.MaxReceivedMessageSize;
  77. minNeighbors = PeerTransportConstants.MinNeighbors;
  78. idealNeighbors = PeerTransportConstants.IdealNeighbors;
  79. maxNeighbors = PeerTransportConstants.MaxNeighbors;
  80. maxReferrals = PeerTransportConstants.MaxReferrals;
  81. port = PeerTransportDefaults.Port;
  82. // initialize internal state
  83. connectCompletedEvent = new ManualResetEvent(false);
  84. encoder = new BinaryMessageEncodingBindingElement().CreateMessageEncoderFactory().Encoder;
  85. messageFilters = new Dictionary<object, MessageFilterRegistration>();
  86. stateManager = new SimpleStateManager(this);
  87. uri2SecurityProtocol = new Dictionary<Uri, RefCountedSecurityProtocol>();
  88. readerQuotas = new XmlDictionaryReaderQuotas();
  89. this.maxBufferPoolSize = TransportDefaults.MaxBufferPoolSize;
  90. }
  91. // To facilitate testing
  92. public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosed;
  93. public event EventHandler<PeerNeighborCloseEventArgs> NeighborClosing;
  94. public event EventHandler NeighborConnected;
  95. public event EventHandler NeighborOpened;
  96. public event EventHandler Aborted;
  97. public PeerNodeConfig Config
  98. {
  99. get
  100. {
  101. return this.config;
  102. }
  103. private set
  104. {
  105. Fx.Assert(value != null, "PeerNodeImplementation.Config can not be set to null");
  106. this.config = value;
  107. }
  108. }
  109. public bool IsOnline
  110. {
  111. get
  112. {
  113. lock (ThisLock)
  114. {
  115. if (isOpen)
  116. return neighborManager.IsOnline;
  117. else
  118. return false;
  119. }
  120. }
  121. }
  122. internal bool IsOpen
  123. {
  124. get { return isOpen; }
  125. }
  126. public IPAddress ListenIPAddress
  127. {
  128. get { return listenIPAddress; }
  129. set
  130. {
  131. // No validation necessary at this point. When the service is opened, it will throw if the IP address is invalid
  132. lock (ThisLock)
  133. {
  134. ThrowIfOpen();
  135. listenIPAddress = value;
  136. }
  137. }
  138. }
  139. public Uri ListenUri
  140. {
  141. get { return listenUri; }
  142. set
  143. {
  144. if (value == null)
  145. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("value");
  146. if (value.Scheme != PeerStrings.Scheme)
  147. {
  148. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("value", SR.GetString(SR.InvalidUriScheme,
  149. value.Scheme, PeerStrings.Scheme));
  150. }
  151. Fx.Assert(value.PathAndQuery == "/", "PeerUriCannotContainPath");
  152. lock (ThisLock)
  153. {
  154. ThrowIfOpen();
  155. listenUri = value;
  156. }
  157. }
  158. }
  159. public long MaxBufferPoolSize
  160. {
  161. get { return maxBufferPoolSize; }
  162. set
  163. {
  164. lock (ThisLock)
  165. {
  166. ThrowIfOpen();
  167. maxBufferPoolSize = value;
  168. }
  169. }
  170. }
  171. public long MaxReceivedMessageSize
  172. {
  173. get { return maxReceivedMessageSize; }
  174. set
  175. {
  176. if (!(value >= PeerTransportConstants.MinMessageSize))
  177. {
  178. throw Fx.AssertAndThrow("invalid MaxReceivedMessageSize");
  179. }
  180. lock (ThisLock)
  181. {
  182. ThrowIfOpen();
  183. maxReceivedMessageSize = value;
  184. }
  185. }
  186. }
  187. public string MeshId
  188. {
  189. get
  190. {
  191. lock (ThisLock)
  192. {
  193. ThrowIfNotOpen();
  194. return meshId;
  195. }
  196. }
  197. }
  198. public PeerMessagePropagationFilter MessagePropagationFilter
  199. {
  200. get { return messagePropagationFilter; }
  201. set
  202. {
  203. lock (ThisLock)
  204. {
  205. // null is ok and causes optimised flooding codepath
  206. messagePropagationFilter = value;
  207. messagePropagationFilterContext = ThreadBehavior.GetCurrentSynchronizationContext();
  208. }
  209. }
  210. }
  211. // Made internal to facilitate testing
  212. public PeerNeighborManager NeighborManager
  213. {
  214. get { return neighborManager; }
  215. }
  216. public ulong NodeId
  217. {
  218. get
  219. {
  220. ThrowIfNotOpen();
  221. return config.NodeId;
  222. }
  223. }
  224. public int Port
  225. {
  226. get { return port; }
  227. set
  228. {
  229. lock (ThisLock)
  230. {
  231. ThrowIfOpen();
  232. port = value;
  233. }
  234. }
  235. }
  236. public int ListenerPort
  237. {
  238. get
  239. {
  240. ThrowIfNotOpen();
  241. return config.ListenerPort;
  242. }
  243. }
  244. public XmlDictionaryReaderQuotas ReaderQuotas
  245. {
  246. get
  247. {
  248. return this.readerQuotas;
  249. }
  250. }
  251. public PeerResolver Resolver
  252. {
  253. get { return resolver; }
  254. set
  255. {
  256. Fx.Assert(value != null, "null Resolver");
  257. lock (ThisLock)
  258. {
  259. ThrowIfOpen();
  260. resolver = value;
  261. }
  262. }
  263. }
  264. public PeerSecurityManager SecurityManager
  265. {
  266. get { return this.securityManager; }
  267. set { this.securityManager = value; }
  268. }
  269. internal PeerService Service
  270. {
  271. get
  272. {
  273. return this.service;
  274. }
  275. set
  276. {
  277. lock (ThisLock)
  278. {
  279. ThrowIfNotOpen();
  280. this.service = value;
  281. }
  282. }
  283. }
  284. object ThisLock
  285. {
  286. get { return thisLock; }
  287. }
  288. public void Abort()
  289. {
  290. stateManager.Abort();
  291. }
  292. public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  293. {
  294. return stateManager.BeginClose(timeout, callback, state);
  295. }
  296. public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state, bool waitForOnline)
  297. {
  298. return stateManager.BeginOpen(timeout, callback, state, waitForOnline);
  299. }
  300. public Guid ProcessOutgoingMessage(Message message, Uri via)
  301. {
  302. Guid result = Guid.NewGuid();
  303. System.Xml.UniqueId messageId = new System.Xml.UniqueId(result);
  304. if (-1 != message.Headers.FindHeader(PeerStrings.MessageId, PeerStrings.Namespace))
  305. PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerStrings.MessageId);
  306. if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace))
  307. PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerTo);
  308. if (-1 != message.Headers.FindHeader(PeerOperationNames.PeerVia, PeerStrings.Namespace))
  309. PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.PeerVia);
  310. if (-1 != message.Headers.FindHeader(PeerOperationNames.Flood, PeerStrings.Namespace, PeerOperationNames.Demuxer))
  311. PeerExceptionHelper.ThrowInvalidOperation_ConflictingHeader(PeerOperationNames.Flood);
  312. message.Headers.Add(PeerDictionaryHeader.CreateMessageIdHeader(messageId));
  313. message.Properties.Via = via;
  314. message.Headers.Add(MessageHeader.CreateHeader(PeerOperationNames.PeerTo, PeerStrings.Namespace, message.Headers.To));
  315. message.Headers.Add(PeerDictionaryHeader.CreateViaHeader(via));
  316. message.Headers.Add(PeerDictionaryHeader.CreateFloodRole());
  317. return result;
  318. }
  319. public void SecureOutgoingMessage(ref Message message, Uri via, TimeSpan timeout, SecurityProtocol securityProtocol)
  320. {
  321. if (securityProtocol != null)
  322. {
  323. securityProtocol.SecureOutgoingMessage(ref message, timeout);
  324. }
  325. }
  326. public IAsyncResult BeginSend(object registrant, Message message, Uri via,
  327. ITransportFactorySettings settings, TimeSpan timeout, AsyncCallback callback, object state, SecurityProtocol securityProtocol)
  328. {
  329. PeerFlooder localFlooder;
  330. int factoryMaxReceivedMessageSize;
  331. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  332. MessageBuffer messageBuffer = null;
  333. Message securedMessage = null;
  334. ulong hopcount = PeerTransportConstants.MaxHopCount;
  335. PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
  336. int messageSize = (int)-1;
  337. byte[] id;
  338. SendAsyncResult result = new SendAsyncResult(callback, state);
  339. AsyncCallback onFloodComplete = Fx.ThunkCallback(new AsyncCallback(result.OnFloodComplete));
  340. try
  341. {
  342. lock (ThisLock)
  343. {
  344. ThrowIfNotOpen();
  345. localFlooder = flooder;
  346. }
  347. // we know this will fit in an int because of our MaxReceivedMessageSize restrictions
  348. factoryMaxReceivedMessageSize = (int)Math.Min(maxReceivedMessageSize, settings.MaxReceivedMessageSize);
  349. Guid guid = ProcessOutgoingMessage(message, via);
  350. SecureOutgoingMessage(ref message, via, timeout, securityProtocol);
  351. if ((message is SecurityAppliedMessage))
  352. {
  353. ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
  354. securedMessage = encoder.ReadMessage(buffer, bufferManager);
  355. id = (message as SecurityAppliedMessage).PrimarySignatureValue;
  356. messageSize = (int)buffer.Count;
  357. }
  358. else
  359. {
  360. securedMessage = message;
  361. id = guid.ToByteArray();
  362. }
  363. messageBuffer = securedMessage.CreateBufferedCopy(factoryMaxReceivedMessageSize);
  364. string contentType = settings.MessageEncoderFactory.Encoder.ContentType;
  365. if (this.messagePropagationFilter != null)
  366. {
  367. using (Message filterMessage = messageBuffer.CreateMessage())
  368. {
  369. propagateFlags = ((IPeerNodeMessageHandling)this).DetermineMessagePropagation(filterMessage, PeerMessageOrigination.Local);
  370. }
  371. }
  372. if ((propagateFlags & PeerMessagePropagation.Remote) != PeerMessagePropagation.None)
  373. {
  374. if (hopcount == 0)
  375. propagateFlags &= ~PeerMessagePropagation.Remote;
  376. }
  377. // flood it out
  378. IAsyncResult ar = null;
  379. if ((propagateFlags & PeerMessagePropagation.Remote) != 0)
  380. {
  381. ar = localFlooder.BeginFloodEncodedMessage(id, messageBuffer, timeoutHelper.RemainingTime(), onFloodComplete, null);
  382. if (DiagnosticUtility.ShouldTraceVerbose)
  383. {
  384. TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerChannelMessageSent, SR.GetString(SR.TraceCodePeerChannelMessageSent), this, message);
  385. }
  386. }
  387. else
  388. {
  389. ar = new CompletedAsyncResult(onFloodComplete, null);
  390. }
  391. if (ar == null)
  392. {
  393. Fx.Assert("SendAsyncResult must have an Async Result for onFloodComplete");
  394. }
  395. // queue up the pre-encoded message for local channels
  396. if ((propagateFlags & PeerMessagePropagation.Local) != 0)
  397. {
  398. using (Message msg = messageBuffer.CreateMessage())
  399. {
  400. int i = msg.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
  401. if (i >= 0)
  402. {
  403. msg.Headers.AddUnderstood(i);
  404. }
  405. using (MessageBuffer clientBuffer = msg.CreateBufferedCopy(factoryMaxReceivedMessageSize))
  406. {
  407. DeliverMessageToClientChannels(registrant, clientBuffer, via, message.Headers.To, contentType, messageSize, -1, null);
  408. }
  409. }
  410. }
  411. result.OnLocalDispatchComplete(result);
  412. }
  413. finally
  414. {
  415. message.Close();
  416. if (securedMessage != null)
  417. securedMessage.Close();
  418. if (messageBuffer != null)
  419. messageBuffer.Close();
  420. }
  421. return result;
  422. }
  423. public void Close(TimeSpan timeout)
  424. {
  425. stateManager.Close(timeout);
  426. }
  427. void CloseCore(TimeSpan timeout, bool graceful)
  428. {
  429. PeerService lclService;
  430. PeerMaintainer lclMaintainer;
  431. PeerNeighborManager lclNeighborManager;
  432. PeerConnector lclConnector;
  433. PeerIPHelper lclIPHelper;
  434. PeerNodeConfig lclConfig;
  435. PeerFlooder lclFlooder;
  436. Exception exception = null;
  437. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  438. if (DiagnosticUtility.ShouldTraceInformation)
  439. {
  440. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosing, SR.GetString(SR.TraceCodePeerNodeClosing), this.traceRecord, this, null);
  441. }
  442. lock (ThisLock)
  443. {
  444. isOpen = false;
  445. lclMaintainer = maintainer;
  446. lclNeighborManager = neighborManager;
  447. lclConnector = connector;
  448. lclIPHelper = ipHelper;
  449. lclService = service;
  450. lclConfig = config;
  451. lclFlooder = flooder;
  452. }
  453. // only unregister if we are doing a g----ful shutdown
  454. try
  455. {
  456. if (graceful)
  457. {
  458. UnregisterAddress(timeout);
  459. }
  460. else
  461. {
  462. if (lclConfig != null)
  463. {
  464. ActionItem.Schedule(new Action<object>(UnregisterAddress), lclConfig.UnregisterTimeout);
  465. }
  466. }
  467. }
  468. catch (Exception e)
  469. {
  470. if (Fx.IsFatal(e)) throw;
  471. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  472. if (exception == null) exception = e;
  473. }
  474. try
  475. {
  476. if (lclConnector != null)
  477. lclConnector.Closing();
  478. if (lclService != null)
  479. {
  480. try
  481. {
  482. lclService.Abort();
  483. }
  484. catch (Exception e)
  485. {
  486. if (Fx.IsFatal(e)) throw;
  487. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  488. if (exception == null) exception = e;
  489. }
  490. }
  491. if (lclMaintainer != null)
  492. {
  493. try
  494. {
  495. lclMaintainer.Close();
  496. }
  497. catch (Exception e)
  498. {
  499. if (Fx.IsFatal(e)) throw;
  500. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  501. if (exception == null) exception = e;
  502. }
  503. }
  504. if (lclIPHelper != null)
  505. {
  506. try
  507. {
  508. lclIPHelper.Close();
  509. lclIPHelper.AddressChanged -= new EventHandler(stateManager.OnIPAddressesChanged);
  510. }
  511. catch (Exception e)
  512. {
  513. if (Fx.IsFatal(e)) throw;
  514. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  515. if (exception == null) exception = e;
  516. }
  517. }
  518. if (lclNeighborManager != null)
  519. {
  520. lclNeighborManager.NeighborConnected -= new EventHandler(OnNeighborConnected);
  521. lclNeighborManager.NeighborOpened -= new EventHandler(this.securityManager.OnNeighborOpened);
  522. this.securityManager.OnNeighborAuthenticated -= new EventHandler(this.OnNeighborAuthenticated);
  523. lclNeighborManager.Online -= new EventHandler(FireOnline);
  524. lclNeighborManager.Offline -= new EventHandler(FireOffline);
  525. try
  526. {
  527. lclNeighborManager.Shutdown(graceful, timeoutHelper.RemainingTime());
  528. }
  529. catch (Exception e)
  530. {
  531. if (Fx.IsFatal(e)) throw;
  532. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  533. if (exception == null) exception = e;
  534. }
  535. // unregister for neighbor close events once shutdown has completed
  536. lclNeighborManager.NeighborClosed -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
  537. lclNeighborManager.NeighborClosing -= new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
  538. lclNeighborManager.Close();
  539. }
  540. if (lclConnector != null)
  541. {
  542. try
  543. {
  544. lclConnector.Close();
  545. }
  546. catch (Exception e)
  547. {
  548. if (Fx.IsFatal(e)) throw;
  549. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  550. if (exception == null) exception = e;
  551. }
  552. }
  553. if (lclFlooder != null)
  554. {
  555. try
  556. {
  557. lclFlooder.Close();
  558. }
  559. catch (Exception e)
  560. {
  561. if (Fx.IsFatal(e)) throw;
  562. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  563. if (exception == null) exception = e;
  564. }
  565. }
  566. }
  567. catch (Exception e)
  568. {
  569. if (Fx.IsFatal(e)) throw;
  570. if (exception == null) exception = e;
  571. }
  572. // reset object for next call to open
  573. EventHandler abortedHandler = null;
  574. lock (ThisLock)
  575. {
  576. // clear out old components (so they can be garbage collected)
  577. neighborManager = null;
  578. connector = null;
  579. maintainer = null;
  580. flooder = null;
  581. ipHelper = null;
  582. service = null;
  583. // reset generated config
  584. config = null;
  585. meshId = null;
  586. abortedHandler = Aborted;
  587. }
  588. // Notify anyone who is interested that abort has occured
  589. if (!graceful && abortedHandler != null)
  590. {
  591. try
  592. {
  593. abortedHandler(this, EventArgs.Empty);
  594. }
  595. catch (Exception e)
  596. {
  597. if (Fx.IsFatal(e)) throw;
  598. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  599. if (exception == null) exception = e;
  600. }
  601. }
  602. if (DiagnosticUtility.ShouldTraceInformation)
  603. {
  604. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeClosed, SR.GetString(SR.TraceCodePeerNodeClosed), this.traceRecord, this, null);
  605. }
  606. if (exception != null && graceful == true) // Swallows all non fatal exceptions during Abort
  607. {
  608. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(exception);
  609. }
  610. }
  611. // Performs case-insensitive comparison of two vias
  612. bool CompareVia(Uri via1, Uri via2)
  613. {
  614. return (Uri.Compare(via1, via2,
  615. (UriComponents.Scheme | UriComponents.UserInfo | UriComponents.Host | UriComponents.Port | UriComponents.Path),
  616. UriFormat.SafeUnescaped, StringComparison.OrdinalIgnoreCase) == 0);
  617. }
  618. public static void EndClose(IAsyncResult result)
  619. {
  620. if (result == null)
  621. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
  622. SimpleStateManager.EndClose(result);
  623. }
  624. public static void EndOpen(IAsyncResult result)
  625. {
  626. if (result == null)
  627. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
  628. SimpleStateManager.EndOpen(result);
  629. }
  630. public static void EndSend(IAsyncResult result)
  631. {
  632. if (result == null)
  633. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("result");
  634. SendAsyncResult.End(result);
  635. }
  636. // Necessary to allow access of the EventHandlers which can only be done from inside the class
  637. void FireOffline(object sender, EventArgs e)
  638. {
  639. if (!isOpen)
  640. {
  641. return;
  642. }
  643. EventHandler handler = Offline;
  644. if (handler != null)
  645. {
  646. handler(this, EventArgs.Empty);
  647. }
  648. }
  649. // Necessary to allow access of the EventHandlers which can only be done from inside the class
  650. void FireOnline(object sender, EventArgs e)
  651. {
  652. if (!isOpen)
  653. {
  654. return;
  655. }
  656. EventHandler handler = Online;
  657. if (handler != null)
  658. {
  659. handler(this, EventArgs.Empty);
  660. }
  661. }
  662. // static Uri -> PeerNode mapping
  663. static internal Dictionary<Uri, PeerNodeImplementation> peerNodes = new Dictionary<Uri, PeerNodeImplementation>();
  664. internal static PeerNodeImplementation Get(Uri listenUri)
  665. {
  666. PeerNodeImplementation node = null;
  667. if (!TryGet(listenUri, out node))
  668. {
  669. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  670. new InvalidOperationException(SR.GetString(SR.NoTransportManagerForUri, listenUri)));
  671. }
  672. return node;
  673. }
  674. internal protected static bool TryGet(Uri listenUri, out PeerNodeImplementation result)
  675. {
  676. if (listenUri == null)
  677. {
  678. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
  679. }
  680. if (listenUri.Scheme != PeerStrings.Scheme)
  681. {
  682. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
  683. listenUri.Scheme, PeerStrings.Scheme));
  684. }
  685. result = null;
  686. bool success = false;
  687. // build base uri
  688. Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
  689. lock (peerNodes)
  690. {
  691. if (peerNodes.ContainsKey(baseUri))
  692. {
  693. result = peerNodes[baseUri];
  694. success = true;
  695. }
  696. }
  697. return success;
  698. }
  699. public static bool TryGet(string meshId, out PeerNodeImplementation result)
  700. {
  701. UriBuilder uriBuilder = new UriBuilder();
  702. uriBuilder.Host = meshId;
  703. uriBuilder.Scheme = PeerStrings.Scheme;
  704. bool success = PeerNodeImplementation.TryGet(uriBuilder.Uri, out result);
  705. return success;
  706. }
  707. // internal method to return an existing PeerNode or create a new one with the given settings
  708. public static PeerNodeImplementation Get(Uri listenUri, Registration registration)
  709. {
  710. if (listenUri == null)
  711. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgumentNull("listenUri");
  712. if (listenUri.Scheme != PeerStrings.Scheme)
  713. {
  714. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("listenUri", SR.GetString(SR.InvalidUriScheme,
  715. listenUri.Scheme, PeerStrings.Scheme));
  716. }
  717. // build base uri
  718. Uri baseUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
  719. lock (peerNodes)
  720. {
  721. PeerNodeImplementation peerNodeImpl = null;
  722. PeerNodeImplementation peerNode = null;
  723. if (peerNodes.TryGetValue(baseUri, out peerNode))
  724. {
  725. peerNodeImpl = (PeerNodeImplementation)peerNode;
  726. // ensure that the PeerNode is compatible
  727. registration.CheckIfCompatible(peerNodeImpl, listenUri);
  728. peerNodeImpl.refCount++;
  729. return peerNodeImpl;
  730. }
  731. // create a new PeerNode, and add it to the dictionary
  732. peerNodeImpl = registration.CreatePeerNode();
  733. peerNodes[baseUri] = peerNodeImpl;
  734. peerNodeImpl.refCount = 1;
  735. return peerNodeImpl;
  736. }
  737. }
  738. // SimpleStateManager callback - Called on final release of PeerNode.
  739. void InternalClose(TimeSpan timeout, bool graceful)
  740. {
  741. CloseCore(timeout, graceful);
  742. lock (ThisLock)
  743. {
  744. messageFilters.Clear();
  745. }
  746. }
  747. protected void OnAbort()
  748. {
  749. InternalClose(TimeSpan.FromTicks(0), false);
  750. }
  751. protected void OnClose(TimeSpan timeout)
  752. {
  753. InternalClose(timeout, true);
  754. }
  755. // called when the maintainer has completed the connection attempt (successful or not)
  756. void OnConnectionAttemptCompleted(Exception e)
  757. {
  758. // store the exception if one occured when trying to connect, so that it can be rethrown from Open
  759. Fx.Assert(openException == null, "OnConnectionAttemptCompleted twice");
  760. openException = e;
  761. if (openException == null && DiagnosticUtility.ShouldTraceInformation)
  762. {
  763. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpened, SR.GetString(SR.TraceCodePeerNodeOpened), this.completeTraceRecord, this, null);
  764. }
  765. else if (openException != null && DiagnosticUtility.ShouldTraceError)
  766. {
  767. TraceUtility.TraceEvent(TraceEventType.Error, TraceCode.PeerNodeOpenFailed, SR.GetString(SR.TraceCodePeerNodeOpenFailed), this.completeTraceRecord, this, e);
  768. }
  769. connectCompletedEvent.Set();
  770. }
  771. bool IPeerNodeMessageHandling.ValidateIncomingMessage(ref Message message, Uri via)
  772. {
  773. SecurityProtocol protocol = null;
  774. if (via == null)
  775. {
  776. Fx.Assert("FloodMessage doesn't contain Via header!");
  777. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
  778. }
  779. if (TryGetSecurityProtocol(via, out protocol))
  780. {
  781. protocol.VerifyIncomingMessage(ref message, ServiceDefaults.SendTimeout, null);
  782. return true;
  783. }
  784. return false;
  785. }
  786. internal bool TryGetSecurityProtocol(Uri via, out SecurityProtocol protocol)
  787. {
  788. lock (ThisLock)
  789. {
  790. RefCountedSecurityProtocol wrapper = null;
  791. bool result = false;
  792. protocol = null;
  793. if (uri2SecurityProtocol.TryGetValue(via, out wrapper))
  794. {
  795. protocol = wrapper.Protocol;
  796. result = true;
  797. }
  798. return result;
  799. }
  800. }
  801. void IPeerNodeMessageHandling.HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags,
  802. int index, MessageHeader hopHeader, Uri via, Uri to)
  803. {
  804. if (DiagnosticUtility.ShouldTraceVerbose)
  805. {
  806. TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageReceived, SR.GetString(SR.TraceCodePeerFloodedMessageReceived), this.traceRecord, this, null);
  807. }
  808. if (via == null)
  809. {
  810. Fx.Assert("No VIA in the forwarded message!");
  811. using (Message message = messageBuffer.CreateMessage())
  812. {
  813. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.PeerMessageMustHaveVia, message.Headers.Action)));
  814. }
  815. }
  816. if ((propagateFlags & PeerMessagePropagation.Local) != 0)
  817. {
  818. DeliverMessageToClientChannels(null, messageBuffer, via, to, messageBuffer.MessageContentType, (int)maxReceivedMessageSize, index, hopHeader);
  819. messageBuffer = null;
  820. }
  821. else
  822. {
  823. if (DiagnosticUtility.ShouldTraceVerbose)
  824. {
  825. using (Message traceMessage = messageBuffer.CreateMessage())
  826. {
  827. TraceUtility.TraceEvent(TraceEventType.Verbose, TraceCode.PeerFloodedMessageNotPropagated, SR.GetString(SR.TraceCodePeerFloodedMessageNotPropagated), this.traceRecord, this, null, traceMessage);
  828. }
  829. }
  830. }
  831. }
  832. PeerMessagePropagation IPeerNodeMessageHandling.DetermineMessagePropagation(Message message, PeerMessageOrigination origination)
  833. {
  834. PeerMessagePropagation propagateFlags = PeerMessagePropagation.LocalAndRemote;
  835. PeerMessagePropagationFilter filter = MessagePropagationFilter;
  836. if (filter != null)
  837. {
  838. try
  839. {
  840. SynchronizationContext context = messagePropagationFilterContext;
  841. if (context != null)
  842. {
  843. context.Send(delegate(object state) { propagateFlags = filter.ShouldMessagePropagate(message, origination); }, null);
  844. }
  845. else
  846. {
  847. propagateFlags = filter.ShouldMessagePropagate(message, origination);
  848. }
  849. }
  850. catch (Exception e)
  851. {
  852. if (Fx.IsFatal(e)) throw;
  853. throw DiagnosticUtility.ExceptionUtility.ThrowHelperCallback(SR.GetString(SR.MessagePropagationException), e);
  854. }
  855. }
  856. // Don't flood if the Node is closed
  857. if (!isOpen)
  858. {
  859. propagateFlags = PeerMessagePropagation.None;
  860. }
  861. return propagateFlags;
  862. }
  863. // Queued callback to actually process the address change
  864. // The design is such that any address change notifications are queued just like Open/Close operations.
  865. // So, we need not worry about address changes racing with other address changes or Open/Close operations.
  866. // Abort can happen at any time. However, Abort skips unregistering addresses, so this method doesn't have
  867. // to worry about undoing its work if Abort happens.
  868. void OnIPAddressChange()
  869. {
  870. string lclMeshId = null;
  871. PeerNodeAddress nodeAddress = null;
  872. object lclResolverRegistrationId = null;
  873. bool lclRegistered = false;
  874. PeerIPHelper lclIPHelper = ipHelper;
  875. PeerNodeConfig lclconfig = config;
  876. bool processChange = false;
  877. TimeoutHelper timeoutHelper = new TimeoutHelper(ServiceDefaults.SendTimeout);
  878. // Determine if IP addresses have really changed before notifying the resolver
  879. // since it is possible that another change notification ahead of this one in the queue
  880. // may have already completed notifying the resolver of the most current change.
  881. if (lclIPHelper != null && config != null)
  882. {
  883. nodeAddress = lclconfig.GetListenAddress(false);
  884. processChange = lclIPHelper.AddressesChanged(nodeAddress.IPAddresses);
  885. if (processChange)
  886. {
  887. // Build the nodeAddress with the updated IP addresses
  888. nodeAddress = new PeerNodeAddress(
  889. nodeAddress.EndpointAddress, lclIPHelper.GetLocalAddresses());
  890. }
  891. }
  892. lock (ThisLock)
  893. {
  894. // Skip processing if the node isn't open anymore or if addresses haven't changed
  895. if (processChange && isOpen)
  896. {
  897. lclMeshId = meshId;
  898. lclResolverRegistrationId = resolverRegistrationId;
  899. lclRegistered = registered;
  900. config.SetListenAddress(nodeAddress);
  901. completeTraceRecord = new PeerNodeTraceRecord(config.NodeId, meshId, nodeAddress);
  902. }
  903. else
  904. {
  905. return;
  906. }
  907. }
  908. //#57954 - log and ---- non-critical exceptions during network change event notifications
  909. try
  910. {
  911. // Do we have any addresses? If so, update or re-register. Otherwise, unregister.
  912. if (nodeAddress.IPAddresses.Count > 0)
  913. {
  914. if (lclRegistered)
  915. {
  916. resolver.Update(lclResolverRegistrationId, nodeAddress, timeoutHelper.RemainingTime());
  917. }
  918. else
  919. {
  920. RegisterAddress(lclMeshId, nodeAddress, timeoutHelper.RemainingTime());
  921. }
  922. }
  923. else
  924. {
  925. UnregisterAddress(timeoutHelper.RemainingTime());
  926. }
  927. }
  928. catch (Exception e)
  929. {
  930. if (Fx.IsFatal(e)) throw;
  931. DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
  932. }
  933. PingConnections();
  934. if (DiagnosticUtility.ShouldTraceInformation)
  935. {
  936. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeAddressChanged, SR.GetString(SR.TraceCodePeerNodeAddressChanged), this.completeTraceRecord, this, null);
  937. }
  938. }
  939. // Register with the resolver
  940. void RegisterAddress(string lclMeshId, PeerNodeAddress nodeAddress, TimeSpan timeout)
  941. {
  942. // Register only if we have any addresses
  943. if (nodeAddress.IPAddresses.Count > 0)
  944. {
  945. object lclResolverRegistrationId = null;
  946. try
  947. {
  948. lclResolverRegistrationId = resolver.Register(lclMeshId, nodeAddress, timeout);
  949. }
  950. catch (Exception e)
  951. {
  952. if (Fx.IsFatal(e)) throw;
  953. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
  954. }
  955. lock (ThisLock)
  956. {
  957. if (!(!registered))
  958. {
  959. throw Fx.AssertAndThrow("registered expected to be false");
  960. }
  961. registered = true;
  962. resolverRegistrationId = lclResolverRegistrationId;
  963. }
  964. }
  965. }
  966. // Unregister that should only be called from non-user threads.
  967. //since this is invoked on background threads, we log and ---- all non-critical exceptions
  968. //#57972
  969. void UnregisterAddress(object timeout)
  970. {
  971. try
  972. {
  973. UnregisterAddress((TimeSpan)timeout);
  974. }
  975. catch (Exception e)
  976. {
  977. if (Fx.IsFatal(e)) throw;
  978. DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
  979. }
  980. }
  981. void UnregisterAddress(TimeSpan timeout)
  982. {
  983. bool needToUnregister = false;
  984. object lclResolverRegistrationId = null;
  985. lock (ThisLock)
  986. {
  987. if (registered)
  988. {
  989. needToUnregister = true;
  990. lclResolverRegistrationId = resolverRegistrationId;
  991. registered = false; // this ensures that the current thread will do unregistration
  992. }
  993. resolverRegistrationId = null;
  994. }
  995. if (needToUnregister)
  996. {
  997. try
  998. {
  999. resolver.Unregister(lclResolverRegistrationId, timeout);
  1000. }
  1001. catch (Exception e)
  1002. {
  1003. if (Fx.IsFatal(e)) throw;
  1004. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationException(SR.GetString(SR.ResolverException), e));
  1005. }
  1006. }
  1007. }
  1008. void OnNeighborClosed(object sender, PeerNeighborCloseEventArgs e)
  1009. {
  1010. IPeerNeighbor neighbor = (IPeerNeighbor)sender;
  1011. PeerConnector localConnector;
  1012. PeerMaintainer localMaintainer;
  1013. PeerFlooder localFlooder;
  1014. localConnector = connector;
  1015. localMaintainer = maintainer;
  1016. localFlooder = flooder;
  1017. UtilityExtension.OnNeighborClosed(neighbor);
  1018. PeerChannelAuthenticatorExtension.OnNeighborClosed(neighbor);
  1019. if (localConnector != null)
  1020. localConnector.OnNeighborClosed(neighbor);
  1021. if (localMaintainer != null)
  1022. localMaintainer.OnNeighborClosed(neighbor);
  1023. if (localFlooder != null)
  1024. localFlooder.OnNeighborClosed(neighbor);
  1025. // Finally notify any Peernode client
  1026. EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosed;
  1027. if (handler != null)
  1028. {
  1029. handler(this, e);
  1030. }
  1031. }
  1032. void OnNeighborClosing(object sender, PeerNeighborCloseEventArgs e)
  1033. {
  1034. IPeerNeighbor neighbor = (IPeerNeighbor)sender;
  1035. PeerConnector localConnector;
  1036. localConnector = connector;
  1037. if (localConnector != null)
  1038. localConnector.OnNeighborClosing(neighbor, e.Reason);
  1039. // Finally notify any Peernode client
  1040. EventHandler<PeerNeighborCloseEventArgs> handler = NeighborClosing;
  1041. if (handler != null)
  1042. {
  1043. handler(this, e);
  1044. }
  1045. }
  1046. void OnNeighborConnected(object sender, EventArgs e)
  1047. {
  1048. IPeerNeighbor neighbor = (IPeerNeighbor)sender;
  1049. PeerMaintainer localMaintainer = maintainer;
  1050. PeerFlooder localFlooder = flooder;
  1051. if (localFlooder != null)
  1052. localFlooder.OnNeighborConnected(neighbor);
  1053. if (localMaintainer != null)
  1054. localMaintainer.OnNeighborConnected(neighbor);
  1055. UtilityExtension.OnNeighborConnected(neighbor);
  1056. // Finally notify any Peernode client
  1057. EventHandler handler = NeighborConnected;
  1058. if (handler != null)
  1059. {
  1060. handler(this, EventArgs.Empty);
  1061. }
  1062. }
  1063. // raised by the neighbor manager when any connection has reached the opened state
  1064. void OnNeighborAuthenticated(object sender, EventArgs e)
  1065. {
  1066. IPeerNeighbor n = (IPeerNeighbor)sender;
  1067. //hand the authenticated neighbor over to connector.
  1068. //If neighbor is aborted before
  1069. PeerConnector localConnector = connector;
  1070. if (localConnector != null)
  1071. connector.OnNeighborAuthenticated(n);
  1072. // Finally notify any Peernode client
  1073. EventHandler handler = NeighborOpened;
  1074. if (handler != null)
  1075. {
  1076. handler(this, EventArgs.Empty);
  1077. }
  1078. }
  1079. // Open blocks the thread until either Online happens or Open times out.
  1080. void OnOpen(TimeSpan timeout, bool waitForOnline)
  1081. {
  1082. bool aborted = false;
  1083. EventHandler connectedHandler = delegate(object source, EventArgs args) { connectCompletedEvent.Set(); };
  1084. EventHandler abortHandler = delegate(object source, EventArgs args) { aborted = true; connectCompletedEvent.Set(); };
  1085. openException = null; // clear out the open exception from the last Open attempt
  1086. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  1087. try
  1088. {
  1089. NeighborConnected += connectedHandler;
  1090. Aborted += abortHandler;
  1091. OpenCore(timeout);
  1092. if (waitForOnline)
  1093. {
  1094. if (!TimeoutHelper.WaitOne(connectCompletedEvent, timeoutHelper.RemainingTime()))
  1095. {
  1096. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
  1097. }
  1098. }
  1099. if (aborted)
  1100. {
  1101. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new CommunicationObjectAbortedException(SR.GetString(SR.PeerNodeAborted)));
  1102. }
  1103. // retrieve listen addresses and register with the resolver
  1104. if (isOpen)
  1105. {
  1106. if (openException != null)
  1107. {
  1108. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(openException);
  1109. }
  1110. else
  1111. {
  1112. string lclMeshId = null;
  1113. PeerNodeConfig lclConfig = null;
  1114. lock (ThisLock)
  1115. {
  1116. lclMeshId = meshId;
  1117. lclConfig = config;
  1118. }
  1119. // The design is such that any address change notifications are queued behind Open operation
  1120. // So, we need not worry about address changes racing with the initial registration.
  1121. RegisterAddress(lclMeshId, lclConfig.GetListenAddress(false), timeoutHelper.RemainingTime());
  1122. }
  1123. }
  1124. }
  1125. catch (Exception e)
  1126. {
  1127. if (Fx.IsFatal(e)) throw;
  1128. CloseCore(TimeSpan.FromTicks(0), false);
  1129. throw;
  1130. }
  1131. finally
  1132. {
  1133. NeighborConnected -= connectedHandler;
  1134. Aborted -= abortHandler;
  1135. }
  1136. }
  1137. internal void Open(TimeSpan timeout, bool waitForOnline)
  1138. {
  1139. stateManager.Open(timeout, waitForOnline);
  1140. }
  1141. // the core functionality of open (all but waiting for a connection)
  1142. void OpenCore(TimeSpan timeout)
  1143. {
  1144. TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
  1145. PeerMaintainer lclMaintainer;
  1146. PeerNodeConfig lclConfig;
  1147. string lclMeshId;
  1148. lock (ThisLock)
  1149. {
  1150. if (ListenUri == null)
  1151. {
  1152. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.ListenUriNotSet, this.GetType())));
  1153. }
  1154. // extract mesh id from listen uri
  1155. meshId = ListenUri.Host;
  1156. // generate the node id
  1157. byte[] bytes = new byte[sizeof(ulong)];
  1158. ulong nodeId = 0;
  1159. do
  1160. {
  1161. System.ServiceModel.Security.CryptoHelper.FillRandomBytes(bytes);
  1162. for (int i = 0; i < sizeof(ulong); i++)
  1163. nodeId |= ((ulong)bytes[i]) << i * 8;
  1164. }
  1165. while (nodeId == PeerTransportConstants.InvalidNodeId);
  1166. // now that the node id has been generated, create the trace record that describes this
  1167. traceRecord = new PeerNodeTraceRecord(nodeId, meshId);
  1168. if (DiagnosticUtility.ShouldTraceInformation)
  1169. {
  1170. TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerNodeOpening, SR.GetString(SR.TraceCodePeerNodeOpening), this.traceRecord, this, null);
  1171. }
  1172. // create the node configuration
  1173. config = new PeerNodeConfig(meshId,
  1174. nodeId,
  1175. resolver,
  1176. messagePropagationFilter,
  1177. encoder,
  1178. ListenUri, listenIPAddress, port,
  1179. maxReceivedMessageSize, minNeighbors, idealNeighbors, maxNeighbors, maxReferrals,
  1180. connectTimeout, maintainerInterval,
  1181. securityManager,
  1182. this.readerQuotas,
  1183. this.maxBufferPoolSize,
  1184. this.MaxSendQueue,
  1185. this.MaxReceiveQueue);
  1186. // create components
  1187. if (listenIPAddress != null)
  1188. ipHelper = new PeerIPHelper(listenIPAddress);
  1189. else
  1190. ipHelper = new PeerIPHelper();
  1191. bufferManager = BufferManager.CreateBufferManager(64 * config.MaxReceivedMessageSize, (int)config.MaxReceivedMessageSize);
  1192. neighborManager = new PeerNeighborManager(ipHelper,
  1193. config,
  1194. this);
  1195. flooder = PeerFlooder.CreateFlooder(config, neighborManager, this);
  1196. maintainer = new PeerMaintainer(config, neighborManager, flooder);
  1197. connector = new PeerConnector(config, neighborManager, maintainer);
  1198. Dictionary<Type, object> services = serviceHandlers;
  1199. if (services == null)
  1200. {
  1201. services = new Dictionary<Type, object>();
  1202. services.Add(typeof(IPeerConnectorContract), connector);
  1203. services.Add(typeof(IPeerFlooderContract<Message, UtilityInfo>), flooder);
  1204. }
  1205. service = new PeerService(this.config,
  1206. neighborManager.ProcessIncomingChannel,
  1207. neighborManager.GetNeighborFromProxy,
  1208. services,
  1209. this);
  1210. this.securityManager.MeshId = this.meshId;
  1211. service.Open(timeoutHelper.RemainingTime());
  1212. // register for events
  1213. neighborManager.NeighborClosed += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosed);
  1214. neighborManager.NeighborClosing += new EventHandler<PeerNeighborCloseEventArgs>(OnNeighborClosing);
  1215. neighborManager.NeighborConnected += new EventHandler(OnNeighborConnected);
  1216. neighborManager.NeighborOpened += new EventHandler(this.SecurityManager.OnNeighborOpened);
  1217. this.securityManager.OnNeighborAuthenticated += new EventHandler(this.OnNeighborAuthenticated);
  1218. neighborManager.Online += new EventHandler(FireOnline);
  1219. neighborManager.Offline += new EventHandler(FireOffline);
  1220. ipHelper.AddressChanged += new EventHandler(stateManager.OnIPAddressesChanged);
  1221. // open components
  1222. ipHelper.Open();
  1223. // Set the listen address before opening any more components
  1224. PeerNodeAddress nodeAddress = new PeerNodeAddress(service.GetListenAddress(), ipHelper.GetLocalAddresses());
  1225. config.SetListenAddress(nodeAddress);
  1226. neighborManager.Open(service.Binding, service);
  1227. connector.Open();
  1228. maintainer.Open();
  1229. flooder.Open();
  1230. isOpen = true;
  1231. completeTraceRecord = new PeerNodeTraceRecord(nodeId, meshId, nodeAddress);
  1232. // Set these locals inside the lock (Abort may occur whilst Opening)
  1233. lclMaintainer = maintainer;
  1234. lclMeshId = meshId;
  1235. lclConfig = config;
  1236. openException = null;
  1237. }
  1238. // retrieve listen addresses and register with the resolver
  1239. if (isOpen)
  1240. {
  1241. // attempt to connect to the mesh
  1242. lclMaintainer.ScheduleConnect(new PeerMaintainer.ConnectCallback(OnConnectionAttemptCompleted));
  1243. }
  1244. }
  1245. void DeliverMessageToClientChannels(
  1246. object registrant,
  1247. MessageBuffer messageBuffer,
  1248. Uri via,
  1249. Uri peerTo,
  1250. string contentType,
  1251. int messageSize,
  1252. int index,
  1253. MessageHeader hopHeader)
  1254. {
  1255. Message message = null;
  1256. try
  1257. {
  1258. // create a list of callbacks so they can each be called outside the lock
  1259. ArrayList callbacks = new ArrayList();
  1260. Uri to = peerTo;
  1261. Fx.Assert(peerTo != null, "Invalid To header value!");
  1262. if (isOpen)
  1263. {
  1264. lock (ThisLock)
  1265. {
  1266. if (isOpen)
  1267. {
  1268. foreach (MessageFilterRegistration mfr in messageFilters.Values)
  1269. {
  1270. // first, the via's must match
  1271. bool match = CompareVia(via, mfr.via);
  1272. if (messageSize < 0)
  1273. {
  1274. //messageSize <0 indicates that this message is coming from BeginSend
  1275. //and the size is not computed yet.
  1276. if (message == null)
  1277. {
  1278. message = messageBuffer.CreateMessage();
  1279. Fx.Assert(message.Headers.To == to, "To Header is inconsistent in Send() case!");
  1280. Fx.Assert(message.Properties.Via == via, "Via property is inconsistent in Send() case!");
  1281. }
  1282. //incoming message need not be verified MaxReceivedSize
  1283. //only do this for local channels
  1284. if (registrant != null)
  1285. {
  1286. ArraySegment<byte> buffer = encoder.WriteMessage(message, int.MaxValue, bufferManager);
  1287. messageSize = (int)buffer.Count;
  1288. }
  1289. }
  1290. // only queue the message for registrants expecting this size
  1291. match = match && (messageSize <= mfr.settings.MaxReceivedMessageSize);
  1292. // if a filter is specified, it must match as well
  1293. if (match && mfr.filters != null)
  1294. {
  1295. for (int i = 0; match && i < mfr.filters.Length; i++)
  1296. {
  1297. match = mfr.filters[i].Match(via, to);
  1298. }
  1299. }
  1300. if (match)
  1301. {
  1302. callbacks.Add(mfr.callback);
  1303. }
  1304. }
  1305. }
  1306. }
  1307. }
  1308. foreach (MessageAvailableCallback callback in callbacks)
  1309. {
  1310. Message localCopy;
  1311. try
  1312. {
  1313. //this copy is free'd by SFx.
  1314. localCopy = messageBuffer.CreateMessage();
  1315. localCopy.Properties.Via = via;
  1316. localCopy.Headers.To = to;
  1317. //mark security header as understood.
  1318. try
  1319. {
  1320. int i = localCopy.Headers.FindHeader(SecurityJan2004Strings.Security, SecurityJan2004Strings.Namespace);
  1321. if (i >= 0)
  1322. {
  1323. localCopy.Headers.AddUnderstood(i);
  1324. }
  1325. }
  1326. catch (MessageHeaderException e)
  1327. {
  1328. DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
  1329. }
  1330. catch (SerializationException e)
  1331. {
  1332. DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
  1333. }
  1334. catch (XmlException e)
  1335. {
  1336. DiagnosticUtility.TraceHandledException(e, TraceEventType.Warning);
  1337. }
  1338. if (index != -1)
  1339. {
  1340. localCopy.Headers.ReplaceAt(index, hopHeader);
  1341. }
  1342. callback(localCopy);
  1343. }
  1344. catch (ObjectDisposedException e)
  1345. {
  1346. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  1347. }
  1348. catch (CommunicationObjectAbortedException e)
  1349. {
  1350. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  1351. }
  1352. catch (CommunicationObjectFaultedException e)
  1353. {
  1354. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  1355. }
  1356. }
  1357. }
  1358. finally
  1359. {
  1360. if (message != null)
  1361. message.Close();
  1362. }
  1363. }
  1364. public void RefreshConnection()
  1365. {
  1366. PeerMaintainer lclMaintainer = null;
  1367. lock (ThisLock)
  1368. {
  1369. ThrowIfNotOpen();
  1370. lclMaintainer = maintainer;
  1371. }
  1372. if (lclMaintainer != null)
  1373. {
  1374. lclMaintainer.RefreshConnection();
  1375. }
  1376. }
  1377. public void PingConnections()
  1378. {
  1379. PeerMaintainer lclMaintainer = null;
  1380. lock (ThisLock)
  1381. {
  1382. lclMaintainer = maintainer;
  1383. }
  1384. if (lclMaintainer != null)
  1385. {
  1386. lclMaintainer.PingConnections();
  1387. }
  1388. }
  1389. //always call methods from inside a lock (of the container)
  1390. class RefCountedSecurityProtocol
  1391. {
  1392. int refCount;
  1393. public SecurityProtocol Protocol;
  1394. public RefCountedSecurityProtocol(SecurityProtocol securityProtocol)
  1395. {
  1396. this.Protocol = securityProtocol;
  1397. this.refCount = 1;
  1398. }
  1399. public int AddRef()
  1400. {
  1401. return ++refCount;
  1402. }
  1403. public int Release()
  1404. {
  1405. return --refCount;
  1406. }
  1407. }
  1408. // internal message filtering
  1409. internal void RegisterMessageFilter(object registrant, Uri via, PeerMessageFilter[] filters,
  1410. ITransportFactorySettings settings, MessageAvailableCallback callback, SecurityProtocol securityProtocol)
  1411. {
  1412. MessageFilterRegistration registration = new MessageFilterRegistration();
  1413. registration.registrant = registrant;
  1414. registration.via = via;
  1415. registration.filters = filters;
  1416. registration.settings = settings;
  1417. registration.callback = callback;
  1418. registration.securityProtocol = securityProtocol;
  1419. lock (ThisLock)
  1420. {
  1421. messageFilters.Add(registrant, registration);
  1422. RefCountedSecurityProtocol protocolWrapper = null;
  1423. if (!this.uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
  1424. {
  1425. protocolWrapper = new RefCountedSecurityProtocol(securityProtocol);
  1426. this.uri2SecurityProtocol.Add(via, protocolWrapper);
  1427. }
  1428. else
  1429. protocolWrapper.AddRef();
  1430. }
  1431. }
  1432. // internal method to release the reference on an existing PeerNode
  1433. internal void Release()
  1434. {
  1435. lock (peerNodes)
  1436. {
  1437. if (peerNodes.ContainsValue(this))
  1438. {
  1439. if (--refCount == 0)
  1440. {
  1441. // no factories/channels are using this instance (although the application may still be
  1442. // referring to it directly). either way, we remove this from the registry
  1443. peerNodes.Remove(listenUri);
  1444. }
  1445. }
  1446. }
  1447. }
  1448. // Call with null to reset to our implementation
  1449. public void SetServiceHandlers(Dictionary<Type, object> services)
  1450. {
  1451. lock (ThisLock)
  1452. {
  1453. serviceHandlers = services;
  1454. }
  1455. }
  1456. void ThrowIfNotOpen()
  1457. {
  1458. if (!isOpen)
  1459. {
  1460. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(SR.TransportManagerNotOpen)));
  1461. }
  1462. }
  1463. void ThrowIfOpen()
  1464. {
  1465. if (isOpen)
  1466. {
  1467. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidOperationException(SR.GetString(
  1468. SR.TransportManagerOpen)));
  1469. }
  1470. }
  1471. public override string ToString()
  1472. {
  1473. lock (ThisLock)
  1474. {
  1475. // if open return the mesh id, otherwise return the type
  1476. if (isOpen)
  1477. return string.Format(System.Globalization.CultureInfo.InvariantCulture,
  1478. "{0} ({1})", MeshId, NodeId);
  1479. else
  1480. return this.GetType().ToString();
  1481. }
  1482. }
  1483. internal void UnregisterMessageFilter(object registrant, Uri via)
  1484. {
  1485. lock (ThisLock)
  1486. {
  1487. messageFilters.Remove(registrant);
  1488. RefCountedSecurityProtocol protocolWrapper = null;
  1489. if (uri2SecurityProtocol.TryGetValue(via, out protocolWrapper))
  1490. {
  1491. if (protocolWrapper.Release() == 0)
  1492. uri2SecurityProtocol.Remove(via);
  1493. }
  1494. else
  1495. Fx.Assert(false, "Corresponding SecurityProtocol is not Found!");
  1496. }
  1497. }
  1498. internal static void ValidateVia(Uri uri)
  1499. {
  1500. int viaSize = Encoding.UTF8.GetByteCount(uri.OriginalString);
  1501. if (viaSize > maxViaSize)
  1502. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new InvalidDataException(SR.GetString(
  1503. SR.PeerChannelViaTooLong, uri, viaSize, maxViaSize)));
  1504. }
  1505. internal class ChannelRegistration
  1506. {
  1507. public object registrant;
  1508. public Uri via;
  1509. public ITransportFactorySettings settings;
  1510. public SecurityProtocol securityProtocol;
  1511. public Type channelType;
  1512. }
  1513. // holds the registration information passed in by channels and listeners. This informtaion is used
  1514. // to determine which channels and listeners will receive an incoming message
  1515. class MessageFilterRegistration : ChannelRegistration
  1516. {
  1517. public PeerMessageFilter[] filters;
  1518. public MessageAvailableCallback callback;
  1519. }
  1520. // represents the settings of a PeerListenerFactory or PeerChannelFactory, used to create a new
  1521. // PeerNode or compare settings to an existing PeerNode
  1522. internal class Registration
  1523. {
  1524. IPAddress listenIPAddress;
  1525. Uri listenUri;
  1526. long maxReceivedMessageSize;
  1527. int port;
  1528. PeerResolver resolver;
  1529. PeerSecurityManager securityManager;
  1530. XmlDictionaryReaderQuotas readerQuotas;
  1531. long maxBufferPoolSize;
  1532. public Registration(Uri listenUri, IPeerFactory factory)
  1533. {
  1534. if (factory.Resolver == null)
  1535. {
  1536. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(
  1537. new InvalidOperationException(SR.GetString(SR.PeerResolverRequired)));
  1538. }
  1539. if (factory.ListenIPAddress != null)
  1540. {
  1541. listenIPAddress = factory.ListenIPAddress;
  1542. }
  1543. this.listenUri = new UriBuilder(PeerStrings.Scheme, listenUri.Host).Uri;
  1544. this.port = factory.Port;
  1545. this.maxReceivedMessageSize = factory.MaxReceivedMessageSize;
  1546. this.resolver = factory.Resolver;
  1547. this.securityManager = factory.SecurityManager;
  1548. this.readerQuotas = new XmlDictionaryReaderQuotas();
  1549. factory.ReaderQuotas.CopyTo(this.readerQuotas);
  1550. this.maxBufferPoolSize = factory.MaxBufferPoolSize;
  1551. }
  1552. bool HasMismatchedReaderQuotas(XmlDictionaryReaderQuotas existingOne, XmlDictionaryReaderQuotas newOne, out string result)
  1553. {
  1554. //check for properties that affect the message
  1555. result = null;
  1556. if (existingOne.MaxArrayLength != newOne.MaxArrayLength)
  1557. result = PeerBindingPropertyNames.ReaderQuotasDotArrayLength;
  1558. else if (existingOne.MaxStringContentLength != newOne.MaxStringContentLength)
  1559. result = PeerBindingPropertyNames.ReaderQuotasDotStringLength;
  1560. else if (existingOne.MaxDepth != newOne.MaxDepth)
  1561. result = PeerBindingPropertyNames.ReaderQuotasDotMaxDepth;
  1562. else if (existingOne.MaxNameTableCharCount != newOne.MaxNameTableCharCount)
  1563. result = PeerBindingPropertyNames.ReaderQuotasDotMaxCharCount;
  1564. else if (existingOne.MaxBytesPerRead != newOne.MaxBytesPerRead)
  1565. result = PeerBindingPropertyNames.ReaderQuotasDotMaxBytesPerRead;
  1566. return result != null;
  1567. }
  1568. public void CheckIfCompatible(PeerNodeImplementation peerNode, Uri via)
  1569. {
  1570. string mismatch = null;
  1571. // test the settings that must be identical
  1572. if (listenUri != peerNode.ListenUri)
  1573. mismatch = PeerBindingPropertyNames.ListenUri;
  1574. else if (port != peerNode.Port)
  1575. mismatch = PeerBindingPropertyNames.Port;
  1576. else if (maxReceivedMessageSize != peerNode.MaxReceivedMessageSize)
  1577. mismatch = PeerBindingPropertyNames.MaxReceivedMessageSize;
  1578. else if (maxBufferPoolSize != peerNode.MaxBufferPoolSize)
  1579. mismatch = PeerBindingPropertyNames.MaxBufferPoolSize;
  1580. else if (HasMismatchedReaderQuotas(peerNode.ReaderQuotas, readerQuotas, out mismatch))
  1581. { }
  1582. else if (resolver.GetType() != peerNode.Resolver.GetType())
  1583. mismatch = PeerBindingPropertyNames.Resolver;
  1584. else if (!resolver.Equals(peerNode.Resolver))
  1585. mismatch = PeerBindingPropertyNames.ResolverSettings;
  1586. else if (listenIPAddress != peerNode.ListenIPAddress)
  1587. {
  1588. if ((listenIPAddress == null || peerNode.ListenIPAddress == null)
  1589. ||
  1590. (!listenIPAddress.Equals(peerNode.ListenIPAddress)))
  1591. mismatch = PeerBindingPropertyNames.ListenIPAddress;
  1592. }
  1593. else if ((securityManager == null) && (peerNode.SecurityManager != null))
  1594. mismatch = PeerBindingPropertyNames.Security;
  1595. if (mismatch != null)
  1596. PeerExceptionHelper.ThrowInvalidOperation_PeerConflictingPeerNodeSettings(mismatch);
  1597. securityManager.CheckIfCompatibleNodeSettings(peerNode.SecurityManager);
  1598. }
  1599. public PeerNodeImplementation CreatePeerNode()
  1600. {
  1601. PeerNodeImplementation peerNode = new PeerNodeImplementation();
  1602. peerNode.ListenIPAddress = listenIPAddress;
  1603. peerNode.ListenUri = listenUri;
  1604. peerNode.MaxReceivedMessageSize = maxReceivedMessageSize;
  1605. peerNode.Port = port;
  1606. peerNode.Resolver = resolver;
  1607. peerNode.SecurityManager = securityManager;
  1608. this.readerQuotas.CopyTo(peerNode.readerQuotas);
  1609. peerNode.MaxBufferPoolSize = maxBufferPoolSize;
  1610. return peerNode;
  1611. }
  1612. }
  1613. class SendAsyncResult : AsyncResult
  1614. {
  1615. bool floodComplete = false;
  1616. bool localDispatchComplete = false;
  1617. object thisLock = new object();
  1618. object ThisLock { get { return thisLock; } }
  1619. Exception floodException = null;
  1620. public SendAsyncResult(AsyncCallback callback, object state) : base(callback, state) { }
  1621. public void OnFloodComplete(IAsyncResult result)
  1622. {
  1623. if (this.floodComplete || this.IsCompleted)
  1624. return;
  1625. bool complete = false;
  1626. lock (this.ThisLock)
  1627. {
  1628. if (this.localDispatchComplete)
  1629. complete = true;
  1630. this.floodComplete = true;
  1631. }
  1632. try
  1633. {
  1634. PeerFlooder.EndFloodEncodedMessage(result);
  1635. }
  1636. catch (Exception e)
  1637. {
  1638. if (Fx.IsFatal(e)) throw;
  1639. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  1640. floodException = e;
  1641. }
  1642. if (complete)
  1643. {
  1644. this.Complete(result.CompletedSynchronously, floodException);
  1645. }
  1646. }
  1647. public void OnLocalDispatchComplete(IAsyncResult result)
  1648. {
  1649. SendAsyncResult sr = (SendAsyncResult)result;
  1650. if (this.localDispatchComplete || this.IsCompleted)
  1651. return;
  1652. bool complete = false;
  1653. lock (this.ThisLock)
  1654. {
  1655. if (this.floodComplete)
  1656. complete = true;
  1657. this.localDispatchComplete = true;
  1658. }
  1659. if (complete)
  1660. {
  1661. this.Complete(true, floodException);
  1662. }
  1663. }
  1664. public static void End(IAsyncResult result)
  1665. {
  1666. AsyncResult.End<SendAsyncResult>(result);
  1667. }
  1668. }
  1669. bool IPeerNodeMessageHandling.HasMessagePropagation
  1670. {
  1671. get
  1672. {
  1673. return this.messagePropagationFilter != null;
  1674. }
  1675. }
  1676. bool IPeerNodeMessageHandling.IsKnownVia(Uri via)
  1677. {
  1678. bool result = false;
  1679. lock (ThisLock)
  1680. {
  1681. result = uri2SecurityProtocol.ContainsKey(via);
  1682. }
  1683. return result;
  1684. }
  1685. bool IPeerNodeMessageHandling.IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss)
  1686. {
  1687. PeerFlooder lclFlooder = flooder;
  1688. id = DefaultId;
  1689. cacheMiss = -1;
  1690. return (lclFlooder != null && lclFlooder.IsNotSeenBefore(message, out id, out cacheMiss));
  1691. }
  1692. public MessageEncodingBindingElement EncodingBindingElement
  1693. {
  1694. get
  1695. {
  1696. return this.EncodingElement;
  1697. }
  1698. }
  1699. }
  1700. interface IPeerNodeMessageHandling
  1701. {
  1702. void HandleIncomingMessage(MessageBuffer messageBuffer, PeerMessagePropagation propagateFlags, int index, MessageHeader header, Uri via, Uri to);
  1703. PeerMessagePropagation DetermineMessagePropagation(Message message, PeerMessageOrigination origination);
  1704. bool HasMessagePropagation { get; }
  1705. bool ValidateIncomingMessage(ref Message data, Uri via);
  1706. bool IsKnownVia(Uri via);
  1707. bool IsNotSeenBefore(Message message, out byte[] id, out int cacheMiss);
  1708. MessageEncodingBindingElement EncodingBindingElement { get; }
  1709. }
  1710. }