PeerNodeStateManager.cs 19 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489
  1. //------------------------------------------------------------
  2. // Copyright (c) Microsoft Corporation. All rights reserved.
  3. //------------------------------------------------------------
  4. namespace System.ServiceModel.Channels
  5. {
  6. using System.Collections.Generic;
  7. using System.Diagnostics;
  8. using System.Runtime;
  9. using System.Threading;
  10. partial class PeerNodeImplementation
  11. {
  12. // A simple state manager for the PeerNode. Unlike the state managers used for channels and other
  13. // classes, a PeerNode's Open/Close is counted, a PeerNode is re-openable, and Abort only
  14. // takes effect if the outstanding number of Opens is 1.
  15. // The PeerNode defers to this object for all state related operations.
  16. //
  17. // Whenever a call is made that may change the state of the object (openCount transitions between 0 and 1),
  18. // an operation is queued. When an operation is removed from the queue, if the target state is still the
  19. // same as the operation (e.g. openCount > 0 and operation == Open) and the object is not already in that
  20. // state, the operation is performed by calling back into the PeerNode
  21. //
  22. // Because each operation is pulled from the queue one at a time, the open and close of the
  23. // PeerNode is serialized
  24. class SimpleStateManager
  25. {
  26. internal enum State { NotOpened, Opening, Opened, Closing };
  27. State currentState = State.NotOpened;
  28. object thisLock = new object();
  29. Queue<IOperation> queue = new Queue<IOperation>();
  30. bool queueRunning;
  31. int openCount;
  32. PeerNodeImplementation peerNode;
  33. public SimpleStateManager(PeerNodeImplementation peerNode)
  34. {
  35. this.peerNode = peerNode;
  36. }
  37. object ThisLock
  38. {
  39. get { return thisLock; }
  40. }
  41. public void Abort()
  42. {
  43. lock (ThisLock)
  44. {
  45. bool runAbort = false;
  46. if (openCount <= 1 && currentState != State.NotOpened)
  47. {
  48. runAbort = true;
  49. }
  50. if (openCount > 0)
  51. {
  52. --openCount;
  53. }
  54. if (runAbort)
  55. {
  56. try
  57. {
  58. peerNode.OnAbort();
  59. }
  60. finally
  61. {
  62. currentState = State.NotOpened;
  63. }
  64. }
  65. }
  66. }
  67. public IAsyncResult BeginClose(TimeSpan timeout, AsyncCallback callback, object state)
  68. {
  69. CloseOperation op = null;
  70. lock (ThisLock)
  71. {
  72. if (openCount > 0)
  73. {
  74. --openCount;
  75. }
  76. if (openCount > 0)
  77. {
  78. return new CompletedAsyncResult(callback, state);
  79. }
  80. else
  81. {
  82. op = new CloseOperation(this, peerNode, timeout, callback, state);
  83. queue.Enqueue(op);
  84. RunQueue();
  85. }
  86. }
  87. return op;
  88. }
  89. public IAsyncResult BeginOpen(TimeSpan timeout, AsyncCallback callback, object state, bool waitForOnline)
  90. {
  91. bool completedSynchronously = false;
  92. OpenOperation op = null;
  93. lock (ThisLock)
  94. {
  95. openCount++;
  96. if (openCount > 1 && currentState == State.Opened)
  97. {
  98. completedSynchronously = true;
  99. }
  100. else
  101. {
  102. op = new OpenOperation(this, peerNode, timeout, callback, state, waitForOnline);
  103. queue.Enqueue(op);
  104. RunQueue();
  105. }
  106. }
  107. if (completedSynchronously)
  108. {
  109. return new CompletedAsyncResult(callback, state);
  110. }
  111. return op;
  112. }
  113. public void Close(TimeSpan timeout)
  114. {
  115. EndClose(BeginClose(timeout, null, null));
  116. }
  117. public static void EndOpen(IAsyncResult result)
  118. {
  119. // result can be either an OpenOperation or a CompletedAsyncResult
  120. if (result is CompletedAsyncResult)
  121. CompletedAsyncResult.End(result);
  122. else
  123. OpenOperation.End(result);
  124. }
  125. public static void EndClose(IAsyncResult result)
  126. {
  127. // result can be either an CloseOperation or a CompletedAsyncResult
  128. if (result is CompletedAsyncResult)
  129. CompletedAsyncResult.End(result);
  130. else
  131. CloseOperation.End(result);
  132. }
  133. // Process IP Address change event from IP helper
  134. public void OnIPAddressesChanged(object sender, EventArgs e)
  135. {
  136. IPAddressChangeOperation op = null;
  137. lock (ThisLock)
  138. {
  139. op = new IPAddressChangeOperation(peerNode);
  140. queue.Enqueue(op);
  141. RunQueue();
  142. }
  143. }
  144. public void Open(TimeSpan timeout, bool waitForOnline)
  145. {
  146. EndOpen(BeginOpen(timeout, null, null, waitForOnline));
  147. }
  148. // Start running operations from the queue (must be called within lock)
  149. void RunQueue()
  150. {
  151. if (queueRunning)
  152. return;
  153. queueRunning = true;
  154. ActionItem.Schedule(new Action<object>(RunQueueCallback), null);
  155. }
  156. void RunQueueCallback(object state)
  157. {
  158. IOperation op;
  159. // remove an operation from the queue
  160. lock (ThisLock)
  161. {
  162. Fx.Assert(queue.Count > 0, "queue should not be empty");
  163. op = queue.Dequeue();
  164. }
  165. try
  166. {
  167. // execute the operation
  168. op.Run();
  169. }
  170. finally
  171. {
  172. lock (ThisLock)
  173. {
  174. // if there are still pending operations, schedule another thread
  175. if (queue.Count > 0)
  176. {
  177. try
  178. {
  179. ActionItem.Schedule(new Action<object>(RunQueueCallback), null);
  180. }
  181. catch (Exception e)
  182. {
  183. if (Fx.IsFatal(e)) throw;
  184. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  185. }
  186. }
  187. else
  188. {
  189. queueRunning = false;
  190. }
  191. }
  192. }
  193. }
  194. interface IOperation
  195. {
  196. void Run();
  197. }
  198. class CloseOperation : OperationBase
  199. {
  200. PeerNodeImplementation peerNode;
  201. public CloseOperation(SimpleStateManager stateManager,
  202. PeerNodeImplementation peerNode, TimeSpan timeout, AsyncCallback callback, object state)
  203. : base(stateManager, timeout, callback, state)
  204. {
  205. this.peerNode = peerNode;
  206. }
  207. protected override void Run()
  208. {
  209. Exception lclException = null;
  210. try
  211. {
  212. lock (ThisLock)
  213. {
  214. if (stateManager.openCount > 0)
  215. {
  216. // the current target state is no longer Closed
  217. invokeOperation = false;
  218. }
  219. else if (stateManager.currentState == State.NotOpened)
  220. {
  221. // the state is already Closed
  222. invokeOperation = false;
  223. }
  224. else if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
  225. {
  226. // Time out has already happened complete will be taken care of in the
  227. // OperationBase class
  228. invokeOperation = false;
  229. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
  230. }
  231. else
  232. {
  233. // the PeerNode needs to be closed
  234. if (!(stateManager.currentState != State.Opening && stateManager.currentState != State.Closing))
  235. {
  236. throw Fx.AssertAndThrow("Open and close are serialized by queue We should not be either in Closing or Opening state at this point");
  237. }
  238. if (stateManager.currentState != State.NotOpened)
  239. {
  240. stateManager.currentState = State.Closing;
  241. invokeOperation = true;
  242. }
  243. }
  244. }
  245. }
  246. catch (Exception e)
  247. {
  248. if (Fx.IsFatal(e)) throw;
  249. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  250. lclException = e;
  251. }
  252. if (invokeOperation)
  253. {
  254. try
  255. {
  256. peerNode.OnClose(timeoutHelper.RemainingTime());
  257. }
  258. catch (Exception e)
  259. {
  260. if (Fx.IsFatal(e)) throw;
  261. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  262. lclException = e;
  263. }
  264. lock (ThisLock)
  265. {
  266. stateManager.currentState = State.NotOpened;
  267. }
  268. }
  269. Complete(lclException);
  270. }
  271. }
  272. class OpenOperation : OperationBase
  273. {
  274. PeerNodeImplementation peerNode;
  275. bool waitForOnline;
  276. public OpenOperation(SimpleStateManager stateManager, PeerNodeImplementation peerNode, TimeSpan timeout,
  277. AsyncCallback callback, object state, bool waitForOnline)
  278. : base(stateManager, timeout, callback, state)
  279. {
  280. this.peerNode = peerNode;
  281. this.waitForOnline = waitForOnline;
  282. }
  283. protected override void Run()
  284. {
  285. Exception lclException = null;
  286. try
  287. {
  288. lock (ThisLock)
  289. {
  290. if (stateManager.openCount < 1)
  291. {
  292. // the current target state is no longer Opened
  293. invokeOperation = false;
  294. }
  295. else if (stateManager.currentState == State.Opened)
  296. {
  297. // the state is already Opened
  298. invokeOperation = false;
  299. }
  300. else if (timeoutHelper.RemainingTime() <= TimeSpan.Zero)
  301. {
  302. // Time out has already happened complete will be taken care of in the
  303. // OperationBase class
  304. invokeOperation = false;
  305. throw DiagnosticUtility.ExceptionUtility.ThrowHelperError(new TimeoutException());
  306. }
  307. else
  308. {
  309. // the PeerNode needs to be opened
  310. if (!(stateManager.currentState != State.Opening && stateManager.currentState != State.Closing))
  311. {
  312. throw Fx.AssertAndThrow("Open and close are serialized by queue We should not be either in Closing or Opening state at this point");
  313. }
  314. if (stateManager.currentState != State.Opened)
  315. {
  316. stateManager.currentState = State.Opening;
  317. invokeOperation = true;
  318. }
  319. }
  320. }
  321. }
  322. catch (Exception e)
  323. {
  324. if (Fx.IsFatal(e)) throw;
  325. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  326. lclException = e;
  327. }
  328. if (invokeOperation)
  329. {
  330. try
  331. {
  332. peerNode.OnOpen(timeoutHelper.RemainingTime(), waitForOnline);
  333. lock (ThisLock)
  334. {
  335. stateManager.currentState = State.Opened;
  336. }
  337. }
  338. catch (Exception e)
  339. {
  340. if (Fx.IsFatal(e)) throw;
  341. lock (ThisLock)
  342. {
  343. stateManager.currentState = State.NotOpened;
  344. // since Open is throwing, we roll back the openCount because a matching Close is not
  345. // expected
  346. stateManager.openCount--;
  347. }
  348. lclException = e;
  349. DiagnosticUtility.TraceHandledException(e, TraceEventType.Information);
  350. }
  351. }
  352. Complete(lclException);
  353. }
  354. }
  355. // Base class for Open and Close
  356. abstract class OperationBase : AsyncResult, IOperation
  357. {
  358. protected SimpleStateManager stateManager;
  359. protected TimeoutHelper timeoutHelper;
  360. AsyncCallback callback;
  361. protected bool invokeOperation;
  362. // Double-checked locking pattern requires volatile for read/write synchronization
  363. volatile bool completed;
  364. public OperationBase(SimpleStateManager stateManager, TimeSpan timeout,
  365. AsyncCallback callback, object state)
  366. : base(callback, state)
  367. {
  368. this.stateManager = stateManager;
  369. timeoutHelper = new TimeoutHelper(timeout);
  370. this.callback = callback;
  371. invokeOperation = false;
  372. completed = false;
  373. }
  374. void AsyncComplete(object o)
  375. {
  376. try
  377. {
  378. base.Complete(false, (Exception)o);
  379. }
  380. catch (Exception e)
  381. {
  382. if (Fx.IsFatal(e)) throw;
  383. throw DiagnosticUtility.ExceptionUtility.ThrowHelperCallback(SR.GetString(SR.AsyncCallbackException), e);
  384. }
  385. }
  386. protected abstract void Run();
  387. void IOperation.Run()
  388. {
  389. Run();
  390. }
  391. protected void Complete(Exception exception)
  392. {
  393. if (completed)
  394. {
  395. return;
  396. }
  397. lock (ThisLock)
  398. {
  399. if (completed)
  400. {
  401. return;
  402. }
  403. completed = true;
  404. }
  405. try
  406. {
  407. if (callback != null)
  408. {
  409. // complete the AsyncResult on a separate thread so that the queue can progress.
  410. // this prevents a deadlock when the callback attempts to call Close.
  411. // this may cause the callbacks to be called in a differnet order in which they completed, but that
  412. // is ok because each callback is associated with a different object (channel or listener factory)
  413. ActionItem.Schedule(new Action<object>(AsyncComplete), exception);
  414. }
  415. else
  416. {
  417. AsyncComplete(exception);
  418. }
  419. }
  420. catch (Exception e)
  421. {
  422. if (Fx.IsFatal(e)) throw;
  423. throw DiagnosticUtility.ExceptionUtility.ThrowHelperCallback(SR.GetString(SR.MessagePropagationException), e);
  424. }
  425. }
  426. protected object ThisLock
  427. {
  428. get { return stateManager.thisLock; }
  429. }
  430. static public void End(IAsyncResult result)
  431. {
  432. AsyncResult.End<OperationBase>(result);
  433. }
  434. }
  435. // To serialize IP address change processing
  436. class IPAddressChangeOperation : IOperation
  437. {
  438. PeerNodeImplementation peerNode;
  439. public IPAddressChangeOperation(PeerNodeImplementation peerNode)
  440. {
  441. this.peerNode = peerNode;
  442. }
  443. void IOperation.Run()
  444. {
  445. peerNode.OnIPAddressChange();
  446. }
  447. }
  448. }
  449. }
  450. }