ConnectAlgorithms.cs 18 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Runtime;
  9. using System.ServiceModel;
  10. using System.ServiceModel.Diagnostics;
  11. using System.Threading;
  12. using System.ServiceModel.Diagnostics.Application;
  13. // Graph maintenance algorithms.
  14. sealed class ConnectAlgorithms : IConnectAlgorithms
  15. {
  // Shared random source used to pick which candidate address to try next.
  static readonly Random random = new Random();

  // Number of neighbor connections Connect tries to reach; assigned in Initialize.
  int wantedConnectionCount = 0;

  // Manual-reset event (initially signaled) that tells the Connect loop to try opening another neighbor.
  readonly EventWaitHandle addNeighbor = new EventWaitHandle(true, EventResetMode.ManualReset);

  // Manual-reset event set by OnMaintainerClosed.
  readonly EventWaitHandle maintainerClosed = new EventWaitHandle(false, EventResetMode.ManualReset);

  // Manual-reset event set by OnNeighborConnected.
  readonly EventWaitHandle welcomeReceived = new EventWaitHandle(false, EventResetMode.ManualReset);

  // Candidate addresses that have not been tried yet, keyed by endpoint URI.
  readonly Dictionary<Uri, PeerNodeAddress> nodeAddresses = new Dictionary<Uri, PeerNodeAddress>();

  // Node configuration supplied to Initialize.
  PeerNodeConfig config;

  // Addresses for which an open has been started but no connected/closed notification has arrived yet.
  readonly Dictionary<Uri, PeerNodeAddress> pendingConnectedNeighbor = new Dictionary<Uri, PeerNodeAddress>();

  // Private gate object exposed through ThisLock.
  readonly object thisLock = new object();

  // Maintainer supplied to Initialize; null until then.
  IPeerMaintainer maintainer = null;

  // Set once by Dispose; checked by the event handlers before they touch any state.
  bool disposed = false;
  // Stores the collaborators, seeds the candidate address list from the referral cache,
  // and subscribes to the maintainer's notifications.
  //   maintainer            - source of neighbor events and neighbor operations
  //   config                - node configuration used by the other methods of this class
  //   wantedConnectionCount - number of connections Connect should try to reach
  //   referralCache         - known referrals; their addresses become initial candidates
  public void Initialize(IPeerMaintainer maintainer, PeerNodeConfig config, int wantedConnectionCount, Dictionary<EndpointAddress, Referral> referralCache)
  {
      this.maintainer = maintainer;
      this.config = config;
      this.wantedConnectionCount = wantedConnectionCount;

      // Seed the candidate endpoints with whatever the referral cache already holds.
      ICollection<Referral> cachedReferrals = referralCache.Values;
      this.UpdateEndpointsCollection(cachedReferrals);

      // Subscribe to maintainer notifications.
      this.maintainer.NeighborClosed += this.OnNeighborClosed;
      this.maintainer.NeighborConnected += this.OnNeighborConnected;
      this.maintainer.MaintainerClosed += this.OnMaintainerClosed;
      this.maintainer.ReferralsAdded += this.OnReferralsAdded;
  }
  // Per-instance gate object used with lock(...) throughout this class.
  object ThisLock
  {
      get
      {
          return this.thisLock;
      }
  }
  // Tries to bring the number of connected neighbors up to wantedConnectionCount.
  //
  // The loop waits on: the wait handles of all outstanding BeginOpenNeighbor calls,
  // followed by welcomeReceived, maintainerClosed and addNeighbor (in that order, so the
  // index returned by WaitAny identifies which one fired). It keeps running while there
  // are outstanding opens, or while there are candidate/pending addresses, the maintainer
  // is open, and the connected count is still below the wanted count.
  //
  //   timeout - overall budget; the remaining time is passed to each BeginOpenNeighbor call.
  //
  // CommunicationException and TimeoutException raised inside an iteration are traced and
  // cause another attempt. Other non-fatal exceptions from BeginOpenNeighbor/EndOpenNeighbor
  // propagate to the caller, except ObjectDisposedException from BeginOpenNeighbor, which is
  // traced and swallowed.
  //
  // All mutations of pendingConnectedNeighbor are performed under ThisLock because the
  // maintainer event handlers mutate the same dictionary under that lock.
  public void Connect(TimeSpan timeout)
  {
      TimeoutHelper timeoutHelper = new TimeoutHelper(timeout);
      addNeighbor.Set(); // We are trying to add a neighbor
      List<IAsyncResult> results = new List<IAsyncResult>();
      List<WaitHandle> handles = new List<WaitHandle>();

      // While there are outstanding opens, or there is still something to try / pending,
      // the maintainer is open, and we are not yet connected up to the wanted count.
      while (results.Count != 0
          || (((nodeAddresses.Count != 0 || pendingConnectedNeighbor.Count != 0) && maintainer.IsOpen)
              && maintainer.ConnectedNeighborCount < wantedConnectionCount))
      {
          try
          {
              handles.Clear();
              foreach (IAsyncResult outstanding in results)
              {
                  handles.Add(outstanding.AsyncWaitHandle);
              }
              handles.Add(welcomeReceived);  // index == results.Count     : a neighbor reported connected
              handles.Add(maintainerClosed); // index == results.Count + 1 : the maintainer reported closed
              handles.Add(addNeighbor);      // index == results.Count + 2 : request to open another neighbor

              int index = WaitHandle.WaitAny(handles.ToArray(), config.ConnectTimeout, false);

              if (index == results.Count) // welcomeReceived was signalled
              {
                  welcomeReceived.Reset();
              }
              else if (index == results.Count + 1) // maintainerClosed was signalled
              {
                  maintainerClosed.Reset();
                  lock (ThisLock)
                  {
                      nodeAddresses.Clear();
                  }
              }
              else if (index == results.Count + 2) // addNeighbor was signalled
              {
                  // We need to open a new neighbor
                  if (nodeAddresses.Count > 0)
                  {
                      if (pendingConnectedNeighbor.Count + maintainer.ConnectedNeighborCount < wantedConnectionCount)
                      {
                          PeerNodeAddress epr = null;
                          lock (ThisLock)
                          {
                              // The candidate list or the maintainer state may have changed since the
                              // unlocked check above; if so, go around the loop again.
                              if (nodeAddresses.Count == 0 || !maintainer.IsOpen)
                              {
                                  addNeighbor.Reset();
                                  continue;
                              }

                              // System.Random is not thread-safe and this instance is static (shared by
                              // every ConnectAlgorithms), so serialize access to it.
                              int index2;
                              lock (random)
                              {
                                  index2 = random.Next(nodeAddresses.Count);
                              }

                              // Dictionary has no positional access; walk the keys to the chosen position.
                              int i = 0;
                              Uri key = null;
                              foreach (Uri uri in nodeAddresses.Keys)
                              {
                                  if (i++ == index2)
                                  {
                                      key = uri;
                                      break;
                                  }
                              }
                              Fx.Assert(key != null, "key cannot be null here");
                              epr = nodeAddresses[key];
                              Fx.Assert(epr != null, "epr cannot be null here");
                              nodeAddresses.Remove(key);
                          }

                          // Reserve the address as pending only if it is neither already a neighbor
                          // nor already pending. The check and the add happen atomically under the lock.
                          bool reserved = false;
                          if (maintainer.FindDuplicateNeighbor(epr) == null)
                          {
                              Uri eprUri = GetEndpointUri(epr);
                              lock (ThisLock)
                              {
                                  if (!pendingConnectedNeighbor.ContainsKey(eprUri))
                                  {
                                      pendingConnectedNeighbor.Add(eprUri, epr);
                                      reserved = true;
                                  }
                              }
                          }

                          if (reserved)
                          {
                              // If the neighborManager is not open this call is going to throw
                              // ObjectDisposedException. The IsOpen check is not strictly necessary; it
                              // only narrows the window for the race with shutdown. The throw is
                              // tolerated below because we are closing down.
                              try
                              {
                                  if (maintainer.IsOpen)
                                  {
                                      if (DiagnosticUtility.ShouldTraceInformation)
                                      {
                                          PeerMaintainerTraceRecord record = new PeerMaintainerTraceRecord(SR.GetString(SR.PeerMaintainerConnect, epr, this.config.MeshId));
                                          TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerMaintainerActivity, SR.GetString(SR.TraceCodePeerMaintainerActivity),
                                              record, this, null);
                                      }
                                      // The address travels as the async state so a failed EndOpenNeighbor
                                      // can remove it from the pending set.
                                      IAsyncResult started = maintainer.BeginOpenNeighbor(epr, timeoutHelper.RemainingTime(), null, epr);
                                      results.Add(started);
                                  }
                              }
                              catch (Exception e)
                              {
                                  if (Fx.IsFatal(e)) throw;
                                  if (DiagnosticUtility.ShouldTraceInformation)
                                  {
                                      PeerMaintainerTraceRecord record = new PeerMaintainerTraceRecord(SR.GetString(SR.PeerMaintainerConnectFailure, epr, this.config.MeshId, e.Message));
                                      TraceUtility.TraceEvent(TraceEventType.Information, TraceCode.PeerMaintainerActivity, SR.GetString(SR.TraceCodePeerMaintainerActivity),
                                          record, this, null);
                                  }
                                  // The open never started, so the address is no longer pending.
                                  // ObjectDisposedException can arise from a race between PeerNode.Close()
                                  // and Connect trying to reconnect nodes; it is traced and swallowed.
                                  lock (ThisLock)
                                  {
                                      pendingConnectedNeighbor.Remove(GetEndpointUri(epr));
                                  }
                                  if (!(e is ObjectDisposedException)) throw;
                                  DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
                              }
                          }
                      }
                  }

                  // Nothing left to try, or pending + connected already reaches the target:
                  // stop asking for more neighbors until something changes.
                  if (nodeAddresses.Count == 0 || pendingConnectedNeighbor.Count + maintainer.ConnectedNeighborCount == wantedConnectionCount)
                  {
                      addNeighbor.Reset();
                  }
              }
              else if (index != WaitHandle.WaitTimeout)
              {
                  // One of the outstanding opens completed; remove it from results and finish it.
                  IAsyncResult completed = results[index];
                  results.RemoveAt(index);
                  try
                  {
                      // Completes the open; the returned neighbor is not needed here.
                      maintainer.EndOpenNeighbor(completed);
                  }
                  catch (Exception e)
                  {
                      if (Fx.IsFatal(e)) throw;
                      lock (ThisLock)
                      {
                          pendingConnectedNeighbor.Remove(GetEndpointUri((PeerNodeAddress)completed.AsyncState));
                      }
                      throw;
                  }
              }
              else
              {
                  // A timeout occurred and no connection progressed; try some more connections.
                  // This may result in more than wantedConnectionCount connections if the
                  // timed-out connections were merely being slow.
                  lock (ThisLock)
                  {
                      pendingConnectedNeighbor.Clear();
                  }
                  results.Clear();
                  addNeighbor.Set();
              }
          }
          catch (CommunicationException e)
          {
              // Most likely the endpoint could not be reached, but any channel exception means we should try another node
              DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
              addNeighbor.Set();
          }
          catch (TimeoutException e)
          {
              if (TD.OpenTimeoutIsEnabled())
              {
                  TD.OpenTimeout(e.Message);
              }
              DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
              addNeighbor.Set();
          }
      }
  }
  // Unsubscribes from the maintainer's events and closes the three wait handles.
  // Safe to call more than once: only the first call does any work.
  // Also safe to call when Initialize was never invoked (maintainer is still null);
  // in that case there is nothing to unsubscribe from and only the handles are closed.
  void IDisposable.Dispose()
  {
      if (!disposed)
      {
          lock (ThisLock)
          {
              if (!disposed)
              {
                  disposed = true;

                  // maintainer is only assigned by Initialize; without this guard a Dispose
                  // before Initialize would throw and leave the wait handles open.
                  if (maintainer != null)
                  {
                      maintainer.ReferralsAdded -= OnReferralsAdded;
                      maintainer.MaintainerClosed -= OnMaintainerClosed;
                      maintainer.NeighborClosed -= OnNeighborClosed;
                      maintainer.NeighborConnected -= OnNeighborConnected;
                  }

                  addNeighbor.Close();
                  maintainerClosed.Close();
                  welcomeReceived.Close();
              }
          }
      }
  }
  // Single place that maps a PeerNodeAddress to the URI used as its dictionary key.
  // Kept as a helper so callers need not change if PeerNodeAddress is later
  // refactored to derive from EndpointAddress.
  static Uri GetEndpointUri(PeerNodeAddress address)
  {
      EndpointAddress endpoint = address.EndpointAddress;
      return endpoint.Uri;
  }
  // Connection pruning.
  // Repeatedly asks the maintainer for its least useful neighbor and closes it with
  // PeerCloseReason.NotUsefulNeighbor, until the non-closing neighbor count is no longer
  // above config.IdealNeighbors, the maintainer is no longer open, or the maintainer
  // has no candidate to offer.
  public void PruneConnections()
  {
      for (;;)
      {
          if (maintainer.NonClosingNeighborCount <= config.IdealNeighbors)
          {
              return;
          }
          if (!maintainer.IsOpen)
          {
              return;
          }

          IPeerNeighbor candidate = maintainer.GetLeastUsefulNeighbor();
          if (candidate == null)
          {
              return;
          }

          maintainer.CloseNeighbor(candidate, PeerCloseReason.NotUsefulNeighbor);
      }
  }
  // Offers every address in src to the candidate list (see the single-address overload
  // for the acceptance rules). A null collection is ignored. The whole batch is
  // processed under ThisLock.
  public void UpdateEndpointsCollection(ICollection<PeerNodeAddress> src)
  {
      if (src == null)
      {
          return;
      }

      lock (ThisLock)
      {
          foreach (PeerNodeAddress candidate in src)
          {
              UpdateEndpointsCollection(candidate);
          }
      }
  }
  // Offers the address of every referral in src to the candidate list (see the
  // single-address overload for the acceptance rules). A null collection is ignored.
  // The whole batch is processed under ThisLock.
  public void UpdateEndpointsCollection(ICollection<Referral> src)
  {
      if (src == null)
      {
          return;
      }

      lock (ThisLock)
      {
          foreach (Referral entry in src)
          {
              UpdateEndpointsCollection(entry.Address);
          }
      }
  }
  // Adds a single address to the candidate list when it passes
  // PeerValidateHelper.ValidNodeAddress, is not already a candidate, and is not this
  // node's own listen address. Takes no lock itself; the collection overloads call it
  // while holding ThisLock.
  void UpdateEndpointsCollection(PeerNodeAddress address)
  {
      // Don't accept invalid addresses
      if (!PeerValidateHelper.ValidNodeAddress(address))
      {
          return;
      }

      Uri key = GetEndpointUri(address);
      if (nodeAddresses.ContainsKey(key))
      {
          return;
      }

      // Never list ourselves as a connection candidate.
      if (key == GetEndpointUri(maintainer.GetListenAddress()))
      {
          return;
      }

      nodeAddresses[key] = address;
  }
  // Handler for the maintainer's NeighborClosed event.
  // If the closed neighbor was one we were still waiting on, drop it from the pending
  // set and signal addNeighbor so Connect can try another address.
  // Neighbors without a listen address are ignored, as is any call after Dispose.
  void OnNeighborClosed(IPeerNeighbor neighbor)
  {
      if (neighbor.ListenAddress == null)
      {
          return;
      }

      Uri closedUri = GetEndpointUri(neighbor.ListenAddress);
      if (disposed)
      {
          return;
      }

      lock (ThisLock)
      {
          // Re-check under the lock: Dispose may have run in between.
          if (disposed)
          {
              return;
          }
          if (closedUri == null)
          {
              return;
          }

          // Remove reports whether the key was present, which is exactly the
          // "was it pending" test.
          if (pendingConnectedNeighbor.Remove(closedUri))
          {
              addNeighbor.Set();
          }
      }
  }
  // Handler for the maintainer's NeighborConnected event.
  // Removes the neighbor's address from the pending set (if it was there) and signals
  // welcomeReceived so the Connect loop wakes up and re-evaluates its state.
  // A neighbor without a listen address is tolerated, matching OnNeighborClosed:
  // nothing is removed, but welcomeReceived is still signalled.
  // Does nothing after Dispose.
  void OnNeighborConnected(IPeerNeighbor neighbor)
  {
      // Read the property once and guard against null before dereferencing it.
      PeerNodeAddress listenAddress = neighbor.ListenAddress;
      Uri address = listenAddress != null ? GetEndpointUri(listenAddress) : null;
      if (!disposed)
      {
          lock (ThisLock)
          {
              // Re-check under the lock: Dispose may have run in between.
              if (!disposed)
              {
                  if (address != null && pendingConnectedNeighbor.ContainsKey(address))
                  {
                      pendingConnectedNeighbor.Remove(address);
                  }
                  welcomeReceived.Set();
              }
          }
      }
  }
  // Handler for the maintainer's MaintainerClosed event.
  // Signals maintainerClosed so the Connect loop can react; does nothing after Dispose.
  void OnMaintainerClosed()
  {
      if (disposed)
      {
          return;
      }

      lock (ThisLock)
      {
          // Re-check under the lock: Dispose may have run in between.
          if (disposed)
          {
              return;
          }
          maintainerClosed.Set();
      }
  }
  // Handler for the maintainer's ReferralsAdded event.
  // Each referral whose address is not our own listen address, not already a candidate,
  // not pending, and not already a neighbor becomes a new candidate. If at least one
  // was added and we are still below wantedConnectionCount, addNeighbor is signalled.
  // Stops immediately (without signalling) as soon as the maintainer is found closed.
  // The neighbor argument is not used.
  void OnReferralsAdded(IList<Referral> referrals, IPeerNeighbor neighbor)
  {
      bool anyAdded = false;

      foreach (Referral candidate in referrals)
      {
          if (disposed)
          {
              continue;
          }

          lock (ThisLock)
          {
              // Re-check under the lock: Dispose may have run in between.
              if (disposed)
              {
                  continue;
              }
              if (!maintainer.IsOpen)
              {
                  return;
              }

              Uri key = GetEndpointUri(candidate.Address);

              // Make sure the referral is not mine.
              if (key == GetEndpointUri(maintainer.GetListenAddress()))
              {
                  continue;
              }

              bool alreadyKnown = nodeAddresses.ContainsKey(key)
                  || pendingConnectedNeighbor.ContainsKey(key)
                  || maintainer.FindDuplicateNeighbor(candidate.Address) != null;
              if (!alreadyKnown)
              {
                  nodeAddresses[key] = candidate.Address;
                  anyAdded = true;
              }
          }
      }

      if (anyAdded && maintainer.ConnectedNeighborCount < wantedConnectionCount)
      {
          addNeighbor.Set();
      }
  }
  374. }
  375. }