CustomPeerResolverService.cs 24 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.PeerResolvers
  5. {
  6. using System;
  7. using System.Collections.Generic;
  8. using System.Runtime;
  9. using System.ServiceModel;
  10. using System.ServiceModel.Channels;
  11. using System.Threading;
  12. [ObsoleteAttribute ("PeerChannel feature is obsolete and will be removed in the future.", false)]
  13. [ServiceBehavior(UseSynchronizationContext = false, InstanceContextMode = InstanceContextMode.Single, ConcurrencyMode = ConcurrencyMode.Multiple)]
  14. public class CustomPeerResolverService : IPeerResolverContract
  15. {
  16. internal enum RegistrationState
  17. {
  18. OK, Deleted
  19. }
  20. internal class RegistrationEntry
  21. {
  22. Guid clientId;
  23. Guid registrationId;
  24. string meshId;
  25. DateTime expires;
  26. PeerNodeAddress address;
  27. RegistrationState state;
  28. public RegistrationEntry(Guid clientId, Guid registrationId, string meshId, DateTime expires, PeerNodeAddress address)
  29. {
  30. this.ClientId = clientId;
  31. this.RegistrationId = registrationId;
  32. this.MeshId = meshId;
  33. this.Expires = expires;
  34. this.Address = address;
  35. this.State = RegistrationState.OK;
  36. }
  37. public Guid ClientId
  38. {
  39. get { return clientId; }
  40. set { clientId = value; }
  41. }
  42. public Guid RegistrationId
  43. {
  44. get { return registrationId; }
  45. set { registrationId = value; }
  46. }
  47. public string MeshId
  48. {
  49. get { return meshId; }
  50. set { meshId = value; }
  51. }
  52. public DateTime Expires
  53. {
  54. get { return expires; }
  55. set { expires = value; }
  56. }
  57. public PeerNodeAddress Address
  58. {
  59. get { return address; }
  60. set { address = value; }
  61. }
  62. public RegistrationState State
  63. {
  64. get { return state; }
  65. set { state = value; }
  66. }
  67. }
  68. internal class LiteLock
  69. {
  70. bool forWrite;
  71. bool upgraded;
  72. ReaderWriterLock locker;
  73. TimeSpan timeout = TimeSpan.FromMinutes(1);
  74. LockCookie lc;
  75. LiteLock(ReaderWriterLock locker, bool forWrite)
  76. {
  77. this.locker = locker;
  78. this.forWrite = forWrite;
  79. }
  80. public static void Acquire(out LiteLock liteLock, ReaderWriterLock locker)
  81. {
  82. Acquire(out liteLock, locker, false);
  83. }
  84. public static void Acquire(out LiteLock liteLock, ReaderWriterLock locker, bool forWrite)
  85. {
  86. LiteLock theLock = new LiteLock(locker, forWrite);
  87. try { }
  88. finally
  89. {
  90. if (forWrite)
  91. {
  92. locker.AcquireWriterLock(theLock.timeout);
  93. }
  94. else
  95. {
  96. locker.AcquireReaderLock(theLock.timeout);
  97. }
  98. liteLock = theLock;
  99. }
  100. }
  101. public static void Release(LiteLock liteLock)
  102. {
  103. if (liteLock == null)
  104. {
  105. return;
  106. }
  107. if (liteLock.forWrite)
  108. {
  109. liteLock.locker.ReleaseWriterLock();
  110. }
  111. else
  112. {
  113. Fx.Assert(!liteLock.upgraded, "Can't release while upgraded!");
  114. liteLock.locker.ReleaseReaderLock();
  115. }
  116. }
  117. public void UpgradeToWriterLock()
  118. {
  119. Fx.Assert(!forWrite, "Invalid call to Upgrade!!");
  120. Fx.Assert(!upgraded, "Already upgraded!");
  121. try { }
  122. finally
  123. {
  124. lc = locker.UpgradeToWriterLock(timeout);
  125. upgraded = true;
  126. }
  127. }
  128. public void DowngradeFromWriterLock()
  129. {
  130. Fx.Assert(!forWrite, "Invalid call to Downgrade!!");
  131. if (upgraded)
  132. {
  133. locker.DowngradeFromWriterLock(ref lc);
  134. upgraded = false;
  135. }
  136. }
  137. }
  138. internal class MeshEntry
  139. {
  140. Dictionary<Guid, RegistrationEntry> entryTable;
  141. Dictionary<string, RegistrationEntry> service2EntryTable;
  142. List<RegistrationEntry> entryList;
  143. ReaderWriterLock gate;
  144. internal MeshEntry()
  145. {
  146. EntryTable = new Dictionary<Guid, RegistrationEntry>();
  147. Service2EntryTable = new Dictionary<string, RegistrationEntry>();
  148. EntryList = new List<RegistrationEntry>();
  149. Gate = new ReaderWriterLock();
  150. }
  151. public Dictionary<Guid, RegistrationEntry> EntryTable
  152. {
  153. get { return entryTable; }
  154. set { entryTable = value; }
  155. }
  156. public Dictionary<string, RegistrationEntry> Service2EntryTable
  157. {
  158. get { return service2EntryTable; }
  159. set { service2EntryTable = value; }
  160. }
  161. public List<RegistrationEntry> EntryList
  162. {
  163. get { return entryList; }
  164. set { entryList = value; }
  165. }
  166. public ReaderWriterLock Gate
  167. {
  168. get { return gate; }
  169. set { gate = value; }
  170. }
  171. }
  172. Dictionary<string, MeshEntry> meshId2Entry = new Dictionary<string, MeshEntry>();
  173. ReaderWriterLock gate;
  174. TimeSpan timeout = TimeSpan.FromMinutes(1);
  175. TimeSpan cleanupInterval = TimeSpan.FromMinutes(1);
  176. TimeSpan refreshInterval = TimeSpan.FromMinutes(10);
  177. bool controlShape;
  178. bool isCleaning;
  179. IOThreadTimer timer;
  180. object thisLock = new object();
  181. bool opened;
  182. TimeSpan LockWait = TimeSpan.FromSeconds(5);
  183. public CustomPeerResolverService()
  184. {
  185. isCleaning = false;
  186. gate = new ReaderWriterLock();
  187. }
  188. public TimeSpan CleanupInterval
  189. {
  190. get
  191. {
  192. return cleanupInterval;
  193. }
  194. set
  195. {
  196. if (value < TimeSpan.Zero)
  197. {
  198. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  199. SR.GetString(SR.SFxTimeoutOutOfRange0)));
  200. }
  201. if (TimeoutHelper.IsTooLarge(value))
  202. {
  203. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  204. SR.GetString(SR.SFxTimeoutOutOfRangeTooBig)));
  205. }
  206. lock (ThisLock)
  207. {
  208. ThrowIfOpened("Set CleanupInterval");
  209. this.cleanupInterval = value;
  210. }
  211. }
  212. }
  213. public TimeSpan RefreshInterval
  214. {
  215. get
  216. {
  217. return refreshInterval;
  218. }
  219. set
  220. {
  221. if (value < TimeSpan.Zero)
  222. {
  223. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  224. SR.GetString(SR.SFxTimeoutOutOfRange0)));
  225. }
  226. if (TimeoutHelper.IsTooLarge(value))
  227. {
  228. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new ArgumentOutOfRangeException("value", value,
  229. SR.GetString(SR.SFxTimeoutOutOfRangeTooBig)));
  230. }
  231. lock (ThisLock)
  232. {
  233. ThrowIfOpened("Set RefreshInterval");
  234. this.refreshInterval = value;
  235. }
  236. }
  237. }
  238. public bool ControlShape
  239. {
  240. get
  241. {
  242. return this.controlShape;
  243. }
  244. set
  245. {
  246. lock (ThisLock)
  247. {
  248. ThrowIfOpened("Set ControlShape");
  249. this.controlShape = value;
  250. }
  251. }
  252. }
  253. MeshEntry GetMeshEntry(string meshId) { return GetMeshEntry(meshId, true); }
  254. MeshEntry GetMeshEntry(string meshId, bool createIfNotExists)
  255. {
  256. MeshEntry meshEntry = null;
  257. LiteLock ll = null;
  258. try
  259. {
  260. LiteLock.Acquire(out ll, gate);
  261. if (!this.meshId2Entry.TryGetValue(meshId, out meshEntry) && createIfNotExists)
  262. {
  263. meshEntry = new MeshEntry();
  264. try
  265. {
  266. ll.UpgradeToWriterLock();
  267. meshId2Entry.Add(meshId, meshEntry);
  268. }
  269. finally
  270. {
  271. ll.DowngradeFromWriterLock();
  272. }
  273. }
  274. }
  275. finally
  276. {
  277. LiteLock.Release(ll);
  278. }
  279. Fx.Assert(meshEntry != null || !createIfNotExists, "GetMeshEntry failed to get an entry!");
  280. return meshEntry;
  281. }
  282. public virtual RegisterResponseInfo Register(Guid clientId, string meshId, PeerNodeAddress address)
  283. {
  284. Guid registrationId = Guid.NewGuid();
  285. DateTime expiry = DateTime.UtcNow + RefreshInterval;
  286. RegistrationEntry entry = null;
  287. MeshEntry meshEntry = null;
  288. lock (ThisLock)
  289. {
  290. entry = new RegistrationEntry(clientId, registrationId, meshId, expiry, address);
  291. meshEntry = GetMeshEntry(meshId);
  292. if (meshEntry.Service2EntryTable.ContainsKey(address.ServicePath))
  293. PeerExceptionHelper.ThrowInvalidOperation_DuplicatePeerRegistration(address.ServicePath);
  294. LiteLock ll = null;
  295. try
  296. {
  297. // meshEntry.gate can be held by this thread for write if this is coming from update
  298. // else MUST not be held at all.
  299. if (!meshEntry.Gate.IsWriterLockHeld)
  300. {
  301. LiteLock.Acquire(out ll, meshEntry.Gate, true);
  302. }
  303. meshEntry.EntryTable.Add(registrationId, entry);
  304. meshEntry.EntryList.Add(entry);
  305. meshEntry.Service2EntryTable.Add(address.ServicePath, entry);
  306. }
  307. finally
  308. {
  309. if (ll != null)
  310. LiteLock.Release(ll);
  311. }
  312. }
  313. return new RegisterResponseInfo(registrationId, RefreshInterval);
  314. }
  315. public virtual RegisterResponseInfo Register(RegisterInfo registerInfo)
  316. {
  317. if (registerInfo == null)
  318. {
  319. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("registerInfo", SR.GetString(SR.PeerNullRegistrationInfo));
  320. }
  321. ThrowIfClosed("Register");
  322. if (!registerInfo.HasBody() || String.IsNullOrEmpty(registerInfo.MeshId))
  323. {
  324. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("registerInfo", SR.GetString(SR.PeerInvalidMessageBody, registerInfo));
  325. }
  326. return Register(registerInfo.ClientId, registerInfo.MeshId, registerInfo.NodeAddress);
  327. }
  328. public virtual RegisterResponseInfo Update(UpdateInfo updateInfo)
  329. {
  330. if (updateInfo == null)
  331. {
  332. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("updateInfo", SR.GetString(SR.PeerNullRegistrationInfo));
  333. }
  334. ThrowIfClosed("Update");
  335. if (!updateInfo.HasBody() || String.IsNullOrEmpty(updateInfo.MeshId) || updateInfo.NodeAddress == null)
  336. {
  337. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("updateInfo", SR.GetString(SR.PeerInvalidMessageBody, updateInfo));
  338. }
  339. Guid registrationId = updateInfo.RegistrationId;
  340. RegistrationEntry entry;
  341. MeshEntry meshEntry = GetMeshEntry(updateInfo.MeshId);
  342. LiteLock ll = null;
  343. //handle cases when Update ----s with Register.
  344. if (updateInfo.RegistrationId == Guid.Empty || meshEntry == null)
  345. return Register(updateInfo.ClientId, updateInfo.MeshId, updateInfo.NodeAddress);
  346. //
  347. // preserve locking order between ThisLock and the LiteLock.
  348. lock (ThisLock)
  349. {
  350. try
  351. {
  352. LiteLock.Acquire(out ll, meshEntry.Gate);
  353. if (!meshEntry.EntryTable.TryGetValue(updateInfo.RegistrationId, out entry))
  354. {
  355. try
  356. {
  357. // upgrade to writer lock
  358. ll.UpgradeToWriterLock();
  359. return Register(updateInfo.ClientId, updateInfo.MeshId, updateInfo.NodeAddress);
  360. }
  361. finally
  362. {
  363. ll.DowngradeFromWriterLock();
  364. }
  365. }
  366. lock (entry)
  367. {
  368. entry.Address = updateInfo.NodeAddress;
  369. entry.Expires = DateTime.UtcNow + this.RefreshInterval;
  370. }
  371. }
  372. finally
  373. {
  374. LiteLock.Release(ll);
  375. }
  376. }
  377. return new RegisterResponseInfo(registrationId, RefreshInterval);
  378. }
  379. public virtual ResolveResponseInfo Resolve(ResolveInfo resolveInfo)
  380. {
  381. if (resolveInfo == null)
  382. {
  383. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("resolveInfo", SR.GetString(SR.PeerNullResolveInfo));
  384. }
  385. ThrowIfClosed("Resolve");
  386. if (!resolveInfo.HasBody())
  387. {
  388. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("resolveInfo", SR.GetString(SR.PeerInvalidMessageBody, resolveInfo));
  389. }
  390. int currentCount = 0;
  391. int index = 0;
  392. int maxEntries = resolveInfo.MaxAddresses;
  393. ResolveResponseInfo response = new ResolveResponseInfo();
  394. List<PeerNodeAddress> results = new List<PeerNodeAddress>();
  395. List<RegistrationEntry> entries = null;
  396. PeerNodeAddress address;
  397. RegistrationEntry entry;
  398. MeshEntry meshEntry = GetMeshEntry(resolveInfo.MeshId, false);
  399. if (meshEntry != null)
  400. {
  401. LiteLock ll = null;
  402. try
  403. {
  404. LiteLock.Acquire(out ll, meshEntry.Gate);
  405. entries = meshEntry.EntryList;
  406. if (entries.Count <= maxEntries)
  407. {
  408. foreach (RegistrationEntry e in entries)
  409. {
  410. results.Add(e.Address);
  411. }
  412. }
  413. else
  414. {
  415. Random random = new Random();
  416. while (currentCount < maxEntries)
  417. {
  418. index = random.Next(entries.Count);
  419. entry = entries[index];
  420. Fx.Assert(entry.State == RegistrationState.OK, "A deleted registration is still around!");
  421. address = entry.Address;
  422. if (!results.Contains(address))
  423. results.Add(address);
  424. currentCount++;
  425. }
  426. }
  427. }
  428. finally
  429. {
  430. LiteLock.Release(ll);
  431. }
  432. }
  433. response.Addresses = results.ToArray();
  434. return response;
  435. }
  436. public virtual void Unregister(UnregisterInfo unregisterInfo)
  437. {
  438. if (unregisterInfo == null)
  439. {
  440. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterinfo", SR.GetString(SR.PeerNullRegistrationInfo));
  441. }
  442. ThrowIfClosed("Unregister");
  443. if (!unregisterInfo.HasBody() || String.IsNullOrEmpty(unregisterInfo.MeshId) || unregisterInfo.RegistrationId == Guid.Empty)
  444. {
  445. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterInfo", SR.GetString(SR.PeerInvalidMessageBody, unregisterInfo));
  446. }
  447. RegistrationEntry registration = null;
  448. MeshEntry meshEntry = GetMeshEntry(unregisterInfo.MeshId, false);
  449. //there could be a ---- that two different threads could be working on the same entry
  450. //we wont optimize for that case.
  451. LiteLock ll = null;
  452. try
  453. {
  454. LiteLock.Acquire(out ll, meshEntry.Gate, true);
  455. if (!meshEntry.EntryTable.TryGetValue(unregisterInfo.RegistrationId, out registration))
  456. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("unregisterInfo", SR.GetString(SR.PeerInvalidMessageBody, unregisterInfo));
  457. meshEntry.EntryTable.Remove(unregisterInfo.RegistrationId);
  458. meshEntry.EntryList.Remove(registration);
  459. meshEntry.Service2EntryTable.Remove(registration.Address.ServicePath);
  460. registration.State = RegistrationState.Deleted;
  461. }
  462. finally
  463. {
  464. LiteLock.Release(ll);
  465. }
  466. }
  467. public virtual RefreshResponseInfo Refresh(RefreshInfo refreshInfo)
  468. {
  469. if (refreshInfo == null)
  470. {
  471. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("refreshInfo", SR.GetString(SR.PeerNullRefreshInfo));
  472. }
  473. ThrowIfClosed("Refresh");
  474. if (!refreshInfo.HasBody() || String.IsNullOrEmpty(refreshInfo.MeshId) || refreshInfo.RegistrationId == Guid.Empty)
  475. {
  476. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("refreshInfo", SR.GetString(SR.PeerInvalidMessageBody, refreshInfo));
  477. }
  478. RefreshResult result = RefreshResult.RegistrationNotFound;
  479. RegistrationEntry entry = null;
  480. MeshEntry meshEntry = GetMeshEntry(refreshInfo.MeshId, false);
  481. LiteLock ll = null;
  482. if (meshEntry != null)
  483. {
  484. try
  485. {
  486. LiteLock.Acquire(out ll, meshEntry.Gate);
  487. if (!meshEntry.EntryTable.TryGetValue(refreshInfo.RegistrationId, out entry))
  488. return new RefreshResponseInfo(RefreshInterval, result);
  489. lock (entry)
  490. {
  491. if (entry.State == RegistrationState.OK)
  492. {
  493. entry.Expires = DateTime.UtcNow + RefreshInterval;
  494. result = RefreshResult.Success;
  495. }
  496. }
  497. }
  498. finally
  499. {
  500. LiteLock.Release(ll);
  501. }
  502. }
  503. return new RefreshResponseInfo(RefreshInterval, result);
  504. }
  505. public virtual ServiceSettingsResponseInfo GetServiceSettings()
  506. {
  507. ThrowIfClosed("GetServiceSettings");
  508. ServiceSettingsResponseInfo info = new ServiceSettingsResponseInfo(this.ControlShape);
  509. return info;
  510. }
  511. public virtual void Open()
  512. {
  513. ThrowIfOpened("Open");
  514. if (this.refreshInterval <= TimeSpan.Zero)
  515. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("RefreshInterval", SR.GetString(SR.RefreshIntervalMustBeGreaterThanZero, this.refreshInterval));
  516. if (this.CleanupInterval <= TimeSpan.Zero)
  517. throw DiagnosticUtility.ExceptionUtility.ThrowHelperArgument("CleanupInterval", SR.GetString(SR.CleanupIntervalMustBeGreaterThanZero, this.cleanupInterval));
  518. //check that we are good to open
  519. timer = new IOThreadTimer(new Action<object>(CleanupActivity), null, false);
  520. timer.Set(CleanupInterval);
  521. opened = true;
  522. }
  523. public virtual void Close()
  524. {
  525. ThrowIfClosed("Close");
  526. timer.Cancel();
  527. opened = false;
  528. }
  529. internal virtual void CleanupActivity(object state)
  530. {
  531. if (!opened)
  532. return;
  533. if (!isCleaning)
  534. {
  535. lock (ThisLock)
  536. {
  537. if (!isCleaning)
  538. {
  539. isCleaning = true;
  540. try
  541. {
  542. MeshEntry meshEntry = null;
  543. //acquire a write lock. from the reader/writer lock can we postpone until no contention?
  544. ICollection<string> keys = null;
  545. LiteLock ll = null;
  546. try
  547. {
  548. LiteLock.Acquire(out ll, gate);
  549. keys = meshId2Entry.Keys;
  550. }
  551. finally
  552. {
  553. LiteLock.Release(ll);
  554. }
  555. foreach (string meshId in keys)
  556. {
  557. meshEntry = GetMeshEntry(meshId);
  558. CleanupMeshEntry(meshEntry);
  559. }
  560. }
  561. finally
  562. {
  563. isCleaning = false;
  564. if (opened)
  565. timer.Set(this.CleanupInterval);
  566. }
  567. }
  568. }
  569. }
  570. }
  571. //always call this from a readlock
  572. void CleanupMeshEntry(MeshEntry meshEntry)
  573. {
  574. List<Guid> remove = new List<Guid>();
  575. if (!opened)
  576. return;
  577. LiteLock ll = null;
  578. try
  579. {
  580. LiteLock.Acquire(out ll, meshEntry.Gate, true);
  581. foreach (KeyValuePair<Guid, RegistrationEntry> item in meshEntry.EntryTable)
  582. {
  583. if ((item.Value.Expires <= DateTime.UtcNow) || (item.Value.State == RegistrationState.Deleted))
  584. {
  585. remove.Add(item.Key);
  586. meshEntry.EntryList.Remove(item.Value);
  587. meshEntry.Service2EntryTable.Remove(item.Value.Address.ServicePath);
  588. }
  589. }
  590. foreach (Guid id in remove)
  591. {
  592. meshEntry.EntryTable.Remove(id);
  593. }
  594. }
  595. finally
  596. {
  597. LiteLock.Release(ll);
  598. }
  599. }
  600. object ThisLock
  601. {
  602. get
  603. {
  604. return this.thisLock;
  605. }
  606. }
  607. void ThrowIfOpened(string operation)
  608. {
  609. if (opened)
  610. PeerExceptionHelper.ThrowInvalidOperation_NotValidWhenOpen(operation);
  611. }
  612. void ThrowIfClosed(string operation)
  613. {
  614. if (!opened)
  615. PeerExceptionHelper.ThrowInvalidOperation_NotValidWhenClosed(operation);
  616. }
  617. }
  618. }